cluster - Major kernel component work (diskiocom, xdisk, kdmsg)
author Matthew Dillon <dillon@apollo.backplane.com>
Mon, 3 Dec 2012 05:40:13 +0000 (21:40 -0800)
committer Matthew Dillon <dillon@apollo.backplane.com>
Mon, 3 Dec 2012 07:25:55 +0000 (23:25 -0800)
* Major implementation and stabilization work.  Fix lots of bugs and
  improve the AUTO flags and APIs.

* xdisk and diskiocom are now operational for the first time.

  xdisk is a consumer of remote block devices.  You pass it one end of
  a pipe through an ioctl, which creates the /dev/xa* devices.  The kernel's
  nominal disk management subsystem handles /dev/serno/* and will also
  probe the /dev/xa* devices to create the slices.

  diskiocom is a block device service.  An ioctl is used to connect a
  raw disk device to the network via a pipe.

* The hammer2 service daemon ties the two together over the network.

* Initial testing with a local hammer2 service daemon looping da0 to xa0
  succeeded at ~46MB/sec (unoptimized, measured with dd).

lib/libdmsg/msg.c
lib/libdmsg/msg_lnk.c
lib/libdmsg/service.c
sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_service.c
sys/dev/disk/xdisk/xdisk.c
sys/kern/kern_dmsg.c
sys/kern/subr_diskiocom.c
sys/sys/dmsg.h

index 0006b5a..4cb2aaf 100644 (file)
@@ -1264,7 +1264,7 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
        TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
                hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
                         DMSG_ALIGN;
-               abytes = msg->aux_size;
+               abytes = DMSG_DOALIGN(msg->aux_size);
                assert(hoff <= hbytes && aoff <= abytes);
 
                if (hoff < hbytes) {
@@ -1333,7 +1333,7 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
                                ioq->fifo_cdx = 0;
                                ioq->fifo_end = 0;
                        }
-                       nact = n;
+                       /* XXX what if interrupted mid-write? */
                } else {
                        nact = 0;
                }
@@ -1353,7 +1353,7 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
                         DMSG_ALIGN;
-               abytes = msg->aux_size;
+               abytes = DMSG_DOALIGN(msg->aux_size);
 
                if ((size_t)nact < hbytes - ioq->hbytes) {
                        ioq->hbytes += nact;
@@ -2036,9 +2036,6 @@ dmsg_state_free(dmsg_state_t *state)
                fprintf(stderr, "terminate state %p id=%08x\n",
                        state, (uint32_t)state->msgid);
        }
-       fprintf(stderr,
-               "dmsg_state_free state %p any.any %p func %p icmd %08x\n",
-               state, state->any.any, state->func, state->icmd);
        if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
                closefrom(3);
        assert(state->any.any == NULL);
index 73f943a..2544d8b 100644 (file)
@@ -823,9 +823,12 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                tx_state = RB_FIND(dmsg_state_tree,
                                   &iocomA->circuit0.statewr_tree,
                                   &dummy);
-               /* XXX state refs */
-               assert(tx_state);
                pthread_mutex_unlock(&iocomA->mtx);
+               if (tx_state == NULL) {
+                       fprintf(stderr, "dmsg_lnk_circ: no circuit\n");
+                       dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+                       break;
+               }
 
                /* locate h2span_link */
                rx_state = tx_state->any.relay->source_rt;
@@ -886,6 +889,10 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                 * to (B).
                 */
                iocomA = msg->iocom;
+               if (msg->state->any.circ == NULL) {
+                       /* already returned an error/deleted */
+                       break;
+               }
                circA = msg->state->any.circ;
                circB = circA->peer;
                assert(msg->state == circA->state);
index a72fd82..655895a 100644 (file)
@@ -161,9 +161,7 @@ master_link_rxmsg(dmsg_msg_t *msg)
         * might have REPLY set.
         */
        state = msg->state;
-       cmd = state ? state->msg->any.head.cmd : msg->any.head.cmd;
-
-       fprintf(stderr, "service-receive: %s\n", dmsg_msg_str(msg));
+       cmd = state ? state->icmd : msg->any.head.cmd;
 
        if (state && state->func) {
                assert(state->func != NULL);
@@ -200,7 +198,7 @@ dmsg_msg_dbg(dmsg_msg_t *msg)
                if (msg->aux_data)
                        msg->aux_data[msg->aux_size - 1] = 0;
                msg->iocom->dbgmsg_callback(msg);
-               dmsg_msg_reply(msg, 0);
+               dmsg_msg_reply(msg, 0); /* XXX send prompt instead */
                break;
        case DMSG_DBG_SHELL | DMSGF_REPLY:
                /*
index 56448a9..6e68b0d 100644 (file)
@@ -181,6 +181,8 @@ hammer2_shell_parse(dmsg_msg_t *msg)
                ;
        } else if (strcmp(cmd, "span") == 0) {
                shell_span(circuit, cmdbuf);
+       } else if (strcmp(cmd, "circ") == 0) {
+               shell_circ(circuit, cmdbuf);
        } else if (strcmp(cmd, "tree") == 0) {
                dmsg_shell_tree(circuit, cmdbuf); /* dump spanning tree */
        } else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) {
index ec7d273..c494172 100644 (file)
@@ -240,6 +240,7 @@ service_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
                info->block = msg->any.lnk_span.media.block;
                fprintf(stderr, "NODE ADD %s serno %s\n",
                        info->cl_label, info->fs_label);
+               info->attached = 1;
                xdisk_reconnect(info);
                break;
        case DMSG_NODEOP_DEL:
index 95132b0..940b153 100644 (file)
@@ -77,8 +77,8 @@ struct xa_tag {
        struct xa_softc *xa;
        dmsg_blk_error_t status;
        kdmsg_state_t   *state;
+       kdmsg_circuit_t *circ;
        struct bio      *bio;
-       uint64_t        circuit;
        int             running;        /* transaction running */
        int             waitseq;        /* streaming reply */
        int             done;           /* final (transaction closed) */
@@ -204,7 +204,6 @@ DEV_MODULE(xdisk, xdisk_modevent, 0);
 static int
 xdisk_open(struct dev_open_args *ap)
 {
-       kprintf("XDISK_OPEN\n");
        lwkt_gettoken(&xdisk_token);
        ++xdisk_opencount;
        lwkt_reltoken(&xdisk_token);
@@ -214,7 +213,6 @@ xdisk_open(struct dev_open_args *ap)
 static int
 xdisk_close(struct dev_close_args *ap)
 {
-       kprintf("XDISK_CLOSE\n");
        lwkt_gettoken(&xdisk_token);
        --xdisk_opencount;
        lwkt_reltoken(&xdisk_token);
@@ -255,10 +253,6 @@ xdisk_attach(struct xdisk_attach_ioctl *xaioc)
        char devname[64];
        cdev_t dev;
 
-       kprintf("xdisk attach %d %jd/%d %s %s\n",
-               xaioc->fd, (intmax_t)xaioc->bytes, xaioc->blksize,
-               xaioc->cl_label, xaioc->fs_label);
-
        /*
         * Normalize ioctl params
         */
@@ -308,7 +302,6 @@ again:
                        ++unit;
                }
                xa = kmalloc(sizeof(*xa), M_XDISK, M_WAITOK|M_ZERO);
-               kprintf("ALLOCATE XA %p\n", xa);
                xa->unit = unit;
                xa->serializing = 1;
                lwkt_token_init(&xa->tok, "xa");
@@ -323,6 +316,10 @@ again:
                }
                TAILQ_INSERT_TAIL(&xa_queue, xa, entry);
        }
+
+       /*
+        * (xa) is now serializing.
+        */
        xa->xaioc = *xaioc;
        xa->attached = 1;
        lwkt_reltoken(&xdisk_token);
@@ -426,7 +423,6 @@ xdisk_detach(struct xdisk_attach_ioctl *xaioc)
                tsleep(xa, 0, "xadet", hz / 10);
        }
        if (xa) {
-               kprintf("DETACHING XA\n");
                kdmsg_iocom_uninit(&xa->iocom);
                xa->serializing = 0;
        }
@@ -443,19 +439,36 @@ xa_exit(kdmsg_iocom_t *iocom)
 {
        struct xa_softc *xa = iocom->handle;
 
-       kprintf("XA_EXIT UNIT %d\n", xa->unit);
+       lwkt_gettoken(&xa->tok);
+       lwkt_gettoken(&xdisk_token);
+
+       /*
+        * We must wait for any I/O's to complete to ensure that all
+        * state structure references are cleaned up before returning.
+        */
+       xa->attached = -1;      /* force deferral or failure */
+       while (TAILQ_FIRST(&xa->tag_pendq)) {
+               tsleep(xa, 0, "xabiow", hz / 10);
+       }
 
-       if (xa->serializing == 0)
+       /*
+        * All serializing code checks for de-initialization so only
+        * do it if we aren't already serializing.
+        */
+       if (xa->serializing == 0) {
+               xa->serializing = 1;
                kdmsg_iocom_uninit(iocom);
+               xa->serializing = 0;
+       }
 
        /*
         * If the drive is not in use and no longer attach it can be
         * destroyed.
         */
-       lwkt_gettoken(&xdisk_token);
        xa->attached = 0;
        xa_terminate_check(xa);
        lwkt_reltoken(&xdisk_token);
+       lwkt_reltoken(&xa->tok);
 }
 
 /*
@@ -468,20 +481,35 @@ void
 xa_terminate_check(struct xa_softc *xa)
 {
        xa_tag_t *tag;
+       struct bio *bio;
 
        if (xa->opencnt || xa->attached || xa->serializing)
                return;
        xa->serializing = 1;
-       kprintf("TERMINATE XA %p %d\n", xa, xa->unit);
        kdmsg_iocom_uninit(&xa->iocom);
+
+       /*
+        * When destroying an xa make sure all pending I/O (typically
+        * from the disk probe) is done.
+        *
+        * XXX what about new I/O initiated prior to disk_destroy().
+        */
+       while ((tag = TAILQ_FIRST(&xa->tag_pendq)) != NULL) {
+               TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
+               if ((bio = tag->bio) != NULL) {
+                       tag->bio = NULL;
+                       bio->bio_buf->b_error = ENXIO;
+                       bio->bio_buf->b_flags |= B_ERROR;
+                       biodone(bio);
+               }
+               TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
+       }
        if (xa->dev) {
                disk_destroy(&xa->disk);
                xa->dev->si_drv1 = NULL;
                xa->dev = NULL;
        }
-       kprintf("REMOVEQ   XA %p %d\n", xa, xa->unit);
        KKASSERT(xa->opencnt == 0 && xa->attached == 0);
-       kprintf("IOCOMUN   XA %p %d\n", xa, xa->unit);
        while ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
                tag->xa = NULL;
@@ -490,7 +518,6 @@ xa_terminate_check(struct xa_softc *xa)
        KKASSERT(TAILQ_EMPTY(&xa->tag_pendq));
        TAILQ_REMOVE(&xa_queue, xa, entry); /* XXX */
        kfree(xa, M_XDISK);
-       kprintf("xa_close: destroy unreferenced disk\n");
 }
 
 /*
@@ -499,7 +526,8 @@ xa_terminate_check(struct xa_softc *xa)
 static void
 xa_autodmsg(kdmsg_msg_t *msg)
 {
-       struct xa_softc *xa = msg->iocom->handle;
+       xa_softc_t *xa = msg->iocom->handle;
+
        kdmsg_circuit_t *circ;
        kdmsg_circuit_t *cscan;
        uint32_t xcmd;
@@ -529,7 +557,6 @@ xa_autodmsg(kdmsg_msg_t *msg)
                /*
                 * Track established circuits
                 */
