hammer2 - Messaging layer separation work part 3
authorMatthew Dillon <dillon@apollo.backplane.com>
Thu, 25 Oct 2012 17:46:36 +0000 (10:46 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Thu, 25 Oct 2012 17:46:36 +0000 (10:46 -0700)
* Move more hammer2 vfs message code into kern_dmsg.c, renaming and
  implementing callbacks as needed.

* Move hammer2_icrc.c (the iscsi crc support) to libkern/icrc32.c

sbin/newfs_hammer2/Makefile
sbin/newfs_hammer2/newfs_hammer2.c
sys/conf/files
sys/kern/kern_dmsg.c [moved from sys/vfs/hammer2/hammer2_msg.c with 51% similarity]
sys/libkern/icrc32.c [moved from sys/vfs/hammer2/hammer2_icrc.c with 92% similarity]
sys/sys/dmsg.h
sys/sys/systm.h
sys/vfs/hammer2/Makefile
sys/vfs/hammer2/hammer2.h
sys/vfs/hammer2/hammer2_msgops.c
sys/vfs/hammer2/hammer2_vfsops.c

index f306feb..398b212 100644 (file)
@@ -3,10 +3,10 @@
 PROG=  newfs_hammer2
 MAN=   newfs_hammer2.8
 CFLAGS+= -I${.CURDIR}/../../sys -I${.CURDIR}/../hammer2
-SRCS= newfs_hammer2.c hammer2_icrc.c
+SRCS= newfs_hammer2.c
 
 .PATH: ${.CURDIR}/../../sys/libkern
-.PATH: ${.CURDIR}/../../sys/vfs/hammer2
-SRCS+= crc32.c
+#.PATH: ${.CURDIR}/../../sys/vfs/hammer2
+SRCS+= icrc32.c
 
 .include <bsd.prog.mk>
index 322d0b3..aae4c6e 100644 (file)
 #include <err.h>
 #include <uuid.h>
 
+#define hammer2_icrc32(buf, size)      iscsi_crc32((buf), (size))
+#define hammer2_icrc32c(buf, size, crc)        iscsi_crc32_ext((buf), (size), (crc))
+uint32_t iscsi_crc32(const void *buf, size_t size);
+uint32_t iscsi_crc32_ext(const void *buf, size_t size, uint32_t ocrc);
+
 static hammer2_off_t check_volume(const char *path, int *fdp);
 static int64_t getsize(const char *str, int64_t minval, int64_t maxval, int pw);
 static const char *sizetostr(hammer2_off_t size);
@@ -534,7 +539,7 @@ format_hammer2(int fd, hammer2_off_t total_space, hammer2_off_t free_space)
        root_blockref.copyid = HAMMER2_COPYID_LOCAL;
        root_blockref.keybits = 0;
        root_blockref.check.iscsi32.value =
-                                       hammer2_icrc32(rawip, sizeof(*rawip));
+                       hammer2_icrc32(rawip, sizeof(*rawip));
        root_blockref.type = HAMMER2_BREF_TYPE_INODE;
        root_blockref.methods = HAMMER2_ENC_CHECKMETHOD(HAMMER2_CHECK_ICRC) |
                                HAMMER2_ENC_COMPMETHOD(HAMMER2_COMP_AUTOZERO);
index d7b25ac..fdf3c43 100644 (file)
@@ -833,6 +833,7 @@ kern/kern_conf.c    standard
 kern/kern_debug.c      standard
 kern/kern_device.c     standard
 kern/kern_descrip.c    standard
+kern/kern_dmsg.c       standard
 kern/kern_fp.c         standard
 kern/kern_environment.c        standard
 kern/kern_event.c      standard
@@ -1824,6 +1825,7 @@ libkern/arc4random.c                      standard
 libkern/bcd.c                          standard
 libkern/bsearch.c                      standard
 libkern/crc32.c                                standard
+libkern/icrc32.c                       standard
 libkern/cmpdi2.c                       standard
 libkern/ucmpdi2.c                      standard
 libkern/idr.c                          standard
similarity index 51%
rename from sys/vfs/hammer2/hammer2_msg.c
rename to sys/kern/kern_dmsg.c
index d5fd2b5..d65ee10 100644 (file)
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+#include <sys/param.h>
+#include <sys/types.h>
+#include <sys/kernel.h>
+#include <sys/conf.h>
+#include <sys/systm.h>
+#include <sys/queue.h>
+#include <sys/tree.h>
+#include <sys/malloc.h>
+#include <sys/mount.h>
+#include <sys/socket.h>
+#include <sys/vnode.h>
+#include <sys/file.h>
+#include <sys/proc.h>
+#include <sys/priv.h>
+#include <sys/thread.h>
+#include <sys/globaldata.h>
+#include <sys/limits.h>
 
-#include "hammer2.h"
+#include <sys/dmsg.h>
 
-RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
+RB_GENERATE(kdmsg_state_tree, kdmsg_state, rbnode, kdmsg_state_cmp);
+
+static void kdmsg_iocom_thread_rd(void *arg);
+static void kdmsg_iocom_thread_wr(void *arg);
+
+/*
+ * Initialize the roll-up communications structure for a network
+ * messaging session.  This function does not install the socket.
+ */
+void
+kdmsg_iocom_init(kdmsg_iocom_t *iocom, void *handle,
+                struct malloc_type *mmsg,
+                void (*cctl_wakeup)(kdmsg_iocom_t *),
+                int (*lnk_rcvmsg)(kdmsg_msg_t *msg),
+                int (*dbg_rcvmsg)(kdmsg_msg_t *msg),
+                int (*misc_rcvmsg)(kdmsg_msg_t *msg))
+{
+       bzero(iocom, sizeof(*iocom));
+       iocom->handle = handle;
+       iocom->mmsg = mmsg;
+       iocom->clusterctl_wakeup = cctl_wakeup;
+       iocom->lnk_rcvmsg = lnk_rcvmsg;
+       iocom->dbg_rcvmsg = dbg_rcvmsg;
+       iocom->misc_rcvmsg = misc_rcvmsg;
+       iocom->router.iocom = iocom;
+       lockinit(&iocom->msglk, "h2msg", 0, 0);
+       TAILQ_INIT(&iocom->msgq);
+       RB_INIT(&iocom->staterd_tree);
+       RB_INIT(&iocom->statewr_tree);
+}
+
+/*
+ * [Re]connect using the passed file pointer.  The caller must ref the
+ * fp for us.  We own that ref now.
+ */
+void
+kdmsg_iocom_reconnect(kdmsg_iocom_t *iocom, struct file *fp,
+                     const char *subsysname)
+{
+       /*
+        * Destroy the current connection
+        */
+       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
+       while (iocom->msgrd_td || iocom->msgwr_td) {
+               wakeup(&iocom->msg_ctl);
+               tsleep(iocom, 0, "clstrkl", hz);
+       }
+
+       /*
+        * Drop communications descriptor
+        */
+       if (iocom->msg_fp) {
+               fdrop(iocom->msg_fp);
+               iocom->msg_fp = NULL;
+       }
+       kprintf("RESTART CONNECTION\n");
+
+       /*
+        * Setup new communications descriptor
+        */
+       iocom->msg_ctl = 0;
+       iocom->msg_fp = fp;
+       iocom->msg_seq = 0;
+
+       lwkt_create(kdmsg_iocom_thread_rd, iocom, &iocom->msgrd_td,
+                   NULL, 0, -1, "%s-msgrd", subsysname);
+       lwkt_create(kdmsg_iocom_thread_wr, iocom, &iocom->msgwr_td,
+                   NULL, 0, -1, "%s-msgwr", subsysname);
+}
+
+/*
+ * Cluster controller thread.  Perform messaging functions.  We have one
+ * thread for the reader and one for the writer.  The writer handles
+ * shutdown requests (which should break the reader thread).
+ */
+static
+void
+kdmsg_iocom_thread_rd(void *arg)
+{
+       kdmsg_iocom_t *iocom = arg;
+       dmsg_hdr_t hdr;
+       kdmsg_msg_t *msg;
+       kdmsg_state_t *state;
+       size_t hbytes;
+       int error = 0;
+
+       while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILL) == 0) {
+               /*
+                * Retrieve the message from the pipe or socket.
+                */
+               error = fp_read(iocom->msg_fp, &hdr, sizeof(hdr),
+                               NULL, 1, UIO_SYSSPACE);
+               if (error)
+                       break;
+               if (hdr.magic != DMSG_HDR_MAGIC) {
+                       kprintf("kdmsg: bad magic: %04x\n", hdr.magic);
+                       error = EINVAL;
+                       break;
+               }
+               hbytes = (hdr.cmd & DMSGF_SIZE) * DMSG_ALIGN;
+               if (hbytes < sizeof(hdr) || hbytes > DMSG_AUX_MAX) {
+                       kprintf("kdmsg: bad header size %zd\n", hbytes);
+                       error = EINVAL;
+                       break;
+               }
+               /* XXX messy: mask cmd to avoid allocating state */
+               msg = kdmsg_msg_alloc(&iocom->router,
+                                       hdr.cmd & DMSGF_BASECMDMASK,
+                                       NULL, NULL);
+               msg->any.head = hdr;
+               msg->hdr_size = hbytes;
+               if (hbytes > sizeof(hdr)) {
+                       error = fp_read(iocom->msg_fp, &msg->any.head + 1,
+                                       hbytes - sizeof(hdr),
+                                       NULL, 1, UIO_SYSSPACE);
+                       if (error) {
+                               kprintf("kdmsg: short msg received\n");
+                               error = EINVAL;
+                               break;
+                       }
+               }
+               msg->aux_size = hdr.aux_bytes * DMSG_ALIGN;
+               if (msg->aux_size > DMSG_AUX_MAX) {
+                       kprintf("kdmsg: illegal msg payload size %zd\n",
+                               msg->aux_size);
+                       error = EINVAL;
+                       break;
+               }
+               if (msg->aux_size) {
+                       msg->aux_data = kmalloc(msg->aux_size, iocom->mmsg,
+                                               M_WAITOK | M_ZERO);
+                       error = fp_read(iocom->msg_fp, msg->aux_data,
+                                       msg->aux_size,
+                                       NULL, 1, UIO_SYSSPACE);
+                       if (error) {
+                               kprintf("kdmsg: short msg payload received\n");
+                               break;
+                       }
+               }
+
+               /*
+                * State machine tracking, state assignment for msg,
+                * returns error and discard status.  Errors are fatal
+                * to the connection except for EALREADY which forces
+                * a discard without execution.
+                */
+               error = kdmsg_state_msgrx(msg);
+               if (error) {
+                       /*
+                        * Raw protocol or connection error
+                        */
+                       kdmsg_msg_free(msg);
+                       if (error == EALREADY)
+                               error = 0;
+               } else if (msg->state && msg->state->func) {
+                       /*
+                        * Message related to state which already has a
+                        * handling function installed for it.
+                        */
+                       error = msg->state->func(msg->state, msg);
+                       kdmsg_state_cleanuprx(msg);
+               } else if ((msg->any.head.cmd & DMSGF_PROTOS) ==
+                          DMSG_PROTO_LNK) {
+                       /*
+                        * Message related to the LNK protocol set
+                        */
+                       error = iocom->lnk_rcvmsg(msg);
+                       kdmsg_state_cleanuprx(msg);
+               } else if ((msg->any.head.cmd & DMSGF_PROTOS) ==
+                          DMSG_PROTO_DBG) {
+                       /*
+                        * Message related to the DBG protocol set
+                        */
+                       error = iocom->dbg_rcvmsg(msg);
+                       kdmsg_state_cleanuprx(msg);
+               } else {
+                       /*
+                        * Other higher-level messages (e.g. vnops)
+                        */
+                       error = iocom->misc_rcvmsg(msg);
+                       kdmsg_state_cleanuprx(msg);
+               }
+               msg = NULL;
+       }
+
+       if (error)
+               kprintf("kdmsg: read failed error %d\n", error);
+
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+       if (msg) {
+               if (msg->state && msg->state->msg == msg)
+                       msg->state->msg = NULL;
+               kdmsg_msg_free(msg);
+       }
+
+       if ((state = iocom->freerd_state) != NULL) {
+               iocom->freerd_state = NULL;
+               kdmsg_state_free(state);
+       }
+
+       /*
+        * Shutdown the socket before waiting for the transmit side.
+        *
+        * If we are dying due to e.g. a socket disconnect verses being
+        * killed explicity we have to set KILL in order to kick the tx
+        * side when it might not have any other work to do.  KILL might
+        * already be set if we are in an unmount or reconnect.
+        */
+       fp_shutdown(iocom->msg_fp, SHUT_RDWR);
+
+       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
+       wakeup(&iocom->msg_ctl);
+
+       /*
+        * Wait for the transmit side to drain remaining messages
+        * before cleaning up the rx state.  The transmit side will
+        * set KILLTX and wait for the rx side to completely finish
+        * (set msgrd_td to NULL) before cleaning up any remaining
+        * tx states.
+        */
+       lockmgr(&iocom->msglk, LK_RELEASE);
+       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLRX);
+       wakeup(&iocom->msg_ctl);
+       while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILLTX) == 0) {
+               wakeup(&iocom->msg_ctl);
+               tsleep(iocom, 0, "clstrkw", hz);
+       }
+
+       iocom->msgrd_td = NULL;
+
+       /*
+        * iocom can be ripped out from under us at this point but
+        * wakeup() is safe.
+        */
+       wakeup(iocom);
+       lwkt_exit();
+}
+
+static
+void
+kdmsg_iocom_thread_wr(void *arg)
+{
+       kdmsg_iocom_t *iocom = arg;
+       kdmsg_msg_t *msg;
+       kdmsg_state_t *state;
+       ssize_t res;
+       int error = 0;
+       int retries = 20;
+
+       /*
+        * Transmit loop
+        */
+       msg = NULL;
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+
+       while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILL) == 0 && error == 0) {
+               /*
+                * Sleep if no messages pending.  Interlock with flag while
+                * holding msglk.
+                */
+               if (TAILQ_EMPTY(&iocom->msgq)) {
+                       atomic_set_int(&iocom->msg_ctl,
+                                      KDMSG_CLUSTERCTL_SLEEPING);
+                       lksleep(&iocom->msg_ctl, &iocom->msglk, 0, "msgwr", hz);
+                       atomic_clear_int(&iocom->msg_ctl,
+                                        KDMSG_CLUSTERCTL_SLEEPING);
+               }
+
+               while ((msg = TAILQ_FIRST(&iocom->msgq)) != NULL) {
+                       /*
+                        * Remove msg from the transmit queue and do
+                        * persist and half-closed state handling.
+                        */
+                       TAILQ_REMOVE(&iocom->msgq, msg, qentry);
+                       lockmgr(&iocom->msglk, LK_RELEASE);
+
+                       error = kdmsg_state_msgtx(msg);
+                       if (error == EALREADY) {
+                               error = 0;
+                               kdmsg_msg_free(msg);
+                               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                               continue;
+                       }
+                       if (error) {
+                               kdmsg_msg_free(msg);
+                               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                               break;
+                       }
+
+                       /*
+                        * Dump the message to the pipe or socket.
+                        */
+                       error = fp_write(iocom->msg_fp, &msg->any,
+                                        msg->hdr_size, &res, UIO_SYSSPACE);
+                       if (error || res != msg->hdr_size) {
+                               if (error == 0)
+                                       error = EINVAL;
+                               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                               break;
+                       }
+                       if (msg->aux_size) {
+                               error = fp_write(iocom->msg_fp,
+                                                msg->aux_data, msg->aux_size,
+                                                &res, UIO_SYSSPACE);
+                               if (error || res != msg->aux_size) {
+                                       if (error == 0)
+                                               error = EINVAL;
+                                       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                                       break;
+                               }
+                       }
+                       kdmsg_state_cleanuptx(msg);
+                       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+               }
+       }
+
+       /*
+        * Cleanup messages pending transmission and release msgq lock.
+        */
+       if (error)
+               kprintf("kdmsg: write failed error %d\n", error);
+
+       if (msg) {
+               if (msg->state && msg->state->msg == msg)
+                       msg->state->msg = NULL;
+               kdmsg_msg_free(msg);
+       }
+
+       /*
+        * Shutdown the socket.  This will cause the rx thread to get an
+        * EOF and ensure that both threads get to a termination state.
+        */
+       fp_shutdown(iocom->msg_fp, SHUT_RDWR);
+
+       /*
+        * Set KILLTX (which the rx side waits for), then wait for the RX
+        * side to completely finish before we clean out any remaining
+        * command states.
+        */
+       lockmgr(&iocom->msglk, LK_RELEASE);
+       atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLTX);
+       wakeup(&iocom->msg_ctl);
+       while (iocom->msgrd_td) {
+               wakeup(&iocom->msg_ctl);
+               tsleep(iocom, 0, "clstrkw", hz);
+       }
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+
+       /*
+        * Simulate received MSGF_DELETE's for any remaining states.
+        */
+cleanuprd:
+       RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree) {
+               if (state->func &&
+                   (state->rxcmd & DMSGF_DELETE) == 0) {
+                       lockmgr(&iocom->msglk, LK_RELEASE);
+                       msg = kdmsg_msg_alloc(&iocom->router, DMSG_LNK_ERROR,
+                                             NULL, NULL);
+                       if ((state->rxcmd & DMSGF_CREATE) == 0)
+                               msg->any.head.cmd |= DMSGF_CREATE;
+                       msg->any.head.cmd |= DMSGF_DELETE;
+                       msg->state = state;
+                       state->rxcmd = msg->any.head.cmd &
+                                      ~DMSGF_DELETE;
+                       msg->state->func(state, msg);
+                       kdmsg_state_cleanuprx(msg);
+                       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                       goto cleanuprd;
+               }
+               if (state->func == NULL) {
+                       state->flags &= ~KDMSG_STATE_INSERTED;
+                       RB_REMOVE(kdmsg_state_tree,
+                                 &iocom->staterd_tree, state);
+                       kdmsg_state_free(state);
+                       goto cleanuprd;
+               }
+       }
+
+       /*
+        * NOTE: We have to drain the msgq to handle situations
+        *       where received states have built up output
+        *       messages, to avoid creating messages with
+        *       duplicate CREATE/DELETE flags.
+        */
+cleanupwr:
+       kdmsg_drain_msgq(iocom);
+       RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree) {
+               if (state->func &&
+                   (state->rxcmd & DMSGF_DELETE) == 0) {
+                       lockmgr(&iocom->msglk, LK_RELEASE);
+                       msg = kdmsg_msg_alloc(&iocom->router, DMSG_LNK_ERROR,
+                                             NULL, NULL);
+                       if ((state->rxcmd & DMSGF_CREATE) == 0)
+                               msg->any.head.cmd |= DMSGF_CREATE;
+                       msg->any.head.cmd |= DMSGF_DELETE |
+                                            DMSGF_REPLY;
+                       msg->state = state;
+                       state->rxcmd = msg->any.head.cmd &
+                                      ~DMSGF_DELETE;
+                       msg->state->func(state, msg);
+                       kdmsg_state_cleanuprx(msg);
+                       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+                       goto cleanupwr;
+               }
+               if (state->func == NULL) {
+                       state->flags &= ~KDMSG_STATE_INSERTED;
+                       RB_REMOVE(kdmsg_state_tree,
+                                 &iocom->statewr_tree, state);
+                       kdmsg_state_free(state);
+                       goto cleanupwr;
+               }
+       }
+
+       kdmsg_drain_msgq(iocom);
+       if (--retries == 0)
+               panic("kdmsg: comm thread shutdown couldn't drain");
+       if (RB_ROOT(&iocom->statewr_tree))
+               goto cleanupwr;
+
+       if ((state = iocom->freewr_state) != NULL) {
+               iocom->freewr_state = NULL;
+               kdmsg_state_free(state);
+       }
+
+       lockmgr(&iocom->msglk, LK_RELEASE);
+
+       /*
+        * The state trees had better be empty now
+        */
+       KKASSERT(RB_EMPTY(&iocom->staterd_tree));
+       KKASSERT(RB_EMPTY(&iocom->statewr_tree));
+       KKASSERT(iocom->conn_state == NULL);
+
+       /*
+        * iocom can be ripped out from under us once msgwr_td is set to NULL.
+        * The wakeup is safe.
+        */
+       iocom->msgwr_td = NULL;
+       wakeup(iocom);
+       lwkt_exit();
+}
+
+/*
+ * This cleans out the pending transmit message queue, adjusting any
+ * persistent states properly in the process.
+ *
+ * Caller must hold pmp->iocom.msglk
+ */
+void
+kdmsg_drain_msgq(kdmsg_iocom_t *iocom)
+{
+       kdmsg_msg_t *msg;
+
+       /*
+        * Clean out our pending transmit queue, executing the
+        * appropriate state adjustments.  If this tries to open
+        * any new outgoing transactions we have to loop up and
+        * clean them out.
+        */
+       while ((msg = TAILQ_FIRST(&iocom->msgq)) != NULL) {
+               TAILQ_REMOVE(&iocom->msgq, msg, qentry);
+               lockmgr(&iocom->msglk, LK_RELEASE);
+               if (msg->state && msg->state->msg == msg)
+                       msg->state->msg = NULL;
+               if (kdmsg_state_msgtx(msg))
+                       kdmsg_msg_free(msg);
+               else
+                       kdmsg_state_cleanuptx(msg);
+               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+       }
+}
 
 /*
  * Process state tracking for a message after reception, prior to
@@ -104,13 +591,13 @@ RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
  * will typically just contain status updates.
  */
 int
