cluster - more libdmsg work
[dragonfly.git] / lib / libdmsg / msg_lnk.c
index 67cc7a9..e2e2b73 100644 (file)
  * SUCH DAMAGE.
  */
 /*
- * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
- *
- * This code supports the LNK_SPAN protocol.  Essentially all PFS's
- * clients and services rendezvous with the userland hammer2 service and
- * open LNK_SPAN transactions using a message header linkid of 0,
- * registering any PFS's they have connectivity to with us.
- *
- * --
- *
- * Each registration maintains its own open LNK_SPAN message transaction.
- * The SPANs are collected, aggregated, and retransmitted over available
- * connections through the maintainance of additional LNK_SPAN message
- * transactions on each link.
- *
- * The msgid for each active LNK_SPAN transaction we receive allows us to
- * send a message to the target PFS (which might be one of many belonging
- * to the same cluster), by specifying that msgid as the linkid in any
- * message we send to the target PFS.
- *
- * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit
- * (and remember we will maintain multiple open LNK_SPAN transactions on
- * each connection representing the topology span, so every node sees every
- * other node as a separate open transaction).  So, similarly the msgid for
- * these active transactions which we initiated can be used by the other
- * end to route messages through us to another node, ultimately winding up
- * at the identified hammer2 PFS.  We have to adjust the spanid in the message
- * header at each hop to be representative of the outgoing LNK_SPAN we
- * are forwarding the message through.
- *
- * --
- *
- * If we were to retransmit every LNK_SPAN transaction we receive it would
- * create a huge mess, so we have to aggregate all received LNK_SPAN
- * transactions, sort them by the fsid (the cluster) and sub-sort them by
- * the pfs_fsid (individual nodes in the cluster), and only retransmit
- * (create outgoing transactions) for a subset of the nearest distance-hops
- * for each individual node.
- *
- * The higher level protocols can then issue transactions to the nodes making
- * up a cluster to perform all actions required.
- *
- * --
- *
- * Since this is a large topology and a spanning tree protocol, links can
- * go up and down all the time.  Any time a link goes down its transaction
- * is closed.  The transaction has to be closed on both ends before we can
- * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
- * closed may have been propagated out to other connections and those related
- * LNK_SPANs are also closed.  Ultimately all routes via the lost LNK_SPAN
- * go away, ultimately reaching all sources and all targets.
- *
- * Any messages in-transit using a route that goes away will be thrown away.
- * Open transactions are only tracked at the two end-points.  When a link
- * failure propagates to an end-point the related open transactions lose
- * their spanid and are automatically aborted.
- *
- * It is important to note that internal route nodes cannot just associate
- * a lost LNK_SPAN transaction with another route to the same destination.
- * Message transactions MUST be serialized and MUST be ordered.  All messages
- * for a transaction must run over the same route.  So if the route used by
- * an active transaction is lost, the related messages will be fully aborted
- * and the higher protocol levels will retry as appropriate.
- *
- * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
- * back to the originator.  Only the originator keeps tracks of a message.
- * Routers just pass it through.  If a route is lost during transit the
- * message is simply thrown away.
- *
- * It is also important to note that several paths to the same PFS can be
- * propagated along the same link, which allows concurrency and even
- * redundancy over several network interfaces or via different routes through
- * the topology.  Any given transaction will use only a single route but busy
- * servers will often have hundreds of transactions active simultaniously,
- * so having multiple active paths through the network topology for A<->B
- * will improve performance.
- *
- * --
- *
- * Most protocols consolidate operations rather than simply relaying them.
- * This is particularly true of LEAF protocols (such as strict HAMMER2
- * clients), of which there can be millions connecting into the cluster at
- * various points.  The SPAN protocol is not used for these LEAF elements.
- *
- * Instead the primary service they connect to implements a proxy for the
- * client protocols so the core topology only has to propagate a couple of
- * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
- * core master nodes and satellite slaves and cache nodes.
+ * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS - Please see sys/dmsg.h for an
+ * involved explanation of the protocol.
  */
 
 #include "dmsg_local.h"
 
+void (*dmsg_node_handler)(void **opaquep, struct dmsg_msg *msg, int op);
+
 /*
  * Maximum spanning tree distance.  This has the practical effect of
  * stopping tail-chasing closed loops when a feeder span is lost.
@@ -193,6 +110,7 @@ RB_HEAD(h2span_cluster_tree, h2span_cluster);
 RB_HEAD(h2span_node_tree, h2span_node);
 RB_HEAD(h2span_link_tree, h2span_link);
 RB_HEAD(h2span_relay_tree, h2span_relay);
+uint32_t DMsgRNSS;
 
 /*
  * This represents a media
@@ -239,6 +157,8 @@ struct h2span_cluster {
        RB_ENTRY(h2span_cluster) rbnode;
        struct h2span_node_tree tree;
        uuid_t  pfs_clid;               /* shared fsid */
+       uint8_t peer_type;
+       char    cl_label[128];          /* cluster label (typ PEER_BLOCK) */
        int     refs;                   /* prevents destruction */
 };
 
