cluster - Major kernel component work (diskiocom, xdisk, kdmsg)
[dragonfly.git] / sys / dev / disk / xdisk / xdisk.c
index 95132b0..940b153 100644 (file)
@@ -77,8 +77,8 @@ struct xa_tag {
        struct xa_softc *xa;
        dmsg_blk_error_t status;
        kdmsg_state_t   *state;
+       kdmsg_circuit_t *circ;
        struct bio      *bio;
-       uint64_t        circuit;
        int             running;        /* transaction running */
        int             waitseq;        /* streaming reply */
        int             done;           /* final (transaction closed) */
@@ -204,7 +204,6 @@ DEV_MODULE(xdisk, xdisk_modevent, 0);
 static int
 xdisk_open(struct dev_open_args *ap)
 {
-       kprintf("XDISK_OPEN\n");
        lwkt_gettoken(&xdisk_token);
        ++xdisk_opencount;
        lwkt_reltoken(&xdisk_token);
@@ -214,7 +213,6 @@ xdisk_open(struct dev_open_args *ap)
 static int
 xdisk_close(struct dev_close_args *ap)
 {
-       kprintf("XDISK_CLOSE\n");
        lwkt_gettoken(&xdisk_token);
        --xdisk_opencount;
        lwkt_reltoken(&xdisk_token);
@@ -255,10 +253,6 @@ xdisk_attach(struct xdisk_attach_ioctl *xaioc)
        char devname[64];
        cdev_t dev;
 
-       kprintf("xdisk attach %d %jd/%d %s %s\n",
-               xaioc->fd, (intmax_t)xaioc->bytes, xaioc->blksize,
-               xaioc->cl_label, xaioc->fs_label);
-
        /*
         * Normalize ioctl params
         */
@@ -308,7 +302,6 @@ again:
                        ++unit;
                }
                xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO);
-               kprintf("ALLOCATE XA %p\n", xa);
                xa->unit = unit;
                xa->serializing = 1;
                lwkt_token_init(&xa->tok, "xa");
@@ -323,6 +316,10 @@ again:
                }
                TAILQ_INSERT_TAIL(&xa_queue, xa, entry);
        }
+
+       /*
+        * (xa) is now serializing.
+        */
        xa->xaioc = *xaioc;
        xa->attached = 1;
        lwkt_reltoken(&xdisk_token);
@@ -426,7 +423,6 @@ xdisk_detach(struct xdisk_attach_ioctl *xaioc)
                tsleep(xa, 0, "xadet", hz / 10);
        }
        if (xa) {
-               kprintf("DETACHING XA\n");
                kdmsg_iocom_uninit(&xa->iocom);
                xa->serializing = 0;
        }
@@ -443,19 +439,36 @@ xa_exit(kdmsg_iocom_t *iocom)
 {
        struct xa_softc *xa = iocom->handle;
 
-       kprintf("XA_EXIT UNIT %d\n", xa->unit);
+       lwkt_gettoken(&xa->tok);
+       lwkt_gettoken(&xdisk_token);
+
+       /*
+        * We must wait for any I/O's to complete to ensure that all
+        * state structure references are cleaned up before returning.
+        */
+       xa->attached = -1;      /* force deferral or failure */
+       while (TAILQ_FIRST(&xa->tag_pendq)) {
+               tsleep(xa, 0, "xabiow", hz / 10);
+       }
 
-       if (xa->serializing == 0)
+       /*
+        * All serializing code checks for de-initialization so only
+        * do it if we aren't already serializing.
+        */
+       if (xa->serializing == 0) {
+               xa->serializing = 1;
                kdmsg_iocom_uninit(iocom);
+               xa->serializing = 0;
+       }
 
        /*
         * If the drive is not in use and no longer attach it can be
         * destroyed.
         */
-       lwkt_gettoken(&xdisk_token);
        xa->attached = 0;
        xa_terminate_check(xa);
        lwkt_reltoken(&xdisk_token);
+       lwkt_reltoken(&xa->tok);
 }
 
 /*
@@ -468,20 +481,35 @@ void
 xa_terminate_check(struct xa_softc *xa)
 {
        xa_tag_t *tag;
+       struct bio *bio;
 
        if (xa->opencnt || xa->attached || xa->serializing)
                return;
        xa->serializing = 1;
-       kprintf("TERMINATE XA %p %d\n", xa, xa->unit);
        kdmsg_iocom_uninit(&xa->iocom);
+
+       /*
+        * When destroying an xa make sure all pending I/O (typically
+        * from the disk probe) is done.
+        *
+        * XXX what about new I/O initiated prior to disk_destroy().
+        */
+       while ((tag = TAILQ_FIRST(&xa->tag_pendq)) != NULL) {
+               TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
+               if ((bio = tag->bio) != NULL) {
+                       tag->bio = NULL;
+                       bio->bio_buf->b_error = ENXIO;
+                       bio->bio_buf->b_flags |= B_ERROR;
+                       biodone(bio);
+               }
+               TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
+       }
        if (xa->dev) {
                disk_destroy(&xa->disk);
                xa->dev->si_drv1 = NULL;
                xa->dev = NULL;
        }