-hammer2_state_msgrx(hammer2_msg_t *msg)
+kdmsg_state_msgrx(kdmsg_msg_t *msg)
 {
-       hammer2_pfsmount_t *pmp;
-       hammer2_state_t *state;
+       kdmsg_iocom_t *iocom;
+       kdmsg_state_t *state;
        int error;
 
-       pmp = msg->router->pmp;
+       iocom = msg->router->iocom;
 
        /*
         * XXX resolve msg->any.head.source and msg->any.head.target
@@ -124,11 +611,10 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
         * one.  This is the only routine which uses freerd_state so no
         * races are possible.
         */
-       if ((state = pmp->freerd_state) == NULL) {
-               state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
-               state->pmp = pmp;
-               state->flags = HAMMER2_STATE_DYNAMIC;
-               pmp->freerd_state = state;
+       if ((state = iocom->freerd_state) == NULL) {
+               state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
+               state->flags = KDMSG_STATE_DYNAMIC;
+               iocom->freerd_state = state;
        }
 
        /*
@@ -137,19 +623,19 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
         * If received msg is a command state is on staterd_tree.
         * If received msg is a reply state is on statewr_tree.
         */
-       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
 
        state->msgid = msg->any.head.msgid;
-       state->router = &pmp->router;
+       state->router = &iocom->router;
        kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
                msg->any.head.cmd,
                (intmax_t)msg->any.head.msgid,
                (intmax_t)msg->any.head.source,
                (intmax_t)msg->any.head.target);
        if (msg->any.head.cmd & DMSGF_REPLY)
-               state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
+               state = RB_FIND(kdmsg_state_tree, &iocom->statewr_tree, state);
        else
-               state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
+               state = RB_FIND(kdmsg_state_tree, &iocom->staterd_tree, state);
        msg->state = state;
 
        /*
@@ -157,7 +643,7 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
         */
        if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
                                  DMSGF_ABORT)) == 0) {
-               lockmgr(&pmp->msglk, LK_RELEASE);
+               lockmgr(&iocom->msglk, LK_RELEASE);
                return(0);
        }
 