-               kprintf("XA: Received autodmsg: CREATE+REPLY\n");
                circ = msg->state->any.circ;
                lwkt_gettoken(&xa->tok);
                if (circ->recorded == 0) {
@@ -551,7 +578,9 @@ xa_autodmsg(kdmsg_msg_t *msg)
                lwkt_reltoken(&xa->tok);
                break;
        case DMSG_LNK_CIRC | DMSGF_DELETE | DMSGF_REPLY:
-               kprintf("XA: Received autodmsg: DELETE+REPLY\n");
+               /*
+                * Losing virtual circuit.  Scan pending tags.
+                */
                circ = msg->state->any.circ;
                lwkt_gettoken(&xa->tok);
                if (circ->recorded) {
@@ -590,7 +619,7 @@ xa_rcvdmsg(kdmsg_msg_t *msg)
                 */
                if (msg->aux_data) {
                        msg->aux_data[msg->aux_size - 1] = 0;
-                       kprintf("DEBUGMSG: %s\n", msg->aux_data);
+                       kprintf("xdisk: DEBUGMSG: %s\n", msg->aux_data);
                }
                break;
        default:
@@ -643,6 +672,10 @@ again:
                tsleep(xa, 0, "xarace", hz / 10);
                goto again;
        }
+       if (xa->attached == 0) {
+               lwkt_reltoken(&xdisk_token);
+               return ENXIO;   /* raced destruction */
+       }
 
        /*
         * Serialize initial open
@@ -654,8 +687,6 @@ again:
        xa->serializing = 1;
        lwkt_reltoken(&xdisk_token);
 
-       kprintf("XA OPEN COMMAND\n");
-
        tag = xa_setup_cmd(xa, NULL);
        if (tag == NULL) {
                lwkt_gettoken(&xdisk_token);
@@ -666,19 +697,17 @@ again:
                lwkt_reltoken(&xdisk_token);
                return(ENXIO);
        }
-       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                              DMSG_BLK_OPEN | DMSGF_CREATE,
                              xa_sync_completion, tag);
        msg->any.blk_open.modes = DMSG_BLKOPEN_RD | DMSG_BLKOPEN_WR;
        xa_start(tag, msg);
        if (xa_wait(tag, 0) == 0) {
-               kprintf("XA OPEN GOOD\n");
                xa->keyid = tag->status.keyid;
                xa->opentag = tag;      /* leave tag open */
                xa->serializing = 0;
                error = 0;
        } else {
-               kprintf("XA OPEN BAD\n");
                xa_done(tag, 0);
                lwkt_gettoken(&xdisk_token);
                KKASSERT(xa->opencnt > 0);
@@ -728,12 +757,17 @@ xa_strategy(struct dev_strategy_args *ap)
        xa_tag_t *tag;
        struct bio *bio = ap->a_bio;
 
-#if 0
-       bio->bio_buf->b_error = ENXIO;
-       bio->bio_buf->b_flags |= B_ERROR;
-       biodone(bio);
-       return(0);
-#endif
+       /*
+        * Allow potentially temporary link failures to fail the I/Os
+        * only if the device is not open.  That is, we allow the disk
+        * probe code prior to mount to fail.
+        */
+       if (xa->attached == 0 && xa->opencnt == 0) {
+               bio->bio_buf->b_error = ENXIO;
+               bio->bio_buf->b_flags |= B_ERROR;
+               biodone(bio);
+               return(0);
+       }
 
        tag = xa_setup_cmd(xa, bio);
        if (tag)
@@ -774,12 +808,14 @@ xa_setup_cmd(xa_softc_t *xa, struct bio *bio)
         * Only get a tag if we have a valid virtual circuit to the server.
         */
        lwkt_gettoken(&xa->tok);
-       if ((circ = TAILQ_FIRST(&xa->circq)) == NULL) {
+       if ((circ = TAILQ_FIRST(&xa->circq)) == NULL || xa->attached <= 0) {
                tag = NULL;
        } else if ((tag = TAILQ_FIRST(&xa->tag_freeq)) != NULL) {
                TAILQ_REMOVE(&xa->tag_freeq, tag, entry);
                tag->bio = bio;
-               tag->circuit = circ->circ_state->msgid;
+               tag->circ = circ;
+               kdmsg_circ_hold(circ);
+               TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
        }
 
        /*
@@ -808,7 +844,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
 
                switch(bp->b_cmd) {
                case BUF_CMD_READ:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                                              DMSG_BLK_READ |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -817,7 +853,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
                        msg->any.blk_read.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_WRITE:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                                              DMSG_BLK_WRITE |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -828,7 +864,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
                        msg->aux_size = bp->b_bcount;
                        break;
                case BUF_CMD_FLUSH:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                                              DMSG_BLK_FLUSH |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -837,7 +873,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
                        msg->any.blk_flush.bytes = bp->b_bcount;
                        break;
                case BUF_CMD_FREEBLKS:
-                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circuit,
+                       msg = kdmsg_msg_alloc(&xa->iocom, tag->circ,
                                              DMSG_BLK_FREEBLKS |
                                              DMSGF_CREATE | DMSGF_DELETE,
                                              xa_bio_completion, tag);
@@ -857,14 +893,7 @@ xa_start(xa_tag_t *tag, kdmsg_msg_t *msg)
        tag->done = 0;
        tag->waitseq = 0;
        if (msg) {
-#if 0
-               lwkt_gettoken(&xa->tok);
-               TAILQ_INSERT_TAIL(&xa->tag_pendq, tag, entry);
-#endif
                tag->state = msg->state;
-#if 0
-               lwkt_reltoken(&xa->tok);
-#endif
                kdmsg_msg_write(msg);
        } else {
                xa_done(tag, 1);
@@ -891,6 +920,7 @@ xa_done(xa_tag_t *tag, int wasbio)
 
        KKASSERT(tag->bio == NULL);
        tag->done = 1;
+       tag->state = NULL;
 
        lwkt_gettoken(&xa->tok);
        if ((bio = TAILQ_FIRST(&xa->bioq)) != NULL) {
@@ -899,6 +929,11 @@ xa_done(xa_tag_t *tag, int wasbio)
                lwkt_reltoken(&xa->tok);
                xa_start(tag, NULL);
        } else {
+               if (tag->circ) {
+                       kdmsg_circ_drop(tag->circ);
+                       tag->circ = NULL;
+               }
+               TAILQ_REMOVE(&xa->tag_pendq, tag, entry);
                TAILQ_INSERT_TAIL(&xa->tag_freeq, tag, entry);
                lwkt_reltoken(&xa->tok);
        }
@@ -919,8 +954,6 @@ xa_sync_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
                tag->status = msg->any.blk_error;
                break;
        }
-       kprintf("XA_SYNC_COMPLETION ERROR %u RESID %u\n",
-               tag->status.head.error, tag->status.resid);
        if (msg->any.head.cmd & DMSGF_DELETE) { /* receive termination */
                kdmsg_msg_reply(msg, 0);        /* terminate our side */
                tag->done = 1;
@@ -966,8 +999,6 @@ xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
                tag->status = msg->any.blk_error;
                break;
        }
-       kprintf("XA_BIO_COMPLETION ERROR %u RESID %u\n",
-               tag->status.head.error, tag->status.resid);
 
        /*
         * Process bio completion
@@ -1009,10 +1040,19 @@ xa_bio_completion(kdmsg_state_t *state, kdmsg_msg_t *msg)
        /*
         * Handle completion of the transaction.  If the bioq is not empty
         * we can initiate another bio on the same tag.
+        *
+        * NOTE: Most of our transactions will be single-message
+        *       CREATE+DELETEs, so we won't have to terminate the
+        *       transaction separately, here.  But just in case they
+        *       aren't be sure to terminate the transaction.
         */
 handle_done:
-       if (msg->any.head.cmd & DMSGF_DELETE)
+       if (msg->any.head.cmd & DMSGF_DELETE) {
                xa_done(tag, 1);
+               if ((state->txcmd & DMSGF_DELETE) == 0) {
+                       kdmsg_msg_reply(msg, 0);
+               }
+       }
        return (0);
 }
 
@@ -1032,8 +1072,6 @@ xa_restart_deferred(xa_softc_t *xa)
                tag = xa_setup_cmd(xa, NULL);
                if (tag == NULL)
                        break;
-               kprintf("xa: Restart BIO %p on %s\n",
-                       bio, xa->iocom.auto_lnk_conn.fs_label);
                TAILQ_REMOVE(&xa->bioq, bio, bio_act);
                tag->bio = bio;
                xa_start(tag, NULL);
index 999b071..5246bd9 100644 (file)
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+/*
+ * TODO: txcmd CREATE state is deferred by txmsgq, need to calculate
+ *      a streaming response.  See subr_diskiocom()'s diskiodone().
+ */
 #include <sys/param.h>
 #include <sys/types.h>
 #include <sys/kernel.h>
 #include <sys/dmsg.h>
 
 RB_GENERATE(kdmsg_state_tree, kdmsg_state, rbnode, kdmsg_state_cmp);
+RB_GENERATE(kdmsg_circuit_tree, kdmsg_circuit, rbnode, kdmsg_circuit_cmp);
+
+static int kdmsg_msg_receive_handling(kdmsg_msg_t *msg);
+static int kdmsg_circ_msgrx(kdmsg_msg_t *msg);
+static int kdmsg_state_msgrx(kdmsg_msg_t *msg);
+static int kdmsg_state_msgtx(kdmsg_msg_t *msg);
+static void kdmsg_state_cleanuprx(kdmsg_msg_t *msg);
+static void kdmsg_state_cleanuptx(kdmsg_msg_t *msg);
+static void kdmsg_state_abort(kdmsg_state_t *state);
+static void kdmsg_state_free(kdmsg_state_t *state);
 
-static void kdmsg_circ_free_check(kdmsg_circuit_t *circ);
 static void kdmsg_iocom_thread_rd(void *arg);
 static void kdmsg_iocom_thread_wr(void *arg);
 static int kdmsg_autorxmsg(kdmsg_msg_t *msg);
@@ -62,6 +75,29 @@ static int kdmsg_autocirc_reply(kdmsg_state_t *state, kdmsg_msg_t *msg);
 
 static struct lwkt_token kdmsg_token = LWKT_TOKEN_INITIALIZER(kdmsg_token);
 
+void
+kdmsg_circ_hold(kdmsg_circuit_t *circ)
+{
+       atomic_add_int(&circ->refs, 1);
+}
+
+void
+kdmsg_circ_drop(kdmsg_circuit_t *circ)
+{
+       kdmsg_iocom_t *iocom;
+
+       if (atomic_fetchadd_int(&circ->refs, -1) == 1) {
+               KKASSERT(circ->span_state == NULL &&
+                        circ->circ_state == NULL &&
+                        circ->rcirc_state == NULL &&
+                        circ->recorded == 0);
+               iocom = circ->iocom;
+               circ->iocom = NULL;
+               kfree(circ, iocom->mmsg);
+       }
+}
+
+
 /*
  * Initialize the roll-up communications structure for a network
  * messaging session.  This function does not install the socket.
@@ -78,6 +114,7 @@ kdmsg_iocom_init(kdmsg_iocom_t *iocom, void *handle, uint32_t flags,
        iocom->flags = flags;
        lockinit(&iocom->msglk, "h2msg", 0, 0);
        TAILQ_INIT(&iocom->msgq);
+       RB_INIT(&iocom->circ_tree);
        RB_INIT(&iocom->staterd_tree);
        RB_INIT(&iocom->statewr_tree);
 }
@@ -93,10 +130,11 @@ kdmsg_iocom_reconnect(kdmsg_iocom_t *iocom, struct file *fp,
        /*
         * Destroy the current connection
         */
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
        while (iocom->msgrd_td || iocom->msgwr_td) {
                wakeup(&iocom->msg_ctl);
-               tsleep(iocom, 0, "clstrkl", hz);
+               lksleep(iocom, &iocom->msglk, 0, "clstrkl", hz);
        }
 
        /*
@@ -113,11 +151,13 @@ kdmsg_iocom_reconnect(kdmsg_iocom_t *iocom, struct file *fp,
        iocom->msg_ctl = 0;
        iocom->msg_fp = fp;
        iocom->msg_seq = 0;
+       iocom->flags &= ~KDMSG_IOCOMF_EXITNOACC;
 
        lwkt_create(kdmsg_iocom_thread_rd, iocom, &iocom->msgrd_td,
                    NULL, 0, -1, "%s-msgrd", subsysname);
        lwkt_create(kdmsg_iocom_thread_wr, iocom, &iocom->msgwr_td,
                    NULL, 0, -1, "%s-msgwr", subsysname);
+       lockmgr(&iocom->msglk, LK_RELEASE);
 }
 
 /*
@@ -140,7 +180,7 @@ kdmsg_iocom_autoinitiate(kdmsg_iocom_t *iocom,
 
        iocom->auto_callback = auto_callback;
 
-       msg = kdmsg_msg_alloc(iocom, 0,
+       msg = kdmsg_msg_alloc(iocom, NULL,
                              DMSG_LNK_CONN | DMSGF_CREATE,
                              kdmsg_lnk_conn_reply, NULL);
        iocom->auto_lnk_conn.head = msg->any.head;
@@ -157,20 +197,28 @@ kdmsg_lnk_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
        kdmsg_msg_t *rmsg;
 
        if (msg->any.head.cmd & DMSGF_CREATE) {
-               rmsg = kdmsg_msg_alloc(iocom, 0,
+               rmsg = kdmsg_msg_alloc(iocom, NULL,
                                       DMSG_LNK_SPAN | DMSGF_CREATE,
                                       kdmsg_lnk_span_reply, NULL);
                iocom->auto_lnk_span.head = rmsg->any.head;
                rmsg->any.lnk_span = iocom->auto_lnk_span;
                kdmsg_msg_write(rmsg);
        }
+
+       /*
+        * Process shim after the CONN is acknowledged and before the CONN
+        * transaction is deleted.  For deletions this gives device drivers
+        * the ability to interlock new operations on the circuit before
+        * it becomes illegal and panics.
+        */
+       if (iocom->auto_callback)
+               iocom->auto_callback(msg);
+
        if ((state->txcmd & DMSGF_DELETE) == 0 &&
            (msg->any.head.cmd & DMSGF_DELETE)) {
                iocom->conn_state = NULL;
                kdmsg_msg_reply(msg, 0);
        }