@@ -246,49 +166,45 @@ struct h2span_node {
        RB_ENTRY(h2span_node) rbnode;
        struct h2span_link_tree tree;
        struct h2span_cluster *cls;
-       uint8_t peer_type;
+       uint8_t pfs_type;
        uuid_t  pfs_fsid;               /* unique fsid */
-       char    label[64];
+       char    fs_label[128];          /* fs label (typ PEER_HAMMER2) */
+       void    *opaque;
 };
 
 struct h2span_link {
        RB_ENTRY(h2span_link) rbnode;
        dmsg_state_t    *state;         /* state<->link */
        struct h2span_node *node;       /* related node */
-       int32_t dist;
+       uint32_t        dist;
+       uint32_t        rnss;
        struct h2span_relay_queue relayq; /* relay out */
-       struct dmsg_router *router;     /* route out this link */
 };
 
 /*
  * Any LNK_SPAN transactions we receive which are relayed out other
- * connections utilize this structure to track the LNK_SPAN transaction
- * we initiate on the other connections, if selected for relay.
+ * connections utilize this structure to track the LNK_SPAN transactions
+ * we initiate (relay out) on other connections.  We only relay out
+ * LNK_SPANs on connections we have an open CONN transaction for.
+ *
+ * The relay structure points to the outgoing LNK_SPAN trans (out_state)
+ * and to the incoming LNK_SPAN transaction (in_state).  The relay
+ * structure holds refs on the related states.
  *
  * In many respects this is the core of the protocol... actually figuring
  * out what LNK_SPANs to relay.  The spanid used for relaying is the
  * address of the 'state' structure, which is why h2span_relay has to
  * be entered into a RB-TREE based at h2span_conn (so we can look
  * up the spanid to validate it).
- *
- * NOTE: Messages can be received via the LNK_SPAN transaction the
- *      relay maintains, and can be replied via relay->router, but
- *      messages are NOT initiated via a relay.  Messages are initiated
- *      via incoming links (h2span_link's).
- *
- *      relay->link represents the link being relayed, NOT the LNK_SPAN
- *      transaction the relay is holding open.
  */
 struct h2span_relay {
-       RB_ENTRY(h2span_relay) rbnode;  /* from h2span_conn */
-       TAILQ_ENTRY(h2span_relay) entry; /* from link */
-       struct h2span_conn *conn;
-       dmsg_state_t    *state;         /* transmitted LNK_SPAN */
-       struct h2span_link *link;       /* LNK_SPAN being relayed */
-       struct dmsg_router      *router;/* route out this relay */
+       TAILQ_ENTRY(h2span_relay) entry;        /* from link */
+       RB_ENTRY(h2span_relay) rbnode;          /* from h2span_conn */
+       struct h2span_conn      *conn;          /* related CONN transaction */
+       dmsg_state_t            *source_rt;     /* h2span_link state */
+       dmsg_state_t            *target_rt;     /* h2span_relay state */
 };
 
-
 typedef struct h2span_media h2span_media_t;
 typedef struct h2span_conn h2span_conn_t;
 typedef struct h2span_cluster h2span_cluster_t;
@@ -296,26 +212,53 @@ typedef struct h2span_node h2span_node_t;
 typedef struct h2span_link h2span_link_t;
 typedef struct h2span_relay h2span_relay_t;
 
+#define dmsg_termstr(array)    _dmsg_termstr((array), sizeof(array))
+
+static h2span_relay_t *dmsg_generate_relay(h2span_conn_t *conn,
+                                       h2span_link_t *slink);
+static uint32_t dmsg_rnss(void);
+
+static __inline
+void
+_dmsg_termstr(char *base, size_t size)
+{
+       base[size-1] = 0;
+}
+
+/*
+ * Cluster peer_type, uuid, AND label must match for a match
+ */
 static
 int
 h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
 {
-       return(uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL));
+       int r;
+
+       if (cls1->peer_type < cls2->peer_type)
+               return(-1);
+       if (cls1->peer_type > cls2->peer_type)
+               return(1);
+       r = uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL);
+       if (r == 0)
+               r = strcmp(cls1->cl_label, cls2->cl_label);
+
+       return r;
 }
 
+/*
+ * Match against fs_label/pfs_fsid.  Together these two items represent a
+ * unique node.  In most cases the primary differentiator is pfs_fsid but
+ * we also string-match fs_label.
+ */
 static
 int
 h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
 {
        int r;
 
-       if (node1->peer_type < node2->peer_type)
-               return(-1);
-       if (node1->peer_type > node2->peer_type)
-               return(1);
-       r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
-       if (r == 0 && node1->peer_type == DMSG_PEER_BLOCK)
-               r = strcmp(node1->label, node2->label);
+       r = strcmp(node1->fs_label, node2->fs_label);
+       if (r == 0)
+               r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
        return (r);
 }
 
@@ -335,6 +278,10 @@ h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
+       if (link1->rnss < link2->rnss)
+               return(-1);
+       if (link1->rnss > link2->rnss)
+               return(1);
 #if 1
        if ((uintptr_t)link1->state < (uintptr_t)link2->state)
                return(-1);
@@ -358,8 +305,8 @@ static
 int
 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 {
-       h2span_link_t *link1 = relay1->link;
-       h2span_link_t *link2 = relay2->link;
+       h2span_link_t *link1 = relay1->source_rt->any.link;
+       h2span_link_t *link2 = relay2->source_rt->any.link;
 
        if ((intptr_t)link1->node < (intptr_t)link2->node)
                return(-1);
@@ -369,6 +316,10 @@ h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
+       if (link1->rnss < link2->rnss)
+               return(-1);
+       if (link1->rnss > link2->rnss)
+               return(1);
 #if 1
        if ((uintptr_t)link1->state < (uintptr_t)link2->state)
                return(-1);
@@ -411,6 +362,7 @@ static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
 
 static void dmsg_lnk_span(dmsg_msg_t *msg);
 static void dmsg_lnk_conn(dmsg_msg_t *msg);
+static void dmsg_lnk_circ(dmsg_msg_t *msg);
 static void dmsg_lnk_relay(dmsg_msg_t *msg);
 static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node);
 static void dmsg_relay_delete(h2span_relay_t *relay);
@@ -421,7 +373,7 @@ static void dmsg_volconf_start(h2span_media_config_t *conf,
                                const char *hostname);
 
 void
-dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
+dmsg_msg_lnk_signal(dmsg_iocom_t *iocom __unused)
 {
        pthread_mutex_lock(&cluster_mtx);
        dmsg_relay_scan(NULL, NULL);
@@ -429,20 +381,28 @@ dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
 }
 
 /*
- * Receive a DMSG_PROTO_LNK message.  This only called for
- * one-way and opening-transactions since state->func will be assigned
- * in all other cases.
+ * DMSG_PROTO_LNK - Generic DMSG_PROTO_LNK.
+ *           (incoming iocom lock not held)
+ *
+ * This function is typically called for one-way and opening-transactions
+ * since state->func is assigned after that, but it will also be called
+ * if no state->func is assigned on transaction-open.
  */
 void
 dmsg_msg_lnk(dmsg_msg_t *msg)
 {
-       switch(msg->any.head.cmd & DMSGF_BASECMDMASK) {
+       uint32_t icmd = msg->state ? msg->state->icmd : msg->any.head.cmd;
+
+       switch(icmd & DMSGF_BASECMDMASK) {
        case DMSG_LNK_CONN:
                dmsg_lnk_conn(msg);
                break;
        case DMSG_LNK_SPAN:
                dmsg_lnk_span(msg);
                break;
+       case DMSG_LNK_CIRC:
+               dmsg_lnk_circ(msg);
+               break;
        default:
                fprintf(stderr,
                        "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
@@ -452,6 +412,13 @@ dmsg_msg_lnk(dmsg_msg_t *msg)
        }
 }
 
+/*
+ * LNK_CONN - iocom identify message reception.
+ *           (incoming iocom lock not held)
+ *
+ * Remote node identifies itself to us, sets up a SPAN filter, and gives us
+ * the ok to start transmitting SPANs.
+ */
 void
 dmsg_lnk_conn(dmsg_msg_t *msg)
 {
@@ -465,6 +432,9 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
 
        pthread_mutex_lock(&cluster_mtx);
 
+       fprintf(stderr, "dmsg_lnk_conn: msg %p cmd %08x state %p txcmd %08x rxcmd %08x\n",
+               msg, msg->any.head.cmd, state, state->txcmd, state->rxcmd);
+
        switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
        case DMSG_LNK_CONN | DMSGF_CREATE:
        case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
@@ -473,16 +443,18 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
                 * acknowledge the request, leaving the transaction open.
                 * We then relay priority-selected SPANs.
                 */
-               fprintf(stderr, "LNK_CONN(%08x): %s/%s\n",
+               fprintf(stderr, "LNK_CONN(%08x): %s/%s/%s\n",
                        (uint32_t)msg->any.head.msgid,
                        dmsg_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
                                            &alloc),
-                       msg->any.lnk_conn.label);
+                       msg->any.lnk_conn.cl_label,
+                       msg->any.lnk_conn.fs_label);
                free(alloc);
 
                conn = dmsg_alloc(sizeof(*conn));
 
                RB_INIT(&conn->tree);
+               state->iocom->conn = conn;      /* XXX only one */
                conn->state = state;
                state->func = dmsg_lnk_conn;
                state->any.conn = conn;
@@ -507,7 +479,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
 
                if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
                        dmsg_msg_result(msg, 0);
-                       dmsg_router_signal(msg->router);
+                       dmsg_iocom_signal(msg->iocom);
                        break;
                }
                /* FALL THROUGH */
@@ -568,6 +540,7 @@ deleteconn:
                conn->media = NULL;
                conn->state = NULL;
                msg->state->any.conn = NULL;
+               msg->state->iocom->conn = NULL;
                TAILQ_REMOVE(&connq, conn, entry);
                dmsg_free(conn);
 
@@ -620,6 +593,13 @@ deleteconn:
        pthread_mutex_unlock(&cluster_mtx);
 }
 
+/*
+ * LNK_SPAN - Spanning tree protocol message reception
+ *           (incoming iocom lock not held)
+ *
+ * Receive a spanning tree transactional message, creating or destroying
+ * a SPAN and propagating it to other iocoms.
+ */
 void
 dmsg_lnk_span(dmsg_msg_t *msg)
 {
@@ -643,16 +623,25 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                assert(state->func == NULL);
                state->func = dmsg_lnk_span;
 
-               msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0;
+               dmsg_termstr(msg->any.lnk_span.cl_label);
+               dmsg_termstr(msg->any.lnk_span.fs_label);
 
                /*
                 * Find the cluster
                 */
                dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
+               dummy_cls.peer_type = msg->any.lnk_span.peer_type;
+               bcopy(msg->any.lnk_span.cl_label,
+                     dummy_cls.cl_label,
+                     sizeof(dummy_cls.cl_label));
                cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
                if (cls == NULL) {
                        cls = dmsg_alloc(sizeof(*cls));
                        cls->pfs_clid = msg->any.lnk_span.pfs_clid;
+                       cls->peer_type = msg->any.lnk_span.peer_type;
+                       bcopy(msg->any.lnk_span.cl_label,
+                             cls->cl_label,
+                             sizeof(cls->cl_label));
                        RB_INIT(&cls->tree);
                        RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
                }
@@ -661,19 +650,23 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                 * Find the node
                 */
                dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
-               dummy_node.peer_type = msg->any.lnk_span.peer_type;
-               snprintf(dummy_node.label, sizeof(dummy_node.label),
-                        "%s", msg->any.lnk_span.label);
+               bcopy(msg->any.lnk_span.fs_label, dummy_node.fs_label,
+                     sizeof(dummy_node.fs_label));
                node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
                if (node == NULL) {
                        node = dmsg_alloc(sizeof(*node));
                        node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
-                       node->peer_type = msg->any.lnk_span.peer_type;
-                       snprintf(node->label, sizeof(node->label),
-                                "%s", msg->any.lnk_span.label);
+                       node->pfs_type = msg->any.lnk_span.pfs_type;
+                       bcopy(msg->any.lnk_span.fs_label,
+                             node->fs_label,
+                             sizeof(node->fs_label));
                        node->cls = cls;
                        RB_INIT(&node->tree);
                        RB_INSERT(h2span_node_tree, &cls->tree, node);
+                       if (dmsg_node_handler) {
+                               dmsg_node_handler(&node->opaque, msg,
+                                                 DMSG_NODEOP_ADD);
+                       }
                }
 
                /*
@@ -684,36 +677,25 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                TAILQ_INIT(&slink->relayq);
                slink->node = node;
                slink->dist = msg->any.lnk_span.dist;
+               slink->rnss = msg->any.lnk_span.rnss;
                slink->state = state;
                state->any.link = slink;
 
-               /*
-                * Embedded router structure in link for message forwarding.
-                *
-                * The spanning id for the router is the message id of
-                * the SPAN link it is embedded in, allowing messages to
-                * be routed via &slink->router.
-                */
-               slink->router = dmsg_router_alloc();
-               slink->router->iocom = state->iocom;
-               slink->router->link = slink;
-               slink->router->target = state->msgid;
-               dmsg_router_connect(slink->router);
-
                RB_INSERT(h2span_link_tree, &node->tree, slink);
 
-               fprintf(stderr, "LNK_SPAN(thr %p): %p %s/%s dist=%d\n",
-                       msg->router->iocom,
+               fprintf(stderr,
+                       "LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n",
+                       msg->iocom,
                        slink,
-                       dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid,
-                                           &alloc),
-                       msg->any.lnk_span.label,
+                       dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid, &alloc),
+                       msg->any.lnk_span.cl_label,
+                       msg->any.lnk_span.fs_label,
                        msg->any.lnk_span.dist);
                free(alloc);
 #if 0
                dmsg_relay_scan(NULL, node);
 #endif
