hammer2 - Message routing work
authorMatthew Dillon <dillon@apollo.backplane.com>
Fri, 10 Aug 2012 17:42:08 +0000 (10:42 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Fri, 10 Aug 2012 17:42:08 +0000 (10:42 -0700)
* Further API simplification

* Start adding router infrastructure to the kernel VFS

* Looks like we will need a 'source' and 'target' field in
  the message header after all (replaces the single 'spanid' field),
  in order to track the reply VC as a message is relayed.

sbin/hammer2/msg.c
sbin/hammer2/msg_lnk.c
sbin/hammer2/network.h
sbin/hammer2/subs.c
sys/vfs/hammer2/hammer2.h
sys/vfs/hammer2/hammer2_msg.c
sys/vfs/hammer2/hammer2_msgops.c
sys/vfs/hammer2/hammer2_network.h
sys/vfs/hammer2/hammer2_vfsops.c

index d898da1..3531379 100644 (file)
@@ -35,8 +35,8 @@
 
 #include "hammer2.h"
 
-static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static int hammer2_state_msgrx(hammer2_msg_t *msg);
+static void hammer2_state_cleanuptx(hammer2_msg_t *msg);
 
 /*
  * ROUTER TREE - Represents available routes for message routing, indexed
@@ -46,9 +46,9 @@ static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 int
 hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2)
 {
-       if (router1->spanid < router2->spanid)
+       if (router1->target < router2->target)
                return(-1);
-       if (router1->spanid > router2->spanid)
+       if (router1->target > router2->target)
                return(1);
        return(0);
 }
@@ -61,17 +61,20 @@ static struct hammer2_router_tree router_rtree = RB_INITIALIZER(router_rtree);
 
 /*
  * STATE TREE - Represents open transactions which are indexed by their
- *             {spanid,msgid} relative to the governing iocom.  spanid
- *             will usually be 0 since a non-zero spanid would have been
- *             routed elsewhere.
+ *             {router,msgid} relative to the governing iocom.
+ *
+ *             router is usually iocom->router since state isn't stored
+ *             for relayed messages.
  */
 int
 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
 {
-       if (state1->spanid < state2->spanid)
+#if 0
+       if (state1->router < state2->router)
                return(-1);
-       if (state1->spanid > state2->spanid)
+       if (state1->router > state2->router)
                return(1);
+#endif
        if (state1->msgid < state2->msgid)
                return(-1);
        if (state1->msgid > state2->msgid)
@@ -138,7 +141,6 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
        RB_INIT(&iocom->router->statewr_tree);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
-       TAILQ_INIT(&iocom->addrq);
        TAILQ_INIT(&iocom->router->txmsgq);
        iocom->router->iocom = iocom;
        iocom->sock_fd = sock_fd;
@@ -276,8 +278,7 @@ hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd,
                state->iocom = iocom;
                state->flags = HAMMER2_STATE_DYNAMIC;
                state->msgid = (uint64_t)(uintptr_t)state;
-               if (router->link)
-                       state->spanid = router->spanid;
+               state->router = router;
                state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
                                       HAMMER2_MSGF_DELETE);
                state->rxcmd = HAMMER2_MSGF_REPLY;
@@ -917,7 +918,7 @@ again:
         * Process transactional state for the message.
         */
        if (msg && ioq->error == 0) {
-               error = hammer2_state_msgrx(iocom, msg);
+               error = hammer2_state_msgrx(msg);
                if (error) {
                        if (error == HAMMER2_IOQ_ERROR_EALREADY) {
                                hammer2_msg_free(msg);
@@ -980,7 +981,7 @@ skip:
                        } else {
                                /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
                                msg->state = state;
-                               msg->any.head.spanid = state->spanid;
+                               msg->router = state->router;
                                msg->any.head.msgid = state->msgid;
                                msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
                                                     HAMMER2_MSGF_DELETE;
@@ -996,7 +997,7 @@ skip:
                                msg = NULL;
                        } else {
                                msg->state = state;
-                               msg->any.head.spanid = state->spanid;
+                               msg->router = state->router;
                                msg->any.head.msgid = state->msgid;
                                msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
                                                     HAMMER2_MSGF_DELETE |
@@ -1285,7 +1286,7 @@ hammer2_iocom_flush2(hammer2_iocom_t *iocom)
                ioq->hbytes = 0;
                ioq->abytes = 0;
 
-               hammer2_state_cleanuptx(iocom, msg);
+               hammer2_state_cleanuptx(msg);
        }
        assert(nact == 0);
 
@@ -1336,7 +1337,7 @@ hammer2_iocom_drain(hammer2_iocom_t *iocom)
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                --ioq->msgcount;
-               hammer2_state_cleanuptx(iocom, msg);
+               hammer2_state_cleanuptx(msg);
        }
 }
 
@@ -1372,7 +1373,6 @@ hammer2_msg_write(hammer2_msg_t *msg)
                        state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                }
                msg->any.head.msgid = state->msgid;
-               msg->any.head.spanid = state->spanid;
                assert(((state->txcmd ^ msg->any.head.cmd) &
                        HAMMER2_MSGF_REPLY) == 0);
                if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
@@ -1381,6 +1381,8 @@ hammer2_msg_write(hammer2_msg_t *msg)
                msg->any.head.msgid = 0;
                /* XXX set spanid by router */
        }
+       msg->any.head.source = 0;
+       msg->any.head.target = msg->router->target;
 
        /*
         * Queue it for output, wake up the I/O pthread.  Note that the
@@ -1610,8 +1612,9 @@ hammer2_state_reply(hammer2_state_t *state, uint32_t error)
  * will typically just contain status updates.
  */
 static int
-hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_state_msgrx(hammer2_msg_t *msg)
 {
+       hammer2_iocom_t *iocom = msg->router->iocom;
        hammer2_state_t *state;
        hammer2_state_t dummy;
        int error;
@@ -1622,9 +1625,7 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
         * If received msg is a command state is on staterd_tree.
         * If received msg is a reply state is on statewr_tree.
         */
-
        dummy.msgid = msg->any.head.msgid;
-       dummy.spanid = msg->any.head.spanid;
        pthread_mutex_lock(&iocom->mtx);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
                state = RB_FIND(hammer2_state_tree,
@@ -1671,7 +1672,7 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                state->flags |= HAMMER2_STATE_INSERTED;
                state->msgid = msg->any.head.msgid;
-               state->spanid = msg->any.head.spanid;
+               state->router = msg->router;
                msg->state = state;
                pthread_mutex_lock(&iocom->mtx);
                RB_INSERT(hammer2_state_tree,
@@ -1850,8 +1851,9 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 }
 
 static void
-hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_state_cleanuptx(hammer2_msg_t *msg)
 {
+       hammer2_iocom_t *iocom = msg->router->iocom;
        hammer2_state_t *state;
 
        if ((state = msg->state) == NULL) {
@@ -2214,12 +2216,13 @@ hammer2_msg_str(hammer2_msg_t *msg)
         * Generate the buf
         */
        snprintf(buf, sizeof(buf),
-               "msg=%s%s %s id=%08x span=%08x %s",
+               "msg=%s%s %s id=%08x src=%08x tgt=%08x %s",
                 hammer2_basecmd_str(msg->any.head.cmd),
                 flagbuf,
                 errstr,
                 (uint32_t)(intmax_t)msg->any.head.msgid,   /* for brevity */
-                (uint32_t)(intmax_t)msg->any.head.spanid,  /* for brevity */
+                (uint32_t)(intmax_t)msg->any.head.source,  /* for brevity */
+                (uint32_t)(intmax_t)msg->any.head.target,  /* for brevity */
                 statestr);
 
        return(buf);
index 12d5399..b1216e7 100644 (file)
@@ -281,8 +281,12 @@ h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
 }
 
 /*
- * NOTE: Sort/subsort must match h2span_relay_cmp() under any given
- *      node.
+ * Sort/subsort must match h2span_relay_cmp() under any given node
+ * to make the aggregation algorithm easier, so the best links are
+ * in the same sorted order as the best relays.
+ *
+ * NOTE: We cannot use link*->state->msgid because this msgid is created
+ *      by each remote host and thus might wind up being the same.
  */
 static
 int
@@ -292,10 +296,17 @@ h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
+#if 1
+       if ((uintptr_t)link1->state < (uintptr_t)link2->state)
+               return(-1);
+       if ((uintptr_t)link1->state > (uintptr_t)link2->state)
+               return(1);
+#else
        if (link1->state->msgid < link2->state->msgid)
                return(-1);
        if (link1->state->msgid > link2->state->msgid)
                return(1);
+#endif
        return(0);
 }
 
@@ -319,10 +330,17 @@ h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
+#if 1
+       if ((uintptr_t)link1->state < (uintptr_t)link2->state)
+               return(-1);
+       if ((uintptr_t)link1->state > (uintptr_t)link2->state)
+               return(1);
+#else
        if (link1->state->msgid < link2->state->msgid)
                return(-1);
        if (link1->state->msgid > link2->state->msgid)
                return(1);
+#endif
        return(0);
 }
 
@@ -536,7 +554,7 @@ hammer2_lnk_span(hammer2_msg_t *msg)
                slink->router = hammer2_router_alloc();
                slink->router->iocom = state->iocom;
                slink->router->link = slink;
-               slink->router->spanid = state->msgid;
+               slink->router->target = state->msgid;
                hammer2_router_connect(slink->router);
 
                RB_INSERT(h2span_link_tree, &node->tree, slink);
@@ -823,7 +841,7 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn)
                        relay->router = hammer2_router_alloc();
                        relay->router->iocom = relay->state->iocom;
                        relay->router->relay = relay;
-                       relay->router->spanid = relay->state->msgid;
+                       relay->router->target = relay->state->msgid;
 
                        msg->any.lnk_span = slink->state->msg->any.lnk_span;
                        msg->any.lnk_span.dist = slink->dist + 1;
index 028f822..46399fc 100644 (file)
@@ -120,12 +120,10 @@ struct hammer2_iocom;
 struct hammer2_persist;
 struct hammer2_state;
 struct hammer2_router;
-struct hammer2_address;
 struct hammer2_msg;
 
 TAILQ_HEAD(hammer2_state_queue, hammer2_state);
 TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
-TAILQ_HEAD(hammer2_address_queue, hammer2_address);
 RB_HEAD(hammer2_state_tree, hammer2_state);
 RB_HEAD(hammer2_router_tree, hammer2_router);
 
@@ -135,14 +133,10 @@ struct h2span_connect;
 
 struct hammer2_state {
        RB_ENTRY(hammer2_state) rbnode;         /* indexed by msgid */
-       TAILQ_ENTRY(hammer2_state) source_entry;/* if routed */
-       TAILQ_ENTRY(hammer2_state) target_entry;/* if routed */
        struct hammer2_iocom *iocom;
-       struct hammer2_address_t *source_addr;  /* if routed */
-       struct hammer2_address_t *target_addr;  /* if routed */
+       struct hammer2_router *router;          /* if routed */
        uint32_t        txcmd;                  /* mostly for CMDF flags */
        uint32_t        rxcmd;                  /* mostly for CMDF flags */
-       uint64_t        spanid;                 /* routing id */
        uint64_t        msgid;                  /* {spanid,msgid} uniq */
        int             flags;
        int             error;
@@ -156,14 +150,6 @@ struct hammer2_state {
        } any;
 };
 