-       if (iocom->auto_callback)
-               iocom->auto_callback(msg);
 
        return (0);
 }
@@ -179,14 +227,19 @@ static
 int
 kdmsg_lnk_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
 {
-       /*kdmsg_iocom_t *iocom = state->iocom;*/
+       /*
+        * Be sure to process shim before terminating the SPAN
+        * transaction.  Gives device drivers the ability to
+        * interlock new operations on the circuit before it
+        * becomes illegal and panics.
+        */
+       if (state->iocom->auto_callback)
+               state->iocom->auto_callback(msg);
 
        if ((state->txcmd & DMSGF_DELETE) == 0 &&
            (msg->any.head.cmd & DMSGF_DELETE)) {
                kdmsg_msg_reply(msg, 0);
        }
-       if (state->iocom->auto_callback)
-               state->iocom->auto_callback(msg);
        return (0);
 }
 
@@ -199,11 +252,12 @@ kdmsg_iocom_uninit(kdmsg_iocom_t *iocom)
        /*
         * Ask the cluster controller to go away
         */
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
 
        while (iocom->msgrd_td || iocom->msgwr_td) {
                wakeup(&iocom->msg_ctl);
-               tsleep(iocom, 0, "clstrkl", hz);
+               lksleep(iocom, &iocom->msglk, 0, "clstrkl", hz);
        }
 
        /*
@@ -213,6 +267,7 @@ kdmsg_iocom_uninit(kdmsg_iocom_t *iocom)
                fdrop(iocom->msg_fp);
                iocom->msg_fp = NULL;
        }
+       lockmgr(&iocom->msglk, LK_RELEASE);
 }
 
 /*
@@ -226,9 +281,10 @@ kdmsg_iocom_thread_rd(void *arg)
 {
        kdmsg_iocom_t *iocom = arg;
        dmsg_hdr_t hdr;
-       kdmsg_msg_t *msg;
+       kdmsg_msg_t *msg = NULL;
        kdmsg_state_t *state;
        size_t hbytes;
+       size_t abytes;
        int error = 0;
 
        while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILL) == 0) {
@@ -251,7 +307,7 @@ kdmsg_iocom_thread_rd(void *arg)
                        break;
                }
                /* XXX messy: mask cmd to avoid allocating state */
-               msg = kdmsg_msg_alloc(iocom, 0,
+               msg = kdmsg_msg_alloc(iocom, NULL,
                                      hdr.cmd & DMSGF_BASECMDMASK,
                                      NULL, NULL);
                msg->any.head = hdr;
@@ -266,7 +322,7 @@ kdmsg_iocom_thread_rd(void *arg)
                                break;
                        }
                }
-               msg->aux_size = hdr.aux_bytes * DMSG_ALIGN;
+               msg->aux_size = hdr.aux_bytes;
                if (msg->aux_size > DMSG_AUX_MAX) {
                        kprintf("kdmsg: illegal msg payload size %zd\n",
                                msg->aux_size);
@@ -274,46 +330,19 @@ kdmsg_iocom_thread_rd(void *arg)
                        break;
                }
                if (msg->aux_size) {
-                       msg->aux_data = kmalloc(msg->aux_size, iocom->mmsg,
-                                               M_WAITOK | M_ZERO);
+                       abytes = DMSG_DOALIGN(msg->aux_size);
+                       msg->aux_data = kmalloc(abytes, iocom->mmsg, M_WAITOK);
                        msg->flags |= KDMSG_FLAG_AUXALLOC;
                        error = fp_read(iocom->msg_fp, msg->aux_data,
-                                       msg->aux_size,
-                                       NULL, 1, UIO_SYSSPACE);
+                                       abytes, NULL, 1, UIO_SYSSPACE);
                        if (error) {
                                kprintf("kdmsg: short msg payload received\n");
                                break;
                        }
                }
 
-               /*
-                * State machine tracking, state assignment for msg,
-                * returns error and discard status.  Errors are fatal
-                * to the connection except for EALREADY which forces
-                * a discard without execution.
-                */
-               error = kdmsg_state_msgrx(msg);
-               if (error) {
-                       /*
-                        * Raw protocol or connection error
-                        */
-                       kdmsg_msg_free(msg);
-                       if (error == EALREADY)
-                               error = 0;
-               } else if (msg->state && msg->state->func) {
-                       /*
-                        * Message related to state which already has a
-                        * handling function installed for it.
-                        */
-                       error = msg->state->func(msg->state, msg);
-                       kdmsg_state_cleanuprx(msg);
-               } else if (iocom->flags & KDMSG_IOCOMF_AUTOANY) {
-                       error = kdmsg_autorxmsg(msg);
-                       kdmsg_state_cleanuprx(msg);
-               } else {
-                       error = iocom->rcvmsg(msg);
-                       kdmsg_state_cleanuprx(msg);
-               }
+               (void)kdmsg_circ_msgrx(msg);
+               error = kdmsg_msg_receive_handling(msg);
                msg = NULL;
        }
 
