hammer2 - more dmsg/separation work
[dragonfly.git] / sys / dev / disk / xdisk / xdisk.c
1 /*
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * This module allows disk devices to be created and associated with a
36  * communications pipe or socket.  You open the device and issue an
37  * ioctl() to install a new disk along with its communications descriptor.
38  *
39  * All further communication occurs via the descriptor using the DMSG
40  * LNK_CONN, LNK_SPAN, and BLOCK protocols.  The descriptor can be a
41  * direct connection to a remote machine's disk (in-kernel), to a remote
42  * cluster controller, to the local cluster controller, etc.
43  *
44  * /dev/xdisk is the control device, issue ioctl()s to create the /dev/xa%d
45  * devices.  These devices look like raw disks to the system.
46  *
47  * TODO:
48  *      Handle circuit disconnects, leave bio's pending
49  *      Restart bio's on circuit reconnect.
50  */
51 #include <sys/param.h>
52 #include <sys/systm.h>
53 #include <sys/buf.h>
54 #include <sys/conf.h>
55 #include <sys/device.h>
56 #include <sys/devicestat.h>
57 #include <sys/disk.h>
58 #include <sys/kernel.h>
59 #include <sys/malloc.h>
60 #include <sys/sysctl.h>
61 #include <sys/proc.h>
62 #include <sys/queue.h>
63 #include <sys/udev.h>
64 #include <sys/uuid.h>
65 #include <sys/kern_syscall.h>
66
67 #include <sys/dmsg.h>
68 #include <sys/xdiskioctl.h>
69
70 #include <sys/buf2.h>
71 #include <sys/thread2.h>
72
73 struct xa_softc;
74
/*
 * Per-command tag.  xdisk_attach() preallocates MAXTAGS of these per softc
 * on tag_freeq; xa_setup_cmd() moves a tag to tag_pendq when a command is
 * dispatched.
 */
75 struct xa_tag {
76         TAILQ_ENTRY(xa_tag) entry;      /* tag_freeq or tag_pendq linkage */
77         struct xa_softc *xa;    /* owning softc (cleared before kfree) */
78         dmsg_blk_error_t status;        /* reply status (head.error, keyid) */
79         kdmsg_state_t   *state; /* transaction state, set by xa_start() */
80         kdmsg_circuit_t *circ;  /* circuit held by xa_setup_cmd() */
81         struct bio      *bio;   /* originating bio, NULL for BLK_OPEN */
82         int             running;        /* transaction running */
83         int             waitseq;        /* streaming reply */
84         int             done;           /* final (transaction closed) */
85 };
86
87 typedef struct xa_tag   xa_tag_t;
88
/*
 * Per-disk softc, one per /dev/xa%d unit, linked on xa_queue.
 */
89 struct xa_softc {
90         TAILQ_ENTRY(xa_softc) entry;    /* xa_queue linkage */
91         cdev_t          dev;    /* from disk_create(); dev->si_drv1 = softc */
92         kdmsg_iocom_t   iocom;  /* DMSG messaging connection */
93         struct xdisk_attach_ioctl xaioc;        /* copy of last attach params */
94         struct disk_info info;
95         struct disk     disk;
96         uuid_t          pfs_fsid;       /* generated per attach for LNK_CONN */
97         int             unit;   /* xa%d unit number */
98         int             serializing;    /* attach/detach/open/teardown busy */
99         int             attached;       /* 1 attached, -1 in xa_exit(), 0 not */
100         int             opencnt;        /* number of outstanding opens */
101         uint64_t        keyid;  /* keyid from the BLK_OPEN reply */
102         xa_tag_t        *opentag;       /* BLK_OPEN transaction left open */
103         TAILQ_HEAD(, bio) bioq; /* bios deferred for lack of circuit/tag */
104         TAILQ_HEAD(, xa_tag) tag_freeq;
105         TAILQ_HEAD(, xa_tag) tag_pendq; /* tags currently in use */
106         TAILQ_HEAD(, kdmsg_circuit) circq;      /* circuits, ascending weight */
107         struct lwkt_token tok;  /* taken around circq/tag queue/bioq updates */
108 };
109
110 typedef struct xa_softc xa_softc_t;
111
/* Number of tags preallocated per softc by xdisk_attach(). */
112 #define MAXTAGS         64      /* no real limit */
113
/*
 * Control-ioctl handlers and iocom lifecycle callbacks.
 */
114 static int xdisk_attach(struct xdisk_attach_ioctl *xaioc);
115 static int xdisk_detach(struct xdisk_attach_ioctl *xaioc);
116 static void xa_exit(kdmsg_iocom_t *iocom);
117 static void xa_terminate_check(struct xa_softc *xa);
118 static int xa_rcvdmsg(kdmsg_msg_t *msg);
119 static void xa_autodmsg(kdmsg_msg_t *msg);
120
/*
 * Block-protocol tag state machine.
 */