@@ -172,19 +658,19 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
                 * New persistant command received.
                 */
                if (state) {
-                       kprintf("hammer2_state_msgrx: duplicate transaction\n");
+                       kprintf("kdmsg_state_msgrx: duplicate transaction\n");
                        error = EINVAL;
                        break;
                }
-               state = pmp->freerd_state;
-               pmp->freerd_state = NULL;
+               state = iocom->freerd_state;
+               iocom->freerd_state = NULL;
                msg->state = state;
                state->router = msg->router;
                state->msg = msg;
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                state->txcmd = DMSGF_REPLY;
-               RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
-               state->flags |= HAMMER2_STATE_INSERTED;
+               RB_INSERT(kdmsg_state_tree, &iocom->staterd_tree, state);
+               state->flags |= KDMSG_STATE_INSERTED;
                error = 0;
                break;
        case DMSGF_DELETE:
@@ -196,7 +682,7 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("hammer2_state_msgrx: no state "
+                               kprintf("kdmsg_state_msgrx: no state "
                                        "for DELETE\n");
                                error = EINVAL;
                        }
@@ -211,7 +697,7 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("hammer2_state_msgrx: state reused "
+                               kprintf("kdmsg_state_msgrx: state reused "
                                        "for DELETE\n");
                                error = EINVAL;
                        }
@@ -240,7 +726,7 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
                 * persistent state message should already exist.
                 */
                if (state == NULL) {
-                       kprintf("hammer2_state_msgrx: no state match for "
+                       kprintf("kdmsg_state_msgrx: no state match for "
                                "REPLY cmd=%08x msgid=%016jx\n",
                                msg->any.head.cmd,
                                (intmax_t)msg->any.head.msgid);
@@ -259,7 +745,7 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("hammer2_state_msgrx: no state match "
+                               kprintf("kdmsg_state_msgrx: no state match "
                                        "for REPLY|DELETE\n");
                                error = EINVAL;
                        }
@@ -275,7 +761,7 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("hammer2_state_msgrx: state reused "
+                               kprintf("kdmsg_state_msgrx: state reused "
                                        "for REPLY|DELETE\n");
                                error = EINVAL;
                        }
@@ -297,45 +783,47 @@ hammer2_state_msgrx(hammer2_msg_t *msg)
                error = 0;
                break;
        }
-       lockmgr(&pmp->msglk, LK_RELEASE);
+       lockmgr(&iocom->msglk, LK_RELEASE);
        return (error);
 }
 
 void
-hammer2_state_cleanuprx(hammer2_msg_t *msg)
+kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
 {
-       hammer2_pfsmount_t *pmp = msg->router->pmp;
-       hammer2_state_t *state;
+       kdmsg_iocom_t *iocom;
+       kdmsg_state_t *state;
+
+       iocom = msg->router->iocom;
 
        if ((state = msg->state) == NULL) {
-               hammer2_msg_free(msg);
+               kdmsg_msg_free(msg);
        } else if (msg->any.head.cmd & DMSGF_DELETE) {
-               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                state->rxcmd |= DMSGF_DELETE;
                if (state->txcmd & DMSGF_DELETE) {
                        if (state->msg == msg)
                                state->msg = NULL;
-                       KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
+                       KKASSERT(state->flags & KDMSG_STATE_INSERTED);
                        if (state->rxcmd & DMSGF_REPLY) {
                                KKASSERT(msg->any.head.cmd &
                                         DMSGF_REPLY);
-                               RB_REMOVE(hammer2_state_tree,
-                                         &pmp->statewr_tree, state);
+                               RB_REMOVE(kdmsg_state_tree,
+                                         &iocom->statewr_tree, state);
                        } else {
                                KKASSERT((msg->any.head.cmd &
                                          DMSGF_REPLY) == 0);
-                               RB_REMOVE(hammer2_state_tree,
-                                         &pmp->staterd_tree, state);
+                               RB_REMOVE(kdmsg_state_tree,
+                                         &iocom->staterd_tree, state);
                        }
-                       state->flags &= ~HAMMER2_STATE_INSERTED;
-                       lockmgr(&pmp->msglk, LK_RELEASE);
-                       hammer2_state_free(state);
+                       state->flags &= ~KDMSG_STATE_INSERTED;
+                       lockmgr(&iocom->msglk, LK_RELEASE);
+                       kdmsg_state_free(state);
                } else {
-                       lockmgr(&pmp->msglk, LK_RELEASE);
+                       lockmgr(&iocom->msglk, LK_RELEASE);
                }
-               hammer2_msg_free(msg);
+               kdmsg_msg_free(msg);
        } else if (state->msg != msg) {
-               hammer2_msg_free(msg);
+               kdmsg_msg_free(msg);
        }
 }
 
@@ -353,29 +841,30 @@ hammer2_state_cleanuprx(hammer2_msg_t *msg)
  * A NULL state may be returned in this case.
  */
 int
-hammer2_state_msgtx(hammer2_msg_t *msg)
+kdmsg_state_msgtx(kdmsg_msg_t *msg)
 {
-       hammer2_pfsmount_t *pmp = msg->router->pmp;
-       hammer2_state_t *state;
+       kdmsg_iocom_t *iocom;
+       kdmsg_state_t *state;
        int error;
 
+       iocom = msg->router->iocom;
+
        /*
         * Make sure a state structure is ready to go in case we need a new
         * one.  This is the only routine which uses freewr_state so no
         * races are possible.
         */
-       if ((state = pmp->freewr_state) == NULL) {
-               state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
-               state->pmp = pmp;
-               state->flags = HAMMER2_STATE_DYNAMIC;
-               pmp->freewr_state = state;
+       if ((state = iocom->freewr_state) == NULL) {
+               state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
+               state->flags = KDMSG_STATE_DYNAMIC;
+               iocom->freewr_state = state;
        }
 
        /*
         * Lock RB tree.  If persistent state is present it will have already
         * been assigned to msg.
         */
-       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        state = msg->state;
 
        /*
@@ -383,7 +872,7 @@ hammer2_state_msgtx(hammer2_msg_t *msg)
         */
        if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
                                  DMSGF_ABORT)) == 0) {
-               lockmgr(&pmp->msglk, LK_RELEASE);
+               lockmgr(&iocom->msglk, LK_RELEASE);
                return(0);
        }
 
@@ -403,7 +892,7 @@ hammer2_state_msgtx(hammer2_msg_t *msg)
                 * closed state here.
                 *
                 * XXX state must be assigned and inserted by
-                *     hammer2_msg_write().  txcmd is assigned by us
+                *     kdmsg_msg_write().  txcmd is assigned by us
                 *     on-transmit.
                 */
                KKASSERT(state != NULL);