@@ -321,11 +350,8 @@ kdmsg_iocom_thread_rd(void *arg)
                kprintf("kdmsg: read failed error %d\n", error);
 
        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-       if (msg) {
-               if (msg->state && msg->state->msg == msg)
-                       msg->state->msg = NULL;
+       if (msg)
                kdmsg_msg_free(msg);
-       }
 
        if ((state = iocom->freerd_state) != NULL) {
                iocom->freerd_state = NULL;
@@ -378,6 +404,7 @@ kdmsg_iocom_thread_wr(void *arg)
        kdmsg_msg_t *msg;
        kdmsg_state_t *state;
        ssize_t res;
+       size_t abytes;
        int error = 0;
        int retries = 20;
 
@@ -423,22 +450,28 @@ kdmsg_iocom_thread_wr(void *arg)
 
                        /*
                         * Dump the message to the pipe or socket.
+                        *
+                        * We have to clean up the message as if the transmit
+                        * succeeded even if it failed.
                         */
                        error = fp_write(iocom->msg_fp, &msg->any,
                                         msg->hdr_size, &res, UIO_SYSSPACE);
                        if (error || res != msg->hdr_size) {
                                if (error == 0)
                                        error = EINVAL;
+                               kdmsg_state_cleanuptx(msg);
                                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                                break;
                        }
                        if (msg->aux_size) {
+                               abytes = DMSG_DOALIGN(msg->aux_size);
                                error = fp_write(iocom->msg_fp,
-                                                msg->aux_data, msg->aux_size,
+                                                msg->aux_data, abytes,
                                                 &res, UIO_SYSSPACE);
-                               if (error || res != msg->aux_size) {
+                               if (error || res != abytes) {
                                        if (error == 0)
                                                error = EINVAL;
+                                       kdmsg_state_cleanuptx(msg);
                                        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                                        break;
                                }
@@ -453,12 +486,7 @@ kdmsg_iocom_thread_wr(void *arg)
         */
        if (error)
                kprintf("kdmsg: write failed error %d\n", error);
-
-       if (msg) {
-               if (msg->state && msg->state->msg == msg)
-                       msg->state->msg = NULL;
-               kdmsg_msg_free(msg);
-       }
+       kprintf("thread_wr: Terminating iocom\n");
 
        /*
         * Shutdown the socket.  This will cause the rx thread to get an
@@ -482,78 +510,48 @@ kdmsg_iocom_thread_wr(void *arg)
 
        /*
         * Simulate received MSGF_DELETE's for any remaining states.
+        * (For remote masters).
+        *
+        * Drain the message queue to handle any device initiated writes
+        * due to state callbacks.
         */
 cleanuprd:
+       kdmsg_drain_msgq(iocom);
        RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree) {
-               if (state->func &&
-                   (state->rxcmd & DMSGF_DELETE) == 0) {
+               if ((state->rxcmd & DMSGF_DELETE) == 0) {
                        lockmgr(&iocom->msglk, LK_RELEASE);
-                       msg = kdmsg_msg_alloc(iocom, state->circuit,
-                                             DMSG_LNK_ERROR,
-                                             NULL, NULL);
-                       if ((state->rxcmd & DMSGF_CREATE) == 0)
-                               msg->any.head.cmd |= DMSGF_CREATE;
-                       msg->any.head.cmd |= DMSGF_DELETE;
-                       msg->any.head.error = DMSG_ERR_LOSTLINK;
-                       msg->state = state;
-                       state->rxcmd = msg->any.head.cmd &
-                                      ~DMSGF_DELETE;
-                       msg->state->func(state, msg);
-                       kdmsg_state_cleanuprx(msg);
+                       kdmsg_state_abort(state);
                        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                        goto cleanuprd;
                }
-               if (state->func == NULL) {
-                       state->flags &= ~KDMSG_STATE_INSERTED;
-                       RB_REMOVE(kdmsg_state_tree,
-                                 &iocom->staterd_tree, state);
-                       kdmsg_state_free(state);
-                       goto cleanuprd;
-               }
        }
 
        /*
-        * NOTE: We have to drain the msgq to handle situations
-        *       where received states have built up output
-        *       messages, to avoid creating messages with
-        *       duplicate CREATE/DELETE flags.
+        * Simulate received MSGF_DELETE's for any remaining states.
+        * (For local masters).
         */
 cleanupwr:
        kdmsg_drain_msgq(iocom);
        RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree) {
-               if (state->func &&
-                   (state->rxcmd & DMSGF_DELETE) == 0) {
+               if ((state->rxcmd & DMSGF_DELETE) == 0) {
                        lockmgr(&iocom->msglk, LK_RELEASE);
-                       msg = kdmsg_msg_alloc(iocom, state->circuit,
-                                             DMSG_LNK_ERROR,
-                                             NULL, NULL);
-                       if ((state->rxcmd & DMSGF_CREATE) == 0)
-                               msg->any.head.cmd |= DMSGF_CREATE;
-                       msg->any.head.cmd |= DMSGF_DELETE |
-                                            DMSGF_REPLY;
-                       msg->any.head.error = DMSG_ERR_LOSTLINK;
-                       msg->state = state;
-                       state->rxcmd = msg->any.head.cmd &
-                                      ~DMSGF_DELETE;
-                       msg->state->func(state, msg);
-                       kdmsg_state_cleanuprx(msg);
+                       kdmsg_state_abort(state);
                        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                        goto cleanupwr;
                }
-               if (state->func == NULL) {
-                       state->flags &= ~KDMSG_STATE_INSERTED;
-                       RB_REMOVE(kdmsg_state_tree,
-                                 &iocom->statewr_tree, state);
-                       kdmsg_state_free(state);
-                       goto cleanupwr;
-               }
        }
 
-       kdmsg_drain_msgq(iocom);
+       /*
+        * Retry until all work is done
+        */
        if (--retries == 0)
                panic("kdmsg: comm thread shutdown couldn't drain");
-       if (RB_ROOT(&iocom->statewr_tree))
-               goto cleanupwr;
+       if (TAILQ_FIRST(&iocom->msgq) ||
+           RB_ROOT(&iocom->staterd_tree) ||
+           RB_ROOT(&iocom->statewr_tree)) {
+               goto cleanuprd;
+       }
+       iocom->flags |= KDMSG_IOCOMF_EXITNOACC;
 
        if ((state = iocom->freewr_state) != NULL) {
                iocom->freewr_state = NULL;
@@ -606,8 +604,6 @@ kdmsg_drain_msgq(kdmsg_iocom_t *iocom)
        while ((msg = TAILQ_FIRST(&iocom->msgq)) != NULL) {
                TAILQ_REMOVE(&iocom->msgq, msg, qentry);
                lockmgr(&iocom->msglk, LK_RELEASE);
-               if (msg->state && msg->state->msg == msg)
-                       msg->state->msg = NULL;
                if (kdmsg_state_msgtx(msg))
                        kdmsg_msg_free(msg);
                else
@@ -617,6 +613,89 @@ kdmsg_drain_msgq(kdmsg_iocom_t *iocom)
 }
 
 /*
+ * Do all processing required to handle a freshly received message
+ * after its low level header has been validated.
+ */
+static
+int
+kdmsg_msg_receive_handling(kdmsg_msg_t *msg)
+{
+       kdmsg_iocom_t *iocom = msg->iocom;
+       int error;
+
+       /*
+        * State machine tracking, state assignment for msg,
+        * returns error and discard status.  Errors are fatal
+        * to the connection except for EALREADY which forces
+        * a discard without execution.
+        */
+       error = kdmsg_state_msgrx(msg);
+       if (error) {
+               /*
+                * Raw protocol or connection error
+                */
+               kdmsg_msg_free(msg);
+               if (error == EALREADY)
+                       error = 0;
+       } else if (msg->state && msg->state->func) {
+               /*
+                * Message related to state which already has a
+                * handling function installed for it.
+                */
+               error = msg->state->func(msg->state, msg);
+               kdmsg_state_cleanuprx(msg);
+       } else if (iocom->flags & KDMSG_IOCOMF_AUTOANY) {
+               error = kdmsg_autorxmsg(msg);
+               kdmsg_state_cleanuprx(msg);
+       } else {
+               error = iocom->rcvmsg(msg);
+               kdmsg_state_cleanuprx(msg);
+       }
+       return error;
+}
+
+/*
+ * Resolve the virtual circuit id carried in a received message header
+ * and attach that circuit, with a reference held, to msg->circ.  A zero
+ * circuit id is ignored; an unknown one returns EINVAL.  (NEEDS WORK)
+ *
+ * Called with msglk held and the msg dequeued.
+ */
+static
+int
+kdmsg_circ_msgrx(kdmsg_msg_t *msg)
+{
+       kdmsg_iocom_t *iocom = msg->iocom;
+       kdmsg_circuit_t key;
+       kdmsg_circuit_t *found;
+       int error = 0;
+
+       if (msg->any.head.circuit == 0)
+               return (0);
+
+       key.msgid = msg->any.head.circuit;
+       lwkt_gettoken(&kdmsg_token);
+       found = RB_FIND(kdmsg_circuit_tree, &iocom->circ_tree, &key);
+       if (found != NULL) {
+               kdmsg_circ_hold(found);
+               msg->circ = found;
+       } else {
+               kprintf("KDMSG_CIRC_MSGRX CMD %08x: IOCOM %p "
+                       "Bad circuit %016jx\n",
+                       msg->any.head.cmd, iocom,
+                       (intmax_t)msg->any.head.circuit);
+               kprintf("KDMSG_CIRC_MSGRX: Avail circuits: ");
+               RB_FOREACH(found, kdmsg_circuit_tree, &iocom->circ_tree) {
+                       kprintf(" %016jx", (intmax_t)found->msgid);
+               }
+               kprintf("\n");
+               error = EINVAL;
+       }
+       lwkt_reltoken(&kdmsg_token);
+       return (error);
+}
+
+/*
  * Process state tracking for a message after reception, prior to
  * execution.
  *
@@ -683,6 +762,7 @@ kdmsg_drain_msgq(kdmsg_iocom_t *iocom)
  * one-off message is a command or reply.  For example, one-off replies
  * will typically just contain status updates.
  */
+static
 int
 kdmsg_state_msgrx(kdmsg_msg_t *msg)
 {
@@ -710,7 +790,7 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
 
        state->msgid = msg->any.head.msgid;
-       state->circuit = msg->any.head.circuit;
+       state->circ = msg->circ;
        state->iocom = iocom;
        if (msg->any.head.cmd & DMSGF_REPLY)
                state = RB_FIND(kdmsg_state_tree, &iocom->statewr_tree, state);
@@ -750,7 +830,8 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                state->txcmd = DMSGF_REPLY;
                state->msgid = msg->any.head.msgid;
-               state->circuit = msg->any.head.circuit;
+               if ((state->circ = msg->circ) != NULL)
+                       kdmsg_circ_hold(state->circ);
                RB_INSERT(kdmsg_state_tree, &iocom->staterd_tree, state);
                state->flags |= KDMSG_STATE_INSERTED;
                error = 0;
@@ -764,8 +845,8 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("kdmsg_state_msgrx: no state "
-                                       "for DELETE\n");
+                               kprintf("kdmsg_state_msgrx: "
+                                       "no state for DELETE\n");
                                error = EINVAL;
                        }
                        break;
@@ -779,8 +860,8 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("kdmsg_state_msgrx: state reused "
-                                       "for DELETE\n");
+                               kprintf("kdmsg_state_msgrx: "
+                                       "state reused for DELETE\n");
                                error = EINVAL;
                        }
                        break;
@@ -878,19 +959,50 @@ static int
 kdmsg_autorxmsg(kdmsg_msg_t *msg)
 {
        kdmsg_iocom_t *iocom = msg->iocom;
+       kdmsg_circuit_t *circ;
        int error = 0;
+       uint32_t cmd;
 
-       switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
+       /*
+        * Process a combination of the transaction command and the message
+        * flags.  For the purposes of this routine, the message command is
+        * only relevant when it initiates a transaction (where it is
+        * recorded in icmd).
+        */
+       cmd = (msg->state ? msg->state->icmd : msg->any.head.cmd) &
+             DMSGF_BASECMDMASK;
+       cmd |= msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY);
+
+       switch(cmd) {
        case DMSG_LNK_CONN | DMSGF_CREATE:
+       case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
                /*
                 * Received LNK_CONN transaction.  Transmit response and
                 * leave transaction open, which allows the other end to
                 * start to the SPAN protocol.
+                *
+                * Handle shim after acknowledging the CONN.
+                */
+               if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+                       if (iocom->flags & KDMSG_IOCOMF_AUTOCONN) {
+                               kdmsg_msg_result(msg, 0);
+                               if (iocom->auto_callback)
+                                       iocom->auto_callback(msg);
+                       } else {
+                               error = iocom->rcvmsg(msg);
+                       }
+                       break;
+               }
+               /* fall through */
+       case DMSG_LNK_CONN | DMSGF_DELETE:
+               /*
+                * This message is usually simulated after a link is lost
+                * to clean up the transaction.
                 */
                if (iocom->flags & KDMSG_IOCOMF_AUTOCONN) {
-                       kdmsg_msg_result(msg, 0);
                        if (iocom->auto_callback)
                                iocom->auto_callback(msg);
+                       kdmsg_msg_reply(msg, 0);
                } else {
                        error = iocom->rcvmsg(msg);
                }
@@ -902,8 +1014,11 @@ kdmsg_autorxmsg(kdmsg_msg_t *msg)
                 * but we must leave the transaction open.
                 *
                 * If AUTOCIRC is set automatically initiate a virtual circuit
-                * to the span.  This will attach a kdmsg_circuit to the
-                * SPAN state.
+                * to the received span.  This will attach a kdmsg_circuit
+                * to the SPAN state.  The circuit is lost when the span is
+                * lost.
+                *
+                * Handle shim after acknowledging the SPAN.
                 */
                if (iocom->flags & KDMSG_IOCOMF_AUTOSPAN) {
                        if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
@@ -920,12 +1035,20 @@ kdmsg_autorxmsg(kdmsg_msg_t *msg)
                }
                /* fall through */
        case DMSG_LNK_SPAN | DMSGF_DELETE:
+               /*
+                * Process shims (auto_callback) before cleaning up the
+                * circuit structure and closing the transactions.  Device
+                * driver should ensure that the circuit is not used after
+                * the auto_callback() returns.
+                *
+                * Handle shim before closing the SPAN transaction.
+                */
                if (iocom->flags & KDMSG_IOCOMF_AUTOSPAN) {
+                       if (iocom->auto_callback)
+                               iocom->auto_callback(msg);
                        if (iocom->flags & KDMSG_IOCOMF_AUTOFORGE)
                                kdmsg_autocirc(msg);
                        kdmsg_msg_reply(msg, 0);
-                       if (iocom->auto_callback)
-                               iocom->auto_callback(msg);
                } else {
                        error = iocom->rcvmsg(msg);
                }
@@ -937,16 +1060,35 @@ kdmsg_autorxmsg(kdmsg_msg_t *msg)
                 * leave the transaction open, allowing the circuit.  The
                 * remote can start issuing commands to us over the circuit
                 * even before we respond.
-                *
-                * Theoretically we should track the circuit id but it
-                * should not be necessary, kernel devices don't usually
-                * care how many circuits are forged to a device, only how
-                * many OPENs are active.
                 */
                if (iocom->flags & KDMSG_IOCOMF_AUTOCIRC) {
-                       kprintf("kdmsg: CREATE LINK_CIRC\n");
-                       kdmsg_msg_result(msg, 0);
                        if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+                               circ = kmalloc(sizeof(*circ), iocom->mmsg,
+                                              M_WAITOK | M_ZERO);
+                               msg->state->any.circ = circ;
+                               circ->iocom = iocom;
+                               circ->rcirc_state = msg->state;
+                               kdmsg_circ_hold(circ);  /* for rcirc_state */
+                               circ->weight = 0;
+                               circ->msgid = circ->rcirc_state->msgid;
+                               /* XXX no span link for received circuits */
+                               kdmsg_circ_hold(circ);  /* for circ_state */
+#if 0
+                               kprintf("KDMSG VC: RECEIVE CIRC CREATE "
+                                       "IOCOM %p MSGID %016jx\n",
+                                       msg->iocom, circ->msgid);
+#endif
+
+                               if (RB_INSERT(kdmsg_circuit_tree,
+                                             &iocom->circ_tree, circ)) {
+                                       panic("duplicate circuitid allocated");
+                               }
+                               kdmsg_msg_result(msg, 0);
+
+                               /*
+                                * Handle shim after adding the circuit and
+                                * after acknowledging the CIRC.
+                                */
                                if (iocom->auto_callback)
                                        iocom->auto_callback(msg);
                                break;
@@ -959,9 +1101,28 @@ kdmsg_autorxmsg(kdmsg_msg_t *msg)
                /* fall through */
        case DMSG_LNK_CIRC | DMSGF_DELETE:
                if (iocom->flags & KDMSG_IOCOMF_AUTOCIRC) {
-                       kprintf("kdmsg: DELETE LINK_CIRC\n");
+                       circ = msg->state->any.circ;
+                       if (circ == NULL)
+                               break;
+
+                       /*
+                        * Handle shim before terminating the circuit.
+                        */
+#if 0
+                       kprintf("KDMSG VC: RECEIVE CIRC DELETE "
+                               "IOCOM %p MSGID %016jx\n",
+                               msg->iocom, circ->msgid);
+#endif
                        if (iocom->auto_callback)
                                iocom->auto_callback(msg);
+
+                       KKASSERT(circ->rcirc_state == msg->state);
+                       circ->rcirc_state = NULL;
+                       msg->state->any.circ = NULL;
+                       lwkt_gettoken(&kdmsg_token);
+                       RB_REMOVE(kdmsg_circuit_tree, &iocom->circ_tree, circ);
+                       lwkt_reltoken(&kdmsg_token);
+                       kdmsg_circ_drop(circ);  /* for rcirc_state */
                        kdmsg_msg_reply(msg, 0);
                } else {
                        error = iocom->rcvmsg(msg);
@@ -982,11 +1143,15 @@ kdmsg_autorxmsg(kdmsg_msg_t *msg)
 }
 
 /*
- * Handle automatic management of virtual circuits for received SPANs.
+ * Handle automatic forging of virtual circuits based on received SPANs.
+ * (AUTOFORGE).  Note that other code handles tracking received circuit
+ * transactions (AUTOCIRC).
  *
  * We can ignore non-transactions here.  Use trans->icmd to test the
  * transactional command (once past the CREATE the individual message
  * commands are not usually the icmd).
+ *
+ * XXX locks
  */
 static
 void
@@ -1001,20 +1166,32 @@ kdmsg_autocirc(kdmsg_msg_t *msg)
 
        /*
         * Gaining the SPAN, automatically forge a circuit to the target.
+        *
+        * NOTE!! The shim is not executed until we receive an acknowlegement
+        *        to our forged LNK_CIRC (see kdmsg_autocirc_reply()).
         */
        if (msg->state->icmd == DMSG_LNK_SPAN &&
            (msg->any.head.cmd & DMSGF_CREATE)) {
-               kprintf("KDMSG VC: CREATE SPAN->CIRC MSGID %016jx\n",
-                       (intmax_t)msg->any.head.msgid);
                circ = kmalloc(sizeof(*circ), iocom->mmsg, M_WAITOK | M_ZERO);
                msg->state->any.circ = circ;
                circ->iocom = iocom;
                circ->span_state = msg->state;
-               xmsg = kdmsg_msg_alloc(iocom, 0,
+               kdmsg_circ_hold(circ);  /* for span_state */
+               xmsg = kdmsg_msg_alloc(iocom, NULL,
                                       DMSG_LNK_CIRC | DMSGF_CREATE,
                                       kdmsg_autocirc_reply, circ);
                circ->circ_state = xmsg->state;
                circ->weight = msg->any.lnk_span.dist;
+               circ->msgid = circ->circ_state->msgid;
+               kdmsg_circ_hold(circ);  /* for circ_state */
+#if 0
+               kprintf("KDMSG VC: CREATE SPAN->CIRC IOCOM %p MSGID %016jx\n",
+                       msg->iocom, circ->msgid);
+#endif
+
+               if (RB_INSERT(kdmsg_circuit_tree, &iocom->circ_tree, circ))
+                       panic("duplicate circuitid allocated");
+
                xmsg->any.lnk_circ.target = msg->any.head.msgid;
                kdmsg_msg_write(xmsg);
        }
@@ -1029,12 +1206,16 @@ kdmsg_autocirc(kdmsg_msg_t *msg)
        if (msg->state->icmd == DMSG_LNK_SPAN &&
            (msg->any.head.cmd & DMSGF_DELETE) &&
            msg->state->any.circ) {
-               kprintf("KDMSG VC: DELETE SPAN->CIRC\n");
                circ = msg->state->any.circ;
                lwkt_gettoken(&kdmsg_token);
                circ->span_state = NULL;
                msg->state->any.circ = NULL;
-               kdmsg_circ_free_check(circ);
+               RB_REMOVE(kdmsg_circuit_tree, &iocom->circ_tree, circ);
+#if 0
+               kprintf("KDMSG VC: DELETE SPAN->CIRC IOCOM %p MSGID %016jx\n",
+                       msg->iocom, (intmax_t)circ->msgid);
+#endif
+               kdmsg_circ_drop(circ);  /* for span_state */
                lwkt_reltoken(&kdmsg_token);
        }
 }
@@ -1047,8 +1228,8 @@ kdmsg_autocirc_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
        kdmsg_circuit_t *circ = state->any.circ;
 
        /*
-        * Shim is typically used by the end point to record the circuit
-        * on CREATE and erase it on DELETE.
+        * Call shim after receiving an acknowlegement to our forged
+        * circuit and before processing a received termination.
         */
        if (iocom->auto_callback)
                iocom->auto_callback(msg);
@@ -1058,32 +1239,26 @@ kdmsg_autocirc_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
         */
        if ((state->txcmd & DMSGF_DELETE) == 0 &&
            (msg->any.head.cmd & DMSGF_DELETE)) {
+#if 0
                kprintf("KDMSG VC: DELETE CIRC FROM REMOTE\n");
+#endif
                lwkt_gettoken(&kdmsg_token);
                circ->circ_state = NULL;
                state->any.circ = NULL;
-               kdmsg_circ_free_check(circ);
+               kdmsg_circ_drop(circ);          /* for circ_state */
                lwkt_reltoken(&kdmsg_token);
                kdmsg_msg_reply(msg, 0);
        }
        return (0);
 }
 
+/*
+ * Post-receive-handling message and state cleanup.  This routine is called
+ * after the state function handling/callback to properly dispose of the
+ * message and update or dispose of the state.
+ */
 static
 void
-kdmsg_circ_free_check(kdmsg_circuit_t *circ)
-{
-       kdmsg_iocom_t *iocom = circ->iocom;
-
-       if (circ->span_state == NULL &&
-           circ->circ_state == NULL &&
-           circ->recorded == 0) {
-               circ->iocom = NULL;
-               kfree(circ, iocom->mmsg);
-       }
-}
-
-void
 kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
 {
        kdmsg_iocom_t *iocom = msg->iocom;
@@ -1108,22 +1283,56 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
                                          &iocom->staterd_tree, state);
                        }
                        state->flags &= ~KDMSG_STATE_INSERTED;
+                       if (msg != state->msg)
+                               kdmsg_msg_free(msg);
                        lockmgr(&iocom->msglk, LK_RELEASE);
                        kdmsg_state_free(state);
                } else {
-                       if (state->msg != msg)
+                       if (msg != state->msg)
                                kdmsg_msg_free(msg);
                        lockmgr(&iocom->msglk, LK_RELEASE);
                }
-       } else if (state->msg != msg) {
+       } else if (msg != state->msg) {
                kdmsg_msg_free(msg);
        }
 }
 
 /*
+ * Simulate receiving a message which terminates an activate transaction
+ * state.  Our simulated received message must set DELETE and may also
+ * have to set CREATE.  It must also ensure that all fields are set such
+ * that the receive handling code can find the state (kdmsg_state_msgrx())
+ * or an endless loop will ensue.
+ *
+ * This is used when the other end of the link or virtual circuit is dead
+ * so the device driver gets a completed transaction for all pending states.
+ */
+static
+void
+kdmsg_state_abort(kdmsg_state_t *state)
+{
+       kdmsg_iocom_t *iocom = state->iocom;
+       kdmsg_msg_t *msg;
+
+       msg = kdmsg_msg_alloc(iocom, state->circ,
+                             DMSG_LNK_ERROR,
+                             NULL, NULL);
+       if ((state->rxcmd & DMSGF_CREATE) == 0)
+               msg->any.head.cmd |= DMSGF_CREATE;
+       msg->any.head.cmd |= DMSGF_DELETE | (state->rxcmd & DMSGF_REPLY);
+       msg->any.head.error = DMSG_ERR_LOSTLINK;
+       msg->any.head.msgid = state->msgid;
+       msg->state = state;
+       if ((msg->circ = state->circ) != NULL)
+               kdmsg_circ_hold(msg->circ);
+       kdmsg_msg_receive_handling(msg);
+}
+
+/*
  * Process state tracking for a message prior to transmission.
  *
- * Called with msglk held and the msg dequeued.
+ * Called with msglk held and the msg dequeued.  Returns non-zero if
+ * the message is bad and should be deleted by the caller.
  *
  * One-off messages are usually with dummy state and msg->state may be NULL
  * in this situation.
@@ -1133,6 +1342,7 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
  * May request that caller discard the message by setting *discardp to 1.
  * A NULL state may be returned in this case.
  */