121 static xa_tag_t *xa_setup_cmd(xa_softc_t *xa, struct bio *bio);
122 static void xa_start(xa_tag_t *tag, kdmsg_msg_t *msg);
123 static uint32_t xa_wait(xa_tag_t *tag, int seq);
124 static void xa_done(xa_tag_t *tag, int wasbio);
125 static int xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
126 static int xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg);
127 static void xa_restart_deferred(xa_softc_t *xa);
128
129 MALLOC_DEFINE(M_XDISK, "Networked disk client", "Network Disks");
130
131 /*
132  * Control device, issue ioctls to create xa devices.
133  */
134 static d_open_t xdisk_open;
135 static d_close_t xdisk_close;
136 static d_ioctl_t xdisk_ioctl;
137
138 static struct dev_ops xdisk_ops = {
139         { "xdisk", 0, D_MPSAFE | D_TRACKCLOSE },
140         .d_open =       xdisk_open,
141         .d_close =      xdisk_close,
142         .d_ioctl =      xdisk_ioctl
143 };
144
145 /*
146  * XA disk devices
147  */
148 static d_open_t xa_open;
149 static d_close_t xa_close;
150 static d_ioctl_t xa_ioctl;
151 static d_strategy_t xa_strategy;
152 static d_psize_t xa_size;
153
/*
 * Raw disk device ops; read/write go through physread/physwrite and end
 * up in xa_strategy().
 */
154 static struct dev_ops xa_ops = {
155         { "xa", 0, D_DISK | D_CANFREE | D_MPSAFE | D_TRACKCLOSE },
156         .d_open =       xa_open,
157         .d_close =      xa_close,
158         .d_ioctl =      xa_ioctl,
159         .d_read =       physread,
160         .d_write =      physwrite,
161         .d_strategy =   xa_strategy,
162         .d_psize =      xa_size
163 };
164
/*
 * xdisk_token is taken around updates of xa_queue, xdisk_opencount and
 * the per-softc serializing/attached/opencnt fields.
 */