@@ -420,7 +909,7 @@ hammer2_state_msgtx(hammer2_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("hammer2_state_msgtx: no state match "
+                               kprintf("kdmsg_state_msgtx: no state match "
                                        "for DELETE cmd=%08x msgid=%016jx\n",
                                        msg->any.head.cmd,
                                        (intmax_t)msg->any.head.msgid);
@@ -438,7 +927,7 @@ hammer2_state_msgtx(hammer2_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("hammer2_state_msgtx: state reused "
+                               kprintf("kdmsg_state_msgtx: state reused "
                                        "for DELETE\n");
                                error = EINVAL;
                        }
@@ -466,7 +955,7 @@ hammer2_state_msgtx(hammer2_msg_t *msg)
                 * persistent state message should already exist.
                 */
                if (state == NULL) {
-                       kprintf("hammer2_state_msgtx: no state match "
+                       kprintf("kdmsg_state_msgtx: no state match "
                                "for REPLY | CREATE\n");
                        error = EINVAL;
                        break;
@@ -489,7 +978,7 @@ hammer2_state_msgtx(hammer2_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("hammer2_state_msgtx: no state match "
+                               kprintf("kdmsg_state_msgtx: no state match "
                                        "for REPLY | DELETE\n");
                                error = EINVAL;
                        }
@@ -504,7 +993,7 @@ hammer2_state_msgtx(hammer2_msg_t *msg)
                        if (msg->any.head.cmd & DMSGF_ABORT) {
                                error = EALREADY;
                        } else {
-                               kprintf("hammer2_state_msgtx: state reused "
+                               kprintf("kdmsg_state_msgtx: state reused "
                                        "for REPLY | DELETE\n");
                                error = EINVAL;
                        }
@@ -528,74 +1017,79 @@ hammer2_state_msgtx(hammer2_msg_t *msg)
                error = 0;
                break;
        }
-       lockmgr(&pmp->msglk, LK_RELEASE);
+       lockmgr(&iocom->msglk, LK_RELEASE);
        return (error);
 }
 
 void
-hammer2_state_cleanuptx(hammer2_msg_t *msg)
+kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
 {
-       hammer2_pfsmount_t *pmp = msg->router->pmp;
-       hammer2_state_t *state;
+       kdmsg_iocom_t *iocom;
+       kdmsg_state_t *state;
+
+       iocom = msg->router->iocom;
 
        if ((state = msg->state) == NULL) {
-               hammer2_msg_free(msg);
+               kdmsg_msg_free(msg);
        } else if (msg->any.head.cmd & DMSGF_DELETE) {
-               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                state->txcmd |= DMSGF_DELETE;
                if (state->rxcmd & DMSGF_DELETE) {
                        if (state->msg == msg)
                                state->msg = NULL;
-                       KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
+                       KKASSERT(state->flags & KDMSG_STATE_INSERTED);
                        if (state->txcmd & DMSGF_REPLY) {
                                KKASSERT(msg->any.head.cmd &
                                         DMSGF_REPLY);
-                               RB_REMOVE(hammer2_state_tree,
-                                         &pmp->staterd_tree, state);
+                               RB_REMOVE(kdmsg_state_tree,
+                                         &iocom->staterd_tree, state);
                        } else {
                                KKASSERT((msg->any.head.cmd &
                                          DMSGF_REPLY) == 0);
-                               RB_REMOVE(hammer2_state_tree,
-                                         &pmp->statewr_tree, state);
+                               RB_REMOVE(kdmsg_state_tree,
+                                         &iocom->statewr_tree, state);
                        }
-                       state->flags &= ~HAMMER2_STATE_INSERTED;
-                       lockmgr(&pmp->msglk, LK_RELEASE);
-                       hammer2_state_free(state);
+                       state->flags &= ~KDMSG_STATE_INSERTED;
+                       lockmgr(&iocom->msglk, LK_RELEASE);
+                       kdmsg_state_free(state);
                } else {
-                       lockmgr(&pmp->msglk, LK_RELEASE);
+                       lockmgr(&iocom->msglk, LK_RELEASE);
                }
-               hammer2_msg_free(msg);
+               kdmsg_msg_free(msg);
        } else if (state->msg != msg) {
-               hammer2_msg_free(msg);
+               kdmsg_msg_free(msg);
        }
 }
 
 void
-hammer2_state_free(hammer2_state_t *state)
+kdmsg_state_free(kdmsg_state_t *state)
 {
-       hammer2_pfsmount_t *pmp = state->pmp;
-       hammer2_msg_t *msg;
+       kdmsg_iocom_t *iocom;
+       kdmsg_msg_t *msg;
+
+       iocom = state->router->iocom;
 
-       KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
+       KKASSERT((state->flags & KDMSG_STATE_INSERTED) == 0);
        msg = state->msg;
        state->msg = NULL;
-       kfree(state, pmp->mmsg);
+       kfree(state, iocom->mmsg);
        if (msg)
-               hammer2_msg_free(msg);
+               kdmsg_msg_free(msg);
 }
 
-hammer2_msg_t *
-hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
-                 int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
+kdmsg_msg_t *
+kdmsg_msg_alloc(kdmsg_router_t *router, uint32_t cmd,
+               int (*func)(kdmsg_state_t *, kdmsg_msg_t *), void *data)
 {
-       hammer2_pfsmount_t *pmp = router->pmp;
-       hammer2_msg_t *msg;
-       hammer2_state_t *state;
+       kdmsg_iocom_t *iocom;
+       kdmsg_msg_t *msg;
+       kdmsg_state_t *state;
        size_t hbytes;
 
+       iocom = router->iocom;
        hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
-       msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
-                     pmp->mmsg, M_WAITOK | M_ZERO);
+       msg = kmalloc(offsetof(struct kdmsg_msg, any) + hbytes,
+                     iocom->mmsg, M_WAITOK | M_ZERO);
        msg->hdr_size = hbytes;
        msg->router = router;
        KKASSERT(router != NULL);
@@ -610,9 +1104,8 @@ hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
                 * msgid to be allocated.
                 */
                KKASSERT(msg->state == NULL);
-               state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
-               state->pmp = pmp;
-               state->flags = HAMMER2_STATE_DYNAMIC;
+               state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
+               state->flags = KDMSG_STATE_DYNAMIC;
                state->func = func;
                state->any.any = data;
                state->msg = msg;
@@ -623,29 +1116,31 @@ hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
                msg->any.head.target = state->router->target;
                msg->any.head.msgid = state->msgid;
 
-               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-               if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
+               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+               if (RB_INSERT(kdmsg_state_tree, &iocom->statewr_tree, state))
                        panic("duplicate msgid allocated");
-               state->flags |= HAMMER2_STATE_INSERTED;
+               state->flags |= KDMSG_STATE_INSERTED;
                msg->any.head.msgid = state->msgid;
-               lockmgr(&pmp->msglk, LK_RELEASE);
+               lockmgr(&iocom->msglk, LK_RELEASE);
        }
 
        return (msg);
 }
 
 void
-hammer2_msg_free(hammer2_msg_t *msg)
+kdmsg_msg_free(kdmsg_msg_t *msg)
 {
-       hammer2_pfsmount_t *pmp = msg->router->pmp;
+       kdmsg_iocom_t *iocom;
+
+       iocom = msg->router->iocom;
 
        if (msg->aux_data && msg->aux_size) {
-               kfree(msg->aux_data, pmp->mmsg);
+               kfree(msg->aux_data, iocom->mmsg);
                msg->aux_data = NULL;
                msg->aux_size = 0;
                msg->router = NULL;
        }
-       kfree(msg, pmp->mmsg);
+       kfree(msg, iocom->mmsg);
 }
 
 /*
@@ -653,7 +1148,7 @@ hammer2_msg_free(hammer2_msg_t *msg)
  * msgid.  Only persistent messages are indexed.
  */
 int
-hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
+kdmsg_state_cmp(kdmsg_state_t *state1, kdmsg_state_t *state2)
 {
        if (state1->router < state2->router)
                return(-1);
@@ -683,10 +1178,12 @@ hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
  * does not write to the message socket/pipe.
  */
 void
-hammer2_msg_write(hammer2_msg_t *msg)
+kdmsg_msg_write(kdmsg_msg_t *msg)
 {
-       hammer2_pfsmount_t *pmp = msg->router->pmp;
-       hammer2_state_t *state;
+       kdmsg_iocom_t *iocom;
+       kdmsg_state_t *state;
+
+       iocom = msg->router->iocom;
 
        if (msg->state) {
                /*
@@ -700,7 +1197,7 @@ hammer2_msg_write(hammer2_msg_t *msg)
                msg->any.head.msgid = state->msgid;
                msg->any.head.source = 0;
                msg->any.head.target = state->router->target;
-               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        } else {
                /*
                 * One-off message (always uses msgid 0 to distinguish
@@ -710,21 +1207,21 @@ hammer2_msg_write(hammer2_msg_t *msg)
                msg->any.head.msgid = 0;
                msg->any.head.source = 0;
                msg->any.head.target = msg->router->target;
-               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        }
 
        /*
         * Finish up the msg fields
         */
-       msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
-       ++pmp->msg_seq;
+       msg->any.head.salt = /* (random << 8) | */ (iocom->msg_seq & 255);
+       ++iocom->msg_seq;
 
        msg->any.head.hdr_crc = 0;
-       msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
+       msg->any.head.hdr_crc = iscsi_crc32(msg->any.buf, msg->hdr_size);
 
-       TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
-       hammer2_clusterctl_wakeup(pmp);
-       lockmgr(&pmp->msglk, LK_RELEASE);
+       TAILQ_INSERT_TAIL(&iocom->msgq, msg, qentry);
+       iocom->clusterctl_wakeup(iocom);
+       lockmgr(&iocom->msglk, LK_RELEASE);
 }
 
 /*
@@ -733,10 +1230,10 @@ hammer2_msg_write(hammer2_msg_t *msg)
  * If msg->state is non-NULL we are replying to a one-way message.
  */
 void
-hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
+kdmsg_msg_reply(kdmsg_msg_t *msg, uint32_t error)
 {
-       hammer2_state_t *state = msg->state;
-       hammer2_msg_t *nmsg;
+       kdmsg_state_t *state = msg->state;
+       kdmsg_msg_t *nmsg;
        uint32_t cmd;
 
        /*
@@ -768,25 +1265,25 @@ hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
        kprintf("MSG_REPLY state=%p msg %08x\n", state, cmd);
 
        /* XXX messy mask cmd to avoid allocating state */
-       nmsg = hammer2_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
-                                NULL, NULL);
+       nmsg = kdmsg_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
+                              NULL, NULL);
        nmsg->any.head.cmd = cmd;
        nmsg->any.head.error = error;
        nmsg->state = state;
-       hammer2_msg_write(nmsg);
+       kdmsg_msg_write(nmsg);
 }
 
 /*
  * Reply to a message and continue our side of the transaction.
  *
  * If msg->state is non-NULL we are replying to a one-way message and this
- * function degenerates into the same as hammer2_msg_reply().
+ * function degenerates into the same as kdmsg_msg_reply().
  */
 void
-hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
+kdmsg_msg_result(kdmsg_msg_t *msg, uint32_t error)
 {
-       hammer2_state_t *state = msg->state;
-       hammer2_msg_t *nmsg;
+       kdmsg_state_t *state = msg->state;
+       kdmsg_msg_t *nmsg;
        uint32_t cmd;
 
        /*
@@ -817,10 +1314,10 @@ hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
        }
 
        /* XXX messy mask cmd to avoid allocating state */
-       nmsg = hammer2_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
-                                NULL, NULL);
+       nmsg = kdmsg_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
+                              NULL, NULL);
        nmsg->any.head.cmd = cmd;
        nmsg->any.head.error = error;
        nmsg->state = state;
-       hammer2_msg_write(nmsg);
+       kdmsg_msg_write(nmsg);
 }
similarity index 92%
rename from sys/vfs/hammer2/hammer2_icrc.c
rename to sys/libkern/icrc32.c
index b0fdd39..a818250 100644 (file)
  | $Id: isc_subr.c 560 2009-05-07 07:37:49Z danny $
  */
 
-#include "hammer2.h"
+#ifdef _KERNEL
+#include <sys/param.h>
+#include <sys/systm.h>
+#else
+#include <sys/types.h>
+#endif
+
+#ifndef _KERNEL
+/* prototypes for the kernel are in <sys/systm.h> */
+uint32_t iscsi_crc32(const void *buf, size_t size);
+uint32_t iscsi_crc32_ext(const void *buf, size_t size, uint32_t ocrc);
+#endif
 
 /*****************************************************************/
 /*                                                               */
@@ -51,7 +62,7 @@
 /*                                                               */
 /*****************************************************************/
 
-static uint32_t crc32Table[256] = {
+static uint32_t iscsiCrc32Table[256] = {
     0x00000000L, 0xF26B8303L, 0xE13B70F7L, 0x1350F3F4L,
     0xC79A971FL, 0x35F1141CL, 0x26A1E7E8L, 0xD4CA64EBL,
     0x8AD958CFL, 0x78B2DBCCL, 0x6BE22838L, 0x9989AB3BL,
@@ -119,26 +130,26 @@ static uint32_t crc32Table[256] = {
 };
 
 uint32_t
-hammer2_icrc32(const void *buf, size_t size)
+iscsi_crc32(const void *buf, size_t size)
 {
      const uint8_t *p = buf;
      uint32_t crc = 0;
 
      crc = crc ^ 0xffffffff;
      while (size--)
-         crc = crc32Table[(crc ^ *p++) & 0xff] ^ (crc >> 8);
+         crc = iscsiCrc32Table[(crc ^ *p++) & 0xff] ^ (crc >> 8);
      crc = crc ^ 0xffffffff;
      return crc;
 }
 
 uint32_t
-hammer2_icrc32c(const void *buf, size_t size, uint32_t crc)
+iscsi_crc32_ext(const void *buf, size_t size, uint32_t crc)
 {
      const uint8_t *p = buf;
 
      crc = crc ^ 0xffffffff;
      while (size--)
-         crc = crc32Table[(crc ^ *p++) & 0xff] ^ (crc >> 8);
+         crc = iscsiCrc32Table[(crc ^ *p++) & 0xff] ^ (crc >> 8);
      crc = crc ^ 0xffffffff;
      return crc;
 }
index e763a51..bd077f0 100644 (file)
@@ -41,6 +41,9 @@
 #ifndef _SYS_TREE_H_
 #include <sys/tree.h>
 #endif
+#ifndef _SYS_THREAD_H_
+#include <sys/thread.h>
+#endif
 #ifndef _SYS_UUID_H_
 #include <sys/uuid.h>
 #endif
@@ -493,19 +496,19 @@ typedef struct dmsg_lnk_span dmsg_lnk_span_t;
  *
  * When you do a HAMMER2 mount you are effectively tying into a HAMMER2
  * cluster via local media.  The local media does not have to participate
- * in the cluster, other than to provide the hammer2_copy_data[] array and
+ * in the cluster, other than to provide the dmsg_vol_data[] array and
  * root inode for the mount.
  *
  * This is important: The mount device path you specify serves to bootstrap
  * your entry into the cluster, but your mount will make active connections
- * to ALL copy elements in the hammer2_copy_data[] array which match the
+ * to ALL copy elements in the dmsg_vol_data[] array which match the
  * PFSID of the directory in the super-root that you specified.  The local
  * media path does not have to be mentioned in this array but becomes part
  * of the cluster based on its type and access rights.  ALL ELEMENTS ARE
  * TREATED ACCORDING TO TYPE NO MATTER WHICH ONE YOU MOUNT FROM.
  *
  * The actual cluster may be far larger than the elements you list in the
- * hammer2_copy_data[] array.  You list only the elements you wish to
+ * dmsg_vol_data[] array.  You list only the elements you wish to
  * directly connect to and you are able to access the rest of the cluster
  * indirectly through those connections.
  *
@@ -651,4 +654,144 @@ union dmsg_any {
 
 typedef union dmsg_any dmsg_any_t;
 
+/*
+ * Kernel iocom structures and prototypes for kern/kern_dmsg.c
+ */
+#ifdef _KERNEL
+
+struct hammer2_pfsmount;
+struct kdmsg_router;
+struct kdmsg_iocom;
+struct kdmsg_state;
+struct kdmsg_msg;
+
+/*
+ * Structure used to represent a virtual circuit for a messaging
+ * route.  Typically associated from hammer2_state but the hammer2_pfsmount
+ * structure also has one to represent the point-to-point link.
+ */
+struct kdmsg_router {
+       struct kdmsg_iocom      *iocom;
+       struct kdmsg_state      *state;         /* received LNK_SPAN state */
+       uint64_t                target;         /* target */
+};
+
+typedef struct kdmsg_router kdmsg_router_t;
+
+/*
+ * msg_ctl flags (atomic)
+ */
+#define KDMSG_CLUSTERCTL_KILL          0x00000001
+#define KDMSG_CLUSTERCTL_KILLRX                0x00000002 /* staged helper exit */
+#define KDMSG_CLUSTERCTL_KILLTX                0x00000004 /* staged helper exit */
+#define KDMSG_CLUSTERCTL_SLEEPING      0x00000008 /* interlocked w/msglk */
+
+/*
+ * Transactional state structure, representing an open transaction.  The
+ * transaction might represent a cache state (and thus have a chain
+ * association), or a VOP op, LNK_SPAN, or other things.
+ */
+struct kdmsg_state {
+       RB_ENTRY(kdmsg_state) rbnode;           /* indexed by msgid */
+       struct kdmsg_router *router;            /* related LNK_SPAN route */
+       uint32_t        txcmd;                  /* mostly for CMDF flags */
+       uint32_t        rxcmd;                  /* mostly for CMDF flags */
+       uint64_t        msgid;                  /* {spanid,msgid} uniq */
+       int             flags;
+       int             error;
+       void            *chain;                 /* (caller's state) */
+       struct kdmsg_msg *msg;
+       int (*func)(struct kdmsg_state *, struct kdmsg_msg *);
+       union {
+               void *any;
+               struct hammer2_pfsmount *pmp;
+       } any;
+};
+
+#define KDMSG_STATE_INSERTED   0x0001
+#define KDMSG_STATE_DYNAMIC    0x0002
+#define KDMSG_STATE_DELPEND    0x0004          /* transmit delete pending */
+
+struct kdmsg_msg {
+       TAILQ_ENTRY(kdmsg_msg) qentry;          /* serialized queue */
+       struct kdmsg_router *router;
+       struct kdmsg_state *state;
+       size_t          hdr_size;
+       size_t          aux_size;
+       char            *aux_data;
+       dmsg_any_t      any;
+};
+
+typedef struct kdmsg_link kdmsg_link_t;
+typedef struct kdmsg_state kdmsg_state_t;
+typedef struct kdmsg_msg kdmsg_msg_t;
+
+struct kdmsg_state_tree;
+RB_HEAD(kdmsg_state_tree, kdmsg_state);
+int kdmsg_state_cmp(kdmsg_state_t *state1, kdmsg_state_t *state2);
+RB_PROTOTYPE(kdmsg_state_tree, kdmsg_state, rbnode, kdmsg_state_cmp);
+
+/*
+ * Structure embedded in e.g. mount, master control structure for
+ * DMSG stream handling.
+ */
+struct kdmsg_iocom {
+       struct malloc_type      *mmsg;
+       struct file             *msg_fp;        /* cluster pipe->userland */
+       thread_t                msgrd_td;       /* cluster thread */
+       thread_t                msgwr_td;       /* cluster thread */
+       int                     msg_ctl;        /* wakeup flags */
+       int                     msg_seq;        /* cluster msg sequence id */
+       uint32_t                reserved01;
+       struct lock             msglk;          /* lockmgr lock */
+       TAILQ_HEAD(, kdmsg_msg) msgq;           /* transmit queue */
+       void                    *handle;
+       void                    (*clusterctl_wakeup)(struct kdmsg_iocom *);
+       int                     (*lnk_rcvmsg)(kdmsg_msg_t *msg);
+       int                     (*dbg_rcvmsg)(kdmsg_msg_t *msg);
+       int                     (*misc_rcvmsg)(kdmsg_msg_t *msg);
+       struct kdmsg_state      *conn_state;    /* active LNK_CONN state */
+       struct kdmsg_state      *freerd_state;  /* allocation cache */
+       struct kdmsg_state      *freewr_state;  /* allocation cache */
+       struct kdmsg_state_tree staterd_tree;   /* active messages */
+       struct kdmsg_state_tree statewr_tree;   /* active messages */
+       struct kdmsg_router     router;
+};
+
+typedef struct kdmsg_iocom     kdmsg_iocom_t;
+
+uint32_t kdmsg_icrc32(const void *buf, size_t size);
+uint32_t kdmsg_icrc32c(const void *buf, size_t size, uint32_t crc);
+
+/*
+ * kern_dmsg.c
+ */
+void kdmsg_iocom_init(kdmsg_iocom_t *iocom,
+                       void *handle,
+                       struct malloc_type *mmsg,
+                       void (*cctl_wakeup)(kdmsg_iocom_t *),
+                       int (*lnk_rcvmsg)(kdmsg_msg_t *msg),
+                       int (*dbg_rcvmsg)(kdmsg_msg_t *msg),
+                       int (*misc_rcvmsg)(kdmsg_msg_t *msg));
+
+void kdmsg_iocom_reconnect(kdmsg_iocom_t *iocom, struct file *fp,
+                       const char *subsysname);
+void kdmsg_drain_msgq(kdmsg_iocom_t *iocom);
+
+int kdmsg_state_msgrx(kdmsg_msg_t *msg);
+int kdmsg_state_msgtx(kdmsg_msg_t *msg);
+void kdmsg_state_cleanuprx(kdmsg_msg_t *msg);
+void kdmsg_state_cleanuptx(kdmsg_msg_t *msg);
+int kdmsg_msg_execute(kdmsg_msg_t *msg);
+void kdmsg_state_free(kdmsg_state_t *state);
+void kdmsg_msg_free(kdmsg_msg_t *msg);
+kdmsg_msg_t *kdmsg_msg_alloc(kdmsg_router_t *router, uint32_t cmd,
+                               int (*func)(kdmsg_state_t *, kdmsg_msg_t *),
+                               void *data);
+void kdmsg_msg_write(kdmsg_msg_t *msg);
+void kdmsg_msg_reply(kdmsg_msg_t *msg, uint32_t error);
+void kdmsg_msg_result(kdmsg_msg_t *msg, uint32_t error);
+
+#endif
+
 #endif
index cb068e2..012bb31 100644 (file)
@@ -177,6 +177,8 @@ vm_paddr_t kvtop(void *addr);
 extern uint32_t crc32_tab[];
 uint32_t crc32(const void *buf, size_t size);
 uint32_t crc32_ext(const void *buf, size_t size, uint32_t ocrc);
+uint32_t iscsi_crc32(const void *buf, size_t size);
+uint32_t iscsi_crc32_ext(const void *buf, size_t size, uint32_t ocrc);
 void   init_param1 (void);
 void   init_param2 (int physpages);
 void   tablefull (const char *);
index 294aae5..a272bd3 100644 (file)
@@ -6,7 +6,7 @@
 CFLAGS+= -DINVARIANTS -DSMP
 KMOD=  hammer2
 SRCS=  hammer2_vfsops.c hammer2_vnops.c hammer2_inode.c hammer2_ccms.c
-SRCS+= hammer2_chain.c hammer2_freemap.c hammer2_subr.c hammer2_icrc.c
-SRCS+= hammer2_ioctl.c hammer2_msg.c hammer2_msgops.c
+SRCS+= hammer2_chain.c hammer2_freemap.c hammer2_subr.c
+SRCS+= hammer2_ioctl.c hammer2_msgops.c
 
 .include <bsd.kmod.mk>
index 383e542..724ad85 100644 (file)
@@ -102,7 +102,6 @@ struct hammer2_msg;
  * not match the blockref at (parent, index).
  */
 RB_HEAD(hammer2_chain_tree, hammer2_chain);
-RB_HEAD(hammer2_state_tree, hammer2_state);
 TAILQ_HEAD(flush_deferral_list, hammer2_chain);
 
 struct hammer2_chain {
@@ -277,19 +276,6 @@ struct hammer2_freecache {
 
 typedef struct hammer2_freecache hammer2_freecache_t;
 
-/*
- * Structure used to represent a virtual circuit for a messaging
- * route.  Typically associated from hammer2_state but the hammer2_pfsmount
- * structure also has one to represent the point-to-point link.
- */
-struct hammer2_router {
-       struct hammer2_pfsmount *pmp;
-       struct hammer2_state    *state;         /* received LNK_SPAN state */
-       uint64_t                target;         /* target */
-};
-
-typedef struct hammer2_router hammer2_router_t;
-
 /*
  * Global (per device) mount structure for device (aka vp->v_mount->hmp)
  */
@@ -326,81 +312,15 @@ struct hammer2_pfsmount {
        struct hammer2_mount    *hmp;           /* device global mount */
        hammer2_chain_t         *rchain;        /* PFS root chain */
        hammer2_inode_t         *iroot;         /* PFS root inode */
-       struct malloc_type      *mmsg;
        ccms_domain_t           ccms_dom;
        struct netexport        export;         /* nfs export */
        int                     ronly;          /* read-only mount */
-       struct file             *msg_fp;        /* cluster pipe->userland */
-       thread_t                msgrd_td;       /* cluster thread */
-       thread_t                msgwr_td;       /* cluster thread */
-       int                     msg_ctl;        /* wakeup flags */
-       int                     msg_seq;        /* cluster msg sequence id */
-       uint32_t                reserved01;
-       struct lock             msglk;          /* lockmgr lock */
-       TAILQ_HEAD(, hammer2_msg) msgq;         /* transmit queue */
-       struct hammer2_state    *conn_state;    /* active LNK_CONN state */
-       struct hammer2_state    *freerd_state;  /* allocation cache */
-       struct hammer2_state    *freewr_state;  /* allocation cache */
-       struct hammer2_state_tree staterd_tree; /* active messages */
-       struct hammer2_state_tree statewr_tree; /* active messages */
-       struct hammer2_router   router;
+       struct malloc_type      *mmsg;
+       kdmsg_iocom_t           iocom;
 };
 
 typedef struct hammer2_pfsmount hammer2_pfsmount_t;
 
-/*
- * msg_ctl flags (atomic)
- */
-#define HAMMER2_CLUSTERCTL_KILL                0x00000001
-#define HAMMER2_CLUSTERCTL_KILLRX      0x00000002 /* staged helper exit */
-#define HAMMER2_CLUSTERCTL_KILLTX      0x00000004 /* staged helper exit */
-#define HAMMER2_CLUSTERCTL_SLEEPING    0x00000008 /* interlocked w/msglk */
-
-/*
- * Transactional state structure, representing an open transaction.  The
- * transaction might represent a cache state (and thus have a chain
- * association), or a VOP op, LNK_SPAN, or other things.
- */
-struct hammer2_state {
-       RB_ENTRY(hammer2_state) rbnode;         /* indexed by msgid */
-       struct hammer2_pfsmount *pmp;
-       struct hammer2_router *router;          /* related LNK_SPAN route */
-       uint32_t        txcmd;                  /* mostly for CMDF flags */
-       uint32_t        rxcmd;                  /* mostly for CMDF flags */
-       uint64_t        msgid;                  /* {spanid,msgid} uniq */
-       int             flags;
-       int             error;
-       struct hammer2_chain *chain;            /* msg associated w/chain */
-       struct hammer2_msg *msg;
-       int (*func)(struct hammer2_state *, struct hammer2_msg *);
-       union {
-               void *any;
-               hammer2_pfsmount_t *pmp;
-       } any;
-};
-
-#define HAMMER2_STATE_INSERTED 0x0001
-#define HAMMER2_STATE_DYNAMIC  0x0002
-#define HAMMER2_STATE_DELPEND  0x0004          /* transmit delete pending */
-
-struct hammer2_msg {
-       TAILQ_ENTRY(hammer2_msg) qentry;        /* serialized queue */
-       struct hammer2_router *router;
-       struct hammer2_state *state;
-       size_t          hdr_size;
-       size_t          aux_size;
-       char            *aux_data;
-       dmsg_any_t      any;
-};
-
-typedef struct hammer2_link hammer2_link_t;
-typedef struct hammer2_state hammer2_state_t;
-typedef struct hammer2_msg hammer2_msg_t;
-
-int hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2);
-RB_PROTOTYPE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
-
-
 #if defined(_KERNEL)
 
 MALLOC_DECLARE(M_HAMMER2);
@@ -447,8 +367,8 @@ extern long hammer2_ioa_volu_write;
 /*
  * hammer2_subr.c
  */
-uint32_t hammer2_icrc32(const void *buf, size_t size);
-uint32_t hammer2_icrc32c(const void *buf, size_t size, uint32_t crc);
+#define hammer2_icrc32(buf, size)      iscsi_crc32((buf), (size))
+#define hammer2_icrc32c(buf, size, crc)        iscsi_crc32_ext((buf), (size), (crc))
 
 void hammer2_inode_lock_ex(hammer2_inode_t *ip);
 void hammer2_inode_unlock_ex(hammer2_inode_t *ip);
@@ -558,33 +478,16 @@ void hammer2_chain_commit(hammer2_mount_t *hmp, hammer2_chain_t *chain);
 int hammer2_ioctl(hammer2_inode_t *ip, u_long com, void *data,
                                int fflag, struct ucred *cred);
 
-/*
- * hammer2_msg.c
- */
-int hammer2_state_msgrx(hammer2_msg_t *msg);
-int hammer2_state_msgtx(hammer2_msg_t *msg);
-void hammer2_state_cleanuprx(hammer2_msg_t *msg);
-void hammer2_state_cleanuptx(hammer2_msg_t *msg);
-int hammer2_msg_execute(hammer2_msg_t *msg);
-void hammer2_state_free(hammer2_state_t *state);
-void hammer2_msg_free(hammer2_msg_t *msg);
-hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
-                               int (*func)(hammer2_state_t *, hammer2_msg_t *),
-                               void *data);
-void hammer2_msg_write(hammer2_msg_t *msg);
-void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error);
-void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error);
-
 /*
  * hammer2_msgops.c
  */
-int hammer2_msg_dbg_rcvmsg(hammer2_msg_t *msg);
-int hammer2_msg_adhoc_input(hammer2_msg_t *msg);
+int hammer2_msg_dbg_rcvmsg(kdmsg_msg_t *msg);
+int hammer2_msg_adhoc_input(kdmsg_msg_t *msg);
 
 /*
  * hammer2_vfsops.c
  */
-void hammer2_clusterctl_wakeup(hammer2_pfsmount_t *pmp);
+void hammer2_clusterctl_wakeup(kdmsg_iocom_t *iocom);
 void hammer2_volconf_update(hammer2_pfsmount_t *pmp, int index);
 void hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp);
 
index b9277a7..3b8566d 100644 (file)
 #include "hammer2.h"
 
 int
-hammer2_msg_adhoc_input(hammer2_msg_t *msg)
+hammer2_msg_adhoc_input(kdmsg_msg_t *msg)
 {
        kprintf("ADHOC INPUT MSG %08x\n", msg->any.head.cmd);
        return(0);
 }
 
 int
-hammer2_msg_dbg_rcvmsg(hammer2_msg_t *msg)
+hammer2_msg_dbg_rcvmsg(kdmsg_msg_t *msg)
 {
        switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
        case DMSG_DBG_SHELL:
                /*
                 * Execute shell command (not supported atm)
                 */
-               hammer2_msg_reply(msg, DMSG_ERR_NOSUPP);
+               kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        case DMSG_DBG_SHELL | DMSGF_REPLY:
                if (msg->aux_data) {
@@ -70,7 +70,7 @@ hammer2_msg_dbg_rcvmsg(hammer2_msg_t *msg)
                }
                break;
        default:
-               hammer2_msg_reply(msg, DMSG_ERR_NOSUPP);
+               kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        }
        return(0);
index 3a650d8..7890c06 100644 (file)
@@ -136,12 +136,9 @@ static int hammer2_install_volume_header(hammer2_mount_t *hmp);
 static int hammer2_sync_scan1(struct mount *mp, struct vnode *vp, void *data);
 static int hammer2_sync_scan2(struct mount *mp, struct vnode *vp, void *data);
 
-static void hammer2_cluster_thread_rd(void *arg);
-static void hammer2_cluster_thread_wr(void *arg);
-static int hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg);
-static int hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg);
-static int hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg);
-static void hammer2_drain_msgq(hammer2_pfsmount_t *pmp);
+static int hammer2_msg_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg);
+static int hammer2_msg_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg);
+static int hammer2_msg_lnk_rcvmsg(kdmsg_msg_t *msg);
 
 /*
  * HAMMER2 vfs operations.
@@ -225,6 +222,7 @@ hammer2_vfs_mount(struct mount *mp, char *path, caddr_t data,
        hammer2_chain_t *parent;
        hammer2_chain_t *schain;
        hammer2_chain_t *rchain;
+       struct file *fp;
        char devstr[MNAMELEN];
        size_t size;
        size_t done;
@@ -354,11 +352,13 @@ hammer2_vfs_mount(struct mount *mp, char *path, caddr_t data,
        pmp = kmalloc(sizeof(*pmp), M_HAMMER2, M_WAITOK | M_ZERO);
        mp->mnt_data = (qaddr_t)pmp;
        pmp->mp = mp;
+
        kmalloc_create(&pmp->mmsg, "HAMMER2-pfsmsg");
-       lockinit(&pmp->msglk, "h2msg", 0, 0);
-       TAILQ_INIT(&pmp->msgq);
-       RB_INIT(&pmp->staterd_tree);
-       RB_INIT(&pmp->statewr_tree);
+       kdmsg_iocom_init(&pmp->iocom, pmp, pmp->mmsg,
+                        hammer2_clusterctl_wakeup,
+                        hammer2_msg_lnk_rcvmsg,
+                        hammer2_msg_dbg_rcvmsg,
+                        hammer2_msg_adhoc_input);
 
        if (create_hmp) {
                hmp = kmalloc(sizeof(*hmp), M_HAMMER2, M_WAITOK | M_ZERO);
@@ -370,7 +370,6 @@ hammer2_vfs_mount(struct mount *mp, char *path, caddr_t data,
        }
        ccms_domain_init(&pmp->ccms_dom);
        pmp->hmp = hmp;
-       pmp->router.pmp = pmp;
        ++hmp->pmp_count;
        lockmgr(&hammer2_mntlk, LK_RELEASE);
        kprintf("hammer2_mount hmp=%p pmpcnt=%d\n", hmp, hmp->pmp_count);
@@ -483,16 +482,13 @@ hammer2_vfs_mount(struct mount *mp, char *path, caddr_t data,
         * Ref the cluster management messaging descriptor.  The mount
         * program deals with the other end of the communications pipe.
         */