+static
 int
 kdmsg_state_msgtx(kdmsg_msg_t *msg)
 {
@@ -1314,6 +1524,7 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
        return (error);
 }
 
+static
 void
 kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
 {
@@ -1339,19 +1550,21 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
                                          &iocom->statewr_tree, state);
                        }
                        state->flags &= ~KDMSG_STATE_INSERTED;
-                       msg->state = NULL;
+                       if (msg != state->msg)
+                               kdmsg_msg_free(msg);
                        lockmgr(&iocom->msglk, LK_RELEASE);
                        kdmsg_state_free(state);
                } else {
-                       if (state->msg != msg)
+                       if (msg != state->msg)
                                kdmsg_msg_free(msg);
                        lockmgr(&iocom->msglk, LK_RELEASE);
                }
-       } else if (state->msg != msg) {
+       } else if (msg != state->msg) {
                kdmsg_msg_free(msg);
        }
 }
 
+static
 void
 kdmsg_state_free(kdmsg_state_t *state)
 {
@@ -1362,12 +1575,14 @@ kdmsg_state_free(kdmsg_state_t *state)
        msg = state->msg;
        state->msg = NULL;
        kfree(state, iocom->mmsg);
-       if (msg)
+       if (msg) {
+               msg->state = NULL;
                kdmsg_msg_free(msg);
+       }
 }
 
 kdmsg_msg_t *
-kdmsg_msg_alloc(kdmsg_iocom_t *iocom, uint64_t circuit, uint32_t cmd,
+kdmsg_msg_alloc(kdmsg_iocom_t *iocom, kdmsg_circuit_t *circ, uint32_t cmd,
                int (*func)(kdmsg_state_t *, kdmsg_msg_t *), void *data)
 {
        kdmsg_msg_t *msg;
@@ -1381,8 +1596,12 @@ kdmsg_msg_alloc(kdmsg_iocom_t *iocom, uint64_t circuit, uint32_t cmd,
        msg->hdr_size = hbytes;
        msg->iocom = iocom;
        msg->any.head.magic = DMSG_HDR_MAGIC;
-       msg->any.head.circuit = circuit;
        msg->any.head.cmd = cmd;
+       if (circ) {
+               kdmsg_circ_hold(circ);
+               msg->circ = circ;
+               msg->any.head.circuit = circ->msgid;
+       }
 
        if (cmd & DMSGF_CREATE) {
                /*
@@ -1396,9 +1615,11 @@ kdmsg_msg_alloc(kdmsg_iocom_t *iocom, uint64_t circuit, uint32_t cmd,
                state->any.any = data;
                state->msg = msg;
                state->msgid = (uint64_t)(uintptr_t)state;
-               state->circuit = circuit;
+               state->circ = circ;
                state->iocom = iocom;
                msg->state = state;
+               if (circ)
+                       kdmsg_circ_hold(circ);
                /*msg->any.head.msgid = state->msgid;XXX*/
 
                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
@@ -1408,10 +1629,34 @@ kdmsg_msg_alloc(kdmsg_iocom_t *iocom, uint64_t circuit, uint32_t cmd,
                msg->any.head.msgid = state->msgid;
                lockmgr(&iocom->msglk, LK_RELEASE);
        }
-
        return (msg);
 }
 
+/*
+ * Allocate a msg for the existing transaction 'state' (func/data unused).
+ */
+kdmsg_msg_t *
+kdmsg_msg_alloc_state(kdmsg_state_t *state, uint32_t cmd,
+                     int (*func)(kdmsg_state_t *, kdmsg_msg_t *), void *data)
+{
+       kdmsg_iocom_t *iocom = state->iocom;
+       size_t hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
+       kdmsg_msg_t *msg;
+       KKASSERT(iocom != NULL);
+       msg = kmalloc(offsetof(struct kdmsg_msg, any) + hbytes,
+                     iocom->mmsg, M_WAITOK | M_ZERO);
+       msg->hdr_size = hbytes;
+       msg->iocom = iocom;
+       msg->any.head.magic = DMSG_HDR_MAGIC;
+       msg->any.head.cmd = cmd;
+       if ((msg->circ = state->circ) != NULL) {
+               kdmsg_circ_hold(msg->circ);
+               msg->any.head.circuit = msg->circ->msgid;
+       }
+       msg->state = state;
+       return (msg);
+}
+
 void
 kdmsg_msg_free(kdmsg_msg_t *msg)
 {
@@ -1422,14 +1667,35 @@ kdmsg_msg_free(kdmsg_msg_t *msg)
                kfree(msg->aux_data, iocom->mmsg);
                msg->flags &= ~KDMSG_FLAG_AUXALLOC;
        }
+       if (msg->circ) {
+               kdmsg_circ_drop(msg->circ);
+               msg->circ = NULL;
+       }
+       if (msg->state) {
+               if (msg->state->msg == msg)
+                       msg->state->msg = NULL;
+               msg->state = NULL;
+       }
        msg->aux_data = NULL;
        msg->aux_size = 0;
        msg->iocom = NULL;
-       msg->any.head.circuit = -1;
        kfree(msg, iocom->mmsg);
 }
 
 /*
+ * Circuits are tracked in a red-black tree by their circuit id (msgid).
+ */
+int
+kdmsg_circuit_cmp(kdmsg_circuit_t *circ1, kdmsg_circuit_t *circ2)
+{
+       if (circ1->msgid < circ2->msgid)
+               return(-1);
+       if (circ1->msgid > circ2->msgid)
+               return(1);
+       return (0);
+}
+
+/*
  * Indexed messages are stored in a red-black tree indexed by their
  * msgid.  Only persistent messages are indexed.
  */
@@ -1440,9 +1706,9 @@ kdmsg_state_cmp(kdmsg_state_t *state1, kdmsg_state_t *state2)
                return(-1);
        if (state1->iocom > state2->iocom)
                return(1);
-       if (state1->circuit < state2->circuit)
+       if (state1->circ < state2->circ)
                return(-1);
-       if (state1->circuit > state2->circuit)
+       if (state1->circ > state2->circ)
                return(1);
        if (state1->msgid < state2->msgid)
                return(-1);
@@ -1490,16 +1756,65 @@ kdmsg_msg_write(kdmsg_msg_t *msg)
                 * between a possibly lost in-transaction message due to
                 * competing aborts and a real one-off message?)
                 */
+               state = NULL;
                msg->any.head.msgid = 0;
                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        }
 
        /*
-        * Finish up the msg fields
+        * With AUTOCIRC and AUTOFORGE it is possible for the circuit to
+        * get ripped out in the rxthread while some other thread is
+        * holding a ref on it in between allocating and sending a dmsg.
+        */
+       if (msg->circ && msg->circ->rcirc_state == NULL &&
+           (msg->circ->span_state == NULL || msg->circ->circ_state == NULL)) {
+               kprintf("kdmsg_msg_write: Attempt to write message to "
+                       "terminated circuit: msg %08x\n", msg->any.head.cmd);
+               lockmgr(&iocom->msglk, LK_RELEASE);
+               if (kdmsg_state_msgtx(msg)) {
+                       if (state == NULL || msg != state->msg)
+                               kdmsg_msg_free(msg);
+               } else if ((msg->state->rxcmd & DMSGF_DELETE) == 0) {
+                       /* XXX SMP races simulating a response here */
+                       kdmsg_state_t *state = msg->state;
+                       kdmsg_state_cleanuptx(msg);
+                       kdmsg_state_abort(state);
+               } else {
+                       kdmsg_state_cleanuptx(msg);
+               }
+               return;
+       }
+
+       /*
+        * This flag is not set until after the tx thread has drained
+        * the txmsgq and simulated responses.  After that point the
+        * txthread is dead and can no longer simulate responses.
+        *
+        * Device drivers should never try to send a message once this
+        * flag is set.  They should have detected (through the state
+        * closures) that the link is in trouble.
+        */
+       if (iocom->flags & KDMSG_IOCOMF_EXITNOACC) {
+               lockmgr(&iocom->msglk, LK_RELEASE);
+               panic("kdmsg_msg_write: Attempt to write message to "
+                     "terminated iocom\n");
+       }
+
+       /*
+        * Finish up the msg fields.  Note that msg->aux_size and the
+        * aux_bytes stored in the message header represent the unaligned
+        * (actual) bytes of data, but the buffer is sized to an aligned
+        * size and the CRC is generated over the aligned length.
         */
        msg->any.head.salt = /* (random << 8) | */ (iocom->msg_seq & 255);
        ++iocom->msg_seq;
 