-       kprintf("REMOVEQ   XA %p %d\n", xa, xa->unit);
        KKASSERT(xa->opencnt == 0 && xa->attached == 0);
-       kprintf("IOCOMUN   XA %p %d\n", xa, xa->unit);
        while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
                tag->xa = NULL;
@@ -490,7 +518,6 @@ xa_terminate_check(struct xa_softc *xa)
        KKASSERT(TAILQ_EMPTY(&xa->tag_pendq));
        TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */
        kfree(xa, M_XDISK);
-       kprintf("xa_close: destroy unreferenced disk\n");
 }
 
 /*
@@ -499,7 +526,8 @@ xa_terminate_check(struct xa_softc *xa)
 static void
 xa_autodmsg(kdmsg_msg_t *msg)
 {
-       struct xa_softc *xa = msg->iocom->handle;
+       xa_softc_t *xa = msg->iocom->handle;
+
        kdmsg_circuit_t *circ;
        kdmsg_circuit_t *cscan;
        uint32_t xcmd;
@@ -529,7 +557,6 @@ xa_autodmsg(kdmsg_msg_t *msg)
                /*
                 * Track established circuits
                 */
-               kprintf("XA: Received autodmsg: CREATE+REPLY\n");
                circ = msg->state->any.circ;
                lwkt_gettoken(&xa->tok);
                if (circ->recorded == 0) {
@@ -551,7 +578,9 @@ xa_autodmsg(kdmsg_msg_t *msg)
                lwkt_reltoken(&xa->tok);
                break;
        case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
-               kprintf("XA: Received autodmsg: DELETE+REPLY\n");
+               /*
+                * Losing virtual circuit.  Scan pending tags.
+                */
                circ = msg->state->any.circ;
                lwkt_gettoken(&xa->tok);
                if (circ->recorded) {
@@ -590,7 +619,7 @@ xa_rcvdmsg(kdmsg_msg_t *msg)
                 */
                if (msg->aux_data) {
                        msg->aux_data[msg->aux_size - 1] = 0;
-                       kprintf("DEBUGMSG: %s\n", msg->aux_data);
+                       kprintf("xdisk: DEBUGMSG: %s\n", msg->aux_data);
                }
                break;
        default:
@@ -643,6 +672,10 @@ again:
                tsleep(xa, 0, "xarace", hz / 10);
                goto again;
        }
+       if (xa->attached == 0) {
+               lwkt_reltoken(&xdisk_token);
+               return ENXIO;   /* raced destruction */
+       }
 
        /*
         * Serialize initial open
@@ -654,8 +687,6 @@ again:
        xa->serializing = 1;
        lwkt_reltoken(&xdisk_token);
 
-       kprintf("XA OPEN COMMAND\n");
-
        tag = xa_setup_cmd(xa, NULL);
        if (tag == NULL) {
                lwkt_gettoken(&xdisk_token);
@@ -666,19 +697,17 @@ again:
                lwkt_reltoken(&xdisk_token);
                return(ENXIO);
        }
-       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                              DMSG_BLK_OPEN | DMSGF_CREATE,
                              xa_sync_completion, tag);
        msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
        xa_start(tag, msg);
        if (xa_wait(tag, 0) == 0) {
-               kprintf("XA OPEN GOOD\n");
                xa->keyid = tag->status.keyid;
                xa->opentag = tag;      /* leave tag open */
                xa->serializing = 0;
                error = 0;
        } else {
-               kprintf("XA OPEN BAD\n");
                xa_done(tag, 0);
                lwkt_gettoken(&xdisk_token);
                KKASSERT(xa->opencnt > 0);
@@ -728,12 +757,17 @@ xa_strategy(struct dev_strategy_args *ap)
        xa_tag_t *tag;
        struct bio *bio = ap->a_bio;
 
-#if 0
-       bio->bio_buf->b_error = ENXIO;
-       bio->bio_buf->b_flags |= B_ERROR;
-       biodone(bio);
-       return(0);
-#endif
+       /*
+        * Allow potentially temporary link failures to fail the I/Os
+        * only if the device is not open.  That is, we allow the disk
+        * probe code prior to mount to fail.
+        */
+       if (xa->attached == 0 && xa->opencnt == 0) {
+               bio->bio_buf->b_error = ENXIO;
+               bio->bio_buf->b_flags |= B_ERROR;
+               biodone(bio);
+               return(0);
+       }
 
        tag = xa_setup_cmd(xa, bio);
        if (tag)
@@ -774,12 +808,14 @@ xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
         * Only get a tag if we have a valid virtual circuit to the server.
         */
        lwkt_gettoken(&xa->tok);
-       if ((circ = TAILQ_FIRST(&xa->circq)) == NULL) {
+       if ((circ = TAILQ_FIRST(&xa->circq)) == NULL || xa->attached <= 0) {
                tag = NULL;
        } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
                tag->bio = bio;
-               tag->circuit = circ->circ_state->msgid;
+               tag->circ = circ;
+               kdmsg_circ_hold(circ);
+               TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
        }
 
        /*
@@ -808,7 +844,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
 
                switch(bp->b_cmd) {
                case BUF_CMD_READ:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                                              DMSG_BLK_READ |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -817,7 +853,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
                        msg->any.blk_read.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_WRITE:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                                              DMSG_BLK_WRITE |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -828,7 +864,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
                        msg->aux_size = bp->b_bcount;
                        break;
                case BUF_CMD_FLUSH:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                                              DMSG_BLK_FLUSH |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -837,7 +873,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
                        msg->any.blk_flush.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_FREEBLKS:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                                              DMSG_BLK_FREEBLKS |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -857,14 +893,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
        tag->done = 0;
        tag->waitseq = 0;
        if (msg) {
-#if 0
-               lwkt_gettoken(&xa->tok);
-               TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
-#endif
                tag->state = msg->state;
-#if 0
-               lwkt_reltoken(&xa->tok);
-#endif
                kdmsg_msg_write(msg);
        } else {
                xa_done(tag, 1);
@@ -891,6 +920,7 @@ xa_done(xa_tag_t *tag, int wasbio)
 
        KKASSERT(tag->bio == NULL);
        tag->done = 1;
+       tag->state = NULL;
 
        lwkt_gettoken(&xa->tok);
        if ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
@@ -899,6 +929,11 @@ xa_done(xa_tag_t *tag, int wasbio)
                lwkt_reltoken(&xa->tok);
                xa_start(tag, NULL);
        } else {
+               if (tag->circ) {
+                       kdmsg_circ_drop(tag->circ);
+                       tag->circ = NULL;
+               }
+               TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
                TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
                lwkt_reltoken(&xa->tok);
        }
@@ -919,8 +954,6 @@ xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
                tag->status = msg->any.blk_error;
                break;
        }
-       kprintf("XA_SYNC_COMPLETION ERROR %u RESID %u\n",
-               tag->status.head.error, tag->status.resid);
        if (msg->any.head.cmd & DMSGF_DELETE) { /* receive termination */
                kdmsg_msg_reply(msg, 0);        /* terminate our side */
                tag->done = 1;
@@ -966,8 +999,6 @@ xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
                tag->status = msg->any.blk_error;
                break;
        }