-       pmp->msg_fp = holdfp(curproc->p_fd, info.cluster_fd, -1);
-       if (pmp->msg_fp == NULL) {
+       fp = holdfp(curproc->p_fd, info.cluster_fd, -1);
+       if (fp == NULL) {
                kprintf("hammer2_mount: bad cluster_fd!\n");
                hammer2_vfs_unmount(mp, MNT_FORCE);
                return EBADF;
        }
-       lwkt_create(hammer2_cluster_thread_rd, pmp, &pmp->msgrd_td,
-                   NULL, 0, -1, "hammer2-msgrd");
-       lwkt_create(hammer2_cluster_thread_wr, pmp, &pmp->msgwr_td,
-                   NULL, 0, -1, "hammer2-msgwr");
+       hammer2_cluster_reconnect(pmp, fp);
 
        /*
         * Finish setup
@@ -599,18 +595,18 @@ hammer2_vfs_unmount(struct mount *mp, int mntflags)
        /*
         * Ask the cluster controller to go away
         */
-       atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
-       while (pmp->msgrd_td || pmp->msgwr_td) {
-               wakeup(&pmp->msg_ctl);
+       atomic_set_int(&pmp->iocom.msg_ctl, KDMSG_CLUSTERCTL_KILL);
+       while (pmp->iocom.msgrd_td || pmp->iocom.msgwr_td) {
+               wakeup(&pmp->iocom.msg_ctl);
                tsleep(pmp, 0, "clstrkl", hz);
        }
 
        /*
         * Drop communications descriptor
         */
-       if (pmp->msg_fp) {
-               fdrop(pmp->msg_fp);
-               pmp->msg_fp = NULL;
+       if (pmp->iocom.msg_fp) {
+               fdrop(pmp->iocom.msg_fp);
+               pmp->iocom.msg_fp = NULL;
        }
 
        /*
@@ -1013,214 +1009,15 @@ hammer2_install_volume_header(hammer2_mount_t *hmp)
 void
 hammer2_cluster_reconnect(hammer2_pfsmount_t *pmp, struct file *fp)
 {
-       /*
-        * Destroy the current connection
-        */
-       atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
-       while (pmp->msgrd_td || pmp->msgwr_td) {
-              wakeup(&pmp->msg_ctl);
-              tsleep(pmp, 0, "clstrkl", hz);
-       }
-
-       /*
-        * Drop communications descriptor
-        */
-       if (pmp->msg_fp) {
-               fdrop(pmp->msg_fp);
-               pmp->msg_fp = NULL;
-       }
-       kprintf("RESTART CONNECTION\n");
-
-       /*
-        * Setup new communications descriptor
-        */
-       pmp->msg_ctl = 0;
-       pmp->msg_fp = fp;
-       pmp->msg_seq = 0;
-       lwkt_create(hammer2_cluster_thread_rd, pmp, &pmp->msgrd_td,
-                   NULL, 0, -1, "hammer2-msgrd");
-       lwkt_create(hammer2_cluster_thread_wr, pmp, &pmp->msgwr_td,
-                   NULL, 0, -1, "hammer2-msgwr");
-}
-
-/*
- * Cluster controller thread.  Perform messaging functions.  We have one
- * thread for the reader and one for the writer.  The writer handles
- * shutdown requests (which should break the reader thread).
- */
-static
-void
-hammer2_cluster_thread_rd(void *arg)
-{
-       hammer2_pfsmount_t *pmp = arg;
-       dmsg_hdr_t hdr;
-       hammer2_msg_t *msg;
-       hammer2_state_t *state;
-       size_t hbytes;
-       int error = 0;
-
-       while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0) {
-               /*
-                * Retrieve the message from the pipe or socket.
-                */
-               error = fp_read(pmp->msg_fp, &hdr, sizeof(hdr),
-                               NULL, 1, UIO_SYSSPACE);
-               if (error)
-                       break;
-               if (hdr.magic != DMSG_HDR_MAGIC) {
-                       kprintf("hammer2: msgrd: bad magic: %04x\n",
-                               hdr.magic);
-                       error = EINVAL;
-                       break;
-               }
-               hbytes = (hdr.cmd & DMSGF_SIZE) * DMSG_ALIGN;
-               if (hbytes < sizeof(hdr) || hbytes > DMSG_AUX_MAX) {
-                       kprintf("hammer2: msgrd: bad header size %zd\n",
-                               hbytes);
-                       error = EINVAL;
-                       break;
-               }
-               /* XXX messy: mask cmd to avoid allocating state */
-               msg = hammer2_msg_alloc(&pmp->router,
-                                       hdr.cmd & DMSGF_BASECMDMASK,
-                                       NULL, NULL);
-               msg->any.head = hdr;
-               msg->hdr_size = hbytes;
-               if (hbytes > sizeof(hdr)) {
-                       error = fp_read(pmp->msg_fp, &msg->any.head + 1,
-                                       hbytes - sizeof(hdr),
-                                       NULL, 1, UIO_SYSSPACE);
-                       if (error) {
-                               kprintf("hammer2: short msg received\n");
-                               error = EINVAL;
-                               break;
-                       }
-               }
-               msg->aux_size = hdr.aux_bytes * DMSG_ALIGN;
-               if (msg->aux_size > DMSG_AUX_MAX) {
-                       kprintf("hammer2: illegal msg payload size %zd\n",
-                               msg->aux_size);
-                       error = EINVAL;
-                       break;
-               }
-               if (msg->aux_size) {
-                       msg->aux_data = kmalloc(msg->aux_size, pmp->mmsg,
-                                               M_WAITOK | M_ZERO);
-                       error = fp_read(pmp->msg_fp, msg->aux_data,
-                                       msg->aux_size,
-                                       NULL, 1, UIO_SYSSPACE);
-                       if (error) {
-                               kprintf("hammer2: short msg "
-                                       "payload received\n");
-                               break;
-                       }
-               }
-
-               /*
-                * State machine tracking, state assignment for msg,
-                * returns error and discard status.  Errors are fatal
-                * to the connection except for EALREADY which forces
-                * a discard without execution.
-                */
-               error = hammer2_state_msgrx(msg);
-               if (error) {
-                       /*
-                        * Raw protocol or connection error
-                        */
-                       hammer2_msg_free(msg);
-                       if (error == EALREADY)
-                               error = 0;
-               } else if (msg->state && msg->state->func) {
-                       /*
-                        * Message related to state which already has a
-                        * handling function installed for it.
-                        */
-                       error = msg->state->func(msg->state, msg);
-                       hammer2_state_cleanuprx(msg);
-               } else if ((msg->any.head.cmd & DMSGF_PROTOS) ==
-                          DMSG_PROTO_LNK) {
-                       /*
-                        * Message related to the LNK protocol set
-                        */
-                       error = hammer2_msg_lnk_rcvmsg(msg);
-                       hammer2_state_cleanuprx(msg);
-               } else if ((msg->any.head.cmd & DMSGF_PROTOS) ==
-                          DMSG_PROTO_DBG) {
-                       /*
-                        * Message related to the DBG protocol set
-                        */
-                       error = hammer2_msg_dbg_rcvmsg(msg);
-                       hammer2_state_cleanuprx(msg);
-               } else {
-                       /*
-                        * Other higher-level messages (e.g. vnops)
-                        */
-                       error = hammer2_msg_adhoc_input(msg);
-                       hammer2_state_cleanuprx(msg);
-               }
-               msg = NULL;
-       }
-
-       if (error)
-               kprintf("hammer2: msg read failed error %d\n", error);
-
-       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-       if (msg) {
-               if (msg->state && msg->state->msg == msg)
-                       msg->state->msg = NULL;
-               hammer2_msg_free(msg);
-       }
-
-       if ((state = pmp->freerd_state) != NULL) {
-               pmp->freerd_state = NULL;
-               hammer2_state_free(state);
-       }
-
-       /*
-        * Shutdown the socket before waiting for the transmit side.
-        *
-        * If we are dying due to e.g. a socket disconnect verses being
-        * killed explicity we have to set KILL in order to kick the tx
-        * side when it might not have any other work to do.  KILL might
-        * already be set if we are in an unmount or reconnect.
-        */
-       fp_shutdown(pmp->msg_fp, SHUT_RDWR);
-
-       atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILL);
-       wakeup(&pmp->msg_ctl);
+       kdmsg_msg_t *msg;
+       size_t name_len;
 
        /*
-        * Wait for the transmit side to drain remaining messages
-        * before cleaning up the rx state.  The transmit side will
-        * set KILLTX and wait for the rx side to completely finish
-        * (set msgrd_td to NULL) before cleaning up any remaining
-        * tx states.
+        * Closes old comm descriptor, kills threads, cleans up
+        * states, then installs the new descriptor and creates
+        * new threads.
         */
-       lockmgr(&pmp->msglk, LK_RELEASE);
-       atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILLRX);
-       wakeup(&pmp->msg_ctl);
-       while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILLTX) == 0) {
-               wakeup(&pmp->msg_ctl);
-               tsleep(pmp, 0, "clstrkw", hz);
-       }
-
-       pmp->msgrd_td = NULL;
-       /* pmp can be ripped out from under us at this point */
-       wakeup(pmp);
-       lwkt_exit();
-}
-
-static
-void
-hammer2_cluster_thread_wr(void *arg)
-{
-       hammer2_pfsmount_t *pmp = arg;
-       hammer2_msg_t *msg = NULL;
-       hammer2_state_t *state;
-       ssize_t res;
-       size_t name_len;
-       int error = 0;
-       int retries = 20;
+       kdmsg_iocom_reconnect(&pmp->iocom, fp, "hammer2");
 
        /*
         * Open a LNK_CONN transaction indicating that we want to take part
@@ -1233,7 +1030,7 @@ hammer2_cluster_thread_wr(void *arg)
         * The transaction remains fully open for the duration of the
         * connection.
         */
-       msg = hammer2_msg_alloc(&pmp->router, DMSG_LNK_CONN | DMSGF_CREATE,
+       msg = kdmsg_msg_alloc(&pmp->iocom.router, DMSG_LNK_CONN | DMSGF_CREATE,
                                hammer2_msg_conn_reply, pmp);
        msg->any.lnk_conn.pfs_clid = pmp->iroot->ip_data.pfs_clid;
        msg->any.lnk_conn.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
@@ -1245,232 +1042,9 @@ hammer2_cluster_thread_wr(void *arg)
        if (name_len >= sizeof(msg->any.lnk_conn.label))
                name_len = sizeof(msg->any.lnk_conn.label) - 1;
        bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_conn.label, name_len);
-       pmp->conn_state = msg->state;
+       pmp->iocom.conn_state = msg->state;
        msg->any.lnk_conn.label[name_len] = 0;
-       hammer2_msg_write(msg);
-
-       /*
-        * Transmit loop
-        */
-       msg = NULL;
-       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-
-       while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0 && error == 0) {
-               /*
-                * Sleep if no messages pending.  Interlock with flag while
-                * holding msglk.
-                */
-               if (TAILQ_EMPTY(&pmp->msgq)) {
-                       atomic_set_int(&pmp->msg_ctl,
-                                      HAMMER2_CLUSTERCTL_SLEEPING);
-                       lksleep(&pmp->msg_ctl, &pmp->msglk, 0, "msgwr", hz);
-                       atomic_clear_int(&pmp->msg_ctl,
-                                        HAMMER2_CLUSTERCTL_SLEEPING);
-               }
-
-               while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
-                       /*
-                        * Remove msg from the transmit queue and do
-                        * persist and half-closed state handling.
-                        */
-                       TAILQ_REMOVE(&pmp->msgq, msg, qentry);
-                       lockmgr(&pmp->msglk, LK_RELEASE);
-
-                       error = hammer2_state_msgtx(msg);
-                       if (error == EALREADY) {
-                               error = 0;
-                               hammer2_msg_free(msg);
-                               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-                               continue;
-                       }
-                       if (error) {
-                               hammer2_msg_free(msg);
-                               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-                               break;
-                       }
-
-                       /*
-                        * Dump the message to the pipe or socket.
-                        */
-                       error = fp_write(pmp->msg_fp, &msg->any, msg->hdr_size,
-                                        &res, UIO_SYSSPACE);
-                       if (error || res != msg->hdr_size) {
-                               if (error == 0)
-                                       error = EINVAL;
-                               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-                               break;
-                       }
-                       if (msg->aux_size) {
-                               error = fp_write(pmp->msg_fp,
-                                                msg->aux_data, msg->aux_size,
-                                                &res, UIO_SYSSPACE);
-                               if (error || res != msg->aux_size) {
-                                       if (error == 0)
-                                               error = EINVAL;
-                                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-                                       break;
-                               }
-                       }
-                       hammer2_state_cleanuptx(msg);
-                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-               }
-       }
-
-       /*
-        * Cleanup messages pending transmission and release msgq lock.
-        */
-       if (error)
-               kprintf("hammer2: msg write failed error %d\n", error);
-
-       if (msg) {
-               if (msg->state && msg->state->msg == msg)
-                       msg->state->msg = NULL;
-               hammer2_msg_free(msg);
-       }
-
-       /*
-        * Shutdown the socket.  This will cause the rx thread to get an
-        * EOF and ensure that both threads get to a termination state.
-        */
-       fp_shutdown(pmp->msg_fp, SHUT_RDWR);
-
-       /*
-        * Set KILLTX (which the rx side waits for), then wait for the RX
-        * side to completely finish before we clean out any remaining
-        * command states.
-        */
-       lockmgr(&pmp->msglk, LK_RELEASE);
-       atomic_set_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_KILLTX);
-       wakeup(&pmp->msg_ctl);
-       while (pmp->msgrd_td) {
-               wakeup(&pmp->msg_ctl);
-               tsleep(pmp, 0, "clstrkw", hz);
-       }
-       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-
-       /*
-        * Simulate received MSGF_DELETE's for any remaining states.
-        */
-cleanuprd:
-       RB_FOREACH(state, hammer2_state_tree, &pmp->staterd_tree) {
-               if (state->func &&
-                   (state->rxcmd & DMSGF_DELETE) == 0) {
-                       lockmgr(&pmp->msglk, LK_RELEASE);
-                       msg = hammer2_msg_alloc(&pmp->router, DMSG_LNK_ERROR,
-                                               NULL, NULL);
-                       if ((state->rxcmd & DMSGF_CREATE) == 0)
-                               msg->any.head.cmd |= DMSGF_CREATE;
-                       msg->any.head.cmd |= DMSGF_DELETE;
-                       msg->state = state;
-                       state->rxcmd = msg->any.head.cmd &
-                                      ~DMSGF_DELETE;
-                       msg->state->func(state, msg);
-                       hammer2_state_cleanuprx(msg);
-                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-                       goto cleanuprd;
-               }
-               if (state->func == NULL) {
-                       state->flags &= ~HAMMER2_STATE_INSERTED;
-                       RB_REMOVE(hammer2_state_tree,
-                                 &pmp->staterd_tree, state);
-                       hammer2_state_free(state);
-                       goto cleanuprd;
-               }
-       }
-
-       /*
-        * NOTE: We have to drain the msgq to handle situations
-        *       where received states have built up output
-        *       messages, to avoid creating messages with
-        *       duplicate CREATE/DELETE flags.
-        */
-cleanupwr:
-       hammer2_drain_msgq(pmp);
-       RB_FOREACH(state, hammer2_state_tree, &pmp->statewr_tree) {
-               if (state->func &&
-                   (state->rxcmd & DMSGF_DELETE) == 0) {
-                       lockmgr(&pmp->msglk, LK_RELEASE);
-                       msg = hammer2_msg_alloc(&pmp->router,
-                                               DMSG_LNK_ERROR,
-                                               NULL, NULL);
-                       if ((state->rxcmd & DMSGF_CREATE) == 0)
-                               msg->any.head.cmd |= DMSGF_CREATE;
-                       msg->any.head.cmd |= DMSGF_DELETE |
-                                            DMSGF_REPLY;
-                       msg->state = state;
-                       state->rxcmd = msg->any.head.cmd &
-                                      ~DMSGF_DELETE;
-                       msg->state->func(state, msg);
-                       hammer2_state_cleanuprx(msg);
-                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-                       goto cleanupwr;
-               }
-               if (state->func == NULL) {
-                       state->flags &= ~HAMMER2_STATE_INSERTED;
-                       RB_REMOVE(hammer2_state_tree,
-                                 &pmp->statewr_tree, state);
-                       hammer2_state_free(state);
-                       goto cleanupwr;
-               }
-       }
-
-       hammer2_drain_msgq(pmp);
-       if (--retries == 0)
-               panic("hammer2: comm thread shutdown couldn't drain");
-       if (RB_ROOT(&pmp->statewr_tree))
-               goto cleanupwr;
-
-       if ((state = pmp->freewr_state) != NULL) {
-               pmp->freewr_state = NULL;
-               hammer2_state_free(state);
-       }
-
-       lockmgr(&pmp->msglk, LK_RELEASE);
-
-       /*
-        * The state trees had better be empty now
-        */
-       KKASSERT(RB_EMPTY(&pmp->staterd_tree));
-       KKASSERT(RB_EMPTY(&pmp->statewr_tree));
-       KKASSERT(pmp->conn_state == NULL);
-
-       /*
-        * pmp can be ripped out from under us once msgwr_td is set to NULL.
-        */
-       pmp->msgwr_td = NULL;
-       wakeup(pmp);
-       lwkt_exit();
-}
-
-/*
- * This cleans out the pending transmit message queue, adjusting any
- * persistent states properly in the process.
- *
- * Caller must hold pmp->msglk
- */
-static
-void
-hammer2_drain_msgq(hammer2_pfsmount_t *pmp)
-{
-       hammer2_msg_t *msg;
-
-       /*
-        * Clean out our pending transmit queue, executing the
-        * appropriate state adjustments.  If this tries to open
-        * any new outgoing transactions we have to loop up and
-        * clean them out.
-        */
-       while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
-               TAILQ_REMOVE(&pmp->msgq, msg, qentry);
-               lockmgr(&pmp->msglk, LK_RELEASE);
-               if (msg->state && msg->state->msg == msg)
-                       msg->state->msg = NULL;
-               if (hammer2_state_msgtx(msg))
-                       hammer2_msg_free(msg);
-               else
-                       hammer2_state_cleanuptx(msg);
-               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-       }
+       kdmsg_msg_write(msg);
 }
 
 /*
@@ -1479,16 +1053,19 @@ hammer2_drain_msgq(hammer2_pfsmount_t *pmp)
  * wakeups.
  */
 void