+       if (msg->aux_data && msg->aux_size) {
+               uint32_t abytes = DMSG_DOALIGN(msg->aux_size);
+
+               msg->any.head.aux_bytes = msg->aux_size;
+               msg->any.head.aux_crc = iscsi_crc32(msg->aux_data, abytes);
+       }
        msg->any.head.hdr_crc = 0;
        msg->any.head.hdr_crc = iscsi_crc32(msg->any.buf, msg->hdr_size);
 
@@ -1554,12 +1869,8 @@ kdmsg_msg_reply(kdmsg_msg_t *msg, uint32_t error)
        }
 
        /* XXX messy mask cmd to avoid allocating state */
-       nmsg = kdmsg_msg_alloc(msg->iocom, msg->any.head.circuit,
-                              cmd & DMSGF_BASECMDMASK,
-                              NULL, NULL);
-       nmsg->any.head.cmd = cmd;
+       nmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
        nmsg->any.head.error = error;
-       nmsg->state = state;
        kdmsg_msg_write(nmsg);
 }
 
@@ -1604,12 +1915,8 @@ kdmsg_msg_result(kdmsg_msg_t *msg, uint32_t error)
        }
 
        /* XXX messy mask cmd to avoid allocating state */
-       nmsg = kdmsg_msg_alloc(msg->iocom, msg->any.head.circuit,
-                              cmd & DMSGF_BASECMDMASK,
-                              NULL, NULL);
-       nmsg->any.head.cmd = cmd;
+       nmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
        nmsg->any.head.error = error;
-       nmsg->state = state;
        kdmsg_msg_write(nmsg);
 }
 
@@ -1652,12 +1959,8 @@ kdmsg_state_reply(kdmsg_state_t *state, uint32_t error)
        }
 
        /* XXX messy mask cmd to avoid allocating state */
-       nmsg = kdmsg_msg_alloc(state->iocom, state->circuit,
-                              cmd & DMSGF_BASECMDMASK,
-                              NULL, NULL);
-       nmsg->any.head.cmd = cmd;
+       nmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
        nmsg->any.head.error = error;
-       nmsg->state = state;
        kdmsg_msg_write(nmsg);
 }
 
@@ -1701,11 +2004,7 @@ kdmsg_state_result(kdmsg_state_t *state, uint32_t error)
        }
 
        /* XXX messy mask cmd to avoid allocating state */
-       nmsg = kdmsg_msg_alloc(state->iocom, state->circuit,
-                              cmd & DMSGF_BASECMDMASK,
-                              NULL, NULL);
-       nmsg->any.head.cmd = cmd;
+       nmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
        nmsg->any.head.error = error;
-       nmsg->state = state;
        kdmsg_msg_write(nmsg);
 }
index 7694866..345713a 100644 (file)
@@ -50,6 +50,7 @@
 #include <sys/thread.h>
 #include <sys/queue.h>
 #include <sys/lock.h>
+#include <sys/stat.h>
 #include <sys/uuid.h>
 
 #include <sys/dmsg.h>
 #include <sys/msgport2.h>
 #include <sys/thread2.h>
 
+/*
+ * Per-transaction tracking for a BLK_OPEN transaction, stored in
+ * state->any.any by disk_blk_open().
+ */
+struct dios_open {
+       int     openrd;         /* successful FREAD opens not yet closed */
+       int     openwr;         /* successful FWRITE opens not yet closed */
+};
+
+/*
+ * Per-transaction tracking for BLK_READ/WRITE/FLUSH/FREEBLKS
+ * transactions, stored in state->any.any.
+ */
+struct dios_io {
+       int     count;          /* bios issued and not yet through diskiodone() */
+       int     eof;            /* set when an I/O-issuing msg carried DMSGF_DELETE */
+};
+
 static MALLOC_DEFINE(M_DMSG_DISK, "dmsg_disk", "disk dmsg");
 
 static int disk_iocom_reconnect(struct disk *dp, struct file *fp);
 static int disk_rcvdmsg(kdmsg_msg_t *msg);
 
+static void disk_blk_open(struct disk *dp, kdmsg_msg_t *msg);
+static void disk_blk_read(struct disk *dp, kdmsg_msg_t *msg);
+static void disk_blk_write(struct disk *dp, kdmsg_msg_t *msg);
+static void disk_blk_flush(struct disk *dp, kdmsg_msg_t *msg);
+static void disk_blk_freeblks(struct disk *dp, kdmsg_msg_t *msg);
+static void diskiodone(struct bio *bio);
+
 void
 disk_iocom_init(struct disk *dp)
 {
@@ -160,31 +178,451 @@ disk_rcvdmsg(kdmsg_msg_t *msg)
 {
        struct disk *dp = msg->iocom->handle;
 
-       switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
+       /*
+        * Handle debug messages (these might not be in transactions)
+        */
+       switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
        case DMSG_DBG_SHELL:
                /*
                 * Execute shell command (not supported atm)
                 */
                kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
-               break;
+               return(0);
        case DMSG_DBG_SHELL | DMSGF_REPLY:
                if (msg->aux_data) {
                        msg->aux_data[msg->aux_size - 1] = 0;
                        kprintf("diskiocom: DEBUGMSG: %s\n", msg->aux_data);
                }
+               return(0);
+       }
+
+       /*
+        * All remaining messages must be in a transaction
+        *
+        * NOTE!  We are switching on the first message's command.  The
+        *        actual message command within the transaction may be
+        *        different (if streaming within a transaction).
+        */
+       if (msg->state == NULL) {
+               kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+               return(0);
+       }
+
+       switch(msg->state->rxcmd & DMSGF_CMDSWMASK) {
+       case DMSG_BLK_OPEN:
+       case DMSG_BLK_CLOSE:
+               disk_blk_open(dp, msg);
+               break;
+       case DMSG_BLK_READ:
+               disk_blk_read(dp, msg);
+               break;
+       case DMSG_BLK_WRITE:
+               disk_blk_write(dp, msg);
+               break;
+       case DMSG_BLK_FLUSH:
+               disk_blk_flush(dp, msg);
+               break;
+       case DMSG_BLK_FREEBLKS:
+               disk_blk_freeblks(dp, msg);
                break;
-       case DMSG_BLK_OPEN | DMSGF_CREATE:
-       case DMSG_BLK_READ | DMSGF_CREATE:
-       case DMSG_BLK_WRITE | DMSGF_CREATE:
-       case DMSG_BLK_FLUSH | DMSGF_CREATE:
-       case DMSG_BLK_FREEBLKS | DMSGF_CREATE:
        default:
-               kprintf("diskiocom: DISK ADHOC INPUT %s%d cmd %08x\n",
-                       dev_dname(dp->d_rawdev), dkunit(dp->d_rawdev),
-                       msg->any.head.cmd);
-               if (msg->any.head.cmd & DMSGF_CREATE)
-                       kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+               if ((msg->any.head.cmd & DMSGF_REPLY) == 0) {
+                       if (msg->any.head.cmd & DMSGF_DELETE)
+                               kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+                       else
+                               kdmsg_msg_result(msg, DMSG_ERR_NOSUPP);
+               }
                break;
        }
        return (0);
 }