-               dmsg_router_signal(msg->router);
+               dmsg_iocom_signal(msg->iocom);
        }
 
        /*
@@ -725,19 +707,15 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                node = slink->node;
                cls = node->cls;
 
-               fprintf(stderr, "LNK_DELE(thr %p): %p %s/%s dist=%d\n",
-                       msg->router->iocom,
+               fprintf(stderr, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n",
+                       msg->iocom,
                        slink,
                        dmsg_uuid_to_str(&cls->pfs_clid, &alloc),
-                       state->msg->any.lnk_span.label,
+                       state->msg->any.lnk_span.cl_label,
+                       state->msg->any.lnk_span.fs_label,
                        state->msg->any.lnk_span.dist);
                free(alloc);
 
-               /*
-                * Remove the router from consideration
-                */
-               dmsg_router_disconnect(&slink->router);
-
                /*
                 * Clean out all relays.  This requires terminating each
                 * relay transaction.
@@ -752,6 +730,10 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                RB_REMOVE(h2span_link_tree, &node->tree, slink);
                if (RB_EMPTY(&node->tree)) {
                        RB_REMOVE(h2span_node_tree, &cls->tree, node);
+                       if (dmsg_node_handler) {
+                               dmsg_node_handler(&node->opaque, msg,
+                                                 DMSG_NODEOP_DEL);
+                       }
                        if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
                                RB_REMOVE(h2span_cluster_tree,
                                          &cluster_tree, cls);
@@ -782,35 +764,279 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                        dmsg_relay_scan(NULL, node);
 #endif
                if (node)
-                       dmsg_router_signal(msg->router);
+                       dmsg_iocom_signal(msg->iocom);
        }
 
        pthread_mutex_unlock(&cluster_mtx);
 }
 
 /*
- * Messages received on relay SPANs.  These are open transactions so it is
- * in fact possible for the other end to close the transaction.
+ * LNK_CIRC - Virtual circuit protocol message reception
+ *           (incoming iocom lock not held)
  *
- * XXX MPRACE on state structure
+ * Handles all cases.
  */
