cluster - remote block device work
[dragonfly.git] / sys / dev / disk / xdisk / xdisk.c
1 /*
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
/*
 * This module allows disk devices to be created and associated with a
 * communications pipe or socket.  You open the device and issue an
 * ioctl() to install a new disk along with its communications descriptor.
 *
 * All further communication occurs via the descriptor using the DMSG
 * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 * direct connection to a remote machine's disk (in-kernel), to a remote
 * cluster controller, to the local cluster controller, etc.
 *
 * /dev/xdisk is the control device; issue ioctl()s on it to create the
 * /dev/xa%d devices.  These devices look like raw disks to the system.
 *
 * TODO:
 *      Handle circuit disconnects, leave bios pending.
 *      Restart bios on circuit reconnect.
 */
51 #include <sys/param.h>
52 #include <sys/systm.h>
53 #include <sys/buf.h>
54 #include <sys/conf.h>
55 #include <sys/device.h>
56 #include <sys/devicestat.h>
57 #include <sys/disk.h>
58 #include <sys/kernel.h>
59 #include <sys/malloc.h>
60 #include <sys/sysctl.h>
61 #include <sys/proc.h>
62 #include <sys/queue.h>
63 #include <sys/udev.h>
64 #include <sys/uuid.h>
65 #include <sys/kern_syscall.h>
66
67 #include <sys/dmsg.h>
68 #include <sys/xdiskioctl.h>
69
70 #include <sys/buf2.h>
71 #include <sys/thread2.h>
72
73 struct xa_softc;
74
75 struct xa_tag {
76         TAILQ_ENTRY(xa_tag) entry;
77         struct xa_softc *xa;
78         dmsg_blk_error_t status;
79         kdmsg_state_t   *state;
80         struct bio      *bio;
81         uint64_t        circuit;
82         int             running;        /* transaction running */
83         int             waitseq;        /* streaming reply */
84         int             done;           /* final (transaction closed) */
85 };
86
87 typedef struct xa_tag   xa_tag_t;
88
89 struct xa_softc {
90         TAILQ_ENTRY(xa_softc) entry;
91         cdev_t          dev;
92         kdmsg_iocom_t   iocom;
93         struct xdisk_attach_ioctl xaioc;
94         struct disk_info info;
95         struct disk     disk;
96         uuid_t          pfs_fsid;
97         int             unit;
98         int             serializing;
99         int             attached;
100         int             opencnt;
101         uint64_t        keyid;
102         xa_tag_t        *opentag;
103         TAILQ_HEAD(, bio) bioq;
104         TAILQ_HEAD(, xa_tag) tag_freeq;
105         TAILQ_HEAD(, xa_tag) tag_pendq;
106         TAILQ_HEAD(, kdmsg_circuit) circq;
107         struct lwkt_token tok;
108 };
109
110 typedef struct xa_softc xa_softc_t;
111
112 #define MAXTAGS         64      /* no real limit */
113
/*
 * Control-device ioctl handlers and DMSG iocom callbacks.
 */
static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
static void xa_exit(kdmsg_iocom_t *iocom);
static void xa_terminate_check(struct xa_softc *xa);
static int xa_rcvdmsg(kdmsg_msg_t *msg);
static void xa_autodmsg(kdmsg_msg_t *msg);

/*
 * Block-protocol command tag state machine.
 */
static xa_tag_t *xa_setup_cmd(xa_softc_t *xa, struct bio *bio);
static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg);
static uint32_t xa_wait(xa_tag_t *tag, int seq);
static void xa_done(xa_tag_t *tag, int wasbio);
static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
static void xa_restart_deferred(xa_softc_t *xa);

/*
 * Malloc type for softcs and tags; also handed to kdmsg_iocom_init()
 * for the iocom's own allocations.
 */
MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
130
/*
 * Control device, issue ioctls to create xa devices.
 */
static d_open_t xdisk_open;
static d_close_t xdisk_close;
static d_ioctl_t xdisk_ioctl;

