kernel - Preliminary xdisk remote block driver for cluster
[dragonfly.git] / sys / dev / disk / xdisk / xdisk.c
1 /*
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module allows disk devices to be created and associated with a
36  * communications pipe or socket.  You open the device and issue an
37  * ioctl() to install a new disk along with its communications descriptor.
38  *
39  * All further communication occurs via the descriptor using the DMSG
40  * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
 * direct connection to a remote machine's disk (in-kernel), to a remote
42  * cluster controller, to the local cluster controller, etc.
43  *
44  * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
45  * devices.  These devices look like raw disks to the system.
46  */
47 #include <sys/param.h>
48 #include <sys/systm.h>
49 #include <sys/buf.h>
50 #include <sys/conf.h>
51 #include <sys/device.h>
52 #include <sys/devicestat.h>
53 #include <sys/disk.h>
54 #include <sys/kernel.h>
55 #include <sys/malloc.h>
56 #include <sys/sysctl.h>
57 #include <sys/proc.h>
58 #include <sys/queue.h>
59 #include <sys/udev.h>
60 #include <sys/uuid.h>
61 #include <sys/kern_syscall.h>
62
63 #include <sys/dmsg.h>
64 #include <sys/xdiskioctl.h>
65
66 #include <sys/buf2.h>
67 #include <sys/thread2.h>
68
69 struct xa_softc;
70
71 struct xa_tag {
72         TAILQ_ENTRY(xa_tag) entry;
73         struct xa_softc *xa;
74         dmsg_blk_error_t status;
75         kdmsg_state_t   *state;
76         struct bio      *bio;
77         uint64_t        circuit;
78         int             running;        /* transaction running */
79         int             waitseq;        /* streaming reply */
80         int             done;           /* final (transaction closed) */
81 };
82
83 typedef struct xa_tag   xa_tag_t;
84
85 struct xa_softc {
86         TAILQ_ENTRY(xa_softc) entry;
87         cdev_t          dev;
88         kdmsg_iocom_t   iocom;
89         struct xdisk_attach_ioctl xaioc;
90         struct disk_info info;
91         struct disk     disk;
92         uuid_t          pfs_fsid;
93         int             unit;
94         int             serializing;
95         int             attached;
96         int             opencnt;
97         uint64_t        keyid;
98         xa_tag_t        *opentag;
99         TAILQ_HEAD(, bio) bioq;
100         TAILQ_HEAD(, xa_tag) tag_freeq;
101         TAILQ_HEAD(, xa_tag) tag_pendq;
102         TAILQ_HEAD(, kdmsg_circuit) circq;
103         struct lwkt_token tok;
104 };
105
106 typedef struct xa_softc xa_softc_t;
107
108 #define MAXTAGS         64      /* no real limit */
109
110 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
111 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
112 static void xa_exit(kdmsg_iocom_t *iocom);
113 static void xa_terminate_check(struct xa_softc *xa);
114 static int xa_rcvdmsg(kdmsg_msg_t *msg);
115 static void xa_autodmsg(kdmsg_msg_t *msg);
116
117 static xa_tag_t *xa_setup_cmd(xa_softc_t *xa, struct bio *bio);
118 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg);
119 static uint32_t xa_wait(xa_tag_t *tag, int seq);
120 static void xa_done(xa_tag_t *tag, int wasbio);
121 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
122 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
123
124 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
125
126 /*
127  * Control device, issue ioctls to create xa devices.
128  */
129 static d_open_t xdisk_open;
130 static d_close_t xdisk_close;
131 static d_ioctl_t xdisk_ioctl;
132
133 static struct dev_ops xdisk_ops = {
134         { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
135         .d_open =       xdisk_open,
136         .d_close =      xdisk_close,
137         .d_ioctl =      xdisk_ioctl
138 };
139
140 /*
141  * XA disk devices
142  */
143 static d_open_t xa_open;
144 static d_close_t xa_close;
145 static d_ioctl_t xa_ioctl;
146 static d_strategy_t xa_strategy;
147 static d_psize_t xa_size;
148
149 static struct dev_ops xa_ops = {
150         { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
151         .d_open =       xa_open,
152         .d_close =      xa_close,
153         .d_ioctl =      xa_ioctl,
154         .d_read =       physread,
155         .d_write =      physwrite,
156         .d_strategy =   xa_strategy,
157         .d_psize =      xa_size
158 };
159
160 static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
161 static int xdisk_opencount;
162 static cdev_t xdisk_dev;
163 static TAILQ_HEAD(, xa_softc) xa_queue;
164
165 /*
166  * Module initialization
167  */
168 static int
169 xdisk_modevent(module_t mod, int type, void *data)
170 {
171         switch (type) {
172         case MOD_LOAD:
173                 TAILQ_INIT(&xa_queue);
174                 xdisk_dev = make_dev(&xdisk_ops, 0,
175                                      UID_ROOT, GID_WHEEL, 0600, "xdisk");
176                 break;
177         case MOD_UNLOAD:
178         case MOD_SHUTDOWN:
179                 if (xdisk_opencount || TAILQ_FIRST(&xa_queue))
180                         return (EBUSY);
181                 if (xdisk_dev) {
182                         destroy_dev(xdisk_dev);
183                         xdisk_dev = NULL;
184                 }
185                 dev_ops_remove_all(&xdisk_ops);
186                 dev_ops_remove_all(&xa_ops);
187                 break;
188         default:
189                 break;
190         }
191         return 0;
192 }
193
194 DEV_MODULE(xdisk, xdisk_modevent, 0);
195
196 /*
197  * Control device
198  */
199 static int
200 xdisk_open(struct dev_open_args *ap)
201 {
202         kprintf("XDISK_OPEN\n");
203         lwkt_gettoken(&xdisk_token);
204         ++xdisk_opencount;
205         lwkt_reltoken(&xdisk_token);
206         return(0);
207 }
208
209 static int
210 xdisk_close(struct dev_close_args *ap)
211 {
212         kprintf("XDISK_CLOSE\n");
213         lwkt_gettoken(&xdisk_token);
214         --xdisk_opencount;
215         lwkt_reltoken(&xdisk_token);
216         return(0);
217 }
218
219 static int
220 xdisk_ioctl(struct dev_ioctl_args *ap)
221 {
222         int error;
223
224         switch(ap->a_cmd) {
225         case XDISKIOCATTACH:
226                 error = xdisk_attach((void *)ap->a_data);
227                 break;
228         case XDISKIOCDETACH:
229                 error = xdisk_detach((void *)ap->a_data);
230                 break;
231         default:
232                 error = ENOTTY;
233                 break;
234         }
235         return error;
236 }
237
238 /************************************************************************
239  *                              DMSG INTERFACE                          *
240  ************************************************************************/
241
242 static int
243 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
244 {
245         xa_softc_t *xa;
246         xa_tag_t *tag;
247         struct file *fp;
248         int unit;
249         int n;
250         char devname[64];
251         cdev_t dev;
252
253         kprintf("xdisk attach %d %jd/%d %s %s\n",
254                 xaioc->fd, (intmax_t)xaioc->bytes, xaioc->blksize,
255                 xaioc->cl_label, xaioc->fs_label);
256
257         /*
258          * Normalize ioctl params
259          */
260         fp = holdfp(curproc->p_fd, xaioc->fd, -1);
261         if (fp == NULL)
262                 return EINVAL;
263         if (xaioc->cl_label[sizeof(xaioc->cl_label) - 1] != 0)
264                 return EINVAL;
265         if (xaioc->fs_label[sizeof(xaioc->fs_label) - 1] != 0)
266                 return EINVAL;
267         if (xaioc->blksize < DEV_BSIZE || xaioc->blksize > MAXBSIZE)
268                 return EINVAL;
269
270         /*
271          * See if the serial number is already present.  If we are
272          * racing a termination the disk subsystem may still have
273          * duplicate entries not yet removed so we wait a bit and
274          * retry.
275          */
276         lwkt_gettoken(&xdisk_token);
277 again:
278         TAILQ_FOREACH(xa, &xa_queue, entry) {
279                 if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
280                            xaioc->fs_label) == 0) {
281                         if (xa->serializing) {
282                                 tsleep(xa, 0, "xadelay", hz / 10);
283                                 goto again;
284                         }
285                         xa->serializing = 1;
286                         kdmsg_iocom_uninit(&xa->iocom);
287                         break;
288                 }
289         }
290
291         /*
292          * Create a new xa if not already present
293          */
294         if (xa == NULL) {
295                 unit = 0;
296                 for (;;) {
297                         TAILQ_FOREACH(xa, &xa_queue, entry) {
298                                 if (xa->unit == unit)
299                                         break;
300                         }
301                         if (xa == NULL)
302                                 break;
303                         ++unit;
304                 }
305                 xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO);
306                 kprintf("ALLOCATE XA %p\n", xa);
307                 xa->unit = unit;
308                 xa->serializing = 1;
309                 lwkt_token_init(&xa->tok, "xa");
310                 TAILQ_INIT(&xa->circq);
311                 TAILQ_INIT(&xa->bioq);
312                 TAILQ_INIT(&xa->tag_freeq);
313                 TAILQ_INIT(&xa->tag_pendq);
314                 for (n = 0; n < MAXTAGS; ++n) {
315                         tag = kmalloc(sizeof(*tag), M_XDISK, M_WAITOK|M_ZERO);
316                         tag->xa = xa;
317                         TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
318                 }
319                 TAILQ_INSERT_TAIL(&xa_queue, xa, entry);
320         }
321         xa->xaioc = *xaioc;
322         xa->attached = 1;
323         lwkt_reltoken(&xdisk_token);
324
325         /*
326          * Create device
327          */
328         if (xa->dev == NULL) {
329                 dev = disk_create(unit, &xa->disk, &xa_ops);
330                 dev->si_drv1 = xa;
331                 xa->dev = dev;
332         }
333
334         xa->info.d_media_blksize = xaioc->blksize;
335         xa->info.d_media_blocks = xaioc->bytes / xaioc->blksize;
336         xa->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
337         xa->info.d_secpertrack = 32;
338         xa->info.d_nheads = 64;
339         xa->info.d_secpercyl = xa->info.d_secpertrack * xa->info.d_nheads;
340         xa->info.d_ncylinders = 0;
341         if (xa->xaioc.fs_label[0])
342                 xa->info.d_serialno = xa->xaioc.fs_label;
343
344         /*
345          * Set up messaging connection
346          */
347         ksnprintf(devname, sizeof(devname), "xa%d", unit);
348         kdmsg_iocom_init(&xa->iocom, xa,
349                          KDMSG_IOCOMF_AUTOCONN |
350                          KDMSG_IOCOMF_AUTOSPAN |
351                          KDMSG_IOCOMF_AUTOCIRC |
352                          KDMSG_IOCOMF_AUTOFORGE,
353                          M_XDISK, xa_rcvdmsg);
354         xa->iocom.exit_func = xa_exit;
355
356         kdmsg_iocom_reconnect(&xa->iocom, fp, devname);
357
358         /*
359          * Setup our LNK_CONN advertisement for autoinitiate.
360          *
361          * Our filter is setup to only accept PEER_BLOCK/SERVER
362          * advertisements.
363          */
364         xa->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
365         xa->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
366         xa->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
367         xa->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
368         xa->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
369         ksnprintf(xa->iocom.auto_lnk_conn.cl_label,
370                   sizeof(xa->iocom.auto_lnk_conn.cl_label),
371                   "%s", xaioc->cl_label);
372
373         /*
374          * We need a unique pfs_fsid to avoid confusion.
375          * We supply a rendezvous fs_label using the serial number.
376          */
377         kern_uuidgen(&xa->pfs_fsid, 1);
378         xa->iocom.auto_lnk_conn.pfs_fsid = xa->pfs_fsid;
379         ksnprintf(xa->iocom.auto_lnk_conn.fs_label,
380                   sizeof(xa->iocom.auto_lnk_conn.fs_label),
381                   "%s", xaioc->fs_label);
382
383         /*
384          * Setup our LNK_SPAN advertisement for autoinitiate
385          */
386         xa->iocom.auto_lnk_span.pfs_type = DMSG_PFSTYPE_CLIENT;
387         xa->iocom.auto_lnk_span.proto_version = DMSG_SPAN_PROTO_1;
388         xa->iocom.auto_lnk_span.peer_type = DMSG_PEER_BLOCK;
389         ksnprintf(xa->iocom.auto_lnk_span.cl_label,
390                   sizeof(xa->iocom.auto_lnk_span.cl_label),
391                   "%s", xa->xaioc.cl_label);
392
393         kdmsg_iocom_autoinitiate(&xa->iocom, xa_autodmsg);
394         disk_setdiskinfo_sync(&xa->disk, &xa->info);
395
396         lwkt_gettoken(&xdisk_token);
397         xa->serializing = 0;
398         xa_terminate_check(xa);
399         lwkt_reltoken(&xdisk_token);
400
401         return(0);
402 }
403
404 static int
405 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
406 {
407         struct xa_softc *xa;
408
409         lwkt_gettoken(&xdisk_token);
410         for (;;) {
411                 TAILQ_FOREACH(xa, &xa_queue, entry) {
412                         if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
413                                    xaioc->fs_label) == 0) {
414                                 break;
415                         }
416                 }
417                 if (xa == NULL || xa->serializing == 0) {
418                         xa->serializing = 1;
419                         break;
420                 }
421                 tsleep(xa, 0, "xadet", hz / 10);
422         }
423         if (xa) {
424                 kprintf("DETACHING XA\n");
425                 kdmsg_iocom_uninit(&xa->iocom);
426                 xa->serializing = 0;
427         }
428         lwkt_reltoken(&xdisk_token);
429         return(0);
430 }
431
432 /*
433  * Called from iocom core transmit thread upon disconnect.
434  */
435 static
436 void
437 xa_exit(kdmsg_iocom_t *iocom)
438 {
439         struct xa_softc *xa = iocom->handle;
440
441         kprintf("XA_EXIT UNIT %d\n", xa->unit);
442
443         if (xa->serializing == 0)
444                 kdmsg_iocom_uninit(iocom);
445
446         /*
447          * If the drive is not in use and no longer attach it can be
448          * destroyed.
449          */
450         lwkt_gettoken(&xdisk_token);
451         xa->attached = 0;
452         xa_terminate_check(xa);
453         lwkt_reltoken(&xdisk_token);
454 }
455
456 /*
457  * Determine if we can destroy the xa_softc.
458  *
459  * Called with xdisk_token held.
460  */
461 static
462 void
463 xa_terminate_check(struct xa_softc *xa)
464 {
465         xa_tag_t *tag;
466
467         if (xa->opencnt || xa->attached || xa->serializing)
468                 return;
469         xa->serializing = 1;
470         kprintf("TERMINATE XA %p %d\n", xa, xa->unit);
471         kdmsg_iocom_uninit(&xa->iocom);
472         if (xa->dev) {
473                 disk_destroy(&xa->disk);
474                 xa->dev->si_drv1 = NULL;
475                 xa->dev = NULL;
476         }
477         kprintf("REMOVEQ   XA %p %d\n", xa, xa->unit);
478         KKASSERT(xa->opencnt == 0 && xa->attached == 0);
479         kprintf("IOCOMUN   XA %p %d\n", xa, xa->unit);
480         while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
481                 TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
482                 tag->xa = NULL;
483                 kfree(tag, M_XDISK);
484         }
485         KKASSERT(TAILQ_EMPTY(&xa->tag_pendq));
486         TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */
487         kfree(xa, M_XDISK);
488         kprintf("xa_close: destroy unreferenced disk\n");
489 }
490
491 /*
492  * Shim to catch and record virtual circuit events.
493  */
494 static void
495 xa_autodmsg(kdmsg_msg_t *msg)
496 {
497         struct xa_softc *xa = msg->iocom->handle;
498         kdmsg_circuit_t *circ;
499         kdmsg_circuit_t *cscan;
500         uint32_t xcmd;
501
502         /*
503          * Because this is just a shim we don't have a state callback for
504          * the transactions we are sniffing, so make things easier by
505          * calculating the original command along with the current message's
506          * flags.  This is because transactions are made up of numerous
507          * messages and only the first typically specifies the actual command.
508          */
509         if (msg->state) {
510                 xcmd = msg->state->icmd |
511                        (msg->any.head.cmd & (DMSGF_CREATE |
512                                              DMSGF_DELETE |
513                                              DMSGF_REPLY));
514         } else {
515                 xcmd = msg->any.head.cmd;
516         }
517
518         /*
519          * Add or remove a circuit, sorted by weight (lower numbers are
520          * better).
521          */
522         switch(xcmd) {
523         case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY:
524                 kprintf("XA: Received autodmsg: CREATE+REPLY\n");
525                 circ = msg->state->any.circ;
526                 lwkt_gettoken(&xa->tok);
527                 if (circ->recorded == 0) {
528                         TAILQ_FOREACH(cscan, &xa->circq, entry) {
529                                 if (circ->weight < cscan->weight)
530                                         break;
531                         }
532                         if (cscan)
533                                 TAILQ_INSERT_BEFORE(cscan, circ, entry);
534                         else
535                                 TAILQ_INSERT_TAIL(&xa->circq, circ, entry);
536                         circ->recorded = 1;
537                 }
538                 lwkt_reltoken(&xa->tok);
539                 break;
540         case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
541                 kprintf("XA: Received autodmsg: DELETE+REPLY\n");
542                 circ = msg->state->any.circ;
543                 lwkt_gettoken(&xa->tok);
544                 if (circ->recorded) {
545                         TAILQ_REMOVE(&xa->circq, circ, entry);
546                         circ->recorded = 0;
547                 }
548                 lwkt_reltoken(&xa->tok);
549                 break;
550         default:
551                 break;
552         }
553 }
554
555 static int
556 xa_rcvdmsg(kdmsg_msg_t *msg)
557 {
558         switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
559         case DMSG_DBG_SHELL:
560                 /*
561                  * Execute shell command (not supported atm).
562                  *
563                  * This is a one-way packet but if not (e.g. if part of
564                  * a streaming transaction), we will have already closed
565                  * our end.
566                  */
567                 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
568                 break;
569         case DMSG_DBG_SHELL | DMSGF_REPLY:
570                 /*
571                  * Receive one or more replies to a shell command that we
572                  * sent.
573                  *
574                  * This is a one-way packet but if not (e.g. if part of
575                  * a streaming transaction), we will have already closed
576                  * our end.
577                  */
578                 if (msg->aux_data) {
579                         msg->aux_data[msg->aux_size - 1] = 0;
580                         kprintf("DEBUGMSG: %s\n", msg->aux_data);
581                 }
582                 break;
583         default:
584                 /*
585                  * Unsupported LNK message received.  We only need to
586                  * reply if it's a transaction in order to close our end.
587                  * Ignore any one-way messages are any further messages
588                  * associated with the transaction.
589                  *
590                  * NOTE: This case also includes DMSG_LNK_ERROR messages
591                  *       which might be one-way, replying to those would
592                  *       cause an infinite ping-pong.
593                  */
594                 if (msg->any.head.cmd & DMSGF_CREATE)
595                         kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
596                 break;
597         }
598         return(0);
599 }
600
601
602 /************************************************************************
603  *                         XA DEVICE INTERFACE                          *
604  ************************************************************************/
605
606 static int
607 xa_open(struct dev_open_args *ap)
608 {
609         cdev_t dev = ap->a_head.a_dev;
610         xa_softc_t *xa;
611         xa_tag_t *tag;
612         kdmsg_msg_t *msg;
613         int error;
614
615         dev->si_bsize_phys = 512;
616         dev->si_bsize_best = 32768;
617
618         /*
619          * Interlock open with opencnt, wait for attachment operations
620          * to finish.
621          */
622         lwkt_gettoken(&xdisk_token);
623 again:
624         xa = dev->si_drv1;
625         if (xa == NULL) {
626                 lwkt_reltoken(&xdisk_token);
627                 return ENXIO;   /* raced destruction */
628         }
629         if (xa->serializing) {
630                 tsleep(xa, 0, "xarace", hz / 10);
631                 goto again;
632         }
633
634         /*
635          * Serialize initial open
636          */
637         if (xa->opencnt++ > 0) {
638                 lwkt_reltoken(&xdisk_token);
639                 return(0);
640         }
641         xa->serializing = 1;
642         lwkt_reltoken(&xdisk_token);
643
644         kprintf("XA OPEN COMMAND\n");
645
646         tag = xa_setup_cmd(xa, NULL);
647         if (tag == NULL) {
648                 lwkt_gettoken(&xdisk_token);
649                 KKASSERT(xa->opencnt > 0);
650                 --xa->opencnt;
651                 xa->serializing = 0;
652                 xa_terminate_check(xa);
653                 lwkt_reltoken(&xdisk_token);
654                 return(ENXIO);
655         }
656         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
657                               DMSG_BLK_OPEN | DMSGF_CREATE,
658                               xa_sync_completion, tag);
659         msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
660         xa_start(tag, msg);
661         if (xa_wait(tag, 0) == 0) {
662                 kprintf("XA OPEN GOOD\n");
663                 xa->keyid = tag->status.keyid;
664                 xa->opentag = tag;      /* leave tag open */
665                 xa->serializing = 0;
666                 error = 0;
667         } else {
668                 kprintf("XA OPEN BAD\n");
669                 xa_done(tag, 0);
670                 lwkt_gettoken(&xdisk_token);
671                 KKASSERT(xa->opencnt > 0);
672                 --xa->opencnt;
673                 xa->serializing = 0;
674                 xa_terminate_check(xa);
675                 lwkt_reltoken(&xdisk_token);
676                 error = ENXIO;
677         }
678         return (error);
679 }
680
681 static int
682 xa_close(struct dev_close_args *ap)
683 {
684         cdev_t dev = ap->a_head.a_dev;
685         xa_softc_t *xa;
686         xa_tag_t *tag;
687
688         xa = dev->si_drv1;
689         if (xa == NULL)
690                 return ENXIO;   /* raced destruction */
691
692         lwkt_gettoken(&xa->tok);
693         if ((tag = xa->opentag) != NULL) {
694                 xa->opentag = NULL;
695                 kdmsg_state_reply(tag->state, DMSG_ERR_NOSUPP);
696                 while (tag->done == 0)
697                         xa_wait(tag, tag->waitseq);
698                 xa_done(tag, 0);
699         }
700         lwkt_reltoken(&xa->tok);
701
702         lwkt_gettoken(&xdisk_token);
703         KKASSERT(xa->opencnt > 0);
704         --xa->opencnt;
705         xa_terminate_check(xa);
706         lwkt_reltoken(&xdisk_token);
707
708         return(0);
709 }
710
711 static int
712 xa_strategy(struct dev_strategy_args *ap)
713 {
714         xa_softc_t *xa = ap->a_head.a_dev->si_drv1;
715         xa_tag_t *tag;
716         struct bio *bio = ap->a_bio;
717
718         bio->bio_buf->b_error = ENXIO;
719         bio->bio_buf->b_flags |= B_ERROR;
720         biodone(bio);
721         return(0);
722
723         tag = xa_setup_cmd(xa, bio);
724         if (tag)
725                 xa_start(tag, NULL);
726         return(0);
727 }
728
729 static int
730 xa_ioctl(struct dev_ioctl_args *ap)
731 {
732         return(ENOTTY);
733 }
734
735 static int
736 xa_size(struct dev_psize_args *ap)
737 {
738         struct xa_softc *xa;
739
740         if ((xa = ap->a_head.a_dev->si_drv1) == NULL)
741                 return (ENXIO);
742         ap->a_result = xa->info.d_media_blocks;
743         return (0);
744 }
745
746 /************************************************************************
747  *                  XA BLOCK PROTOCOL STATE MACHINE                     *
748  ************************************************************************
749  *
750  * Implement tag/msg setup and related functions.
751  */
752 static xa_tag_t *
753 xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
754 {
755         kdmsg_circuit_t *circ;
756         xa_tag_t *tag;
757
758         /*
759          * Only get a tag if we have a valid virtual circuit to the server.
760          */
761         lwkt_gettoken(&xa->tok);
762         if ((circ = TAILQ_FIRST(&xa->circq)) == NULL) {
763                 tag = NULL;
764         } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
765                 TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
766                 TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
767                 tag->bio = bio;
768                 tag->circuit = circ->circ_state->msgid;
769         }
770
771         /*
772          * If we can't dispatch now and this is a bio, queue it for later.
773          */
774         if (tag == NULL && bio) {
775                 TAILQ_INSERT_TAIL(&xa->bioq, bio, bio_act);
776         }
777         lwkt_reltoken(&xa->tok);
778
779         return (tag);
780 }
781
782 static void
783 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
784 {
785         xa_softc_t *xa = tag->xa;
786
787         if (msg == NULL) {
788                 struct bio *bio;
789                 struct buf *bp;
790
791                 KKASSERT(tag->bio);
792                 bio = tag->bio;
793                 bp = bio->bio_buf;
794
795                 switch(bp->b_cmd) {
796                 case BUF_CMD_READ:
797                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
798                                               DMSG_BLK_READ |
799                                               DMSGF_CREATE | DMSGF_DELETE,
800                                               xa_bio_completion, tag);
801                         msg->any.blk_read.keyid = xa->keyid;
802                         msg->any.blk_read.offset = bio->bio_offset;
803                         msg->any.blk_read.bytes = bp->b_bcount;
804                         break;
805                 case BUF_CMD_WRITE:
806                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
807                                               DMSG_BLK_WRITE |
808                                               DMSGF_CREATE | DMSGF_DELETE,
809                                               xa_bio_completion, tag);
810                         msg->any.blk_write.keyid = xa->keyid;
811                         msg->any.blk_write.offset = bio->bio_offset;
812                         msg->any.blk_write.bytes = bp->b_bcount;
813                         msg->aux_data = bp->b_data;
814                         msg->aux_size = bp->b_bcount;
815                         break;
816                 case BUF_CMD_FLUSH:
817                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
818                                               DMSG_BLK_FLUSH |
819                                               DMSGF_CREATE | DMSGF_DELETE,
820                                               xa_bio_completion, tag);
821                         msg->any.blk_flush.keyid = xa->keyid;
822                         msg->any.blk_flush.offset = bio->bio_offset;
823                         msg->any.blk_flush.bytes = bp->b_bcount;
824                         break;
825                 case BUF_CMD_FREEBLKS:
826                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
827                                               DMSG_BLK_FREEBLKS |
828                                               DMSGF_CREATE | DMSGF_DELETE,
829                                               xa_bio_completion, tag);
830                         msg->any.blk_freeblks.keyid = xa->keyid;
831                         msg->any.blk_freeblks.offset = bio->bio_offset;
832                         msg->any.blk_freeblks.bytes = bp->b_bcount;
833                         break;
834                 default:
835                         bp->b_flags |= B_ERROR;
836                         bp->b_error = EIO;
837                         biodone(bio);
838                         tag->bio = NULL;
839                         break;
840                 }
841         }
842
843         tag->done = 0;
844         tag->waitseq = 0;
845         if (msg) {
846                 tag->state = msg->state;
847                 kdmsg_msg_write(msg);
848         } else {
849                 xa_done(tag, 1);
850         }
851 }
852
853 static uint32_t
854 xa_wait(xa_tag_t *tag, int seq)
855 {
856         xa_softc_t *xa = tag->xa;
857
858         lwkt_gettoken(&xa->tok);
859         while (tag->waitseq == seq)
860                 tsleep(tag, 0, "xawait", 0);
861         lwkt_reltoken(&xa->tok);
862         return (tag->status.head.error);
863 }
864
865 static void
866 xa_done(xa_tag_t *tag, int wasbio)
867 {
868         xa_softc_t *xa = tag->xa;
869         struct bio *bio;
870
871         KKASSERT(tag->msg == NULL);
872         KKASSERT(tag->bio == NULL);
873         tag->done = 1;
874
875         lwkt_gettoken(&xa->tok);
876         if ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
877                 TAILQ_REMOVE(&xa->bioq, bio, bio_act);
878                 tag->bio = bio;
879                 lwkt_reltoken(&xa->tok);
880                 xa_start(tag, NULL);
881         } else {
882                 TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
883                 lwkt_reltoken(&xa->tok);
884         }
885 }
886
887 static int
888 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
889 {
890         xa_tag_t *tag = state->any.any;
891         xa_softc_t *xa = tag->xa;
892
893         switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
894         case DMSG_LNK_ERROR | DMSGF_REPLY:
895                 bzero(&tag->status, sizeof(tag->status));
896                 tag->status.head = msg->any.head;
897                 break;
898         case DMSG_BLK_ERROR | DMSGF_REPLY:
899                 tag->status = msg->any.blk_error;
900                 break;
901         }
902         kprintf("XA_SYNC_COMPLETION ERROR %u RESID %u\n",
903                 tag->status.head.error, tag->status.resid);
904         if (msg->any.head.cmd & DMSGF_DELETE) { /* receive termination */
905                 kdmsg_msg_reply(msg, 0);        /* terminate our side */
906                 tag->done = 1;
907         }
908         lwkt_gettoken(&xa->tok);
909         ++tag->waitseq;
910         lwkt_reltoken(&xa->tok);
911
912         wakeup(tag);
913
914         return (0);
915 }
916
917 static int
918 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
919 {
920         xa_tag_t *tag = state->any.any;
921         /*xa_softc_t *xa = tag->xa;*/
922         struct bio *bio;
923         struct buf *bp;
924
925         /*
926          * Get the bio from the tag.  If no bio is present we just do
927          * 'done' handling.
928          */
929         if ((bio = tag->bio) == NULL)
930                 goto handle_done;
931         bp = bio->bio_buf;
932
933         /*
934          * Process return status
935          */
936         switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
937         case DMSG_LNK_ERROR | DMSGF_REPLY:
938                 bzero(&tag->status, sizeof(tag->status));
939                 tag->status.head = msg->any.head;
940                 if (tag->status.head.error)
941                         tag->status.resid = bp->b_bcount;
942                 else
943                         tag->status.resid = 0;
944                 break;
945         case DMSG_BLK_ERROR | DMSGF_REPLY:
946                 tag->status = msg->any.blk_error;
947                 break;
948         }
949         kprintf("XA_BIO_COMPLETION ERROR %u RESID %u\n",
950                 tag->status.head.error, tag->status.resid);
951
952         /*
953          * Process bio completion
954          *
955          * For reads any returned data is zero-extended if necessary, so
956          * the server can short-cut any all-zeros reads if it desires.
957          */
958         switch(bp->b_cmd) {
959         case BUF_CMD_READ:
960                 if (msg->aux_data && msg->aux_size) {
961                         if (msg->aux_size < bp->b_bcount) {
962                                 bcopy(msg->aux_data, bp->b_data, msg->aux_size);
963                                 bzero(bp->b_data + msg->aux_size,
964                                       bp->b_bcount - msg->aux_size);
965                         } else {
966                                 bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
967                         }
968                 } else {
969                         bzero(bp->b_data, bp->b_bcount);
970                 }
971                 /* fall through */
972         case BUF_CMD_WRITE:
973         case BUF_CMD_FLUSH:
974         case BUF_CMD_FREEBLKS:
975         default:
976                 if (tag->status.resid > bp->b_bcount)
977                         tag->status.resid = bp->b_bcount;
978                 bp->b_resid = tag->status.resid;
979                 if ((bp->b_error = tag->status.head.error) != 0) {
980                         bp->b_flags |= B_ERROR;
981                 } else {
982                         bp->b_resid = 0;
983                 }
984                 biodone(bio);
985                 tag->bio = NULL;
986                 break;
987         }
988
989         /*
990          * Handle completion of the transaction.  If the bioq is not empty
991          * we can initiate another bio on the same tag.
992          */
993 handle_done:
994         if (msg->any.head.cmd & DMSGF_DELETE)
995                 xa_done(tag, 1);
996         return (0);
997 }