-       kprintf("XA_BIO_COMPLETION ERROR %u RESID %u\n",
-               tag->status.head.error, tag->status.resid);
 
        /*
         * Process bio completion
@@ -1009,10 +1040,19 @@ xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
        /*
         * Handle completion of the transaction.  If the bioq is not empty
         * we can initiate another bio on the same tag.
+        *
+        * NOTE: Most of our transactions will be single-message
+        *       CREATE+DELETEs, so we won't have to terminate the
+        *       transaction separately, here.  But just in case they
+        *       aren't be sure to terminate the transaction.
         */
 handle_done:
-       if (msg->any.head.cmd & DMSGF_DELETE)
+       if (msg->any.head.cmd & DMSGF_DELETE) {
                xa_done(tag, 1);
+               if ((state->txcmd & DMSGF_DELETE) == 0) {
+                       kdmsg_msg_reply(msg, 0);
+               }
+       }
        return (0);
 }
 
@@ -1032,8 +1072,6 @@ xa_restart_deferred(xa_softc_t *xa)
                tag = xa_setup_cmd(xa, NULL);
                if (tag == NULL)
                        break;
-               kprintf("xa: Restart BIO %p on %s\n",
-                       bio, xa->iocom.auto_lnk_conn.fs_label);
                TAILQ_REMOVE(&xa->bioq, bio, bio_act);
                tag->bio = bio;
                xa_start(tag, NULL);