static struct dev_ops xdisk_ops = {
	{ "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
	.d_open =	xdisk_open,
	.d_close =	xdisk_close,
	.d_ioctl =	xdisk_ioctl
};

/*
 * XA disk devices.  Raw reads/writes go through physread/physwrite and
 * end up in xa_strategy().
 */
static d_open_t xa_open;
static d_close_t xa_close;
static d_ioctl_t xa_ioctl;
static d_strategy_t xa_strategy;
static d_psize_t xa_size;

static struct dev_ops xa_ops = {
	{ "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
	.d_open =	xa_open,
	.d_close =	xa_close,
	.d_ioctl =	xa_ioctl,
	.d_read =	physread,
	.d_write =	physwrite,
	.d_strategy =	xa_strategy,
	.d_psize =	xa_size
};

/*
 * Module-global state.  xdisk_token is taken around every update of
 * xdisk_opencount, xa_queue, and the softc serializing/attached/opencnt
 * fields.
 */
static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
static int xdisk_opencount;		/* opens of /dev/xdisk */
static cdev_t xdisk_dev;		/* /dev/xdisk, made at MOD_LOAD */
static TAILQ_HEAD(, xa_softc) xa_queue;	/* all xa softcs */
169
170 /*
171  * Module initialization
172  */
173 static int
174 xdisk_modevent(module_t mod, int type, void *data)
175 {
176         switch (type) {
177         case MOD_LOAD:
178                 TAILQ_INIT(&xa_queue);
179                 xdisk_dev = make_dev(&xdisk_ops, 0,
180                                      UID_ROOT, GID_WHEEL, 0600, "xdisk");
181                 break;
182         case MOD_UNLOAD:
183         case MOD_SHUTDOWN:
184                 if (xdisk_opencount || TAILQ_FIRST(&xa_queue))
185                         return (EBUSY);
186                 if (xdisk_dev) {
187                         destroy_dev(xdisk_dev);
188                         xdisk_dev = NULL;
189                 }
190                 dev_ops_remove_all(&xdisk_ops);
191                 dev_ops_remove_all(&xa_ops);
192                 break;
193         default:
194                 break;
195         }
196         return 0;
197 }
198
199 DEV_MODULE(xdisk, xdisk_modevent, 0);
200
201 /*
202  * Control device
203  */
204 static int
205 xdisk_open(struct dev_open_args *ap)
206 {
207         kprintf("XDISK_OPEN\n");
208         lwkt_gettoken(&xdisk_token);
209         ++xdisk_opencount;
210         lwkt_reltoken(&xdisk_token);
211         return(0);
212 }
213
214 static int
215 xdisk_close(struct dev_close_args *ap)
216 {
217         kprintf("XDISK_CLOSE\n");
218         lwkt_gettoken(&xdisk_token);
219         --xdisk_opencount;
220         lwkt_reltoken(&xdisk_token);
221         return(0);
222 }
223
224 static int
225 xdisk_ioctl(struct dev_ioctl_args *ap)
226 {
227         int error;
228
229         switch(ap->a_cmd) {
230         case XDISKIOCATTACH:
231                 error = xdisk_attach((void *)ap->a_data);
232                 break;
233         case XDISKIOCDETACH:
234                 error = xdisk_detach((void *)ap->a_data);
235                 break;
236         default:
237                 error = ENOTTY;
238                 break;
239         }
240         return error;
241 }
242
243 /************************************************************************
244  *                              DMSG INTERFACE                          *
245  ************************************************************************/
246
247 static int
248 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
249 {
250         xa_softc_t *xa;
251         xa_tag_t *tag;
252         struct file *fp;
253         int unit;
254         int n;
255         char devname[64];
256         cdev_t dev;
257
258         kprintf("xdisk attach %d %jd/%d %s %s\n",
259                 xaioc->fd, (intmax_t)xaioc->bytes, xaioc->blksize,
260                 xaioc->cl_label, xaioc->fs_label);
261
262         /*
263          * Normalize ioctl params
264          */
265         fp = holdfp(curproc->p_fd, xaioc->fd, -1);
266         if (fp == NULL)
267                 return EINVAL;
268         if (xaioc->cl_label[sizeof(xaioc->cl_label) - 1] != 0)
269                 return EINVAL;
270         if (xaioc->fs_label[sizeof(xaioc->fs_label) - 1] != 0)
271                 return EINVAL;
272         if (xaioc->blksize < DEV_BSIZE || xaioc->blksize > MAXBSIZE)
273                 return EINVAL;
274
275         /*
276          * See if the serial number is already present.  If we are
277          * racing a termination the disk subsystem may still have
278          * duplicate entries not yet removed so we wait a bit and
279          * retry.
280          */
281         lwkt_gettoken(&xdisk_token);
282 again:
283         TAILQ_FOREACH(xa, &xa_queue, entry) {
284                 if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
285                            xaioc->fs_label) == 0) {
286                         if (xa->serializing) {
287                                 tsleep(xa, 0, "xadelay", hz / 10);
288                                 goto again;
289                         }
290                         xa->serializing = 1;
291                         kdmsg_iocom_uninit(&xa->iocom);
292                         break;
293                 }
294         }
295
296         /*
297          * Create a new xa if not already present
298          */
299         if (xa == NULL) {
300                 unit = 0;
301                 for (;;) {
302                         TAILQ_FOREACH(xa, &xa_queue, entry) {
303                                 if (xa->unit == unit)
304                                         break;
305                         }
306                         if (xa == NULL)
307                                 break;
308                         ++unit;
309                 }
310                 xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO);
311                 kprintf("ALLOCATE XA %p\n", xa);
312                 xa->unit = unit;
313                 xa->serializing = 1;
314                 lwkt_token_init(&xa->tok, "xa");
315                 TAILQ_INIT(&xa->circq);
316                 TAILQ_INIT(&xa->bioq);
317                 TAILQ_INIT(&xa->tag_freeq);
318                 TAILQ_INIT(&xa->tag_pendq);
319                 for (n = 0; n < MAXTAGS; ++n) {
320                         tag = kmalloc(sizeof(*tag), M_XDISK, M_WAITOK|M_ZERO);
321                         tag->xa = xa;
322                         TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
323                 }
324                 TAILQ_INSERT_TAIL(&xa_queue, xa, entry);
325         }
326         xa->xaioc = *xaioc;
327         xa->attached = 1;
328         lwkt_reltoken(&xdisk_token);
329
330         /*
331          * Create device
332          */
333         if (xa->dev == NULL) {
334                 dev = disk_create(unit, &xa->disk, &xa_ops);
335                 dev->si_drv1 = xa;
336                 xa->dev = dev;
337         }
338
339         xa->info.d_media_blksize = xaioc->blksize;
340         xa->info.d_media_blocks = xaioc->bytes / xaioc->blksize;
341         xa->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
342         xa->info.d_secpertrack = 32;
343         xa->info.d_nheads = 64;
344         xa->info.d_secpercyl = xa->info.d_secpertrack * xa->info.d_nheads;
345         xa->info.d_ncylinders = 0;
346         if (xa->xaioc.fs_label[0])
347                 xa->info.d_serialno = xa->xaioc.fs_label;
348
349         /*
350          * Set up messaging connection
351          */
352         ksnprintf(devname, sizeof(devname), "xa%d", unit);
353         kdmsg_iocom_init(&xa->iocom, xa,
354                          KDMSG_IOCOMF_AUTOCONN |
355                          KDMSG_IOCOMF_AUTOSPAN |
356                          KDMSG_IOCOMF_AUTOCIRC |
357                          KDMSG_IOCOMF_AUTOFORGE,
358                          M_XDISK, xa_rcvdmsg);
359         xa->iocom.exit_func = xa_exit;
360
361         kdmsg_iocom_reconnect(&xa->iocom, fp, devname);
362
363         /*
364          * Setup our LNK_CONN advertisement for autoinitiate.
365          *
366          * Our filter is setup to only accept PEER_BLOCK/SERVER
367          * advertisements.
368          */
369         xa->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
370         xa->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
371         xa->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
372         xa->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
373         xa->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
374         ksnprintf(xa->iocom.auto_lnk_conn.cl_label,
375                   sizeof(xa->iocom.auto_lnk_conn.cl_label),
376                   "%s", xaioc->cl_label);
377
378         /*
379          * We need a unique pfs_fsid to avoid confusion.
380          * We supply a rendezvous fs_label using the serial number.
381          */
382         kern_uuidgen(&xa->pfs_fsid, 1);
383         xa->iocom.auto_lnk_conn.pfs_fsid = xa->pfs_fsid;
384         ksnprintf(xa->iocom.auto_lnk_conn.fs_label,
385                   sizeof(xa->iocom.auto_lnk_conn.fs_label),
386                   "%s", xaioc->fs_label);
387
388         /*
389          * Setup our LNK_SPAN advertisement for autoinitiate
390          */
391         xa->iocom.auto_lnk_span.pfs_type = DMSG_PFSTYPE_CLIENT;
392         xa->iocom.auto_lnk_span.proto_version = DMSG_SPAN_PROTO_1;
393         xa->iocom.auto_lnk_span.peer_type = DMSG_PEER_BLOCK;
394         ksnprintf(xa->iocom.auto_lnk_span.cl_label,
395                   sizeof(xa->iocom.auto_lnk_span.cl_label),
396                   "%s", xa->xaioc.cl_label);
397
398         kdmsg_iocom_autoinitiate(&xa->iocom, xa_autodmsg);
399         disk_setdiskinfo_sync(&xa->disk, &xa->info);
400
401         lwkt_gettoken(&xdisk_token);
402         xa->serializing = 0;
403         xa_terminate_check(xa);
404         lwkt_reltoken(&xdisk_token);
405
406         return(0);
407 }
408
409 static int
410 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
411 {
412         struct xa_softc *xa;
413
414         lwkt_gettoken(&xdisk_token);
415         for (;;) {
416                 TAILQ_FOREACH(xa, &xa_queue, entry) {
417                         if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
418                                    xaioc->fs_label) == 0) {
419                                 break;
420                         }
421                 }
422                 if (xa == NULL || xa->serializing == 0) {
423                         xa->serializing = 1;
424                         break;
425                 }
426                 tsleep(xa, 0, "xadet", hz / 10);
427         }
428         if (xa) {
429                 kprintf("DETACHING XA\n");
430                 kdmsg_iocom_uninit(&xa->iocom);
431                 xa->serializing = 0;
432         }
433         lwkt_reltoken(&xdisk_token);
434         return(0);
435 }
436
437 /*
438  * Called from iocom core transmit thread upon disconnect.
439  */
440 static
441 void
442 xa_exit(kdmsg_iocom_t *iocom)
443 {
444         struct xa_softc *xa = iocom->handle;
445
446         kprintf("XA_EXIT UNIT %d\n", xa->unit);
447
448         if (xa->serializing == 0)
449                 kdmsg_iocom_uninit(iocom);
450
451         /*
452          * If the drive is not in use and no longer attach it can be
453          * destroyed.
454          */
455         lwkt_gettoken(&xdisk_token);
456         xa->attached = 0;
457         xa_terminate_check(xa);
458         lwkt_reltoken(&xdisk_token);
459 }
460
461 /*
462  * Determine if we can destroy the xa_softc.
463  *
464  * Called with xdisk_token held.
465  */
466 static
467 void
468 xa_terminate_check(struct xa_softc *xa)
469 {
470         xa_tag_t *tag;
471
472         if (xa->opencnt || xa->attached || xa->serializing)
473                 return;
474         xa->serializing = 1;
475         kprintf("TERMINATE XA %p %d\n", xa, xa->unit);
476         kdmsg_iocom_uninit(&xa->iocom);
477         if (xa->dev) {
478                 disk_destroy(&xa->disk);
479                 xa->dev->si_drv1 = NULL;
480                 xa->dev = NULL;
481         }
482         kprintf("REMOVEQ   XA %p %d\n", xa, xa->unit);
483         KKASSERT(xa->opencnt == 0 && xa->attached == 0);
484         kprintf("IOCOMUN   XA %p %d\n", xa, xa->unit);
485         while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
486                 TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
487                 tag->xa = NULL;
488                 kfree(tag, M_XDISK);
489         }
490         KKASSERT(TAILQ_EMPTY(&xa->tag_pendq));
491         TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */
492         kfree(xa, M_XDISK);
493         kprintf("xa_close: destroy unreferenced disk\n");
494 }
495
496 /*
497  * Shim to catch and record virtual circuit events.
498  */
499 static void
500 xa_autodmsg(kdmsg_msg_t *msg)
501 {
502         struct xa_softc *xa = msg->iocom->handle;
503         kdmsg_circuit_t *circ;
504         kdmsg_circuit_t *cscan;
505         uint32_t xcmd;
506
507         /*
508          * Because this is just a shim we don't have a state callback for
509          * the transactions we are sniffing, so make things easier by
510          * calculating the original command along with the current message's
511          * flags.  This is because transactions are made up of numerous
512          * messages and only the first typically specifies the actual command.
513          */
514         if (msg->state) {
515                 xcmd = msg->state->icmd |
516                        (msg->any.head.cmd & (DMSGF_CREATE |
517                                              DMSGF_DELETE |
518                                              DMSGF_REPLY));
519         } else {
520                 xcmd = msg->any.head.cmd;
521         }
522
523         /*
524          * Add or remove a circuit, sorted by weight (lower numbers are
525          * better).
526          */
527         switch(xcmd) {
528         case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY:
529                 /*
530                  * Track established circuits
531                  */
532                 kprintf("XA: Received autodmsg: CREATE+REPLY\n");
533                 circ = msg->state->any.circ;
534                 lwkt_gettoken(&xa->tok);
535                 if (circ->recorded == 0) {
536                         TAILQ_FOREACH(cscan, &xa->circq, entry) {
537                                 if (circ->weight < cscan->weight)
538                                         break;
539                         }
540                         if (cscan)
541                                 TAILQ_INSERT_BEFORE(cscan, circ, entry);
542                         else
543                                 TAILQ_INSERT_TAIL(&xa->circq, circ, entry);
544                         circ->recorded = 1;
545                 }
546
547                 /*
548                  * Restart any deferred I/O.
549                  */
550                 xa_restart_deferred(xa);
551                 lwkt_reltoken(&xa->tok);
552                 break;
553         case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
554                 kprintf("XA: Received autodmsg: DELETE+REPLY\n");
555                 circ = msg->state->any.circ;
556                 lwkt_gettoken(&xa->tok);
557                 if (circ->recorded) {
558                         TAILQ_REMOVE(&xa->circq, circ, entry);
559                         circ->recorded = 0;
560                 }
561                 lwkt_reltoken(&xa->tok);
562                 break;
563         default:
564                 break;
565         }
566 }
567
568 static int
569 xa_rcvdmsg(kdmsg_msg_t *msg)
570 {
571         switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
572         case DMSG_DBG_SHELL:
573                 /*
574                  * Execute shell command (not supported atm).
575                  *
576                  * This is a one-way packet but if not (e.g. if part of
577                  * a streaming transaction), we will have already closed
578                  * our end.
579                  */
580                 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
581                 break;
582         case DMSG_DBG_SHELL | DMSGF_REPLY:
583                 /*
584                  * Receive one or more replies to a shell command that we
585                  * sent.
586                  *
587                  * This is a one-way packet but if not (e.g. if part of
588                  * a streaming transaction), we will have already closed
589                  * our end.
590                  */
591                 if (msg->aux_data) {
592                         msg->aux_data[msg->aux_size - 1] = 0;
593                         kprintf("DEBUGMSG: %s\n", msg->aux_data);
594                 }
595                 break;
596         default:
597                 /*
598                  * Unsupported LNK message received.  We only need to
599                  * reply if it's a transaction in order to close our end.
600                  * Ignore any one-way messages are any further messages
601                  * associated with the transaction.
602                  *
603                  * NOTE: This case also includes DMSG_LNK_ERROR messages
604                  *       which might be one-way, replying to those would
605                  *       cause an infinite ping-pong.
606                  */
607                 if (msg->any.head.cmd & DMSGF_CREATE)
608                         kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
609                 break;
610         }
611         return(0);
612 }
613
614
615 /************************************************************************
616  *                         XA DEVICE INTERFACE                          *
617  ************************************************************************/
618
619 static int
620 xa_open(struct dev_open_args *ap)
621 {
622         cdev_t dev = ap->a_head.a_dev;
623         xa_softc_t *xa;
624         xa_tag_t *tag;
625         kdmsg_msg_t *msg;
626         int error;
627
628         dev->si_bsize_phys = 512;
629         dev->si_bsize_best = 32768;
630
631         /*
632          * Interlock open with opencnt, wait for attachment operations
633          * to finish.
634          */
635         lwkt_gettoken(&xdisk_token);
636 again:
637         xa = dev->si_drv1;
638         if (xa == NULL) {
639                 lwkt_reltoken(&xdisk_token);
640                 return ENXIO;   /* raced destruction */
641         }
642         if (xa->serializing) {
643                 tsleep(xa, 0, "xarace", hz / 10);
644                 goto again;
645         }
646
647         /*
648          * Serialize initial open
649          */
650         if (xa->opencnt++ > 0) {
651                 lwkt_reltoken(&xdisk_token);
652                 return(0);
653         }
654         xa->serializing = 1;
655         lwkt_reltoken(&xdisk_token);
656
657         kprintf("XA OPEN COMMAND\n");
658
659         tag = xa_setup_cmd(xa, NULL);
660         if (tag == NULL) {
661                 lwkt_gettoken(&xdisk_token);
662                 KKASSERT(xa->opencnt > 0);
663                 --xa->opencnt;
664                 xa->serializing = 0;
665                 xa_terminate_check(xa);
666                 lwkt_reltoken(&xdisk_token);
667                 return(ENXIO);
668         }
669         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
670                               DMSG_BLK_OPEN | DMSGF_CREATE,
671                               xa_sync_completion, tag);
672         msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
673         xa_start(tag, msg);
674         if (xa_wait(tag, 0) == 0) {
675                 kprintf("XA OPEN GOOD\n");
676                 xa->keyid = tag->status.keyid;
677                 xa->opentag = tag;      /* leave tag open */
678                 xa->serializing = 0;
679                 error = 0;
680         } else {
681                 kprintf("XA OPEN BAD\n");
682                 xa_done(tag, 0);
683                 lwkt_gettoken(&xdisk_token);
684                 KKASSERT(xa->opencnt > 0);
685                 --xa->opencnt;
686                 xa->serializing = 0;
687                 xa_terminate_check(xa);
688                 lwkt_reltoken(&xdisk_token);
689                 error = ENXIO;
690         }
691         return (error);
692 }
693
694 static int
695 xa_close(struct dev_close_args *ap)
696 {
697         cdev_t dev = ap->a_head.a_dev;
698         xa_softc_t *xa;
699         xa_tag_t *tag;
700
701         xa = dev->si_drv1;
702         if (xa == NULL)
703                 return ENXIO;   /* raced destruction */
704
705         lwkt_gettoken(&xa->tok);
706         if ((tag = xa->opentag) != NULL) {
707                 xa->opentag = NULL;
708                 kdmsg_state_reply(tag->state, DMSG_ERR_NOSUPP);
709                 while (tag->done == 0)
710                         xa_wait(tag, tag->waitseq);
711                 xa_done(tag, 0);
712         }
713         lwkt_reltoken(&xa->tok);
714
715         lwkt_gettoken(&xdisk_token);
716         KKASSERT(xa->opencnt > 0);
717         --xa->opencnt;
718         xa_terminate_check(xa);
719         lwkt_reltoken(&xdisk_token);
720
721         return(0);
722 }
723
724 static int
725 xa_strategy(struct dev_strategy_args *ap)
726 {
727         xa_softc_t *xa = ap->a_head.a_dev->si_drv1;
728         xa_tag_t *tag;
729         struct bio *bio = ap->a_bio;
730
731 #if 0
732         bio->bio_buf->b_error = ENXIO;
733         bio->bio_buf->b_flags |= B_ERROR;
734         biodone(bio);
735         return(0);
736 #endif
737
738         tag = xa_setup_cmd(xa, bio);
739         if (tag)
740                 xa_start(tag, NULL);
741         return(0);
742 }
743
744 static int
745 xa_ioctl(struct dev_ioctl_args *ap)
746 {
747         return(ENOTTY);
748 }
749
750 static int
751 xa_size(struct dev_psize_args *ap)
752 {
753         struct xa_softc *xa;
754
755         if ((xa = ap->a_head.a_dev->si_drv1) == NULL)
756                 return (ENXIO);
757         ap->a_result = xa->info.d_media_blocks;
758         return (0);
759 }
760
761 /************************************************************************
762  *                  XA BLOCK PROTOCOL STATE MACHINE                     *
763  ************************************************************************
764  *
765  * Implement tag/msg setup and related functions.
766  */
767 static xa_tag_t *
768 xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
769 {
770         kdmsg_circuit_t *circ;
771         xa_tag_t *tag;
772
773         /*
774          * Only get a tag if we have a valid virtual circuit to the server.
775          */
776         lwkt_gettoken(&xa->tok);
777         if ((circ = TAILQ_FIRST(&xa->circq)) == NULL) {
778                 tag = NULL;
779         } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
780                 TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
781                 tag->bio = bio;
782                 tag->circuit = circ->circ_state->msgid;
783         }
784
785         /*
786          * If we can't dispatch now and this is a bio, queue it for later.
787          */
788         if (tag == NULL && bio) {
789                 TAILQ_INSERT_TAIL(&xa->bioq, bio, bio_act);
790         }
791         lwkt_reltoken(&xa->tok);
792
793         return (tag);
794 }
795
796 static void
797 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
798 {
799         xa_softc_t *xa = tag->xa;
800
801         if (msg == NULL) {
802                 struct bio *bio;
803                 struct buf *bp;
804
805                 KKASSERT(tag->bio);
806                 bio = tag->bio;
807                 bp = bio->bio_buf;
808
809                 switch(bp->b_cmd) {
810                 case BUF_CMD_READ:
811                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
812                                               DMSG_BLK_READ |
813                                               DMSGF_CREATE | DMSGF_DELETE,
814                                               xa_bio_completion, tag);
815                         msg->any.blk_read.keyid = xa->keyid;
816                         msg->any.blk_read.offset = bio->bio_offset;
817                         msg->any.blk_read.bytes = bp->b_bcount;
818                         break;
819                 case BUF_CMD_WRITE:
820                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
821                                               DMSG_BLK_WRITE |
822                                               DMSGF_CREATE | DMSGF_DELETE,
823                                               xa_bio_completion, tag);
824                         msg->any.blk_write.keyid = xa->keyid;
825                         msg->any.blk_write.offset = bio->bio_offset;
826                         msg->any.blk_write.bytes = bp->b_bcount;
827                         msg->aux_data = bp->b_data;
828                         msg->aux_size = bp->b_bcount;
829                         break;
830                 case BUF_CMD_FLUSH:
831                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
832                                               DMSG_BLK_FLUSH |
833                                               DMSGF_CREATE | DMSGF_DELETE,
834                                               xa_bio_completion, tag);
835                         msg->any.blk_flush.keyid = xa->keyid;
836                         msg->any.blk_flush.offset = bio->bio_offset;
837                         msg->any.blk_flush.bytes = bp->b_bcount;
838                         break;
839                 case BUF_CMD_FREEBLKS:
840                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
841                                               DMSG_BLK_FREEBLKS |
842                                               DMSGF_CREATE | DMSGF_DELETE,
843                                               xa_bio_completion, tag);
844                         msg->any.blk_freeblks.keyid = xa->keyid;
845                         msg->any.blk_freeblks.offset = bio->bio_offset;
846                         msg->any.blk_freeblks.bytes = bp->b_bcount;
847                         break;
848                 default:
849                         bp->b_flags |= B_ERROR;
850                         bp->b_error = EIO;
851                         biodone(bio);
852                         tag->bio = NULL;
853                         break;
854                 }
855         }
856
857         tag->done = 0;
858         tag->waitseq = 0;
859         if (msg) {
860 #if 0
861                 lwkt_gettoken(&xa->tok);
862                 TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
863 #endif
864                 tag->state = msg->state;
865 #if 0
866                 lwkt_reltoken(&xa->tok);
867 #endif
868                 kdmsg_msg_write(msg);
869         } else {
870                 xa_done(tag, 1);
871         }
872 }
873
874 static uint32_t
875 xa_wait(xa_tag_t *tag, int seq)
876 {
877         xa_softc_t *xa = tag->xa;
878
879         lwkt_gettoken(&xa->tok);
880         while (tag->waitseq == seq)
881                 tsleep(tag, 0, "xawait", 0);
882         lwkt_reltoken(&xa->tok);
883         return (tag->status.head.error);
884 }
885
886 static void
887 xa_done(xa_tag_t *tag, int wasbio)
888 {
889         xa_softc_t *xa = tag->xa;
890         struct bio *bio;
891
892         KKASSERT(tag->bio == NULL);
893         tag->done = 1;
894
895         lwkt_gettoken(&xa->tok);
896         if ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
897                 TAILQ_REMOVE(&xa->bioq, bio, bio_act);
898                 tag->bio = bio;
899                 lwkt_reltoken(&xa->tok);
900                 xa_start(tag, NULL);
901         } else {
902                 TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
903                 lwkt_reltoken(&xa->tok);
904         }
905 }
906
907 static int
908 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
909 {
910         xa_tag_t *tag = state->any.any;
911         xa_softc_t *xa = tag->xa;
912
913         switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
914         case DMSG_LNK_ERROR | DMSGF_REPLY:
915                 bzero(&tag->status, sizeof(tag->status));
916                 tag->status.head = msg->any.head;
917                 break;
918         case DMSG_BLK_ERROR | DMSGF_REPLY:
919                 tag->status = msg->any.blk_error;
920                 break;
921         }
922         kprintf("XA_SYNC_COMPLETION ERROR %u RESID %u\n",
923                 tag->status.head.error, tag->status.resid);
924         if (msg->any.head.cmd & DMSGF_DELETE) { /* receive termination */
925                 kdmsg_msg_reply(msg, 0);        /* terminate our side */
926                 tag->done = 1;
927         }
928         lwkt_gettoken(&xa->tok);
929         ++tag->waitseq;
930         lwkt_reltoken(&xa->tok);
931
932         wakeup(tag);
933
934         return (0);
935 }
936
937 static int
938 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
939 {
940         xa_tag_t *tag = state->any.any;
941         /*xa_softc_t *xa = tag->xa;*/
942         struct bio *bio;
943         struct buf *bp;
944
945         /*
946          * Get the bio from the tag.  If no bio is present we just do
947          * 'done' handling.
948          */
949         if ((bio = tag->bio) == NULL)
950                 goto handle_done;
951         bp = bio->bio_buf;
952
953         /*
954          * Process return status
955          */
956         switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
957         case DMSG_LNK_ERROR | DMSGF_REPLY:
958                 bzero(&tag->status, sizeof(tag->status));
959                 tag->status.head = msg->any.head;
960                 if (tag->status.head.error)
961                         tag->status.resid = bp->b_bcount;
962                 else
963                         tag->status.resid = 0;
964                 break;
965         case DMSG_BLK_ERROR | DMSGF_REPLY:
966                 tag->status = msg->any.blk_error;
967                 break;
968         }
969         kprintf("XA_BIO_COMPLETION ERROR %u RESID %u\n",
970                 tag->status.head.error, tag->status.resid);
971
972         /*
973          * Process bio completion
974          *
975          * For reads any returned data is zero-extended if necessary, so
976          * the server can short-cut any all-zeros reads if it desires.
977          */
978         switch(bp->b_cmd) {
979         case BUF_CMD_READ:
980                 if (msg->aux_data && msg->aux_size) {
981                         if (msg->aux_size < bp->b_bcount) {
982                                 bcopy(msg->aux_data, bp->b_data, msg->aux_size);
983                                 bzero(bp->b_data + msg->aux_size,
984                                       bp->b_bcount - msg->aux_size);
985                         } else {
986                                 bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
987                         }
988                 } else {
989                         bzero(bp->b_data, bp->b_bcount);
990                 }
991                 /* fall through */
992         case BUF_CMD_WRITE:
993         case BUF_CMD_FLUSH:
994         case BUF_CMD_FREEBLKS:
995         default:
996                 if (tag->status.resid > bp->b_bcount)
997                         tag->status.resid = bp->b_bcount;
998                 bp->b_resid = tag->status.resid;
999                 if ((bp->b_error = tag->status.head.error) != 0) {
1000                         bp->b_flags |= B_ERROR;
1001                 } else {
1002                         bp->b_resid = 0;
1003                 }
1004                 biodone(bio);
1005                 tag->bio = NULL;
1006                 break;
1007         }
1008
1009         /*
1010          * Handle completion of the transaction.  If the bioq is not empty
1011          * we can initiate another bio on the same tag.
1012          */
1013 handle_done:
1014         if (msg->any.head.cmd & DMSGF_DELETE)
1015                 xa_done(tag, 1);
1016         return (0);
1017 }
1018
1019 /*
1020  * Restart as much deferred I/O as we can.
1021  *
1022  * Called with xa->tok held
1023  */
1024 static
1025 void
1026 xa_restart_deferred(xa_softc_t *xa)
1027 {
1028         struct bio *bio;
1029         xa_tag_t *tag;
1030
1031         while ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
1032                 tag = xa_setup_cmd(xa, NULL);
1033                 if (tag == NULL)
1034                         break;
1035                 kprintf("xa: Restart BIO %p on %s\n",
1036                         bio, xa->iocom.auto_lnk_conn.fs_label);
1037                 TAILQ_REMOVE(&xa->bioq, bio, bio_act);
1038                 tag->bio = bio;
1039                 xa_start(tag, NULL);
1040         }
1041 }