+
+/*
+ * Handle BLK_OPEN / BLK_CLOSE messages within a BLK_OPEN transaction.
+ *
+ * Each successful OPEN bumps the per-transaction openrd/openwr counts
+ * (a struct dios_open kept in state->any.any) and each successful CLOSE
+ * drops them.  When the incoming message carries DMSGF_DELETE every
+ * remaining open is closed and the tracking structure is freed.
+ *
+ * A message carrying DMSGF_DELETE is answered with kdmsg_msg_reply(),
+ * anything else with kdmsg_msg_result().
+ */
+static
+void
+disk_blk_open(struct disk *dp, kdmsg_msg_t *msg)
+{
+       struct dios_open *openst;
+       int error = DMSG_ERR_NOSUPP;
+       int fflags;
+
+       openst = msg->state->any.any;
+       if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_OPEN) {
+               if (openst == NULL) {
+                       openst = kmalloc(sizeof(*openst), M_DEVBUF,
+                                               M_WAITOK | M_ZERO);
+                       msg->state->any.any = openst;
+               }
+               fflags = 0;
+               if (msg->any.blk_open.modes & DMSG_BLKOPEN_RD)
+                       fflags = FREAD;
+               if (msg->any.blk_open.modes & DMSG_BLKOPEN_WR)
+                       fflags |= FWRITE;
+               error = dev_dopen(dp->d_rawdev, fflags, S_IFCHR, proc0.p_ucred);
+               if (error) {
+                       error = DMSG_ERR_IO;
+               } else {
+                       if (fflags & FREAD)
+                               ++openst->openrd;
+                       if (fflags & FWRITE)
+                               ++openst->openwr;
+               }
+       }
+       if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_CLOSE &&
+           openst) {
+               /*
+                * Only close modes that are actually open in this
+                * transaction, and only adjust the counts for the modes
+                * really passed to dev_dclose().  Decrementing a count
+                * that is already zero would drive it negative and make
+                * the DELETE cleanup loops below run away.
+                */
+               fflags = 0;
+               if ((msg->any.blk_open.modes & DMSG_BLKOPEN_RD) &&
+                   openst->openrd > 0) {
+                       fflags = FREAD;
+               }
+               if ((msg->any.blk_open.modes & DMSG_BLKOPEN_WR) &&
+                   openst->openwr > 0) {
+                       fflags |= FWRITE;
+               }
+               if (fflags == 0) {
+                       /* nothing matching the requested modes is open */
+                       error = DMSG_ERR_PARAM;
+               } else if (dev_dclose(dp->d_rawdev, fflags, S_IFCHR)) {
+                       error = DMSG_ERR_IO;
+               } else {
+                       error = 0;
+                       if (fflags & FREAD)
+                               --openst->openrd;
+                       if (fflags & FWRITE)
+                               --openst->openwr;
+               }
+       }
+       if (msg->any.head.cmd & DMSGF_DELETE) {
+               if (openst) {
+                       /*
+                        * Transaction termination closes every open still
+                        * held by this transaction.
+                        */
+                       while (openst->openrd > 0 && openst->openwr > 0) {
+                               --openst->openrd;
+                               --openst->openwr;
+                               dev_dclose(dp->d_rawdev, FREAD|FWRITE, S_IFCHR);
+                       }
+                       while (openst->openrd > 0) {
+                               --openst->openrd;
+                               dev_dclose(dp->d_rawdev, FREAD, S_IFCHR);
+                       }
+                       while (openst->openwr > 0) {
+                               --openst->openwr;
+                               dev_dclose(dp->d_rawdev, FWRITE, S_IFCHR);
+                       }
+                       kfree(openst, M_DEVBUF);
+                       msg->state->any.any = NULL;
+               }
+               kdmsg_msg_reply(msg, error);
+       } else {
+               kdmsg_msg_result(msg, error);
+       }
+}
+
+/*
+ * Handle a message within a BLK_READ transaction.
+ *
+ * A DMSG_BLK_READ command with a sane byte count starts an asynchronous
+ * BUF_CMD_READ whose completion (diskiodone) produces the reply.  Any
+ * other message, or a read with a bad byte count, is answered here.
+ */
+static
+void
+disk_blk_read(struct disk *dp, kdmsg_msg_t *msg)
+{
+       struct dios_io *iost = msg->state->any.any;
+       struct buf *bp;
+       struct bio *bio;
+       int error;
+
+       if ((msg->any.head.cmd & DMSGF_CMDSWMASK) != DMSG_BLK_READ) {
+               error = DMSG_ERR_NOSUPP;
+               goto reply;
+       }
+       if (msg->any.blk_read.bytes < DEV_BSIZE ||
+           msg->any.blk_read.bytes > MAXPHYS) {
+               error = DMSG_ERR_PARAM;
+               goto reply;
+       }
+       if (iost == NULL) {
+               iost = kmalloc(sizeof(*iost), M_DEVBUF, M_WAITOK | M_ZERO);
+               msg->state->any.any = iost;
+       }
+
+       bp = geteblk(msg->any.blk_read.bytes);
+       bp->b_cmd = BUF_CMD_READ;
+       bp->b_bcount = msg->any.blk_read.bytes;
+       bp->b_resid = bp->b_bcount;
+       bio = &bp->b_bio1;
+       bio->bio_offset = msg->any.blk_read.offset;
+       bio->bio_caller_info1.ptr = msg->state;
+       bio->bio_done = diskiodone;
+       /* XXX the state is not held across the I/O (kdmsg_state_hold) */
+
+       /* account for the bio before it can possibly complete */
+       atomic_add_int(&iost->count, 1);
+       if (msg->any.head.cmd & DMSGF_DELETE)
+               iost->eof = 1;
+       BUF_KERNPROC(bp);
+       dev_dstrategy(dp->d_rawdev, bio);
+       return;
+
+reply:
+       if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+               kdmsg_msg_result(msg, error);
+               return;
+       }
+       if (iost != NULL && iost->count == 0) {
+               kfree(iost, M_DEVBUF);
+               msg->state->any.any = NULL;
+       }
+       kdmsg_msg_reply(msg, error);
+}
+
+/*
+ * Handle a message within a BLK_WRITE transaction.
+ *
+ * A DMSG_BLK_WRITE command with a sane byte count copies its aux data
+ * into a bio-owned buffer (zero-filling any short tail) and issues an
+ * asynchronous BUF_CMD_WRITE whose completion (diskiodone) produces the
+ * reply.  Any other message, or a bad byte count, is answered here.
+ */
+static
+void
+disk_blk_write(struct disk *dp, kdmsg_msg_t *msg)
+{
+       struct dios_io *iost;
+       struct buf *bp;
+       struct bio *bio;
+       size_t nbytes;
+       size_t ncopy;
+       int error = DMSG_ERR_NOSUPP;
+       int reterr = 1;
+
+       /*
+        * Only DMSG_BLK_WRITE commands imply write ops.
+        */
+       iost = msg->state->any.any;
+       if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_WRITE) {
+               if (msg->any.blk_write.bytes < DEV_BSIZE ||
+                   msg->any.blk_write.bytes > MAXPHYS) {
+                       error = DMSG_ERR_PARAM;
+                       goto done;
+               }
+               if (iost == NULL) {
+                       iost = kmalloc(sizeof(*iost), M_DEVBUF,
+                                      M_WAITOK | M_ZERO);
+                       msg->state->any.any = iost;
+               }
+
+               /*
+                * Issue WRITE.  Short data implies zeros.
+                *
+                * Always copy into a buffer owned by the bio.  The I/O
+                * completes asynchronously, no reference on msg is held
+                * across it, and kdmsg_msg_free() releases aux_data, so
+                * b_data must never alias msg->aux_data.
+                */
+               reterr = 0;
+               nbytes = msg->any.blk_write.bytes;
+               ncopy = msg->aux_size;
+               if (ncopy > nbytes)
+                       ncopy = nbytes;
+               bp = geteblk(nbytes);
+               bio = &bp->b_bio1;
+               bp->b_cmd = BUF_CMD_WRITE;
+               bp->b_bcount = nbytes;
+               bp->b_resid = bp->b_bcount;
+               if (ncopy)
+                       bcopy(msg->aux_data, bp->b_data, ncopy);
+               if (ncopy < nbytes)
+                       bzero(bp->b_data + ncopy, nbytes - ncopy);
+               bio->bio_offset = msg->any.blk_write.offset;
+               bio->bio_caller_info1.ptr = msg->state;
+               bio->bio_done = diskiodone;
+               /* kdmsg_state_hold(msg->state); */
+
+               atomic_add_int(&iost->count, 1);
+               if (msg->any.head.cmd & DMSGF_DELETE)
+                       iost->eof = 1;
+               BUF_KERNPROC(bp);
+               dev_dstrategy(dp->d_rawdev, bio);
+       }
+done:
+       if (reterr) {
+               if (msg->any.head.cmd & DMSGF_DELETE) {
+                       if (iost && iost->count == 0) {
+                               kfree(iost, M_DEVBUF);
+                               msg->state->any.any = NULL;
+                       }
+                       kdmsg_msg_reply(msg, error);
+               } else {
+                       kdmsg_msg_result(msg, error);
+               }
+       }
+}
+
+/*
+ * Handle a message within a BLK_FLUSH transaction.
+ *
+ * A DMSG_BLK_FLUSH command issues an asynchronous BUF_CMD_FLUSH whose
+ * completion (diskiodone) produces the reply.  Any other message is
+ * answered here with DMSG_ERR_NOSUPP.
+ */
+static
+void
+disk_blk_flush(struct disk *dp, kdmsg_msg_t *msg)
+{
+       struct dios_io *iost;
+       struct buf *bp;
+       struct bio *bio;
+       int error = DMSG_ERR_NOSUPP;
+       int reterr = 1;
+
+       /*
+        * Only DMSG_BLK_FLUSH commands imply flush ops.
+        */
+       iost = msg->state->any.any;
+       if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_FLUSH) {
+               if (iost == NULL) {
+                       iost = kmalloc(sizeof(*iost), M_DEVBUF,
+                                      M_WAITOK | M_ZERO);
+                       msg->state->any.any = iost;
+               }
+               reterr = 0;
+               bp = getpbuf(NULL);
+               bio = &bp->b_bio1;
+               bp->b_cmd = BUF_CMD_FLUSH;
+               bp->b_bcount = msg->any.blk_flush.bytes;
+               bp->b_resid = 0;
+               bio->bio_offset = msg->any.blk_flush.offset;
+               bio->bio_caller_info1.ptr = msg->state;
+               bio->bio_done = diskiodone;
+               /* kdmsg_state_hold(msg->state); */
+
+               atomic_add_int(&iost->count, 1);
+               if (msg->any.head.cmd & DMSGF_DELETE)
+                       iost->eof = 1;
+               BUF_KERNPROC(bp);
+               dev_dstrategy(dp->d_rawdev, bio);
+       }
+       if (reterr) {
+               if (msg->any.head.cmd & DMSGF_DELETE) {
+                       if (iost && iost->count == 0) {
+                               kfree(iost, M_DEVBUF);
+                               msg->state->any.any = NULL;
+                       }
+                       kdmsg_msg_reply(msg, error);
+               } else {
+                       kdmsg_msg_result(msg, error);
+               }
+       }
+}
+
+/*
+ * Handle a message within a BLK_FREEBLKS transaction.
+ *
+ * A DMSG_BLK_FREEBLKS command issues an asynchronous BUF_CMD_FREEBLKS
+ * whose completion (diskiodone) produces the reply.  Any other message
+ * is answered here with DMSG_ERR_NOSUPP.
+ */
+static
+void
+disk_blk_freeblks(struct disk *dp, kdmsg_msg_t *msg)
+{
+       struct dios_io *iost;
+       struct buf *bp;
+       struct bio *bio;
+       int error = DMSG_ERR_NOSUPP;
+       int reterr = 1;
+
+       /*
+        * Only DMSG_BLK_FREEBLKS commands imply freeblks ops.
+        */
+       iost = msg->state->any.any;
+       if ((msg->any.head.cmd & DMSGF_CMDSWMASK) == DMSG_BLK_FREEBLKS) {
+               if (iost == NULL) {
+                       iost = kmalloc(sizeof(*iost), M_DEVBUF,
+                                      M_WAITOK | M_ZERO);
+                       msg->state->any.any = iost;
+               }
+               reterr = 0;
+               bp = getpbuf(NULL);
+               bio = &bp->b_bio1;
+               bp->b_cmd = BUF_CMD_FREEBLKS;
+               bp->b_bcount = msg->any.blk_freeblks.bytes;
+               bp->b_resid = 0;
+               bio->bio_offset = msg->any.blk_freeblks.offset;
+               bio->bio_caller_info1.ptr = msg->state;
+               bio->bio_done = diskiodone;
+               /* kdmsg_state_hold(msg->state); */
+
+               atomic_add_int(&iost->count, 1);
+               if (msg->any.head.cmd & DMSGF_DELETE)
+                       iost->eof = 1;
+               BUF_KERNPROC(bp);
+               dev_dstrategy(dp->d_rawdev, bio);
+       }
+       if (reterr) {
+               if (msg->any.head.cmd & DMSGF_DELETE) {
+                       if (iost && iost->count == 0) {
+                               kfree(iost, M_DEVBUF);
+                               msg->state->any.any = NULL;
+                       }
+                       kdmsg_msg_reply(msg, error);
+               } else {
+                       kdmsg_msg_result(msg, error);
+               }
+       }
+}
+
+/*
+ * Completion callback for bios issued by disk_blk_read/write/flush/
+ * freeblks.  bio_caller_info1.ptr holds the originating transaction's
+ * kdmsg state.  Builds and sends the reply for that transaction, then
+ * releases the buffer.
+ *
+ * Once the transaction has seen DMSGF_DELETE (iost->eof), the completion
+ * that drops iost->count to zero sets DMSGF_DELETE on its reply and
+ * frees the per-transaction dios_io.
+ */
+static
+void
+diskiodone(struct bio *bio)
+{
+       struct buf *bp = bio->bio_buf;
+       kdmsg_state_t *state = bio->bio_caller_info1.ptr;
+       kdmsg_msg_t *rmsg;
+       struct dios_io *iost = state->any.any;
+       int error;
+       int resid = 0;
+       int bytes;
+       uint32_t cmd;
+       void *data;
+
+       cmd = DMSG_LNK_ERROR;
+       data = NULL;
+       bytes = 0;
+
+       switch(bp->b_cmd) {
+       case BUF_CMD_READ:
+               /* reads also return the buffer contents as aux data */
+               data = bp->b_data;
+               bytes = bp->b_bcount;
+               /* fall through */
+       case BUF_CMD_WRITE:
+               if (bp->b_flags & B_ERROR) {
+                       error = bp->b_error;
+               } else {
+                       error = 0;
+                       resid = bp->b_resid;
+               }
+               break;
+       case BUF_CMD_FLUSH:
+       case BUF_CMD_FREEBLKS:
+               if (bp->b_flags & B_ERROR)
+                       error = bp->b_error;
+               else
+                       error = 0;
+               break;
+       default:
+               panic("diskiodone: Unknown bio cmd = %d\n",
+                     bio->bio_buf->b_cmd);
+               break; /* NOT REACHED */
+       }
+
+       /*
+        * The reply is a LNK_ERROR unless there is a non-zero resid, in
+        * which case it is converted to a BLK_ERROR so the resid can be
+        * returned.  READs follow the same rule; their data rides along
+        * as aux data on whichever reply type is chosen.
+        */
+       if (resid && cmd == DMSG_LNK_ERROR)
+               cmd = DMSG_BLK_ERROR;
+       /* XXX txcmd is delayed so this won't work for streaming */
+       if ((state->txcmd & DMSGF_CREATE) == 0) /* assume serialized */
+               cmd |= DMSGF_CREATE;
+       if (iost->eof) {
+               if (atomic_fetchadd_int(&iost->count, -1) == 1)
+                       cmd |= DMSGF_DELETE;
+       } else {
+               atomic_add_int(&iost->count, -1);
+       }
+       cmd |= DMSGF_REPLY;
+
+       /*
+        * Allocate the reply.  Only populate blk_error.resid when the
+        * reply really is a BLK_ERROR.
+        */
+       rmsg = kdmsg_msg_alloc_state(state, cmd, NULL, NULL);
+       if (data) {
+               rmsg->aux_data = kmalloc(bytes, state->iocom->mmsg, M_INTWAIT);
+               rmsg->aux_size = bytes;
+               rmsg->flags |= KDMSG_FLAG_AUXALLOC;
+               bcopy(data, rmsg->aux_data, bytes);
+       }
+       rmsg->any.blk_error.head.error = error;
+       if ((cmd & DMSGF_BASECMDMASK) == DMSG_BLK_ERROR)
+               rmsg->any.blk_error.resid = resid;
+       bio->bio_caller_info1.ptr = NULL;
+       /* kdmsg_state_drop(state); */
+
+       /*
+        * This is the final reply for the transaction: eof is set and the
+        * in-flight count reached zero, so nothing else references the
+        * tracking structure.  Release it before the DELETE reply is
+        * written, because writing the DELETE may let the state be torn
+        * down (which would otherwise leak iost).
+        */
+       if (cmd & DMSGF_DELETE) {
+               state->any.any = NULL;
+               kfree(iost, M_DEVBUF);
+       }
+       kdmsg_msg_write(rmsg);
+       if (bp->b_flags & B_PAGING) {
+               relpbuf(bio->bio_buf, NULL);
+       } else {
+               bp->b_flags |= B_INVAL | B_AGE;
+               brelse(bp);
+       }
+}
index 2726460..9dd8d66 100644 (file)
  *
  * Auxillary data, whether in-band or out-of-band, must be at-least 64-byte
  * aligned.  The aux_bytes field contains the actual byte-granular length