-static void
-dmsg_lnk_relay(dmsg_msg_t *msg)
+void
+dmsg_lnk_circ(dmsg_msg_t *msg)
 {
-       dmsg_state_t *state = msg->state;
-       h2span_relay_t *relay;
+       dmsg_circuit_t *circA;
+       dmsg_circuit_t *circB;
+       dmsg_state_t *rx_state;
+       dmsg_state_t *tx_state;
+       dmsg_state_t *state;
+       dmsg_state_t dummy;
+       dmsg_msg_t *fwd_msg;
+       dmsg_iocom_t *iocomA;
+       dmsg_iocom_t *iocomB;
+       int disconnect;
+
+       /*pthread_mutex_lock(&cluster_mtx);*/
+
+       if (DMsgDebugOpt >= 4)
+               fprintf(stderr, "CIRC receive cmd=%08x\n", msg->any.head.cmd);
+
+       switch (msg->any.head.cmd & (DMSGF_CREATE |
+                                    DMSGF_DELETE |
+                                    DMSGF_REPLY)) {
+       case DMSGF_CREATE:
+       case DMSGF_CREATE | DMSGF_DELETE:
+               /*
+                * (A) wishes to establish a virtual circuit through us to (B).
+                * (B) is specified by lnk_circ.target (the message id for
+                * a LNK_SPAN that (A) received from us which represents (B)).
+                *
+                * Designate the originator of the circuit (the current
+                * remote end) as (A) and the other side as (B).
+                *
+                * Accept the VC but do not reply.  We will wait for the end-
+                * to-end reply to propagate back.
+                */
+               iocomA = msg->iocom;
 
-       assert(msg->any.head.cmd & DMSGF_REPLY);
+               /*
+                * Locate the open transaction state that the other end
+                * specified in <target>.  This will be an open SPAN
+                * transaction that we transmitted (h2span_relay) over
+                * the interface the LNK_CIRC is being received on.
+                *
+                * (all LNK_CIRC's that we transmit are on circuit0)
+                */
+               pthread_mutex_lock(&iocomA->mtx);
+               dummy.msgid = msg->any.lnk_circ.target;
+               tx_state = RB_FIND(dmsg_state_tree,
+                                  &iocomA->circuit0.statewr_tree,
+                                  &dummy);
+               pthread_mutex_unlock(&iocomA->mtx);
+               if (tx_state == NULL) {
+                       /* XXX SMP race */
+                       fprintf(stderr, "dmsg_lnk_circ: no circuit\n");
+                       dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+                       break;
+               }
+               if (tx_state->icmd != DMSG_LNK_SPAN) {
+                       /* XXX SMP race */
+                       fprintf(stderr, "dmsg_lnk_circ: not LNK_SPAN\n");
+                       dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+                       break;
+               }
 
-       if (msg->any.head.cmd & DMSGF_DELETE) {
-               pthread_mutex_lock(&cluster_mtx);
-               if ((relay = state->any.relay) != NULL) {
-                       dmsg_relay_delete(relay);
+               /* locate h2span_link */
+               rx_state = tx_state->any.relay->source_rt;
+
+               /*
+                * A wishes to establish a VC through us to the
+                * specified target.
+                *
+                * A sends us the msgid of an open SPAN transaction
+                * it received from us as <target>.
+                */
+               circA = dmsg_alloc(sizeof(*circA));
+               dmsg_circuit_init(iocomA, circA);
+               circA->state = msg->state;      /* LNK_CIRC state */
+               circA->msgid = msg->state->msgid;
+               circA->span_state = tx_state;   /* H2SPAN_RELAY state */
+               circA->is_relay = 1;
+               circA->refs = 2;                /* state and peer */
+               msg->state->any.circ = circA;
+
+               iocomB = rx_state->iocom;
+
+               circB = dmsg_alloc(sizeof(*circB));
+               dmsg_circuit_init(iocomB, circB);
+
+               /*
+                * Create a LNK_CIRC transaction on B
+                */
+               fwd_msg = dmsg_msg_alloc(&iocomB->circuit0,
+                                        0, DMSG_LNK_CIRC | DMSGF_CREATE,
+                                        dmsg_lnk_circ, circB);
+               fwd_msg->state->any.circ = circB;
+               fwd_msg->any.lnk_circ.target = rx_state->msgid;
+               circB->state = fwd_msg->state;  /* LNK_CIRC state */
+               circB->msgid = fwd_msg->any.head.msgid;
+               circB->span_state = rx_state;   /* H2SPAN_LINK state */
+               circB->is_relay = 0;
+               circB->refs = 2;                /* state and peer */
+
+               if (DMsgDebugOpt >= 4)
+                       fprintf(stderr, "CIRC forward %p->%p\n", circA, circB);
+
+               /*
+                * Link the two circuits together.
+                */
+               circA->peer = circB;
+               circB->peer = circA;
+
+               if (iocomA < iocomB) {
+                       pthread_mutex_lock(&iocomA->mtx);
+                       pthread_mutex_lock(&iocomB->mtx);
                } else {
-                       dmsg_state_reply(state, 0);
+                       pthread_mutex_lock(&iocomB->mtx);
+                       pthread_mutex_lock(&iocomA->mtx);
                }
-               pthread_mutex_unlock(&cluster_mtx);
+               if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA))
+                       assert(0);
+               if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB))
+                       assert(0);
+               if (iocomA < iocomB) {
+                       pthread_mutex_unlock(&iocomB->mtx);
+                       pthread_mutex_unlock(&iocomA->mtx);
+               } else {
+                       pthread_mutex_unlock(&iocomA->mtx);
+                       pthread_mutex_unlock(&iocomB->mtx);
+               }
+
+               dmsg_msg_write(fwd_msg);
+
+               if ((msg->any.head.cmd & DMSGF_DELETE) == 0)
+                       break;
+               /* FALL THROUGH TO DELETE */
+       case DMSGF_DELETE:
+               /*
+                * (A) Is deleting the virtual circuit, propagate closure
+                * to (B).
+                */
+               iocomA = msg->iocom;
+               if (msg->state->any.circ == NULL) {
+                       /* already returned an error/deleted */
+                       break;
+               }
+               circA = msg->state->any.circ;
+               circB = circA->peer;
+               assert(msg->state == circA->state);
+
+               /*
+                * We are closing B's send side.  If B's receive side is
+                * already closed we disconnect the circuit from B's state.
+                */
+               disconnect = 0;
+               if (circB && (state = circB->state) != NULL) {
+                       if (state->rxcmd & DMSGF_DELETE) {
+                               circB->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circB);
+                       }
+                       dmsg_state_reply(state, msg->any.head.error);
+                       disconnect = 1;
+               }
+
+               /*
+                * We received a close on A.  If A's send side is already
+                * closed we disconnect the circuit from A's state.
+                */
+               if (circA && (state = circA->state) != NULL) {
+                       if (state->txcmd & DMSGF_DELETE) {
+                               circA->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circA);
+                       }
+                       disconnect = 1;
+               }
+
+               /*
+                * Disconnect the peer<->peer association
+                */
+               if (disconnect) {
+                       if (circB) {
+                               circA->peer = NULL;
+                               circB->peer = NULL;
+                               dmsg_circuit_drop(circA);
+                               dmsg_circuit_drop(circB); /* XXX SMP */
+                       }
+               }
+               break;
+       case DMSGF_REPLY | DMSGF_CREATE:
+       case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
+               /*
+                * (B) is acknowledging the creation of the virtual
+                * circuit.  This propagates all the way back to (A), though
+                * it should be noted that (A) can start issuing commands
+                * via the virtual circuit before seeing this reply.
+                */
+               circB = msg->state->any.circ;
+               assert(circB);
+               circA = circB->peer;
+               assert(msg->state == circB->state);
+               assert(circA);
+               if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+                       dmsg_state_result(circA->state, msg->any.head.error);
+                       break;
+               }
+               /* FALL THROUGH TO DELETE */
+       case DMSGF_REPLY | DMSGF_DELETE:
+               /*
+                * (B) Is deleting the virtual circuit or acknowledging
+                * our deletion of the virtual circuit, propogate closure
+                * to (A).
+                */
+               iocomB = msg->iocom;
+               circB = msg->state->any.circ;
+               circA = circB->peer;
+               assert(msg->state == circB->state);
+
+               /*
+                * We received a close on (B), propagate to (A).  If we have
+                * already received the close from (A) we disconnect the state.
+                */
+               disconnect = 0;
+               if (circA && (state = circA->state) != NULL) {
+                       if (state->rxcmd & DMSGF_DELETE) {
+                               circA->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circA);
+                       }
+                       dmsg_state_reply(state, msg->any.head.error);
+                       disconnect = 1;
+               }
+
+               /*
+                * We received a close on (B).  If (B)'s send side is already
+                * closed we disconnect the state.
+                */
+               if (circB && (state = circB->state) != NULL) {
+                       if (state->txcmd & DMSGF_DELETE) {
+                               circB->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circB);
+                       }
+                       disconnect = 1;
+               }
+
+               /*
+                * Disconnect the peer<->peer association
+                */
+               if (disconnect) {
+                       if (circA) {
+                               circB->peer = NULL;
+                               circA->peer = NULL;
+                               dmsg_circuit_drop(circB);
+                               dmsg_circuit_drop(circA); /* XXX SMP */
+                       }
+               }
+               break;
        }