165 static struct lwkt_token xdisk_token = LWKT_TOKEN_INITIALIZER(xdisk_token);
166 static int xdisk_opencount;     /* opens of /dev/xdisk; blocks unload */
167 static cdev_t xdisk_dev;        /* the /dev/xdisk control device */
168 static TAILQ_HEAD(, xa_softc) xa_queue; /* all xa softcs */
169
170 /*
171  * Module initialization
172  */
173 static int
174 xdisk_modevent(module_t mod, int type, void *data)
175 {
176         switch (type) {
177         case MOD_LOAD:
178                 TAILQ_INIT(&xa_queue);
179                 xdisk_dev = make_dev(&xdisk_ops, 0,
180                                      UID_ROOT, GID_WHEEL, 0600, "xdisk");
181                 break;
182         case MOD_UNLOAD:
183         case MOD_SHUTDOWN:
184                 if (xdisk_opencount || TAILQ_FIRST(&xa_queue))
185                         return (EBUSY);
186                 if (xdisk_dev) {
187                         destroy_dev(xdisk_dev);
188                         xdisk_dev = NULL;
189                 }
190                 dev_ops_remove_all(&xdisk_ops);
191                 dev_ops_remove_all(&xa_ops);
192                 break;
193         default:
194                 break;
195         }
196         return 0;
197 }
198
199 DEV_MODULE(xdisk, xdisk_modevent, 0);
200
201 /*
202  * Control device
203  */
204 static int
205 xdisk_open(struct dev_open_args *ap)
206 {
207         lwkt_gettoken(&xdisk_token);
208         ++xdisk_opencount;
209         lwkt_reltoken(&xdisk_token);
210         return(0);
211 }
212
213 static int
214 xdisk_close(struct dev_close_args *ap)
215 {
216         lwkt_gettoken(&xdisk_token);
217         --xdisk_opencount;
218         lwkt_reltoken(&xdisk_token);
219         return(0);
220 }
221
222 static int
223 xdisk_ioctl(struct dev_ioctl_args *ap)
224 {
225         int error;
226
227         switch(ap->a_cmd) {
228         case XDISKIOCATTACH:
229                 error = xdisk_attach((void *)ap->a_data);
230                 break;
231         case XDISKIOCDETACH:
232                 error = xdisk_detach((void *)ap->a_data);
233                 break;
234         default:
235                 error = ENOTTY;
236                 break;
237         }
238         return error;
239 }
240
241 /************************************************************************
242  *                              DMSG INTERFACE                          *
243  ************************************************************************/
244
245 static int
246 xdisk_attach(struct xdisk_attach_ioctl *xaioc)
247 {
248         xa_softc_t *xa;
249         xa_tag_t *tag;
250         struct file *fp;
251         int unit;
252         int n;
253         char devname[64];
254         cdev_t dev;
255
256         /*
257          * Normalize ioctl params
258          */
259         fp = holdfp(curproc->p_fd, xaioc->fd, -1);
260         if (fp == NULL)
261                 return EINVAL;
262         if (xaioc->cl_label[sizeof(xaioc->cl_label) - 1] != 0)
263                 return EINVAL;
264         if (xaioc->fs_label[sizeof(xaioc->fs_label) - 1] != 0)
265                 return EINVAL;
266         if (xaioc->blksize < DEV_BSIZE || xaioc->blksize > MAXBSIZE)
267                 return EINVAL;
268
269         /*
270          * See if the serial number is already present.  If we are
271          * racing a termination the disk subsystem may still have
272          * duplicate entries not yet removed so we wait a bit and
273          * retry.
274          */
275         lwkt_gettoken(&xdisk_token);
276 again:
277         TAILQ_FOREACH(xa, &xa_queue, entry) {
278                 if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
279                            xaioc->fs_label) == 0) {
280                         if (xa->serializing) {
281                                 tsleep(xa, 0, "xadelay", hz / 10);
282                                 goto again;
283                         }
284                         xa->serializing = 1;
285                         kdmsg_iocom_uninit(&xa->iocom);
286                         break;
287                 }
288         }
289
290         /*
291          * Create a new xa if not already present
292          */
293         if (xa == NULL) {
294                 unit = 0;
295                 for (;;) {
296                         TAILQ_FOREACH(xa, &xa_queue, entry) {
297                                 if (xa->unit == unit)
298                                         break;
299                         }
300                         if (xa == NULL)
301                                 break;
302                         ++unit;
303                 }
304                 xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO);
305                 xa->unit = unit;
306                 xa->serializing = 1;
307                 lwkt_token_init(&xa->tok, "xa");
308                 TAILQ_INIT(&xa->circq);
309                 TAILQ_INIT(&xa->bioq);
310                 TAILQ_INIT(&xa->tag_freeq);
311                 TAILQ_INIT(&xa->tag_pendq);
312                 for (n = 0; n < MAXTAGS; ++n) {
313                         tag = kmalloc(sizeof(*tag), M_XDISK, M_WAITOK|M_ZERO);
314                         tag->xa = xa;
315                         TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
316                 }
317                 TAILQ_INSERT_TAIL(&xa_queue, xa, entry);
318         } else {
319                 unit = xa->unit;
320         }
321
322         /*
323          * (xa) is now serializing.
324          */
325         xa->xaioc = *xaioc;
326         xa->attached = 1;
327         lwkt_reltoken(&xdisk_token);
328
329         /*
330          * Create device
331          */
332         if (xa->dev == NULL) {
333                 dev = disk_create(unit, &xa->disk, &xa_ops);
334                 dev->si_drv1 = xa;
335                 xa->dev = dev;
336         }
337
338         xa->info.d_media_blksize = xaioc->blksize;
339         xa->info.d_media_blocks = xaioc->bytes / xaioc->blksize;
340         xa->info.d_dsflags = DSO_MBRQUIET | DSO_RAWPSIZE;
341         xa->info.d_secpertrack = 32;
342         xa->info.d_nheads = 64;
343         xa->info.d_secpercyl = xa->info.d_secpertrack * xa->info.d_nheads;
344         xa->info.d_ncylinders = 0;
345         if (xa->xaioc.fs_label[0])
346                 xa->info.d_serialno = xa->xaioc.fs_label;
347
348         /*
349          * Set up messaging connection
350          */
351         ksnprintf(devname, sizeof(devname), "xa%d", unit);
352         kdmsg_iocom_init(&xa->iocom, xa,
353                          KDMSG_IOCOMF_AUTOCONN |
354                          KDMSG_IOCOMF_AUTORXSPAN |
355                          KDMSG_IOCOMF_AUTOTXSPAN |
356                          KDMSG_IOCOMF_AUTORXCIRC |
357                          KDMSG_IOCOMF_AUTOTXCIRC,
358                          M_XDISK, xa_rcvdmsg);
359         xa->iocom.exit_func = xa_exit;
360
361         kdmsg_iocom_reconnect(&xa->iocom, fp, devname);
362
363         /*
364          * Setup our LNK_CONN advertisement for autoinitiate.
365          *
366          * Our filter is setup to only accept PEER_BLOCK/SERVER
367          * advertisements.
368          */
369         xa->iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_CLIENT;
370         xa->iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
371         xa->iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
372         xa->iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
373         xa->iocom.auto_lnk_conn.pfs_mask = 1LLU << DMSG_PFSTYPE_SERVER;
374         ksnprintf(xa->iocom.auto_lnk_conn.cl_label,
375                   sizeof(xa->iocom.auto_lnk_conn.cl_label),
376                   "%s", xaioc->cl_label);
377
378         /*
379          * We need a unique pfs_fsid to avoid confusion.
380          * We supply a rendezvous fs_label using the serial number.
381          */
382         kern_uuidgen(&xa->pfs_fsid, 1);
383         xa->iocom.auto_lnk_conn.pfs_fsid = xa->pfs_fsid;
384         ksnprintf(xa->iocom.auto_lnk_conn.fs_label,
385                   sizeof(xa->iocom.auto_lnk_conn.fs_label),
386                   "%s", xaioc->fs_label);
387
388         /*
389          * Setup our LNK_SPAN advertisement for autoinitiate
390          */
391         xa->iocom.auto_lnk_span.pfs_type = DMSG_PFSTYPE_CLIENT;
392         xa->iocom.auto_lnk_span.proto_version = DMSG_SPAN_PROTO_1;
393         xa->iocom.auto_lnk_span.peer_type = DMSG_PEER_BLOCK;
394         ksnprintf(xa->iocom.auto_lnk_span.cl_label,
395                   sizeof(xa->iocom.auto_lnk_span.cl_label),
396                   "%s", xa->xaioc.cl_label);
397
398         kdmsg_iocom_autoinitiate(&xa->iocom, xa_autodmsg);
399         disk_setdiskinfo_sync(&xa->disk, &xa->info);
400
401         lwkt_gettoken(&xdisk_token);
402         xa->serializing = 0;
403         xa_terminate_check(xa);
404         lwkt_reltoken(&xdisk_token);
405
406         return(0);
407 }
408
409 static int
410 xdisk_detach(struct xdisk_attach_ioctl *xaioc)
411 {
412         struct xa_softc *xa;
413
414         lwkt_gettoken(&xdisk_token);
415         for (;;) {
416                 TAILQ_FOREACH(xa, &xa_queue, entry) {
417                         if (strcmp(xa->iocom.auto_lnk_conn.fs_label,
418                                    xaioc->fs_label) == 0) {
419                                 break;
420                         }
421                 }
422                 if (xa == NULL || xa->serializing == 0) {
423                         xa->serializing = 1;
424                         break;
425                 }
426                 tsleep(xa, 0, "xadet", hz / 10);
427         }
428         if (xa) {
429                 kdmsg_iocom_uninit(&xa->iocom);
430                 xa->serializing = 0;
431         }
432         lwkt_reltoken(&xdisk_token);
433         return(0);
434 }
435
436 /*
437  * Called from iocom core transmit thread upon disconnect.
438  */
439 static
440 void
441 xa_exit(kdmsg_iocom_t *iocom)
442 {
443         struct xa_softc *xa = iocom->handle;
444
445         lwkt_gettoken(&xa->tok);
446         lwkt_gettoken(&xdisk_token);
447
448         /*
449          * We must wait for any I/O's to complete to ensure that all
450          * state structure references are cleaned up before returning.
451          */
452         xa->attached = -1;      /* force deferral or failure */
453         while (TAILQ_FIRST(&xa->tag_pendq)) {
454                 tsleep(xa, 0, "xabiow", hz / 10);
455         }
456
457         /*
458          * All serializing code checks for de-initialization so only
459          * do it if we aren't already serializing.
460          */
461         if (xa->serializing == 0) {
462                 xa->serializing = 1;
463                 kdmsg_iocom_uninit(iocom);
464                 xa->serializing = 0;
465         }
466
467         /*
468          * If the drive is not in use and no longer attach it can be
469          * destroyed.
470          */
471         xa->attached = 0;
472         xa_terminate_check(xa);
473         lwkt_reltoken(&xdisk_token);
474         lwkt_reltoken(&xa->tok);
475 }
476
477 /*
478  * Determine if we can destroy the xa_softc.
479  *
480  * Called with xdisk_token held.
481  */
482 static
483 void
484 xa_terminate_check(struct xa_softc *xa)
485 {
486         xa_tag_t *tag;
487         struct bio *bio;
488
489         if (xa->opencnt || xa->attached || xa->serializing)
490                 return;
491         xa->serializing = 1;
492         kdmsg_iocom_uninit(&xa->iocom);
493
494         /*
495          * When destroying an xa make sure all pending I/O (typically
496          * from the disk probe) is done.
497          *
498          * XXX what about new I/O initiated prior to disk_destroy().
499          */
500         while ((tag = TAILQ_FIRST(&xa->tag_pendq)) != NULL) {
501                 TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
502                 if ((bio = tag->bio) != NULL) {
503                         tag->bio = NULL;
504                         bio->bio_buf->b_error = ENXIO;
505                         bio->bio_buf->b_flags |= B_ERROR;
506                         biodone(bio);
507                 }
508                 TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
509         }
510         if (xa->dev) {
511                 disk_destroy(&xa->disk);
512                 xa->dev->si_drv1 = NULL;
513                 xa->dev = NULL;
514         }
515         KKASSERT(xa->opencnt == 0 && xa->attached == 0);
516         while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
517                 TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
518                 tag->xa = NULL;
519                 kfree(tag, M_XDISK);
520         }
521         KKASSERT(TAILQ_EMPTY(&xa->tag_pendq));
522         TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */
523         kfree(xa, M_XDISK);
524 }
525
526 /*
527  * Shim to catch and record virtual circuit events.
528  */
529 static void
530 xa_autodmsg(kdmsg_msg_t *msg)
531 {
532         xa_softc_t *xa = msg->iocom->handle;
533
534         kdmsg_circuit_t *circ;
535         kdmsg_circuit_t *cscan;
536         uint32_t xcmd;
537
538         /*
539          * Because this is just a shim we don't have a state callback for
540          * the transactions we are sniffing, so make things easier by
541          * calculating the original command along with the current message's
542          * flags.  This is because transactions are made up of numerous
543          * messages and only the first typically specifies the actual command.
544          */
545         if (msg->state) {
546                 xcmd = msg->state->icmd |
547                        (msg->any.head.cmd & (DMSGF_CREATE |
548                                              DMSGF_DELETE |
549                                              DMSGF_REPLY));
550         } else {
551                 xcmd = msg->any.head.cmd;
552         }
553
554         /*
555          * Add or remove a circuit, sorted by weight (lower numbers are
556          * better).
557          */
558         switch(xcmd) {
559         case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_REPLY:
560                 /*
561                  * Track established circuits
562                  */
563                 circ = msg->state->any.circ;
564                 lwkt_gettoken(&xa->tok);
565                 if (circ->recorded == 0) {
566                         TAILQ_FOREACH(cscan, &xa->circq, entry) {
567                                 if (circ->weight < cscan->weight)
568                                         break;
569                         }
570                         if (cscan)
571                                 TAILQ_INSERT_BEFORE(cscan, circ, entry);
572                         else
573                                 TAILQ_INSERT_TAIL(&xa->circq, circ, entry);
574                         circ->recorded = 1;
575                 }
576
577                 /*
578                  * Restart any deferred I/O.
579                  */
580                 xa_restart_deferred(xa);
581                 lwkt_reltoken(&xa->tok);
582                 break;
583         case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
584                 /*
585                  * Losing virtual circuit.  Remove the circ from contention.
586                  */
587                 circ = msg->state->any.circ;
588                 lwkt_gettoken(&xa->tok);
589                 if (circ->recorded) {
590                         TAILQ_REMOVE(&xa->circq, circ, entry);
591                         circ->recorded = 0;
592                 }
593                 xa_restart_deferred(xa);
594                 lwkt_reltoken(&xa->tok);
595                 break;
596         default:
597                 break;
598         }
599 }
600
601 static int
602 xa_rcvdmsg(kdmsg_msg_t *msg)
603 {
604         switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
605         case DMSG_DBG_SHELL:
606                 /*
607                  * Execute shell command (not supported atm).
608                  *
609                  * This is a one-way packet but if not (e.g. if part of
610                  * a streaming transaction), we will have already closed
611                  * our end.
612                  */
613                 kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
614                 break;
615         case DMSG_DBG_SHELL | DMSGF_REPLY:
616                 /*
617                  * Receive one or more replies to a shell command that we
618                  * sent.
619                  *
620                  * This is a one-way packet but if not (e.g. if part of
621                  * a streaming transaction), we will have already closed
622                  * our end.
623                  */
624                 if (msg->aux_data) {
625                         msg->aux_data[msg->aux_size - 1] = 0;
626                         kprintf("xdisk: DEBUGMSG: %s\n", msg->aux_data);
627                 }
628                 break;
629         default:
630                 /*
631                  * Unsupported LNK message received.  We only need to
632                  * reply if it's a transaction in order to close our end.
633                  * Ignore any one-way messages are any further messages
634                  * associated with the transaction.
635                  *
636                  * NOTE: This case also includes DMSG_LNK_ERROR messages
637                  *       which might be one-way, replying to those would
638                  *       cause an infinite ping-pong.
639                  */
640                 if (msg->any.head.cmd & DMSGF_CREATE)
641                         kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
642                 break;
643         }
644         return(0);
645 }
646
647
648 /************************************************************************
649  *                         XA DEVICE INTERFACE                          *
650  ************************************************************************/
651
652 static int
653 xa_open(struct dev_open_args *ap)
654 {
655         cdev_t dev = ap->a_head.a_dev;
656         xa_softc_t *xa;
657         xa_tag_t *tag;
658         kdmsg_msg_t *msg;
659         int error;
660
661         dev->si_bsize_phys = 512;
662         dev->si_bsize_best = 32768;
663
664         /*
665          * Interlock open with opencnt, wait for attachment operations
666          * to finish.
667          */
668         lwkt_gettoken(&xdisk_token);
669 again:
670         xa = dev->si_drv1;
671         if (xa == NULL) {
672                 lwkt_reltoken(&xdisk_token);
673                 return ENXIO;   /* raced destruction */
674         }
675         if (xa->serializing) {
676                 tsleep(xa, 0, "xarace", hz / 10);
677                 goto again;
678         }
679         if (xa->attached == 0) {
680                 lwkt_reltoken(&xdisk_token);
681                 return ENXIO;   /* raced destruction */
682         }
683
684         /*
685          * Serialize initial open
686          */
687         if (xa->opencnt++ > 0) {
688                 lwkt_reltoken(&xdisk_token);
689                 return(0);
690         }
691         xa->serializing = 1;
692         lwkt_reltoken(&xdisk_token);
693
694         tag = xa_setup_cmd(xa, NULL);
695         if (tag == NULL) {
696                 lwkt_gettoken(&xdisk_token);
697                 KKASSERT(xa->opencnt > 0);
698                 --xa->opencnt;
699                 xa->serializing = 0;
700                 xa_terminate_check(xa);
701                 lwkt_reltoken(&xdisk_token);
702                 return(ENXIO);
703         }
704         msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
705                               DMSG_BLK_OPEN | DMSGF_CREATE,
706                               xa_sync_completion, tag);
707         msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
708         xa_start(tag, msg);
709         if (xa_wait(tag, 0) == 0) {
710                 xa->keyid = tag->status.keyid;
711                 xa->opentag = tag;      /* leave tag open */
712                 xa->serializing = 0;
713                 error = 0;
714         } else {
715                 xa_done(tag, 0);
716                 lwkt_gettoken(&xdisk_token);
717                 KKASSERT(xa->opencnt > 0);
718                 --xa->opencnt;
719                 xa->serializing = 0;
720                 xa_terminate_check(xa);
721                 lwkt_reltoken(&xdisk_token);
722                 error = ENXIO;
723         }
724         return (error);
725 }
726
727 static int
728 xa_close(struct dev_close_args *ap)
729 {
730         cdev_t dev = ap->a_head.a_dev;
731         xa_softc_t *xa;
732         xa_tag_t *tag;
733
734         xa = dev->si_drv1;
735         if (xa == NULL)
736                 return ENXIO;   /* raced destruction */
737
738         lwkt_gettoken(&xa->tok);
739         if ((tag = xa->opentag) != NULL) {
740                 xa->opentag = NULL;
741                 kdmsg_state_reply(tag->state, 0);
742                 while (tag->done == 0)
743                         xa_wait(tag, tag->waitseq);
744                 xa_done(tag, 0);
745         }
746         lwkt_reltoken(&xa->tok);
747
748         lwkt_gettoken(&xdisk_token);
749         KKASSERT(xa->opencnt > 0);
750         --xa->opencnt;
751         xa_terminate_check(xa);
752         lwkt_reltoken(&xdisk_token);
753
754         return(0);
755 }
756
757 static int
758 xa_strategy(struct dev_strategy_args *ap)
759 {
760         xa_softc_t *xa = ap->a_head.a_dev->si_drv1;
761         xa_tag_t *tag;
762         struct bio *bio = ap->a_bio;
763
764         /*
765          * Allow potentially temporary link failures to fail the I/Os
766          * only if the device is not open.  That is, we allow the disk
767          * probe code prior to mount to fail.
768          */
769         if (xa->attached == 0 && xa->opencnt == 0) {
770                 bio->bio_buf->b_error = ENXIO;
771                 bio->bio_buf->b_flags |= B_ERROR;
772                 biodone(bio);
773                 return(0);
774         }
775
776         tag = xa_setup_cmd(xa, bio);
777         if (tag)
778                 xa_start(tag, NULL);
779         return(0);
780 }
781
782 static int
783 xa_ioctl(struct dev_ioctl_args *ap)
784 {
785         return(ENOTTY);
786 }
787
788 static int
789 xa_size(struct dev_psize_args *ap)
790 {
791         struct xa_softc *xa;
792
793         if ((xa = ap->a_head.a_dev->si_drv1) == NULL)
794                 return (ENXIO);
795         ap->a_result = xa->info.d_media_blocks;
796         return (0);
797 }
798
799 /************************************************************************
800  *                  XA BLOCK PROTOCOL STATE MACHINE                     *
801  ************************************************************************
802  *
803  * Implement tag/msg setup and related functions.
804  */
805 static xa_tag_t *
806 xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
807 {
808         kdmsg_circuit_t *circ;
809         xa_tag_t *tag;
810
811         /*
812          * Only get a tag if we have a valid virtual circuit to the server.
813          */
814         lwkt_gettoken(&xa->tok);
815         TAILQ_FOREACH(circ, &xa->circq, entry) {
816                 if (circ->lost == 0)
817                         break;
818         }
819         if (circ == NULL || xa->attached <= 0) {
820                 tag = NULL;
821         } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
822                 TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
823                 tag->bio = bio;
824                 tag->circ = circ;
825                 kdmsg_circ_hold(circ);
826                 TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
827         }
828
829         /*
830          * If we can't dispatch now and this is a bio, queue it for later.
831          */
832         if (tag == NULL && bio) {
833                 TAILQ_INSERT_TAIL(&xa->bioq, bio, bio_act);
834         }
835         lwkt_reltoken(&xa->tok);
836
837         return (tag);
838 }
839
840 static void
841 xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
842 {
843         xa_softc_t *xa = tag->xa;
844
845         if (msg == NULL) {
846                 struct bio *bio;
847                 struct buf *bp;
848
849                 KKASSERT(tag->bio);
850                 bio = tag->bio;
851                 bp = bio->bio_buf;
852
853                 switch(bp->b_cmd) {
854                 case BUF_CMD_READ:
855                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
856                                               DMSG_BLK_READ |
857                                               DMSGF_CREATE | DMSGF_DELETE,
858                                               xa_bio_completion, tag);
859                         msg->any.blk_read.keyid = xa->keyid;
860                         msg->any.blk_read.offset = bio->bio_offset;
861                         msg->any.blk_read.bytes = bp->b_bcount;
862                         break;
863                 case BUF_CMD_WRITE:
864                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
865                                               DMSG_BLK_WRITE |
866                                               DMSGF_CREATE | DMSGF_DELETE,
867                                               xa_bio_completion, tag);
868                         msg->any.blk_write.keyid = xa->keyid;
869                         msg->any.blk_write.offset = bio->bio_offset;
870                         msg->any.blk_write.bytes = bp->b_bcount;
871                         msg->aux_data = bp->b_data;
872                         msg->aux_size = bp->b_bcount;
873                         break;
874                 case BUF_CMD_FLUSH:
875                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
876                                               DMSG_BLK_FLUSH |
877                                               DMSGF_CREATE | DMSGF_DELETE,
878                                               xa_bio_completion, tag);
879                         msg->any.blk_flush.keyid = xa->keyid;
880                         msg->any.blk_flush.offset = bio->bio_offset;
881                         msg->any.blk_flush.bytes = bp->b_bcount;
882                         break;
883                 case BUF_CMD_FREEBLKS:
884                         msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
885                                               DMSG_BLK_FREEBLKS |
886                                               DMSGF_CREATE | DMSGF_DELETE,
887                                               xa_bio_completion, tag);
888                         msg->any.blk_freeblks.keyid = xa->keyid;
889                         msg->any.blk_freeblks.offset = bio->bio_offset;
890                         msg->any.blk_freeblks.bytes = bp->b_bcount;
891                         break;
892                 default:
893                         bp->b_flags |= B_ERROR;
894                         bp->b_error = EIO;
895                         biodone(bio);
896                         tag->bio = NULL;
897                         break;
898                 }
899         }
900
901         tag->done = 0;
902         tag->waitseq = 0;
903         if (msg) {
904                 tag->state = msg->state;
905                 kdmsg_msg_write(msg);
906         } else {
907                 xa_done(tag, 1);
908         }
909 }
910
911 static uint32_t
912 xa_wait(xa_tag_t *tag, int seq)
913 {
914         xa_softc_t *xa = tag->xa;
915
916         lwkt_gettoken(&xa->tok);
917         while (tag->waitseq == seq)
918                 tsleep(tag, 0, "xawait", 0);
919         lwkt_reltoken(&xa->tok);
920         return (tag->status.head.error);
921 }
922
923 static void
924 xa_done(xa_tag_t *tag, int wasbio)
925 {
926         xa_softc_t *xa = tag->xa;
927         struct bio *bio;
928
929         KKASSERT(tag->bio == NULL);
930         tag->done = 1;
931         tag->state = NULL;
932
933         lwkt_gettoken(&xa->tok);
934         if (wasbio && (bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
935                 TAILQ_REMOVE(&xa->bioq, bio, bio_act);
936                 tag->bio = bio;
937                 lwkt_reltoken(&xa->tok);
938                 xa_start(tag, NULL);
939         } else {
940                 if (tag->circ) {
941                         kdmsg_circ_drop(tag->circ);
942                         tag->circ = NULL;
943                 }
944                 TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
945                 TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
946                 lwkt_reltoken(&xa->tok);
947         }
948 }
949
950 static int
951 xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
952 {
953         xa_tag_t *tag = state->any.any;
954         xa_softc_t *xa = tag->xa;
955
956         switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
957         case DMSG_LNK_ERROR | DMSGF_REPLY:
958                 bzero(&tag->status, sizeof(tag->status));
959                 tag->status.head = msg->any.head;
960                 break;
961         case DMSG_BLK_ERROR | DMSGF_REPLY:
962                 tag->status = msg->any.blk_error;
963                 break;
964         }
965         lwkt_gettoken(&xa->tok);
966         if (msg->any.head.cmd & DMSGF_DELETE) { /* receive termination */
967                 if (xa->opentag == tag) {
968                         xa->opentag = NULL;     /* XXX */
969                         kdmsg_state_reply(tag->state, 0);
970                         xa_done(tag, 0);
971                         lwkt_reltoken(&xa->tok);
972                         return(0);
973                 } else {
974                         tag->done = 1;
975                 }
976         }
977         ++tag->waitseq;
978         lwkt_reltoken(&xa->tok);
979
980         wakeup(tag);
981
982         return (0);
983 }
984
985 static int
986 xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
987 {
988         xa_tag_t *tag = state->any.any;
989         xa_softc_t *xa = tag->xa;
990         struct bio *bio;
991         struct buf *bp;
992
993         /*
994          * Get the bio from the tag.  If no bio is present we just do
995          * 'done' handling.
996          */
997         if ((bio = tag->bio) == NULL)
998                 goto handle_done;
999         bp = bio->bio_buf;
1000
1001         /*
1002          * Process return status
1003          */
1004         switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
1005         case DMSG_LNK_ERROR | DMSGF_REPLY:
1006                 bzero(&tag->status, sizeof(tag->status));
1007                 tag->status.head = msg->any.head;
1008                 if (tag->status.head.error)
1009                         tag->status.resid = bp->b_bcount;
1010                 else
1011                         tag->status.resid = 0;
1012                 break;
1013         case DMSG_BLK_ERROR | DMSGF_REPLY:
1014                 tag->status = msg->any.blk_error;
1015                 break;
1016         }
1017
1018         /*
1019          * Potentially move the bio back onto the pending queue if the
1020          * device is open and the error is related to losing the virtual
1021          * circuit.
1022          */
1023         if (tag->status.head.error &&
1024             (msg->any.head.cmd & DMSGF_DELETE) && xa->opencnt) {
1025                 if (tag->status.head.error == DMSG_ERR_LOSTLINK ||
1026                     tag->status.head.error == DMSG_ERR_CANTCIRC) {
1027                         goto handle_repend;
1028                 }
1029         }
1030
1031         /*
1032          * Process bio completion
1033          *
1034          * For reads any returned data is zero-extended if necessary, so
1035          * the server can short-cut any all-zeros reads if it desires.
1036          */
1037         switch(bp->b_cmd) {
1038         case BUF_CMD_READ:
1039                 if (msg->aux_data && msg->aux_size) {
1040                         if (msg->aux_size < bp->b_bcount) {
1041                                 bcopy(msg->aux_data, bp->b_data, msg->aux_size);
1042                                 bzero(bp->b_data + msg->aux_size,
1043                                       bp->b_bcount - msg->aux_size);
1044                         } else {
1045                                 bcopy(msg->aux_data, bp->b_data, bp->b_bcount);
1046                         }
1047                 } else {
1048                         bzero(bp->b_data, bp->b_bcount);
1049                 }
1050                 /* fall through */
1051         case BUF_CMD_WRITE:
1052         case BUF_CMD_FLUSH:
1053         case BUF_CMD_FREEBLKS:
1054         default:
1055                 if (tag->status.resid > bp->b_bcount)
1056                         tag->status.resid = bp->b_bcount;
1057                 bp->b_resid = tag->status.resid;
1058                 if ((bp->b_error = tag->status.head.error) != 0) {
1059                         bp->b_flags |= B_ERROR;
1060                 } else {
1061                         bp->b_resid = 0;
1062                 }
1063                 biodone(bio);
1064                 tag->bio = NULL;
1065                 break;
1066         }
1067
1068         /*
1069          * Handle completion of the transaction.  If the bioq is not empty
1070          * we can initiate another bio on the same tag.
1071          *
1072          * NOTE: Most of our transactions will be single-message
1073          *       CREATE+DELETEs, so we won't have to terminate the
1074          *       transaction separately, here.  But just in case they
1075          *       aren't be sure to terminate the transaction.
1076          */
1077 handle_done:
1078         if (msg->any.head.cmd & DMSGF_DELETE) {
1079                 xa_done(tag, 1);
1080                 if ((state->txcmd & DMSGF_DELETE) == 0)
1081                         kdmsg_msg_reply(msg, 0);
1082         }
1083         return (0);
1084
1085         /*
1086          * Handle the case where the transaction failed due to a
1087          * connectivity issue.  The tag is put away with wasbio=0
1088          * and we restart the bio.
1089          *
1090          * Setting circ->lost causes xa_setup_cmd() to skip the circuit.
1091          * Other circuits might still be live.  Once a circuit gets messed
1092          * up it will (eventually) be deleted so we can simply leave (lost)
1093          * set forever after.
1094          */
1095 handle_repend:
1096         lwkt_gettoken(&xa->tok);
1097         kprintf("BIO CIRC FAILURE, REPEND BIO %p\n", bio);
1098         tag->circ->lost = 1;
1099         tag->bio = NULL;
1100         xa_done(tag, 0);
1101         if ((state->txcmd & DMSGF_DELETE) == 0)
1102                 kdmsg_msg_reply(msg, 0);
1103
1104         /*
1105          * Restart or requeue the bio
1106          */
1107         tag = xa_setup_cmd(xa, bio);
1108         if (tag)
1109                 xa_start(tag, NULL);
1110         lwkt_reltoken(&xa->tok);
1111         return (0);
1112 }
1113
1114 /*
1115  * Restart as much deferred I/O as we can.
1116  *
1117  * Called with xa->tok held
1118  */
1119 static
1120 void
1121 xa_restart_deferred(xa_softc_t *xa)
1122 {
1123         struct bio *bio;
1124         xa_tag_t *tag;
1125
1126         while ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
1127                 tag = xa_setup_cmd(xa, NULL);
1128                 if (tag == NULL)
1129                         break;
1130                 TAILQ_REMOVE(&xa->bioq, bio, bio_act);
1131                 tag->bio = bio;
1132                 xa_start(tag, NULL);
1133         }
1134 }