-hammer2_clusterctl_wakeup(hammer2_pfsmount_t *pmp)
+hammer2_clusterctl_wakeup(kdmsg_iocom_t *iocom)
 {
-       if (pmp->msg_ctl & HAMMER2_CLUSTERCTL_SLEEPING) {
-               atomic_clear_int(&pmp->msg_ctl, HAMMER2_CLUSTERCTL_SLEEPING);
-               wakeup(&pmp->msg_ctl);
+       hammer2_pfsmount_t *pmp = iocom->handle;
+
+       if (pmp->iocom.msg_ctl & KDMSG_CLUSTERCTL_SLEEPING) {
+               atomic_clear_int(&pmp->iocom.msg_ctl,
+                                KDMSG_CLUSTERCTL_SLEEPING);
+               wakeup(&pmp->iocom.msg_ctl);
        }
 }
 
 static int
-hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg)
+hammer2_msg_lnk_rcvmsg(kdmsg_msg_t *msg)
 {
        switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
        case DMSG_LNK_CONN | DMSGF_CREATE:
@@ -1496,7 +1073,7 @@ hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg)
                 * reply & leave trans open
                 */
                kprintf("CONN RECEIVE - (just ignore it)\n");
-               hammer2_msg_result(msg, 0);
+               kdmsg_msg_result(msg, 0);
                break;
        case DMSG_LNK_SPAN | DMSGF_CREATE:
                kprintf("SPAN RECEIVE - ADDED FROM CLUSTER\n");