+
+       /*pthread_mutex_unlock(&cluster_mtx);*/
 }
 
 /*
@@ -881,9 +1107,9 @@ dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg)
 {
        struct relay_scan_info *info = arg;
 
-       if ((intptr_t)relay->link->node < (intptr_t)info->node)
+       if ((intptr_t)relay->source_rt->any.link->node < (intptr_t)info->node)
                return(-1);
-       if ((intptr_t)relay->link->node > (intptr_t)info->node)
+       if ((intptr_t)relay->source_rt->any.link->node > (intptr_t)info->node)
                return(1);
        return(0);
 }
@@ -905,9 +1131,11 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
        h2span_relay_t *next_relay;
        h2span_link_t *slink;
        dmsg_lnk_conn_t *lconn;
-       dmsg_msg_t *msg;
-       int count = 2;
-       uint8_t peer_type;
+       dmsg_lnk_span_t *lspan;
+       int count;
+       int maxcount = 2;
+       uint32_t lastdist = DMSG_SPAN_MAXDIST;
+       uint32_t lastrnss = 0;
 
        info.node = node;
        info.relay = NULL;
@@ -921,7 +1149,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
        relay = info.relay;
        info.relay = NULL;
        if (relay)
-               assert(relay->link->node == node);
+               assert(relay->source_rt->any.link->node == node);
 
        if (DMsgDebugOpt > 8)
                fprintf(stderr, "relay scan for connection %p\n", conn);
@@ -939,15 +1167,39 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
         *  removed the relay, so the relay can only match exactly or
         *  be worse).
         */
+       count = 0;
        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+               /*
+                * Increment count of successful relays.  This isn't
+                * quite accurate if we break out but nothing after
+                * the loop uses (count).
+                *
+                * If count exceeds the maximum number of relays we desire
+                * we normally want to break out.  However, in order to
+                * guarantee a symmetric path we have to continue if both
+                * (dist) and (rnss) continue to match.  Otherwise the SPAN
+                * propagation in the reverse direction may choose different
+                * routes and we will not have a symmetric path.
+                *
+                * NOTE: Spanning tree does not have to be symmetrical so
+                *       this code is not currently enabled.
+                */
+               if (++count >= maxcount) {
+#ifdef REQUIRE_SYMMETRICAL
+                       if (lastdist != slink->dist || lastrnss != slink->rnss)
+                               break;
+#else
+                       break;
+#endif
+                       /* go beyond the nominal maximum desired relays */
+               }
+
                /*
                 * Match, relay already in-place, get the next
                 * relay to match against the next slink.
                 */