-struct hammer2_address {
-       TAILQ_ENTRY(hammer2_address) entry;     /* on-iocom */
-       struct hammer2_iocom *iocom;            /* related iocom */
-       struct hammer2_state_queue sourceq;     /* states on source queue */
-       struct hammer2_state_queue targetq;     /* states on target queue */
-       uint16_t        id;
-};
-
 #define HAMMER2_STATE_INSERTED 0x0001
 #define HAMMER2_STATE_DYNAMIC  0x0002
 #define HAMMER2_STATE_NODEID   0x0004          /* manages a node id */
@@ -179,7 +165,6 @@ struct hammer2_msg {
 };
 
 typedef struct hammer2_state hammer2_state_t;
-typedef struct hammer2_address hammer2_address_t;
 typedef struct hammer2_msg hammer2_msg_t;
 typedef struct hammer2_msg_queue hammer2_msg_queue_t;
 
@@ -249,7 +234,7 @@ typedef struct hammer2_ioq hammer2_ioq_t;
  * or to a SPAN transaction (h2span_relay structure for incoming).
  */
 struct hammer2_router {
-       RB_ENTRY(hammer2_router) rbnode;        /* indexed by spanid */
+       RB_ENTRY(hammer2_router) rbnode;        /* indexed by target */
        struct hammer2_iocom *iocom;
        struct h2span_link   *link;             /* may be NULL */
        struct h2span_relay  *relay;            /* may be NULL */
@@ -259,7 +244,7 @@ struct hammer2_router {
        struct hammer2_state_tree staterd_tree; /* active messages */
        struct hammer2_state_tree statewr_tree; /* active messages */
        hammer2_msg_queue_t txmsgq;             /* tx msgq from remote */
-       uint64_t        spanid;                 /* for routing */
+       uint64_t        target;                 /* for routing */
        int     flags;
        int     refs;                           /* refs prevent destruction */
 };
@@ -280,7 +265,6 @@ struct hammer2_iocom {
        hammer2_ioq_t   ioq_tx;
        hammer2_msg_queue_t freeq;              /* free msgs hdr only */
        hammer2_msg_queue_t freeq_aux;          /* free msgs w/aux_data */
-       struct hammer2_address_queue  addrq;    /* source/target addrs */
        int     sock_fd;                        /* comm socket or pipe */
        int     alt_fd;                         /* thread signal, tty, etc */
        int     wakeupfds[2];                   /* pipe wakes up iocom thread */
index fcf0394..d04c410 100644 (file)
@@ -151,14 +151,14 @@ hammer2_bswap_head(hammer2_msg_hdr_t *head)
        head->salt      = bswap32(head->salt);
 
        head->msgid     = bswap64(head->msgid);
-       head->spanid    = bswap64(head->spanid);
+       head->source    = bswap64(head->source);
+       head->target    = bswap64(head->target);
 
        head->cmd       = bswap32(head->cmd);
        head->aux_crc   = bswap32(head->aux_crc);
        head->aux_bytes = bswap32(head->aux_bytes);
        head->error     = bswap32(head->error);
        head->aux_descr = bswap64(head->aux_descr);
-       head->reserved30= bswap64(head->reserved30);
        head->reserved38= bswap32(head->reserved38);
        head->hdr_crc   = bswap32(head->hdr_crc);
 }
index d3e6a3a..068f94f 100644 (file)
@@ -74,6 +74,7 @@ struct hammer2_chain;
 struct hammer2_inode;
 struct hammer2_mount;
 struct hammer2_pfsmount;
+struct hammer2_span;
 struct hammer2_state;
 struct hammer2_msg;
 
@@ -277,6 +278,19 @@ struct hammer2_freecache {
 typedef struct hammer2_freecache hammer2_freecache_t;
 
 /*
+ * Structure used to represent a virtual circuit for a messaging
+ * route.  Typically associated from hammer2_state but the hammer2_pfsmount
+ * structure also has one to represent the point-to-point link.
+ */
+struct hammer2_router {
+       struct hammer2_pfsmount *pmp;
+       struct hammer2_state    *state;         /* received LNK_SPAN state */
+       uint64_t                target;         /* target */
+};
+
+typedef struct hammer2_router hammer2_router_t;
+
+/*
  * Global (per device) mount structure for device (aka vp->v_mount->hmp)
  */
 struct hammer2_mount {
@@ -328,6 +342,7 @@ struct hammer2_pfsmount {
        struct hammer2_state    *freewr_state;  /* allocation cache */
        struct hammer2_state_tree staterd_tree; /* active messages */
        struct hammer2_state_tree statewr_tree; /* active messages */
+       struct hammer2_router   router;
 };
 
 typedef struct hammer2_pfsmount hammer2_pfsmount_t;
@@ -335,22 +350,16 @@ typedef struct hammer2_pfsmount hammer2_pfsmount_t;
 #define HAMMER2_CLUSTERCTL_KILL        0x0001
 
 /*
- * In-memory message structure for hammer2.
- *
- * Persistent cache state messages will be associated with a hammer2_chain.
- *
- * NOTE!  If REPLY is set then the source and target fields in the message
- *       are swapped.  That is, source and target remain unchanged whether
- *       the message is a command from side A or a reply from side B.
- *       The message is routed based on target if REPLY is not set, and on
- *       source if REPLY is set.
+ * Transactional state structure, representing an open transaction.  The
+ * transaction might represent a cache state (and thus have a chain
+ * association), or a VOP op, LNK_SPAN, or other things.
  */
 struct hammer2_state {
        RB_ENTRY(hammer2_state) rbnode;         /* indexed by msgid */
        struct hammer2_pfsmount *pmp;
+       struct hammer2_router *router;          /* related LNK_SPAN route */
        uint32_t        txcmd;                  /* mostly for CMDF flags */
        uint32_t        rxcmd;                  /* mostly for CMDF flags */
-       uint64_t        spanid;                 /* spanning tree routing */
        uint64_t        msgid;                  /* {spanid,msgid} uniq */
        int             flags;
        int             error;
@@ -368,6 +377,7 @@ struct hammer2_state {
 
 struct hammer2_msg {
        TAILQ_ENTRY(hammer2_msg) qentry;        /* serialized queue */
+       struct hammer2_router *router;
        struct hammer2_state *state;
        size_t          hdr_size;
        size_t          aux_size;
@@ -375,6 +385,7 @@ struct hammer2_msg {
        hammer2_msg_any_t any;
 };
 
+typedef struct hammer2_link hammer2_link_t;
 typedef struct hammer2_state hammer2_state_t;
 typedef struct hammer2_msg hammer2_msg_t;
 
@@ -539,29 +550,25 @@ int hammer2_ioctl(hammer2_inode_t *ip, u_long com, void *data,
 /*
  * hammer2_msg.c
  */
-int hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-int hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-void hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-void hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-int hammer2_msg_execute(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+int hammer2_state_msgrx(hammer2_msg_t *msg);
+int hammer2_state_msgtx(hammer2_msg_t *msg);
+void hammer2_state_cleanuprx(hammer2_msg_t *msg);
+void hammer2_state_cleanuptx(hammer2_msg_t *msg);
+int hammer2_msg_execute(hammer2_msg_t *msg);
 void hammer2_state_free(hammer2_state_t *state);
-void hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-hammer2_msg_t *hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid,
-                               uint32_t cmd);
-void hammer2_msg_write(hammer2_pfsmount_t *pmp,
-                               hammer2_msg_t *msg,
+void hammer2_msg_free(hammer2_msg_t *msg);
+hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd);
+void hammer2_msg_write(hammer2_msg_t *msg,
                                int (*func)(hammer2_state_t *, hammer2_msg_t *),
                                void *data);
-void hammer2_msg_reply(hammer2_pfsmount_t *pmp,
-                               hammer2_msg_t *msg, uint32_t error);
-void hammer2_msg_result(hammer2_pfsmount_t *pmp,
-                               hammer2_msg_t *msg, uint32_t error);
+void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error);
+void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error);
 
 /*
  * hammer2_msgops.c
  */
-int hammer2_msg_dbg_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-int hammer2_msg_adhoc_input(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+int hammer2_msg_dbg_rcvmsg(hammer2_msg_t *msg);
+int hammer2_msg_adhoc_input(hammer2_msg_t *msg);
 
 /*
  * hammer2_freemap.c
index e4471a2..ef84f34 100644 (file)
@@ -104,11 +104,21 @@ RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
  * will typically just contain status updates.
  */
 int
-hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_state_msgrx(hammer2_msg_t *msg)
 {
+       hammer2_pfsmount_t *pmp;
        hammer2_state_t *state;
        int error;
 
+       pmp = msg->router->pmp;
+
+       /*
+        * XXX resolve msg->any.head.source and msg->any.head.target
+        *     into LNK_SPAN references.
+        *
+        * XXX replace msg->router
+        */
+
        /*
         * Make sure a state structure is ready to go in case we need a new
         * one.  This is the only routine which uses freerd_state so no
@@ -130,10 +140,12 @@ hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
        lockmgr(&pmp->msglk, LK_EXCLUSIVE);
 
        state->msgid = msg->any.head.msgid;
-       state->spanid = msg->any.head.spanid;
-       kprintf("received msg %08x msgid %jx spanid=%jx\n",
+       state->router = &pmp->router;
+       kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
                msg->any.head.cmd,
-               (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.spanid);
+               (intmax_t)msg->any.head.msgid,
+               (intmax_t)msg->any.head.source,
+               (intmax_t)msg->any.head.target);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
                state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
        else
@@ -168,6 +180,7 @@ hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
                state = pmp->freerd_state;
                pmp->freerd_state = NULL;
                msg->state = state;
+               state->router = msg->router;
                state->msg = msg;
                state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                state->txcmd = HAMMER2_MSGF_REPLY;
@@ -290,12 +303,13 @@ hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
 }
 
 void
-hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_state_cleanuprx(hammer2_msg_t *msg)
 {
+       hammer2_pfsmount_t *pmp = msg->router->pmp;
        hammer2_state_t *state;
 
        if ((state = msg->state) == NULL) {
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
        } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
                state->rxcmd |= HAMMER2_MSGF_DELETE;
@@ -320,9 +334,9 @@ hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
                } else {
                        lockmgr(&pmp->msglk, LK_RELEASE);
                }
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
        } else if (state->msg != msg) {
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
        }
 }
 
@@ -340,8 +354,9 @@ hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
  * A NULL state may be returned in this case.
  */
 int
-hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_state_msgtx(hammer2_msg_t *msg)
 {
+       hammer2_pfsmount_t *pmp = msg->router->pmp;
        hammer2_state_t *state;
        int error;
 
@@ -519,12 +534,13 @@ hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
 }
 
 void
-hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_state_cleanuptx(hammer2_msg_t *msg)
 {
+       hammer2_pfsmount_t *pmp = msg->router->pmp;
        hammer2_state_t *state;
 
        if ((state = msg->state) == NULL) {
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
        } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
                state->txcmd |= HAMMER2_MSGF_DELETE;
@@ -549,9 +565,9 @@ hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
                } else {
                        lockmgr(&pmp->msglk, LK_RELEASE);
                }
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
        } else if (state->msg != msg) {
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
        }
 }
 
@@ -565,33 +581,39 @@ hammer2_state_free(hammer2_state_t *state)
        state->msg = NULL;
        kfree(state, pmp->mmsg);
        if (msg)
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
 }
 
 hammer2_msg_t *
-hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid, uint32_t cmd)
+hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd)
 {
        hammer2_msg_t *msg;
        size_t hbytes;
 
        hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
        msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
-                     pmp->mmsg, M_WAITOK | M_ZERO);
+                     router->pmp->mmsg, M_WAITOK | M_ZERO);
        msg->hdr_size = hbytes;
+       msg->router = router;
+       KKASSERT(router != NULL);
        msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
-       msg->any.head.spanid = spanid;
+       msg->any.head.source = 0;
+       msg->any.head.target = router->target;
        msg->any.head.cmd = cmd;
 
        return (msg);
 }
 
 void
-hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_free(hammer2_msg_t *msg)
 {
+       hammer2_pfsmount_t *pmp = msg->router->pmp;
+
        if (msg->aux_data && msg->aux_size) {
                kfree(msg->aux_data, pmp->mmsg);
                msg->aux_data = NULL;
                msg->aux_size = 0;
+               msg->router = NULL;
        }
        kfree(msg, pmp->mmsg);
 }
@@ -603,9 +625,9 @@ hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
 int
 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
 {
-       if (state1->spanid < state2->spanid)
+       if (state1->router < state2->router)
                return(-1);
-       if (state1->spanid > state2->spanid)
+       if (state1->router > state2->router)
                return(1);
        if (state1->msgid < state2->msgid)
                return(-1);
@@ -618,22 +640,23 @@ hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
  * Write a message.  All requisit command flags have been set.
  *
  * If msg->state is non-NULL the message is written to the existing
- * transaction.  Both msgid and spanid will be set accordingly.
+ * transaction.  msgid will be set accordingly.
  *
  * If msg->state is NULL and CREATE is set new state is allocated and
- * (func, data) is installed.
+ * (func, data) is installed.  A msgid is assigned.
  *
  * If msg->state is NULL and CREATE is not set the message is assumed
- * to be a one-way message.  The msgid and spanid must be set by the
- * caller (msgid is typically set to 0 for this case).
+ * to be a one-way message.  The originator must assign the msgid
+ * (or leave it 0, which is typical.
  *
  * This function merely queues the message to the management thread, it
  * does not write to the message socket/pipe.
  */
 void
-hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
+hammer2_msg_write(hammer2_msg_t *msg,
                  int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
 {
+       hammer2_pfsmount_t *pmp = msg->router->pmp;
        hammer2_state_t *state;
 
        if (msg->state) {
@@ -644,12 +667,14 @@ hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
                 * (Function callback and aux data for the receive side can
                 * be replaced or left alone).
                 */
-               msg->any.head.msgid = msg->state->msgid;
-               msg->any.head.spanid = msg->state->spanid;
+               state = msg->state;
+               msg->any.head.msgid = state->msgid;
+               msg->any.head.source = 0;
+               msg->any.head.target = state->router->target;
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
                if (func) {
-                       msg->state->func = func;
-                       msg->state->any.any = data;
+                       state->func = func;
+                       state->any.any = data;
                }
        } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
                /*
@@ -664,8 +689,11 @@ hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
                state->any.any = data;
                state->msg = msg;
                state->msgid = (uint64_t)(uintptr_t)state;
-               state->spanid = msg->any.head.spanid;
+               state->router = msg->router;
                msg->state = state;
+               msg->any.head.source = 0;
+               msg->any.head.target = state->router->target;
+               msg->any.head.msgid = state->msgid;
 
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
                if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
@@ -678,6 +706,8 @@ hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
                 * competing aborts and a real one-off message?)
                 */
                msg->any.head.msgid = 0;
+               msg->any.head.source = 0;
+               msg->any.head.target = msg->router->target;
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
        }
 
@@ -700,9 +730,10 @@ hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
  * If msg->state is non-NULL we are replying to a one-way message.
  */
 void
-hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
+hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
 {
        hammer2_state_t *state = msg->state;
+       hammer2_msg_t *nmsg;
        uint32_t cmd;
 
        /*
@@ -721,7 +752,7 @@ hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
         */
        if (state) {
                if (state->txcmd & HAMMER2_MSGF_DELETE) {
-                       hammer2_msg_free(pmp, msg);
+                       hammer2_msg_free(msg);
                        return;
                }
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
@@ -734,10 +765,10 @@ hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
-       msg = hammer2_msg_alloc(pmp, 0, cmd);
-       msg->any.head.error = error;
-       msg->state = state;
-       hammer2_msg_write(pmp, msg, NULL, NULL);
+       nmsg = hammer2_msg_alloc(msg->router, cmd);
+       nmsg->any.head.error = error;
+       nmsg->state = state;
+       hammer2_msg_write(nmsg, NULL, NULL);
 }
 
 /*
@@ -747,9 +778,10 @@ hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
  * function degenerates into the same as hammer2_msg_reply().
  */
 void
-hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
+hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
 {
        hammer2_state_t *state = msg->state;
+       hammer2_msg_t *nmsg;
        uint32_t cmd;
 
        /*
@@ -768,7 +800,7 @@ hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
         */
        if (state) {
                if (state->txcmd & HAMMER2_MSGF_DELETE) {
-                       hammer2_msg_free(pmp, msg);
+                       hammer2_msg_free(msg);
                        return;
                }
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
@@ -781,8 +813,8 @@ hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
-       msg = hammer2_msg_alloc(pmp, 0, cmd);
-       msg->any.head.error = error;
-       msg->state = state;
-       hammer2_msg_write(pmp, msg, NULL, NULL);
+       nmsg = hammer2_msg_alloc(msg->router, cmd);
+       nmsg->any.head.error = error;
+       nmsg->state = state;
+       hammer2_msg_write(nmsg, NULL, NULL);
 }
index f791998..012cefd 100644 (file)
 #include "hammer2.h"
 
 int
-hammer2_msg_adhoc_input(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_adhoc_input(hammer2_msg_t *msg)
 {
        kprintf("ADHOC INPUT MSG %08x\n", msg->any.head.cmd);
        return(0);
 }
 
 int
-hammer2_msg_dbg_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_dbg_rcvmsg(hammer2_msg_t *msg)
 {
        switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
        case HAMMER2_DBG_SHELL:
                /*
                 * Execute shell command (not supported atm)
                 */
-               hammer2_msg_reply(pmp, msg, HAMMER2_MSG_ERR_NOSUPP);
+               hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
                break;
        case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
                if (msg->aux_data) {
@@ -70,7 +70,7 @@ hammer2_msg_dbg_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
                }
                break;
        default:
-               hammer2_msg_reply(pmp, msg, HAMMER2_MSG_ERR_NOSUPP);
+               hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
                break;
        }
        return(0);
index c56dc1e..882141c 100644 (file)
 /*
  * Mesh network protocol structures.
  *
+ *                             SPAN PROTOCOL
+ *
  * The mesh is constructed from point-to-point streaming links with varying
- * levels of interconnectedness, forming a graph.  The spanning tree protocol
- * running on each node transmits a LNK_SPAN transactional message to the
- * other end.  The protocol collects LNK_SPAN messages from all sources,
- * aggregates them using a shortest-distance-path algorithm, and transmits
- * them over each link as well, creating a multplication within the topology.
- *
- * Any node in the graph may transmit a message to any other node by using
- * the msgid of the LNK_SPAN open transaction as the message's 'linkid'.
- * This identifies both sides so there is no 'source' and 'target' per-say.
- *
- * Open transactions are recorded by the source and the target, but not by
- * intermediate nodes in the route.  Streaming protocols are used.  If a
- * span element is lost its transaction will be aborted automatically (even
- * if other routes to the same target are available), and any related
- * messages will be aborted.  If the span element was chosen for aggregation
- * this will propagate through the entire topology and thus ultimately reach
- * the target which used the aggregated span element, but does not
- * necessarily effect all paths in the topology.
- *
- * When a link failure occurs all SPANs related to that link are
- * transactionally closed.  The SPANs are not deleted until closed in
- * both directions, thus the spanid serves as a placeholder allowing all
- * in-transit messages being routed over that spanid to be properly thrown
- * out.  Once completely closed the spanid can be reused.
- *
- * NOTE: Multiple spans for the same physical {fsid,pfs_fsid} can be
- *      forwarded, allowing concurrency within the topology.
- *
- * NOTE: It is important that messages in a lost route be aborted because
- *      the messaging protocol expects serialization over any given route.
- *      Only propagated spans are forwarded as spans to other nodes, so any
- *      given open span transaction will represent a specific path.
- *
- *      If a portion of the path in the middle of the topology is lost it
- *      will propagate in both directions all the way to the ends that used
- *      it.  Intermediate route nodes DO NOT silently re-route messages to
- *      another span.  Messages in-flight will meet the updating SPAN and
- *      simply be discarded by intermediate nodes.  Ultimately the updating
- *      SPAN reaches all end-points and auto-aborts the open transaction.
- *
- *      If another path is available the transaction can be instantly
- *      retried.
- *
- * NOTE: It is possible to route messages virtually using the msgid of any
- *      open transaction instead of the msgid of a SPAN transaction, but
- *      not recommended and not currently coded.
- *
- * NOTE: Both the msgid and the spanid are 64-bit fields and may be populated
- *      with actual memory pointers (which simplifies the end-points).
- *      However, all such identifiers must be indexed as appropriate by the
- *      nodes and verified as being valid before any memory dereference
- *      occurs, for obvious reasons.
- *
- * All message responses follow the SAME PATH that the original message
- * followed, but in reverse.  This is an absolute requirement since messages
- * expecting replies record persistent state at each hop.  Sequencing must
- * be preserved.
+ * levels of interconnectedness, forming a graph.  Terminii in the graph
+ * are entities such as a HAMMER2 PFS or a network mount or other types
+ * of nodes.
+ *
+ * The spanning tree protocol runs symmetrically on every node. Each node
+ * transmits a representitive LNK_SPAN out all available connections.  Nodes
+ * also receive LNK_SPANs from other nodes (obviously), and must aggregate,
+ * reduce, and relay those LNK_SPANs out all available connections, thus
+ * propagating the spanning tree.  Any connection failure or topology change
+ * causes changes in the LNK_SPAN propagation.
+ *
+ * Each LNK_SPAN or LNK_SPAN relay represents a virtual circuit for routing
+ * purposes.  In addition, each relay is chained in one direction,
+ * representing a 1:N fan-out (i.e. one received LNK_SPAN can be relayed out
+ * multiple connections).  In order to be able to route a message via a
+ * LNK_SPAN over a deterministic route THE MESSAGE CAN ONLY FLOW FROM A
+ * REMOTE NODE TOWARDS OUR NODE (N:1 fan-in).
+ *
+ * This supports the requirement that we have both message serialization
+ * and positive feedback if a topology change breaks the chain of VCs
+ * the message is flowing over.  A remote node sending a message to us
+ * will get positive feedback that the route was broken and can take suitable
+ * action to terminate the transaction with an error.
+ *
+ *                             TRANSACTIONAL REPLIES
+ *
+ * However, when we receive a command message from a remote node and we want
+ * to reply to it, we have a problem.  We want the remote node to have
+ * positive feedback if our reply fails to make it, but if we use a virtual
+ * circuit based on the remote node's LNK_SPAN to us it will be a DIFFERENT
+ * virtual circuit than the one the remote node used to message us.  That's
+ * a problem because it means we have no reliable way to notify the remote
+ * node if we get notified that our reply has failed.
+ *
+ * The solution is to first note the fact that the remote chose an optimal
+ * route to get to us, so the reverse should be true. The reason the VC
+ * might not exist over the same route in the reverse is because there may
+ * be multiple paths available with the same distance metric.
+ *
+ * But this also means that we can adjust the messaging protocols to
+ * propagate a LNK_SPAN from the remote to us WHILE the remote's command
+ * message is being sent to us, and it will not only likely be optimal but
+ * it might also already exist, and it will also guarantee that a reply
+ * failure will propagate back to both sides (because even though each
+ * direction is using a different VC chain, the two chains are still
+ * going along the same path).
+ *
+ * We communicate the return VC by having the relay adjust both the target
+ * and the source fields in the message, rather than just the target, on
+ * each relay.  As of when the message gets to us the 'source' field will
+ * represent the VC for the return direction (and of course also identify
+ * the node the message came from).
+ *
+ * This way both sides get positive feedback if a topology change disrupts
+ * the VC for the transaction.  We also get one additional guarantee, and
+ * that is no spurious messages.  Messages simply die when the VC they are
+ * traveling over is broken, in either direction, simple as that.
+ * It makes managing message transactional states very easy.
  *
  *                     MESSAGE TRANSACTIONAL STATES
  *
  */
 struct hammer2_msg_hdr {
        uint16_t        magic;          /* 00 sanity, synchro, endian */
-       uint16_t        reserved02;     /* 02 size of header in bytes */
+       uint16_t        reserved02;     /* 02 */
        uint32_t        salt;           /* 04 random salt helps w/crypto */
 
        uint64_t        msgid;          /* 08 message transaction id */
-       uint64_t        spanid;         /* 10 message routing id or 0 */
-
-       uint32_t        cmd;            /* 18 flags | cmd | hdr_size / ALIGN */
-       uint32_t        aux_crc;        /* 1C auxillary data crc */
-       uint32_t        aux_bytes;      /* 20 auxillary data length (bytes) */
-       uint32_t        error;          /* 24 error code or 0 */
-       uint64_t        aux_descr;      /* 28 negotiated OOB data descr */
-       uint64_t        reserved30;     /* 30 */
+       uint64_t        source;         /* 10 originator or 0   */
+       uint64_t        target;         /* 18 destination or 0  */
+
+       uint32_t        cmd;            /* 20 flags | cmd | hdr_size / ALIGN */
+       uint32_t        aux_crc;        /* 24 auxillary data crc */
+       uint32_t        aux_bytes;      /* 28 auxillary data length (bytes) */
+       uint32_t        error;          /* 2C error code or 0 */
+       uint64_t        aux_descr;      /* 30 negotiated OOB data descr */
        uint32_t        reserved38;     /* 38 */
        uint32_t        hdr_crc;        /* 3C (aligned) extended header crc */
 };
index a806297..3bb3864 100644 (file)
@@ -141,7 +141,7 @@ static void hammer2_cluster_thread_rd(void *arg);
 static void hammer2_cluster_thread_wr(void *arg);
 static int hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg);
 static int hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg);
-static int hammer2_msg_lnk_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+static int hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg);
 
 /*
  * HAMMER2 vfs operations.
@@ -370,6 +370,7 @@ hammer2_vfs_mount(struct mount *mp, char *path, caddr_t data,
        }
        ccms_domain_init(&pmp->ccms_dom);
        pmp->hmp = hmp;
+       pmp->router.pmp = pmp;
        ++hmp->pmp_count;
        lockmgr(&hammer2_mntlk, LK_RELEASE);
        kprintf("hammer2_mount hmp=%p pmpcnt=%d\n", hmp, hmp->pmp_count);
@@ -1042,8 +1043,7 @@ hammer2_cluster_thread_rd(void *arg)
                        error = EINVAL;
                        break;
                }
-               msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
-                             pmp->mmsg, M_WAITOK | M_ZERO);
+               msg = hammer2_msg_alloc(&pmp->router, hdr.cmd);
                msg->any.head = hdr;
                msg->hdr_size = hbytes;
                if (hbytes > sizeof(hdr)) {
@@ -1082,12 +1082,12 @@ hammer2_cluster_thread_rd(void *arg)
                 * to the connection except for EALREADY which forces
                 * a discard without execution.
                 */
-               error = hammer2_state_msgrx(pmp, msg);
+               error = hammer2_state_msgrx(msg);
                if (error) {
                        /*
                         * Raw protocol or connection error
                         */
-                       hammer2_msg_free(pmp, msg);
+                       hammer2_msg_free(msg);
                        if (error == EALREADY)
                                error = 0;
                } else if (msg->state && msg->state->func) {
@@ -1096,27 +1096,27 @@ hammer2_cluster_thread_rd(void *arg)
                         * handling function installed for it.
                         */
                        error = msg->state->func(msg->state, msg);
-                       hammer2_state_cleanuprx(pmp, msg);
+                       hammer2_state_cleanuprx(msg);
                } else if ((msg->any.head.cmd & HAMMER2_MSGF_PROTOS) ==
                           HAMMER2_MSG_PROTO_LNK) {
                        /*
                         * Message related to the LNK protocol set
                         */
-                       error = hammer2_msg_lnk_rcvmsg(pmp, msg);
-                       hammer2_state_cleanuprx(pmp, msg);
+                       error = hammer2_msg_lnk_rcvmsg(msg);
+                       hammer2_state_cleanuprx(msg);
                } else if ((msg->any.head.cmd & HAMMER2_MSGF_PROTOS) ==
                           HAMMER2_MSG_PROTO_DBG) {
                        /*
                         * Message related to the DBG protocol set
                         */
-                       error = hammer2_msg_dbg_rcvmsg(pmp, msg);
-                       hammer2_state_cleanuprx(pmp, msg);
+                       error = hammer2_msg_dbg_rcvmsg(msg);
+                       hammer2_state_cleanuprx(msg);
                } else {
                        /*
                         * Other higher-level messages (e.g. vnops)
                         */
-                       error = hammer2_msg_adhoc_input(pmp, msg);
-                       hammer2_state_cleanuprx(pmp, msg);
+                       error = hammer2_msg_adhoc_input(msg);
+                       hammer2_state_cleanuprx(msg);
                }
                msg = NULL;
        }
@@ -1128,7 +1128,7 @@ hammer2_cluster_thread_rd(void *arg)
        if (msg) {
                if (msg->state && msg->state->msg == msg)
                        msg->state->msg = NULL;
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
        }
 
        if ((state = pmp->freerd_state) != NULL) {
@@ -1171,7 +1171,8 @@ hammer2_cluster_thread_wr(void *arg)
         * The transaction remains fully open for the duration of the
         * connection.
         */
-       msg = hammer2_msg_alloc(pmp, 0, HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE);
+       msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_CONN |
+                                             HAMMER2_MSGF_CREATE);
        msg->any.lnk_conn.pfs_clid = pmp->iroot->ip_data.pfs_clid;
        msg->any.lnk_conn.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
        msg->any.lnk_conn.pfs_type = pmp->iroot->ip_data.pfs_type;
@@ -1181,7 +1182,7 @@ hammer2_cluster_thread_wr(void *arg)
                name_len = sizeof(msg->any.lnk_conn.label) - 1;
        bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_conn.label, name_len);
        msg->any.lnk_conn.label[name_len] = 0;
-       hammer2_msg_write(pmp, msg, hammer2_msg_conn_reply, pmp);
+       hammer2_msg_write(msg, hammer2_msg_conn_reply, pmp);
 
        /*
         * Transmit loop
@@ -1199,10 +1200,10 @@ hammer2_cluster_thread_wr(void *arg)
                        TAILQ_REMOVE(&pmp->msgq, msg, qentry);
                        lockmgr(&pmp->msglk, LK_RELEASE);
 
-                       error = hammer2_state_msgtx(pmp, msg);
+                       error = hammer2_state_msgtx(msg);
                        if (error == EALREADY) {
                                error = 0;
-                               hammer2_msg_free(pmp, msg);
+                               hammer2_msg_free(msg);
                                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
                                continue;
                        }
@@ -1233,7 +1234,7 @@ hammer2_cluster_thread_wr(void *arg)
                                        break;
                                }
                        }
-                       hammer2_state_cleanuptx(pmp, msg);
+                       hammer2_state_cleanuptx(msg);
                        lockmgr(&pmp->msglk, LK_EXCLUSIVE);
                }
        }
@@ -1247,14 +1248,14 @@ hammer2_cluster_thread_wr(void *arg)
        if (msg) {
                if (msg->state && msg->state->msg == msg)
                        msg->state->msg = NULL;
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
        }
 
        while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
                TAILQ_REMOVE(&pmp->msgq, msg, qentry);
                if (msg->state && msg->state->msg == msg)
                        msg->state->msg = NULL;
-               hammer2_msg_free(pmp, msg);
+               hammer2_msg_free(msg);
        }
 
        if ((state = pmp->freewr_state) != NULL) {
@@ -1281,12 +1282,15 @@ hammer2_cluster_thread_wr(void *arg)
 }
 
 static int
-hammer2_msg_lnk_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg)
 {
        switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
        case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:
+               /*
+                * reply & leave trans open
+                */
                kprintf("CONN RECEIVE - (just ignore it)\n");
-               hammer2_msg_result(pmp, msg, 0); /* reply & leave trans open */
+               hammer2_msg_result(msg, 0);
                break;
        case HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE:
                kprintf("SPAN RECEIVE - ADDED FROM CLUSTER\n");
@@ -1315,8 +1319,8 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
 
        if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
                kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
-               msg = hammer2_msg_alloc(pmp, 0, HAMMER2_LNK_SPAN |
-                                               HAMMER2_MSGF_CREATE);
+               msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN |
+                                                     HAMMER2_MSGF_CREATE);
                msg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
                msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
                msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
@@ -1328,11 +1332,11 @@ hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
                      msg->any.lnk_span.label,
                      name_len);
                msg->any.lnk_span.label[name_len] = 0;
-               hammer2_msg_write(pmp, msg, hammer2_msg_span_reply, pmp);
+               hammer2_msg_write(msg, hammer2_msg_span_reply, pmp);
        }
        if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                kprintf("LNK_CONN transaction terminated by remote\n");
-               hammer2_msg_reply(pmp, msg, 0);
+               hammer2_msg_reply(msg, 0);
        }
        return(0);
 }