cluster - more libdmsg work
lib/libdmsg/msg_lnk.c
index 5e7eb27..e2e2b73 100644
  * SUCH DAMAGE.
  */
 /*
- * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
- *
- * This code supports the LNK_SPAN protocol.  Essentially all PFS's
- * clients and services rendezvous with the userland hammer2 service and
- * open LNK_SPAN transactions using a message header linkid of 0,
- * registering any PFS's they have connectivity to with us.
- *
- * --
- *
- * Each registration maintains its own open LNK_SPAN message transaction.
- * The SPANs are collected, aggregated, and retransmitted over available
- * connections through the maintainance of additional LNK_SPAN message
- * transactions on each link.
- *
- * The msgid for each active LNK_SPAN transaction we receive allows us to
- * send a message to the target PFS (which might be one of many belonging
- * to the same cluster), by specifying that msgid as the linkid in any
- * message we send to the target PFS.
- *
- * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit
- * (and remember we will maintain multiple open LNK_SPAN transactions on
- * each connection representing the topology span, so every node sees every
- * other node as a separate open transaction).  So, similarly the msgid for
- * these active transactions which we initiated can be used by the other
- * end to route messages through us to another node, ultimately winding up
- * at the identified hammer2 PFS.  We have to adjust the spanid in the message
- * header at each hop to be representative of the outgoing LNK_SPAN we
- * are forwarding the message through.
- *
- * --
- *
- * If we were to retransmit every LNK_SPAN transaction we receive it would
- * create a huge mess, so we have to aggregate all received LNK_SPAN
- * transactions, sort them by the fsid (the cluster) and sub-sort them by
- * the pfs_fsid (individual nodes in the cluster), and only retransmit
- * (create outgoing transactions) for a subset of the nearest distance-hops
- * for each individual node.
- *
- * The higher level protocols can then issue transactions to the nodes making
- * up a cluster to perform all actions required.
- *
- * --
- *
- * Since this is a large topology and a spanning tree protocol, links can
- * go up and down all the time.  Any time a link goes down its transaction
- * is closed.  The transaction has to be closed on both ends before we can
- * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
- * closed may have been propagated out to other connections and those related
- * LNK_SPANs are also closed.  Ultimately all routes via the lost LNK_SPAN
- * go away, ultimately reaching all sources and all targets.
- *
- * Any messages in-transit using a route that goes away will be thrown away.
- * Open transactions are only tracked at the two end-points.  When a link
- * failure propagates to an end-point the related open transactions lose
- * their spanid and are automatically aborted.
- *
- * It is important to note that internal route nodes cannot just associate
- * a lost LNK_SPAN transaction with another route to the same destination.
- * Message transactions MUST be serialized and MUST be ordered.  All messages
- * for a transaction must run over the same route.  So if the route used by
- * an active transaction is lost, the related messages will be fully aborted
- * and the higher protocol levels will retry as appropriate.
- *
- * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
- * back to the originator.  Only the originator keeps tracks of a message.
- * Routers just pass it through.  If a route is lost during transit the
- * message is simply thrown away.
- *
- * It is also important to note that several paths to the same PFS can be
- * propagated along the same link, which allows concurrency and even
- * redundancy over several network interfaces or via different routes through
- * the topology.  Any given transaction will use only a single route but busy
- * servers will often have hundreds of transactions active simultaniously,
- * so having multiple active paths through the network topology for A<->B
- * will improve performance.
- *
- * --
- *
- * Most protocols consolidate operations rather than simply relaying them.
- * This is particularly true of LEAF protocols (such as strict HAMMER2
- * clients), of which there can be millions connecting into the cluster at
- * various points.  The SPAN protocol is not used for these LEAF elements.
- *
- * Instead the primary service they connect to implements a proxy for the
- * client protocols so the core topology only has to propagate a couple of
- * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
- * core master nodes and satellite slaves and cache nodes.
+ * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS - Please see sys/dmsg.h for an
+ * involved explanation of the protocol.
  */
 
 #include "dmsg_local.h"
 
+void (*dmsg_node_handler)(void **opaquep, struct dmsg_msg *msg, int op);
+
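
[Editor's note] A minimal consumer sketch, not part of this commit: the new dmsg_node_handler hook lets the embedding service react to node arrival and departure; dmsg_lnk_span() below invokes it with DMSG_NODEOP_ADD and DMSG_NODEOP_DEL. The handler name and the use of the opaque pointer are hypothetical; assumes <stdlib.h> and <string.h>.

	static void
	my_node_handler(void **opaquep, struct dmsg_msg *msg, int op)
	{
		switch (op) {
		case DMSG_NODEOP_ADD:
			/* stash a copy of the label; kept in node->opaque */
			*opaquep = strdup(msg->any.lnk_span.fs_label);
			break;
		case DMSG_NODEOP_DEL:
			free(*opaquep);
			*opaquep = NULL;
			break;
		}
	}

	/* during setup, before entering the message loop */
	dmsg_node_handler = my_node_handler;
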
 /*
  * Maximum spanning tree distance.  This has the practical effect of
  * stopping tail-chasing closed loops when a feeder span is lost.
@@ -193,6 +110,7 @@ RB_HEAD(h2span_cluster_tree, h2span_cluster);
 RB_HEAD(h2span_node_tree, h2span_node);
 RB_HEAD(h2span_link_tree, h2span_link);
 RB_HEAD(h2span_relay_tree, h2span_relay);
+uint32_t DMsgRNSS;
 
 /*
  * This represents a media
@@ -251,46 +169,42 @@ struct h2span_node {
        uint8_t pfs_type;
        uuid_t  pfs_fsid;               /* unique fsid */
        char    fs_label[128];          /* fs label (typ PEER_HAMMER2) */
+       void    *opaque;
 };
 
 struct h2span_link {
        RB_ENTRY(h2span_link) rbnode;
        dmsg_state_t    *state;         /* state<->link */
        struct h2span_node *node;       /* related node */
-       int32_t dist;
+       uint32_t        dist;
+       uint32_t        rnss;
        struct h2span_relay_queue relayq; /* relay out */
-       struct dmsg_router *router;     /* route out this link */
 };
 
 /*
  * Any LNK_SPAN transactions we receive which are relayed out other
- * connections utilize this structure to track the LNK_SPAN transaction
- * we initiate on the other connections, if selected for relay.
+ * connections utilize this structure to track the LNK_SPAN transactions
+ * we initiate (relay out) on other connections.  We only relay out
+ * LNK_SPANs on connections we have an open CONN transaction for.
+ *
+ * The relay structure points to the outgoing LNK_SPAN transaction
+ * (target_rt) and to the incoming LNK_SPAN transaction (source_rt).
+ * The relay structure holds refs on the related states.
  *
  * In many respects this is the core of the protocol... actually figuring
  * out what LNK_SPANs to relay.  The spanid used for relaying is the
  * address of the 'state' structure, which is why h2span_relay has to
  * be entered into a RB-TREE based at h2span_conn (so we can look
  * up the spanid to validate it).
- *
- * NOTE: Messages can be received via the LNK_SPAN transaction the
- *      relay maintains, and can be replied via relay->router, but
- *      messages are NOT initiated via a relay.  Messages are initiated
- *      via incoming links (h2span_link's).
- *
- *      relay->link represents the link being relayed, NOT the LNK_SPAN
- *      transaction the relay is holding open.
  */
 struct h2span_relay {
-       RB_ENTRY(h2span_relay) rbnode;  /* from h2span_conn */
-       TAILQ_ENTRY(h2span_relay) entry; /* from link */
-       struct h2span_conn *conn;
-       dmsg_state_t    *state;         /* transmitted LNK_SPAN */
-       struct h2span_link *link;       /* LNK_SPAN being relayed */
-       struct dmsg_router      *router;/* route out this relay */
+       TAILQ_ENTRY(h2span_relay) entry;        /* from link */
+       RB_ENTRY(h2span_relay) rbnode;          /* from h2span_conn */
+       struct h2span_conn      *conn;          /* related CONN transaction */
+       dmsg_state_t            *source_rt;     /* h2span_link state */
+       dmsg_state_t            *target_rt;     /* h2span_relay state */
 };
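
[Editor's note] A sketch of how the relay pieces relate, not part of the commit:

	incoming SPAN                               outgoing (relayed) SPAN
	h2span_link.state == relay->source_rt       relay->target_rt
	        \                                          /
	         +--------------- h2span_relay -----------+
	                 relay->conn = the CONN transaction whose
	                 iocom carries the outgoing SPAN
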
 
-
 typedef struct h2span_media h2span_media_t;
 typedef struct h2span_conn h2span_conn_t;
 typedef struct h2span_cluster h2span_cluster_t;
@@ -300,6 +214,10 @@ typedef struct h2span_relay h2span_relay_t;
 
 #define dmsg_termstr(array)    _dmsg_termstr((array), sizeof(array))
 
+static h2span_relay_t *dmsg_generate_relay(h2span_conn_t *conn,
+                                       h2span_link_t *slink);
+static uint32_t dmsg_rnss(void);
+
 static __inline
 void
 _dmsg_termstr(char *base, size_t size)
@@ -328,7 +246,9 @@ h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
 }
 
 /*
- * Match against the uuid.  Currently we never match against the label.
+ * Match against fs_label/pfs_fsid.  Together these two items represent a
+ * unique node.  In most cases the primary differentiator is pfs_fsid but
+ * we also string-match fs_label.
  */
 static
 int
@@ -336,7 +256,9 @@ h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
 {
        int r;
 
-       r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
+       r = strcmp(node1->fs_label, node2->fs_label);
+       if (r == 0)
+               r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
        return (r);
 }
 
@@ -356,6 +278,10 @@ h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
+       if (link1->rnss < link2->rnss)
+               return(-1);
+       if (link1->rnss > link2->rnss)
+               return(1);
 #if 1
        if ((uintptr_t)link1->state < (uintptr_t)link2->state)
                return(-1);
@@ -379,8 +305,8 @@ static
 int
 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 {
-       h2span_link_t *link1 = relay1->link;
-       h2span_link_t *link2 = relay2->link;
+       h2span_link_t *link1 = relay1->source_rt->any.link;
+       h2span_link_t *link2 = relay2->source_rt->any.link;
 
        if ((intptr_t)link1->node < (intptr_t)link2->node)
                return(-1);
@@ -390,6 +316,10 @@ h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
+       if (link1->rnss < link2->rnss)
+               return(-1);
+       if (link1->rnss > link2->rnss)
+               return(1);
 #if 1
        if ((uintptr_t)link1->state < (uintptr_t)link2->state)
                return(-1);
@@ -432,6 +362,7 @@ static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
 
 static void dmsg_lnk_span(dmsg_msg_t *msg);
 static void dmsg_lnk_conn(dmsg_msg_t *msg);
+static void dmsg_lnk_circ(dmsg_msg_t *msg);
 static void dmsg_lnk_relay(dmsg_msg_t *msg);
 static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node);
 static void dmsg_relay_delete(h2span_relay_t *relay);
@@ -442,7 +373,7 @@ static void dmsg_volconf_start(h2span_media_config_t *conf,
                                const char *hostname);
 
 void
-dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
+dmsg_msg_lnk_signal(dmsg_iocom_t *iocom __unused)
 {
        pthread_mutex_lock(&cluster_mtx);
        dmsg_relay_scan(NULL, NULL);
@@ -450,20 +381,28 @@ dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
 }
 
 /*
- * Receive a DMSG_PROTO_LNK message.  This only called for
- * one-way and opening-transactions since state->func will be assigned
- * in all other cases.
+ * DMSG_PROTO_LNK - Generic LNK protocol message dispatch.
+ *           (incoming iocom lock not held)
+ *
+ * This function is typically called for one-way messages and transaction
+ * opens, since state->func is only assigned afterward, but it is also
+ * called if no state->func was assigned at transaction-open.
  */
 void
 dmsg_msg_lnk(dmsg_msg_t *msg)
 {
-       switch(msg->any.head.cmd & DMSGF_BASECMDMASK) {
+       uint32_t icmd = msg->state ? msg->state->icmd : msg->any.head.cmd;
+
+       switch(icmd & DMSGF_BASECMDMASK) {
        case DMSG_LNK_CONN:
                dmsg_lnk_conn(msg);
                break;
        case DMSG_LNK_SPAN:
                dmsg_lnk_span(msg);
                break;
+       case DMSG_LNK_CIRC:
+               dmsg_lnk_circ(msg);
+               break;
        default:
                fprintf(stderr,
                        "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
@@ -473,6 +412,13 @@ dmsg_msg_lnk(dmsg_msg_t *msg)
        }
 }
 
+/*
+ * LNK_CONN - iocom identify message reception.
+ *           (incoming iocom lock not held)
+ *
+ * Remote node identifies itself to us, sets up a SPAN filter, and gives us
+ * the ok to start transmitting SPANs.
+ */
 void
 dmsg_lnk_conn(dmsg_msg_t *msg)
 {
@@ -486,6 +432,9 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
 
        pthread_mutex_lock(&cluster_mtx);
 
+       fprintf(stderr, "dmsg_lnk_conn: msg %p cmd %08x state %p txcmd %08x rxcmd %08x\n",
+               msg, msg->any.head.cmd, state, state->txcmd, state->rxcmd);
+
        switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
        case DMSG_LNK_CONN | DMSGF_CREATE:
        case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
@@ -505,6 +454,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
                conn = dmsg_alloc(sizeof(*conn));
 
                RB_INIT(&conn->tree);
+               state->iocom->conn = conn;      /* XXX only one */
                conn->state = state;
                state->func = dmsg_lnk_conn;
                state->any.conn = conn;
@@ -529,7 +479,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
 
                if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
                        dmsg_msg_result(msg, 0);
-                       dmsg_router_signal(msg->router);
+                       dmsg_iocom_signal(msg->iocom);
                        break;
                }
                /* FALL THROUGH */
@@ -590,6 +540,7 @@ deleteconn:
                conn->media = NULL;
                conn->state = NULL;
                msg->state->any.conn = NULL;
+               msg->state->iocom->conn = NULL;
                TAILQ_REMOVE(&connq, conn, entry);
                dmsg_free(conn);
 
@@ -642,6 +593,13 @@ deleteconn:
        pthread_mutex_unlock(&cluster_mtx);
 }
 
+/*
+ * LNK_SPAN - Spanning tree protocol message reception
+ *           (incoming iocom lock not held)
+ *
+ * Receive a spanning tree transactional message, creating or destroying
+ * a SPAN and propagating it to other iocoms.
+ */
 void
 dmsg_lnk_span(dmsg_msg_t *msg)
 {
@@ -692,16 +650,23 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                 * Find the node
                 */
                dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
+               bcopy(msg->any.lnk_span.fs_label, dummy_node.fs_label,
+                     sizeof(dummy_node.fs_label));
                node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
                if (node == NULL) {
                        node = dmsg_alloc(sizeof(*node));
                        node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
+                       node->pfs_type = msg->any.lnk_span.pfs_type;
                        bcopy(msg->any.lnk_span.fs_label,
                              node->fs_label,
                              sizeof(node->fs_label));
                        node->cls = cls;
                        RB_INIT(&node->tree);
                        RB_INSERT(h2span_node_tree, &cls->tree, node);
+                       if (dmsg_node_handler) {
+                               dmsg_node_handler(&node->opaque, msg,
+                                                 DMSG_NODEOP_ADD);
+                       }
                }
 
                /*
@@ -712,27 +677,15 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                TAILQ_INIT(&slink->relayq);
                slink->node = node;
                slink->dist = msg->any.lnk_span.dist;
+               slink->rnss = msg->any.lnk_span.rnss;
                slink->state = state;
                state->any.link = slink;
 
-               /*
-                * Embedded router structure in link for message forwarding.
-                *
-                * The spanning id for the router is the message id of
-                * the SPAN link it is embedded in, allowing messages to
-                * be routed via &slink->router.
-                */
-               slink->router = dmsg_router_alloc();
-               slink->router->iocom = state->iocom;
-               slink->router->link = slink;
-               slink->router->target = state->msgid;
-               dmsg_router_connect(slink->router);
-
                RB_INSERT(h2span_link_tree, &node->tree, slink);
 
                fprintf(stderr,
                        "LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n",
-                       msg->router->iocom,
+                       msg->iocom,
                        slink,
                        dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid, &alloc),
                        msg->any.lnk_span.cl_label,
@@ -742,7 +695,7 @@ dmsg_lnk_span(dmsg_msg_t *msg)
 #if 0
                dmsg_relay_scan(NULL, node);
 #endif
-               dmsg_router_signal(msg->router);
+               dmsg_iocom_signal(msg->iocom);
        }
 
        /*
@@ -755,7 +708,7 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                cls = node->cls;
 
                fprintf(stderr, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n",
-                       msg->router->iocom,
+                       msg->iocom,
                        slink,
                        dmsg_uuid_to_str(&cls->pfs_clid, &alloc),
                        state->msg->any.lnk_span.cl_label,
@@ -763,11 +716,6 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                        state->msg->any.lnk_span.dist);
                free(alloc);
 
-               /*
-                * Remove the router from consideration
-                */
-               dmsg_router_disconnect(&slink->router);
-
                /*
                 * Clean out all relays.  This requires terminating each
                 * relay transaction.
@@ -782,6 +730,10 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                RB_REMOVE(h2span_link_tree, &node->tree, slink);
                if (RB_EMPTY(&node->tree)) {
                        RB_REMOVE(h2span_node_tree, &cls->tree, node);
+                       if (dmsg_node_handler) {
+                               dmsg_node_handler(&node->opaque, msg,
+                                                 DMSG_NODEOP_DEL);
+                       }
                        if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
                                RB_REMOVE(h2span_cluster_tree,
                                          &cluster_tree, cls);
@@ -812,35 +764,279 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                        dmsg_relay_scan(NULL, node);
 #endif
                if (node)
-                       dmsg_router_signal(msg->router);
+                       dmsg_iocom_signal(msg->iocom);
        }
 
        pthread_mutex_unlock(&cluster_mtx);
 }
 
 /*
- * Messages received on relay SPANs.  These are open transactions so it is
- * in fact possible for the other end to close the transaction.
+ * LNK_CIRC - Virtual circuit protocol message reception
+ *           (incoming iocom lock not held)
  *
- * XXX MPRACE on state structure
+ * Handles all cases.
  */
-static void
-dmsg_lnk_relay(dmsg_msg_t *msg)
+void
+dmsg_lnk_circ(dmsg_msg_t *msg)
 {
-       dmsg_state_t *state = msg->state;
-       h2span_relay_t *relay;
+       dmsg_circuit_t *circA;
+       dmsg_circuit_t *circB;
+       dmsg_state_t *rx_state;
+       dmsg_state_t *tx_state;
+       dmsg_state_t *state;
+       dmsg_state_t dummy;
+       dmsg_msg_t *fwd_msg;
+       dmsg_iocom_t *iocomA;
+       dmsg_iocom_t *iocomB;
+       int disconnect;
+
+       /*pthread_mutex_lock(&cluster_mtx);*/
+
+       if (DMsgDebugOpt >= 4)
+               fprintf(stderr, "CIRC receive cmd=%08x\n", msg->any.head.cmd);
+
+       switch (msg->any.head.cmd & (DMSGF_CREATE |
+                                    DMSGF_DELETE |
+                                    DMSGF_REPLY)) {
+       case DMSGF_CREATE:
+       case DMSGF_CREATE | DMSGF_DELETE:
+               /*
+                * (A) wishes to establish a virtual circuit through us to (B).
+                * (B) is specified by lnk_circ.target (the message id for
+                * a LNK_SPAN that (A) received from us which represents (B)).
+                *
+                * Designate the originator of the circuit (the current
+                * remote end) as (A) and the other side as (B).
+                *
+                * Accept the VC but do not reply.  We will wait for the end-
+                * to-end reply to propagate back.
+                */
+               iocomA = msg->iocom;
 
-       assert(msg->any.head.cmd & DMSGF_REPLY);
+               /*
+                * Locate the open transaction state that the other end
+                * specified in <target>.  This will be an open SPAN
+                * transaction that we transmitted (h2span_relay) over
+                * the interface the LNK_CIRC is being received on.
+                *
+                * (all LNK_CIRC's that we transmit are on circuit0)
+                */
+               pthread_mutex_lock(&iocomA->mtx);
+               dummy.msgid = msg->any.lnk_circ.target;
+               tx_state = RB_FIND(dmsg_state_tree,
+                                  &iocomA->circuit0.statewr_tree,
+                                  &dummy);
+               pthread_mutex_unlock(&iocomA->mtx);
+               if (tx_state == NULL) {
+                       /* XXX SMP race */
+                       fprintf(stderr, "dmsg_lnk_circ: no circuit\n");
+                       dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+                       break;
+               }
+               if (tx_state->icmd != DMSG_LNK_SPAN) {
+                       /* XXX SMP race */
+                       fprintf(stderr, "dmsg_lnk_circ: not LNK_SPAN\n");
+                       dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+                       break;
+               }
 
-       if (msg->any.head.cmd & DMSGF_DELETE) {
-               pthread_mutex_lock(&cluster_mtx);
-               if ((relay = state->any.relay) != NULL) {
-                       dmsg_relay_delete(relay);
+               /* locate h2span_link */
+               rx_state = tx_state->any.relay->source_rt;
+
+               /*
+                * A wishes to establish a VC through us to the
+                * specified target.
+                *
+                * A sends us the msgid of an open SPAN transaction
+                * it received from us as <target>.
+                */
+               circA = dmsg_alloc(sizeof(*circA));
+               dmsg_circuit_init(iocomA, circA);
+               circA->state = msg->state;      /* LNK_CIRC state */
+               circA->msgid = msg->state->msgid;
+               circA->span_state = tx_state;   /* H2SPAN_RELAY state */
+               circA->is_relay = 1;
+               circA->refs = 2;                /* state and peer */
+               msg->state->any.circ = circA;
+
+               iocomB = rx_state->iocom;
+
+               circB = dmsg_alloc(sizeof(*circB));
+               dmsg_circuit_init(iocomB, circB);
+
+               /*
+                * Create a LNK_CIRC transaction on B
+                */
+               fwd_msg = dmsg_msg_alloc(&iocomB->circuit0,
+                                        0, DMSG_LNK_CIRC | DMSGF_CREATE,
+                                        dmsg_lnk_circ, circB);
+               fwd_msg->state->any.circ = circB;
+               fwd_msg->any.lnk_circ.target = rx_state->msgid;
+               circB->state = fwd_msg->state;  /* LNK_CIRC state */
+               circB->msgid = fwd_msg->any.head.msgid;
+               circB->span_state = rx_state;   /* H2SPAN_LINK state */
+               circB->is_relay = 0;
+               circB->refs = 2;                /* state and peer */
+
+               if (DMsgDebugOpt >= 4)
+                       fprintf(stderr, "CIRC forward %p->%p\n", circA, circB);
+
+               /*
+                * Link the two circuits together.
+                */
+               circA->peer = circB;
+               circB->peer = circA;
+
+               if (iocomA < iocomB) {
+                       pthread_mutex_lock(&iocomA->mtx);
+                       pthread_mutex_lock(&iocomB->mtx);
                } else {
-                       dmsg_state_reply(state, 0);
+                       pthread_mutex_lock(&iocomB->mtx);
+                       pthread_mutex_lock(&iocomA->mtx);
                }
-               pthread_mutex_unlock(&cluster_mtx);
+               if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA))
+                       assert(0);
+               if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB))
+                       assert(0);
+               if (iocomA < iocomB) {
+                       pthread_mutex_unlock(&iocomB->mtx);
+                       pthread_mutex_unlock(&iocomA->mtx);
+               } else {
+                       pthread_mutex_unlock(&iocomA->mtx);
+                       pthread_mutex_unlock(&iocomB->mtx);
+               }
+
+               dmsg_msg_write(fwd_msg);
+
+               if ((msg->any.head.cmd & DMSGF_DELETE) == 0)
+                       break;
+               /* FALL THROUGH TO DELETE */
+       case DMSGF_DELETE:
+               /*
+                * (A) is deleting the virtual circuit; propagate closure
+                * to (B).
+                */
+               iocomA = msg->iocom;
+               if (msg->state->any.circ == NULL) {
+                       /* already returned an error/deleted */
+                       break;
+               }
+               circA = msg->state->any.circ;
+               circB = circA->peer;
+               assert(msg->state == circA->state);
+
+               /*
+                * We are closing B's send side.  If B's receive side is
+                * already closed we disconnect the circuit from B's state.
+                */
+               disconnect = 0;
+               if (circB && (state = circB->state) != NULL) {
+                       if (state->rxcmd & DMSGF_DELETE) {
+                               circB->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circB);
+                       }
+                       dmsg_state_reply(state, msg->any.head.error);
+                       disconnect = 1;
+               }
+
+               /*
+                * We received a close on A.  If A's send side is already
+                * closed we disconnect the circuit from A's state.
+                */
+               if (circA && (state = circA->state) != NULL) {
+                       if (state->txcmd & DMSGF_DELETE) {
+                               circA->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circA);
+                       }
+                       disconnect = 1;
+               }
+
+               /*
+                * Disconnect the peer<->peer association
+                */
+               if (disconnect) {
+                       if (circB) {
+                               circA->peer = NULL;
+                               circB->peer = NULL;
+                               dmsg_circuit_drop(circA);
+                               dmsg_circuit_drop(circB); /* XXX SMP */
+                       }
+               }
+               break;
+       case DMSGF_REPLY | DMSGF_CREATE:
+       case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
+               /*
+                * (B) is acknowledging the creation of the virtual
+                * circuit.  This propagates all the way back to (A), though
+                * it should be noted that (A) can start issuing commands
+                * via the virtual circuit before seeing this reply.
+                */
+               circB = msg->state->any.circ;
+               assert(circB);
+               circA = circB->peer;
+               assert(msg->state == circB->state);
+               assert(circA);
+               if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+                       dmsg_state_result(circA->state, msg->any.head.error);
+                       break;
+               }
+               /* FALL THROUGH TO DELETE */
+       case DMSGF_REPLY | DMSGF_DELETE:
+               /*
+                * (B) is deleting the virtual circuit or acknowledging
+                * our deletion of the virtual circuit; propagate closure
+                * to (A).
+                */
+               iocomB = msg->iocom;
+               circB = msg->state->any.circ;
+               circA = circB->peer;
+               assert(msg->state == circB->state);
+
+               /*
+                * We received a close on (B), propagate to (A).  If we have
+                * already received the close from (A) we disconnect the state.
+                */
+               disconnect = 0;
+               if (circA && (state = circA->state) != NULL) {
+                       if (state->rxcmd & DMSGF_DELETE) {
+                               circA->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circA);
+                       }
+                       dmsg_state_reply(state, msg->any.head.error);
+                       disconnect = 1;
+               }
+
+               /*
+                * We received a close on (B).  If (B)'s send side is already
+                * closed we disconnect the state.
+                */
+               if (circB && (state = circB->state) != NULL) {
+                       if (state->txcmd & DMSGF_DELETE) {
+                               circB->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circB);
+                       }
+                       disconnect = 1;
+               }
+
+               /*
+                * Disconnect the peer<->peer association
+                */
+               if (disconnect) {
+                       if (circA) {
+                               circB->peer = NULL;
+                               circA->peer = NULL;
+                               dmsg_circuit_drop(circB);
+                               dmsg_circuit_drop(circA); /* XXX SMP */
+                       }
+               }
+               break;
        }
+
+       /*pthread_mutex_unlock(&cluster_mtx);*/
 }
 
 /*
@@ -911,9 +1107,9 @@ dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg)
 {
        struct relay_scan_info *info = arg;
 
-       if ((intptr_t)relay->link->node < (intptr_t)info->node)
+       if ((intptr_t)relay->source_rt->any.link->node < (intptr_t)info->node)
                return(-1);
-       if ((intptr_t)relay->link->node > (intptr_t)info->node)
+       if ((intptr_t)relay->source_rt->any.link->node > (intptr_t)info->node)
                return(1);
        return(0);
 }
@@ -936,8 +1132,10 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
        h2span_link_t *slink;
        dmsg_lnk_conn_t *lconn;
        dmsg_lnk_span_t *lspan;
-       dmsg_msg_t *msg;
-       int count = 2;
+       int count;
+       int maxcount = 2;
+       uint32_t lastdist = DMSG_SPAN_MAXDIST;
+       uint32_t lastrnss = 0;
 
        info.node = node;
        info.relay = NULL;
@@ -951,7 +1149,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
        relay = info.relay;
        info.relay = NULL;
        if (relay)
-               assert(relay->link->node == node);
+               assert(relay->source_rt->any.link->node == node);
 
        if (DMsgDebugOpt > 8)
                fprintf(stderr, "relay scan for connection %p\n", conn);
@@ -969,15 +1167,39 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
         *  removed the relay, so the relay can only match exactly or
         *  be worse).
         */
+       count = 0;
        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+               /*
+                * Increment count of successful relays.  This isn't
+                * quite accurate if we break out but nothing after
+                * the loop uses (count).
+                *
+                * If count exceeds the maximum number of relays we desire
+                * we normally want to break out.  However, in order to
+                * guarantee a symmetric path we have to continue if both
+                * (dist) and (rnss) continue to match.  Otherwise the SPAN
+                * propagation in the reverse direction may choose different
+                * routes and we will not have a symmetric path.
+                *
+                * NOTE: Spanning tree does not have to be symmetrical so
+                *       this code is not currently enabled.
+                */
+               if (++count >= maxcount) {
+#ifdef REQUIRE_SYMMETRICAL
+                       if (lastdist != slink->dist || lastrnss != slink->rnss)
+                               break;
+#else
+                       break;
+#endif
+                       /* go beyond the nominal maximum desired relays */
+               }
+
                /*
                 * Match, relay already in-place, get the next
                 * relay to match against the next slink.
                 */
-               if (relay && relay->link == slink) {
+               if (relay && relay->source_rt->any.link == slink) {
                        relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
-                       if (--count == 0)
-                               break;
                        continue;
                }
 
@@ -1004,11 +1226,16 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
                 *
                 * Don't bother transmitting if the remote connection
                 * is not accepting this SPAN's peer_type.
+                *
+                * pfs_mask is typically used so pure clients can avoid
+                * receiving SPANs for other pure clients.
                 */
                lspan = &slink->state->msg->any.lnk_span;
                lconn = &conn->state->msg->any.lnk_conn;
                if (((1LLU << lspan->peer_type) & lconn->peer_mask) == 0)
                        break;
+               if (((1LLU << lspan->pfs_type) & lconn->pfs_mask) == 0)
+                       break;
 
                /*
                 * Do not give pure clients visibility to other pure clients
@@ -1042,7 +1269,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
                }
 
                /*
-                * NOTE! fs_uuid differentiates nodes within the same cluster
+                * NOTE! pfs_fsid differentiates nodes within the same cluster
                 *       so we obviously don't want to match those.  Similarly
                 *       for fs_label.
                 */
@@ -1051,47 +1278,17 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
                 * Ok, we've accepted this SPAN for relaying.
                 */
                assert(relay == NULL ||
-                      relay->link->node != slink->node ||
-                      relay->link->dist >= slink->dist);
-               relay = dmsg_alloc(sizeof(*relay));
-               relay->conn = conn;
-               relay->link = slink;
-
-               msg = dmsg_msg_alloc(conn->state->iocom->router, 0,
-                                       DMSG_LNK_SPAN |
-                                       DMSGF_CREATE,
-                                       dmsg_lnk_relay, relay);
-               relay->state = msg->state;
-               relay->router = dmsg_router_alloc();
-               relay->router->iocom = relay->state->iocom;
-               relay->router->relay = relay;
-               relay->router->target = relay->state->msgid;
-
-               msg->any.lnk_span = slink->state->msg->any.lnk_span;
-               msg->any.lnk_span.dist = slink->dist + 1;
-
-               dmsg_router_connect(relay->router);
-
-               RB_INSERT(h2span_relay_tree, &conn->tree, relay);
-               TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
-
-               dmsg_msg_write(msg);
-
-               fprintf(stderr,
-                       "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
-                       "FD %d state %p\n",
-                       slink,
-                       relay,
-                       node->cls, node, slink->dist,
-                       conn->state->iocom->sock_fd, relay->state);
+                      relay->source_rt->any.link->node != slink->node ||
+                      relay->source_rt->any.link->dist >= slink->dist);
+               relay = dmsg_generate_relay(conn, slink);
+               lastdist = slink->dist;
+               lastrnss = slink->rnss;
 
                /*
                 * Match (created new relay), get the next relay to
                 * match against the next slink.
                 */
                relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
-               if (--count == 0)
-                       break;
        }
 
        /*
@@ -1099,38 +1296,106 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
         * the node are in excess of the current aggregate spanning state
         * and should be removed.
         */
-       while (relay && relay->link->node == node) {
+       while (relay && relay->source_rt->any.link->node == node) {
                next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+               fprintf(stderr, "RELAY DELETE FROM EXTRAS\n");
                dmsg_relay_delete(relay);
                relay = next_relay;
        }
 }
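
[Editor's note] A worked example of the selection loop above, with made-up values, not part of the commit. Assume maxcount = 2 and links arriving sorted by (dist, rnss):

	slink A: dist=1 rnss=0x1111   count=1 -> relay created
	slink B: dist=1 rnss=0x1111   count=2 -> break in the default build;
	                              under REQUIRE_SYMMETRICAL the exact
	                              (dist, rnss) tie with A lets the loop
	                              continue, so B is relayed as well
	slink C: dist=2 rnss=0x2222   not relayed: the loop has already
	                              broken, or breaks here because the
	                              (dist, rnss) tie is lost
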
 
+/*
+ * Helper function to generate a missing relay.
+ *
+ * cluster_mtx must be held
+ */
+static
+h2span_relay_t *
+dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink)
+{
+       h2span_relay_t *relay;
+       h2span_node_t *node;
+       dmsg_msg_t *msg;
+
+       node = slink->node;
+
+       relay = dmsg_alloc(sizeof(*relay));
+       relay->conn = conn;
+       relay->source_rt = slink->state;
+       /* relay->source_rt->any.link = slink; */
+
+       /*
+        * NOTE: relay->target_rt->any.relay set to relay by alloc.
+        */
+       msg = dmsg_msg_alloc(&conn->state->iocom->circuit0,
+                            0, DMSG_LNK_SPAN | DMSGF_CREATE,
+                            dmsg_lnk_relay, relay);
+       relay->target_rt = msg->state;
+
+       msg->any.lnk_span = slink->state->msg->any.lnk_span;
+       msg->any.lnk_span.dist = slink->dist + 1;
+       msg->any.lnk_span.rnss = slink->rnss + dmsg_rnss();
+
+       RB_INSERT(h2span_relay_tree, &conn->tree, relay);
+       TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
+
+       dmsg_msg_write(msg);
+
+       return (relay);
+}
+
+/*
+ * Messages received on relay SPANs.  These are open transactions so it is
+ * in fact possible for the other end to close the transaction.
+ *
+ * XXX MPRACE on state structure
+ */
+static void
+dmsg_lnk_relay(dmsg_msg_t *msg)
+{
+       dmsg_state_t *state = msg->state;
+       h2span_relay_t *relay;
+
+       assert(msg->any.head.cmd & DMSGF_REPLY);
+
+       if (msg->any.head.cmd & DMSGF_DELETE) {
+               pthread_mutex_lock(&cluster_mtx);
+               fprintf(stderr, "RELAY DELETE FROM LNK_RELAY MSG\n");
+               if ((relay = state->any.relay) != NULL) {
+                       dmsg_relay_delete(relay);
+               } else {
+                       dmsg_state_reply(state, 0);
+               }
+               pthread_mutex_unlock(&cluster_mtx);
+       }
+}
+
+/*
+ * cluster_mtx held by caller
+ */
 static
 void
 dmsg_relay_delete(h2span_relay_t *relay)
 {
        fprintf(stderr,
                "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
-               relay->link,
+               relay->source_rt->any.link,
                relay,
-               relay->link->node->cls, relay->link->node,
-               relay->link->dist,
-               relay->conn->state->iocom->sock_fd, relay->state);
-
-       dmsg_router_disconnect(&relay->router);
+               relay->source_rt->any.link->node->cls, relay->source_rt->any.link->node,
+               relay->source_rt->any.link->dist,
+               relay->conn->state->iocom->sock_fd, relay->target_rt);
 
        RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
-       TAILQ_REMOVE(&relay->link->relayq, relay, entry);
+       TAILQ_REMOVE(&relay->source_rt->any.link->relayq, relay, entry);
 
-       if (relay->state) {
-               relay->state->any.relay = NULL;
-               dmsg_state_reply(relay->state, 0);
+       if (relay->target_rt) {
+               relay->target_rt->any.relay = NULL;
+               dmsg_state_reply(relay->target_rt, 0);
                /* state invalid after reply */
-               relay->state = NULL;
+               relay->target_rt = NULL;
        }
        relay->conn = NULL;
-       relay->link = NULL;
+       relay->source_rt = NULL;
        dmsg_free(relay);
 }
 
@@ -1225,6 +1490,55 @@ dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
        }
 }
 
+/************************************************************************
+ *                     MESSAGE ROUTING AND SOURCE VALIDATION           *
+ ************************************************************************/
+
+int
+dmsg_circuit_relay(dmsg_msg_t *msg)
+{
+       dmsg_iocom_t *iocom = msg->iocom;
+       dmsg_circuit_t *circ;
+       dmsg_circuit_t *peer;
+       dmsg_circuit_t dummy;
+       int error = 0;
+
+       /*
+        * Relay occurs before any state processing; msg->state should
+        * always be NULL.
+        */
+       assert(msg->state == NULL);
+
+       /*
+        * Lookup the circuit on the incoming iocom.
+        */
+       pthread_mutex_lock(&iocom->mtx);
+
+       dummy.msgid = msg->any.head.circuit;
+       circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy);
+       assert(circ);
+       peer = circ->peer;
+       dmsg_circuit_hold(peer);
+
+       if (DMsgDebugOpt >= 4) {
+               fprintf(stderr,
+                       "CIRC relay %08x %p->%p\n",
+                       msg->any.head.cmd, circ, peer);
+       }
+
+       msg->iocom = peer->iocom;
+       msg->any.head.circuit = peer->msgid;
+       dmsg_circuit_drop_locked(msg->circuit);
+       msg->circuit = peer;
+
+       pthread_mutex_unlock(&iocom->mtx);
+
+       dmsg_msg_write(msg);
+       error = DMSG_IOQ_ERROR_ROUTED;
+
+       return error;
+}
+
 /************************************************************************
  *                     ROUTER AND MESSAGING HANDLES                    *
  ************************************************************************
@@ -1285,51 +1599,45 @@ dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
 
 #endif
 
-#if 0
-/*
- * Acquire a persistent router structure given the cluster and node ids.
- * Messages can be transacted via this structure while held.  If the route
- * is lost messages will return failure.
- */
-dmsg_router_t *
-dmsg_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
-{
-}
-
-/*
- * Release previously acquired router.
- */
-void
-dmsg_router_put(dmsg_router_t *router)
-{
-}
-#endif
-
 /*
  * Dumps the spanning tree
+ *
+ * DEBUG ONLY
  */
 void
-dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
+dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
 {
        h2span_cluster_t *cls;
        h2span_node_t *node;
        h2span_link_t *slink;
+       h2span_relay_t *relay;
        char *uustr = NULL;
 
        pthread_mutex_lock(&cluster_mtx);
        RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
-               dmsg_router_printf(router, "Cluster %s (%s)\n",
-                                  dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
-                                  cls->cl_label);
+               dmsg_circuit_printf(circuit, "Cluster %s %s (%s)\n",
+                                 dmsg_peer_type_to_str(cls->peer_type),
+                                 dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
+                                 cls->cl_label);
                RB_FOREACH(node, h2span_node_tree, &cls->tree) {
-                       dmsg_router_printf(router, "    Node %s (%s)\n",
+                       dmsg_circuit_printf(circuit, "    Node %s %s (%s)\n",
+                               dmsg_pfs_type_to_str(node->pfs_type),
                                dmsg_uuid_to_str(&node->pfs_fsid, &uustr),
                                node->fs_label);
                        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
-                               dmsg_router_printf(router,
-                                           "\tLink dist=%d via %d\n",
+                               dmsg_circuit_printf(circuit,
+                                           "\tSLink msgid %016jx "
+                                           "dist=%d via %d\n",
+                                           (intmax_t)slink->state->msgid,
                                            slink->dist,
                                            slink->state->iocom->sock_fd);
+                               TAILQ_FOREACH(relay, &slink->relayq, entry) {
+                                       dmsg_circuit_printf(circuit,
+                                           "\t    Relay-out msgid %016jx "
+                                           "via %d\n",
+                                           (intmax_t)relay->target_rt->msgid,
+                                           relay->target_rt->iocom->sock_fd);
+                               }
                        }
                }
        }
@@ -1341,3 +1649,60 @@ dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
        }
 #endif
 }
+
+/*
+ * DEBUG ONLY
+ *
+ * Locate the state representing an incoming LNK_SPAN given its msgid.
+ */
+int
+dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep)
+{
+       h2span_cluster_t *cls;
+       h2span_node_t *node;
+       h2span_link_t *slink;
+       h2span_relay_t *relay;
+
+       pthread_mutex_lock(&cluster_mtx);
+       relay = NULL;
+       RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
+               RB_FOREACH(node, h2span_node_tree, &cls->tree) {
+                       RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+                               if (slink->state->msgid == msgid) {
+                                       *statep = slink->state;
+                                       goto found;
+                               }
+                       }
+               }
+       }
+       pthread_mutex_unlock(&cluster_mtx);
+       *statep = NULL;
+       return(ENOENT);
+found:
+       pthread_mutex_unlock(&cluster_mtx);
+       return(0);
+}
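
[Editor's note] A hypothetical debug caller, as a sketch only (the surrounding shell plumbing is assumed):

	dmsg_state_t *state;

	if (dmsg_debug_findspan(msgid, &state) == 0) {
		fprintf(stderr, "SPAN %016jx via fd %d\n",
			(intmax_t)state->msgid, state->iocom->sock_fd);
	} else {
		fprintf(stderr, "SPAN %016jx not found\n", (intmax_t)msgid);
	}
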
+
+/*
+ * Random number sub-sort value to add to SPAN rnss fields on relay.
+ * This allows us to differentiate spans with the same <dist> field
+ * for relaying purposes.  We must normally limit the number of relays
+ * for any given SPAN origination but we must also guarantee that a
+ * symmetric reverse path exists, so we use the rnss field as a sub-sort
+ * (since there can be thousands or millions if we only match on <dist>),
+ * and if there are STILL too many spans we go past the limit.
+ */
+static
+uint32_t
+dmsg_rnss(void)
+{
+       if (DMsgRNSS == 0) {
+               pthread_mutex_lock(&cluster_mtx);
+               while (DMsgRNSS == 0) {
+                       srandomdev();
+                       DMsgRNSS = random();
+               }
+               pthread_mutex_unlock(&cluster_mtx);
+       }
+       return(DMsgRNSS);
+}