- * and not the aligned length.
+ * and not the aligned length.  The crc is against the aligned length (so
+ * a faster crc algorithm can be used, theoretically).
  *
  * hdr_crc is calculated over the entire, ALIGNED extended header.  For
  * the purposes of calculating the crc, the hdr_crc field is 0.  That is,
@@ -670,19 +671,29 @@ typedef struct dmsg_dbg_shell dmsg_dbg_shell_t;
  *
  * BLK_OPEN    - Open device.  This transaction must be left open for the
  *               duration and the returned keyid passed in all associated
- *               BLK commands.
+ *               BLK commands.  Multiple OPENs can be issued within the
+ *               transaction.
  *
- * BLK_READ    - Strategy read
+ * BLK_CLOSE   - Close device.  This can be used to close one of the opens
+ *               within a BLK_OPEN transaction.  It may NOT initiate a
+ *               transaction.  Note that a termination of the transaction
+ *               (e.g. with LNK_ERROR or BLK_ERROR) closes all active OPENs
+ *               for that transaction.
  *
- * BLK_WRITE   - Strategy write
+ * BLK_READ    - Strategy read.  Not typically streaming.
  *
- * BLK_FLUSH   - Strategy flush
+ * BLK_WRITE   - Strategy write.  Not typically streaming.
+ *
+ * BLK_FLUSH   - Strategy flush.  Not typically streaming.
+ *
+ * BLK_FREEBLKS        - Strategy freeblks.  Not typically streaming.
  */
 #define DMSG_BLK_OPEN          DMSG_BLK(0x001, dmsg_blk_open)
-#define DMSG_BLK_READ          DMSG_BLK(0x002, dmsg_blk_read)
-#define DMSG_BLK_WRITE         DMSG_BLK(0x003, dmsg_blk_write)
-#define DMSG_BLK_FLUSH         DMSG_BLK(0x004, dmsg_blk_flush)
-#define DMSG_BLK_FREEBLKS      DMSG_BLK(0x005, dmsg_blk_freeblks)
+#define DMSG_BLK_CLOSE         DMSG_BLK(0x002, dmsg_blk_open)
+#define DMSG_BLK_READ          DMSG_BLK(0x003, dmsg_blk_read)
+#define DMSG_BLK_WRITE         DMSG_BLK(0x004, dmsg_blk_write)
+#define DMSG_BLK_FLUSH         DMSG_BLK(0x005, dmsg_blk_flush)
+#define DMSG_BLK_FREEBLKS      DMSG_BLK(0x006, dmsg_blk_freeblks)
 #define DMSG_BLK_ERROR         DMSG_BLK(0xFFF, dmsg_blk_error)
 
 struct dmsg_blk_open {
@@ -763,6 +774,9 @@ typedef struct dmsg_blk_error               dmsg_blk_error_t;
  */
 #define DMSG_ERR_NOSUPP                0x20
 #define DMSG_ERR_LOSTLINK      0x21
+#define DMSG_ERR_IO            0x22    /* generic */
+#define DMSG_ERR_PARAM         0x23    /* generic */
+#define DMSG_ERR_CANTCIRC      0x24    /* (typically means lost span) */
 
 union dmsg_any {
        char                    buf[DMSG_HDR_MAX];
@@ -811,12 +825,16 @@ struct kdmsg_msg;
  * connect to specific services over the cluster.
  */
 struct kdmsg_circuit {
-       TAILQ_ENTRY(kdmsg_circuit) entry;
-       struct kdmsg_iocom      *iocom;
+       RB_ENTRY(kdmsg_circuit) rbnode;         /* indexed by msgid */
+       TAILQ_ENTRY(kdmsg_circuit) entry;       /* written by shim */
+       struct kdmsg_iocom      *iocom;         /* written by shim */
        struct kdmsg_state      *span_state;
-       struct kdmsg_state      *circ_state;
-       int                     recorded;       /* used by shim */
+       struct kdmsg_state      *circ_state;    /* master circuit */
+       struct kdmsg_state      *rcirc_state;   /* slave circuit */
+       uint64_t                msgid;          /* circuit id; RB key, copied into head.circuit */
        int                     weight;
+       int                     recorded;       /* written by shim */
+       int                     refs;           /* written by shim */
 };
 
 typedef struct kdmsg_circuit kdmsg_circuit_t;
@@ -829,7 +847,7 @@ typedef struct kdmsg_circuit kdmsg_circuit_t;
 struct kdmsg_state {
        RB_ENTRY(kdmsg_state) rbnode;           /* indexed by msgid */
        struct kdmsg_iocom *iocom;
-       uint64_t        circuit;
+       struct kdmsg_circuit *circ;
        uint32_t        icmd;                   /* record cmd creating state */
        uint32_t        txcmd;                  /* mostly for CMDF flags */
        uint32_t        rxcmd;                  /* mostly for CMDF flags */
@@ -854,6 +872,7 @@ struct kdmsg_msg {
        TAILQ_ENTRY(kdmsg_msg) qentry;          /* serialized queue */
        struct kdmsg_iocom *iocom;
        struct kdmsg_state *state;
+       struct kdmsg_circuit *circ;
        size_t          hdr_size;
        size_t          aux_size;
        char            *aux_data;
@@ -868,10 +887,15 @@ typedef struct kdmsg_state kdmsg_state_t;
 typedef struct kdmsg_msg kdmsg_msg_t;
 
 struct kdmsg_state_tree;
-RB_HEAD(kdmsg_state_tree, kdmsg_state);
 int kdmsg_state_cmp(kdmsg_state_t *state1, kdmsg_state_t *state2);
+RB_HEAD(kdmsg_state_tree, kdmsg_state);
 RB_PROTOTYPE(kdmsg_state_tree, kdmsg_state, rbnode, kdmsg_state_cmp);
 
+struct kdmsg_circuit_tree;
+int kdmsg_circuit_cmp(kdmsg_circuit_t *circ1, kdmsg_circuit_t *circ2);
+RB_HEAD(kdmsg_circuit_tree, kdmsg_circuit);
+RB_PROTOTYPE(kdmsg_circuit_tree, kdmsg_circuit, rbnode, kdmsg_circuit_cmp);
+
 /*
  * Structure embedded in e.g. mount, master control structure for
  * DMSG stream handling.
@@ -895,6 +919,7 @@ struct kdmsg_iocom {
        struct kdmsg_state      *freewr_state;  /* allocation cache */
        struct kdmsg_state_tree staterd_tree;   /* active messages */
        struct kdmsg_state_tree statewr_tree;   /* active messages */
+       struct kdmsg_circuit_tree circ_tree;    /* active circuits */
        dmsg_lnk_conn_t         auto_lnk_conn;
        dmsg_lnk_span_t         auto_lnk_span;
 };
@@ -905,10 +930,12 @@ typedef struct kdmsg_iocom        kdmsg_iocom_t;
 #define KDMSG_IOCOMF_AUTOSPAN  0x0002  /* handle received LNK_SPAN */
 #define KDMSG_IOCOMF_AUTOCIRC  0x0004  /* handle received LNK_CIRC */
 #define KDMSG_IOCOMF_AUTOFORGE 0x0008  /* auto initiate LNK_CIRC */
+#define KDMSG_IOCOMF_EXITNOACC 0x0010  /* cannot accept writes */
 
 #define KDMSG_IOCOMF_AUTOANY   (KDMSG_IOCOMF_AUTOCONN |        \
                                 KDMSG_IOCOMF_AUTOSPAN |        \
-                                KDMSG_IOCOMF_AUTOCIRC)
+                                KDMSG_IOCOMF_AUTOCIRC |        \
+                                KDMSG_IOCOMF_AUTOFORGE)
 
 uint32_t kdmsg_icrc32(const void *buf, size_t size);
 uint32_t kdmsg_icrc32c(const void *buf, size_t size, uint32_t crc);
@@ -926,23 +953,24 @@ void kdmsg_iocom_autoinitiate(kdmsg_iocom_t *iocom,
 void kdmsg_iocom_uninit(kdmsg_iocom_t *iocom);
 void kdmsg_drain_msgq(kdmsg_iocom_t *iocom);
 
-int kdmsg_state_msgrx(kdmsg_msg_t *msg);
-int kdmsg_state_msgtx(kdmsg_msg_t *msg);
-void kdmsg_state_cleanuprx(kdmsg_msg_t *msg);
-void kdmsg_state_cleanuptx(kdmsg_msg_t *msg);
-int kdmsg_msg_execute(kdmsg_msg_t *msg);
-void kdmsg_state_free(kdmsg_state_t *state);
 void kdmsg_msg_free(kdmsg_msg_t *msg);
-kdmsg_msg_t *kdmsg_msg_alloc(kdmsg_iocom_t *iocom, uint64_t circuit,
+kdmsg_msg_t *kdmsg_msg_alloc(kdmsg_iocom_t *iocom, kdmsg_circuit_t *circ,
                                uint32_t cmd,
                                int (*func)(kdmsg_state_t *, kdmsg_msg_t *),
                                void *data);
+kdmsg_msg_t *kdmsg_msg_alloc_state(kdmsg_state_t *state, uint32_t cmd,
+                               int (*func)(kdmsg_state_t *, kdmsg_msg_t *),
+                               void *data);
 void kdmsg_msg_write(kdmsg_msg_t *msg);
 void kdmsg_msg_reply(kdmsg_msg_t *msg, uint32_t error);
 void kdmsg_msg_result(kdmsg_msg_t *msg, uint32_t error);
 void kdmsg_state_reply(kdmsg_state_t *state, uint32_t error);
 void kdmsg_state_result(kdmsg_state_t *state, uint32_t error);
 
+void kdmsg_circ_hold(kdmsg_circuit_t *circ);
+void kdmsg_circ_drop(kdmsg_circuit_t *circ);
+
+
 #endif
 
 #endif