-               if (relay && relay->link == slink) {
+               if (relay && relay->source_rt->any.link == slink) {
                        relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
-                       if (--count == 0)
-                               break;
                        continue;
                }
 
@@ -974,73 +1226,69 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
                 *
                 * Don't bother transmitting if the remote connection
                 * is not accepting this SPAN's peer_type.
+                *
+                * pfs_mask is typically used so pure clients can filter
+                * out receiving SPANs for other pure clients.
                 */
-               peer_type = slink->state->msg->any.lnk_span.peer_type;
+               lspan = &slink->state->msg->any.lnk_span;
                lconn = &conn->state->msg->any.lnk_conn;
-               if (((1LLU << peer_type) & lconn->peer_mask) == 0)
+               if (((1LLU << lspan->peer_type) & lconn->peer_mask) == 0)
+                       break;
+               if (((1LLU << lspan->pfs_type) & lconn->pfs_mask) == 0)
                        break;
 
                /*
-                * Filter based on pfs_clid or label (XXX).  This typically
-                * reduces the amount of SPAN traffic that a mount end-point
-                * sees by only passing along SPANs related to the cluster id
-                * (that is, it will see all PFS's associated with the
-                * particular cluster it represents).
+                * Do not give pure clients visibility to other pure clients
                 */
-               if (peer_type == lconn->peer_type &&
-                   peer_type == DMSG_PEER_HAMMER2) {
-                       if (!uuid_is_nil(&slink->node->cls->pfs_clid, NULL) &&
-                           uuid_compare(&slink->node->cls->pfs_clid,
-                                        &lconn->pfs_clid, NULL) != 0) {
-                               break;
-                       }
+               if (lconn->pfs_type == DMSG_PFSTYPE_CLIENT &&
+                   lspan->pfs_type == DMSG_PFSTYPE_CLIENT) {
+                       break;
                }
 
                /*
-                * Ok, we've accepted this SPAN for relaying.
+                * Connection filter, if cluster uuid is not NULL it must
+                * match the span cluster uuid.  Only applies when the
+                * peer_type matches.
                 */
-               assert(relay == NULL ||
-                      relay->link->node != slink->node ||
-                      relay->link->dist >= slink->dist);
-               relay = dmsg_alloc(sizeof(*relay));
-               relay->conn = conn;
-               relay->link = slink;
-
-               msg = dmsg_msg_alloc(conn->state->iocom->router, 0,
-                                       DMSG_LNK_SPAN |
-                                       DMSGF_CREATE,
-                                       dmsg_lnk_relay, relay);
-               relay->state = msg->state;
-               relay->router = dmsg_router_alloc();
-               relay->router->iocom = relay->state->iocom;
-               relay->router->relay = relay;
-               relay->router->target = relay->state->msgid;
-
-               msg->any.lnk_span = slink->state->msg->any.lnk_span;
-               msg->any.lnk_span.dist = slink->dist + 1;
-
-               dmsg_router_connect(relay->router);
+               if (lspan->peer_type == lconn->peer_type &&
+                   !uuid_is_nil(&lconn->pfs_clid, NULL) &&
+                   uuid_compare(&slink->node->cls->pfs_clid,
+                                &lconn->pfs_clid, NULL)) {
+                       break;
+               }
 
-               RB_INSERT(h2span_relay_tree, &conn->tree, relay);
-               TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
+               /*
+                * Connection filter, if cluster label is not empty it must
+                * match the span cluster label.  Only applies when the
+                * peer_type matches.
+                */
+               if (lspan->peer_type == lconn->peer_type &&
+                   lconn->cl_label[0] &&
+                   strcmp(lconn->cl_label, slink->node->cls->cl_label)) {
+                       break;
+               }
 
-               dmsg_msg_write(msg);
+               /*
+                * NOTE! pfs_fsid differentiates nodes within the same cluster
+                *       so we obviously don't want to match those.  Similarly
+                *       for fs_label.
+                */
 
-               fprintf(stderr,
-                       "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
-                       "FD %d state %p\n",
-                       slink,
-                       relay,
-                       node->cls, node, slink->dist,
-                       conn->state->iocom->sock_fd, relay->state);
+               /*
+                * Ok, we've accepted this SPAN for relaying.
+                */
+               assert(relay == NULL ||
+                      relay->source_rt->any.link->node != slink->node ||
+                      relay->source_rt->any.link->dist >= slink->dist);
+               relay = dmsg_generate_relay(conn, slink);
+               lastdist = slink->dist;
+               lastrnss = slink->rnss;
 
                /*
                 * Match (created new relay), get the next relay to
                 * match against the next slink.
                 */
                relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
-               if (--count == 0)
-                       break;
        }
 
        /*
@@ -1048,38 +1296,106 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
         * the node are in excess of the current aggregate spanning state
         * and should be removed.
         */
-       while (relay && relay->link->node == node) {
+       while (relay && relay->source_rt->any.link->node == node) {
                next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+               fprintf(stderr, "RELAY DELETE FROM EXTRAS\n");
                dmsg_relay_delete(relay);
                relay = next_relay;
        }
 }
 
+/*
+ * Helper function to generate missing relay.
+ *
+ * cluster_mtx must be held
+ */
+static
+h2span_relay_t *
+dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink)
+{
+       h2span_relay_t *relay;
+       h2span_node_t *node;
+       dmsg_msg_t *msg;
+
+       node = slink->node;
+
+       relay = dmsg_alloc(sizeof(*relay));
+       relay->conn = conn;
+       relay->source_rt = slink->state;
+       /* relay->source_rt->any.link = slink; */
+
+       /*
+        * NOTE: relay->target_rt->any.relay set to relay by alloc.
+        */
+       msg = dmsg_msg_alloc(&conn->state->iocom->circuit0,
+                            0, DMSG_LNK_SPAN | DMSGF_CREATE,
+                            dmsg_lnk_relay, relay);
+       relay->target_rt = msg->state;
+
+       msg->any.lnk_span = slink->state->msg->any.lnk_span;
+       msg->any.lnk_span.dist = slink->dist + 1;
+       msg->any.lnk_span.rnss = slink->rnss + dmsg_rnss();
+
+       RB_INSERT(h2span_relay_tree, &conn->tree, relay);
+       TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
+
+       dmsg_msg_write(msg);
+
+       return (relay);
+}
+
+/*
+ * Messages received on relay SPANs.  These are open transactions so it is
+ * in fact possible for the other end to close the transaction.
+ *
+ * XXX MPRACE on state structure
+ */
+static void
+dmsg_lnk_relay(dmsg_msg_t *msg)
+{
+       dmsg_state_t *state = msg->state;
+       h2span_relay_t *relay;
+
+       assert(msg->any.head.cmd & DMSGF_REPLY);
+
+       if (msg->any.head.cmd & DMSGF_DELETE) {
+               pthread_mutex_lock(&cluster_mtx);
+               fprintf(stderr, "RELAY DELETE FROM LNK_RELAY MSG\n");
+               if ((relay = state->any.relay) != NULL) {
+                       dmsg_relay_delete(relay);
+               } else {
+                       dmsg_state_reply(state, 0);
+               }
+               pthread_mutex_unlock(&cluster_mtx);
+       }
+}
+
+/*
+ * cluster_mtx held by caller
+ */
 static
 void
 dmsg_relay_delete(h2span_relay_t *relay)
 {
        fprintf(stderr,
                "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
-               relay->link,
+               relay->source_rt->any.link,
                relay,
-               relay->link->node->cls, relay->link->node,
-               relay->link->dist,
-               relay->conn->state->iocom->sock_fd, relay->state);
-
-       dmsg_router_disconnect(&relay->router);
+               relay->source_rt->any.link->node->cls, relay->source_rt->any.link->node,
+               relay->source_rt->any.link->dist,
+               relay->conn->state->iocom->sock_fd, relay->target_rt);
 
        RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
-       TAILQ_REMOVE(&relay->link->relayq, relay, entry);
+       TAILQ_REMOVE(&relay->source_rt->any.link->relayq, relay, entry);
 
-       if (relay->state) {
-               relay->state->any.relay = NULL;
-               dmsg_state_reply(relay->state, 0);
+       if (relay->target_rt) {
+               relay->target_rt->any.relay = NULL;
+               dmsg_state_reply(relay->target_rt, 0);
                /* state invalid after reply */
-               relay->state = NULL;
+               relay->target_rt = NULL;
        }
        relay->conn = NULL;
-       relay->link = NULL;
+       relay->source_rt = NULL;
        dmsg_free(relay);
 }
 