@@ -1518,11 +1095,11 @@ hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg)
  * transaction open too.
  */
 static int
-hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
+hammer2_msg_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
 {
        hammer2_pfsmount_t *pmp = state->any.pmp;
        hammer2_mount_t *hmp = pmp->hmp;
-       hammer2_msg_t *rmsg;
+       kdmsg_msg_t *rmsg;
        size_t name_len;
        int copyid;
 
@@ -1530,9 +1107,9 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
 
        if (msg->any.head.cmd & DMSGF_CREATE) {
                kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
-               rmsg = hammer2_msg_alloc(&pmp->router, DMSG_LNK_SPAN |
-                                                      DMSGF_CREATE,
-                                       hammer2_msg_span_reply, pmp);
+               rmsg = kdmsg_msg_alloc(&pmp->iocom.router,
+                                      DMSG_LNK_SPAN | DMSGF_CREATE,
+                                      hammer2_msg_span_reply, pmp);
                rmsg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
                rmsg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
                rmsg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
@@ -1545,7 +1122,7 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
                      rmsg->any.lnk_span.label,
                      name_len);
                rmsg->any.lnk_span.label[name_len] = 0;
-               hammer2_msg_write(rmsg);
+               kdmsg_msg_write(rmsg);
 
                /*
                 * Dump the configuration stored in the volume header
@@ -1561,8 +1138,8 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
        if ((state->txcmd & DMSGF_DELETE) == 0 &&
            (msg->any.head.cmd & DMSGF_DELETE)) {
                kprintf("LNK_CONN transaction terminated by remote\n");
-               pmp->conn_state = NULL;
-               hammer2_msg_reply(msg, 0);
+               pmp->iocom.conn_state = NULL;
+               kdmsg_msg_reply(msg, 0);
        }
        return(0);
 }
@@ -1571,7 +1148,7 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
  * Remote terminated our span transaction.  We have to terminate our side.
  */
 static int
-hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg)
+hammer2_msg_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
 {
        /*hammer2_pfsmount_t *pmp = state->any.pmp;*/
 
@@ -1579,7 +1156,7 @@ hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg)
                "remote %08x state %p\n", msg->any.head.cmd, state);
        if ((state->txcmd & DMSGF_DELETE) == 0 &&
            (msg->any.head.cmd & DMSGF_DELETE)) {
-               hammer2_msg_reply(msg, 0);
+               kdmsg_msg_reply(msg, 0);
        }
        return(0);
 }
@@ -1592,18 +1169,18 @@ void
 hammer2_volconf_update(hammer2_pfsmount_t *pmp, int index)
 {
        hammer2_mount_t *hmp = pmp->hmp;
-       hammer2_msg_t *msg;
+       kdmsg_msg_t *msg;
 
        /* XXX interlock against connection state termination */
-       kprintf("volconf update %p\n", pmp->conn_state);
-       if (pmp->conn_state) {
+       kprintf("volconf update %p\n", pmp->iocom.conn_state);
+       if (pmp->iocom.conn_state) {
                kprintf("TRANSMIT VOLCONF VIA OPEN CONN TRANSACTION\n");
-               msg = hammer2_msg_alloc(&pmp->router, DMSG_LNK_VOLCONF,
+               msg = kdmsg_msg_alloc(&pmp->iocom.router, DMSG_LNK_VOLCONF,
                                        NULL, NULL);
-               msg->state = pmp->conn_state;
+               msg->state = pmp->iocom.conn_state;
                msg->any.lnk_volconf.copy = hmp->voldata.copyinfo[index];
                msg->any.lnk_volconf.mediaid = hmp->voldata.fsid;
                msg->any.lnk_volconf.index = index;
-               hammer2_msg_write(msg);
+               kdmsg_msg_write(msg);
        }
 }