@@ -1174,6 +1490,55 @@ dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
        }
 }
 
+/************************************************************************
+ *                     MESSAGE ROUTING AND SOURCE VALIDATION           *
+ ************************************************************************/
+
+int
+dmsg_circuit_relay(dmsg_msg_t *msg)
+{
+       dmsg_iocom_t *iocom = msg->iocom;
+       dmsg_circuit_t *circ;
+       dmsg_circuit_t *peer;
+       dmsg_circuit_t dummy;
+       int error = 0;
+
+       /*
+        * Relay occurs before any state processing, msg state should always
+        * be NULL.
+        */
+       assert(msg->state == NULL);
+
+       /*
+        * Lookup the circuit on the incoming iocom.
+        */
+       pthread_mutex_lock(&iocom->mtx);
+
+       dummy.msgid = msg->any.head.circuit;
+       circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy);
+       assert(circ);
+       peer = circ->peer;
+       dmsg_circuit_hold(peer);
+
+       if (DMsgDebugOpt >= 4) {
+               fprintf(stderr,
+                       "CIRC relay %08x %p->%p\n",
+                       msg->any.head.cmd, circ, peer);
+       }
+
+       msg->iocom = peer->iocom;
+       msg->any.head.circuit = peer->msgid;
+       dmsg_circuit_drop_locked(msg->circuit);
+       msg->circuit = peer;
+
+       pthread_mutex_unlock(&iocom->mtx);
+
+       dmsg_msg_write(msg);
+       error = DMSG_IOQ_ERROR_ROUTED;
+
+       return error;
+}
+
 /************************************************************************
  *                     ROUTER AND MESSAGING HANDLES                    *
  ************************************************************************
@@ -1234,50 +1599,45 @@ dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
 
 #endif
 
-#if 0
-/*
- * Acquire a persistent router structure given the cluster and node ids.
- * Messages can be transacted via this structure while held.  If the route
- * is lost messages will return failure.
- */
-dmsg_router_t *
-dmsg_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
-{
-}
-
-/*
- * Release previously acquired router.
- */
-void
-dmsg_router_put(dmsg_router_t *router)
-{
-}
-#endif
-
 /*
  * Dumps the spanning tree
+ *
+ * DEBUG ONLY
  */
 void
-dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
+dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
 {
        h2span_cluster_t *cls;
        h2span_node_t *node;
        h2span_link_t *slink;
+       h2span_relay_t *relay;
        char *uustr = NULL;
 
        pthread_mutex_lock(&cluster_mtx);
        RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
-               dmsg_router_printf(router, "Cluster %s\n",
-                                  dmsg_uuid_to_str(&cls->pfs_clid, &uustr));
+               dmsg_circuit_printf(circuit, "Cluster %s %s (%s)\n",
+                                 dmsg_peer_type_to_str(cls->peer_type),
+                                 dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
+                                 cls->cl_label);
                RB_FOREACH(node, h2span_node_tree, &cls->tree) {
-                       dmsg_router_printf(router, "    Node %s (%s)\n",
+                       dmsg_circuit_printf(circuit, "    Node %s %s (%s)\n",
+                               dmsg_pfs_type_to_str(node->pfs_type),
                                dmsg_uuid_to_str(&node->pfs_fsid, &uustr),
-                               node->label);
+                               node->fs_label);
                        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
-                               dmsg_router_printf(router,
-                                           "\tLink dist=%d via %d\n",
+                               dmsg_circuit_printf(circuit,
+                                           "\tSLink msgid %016jx "
+                                           "dist=%d via %d\n",
+                                           (intmax_t)slink->state->msgid,
                                            slink->dist,
                                            slink->state->iocom->sock_fd);
+                               TAILQ_FOREACH(relay, &slink->relayq, entry) {
+                                       dmsg_circuit_printf(circuit,
+                                           "\t    Relay-out msgid %016jx "
+                                           "via %d\n",
+                                           (intmax_t)relay->target_rt->msgid,
+                                           relay->target_rt->iocom->sock_fd);
+                               }
                        }
                }
        }
@@ -1289,3 +1649,60 @@ dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
        }
 #endif
 }
+
+/*
+ * DEBUG ONLY
+ *
+ * Locate the state representing an incoming LNK_SPAN given its msgid.
+ */
+int
+dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep)
+{
+       h2span_cluster_t *cls;
+       h2span_node_t *node;
+       h2span_link_t *slink;
+       h2span_relay_t *relay;
+
+       pthread_mutex_lock(&cluster_mtx);
+       relay = NULL;
+       RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
+               RB_FOREACH(node, h2span_node_tree, &cls->tree) {
+                       RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+                               if (slink->state->msgid == msgid) {
+                                       *statep = slink->state;
+                                       goto found;
+                               }
+                       }
+               }
+       }
+       pthread_mutex_unlock(&cluster_mtx);
+       *statep = NULL;
+       return(ENOENT);
+found:
+       pthread_mutex_unlock(&cluster_mtx);
+       return(0);
+}
+
+/*
+ * Random number sub-sort value to add to SPAN rnss fields on relay.
+ * This allows us to differentiate spans with the same <dist> field
+ * for relaying purposes.  We must normally limit the number of relays
+ * for any given SPAN origination but we must also guarantee that a
+ * symmetric reverse path exists, so we use the rnss field as a sub-sort
+ * (since there can be thousands or millions if we only match on <dist>),
+ * and if there are STILL too many spans we go past the limit.
+ */
+static
+uint32_t
+dmsg_rnss(void)
+{
+       if (DMsgRNSS == 0) {
+               pthread_mutex_lock(&cluster_mtx);
+               while (DMsgRNSS == 0) {
+                       srandomdev();
+                       DMsgRNSS = random();
+               }
+               pthread_mutex_unlock(&cluster_mtx);
+       }
+       return(DMsgRNSS);
+}