hammer2 - cluster / libdmsg circuit work
author Matthew Dillon <dillon@apollo.backplane.com>
Fri, 30 Nov 2012 23:39:12 +0000 (15:39 -0800)
committer Matthew Dillon <dillon@apollo.backplane.com>
Fri, 30 Nov 2012 23:39:12 +0000 (15:39 -0800)
* Major work on the virtual circuit code.  Note there are still some thread
  races.

* Major work on the spanning tree code to allow for future symmetric
  pathing support.

* fs_label is now part of the RB tree compare, along with pfs_fsid.
  (e.g. the serial number for BLK protocol services).

* Rip out the 'router' structure, replace with the 'circuit' structure.
  Embed circuit0 in the iocom.

lib/libdmsg/debug.c
lib/libdmsg/dmsg.h
lib/libdmsg/dmsg_local.h
lib/libdmsg/msg.c
lib/libdmsg/msg_lnk.c
lib/libdmsg/service.c
lib/libdmsg/subs.c

index 9c69ef0..9251975 100644 (file)
@@ -247,13 +247,11 @@ dmsg_msg_str(dmsg_msg_t *msg)
         * Generate the buf
         */
        snprintf(buf, sizeof(buf),
-               "msg=%s%s %s id=%08x src=%08x tgt=%08x %s",
+               "msg=%s%s %s msgid=%08x %s",
                 dmsg_basecmd_str(msg->any.head.cmd),
                 flagbuf,
                 errstr,
                 (uint32_t)(intmax_t)msg->any.head.msgid,   /* for brevity */
-                (uint32_t)(intmax_t)msg->any.head.source,  /* for brevity */
-                (uint32_t)(intmax_t)msg->any.head.target,  /* for brevity */
                 statestr);
 
        return(buf);
index 9e99129..2a3eb7b 100644 (file)
@@ -122,41 +122,59 @@ typedef struct dmsg_handshake dmsg_handshake_t;
  * directly embedded (any), and the message may contain a reference
  * to allocated auxillary data.  The structure is recycled quite often
  * by a connection.
- *
- * This structure is typically not used for storing persistent message
- * state (see dmsg_persist for that).
  */
 struct dmsg_iocom;
-struct dmsg_persist;
+struct dmsg_circuit;
 struct dmsg_state;
-struct dmsg_router;
 struct dmsg_msg;
 
 TAILQ_HEAD(dmsg_state_queue, dmsg_state);
 TAILQ_HEAD(dmsg_msg_queue, dmsg_msg);
 RB_HEAD(dmsg_state_tree, dmsg_state);
-RB_HEAD(dmsg_router_tree, dmsg_router);
+RB_HEAD(dmsg_circuit_tree, dmsg_circuit);
 
 struct h2span_link;
 struct h2span_relay;
 struct h2span_conn;
 
+struct dmsg_circuit {
+       RB_ENTRY(dmsg_circuit)  rbnode;
+       uint64_t                msgid;
+       struct dmsg_iocom       *iocom;
+       struct dmsg_state_tree  staterd_tree;   /* active transactions */
+       struct dmsg_state_tree  statewr_tree;   /* active transactions */
+       struct dmsg_circuit     *peer;          /* (if circuit relay) */
+       struct dmsg_state       *state;         /* open VC transaction state */
+       struct dmsg_state       *span_state;    /* span, relay or link */
+       int                     is_relay;       /* span is h2span_relay */
+       int                     refs;
+};
+
+/*
+ * The state structure is ref-counted.  The iocom cannot go away while
+ * state structures are active.  However, the related h2span_* linkages
+ * can be destroyed and NULL'd out if the state is terminated in both
+ * directions.
+ */
 struct dmsg_state {
        RB_ENTRY(dmsg_state) rbnode;            /* indexed by msgid */
        struct dmsg_iocom *iocom;
-       struct dmsg_router *router;             /* if routed */
+       struct dmsg_circuit *circuit;           /* associated circuit */
+       uint32_t        icmd;                   /* command creating state */
        uint32_t        txcmd;                  /* mostly for CMDF flags */
        uint32_t        rxcmd;                  /* mostly for CMDF flags */
        uint64_t        msgid;                  /* {spanid,msgid} uniq */
        int             flags;
        int             error;
-       struct dmsg_msg *msg;
+       int             refs;                   /* prevent destruction */
+       struct dmsg_msg *msg;                   /* msg creating orig state */
        void (*func)(struct dmsg_msg *);
        union {
                void *any;
                struct h2span_link *link;
                struct h2span_conn *conn;
                struct h2span_relay *relay;
+               struct dmsg_circuit *circ;
        } any;
 };
 
@@ -164,22 +182,33 @@ struct dmsg_state {
 #define DMSG_STATE_DYNAMIC     0x0002
 #define DMSG_STATE_NODEID      0x0004          /* manages a node id */
 
+/*
+ * This is the core in-memory representation of a message structure.
+ * The iocom represents the incoming or outgoing iocom.  Various state
+ * pointers are calculated based on the message's raw source and target
+ * fields, and will ref the underlying state.  Message headers are embedded
+ * while auxiliary data is separately allocated.
+ */
 struct dmsg_msg {
        TAILQ_ENTRY(dmsg_msg) qentry;
-       struct dmsg_router *router;
-       struct dmsg_state *state;
+       struct dmsg_iocom *iocom;               /* incoming/outgoing iocom */
+       struct dmsg_circuit *circuit;           /* associated circuit */
+       struct dmsg_state *state;               /* message state */
        size_t          hdr_size;
        size_t          aux_size;
        char            *aux_data;
        dmsg_any_t      any;
 };
 
+typedef struct dmsg_circuit dmsg_circuit_t;
 typedef struct dmsg_state dmsg_state_t;
 typedef struct dmsg_msg dmsg_msg_t;
 typedef struct dmsg_msg_queue dmsg_msg_queue_t;
 
 int dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2);
 RB_PROTOTYPE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
+int dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2);
+RB_PROTOTYPE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
 
 /*
  * dmsg_ioq - An embedded component of dmsg_conn, holds state
@@ -232,43 +261,14 @@ typedef struct dmsg_ioq dmsg_ioq_t;
 #define DMSG_IOQ_ERROR_IVWRAP          18      /* IVs exhaused */
 #define DMSG_IOQ_ERROR_MACFAIL         19      /* MAC of encr alg failed */
 #define DMSG_IOQ_ERROR_ALGO            20      /* Misc. encr alg error */
+#define DMSG_IOQ_ERROR_ROUTED          21      /* ignore routed message */
+#define DMSG_IOQ_ERROR_BAD_CIRCUIT     22      /* unconfigured circuit */
+#define DMSG_IOQ_ERROR_UNUSED23                23
+#define DMSG_IOQ_ERROR_ASSYM           24      /* Asymmetric path */
 
 #define DMSG_IOQ_MAXIOVEC    16
 
 /*
- * dmsg_router - governs the routing of a message.  Passed into
- *                 dmsg_msg_write.
- *
- * The router is either connected to an iocom (socket) directly, or
- * connected to a SPAN transaction (h2span_link structure for outgoing)
- * or to a SPAN transaction (h2span_relay structure for incoming).
- */
-struct dmsg_router {
-       RB_ENTRY(dmsg_router) rbnode;   /* indexed by target */
-       struct dmsg_iocom *iocom;
-       struct h2span_link   *link;             /* may be NULL */
-       struct h2span_relay  *relay;            /* may be NULL */
-       void    (*signal_callback)(struct dmsg_router *);
-       void    (*rcvmsg_callback)(struct dmsg_msg *);
-       void    (*altmsg_callback)(struct dmsg_iocom *);
-       void    (*dbgmsg_callback)(dmsg_msg_t *msg);
-       struct dmsg_state_tree staterd_tree; /* active messages */
-       struct dmsg_state_tree statewr_tree; /* active messages */
-       dmsg_msg_queue_t txmsgq;                /* tx msgq from remote */
-       uint64_t        target;                 /* for routing */
-       int             flags;
-       int             refs;                   /* refs prevent destruction */
-};
-
-#define DMSG_ROUTER_CONNECTED          0x0001  /* on global RB tree */
-#define DMSG_ROUTER_DELETED            0x0002  /* parent structure destroyed */
-
-typedef struct dmsg_router dmsg_router_t;
-
-int dmsg_router_cmp(dmsg_router_t *router1, dmsg_router_t *router2);
-RB_PROTOTYPE(dmsg_router_tree, dmsg_router, rbnode, dmsg_router_cmp);
-
-/*
  * dmsg_iocom - governs a messaging stream connection
  */
 struct dmsg_iocom {
@@ -282,7 +282,14 @@ struct dmsg_iocom {
        int     flags;
        int     rxmisc;
        int     txmisc;
-       struct dmsg_router *router;
+       void    (*signal_callback)(struct dmsg_iocom *);
+       void    (*rcvmsg_callback)(struct dmsg_msg *);
+       void    (*altmsg_callback)(struct dmsg_iocom *);
+       void    (*dbgmsg_callback)(dmsg_msg_t *msg);
+       struct dmsg_circuit_tree circuit_tree;  /* active circuits */
+       struct dmsg_circuit     circuit0;       /* embedded circuit0 */
+       dmsg_msg_queue_t txmsgq;                /* tx msgq from remote */
+       struct h2span_conn *conn;               /* if LNK_CONN active */
        pthread_mutex_t mtx;                    /* mutex for state*tree/rmsgq */
 };
 
@@ -328,6 +335,14 @@ struct dmsg_master_service_info {
 
 typedef struct dmsg_master_service_info dmsg_master_service_info_t;
 
+/*
+ * node callbacks
+ */
+#define DMSG_NODEOP_ADD                1
+#define DMSG_NODEOP_DEL                2
+
+extern void (*dmsg_node_handler)(void **opaquep, struct dmsg_msg *msg, int op);
+
 
 /*
  * icrc
@@ -347,6 +362,8 @@ const char *dmsg_msg_str(dmsg_msg_t *msg);
 void *dmsg_alloc(size_t bytes);
 void dmsg_free(void *ptr);
 const char *dmsg_uuid_to_str(uuid_t *uuid, char **strp);
+const char *dmsg_peer_type_to_str(uint8_t type);
+const char *dmsg_pfs_type_to_str(uint8_t type);
 int dmsg_connect(const char *hostname);
 
 /*
@@ -356,22 +373,24 @@ void dmsg_bswap_head(dmsg_hdr_t *head);
 void dmsg_ioq_init(dmsg_iocom_t *iocom, dmsg_ioq_t *ioq);
 void dmsg_ioq_done(dmsg_iocom_t *iocom, dmsg_ioq_t *ioq);
 void dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
-                       void (*state_func)(dmsg_router_t *),
+                       void (*state_func)(dmsg_iocom_t *),
                        void (*rcvmsg_func)(dmsg_msg_t *),
                        void (*dbgmsg_func)(dmsg_msg_t *),
                        void (*altmsg_func)(dmsg_iocom_t *));
-void dmsg_router_restate(dmsg_router_t *router,
-                       void (*state_func)(dmsg_router_t *),
+void dmsg_iocom_restate(dmsg_iocom_t *iocom,
+                       void (*state_func)(dmsg_iocom_t *),
                        void (*rcvmsg_func)(dmsg_msg_t *),
                        void (*altmsg_func)(dmsg_iocom_t *));
-void dmsg_router_signal(dmsg_router_t *router);
+void dmsg_iocom_signal(dmsg_iocom_t *iocom);
 void dmsg_iocom_done(dmsg_iocom_t *iocom);
-dmsg_msg_t *dmsg_msg_alloc(dmsg_router_t *router,
+void dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit);
+dmsg_msg_t *dmsg_msg_alloc(dmsg_circuit_t *circuit,
                        size_t aux_size, uint32_t cmd,
                        void (*func)(dmsg_msg_t *), void *data);
 void dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error);
 void dmsg_msg_result(dmsg_msg_t *msg, uint32_t error);
 void dmsg_state_reply(dmsg_state_t *state, uint32_t error);
+void dmsg_state_result(dmsg_state_t *state, uint32_t error);
 
 void dmsg_msg_free(dmsg_msg_t *msg);
 
@@ -385,18 +404,17 @@ void dmsg_iocom_flush2(dmsg_iocom_t *iocom);
 
 void dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
 void dmsg_state_free(dmsg_state_t *state);
+void dmsg_circuit_drop(dmsg_circuit_t *circuit);
 
-dmsg_router_t *dmsg_router_alloc(void);
-void dmsg_router_connect(dmsg_router_t *router);
-void dmsg_router_disconnect(dmsg_router_t **routerp);
+int dmsg_circuit_relay(dmsg_msg_t *msg);
 
 /*
  * Msg protocol functions
  */
-void dmsg_msg_lnk_signal(dmsg_router_t *router);
+void dmsg_msg_lnk_signal(dmsg_iocom_t *iocom);
 void dmsg_msg_lnk(dmsg_msg_t *msg);
 void dmsg_msg_dbg(dmsg_msg_t *msg);
-void dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused);
+void dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused);
 
 /*
  * Crypto functions
@@ -411,7 +429,7 @@ int dmsg_crypto_encrypt(dmsg_iocom_t *iocom, dmsg_ioq_t *ioq,
  * Service daemon functions
  */
 void *dmsg_master_service(void *data);
-void dmsg_router_printf(dmsg_router_t *router, const char *ctl, ...)
+void dmsg_circuit_printf(dmsg_circuit_t *circuit, const char *ctl, ...)
        __printflike(2, 3);
 
 extern int DMsgDebugOpt;
index a987532..99aac89 100644 (file)
@@ -62,4 +62,6 @@
 #include <openssl/err.h>
 #include <openssl/evp.h>       /* aes_256_cbc functions */
 
+#include <machine/atomic.h>
+
 #include "dmsg.h"
index ee9a8e9..8c0b3ae 100644 (file)
@@ -40,53 +40,37 @@ int DMsgDebugOpt;
 static int dmsg_state_msgrx(dmsg_msg_t *msg);
 static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
 
+RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
+RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
+
 /*
- * ROUTER TREE - Represents available routes for message routing, indexed
- *              by their span transaction id.  The router structure is
- *              embedded in either an iocom, h2span_link (incoming),
- *              or h2span_relay (outgoing) (see msg_lnk.c).
+ * STATE TREE - Represents open transactions which are indexed by their
+ *             { msgid } relative to the governing iocom.
  */
 int
-dmsg_router_cmp(dmsg_router_t *router1, dmsg_router_t *router2)
+dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
 {
-       if (router1->target < router2->target)
+       if (state1->msgid < state2->msgid)
                return(-1);
-       if (router1->target > router2->target)
+       if (state1->msgid > state2->msgid)
                return(1);
        return(0);
 }
 
-RB_GENERATE(dmsg_router_tree, dmsg_router, rbnode, dmsg_router_cmp);
-
-static pthread_mutex_t router_mtx;
-static struct dmsg_router_tree router_ltree = RB_INITIALIZER(router_ltree);
-static struct dmsg_router_tree router_rtree = RB_INITIALIZER(router_rtree);
-
 /*
- * STATE TREE - Represents open transactions which are indexed by their
- *             {router,msgid} relative to the governing iocom.
- *
- *             router is usually iocom->router since state isn't stored
- *             for relayed messages.
+ * CIRCUIT TREE - Represents open circuits which are indexed by their
+ *               { msgid } relative to the governing iocom.
  */
 int
-dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
+dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
 {
-#if 0
-       if (state1->router < state2->router)
-               return(-1);
-       if (state1->router > state2->router)
-               return(1);
-#endif
-       if (state1->msgid < state2->msgid)
+       if (circuit1->msgid < circuit2->msgid)
                return(-1);
-       if (state1->msgid > state2->msgid)
+       if (circuit1->msgid > circuit2->msgid)
                return(1);
        return(0);
 }
 
-RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
-
 /*
  * Initialize a low-level ioq
  */
@@ -127,7 +111,7 @@ dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
  */
 void
 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
-                  void (*signal_func)(dmsg_router_t *),
+                  void (*signal_func)(dmsg_iocom_t *),
                   void (*rcvmsg_func)(dmsg_msg_t *),
                   void (*dbgmsg_func)(dmsg_msg_t *),
                   void (*altmsg_func)(dmsg_iocom_t *))
@@ -136,20 +120,16 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
 
        bzero(iocom, sizeof(*iocom));
 
-       iocom->router = dmsg_router_alloc();
-       iocom->router->signal_callback = signal_func;
-       iocom->router->rcvmsg_callback = rcvmsg_func;
-       iocom->router->altmsg_callback = altmsg_func;
-       iocom->router->dbgmsg_callback = dbgmsg_func;
-       /* we do not call dmsg_router_connect() for iocom routers */
+       iocom->signal_callback = signal_func;
+       iocom->rcvmsg_callback = rcvmsg_func;
+       iocom->altmsg_callback = altmsg_func;
+       iocom->dbgmsg_callback = dbgmsg_func;
 
        pthread_mutex_init(&iocom->mtx, NULL);
-       RB_INIT(&iocom->router->staterd_tree);
-       RB_INIT(&iocom->router->statewr_tree);
+       RB_INIT(&iocom->circuit_tree);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
-       TAILQ_INIT(&iocom->router->txmsgq);
-       iocom->router->iocom = iocom;
+       TAILQ_INIT(&iocom->txmsgq);
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
        iocom->flags = DMSG_IOCOMF_RREQ;
@@ -162,6 +142,8 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
        fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
        fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
 
+       dmsg_circuit_init(iocom, &iocom->circuit0);
+
        /*
         * Negotiate session crypto synchronously.  This will mark the
         * connection as error'd if it fails.  If this is a pipe it's
@@ -192,25 +174,25 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
  * the recevmsg_func and the sendmsg_func is called at least once.
  */
 void
-dmsg_router_restate(dmsg_router_t *router,
-                  void (*signal_func)(dmsg_router_t *),
+dmsg_iocom_restate(dmsg_iocom_t *iocom,
+                  void (*signal_func)(dmsg_iocom_t *),
                   void (*rcvmsg_func)(dmsg_msg_t *msg),
                   void (*altmsg_func)(dmsg_iocom_t *))
 {
-       router->signal_callback = signal_func;
-       router->rcvmsg_callback = rcvmsg_func;
-       router->altmsg_callback = altmsg_func;
+       iocom->signal_callback = signal_func;
+       iocom->rcvmsg_callback = rcvmsg_func;
+       iocom->altmsg_callback = altmsg_func;
        if (signal_func)
-               router->iocom->flags |= DMSG_IOCOMF_SWORK;
+               iocom->flags |= DMSG_IOCOMF_SWORK;
        else
-               router->iocom->flags &= ~DMSG_IOCOMF_SWORK;
+               iocom->flags &= ~DMSG_IOCOMF_SWORK;
 }
 
 void
-dmsg_router_signal(dmsg_router_t *router)
+dmsg_iocom_signal(dmsg_iocom_t *iocom)
 {
-       if (router->signal_callback)
-               router->iocom->flags |= DMSG_IOCOMF_SWORK;
+       if (iocom->signal_callback)
+               iocom->flags |= DMSG_IOCOMF_SWORK;
 }
 
 /*
@@ -256,14 +238,29 @@ dmsg_iocom_done(dmsg_iocom_t *iocom)
 }
 
 /*
+ * Initialize a circuit structure and add it to the iocom's circuit_tree.
+ * circuit0 is left out and will not be added to the tree.
+ */
+void
+dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
+{
+       circuit->iocom = iocom;
+       RB_INIT(&circuit->staterd_tree);
+       RB_INIT(&circuit->statewr_tree);
+       if (circuit->msgid)
+               RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
+}
+
+/*
  * Allocate a new one-way message.
  */
 dmsg_msg_t *
-dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd,
-                 void (*func)(dmsg_msg_t *), void *data)
+dmsg_msg_alloc(dmsg_circuit_t *circuit,
+              size_t aux_size, uint32_t cmd,
+              void (*func)(dmsg_msg_t *), void *data)
 {
+       dmsg_iocom_t *iocom = circuit->iocom;
        dmsg_state_t *state = NULL;
-       dmsg_iocom_t *iocom = router->iocom;
        dmsg_msg_t *msg;
        int hbytes;
 
@@ -280,24 +277,30 @@ dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd,
        if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
                /*
                 * Create state when CREATE is set without REPLY.
+                * Assign a unique msgid, in this case simply using
+                * the pointer value for 'state'.
                 *
                 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
                 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
+                *
+                * NOTE: state initiated by us and state initiated by
+                *       a remote create are placed in different RB trees.
+                *       The msgid for SPAN state is used in source/target
+                *       for message routing as appropriate.
                 */
                state = malloc(sizeof(*state));
                bzero(state, sizeof(*state));
                state->iocom = iocom;
+               state->circuit = circuit;
                state->flags = DMSG_STATE_DYNAMIC;
                state->msgid = (uint64_t)(uintptr_t)state;
-               state->router = router;
                state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
                state->rxcmd = DMSGF_REPLY;
+               state->icmd = state->txcmd & DMSGF_BASECMDMASK;
                state->func = func;
                state->any.any = data;
                pthread_mutex_lock(&iocom->mtx);
-               RB_INSERT(dmsg_state_tree,
-                         &iocom->router->statewr_tree,
-                         state);
+               RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
                pthread_mutex_unlock(&iocom->mtx);
                state->flags |= DMSG_STATE_INSERTED;
        }
@@ -323,14 +326,19 @@ dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd,
        if (hbytes)
                bzero(&msg->any.head, hbytes);
        msg->hdr_size = hbytes;
+       msg->any.head.magic = DMSG_HDR_MAGIC;
        msg->any.head.cmd = cmd;
        msg->any.head.aux_descr = 0;
        msg->any.head.aux_crc = 0;
-       msg->router = router;
+       msg->any.head.circuit = 0;
+       msg->circuit = circuit;
+       msg->iocom = iocom;
        if (state) {
                msg->state = state;
                state->msg = msg;
                msg->any.head.msgid = state->msgid;
+       } else {
+               msg->any.head.msgid = 0;
        }
        return (msg);
 }
@@ -344,7 +352,7 @@ static
 void
 dmsg_msg_free_locked(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;
 
        msg->state = NULL;
        if (msg->aux_data)
@@ -356,7 +364,7 @@ dmsg_msg_free_locked(dmsg_msg_t *msg)
 void
 dmsg_msg_free(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;
 
        pthread_mutex_lock(&iocom->mtx);
        dmsg_msg_free_locked(msg);
@@ -455,7 +463,7 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
 
                if (iocom->flags & DMSG_IOCOMF_SWORK) {
                        iocom->flags &= ~DMSG_IOCOMF_SWORK;
-                       iocom->router->signal_callback(iocom->router);
+                       iocom->signal_callback(iocom);
                }
 
                /*
@@ -468,7 +476,7 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
                        read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
                        iocom->flags |= DMSG_IOCOMF_RWORK;
                        iocom->flags |= DMSG_IOCOMF_WWORK;
-                       if (TAILQ_FIRST(&iocom->router->txmsgq))
+                       if (TAILQ_FIRST(&iocom->txmsgq))
                                dmsg_iocom_flush1(iocom);
                }
 
@@ -490,14 +498,14 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
                                        fprintf(stderr, "receive %s\n",
                                                dmsg_msg_str(msg));
                                }
-                               iocom->router->rcvmsg_callback(msg);
+                               iocom->rcvmsg_callback(msg);
                                dmsg_state_cleanuprx(iocom, msg);
                        }
                }
 
                if (iocom->flags & DMSG_IOCOMF_ARWORK) {
                        iocom->flags &= ~DMSG_IOCOMF_ARWORK;
-                       iocom->router->altmsg_callback(iocom);
+                       iocom->altmsg_callback(iocom);
                }
        }
 }
@@ -556,6 +564,7 @@ dmsg_ioq_read(dmsg_iocom_t *iocom)
        dmsg_ioq_t *ioq = &iocom->ioq_rx;
        dmsg_msg_t *msg;
        dmsg_state_t *state;
+       dmsg_circuit_t *circuit0;
        dmsg_hdr_t *head;
        ssize_t n;
        size_t bytes;
@@ -682,8 +691,8 @@ again:
                /*
                 * Allocate the message, the next state will fill it in.
                 */
-               msg = dmsg_msg_alloc(iocom->router, ioq->abytes, 0,
-                                       NULL, NULL);
+               msg = dmsg_msg_alloc(&iocom->circuit0, ioq->abytes, 0,
+                                    NULL, NULL);
                ioq->msg = msg;
 
                /*
@@ -925,24 +934,6 @@ again:
        }
 
        /*
-        * Handle relaying.  Transactional state is not recorded XXX
-        */
-
-       /*
-        * Process transactional state for the message.
-        */
-       if (msg && ioq->error == 0) {
-               error = dmsg_state_msgrx(msg);
-               if (error) {
-                       if (error == DMSG_IOQ_ERROR_EALREADY) {
-                               dmsg_msg_free(msg);
-                               goto again;
-                       }
-                       ioq->error = error;
-               }
-       }
-
-       /*
         * Handle error, RREQ, or completion
         *
         * NOTE: nmax and bytes are invalid at this point, we don't bother
@@ -975,16 +966,20 @@ skip:
                 * transactions, ending with a final non-transactional
                 * LNK_ERROR (that the session can detect) when no
                 * transactions remain.
+                *
+                * We only need to scan transactions on circuit0 as these
+                * will contain all circuit forges, and terminating circuit
+                * forges will automatically terminate the transactions on
+                * any other circuits as well as those circuits.
                 */
-               msg = dmsg_msg_alloc(iocom->router, 0, 0, NULL, NULL);
-               bzero(&msg->any.head, sizeof(msg->any.head));
-               msg->any.head.magic = DMSG_HDR_MAGIC;
-               msg->any.head.cmd = DMSG_LNK_ERROR;
+               circuit0 = &iocom->circuit0;
+               msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
                msg->any.head.error = ioq->error;
 
                pthread_mutex_lock(&iocom->mtx);
                dmsg_iocom_drain(iocom);
-               if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) {
+
+               if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
                        /*
                         * Active remote transactions are still present.
                         * Simulate the other end sending us a DELETE.
@@ -995,13 +990,12 @@ skip:
                        } else {
                                /*state->txcmd |= DMSGF_DELETE;*/
                                msg->state = state;
-                               msg->router = state->router;
+                               msg->iocom = iocom;
                                msg->any.head.msgid = state->msgid;
                                msg->any.head.cmd |= DMSGF_ABORT |
                                                     DMSGF_DELETE;
                        }
-               } else if ((state = RB_ROOT(&iocom->router->statewr_tree)) !=
-                          NULL) {
+               } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
                        /*
                         * Active local transactions are still present.
                         * Simulate the other end sending us a DELETE.
@@ -1011,7 +1005,7 @@ skip:
                                msg = NULL;
                        } else {
                                msg->state = state;
-                               msg->router = state->router;
+                               msg->iocom = iocom;
                                msg->any.head.msgid = state->msgid;
                                msg->any.head.cmd |= DMSGF_ABORT |
                                                     DMSGF_DELETE |
@@ -1055,7 +1049,7 @@ skip:
                iocom->flags |= DMSG_IOCOMF_RREQ;
        } else {
                /*
-                * Return msg.
+                * Continue processing msg.
                 *
                 * The fifo has already been advanced past the message.
                 * Trivially reset the FIFO indices if possible.
@@ -1076,6 +1070,43 @@ skip:
                }
                ioq->state = DMSG_MSGQ_STATE_HEADER1;
                ioq->msg = NULL;
+
+               /*
+                * Handle message routing.  Validates non-zero sources
+                * and routes message.  Error will be 0 if the message is
+                * destined for us.
+                *
+                * State processing only occurs for messages destined for us.
+                */
+               if (msg->any.head.circuit)
+                       error = dmsg_circuit_relay(msg);
+               else
+                       error = dmsg_state_msgrx(msg);
+
+               if (error) {
+                       /*
+                        * Abort-after-closure, throw message away and
+                        * start reading another.
+                        */
+                       if (error == DMSG_IOQ_ERROR_EALREADY) {
+                               dmsg_msg_free(msg);
+                               goto again;
+                       }
+
+                       /*
+                        * msg routed, msg pointer no longer owned by us.
+                        * Go to the top and start reading another.
+                        */
+                       if (error == DMSG_IOQ_ERROR_ROUTED)
+                               goto again;
+
+                       /*
+                        * Process real error and throw away message.
+                        */
+                       ioq->error = error;
+                       goto skip;
+               }
+               /* no error, not routed.  Fall through and return msg */
        }
        return (msg);
 }
@@ -1102,8 +1133,8 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom)
        iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
        TAILQ_INIT(&tmpq);
        pthread_mutex_lock(&iocom->mtx);
-       while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) {
-               TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry);
+       while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
+               TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
                TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
        }
        pthread_mutex_unlock(&iocom->mtx);
@@ -1361,7 +1392,7 @@ dmsg_iocom_drain(dmsg_iocom_t *iocom)
 void
 dmsg_msg_write(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;
        dmsg_state_t *state;
        char dummy;
 
@@ -1384,23 +1415,21 @@ dmsg_msg_write(dmsg_msg_t *msg)
                if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
                    DMSGF_CREATE) {
                        state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
+                       state->icmd = state->txcmd & DMSGF_BASECMDMASK;
                }
                msg->any.head.msgid = state->msgid;
                assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
-               if (msg->any.head.cmd & DMSGF_CREATE)
+               if (msg->any.head.cmd & DMSGF_CREATE) {
                        state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
-       } else {
-               msg->any.head.msgid = 0;
-               /* XXX set spanid by router */
+                       state->icmd = state->txcmd & DMSGF_BASECMDMASK;
+               }
        }
-       msg->any.head.source = 0;
-       msg->any.head.target = msg->router->target;
 
        /*
         * Queue it for output, wake up the I/O pthread.  Note that the
         * I/O thread is responsible for generating the CRCs and encryption.
         */
-       TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry);
+       TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
        dummy = 0;
        write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
        pthread_mutex_unlock(&iocom->mtx);
@@ -1422,7 +1451,6 @@ dmsg_msg_write(dmsg_msg_t *msg)
 void
 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
        dmsg_state_t *state = msg->state;
        dmsg_msg_t *nmsg;
        uint32_t cmd;
@@ -1455,15 +1483,17 @@ dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
 
        /*
         * Allocate the message and associate it with the existing state.
-        * We cannot pass MSGF_CREATE to msg_alloc() because that may
+        * We cannot pass DMSGF_CREATE to msg_alloc() because that may
         * allocate new state.  We have our state already.
         */
-       nmsg = dmsg_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
+       nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
        if (state) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
        nmsg->any.head.error = error;
+       nmsg->any.head.msgid = msg->any.head.msgid;
+       nmsg->any.head.circuit = msg->any.head.circuit;
        nmsg->state = state;
        dmsg_msg_write(nmsg);
 }
@@ -1477,7 +1507,6 @@ dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
 void
 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
        dmsg_state_t *state = msg->state;
        dmsg_msg_t *nmsg;
        uint32_t cmd;
@@ -1508,12 +1537,14 @@ dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
                        cmd |= DMSGF_REPLY;
        }
 
-       nmsg = dmsg_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
+       nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
        if (state) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
        nmsg->any.head.error = error;
+       nmsg->any.head.msgid = msg->any.head.msgid;
+       nmsg->any.head.circuit = msg->any.head.circuit;
        nmsg->state = state;
        dmsg_msg_write(nmsg);
 }
@@ -1540,12 +1571,48 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error)
        if (state->txcmd & DMSGF_REPLY)
                cmd |= DMSGF_REPLY;
 
-       nmsg = dmsg_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL);
+       nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
        if (state) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
        nmsg->any.head.error = error;
+       nmsg->any.head.msgid = state->msgid;
+       nmsg->any.head.circuit = state->msg->any.head.circuit;
+       nmsg->state = state;
+       dmsg_msg_write(nmsg);
+}
+
+/*
+ * Issue a LNK_ERROR result on a transaction given its state structure.
+ * cmd is built without DMSGF_DELETE, so no DELETE is requested by this
+ * function itself.
+ */
+void
+dmsg_state_result(dmsg_state_t *state, uint32_t error)
+{
+       dmsg_msg_t *nmsg;
+       uint32_t cmd = DMSG_LNK_ERROR;
+
+       /* Nothing to do if we already transmitted a delete */
+       if (state->txcmd & DMSGF_DELETE)
+               return;
+
+       /*
+        * Set REPLY if the other end initiated the command.  Otherwise
+        * we are the command direction.
+        */
+       if (state->txcmd & DMSGF_REPLY)
+               cmd |= DMSGF_REPLY;
+
+       nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
+       if (state) {    /* NOTE(review): redundant, state was dereferenced above */
+               if ((state->txcmd & DMSGF_CREATE) == 0) /* CREATE not in txcmd yet */
+                       nmsg->any.head.cmd |= DMSGF_CREATE;
+       }
+       nmsg->any.head.error = error;
+       nmsg->any.head.msgid = state->msgid;
+       nmsg->any.head.circuit = state->msg->any.head.circuit; /* NOTE(review): assumes state->msg != NULL */
        nmsg->state = state;
        dmsg_msg_write(nmsg);
 }
@@ -1557,8 +1624,8 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error)
  */
 
 /*
- * Process state tracking for a message after reception, prior to
- * execution.
+ * Process circuit and state tracking for a message after reception, prior
+ * to execution.
  *
  * Called with msglk held and the msg dequeued.
  *
@@ -1626,25 +1693,44 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error)
 static int
 dmsg_state_msgrx(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;
+       dmsg_circuit_t *circuit;
        dmsg_state_t *state;
-       dmsg_state_t dummy;
+       dmsg_state_t sdummy;
+       dmsg_circuit_t cdummy;
        int error;
 
+       pthread_mutex_lock(&iocom->mtx);
+
+       /*
+        * Locate existing persistent circuit and state, if any.
+        */
+       if (msg->any.head.circuit == 0) {
+               circuit = &iocom->circuit0;
+       } else {
+               cdummy.msgid = msg->any.head.circuit;
+               circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
+                                 &cdummy);
+               if (circuit == NULL) {
+                       /* do not leak iocom->mtx on the error return */
+                       pthread_mutex_unlock(&iocom->mtx);
+                       return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
+               }
+       }
+       msg->circuit = circuit;
+       ++circuit->refs;
+
        /*
-        * Lock RB tree and locate existing persistent state, if any.
-        *
         * If received msg is a command state is on staterd_tree.
         * If received msg is a reply state is on statewr_tree.
         */
-       dummy.msgid = msg->any.head.msgid;
-       pthread_mutex_lock(&iocom->mtx);
+       sdummy.msgid = msg->any.head.msgid;
        if (msg->any.head.cmd & DMSGF_REPLY) {
-               state = RB_FIND(dmsg_state_tree,
-                               &iocom->router->statewr_tree, &dummy);
+               state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
+                               &sdummy);
        } else {
-               state = RB_FIND(dmsg_state_tree,
-                               &iocom->router->staterd_tree, &dummy);
+               state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
+                               &sdummy);
        }
        msg->state = state;
        pthread_mutex_unlock(&iocom->mtx);
@@ -1678,17 +1761,17 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                state = malloc(sizeof(*state));
                bzero(state, sizeof(*state));
                state->iocom = iocom;
+               state->circuit = circuit;
                state->flags = DMSG_STATE_DYNAMIC;
                state->msg = msg;
                state->txcmd = DMSGF_REPLY;
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
+               state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
                state->flags |= DMSG_STATE_INSERTED;
                state->msgid = msg->any.head.msgid;
-               state->router = msg->router;
                msg->state = state;
                pthread_mutex_lock(&iocom->mtx);
-               RB_INSERT(dmsg_state_tree,
-                         &iocom->router->staterd_tree, state);
+               RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
                pthread_mutex_unlock(&iocom->mtx);
                error = 0;
                if (DMsgDebugOpt) {
@@ -1840,11 +1923,11 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                        if (state->rxcmd & DMSGF_REPLY) {
                                assert(msg->any.head.cmd & DMSGF_REPLY);
                                RB_REMOVE(dmsg_state_tree,
-                                         &iocom->router->statewr_tree, state);
+                                         &msg->circuit->statewr_tree, state);
                        } else {
                                assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
                                RB_REMOVE(dmsg_state_tree,
-                                         &iocom->router->staterd_tree, state);
+                                         &msg->circuit->staterd_tree, state);
                        }
                        state->flags &= ~DMSG_STATE_INSERTED;
                        dmsg_state_free(state);
@@ -1865,13 +1948,14 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
 static void
 dmsg_state_cleanuptx(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;
        dmsg_state_t *state;
 
        if ((state = msg->state) == NULL) {
                dmsg_msg_free(msg);
        } else if (msg->any.head.cmd & DMSGF_DELETE) {
                pthread_mutex_lock(&iocom->mtx);
+               assert((state->txcmd & DMSGF_DELETE) == 0);
                state->txcmd |= DMSGF_DELETE;
                if (state->rxcmd & DMSGF_DELETE) {
                        if (state->msg == msg)
@@ -1880,11 +1964,11 @@ dmsg_state_cleanuptx(dmsg_msg_t *msg)
                        if (state->txcmd & DMSGF_REPLY) {
                                assert(msg->any.head.cmd & DMSGF_REPLY);
                                RB_REMOVE(dmsg_state_tree,
-                                         &iocom->router->staterd_tree, state);
+                                         &msg->circuit->staterd_tree, state);
                        } else {
                                assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
                                RB_REMOVE(dmsg_state_tree,
-                                         &iocom->router->statewr_tree, state);
+                                         &msg->circuit->statewr_tree, state);
                        }
                        state->flags &= ~DMSG_STATE_INSERTED;
                        dmsg_state_free(state);
@@ -1904,9 +1988,7 @@ dmsg_state_cleanuptx(dmsg_msg_t *msg)
 void
 dmsg_state_free(dmsg_state_t *state)
 {
-       dmsg_iocom_t *iocom = state->iocom;
        dmsg_msg_t *msg;
-       char dummy;
 
        if (DMsgDebugOpt) {
                fprintf(stderr, "terminate state %p id=%08x\n",
@@ -1918,101 +2000,48 @@ dmsg_state_free(dmsg_state_t *state)
        if (msg)
                dmsg_msg_free_locked(msg);
        free(state);
-
-       /*
-        * When an iocom error is present we are trying to close down the
-        * iocom, but we have to wait for all states to terminate before
-        * we can do so.  The iocom rx code will terminate the receive side
-        * for all transactions by simulating incoming DELETE messages,
-        * but the state doesn't go away until both sides are terminated.
-        *
-        * We may have to wake up the rx code.
-        */
-       if (iocom->ioq_rx.error &&
-           RB_EMPTY(&iocom->router->staterd_tree) &&
-           RB_EMPTY(&iocom->router->statewr_tree)) {
-               dummy = 0;
-               write(iocom->wakeupfds[1], &dummy, 1);
-       }
 }
 
-/************************************************************************
- *                             ROUTING                                 *
- ************************************************************************
- *
- * Incoming messages are routed by their spanid, matched up against
- * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c).
- * Any replies run through the same router.
- *
- * Originated messages are routed by their spanid, matched up against
- * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c).
- * Replies come back through the same route.
- *
- * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular
- * transaction runs through the same route.  Commands and replies both.
- *
- * An originated message will use a different routing spanid to
- * reach a target node than a message which originates from that node.
- * They might use the same physical pipes (each pipe can have multiple
- * SPANs and RELAYs), but the routes are distinct from the perspective
- * of the router.
+/*
+ * Called with iocom locked
  */
-dmsg_router_t *
-dmsg_router_alloc(void)
-{
-       dmsg_router_t *router;
-
-       router = dmsg_alloc(sizeof(*router));
-       TAILQ_INIT(&router->txmsgq);
-       return (router);
-}
-
 void
-dmsg_router_connect(dmsg_router_t *router)
+dmsg_circuit_drop(dmsg_circuit_t *circuit)
 {
-       dmsg_router_t *tmp;
-
-       assert(router->link || router->relay);
-       assert((router->flags & DMSG_ROUTER_CONNECTED) == 0);
-
-       pthread_mutex_lock(&router_mtx);
-       if (router->link)
-               tmp = RB_INSERT(dmsg_router_tree, &router_ltree, router);
-       else
-               tmp = RB_INSERT(dmsg_router_tree, &router_rtree, router);
-       assert(tmp == NULL);
-       router->flags |= DMSG_ROUTER_CONNECTED;
-       pthread_mutex_unlock(&router_mtx);
-}
+       dmsg_iocom_t *iocom = circuit->iocom;
+       char dummy;
 
-void
-dmsg_router_disconnect(dmsg_router_t **routerp)
-{
-       dmsg_router_t *router;
+       assert(circuit->refs > 0);
+       assert(iocom);
 
-       router = *routerp;
-       assert(router->link || router->relay);
-       assert(router->flags & DMSG_ROUTER_CONNECTED);
+       /*
+        * Decrement circuit refs, destroy circuit when refs drops to 0.
+        */
+       if (--circuit->refs > 0)
+               return;
 
-       pthread_mutex_lock(&router_mtx);
-       if (router->link)
-               RB_REMOVE(dmsg_router_tree, &router_ltree, router);
-       else
-               RB_REMOVE(dmsg_router_tree, &router_rtree, router);
-       router->flags &= ~DMSG_ROUTER_CONNECTED;
-       *routerp = NULL;
-       pthread_mutex_unlock(&router_mtx);
-}
+       assert(RB_EMPTY(&circuit->staterd_tree));
+       assert(RB_EMPTY(&circuit->statewr_tree));
+       RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
+       circuit->iocom = NULL;
+       dmsg_free(circuit);
 
-#if 0
-/*
- * XXX
- */
-dmsg_router_t *
-dmsg_route_msg(dmsg_msg_t *msg)
-{
+       /*
+        * When an iocom error is present the rx code will terminate the
+        * receive side for all transactions and (indirectly) all circuits
+        * by simulating DELETE messages.  The state and related circuits
+        * don't disappear until the related states are closed in both
+        * directions.
+        *
+        * Detect the case where the last circuit is now gone (and thus all
+        * states for all circuits are gone), and wakeup the rx thread to
+        * complete the termination.
+        */
+       if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
+               dummy = 0;
+               write(iocom->wakeupfds[1], &dummy, 1);
+       }
 }
-#endif
 
 /*
  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
@@ -2026,8 +2055,8 @@ dmsg_bswap_head(dmsg_hdr_t *head)
        head->salt      = bswap32(head->salt);
 
        head->msgid     = bswap64(head->msgid);
-       head->source    = bswap64(head->source);
-       head->target    = bswap64(head->target);
+       head->circuit   = bswap64(head->circuit);
+       head->reserved18= bswap64(head->reserved18);
 
        head->cmd       = bswap32(head->cmd);
        head->aux_crc   = bswap32(head->aux_crc);
index 5e7eb27..c76da55 100644 (file)
  * SUCH DAMAGE.
  */
 /*
- * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
- *
- * This code supports the LNK_SPAN protocol.  Essentially all PFS's
- * clients and services rendezvous with the userland hammer2 service and
- * open LNK_SPAN transactions using a message header linkid of 0,
- * registering any PFS's they have connectivity to with us.
- *
- * --
- *
- * Each registration maintains its own open LNK_SPAN message transaction.
- * The SPANs are collected, aggregated, and retransmitted over available
- * connections through the maintainance of additional LNK_SPAN message
- * transactions on each link.
- *
- * The msgid for each active LNK_SPAN transaction we receive allows us to
- * send a message to the target PFS (which might be one of many belonging
- * to the same cluster), by specifying that msgid as the linkid in any
- * message we send to the target PFS.
- *
- * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit
- * (and remember we will maintain multiple open LNK_SPAN transactions on
- * each connection representing the topology span, so every node sees every
- * other node as a separate open transaction).  So, similarly the msgid for
- * these active transactions which we initiated can be used by the other
- * end to route messages through us to another node, ultimately winding up
- * at the identified hammer2 PFS.  We have to adjust the spanid in the message
- * header at each hop to be representative of the outgoing LNK_SPAN we
- * are forwarding the message through.
- *
- * --
- *
- * If we were to retransmit every LNK_SPAN transaction we receive it would
- * create a huge mess, so we have to aggregate all received LNK_SPAN
- * transactions, sort them by the fsid (the cluster) and sub-sort them by
- * the pfs_fsid (individual nodes in the cluster), and only retransmit
- * (create outgoing transactions) for a subset of the nearest distance-hops
- * for each individual node.
- *
- * The higher level protocols can then issue transactions to the nodes making
- * up a cluster to perform all actions required.
- *
- * --
- *
- * Since this is a large topology and a spanning tree protocol, links can
- * go up and down all the time.  Any time a link goes down its transaction
- * is closed.  The transaction has to be closed on both ends before we can
- * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
- * closed may have been propagated out to other connections and those related
- * LNK_SPANs are also closed.  Ultimately all routes via the lost LNK_SPAN
- * go away, ultimately reaching all sources and all targets.
- *
- * Any messages in-transit using a route that goes away will be thrown away.
- * Open transactions are only tracked at the two end-points.  When a link
- * failure propagates to an end-point the related open transactions lose
- * their spanid and are automatically aborted.
- *
- * It is important to note that internal route nodes cannot just associate
- * a lost LNK_SPAN transaction with another route to the same destination.
- * Message transactions MUST be serialized and MUST be ordered.  All messages
- * for a transaction must run over the same route.  So if the route used by
- * an active transaction is lost, the related messages will be fully aborted
- * and the higher protocol levels will retry as appropriate.
- *
- * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
- * back to the originator.  Only the originator keeps tracks of a message.
- * Routers just pass it through.  If a route is lost during transit the
- * message is simply thrown away.
- *
- * It is also important to note that several paths to the same PFS can be
- * propagated along the same link, which allows concurrency and even
- * redundancy over several network interfaces or via different routes through
- * the topology.  Any given transaction will use only a single route but busy
- * servers will often have hundreds of transactions active simultaniously,
- * so having multiple active paths through the network topology for A<->B
- * will improve performance.
- *
- * --
- *
- * Most protocols consolidate operations rather than simply relaying them.
- * This is particularly true of LEAF protocols (such as strict HAMMER2
- * clients), of which there can be millions connecting into the cluster at
- * various points.  The SPAN protocol is not used for these LEAF elements.
- *
- * Instead the primary service they connect to implements a proxy for the
- * client protocols so the core topology only has to propagate a couple of
- * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
- * core master nodes and satellite slaves and cache nodes.
+ * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS - Please see sys/dmsg.h for an
+ * involved explanation of the protocol.
  */
 
 #include "dmsg_local.h"
 
+void (*dmsg_node_handler)(void **opaquep, struct dmsg_msg *msg, int op);
+
 /*
  * Maximum spanning tree distance.  This has the practical effect of
  * stopping tail-chasing closed loops when a feeder span is lost.
@@ -193,6 +110,7 @@ RB_HEAD(h2span_cluster_tree, h2span_cluster);
 RB_HEAD(h2span_node_tree, h2span_node);
 RB_HEAD(h2span_link_tree, h2span_link);
 RB_HEAD(h2span_relay_tree, h2span_relay);
+uint32_t DMsgRNSS;
 
 /*
  * This represents a media
@@ -251,46 +169,42 @@ struct h2span_node {
        uint8_t pfs_type;
        uuid_t  pfs_fsid;               /* unique fsid */
        char    fs_label[128];          /* fs label (typ PEER_HAMMER2) */
+       void    *opaque;
 };
 
 struct h2span_link {
        RB_ENTRY(h2span_link) rbnode;
        dmsg_state_t    *state;         /* state<->link */
        struct h2span_node *node;       /* related node */
-       int32_t dist;
+       uint32_t        dist;
+       uint32_t        rnss;
        struct h2span_relay_queue relayq; /* relay out */
-       struct dmsg_router *router;     /* route out this link */
 };
 
 /*
  * Any LNK_SPAN transactions we receive which are relayed out other
- * connections utilize this structure to track the LNK_SPAN transaction
- * we initiate on the other connections, if selected for relay.
+ * connections utilize this structure to track the LNK_SPAN transactions
+ * we initiate (relay out) on other connections.  We only relay out
+ * LNK_SPANs on connections we have an open CONN transaction for.
+ *
+ * The relay structure points to the outgoing LNK_SPAN transaction
+ * (target_rt) and to the incoming LNK_SPAN transaction (source_rt).  The
+ * relay structure holds refs on the related states.
  *
  * In many respects this is the core of the protocol... actually figuring
  * out what LNK_SPANs to relay.  The spanid used for relaying is the
  * address of the 'state' structure, which is why h2span_relay has to
  * be entered into a RB-TREE based at h2span_conn (so we can look
  * up the spanid to validate it).
- *
- * NOTE: Messages can be received via the LNK_SPAN transaction the
- *      relay maintains, and can be replied via relay->router, but
- *      messages are NOT initiated via a relay.  Messages are initiated
- *      via incoming links (h2span_link's).
- *
- *      relay->link represents the link being relayed, NOT the LNK_SPAN
- *      transaction the relay is holding open.
  */
 struct h2span_relay {
-       RB_ENTRY(h2span_relay) rbnode;  /* from h2span_conn */
-       TAILQ_ENTRY(h2span_relay) entry; /* from link */
-       struct h2span_conn *conn;
-       dmsg_state_t    *state;         /* transmitted LNK_SPAN */
-       struct h2span_link *link;       /* LNK_SPAN being relayed */
-       struct dmsg_router      *router;/* route out this relay */
+       TAILQ_ENTRY(h2span_relay) entry;        /* from link */
+       RB_ENTRY(h2span_relay) rbnode;          /* from h2span_conn */
+       struct h2span_conn      *conn;          /* related CONN transaction */
+       dmsg_state_t            *source_rt;     /* h2span_link state */
+       dmsg_state_t            *target_rt;     /* h2span_relay state */
 };
 
-
 typedef struct h2span_media h2span_media_t;
 typedef struct h2span_conn h2span_conn_t;
 typedef struct h2span_cluster h2span_cluster_t;
@@ -300,6 +214,10 @@ typedef struct h2span_relay h2span_relay_t;
 
 #define dmsg_termstr(array)    _dmsg_termstr((array), sizeof(array))
 
+static h2span_relay_t *dmsg_generate_relay(h2span_conn_t *conn,
+                                       h2span_link_t *slink);
+static uint32_t dmsg_rnss(void);
+
 static __inline
 void
 _dmsg_termstr(char *base, size_t size)
@@ -328,7 +246,9 @@ h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
 }
 
 /*
- * Match against the uuid.  Currently we never match against the label.
+ * Match against fs_label/pfs_fsid.  Together these two items represent a
+ * unique node.  In most cases the primary differentiator is pfs_fsid but
+ * we also string-match fs_label.
  */
 static
 int
@@ -336,7 +256,9 @@ h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
 {
        int r;
 
-       r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
+       r = strcmp(node1->fs_label, node2->fs_label);
+       if (r == 0)
+               r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
        return (r);
 }
 
@@ -356,6 +278,10 @@ h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
+       if (link1->rnss < link2->rnss)
+               return(-1);
+       if (link1->rnss > link2->rnss)
+               return(1);
 #if 1
        if ((uintptr_t)link1->state < (uintptr_t)link2->state)
                return(-1);
@@ -379,8 +305,8 @@ static
 int
 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 {
-       h2span_link_t *link1 = relay1->link;
-       h2span_link_t *link2 = relay2->link;
+       h2span_link_t *link1 = relay1->source_rt->any.link;
+       h2span_link_t *link2 = relay2->source_rt->any.link;
 
        if ((intptr_t)link1->node < (intptr_t)link2->node)
                return(-1);
@@ -390,6 +316,10 @@ h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
+       if (link1->rnss < link2->rnss)
+               return(-1);
+       if (link1->rnss > link2->rnss)
+               return(1);
 #if 1
        if ((uintptr_t)link1->state < (uintptr_t)link2->state)
                return(-1);
@@ -432,6 +362,7 @@ static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
 
 static void dmsg_lnk_span(dmsg_msg_t *msg);
 static void dmsg_lnk_conn(dmsg_msg_t *msg);
+static void dmsg_lnk_circ(dmsg_msg_t *msg);
 static void dmsg_lnk_relay(dmsg_msg_t *msg);
 static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node);
 static void dmsg_relay_delete(h2span_relay_t *relay);
@@ -442,7 +373,7 @@ static void dmsg_volconf_start(h2span_media_config_t *conf,
                                const char *hostname);
 
 void
-dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
+dmsg_msg_lnk_signal(dmsg_iocom_t *iocom __unused)
 {
        pthread_mutex_lock(&cluster_mtx);
        dmsg_relay_scan(NULL, NULL);
@@ -450,20 +381,28 @@ dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
 }
 
 /*
- * Receive a DMSG_PROTO_LNK message.  This only called for
- * one-way and opening-transactions since state->func will be assigned
- * in all other cases.
+ * DMSG_PROTO_LNK - Generic DMSG_PROTO_LNK.
+ *           (incoming iocom lock not held)
+ *
+ * This function is typically called for one-way and opening-transactions
+ * since state->func is assigned after that, but it will also be called
+ * if no state->func is assigned on transaction-open.
  */
 void
 dmsg_msg_lnk(dmsg_msg_t *msg)
 {
-       switch(msg->any.head.cmd & DMSGF_BASECMDMASK) {
+       uint32_t icmd = msg->state ? msg->state->icmd : msg->any.head.cmd;
+
+       switch(icmd & DMSGF_BASECMDMASK) {
        case DMSG_LNK_CONN:
                dmsg_lnk_conn(msg);
                break;
        case DMSG_LNK_SPAN:
                dmsg_lnk_span(msg);
                break;
+       case DMSG_LNK_CIRC:
+               dmsg_lnk_circ(msg);
+               break;
        default:
                fprintf(stderr,
                        "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
@@ -473,6 +412,13 @@ dmsg_msg_lnk(dmsg_msg_t *msg)
        }
 }
 
+/*
+ * LNK_CONN - iocom identify message reception.
+ *           (incoming iocom lock not held)
+ *
+ * Remote node identifies itself to us, sets up a SPAN filter, and gives us
+ * the ok to start transmitting SPANs.
+ */
 void
 dmsg_lnk_conn(dmsg_msg_t *msg)
 {
@@ -505,6 +451,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
                conn = dmsg_alloc(sizeof(*conn));
 
                RB_INIT(&conn->tree);
+               state->iocom->conn = conn;      /* XXX only one */
                conn->state = state;
                state->func = dmsg_lnk_conn;
                state->any.conn = conn;
@@ -529,7 +476,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
 
                if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
                        dmsg_msg_result(msg, 0);
-                       dmsg_router_signal(msg->router);
+                       dmsg_iocom_signal(msg->iocom);
                        break;
                }
                /* FALL THROUGH */
@@ -590,6 +537,7 @@ deleteconn:
                conn->media = NULL;
                conn->state = NULL;
                msg->state->any.conn = NULL;
+               msg->state->iocom->conn = NULL;
                TAILQ_REMOVE(&connq, conn, entry);
                dmsg_free(conn);
 
@@ -642,6 +590,13 @@ deleteconn:
        pthread_mutex_unlock(&cluster_mtx);
 }
 
+/*
+ * LNK_SPAN - Spanning tree protocol message reception
+ *           (incoming iocom lock not held)
+ *
+ * Receive a spanning tree transactional message, creating or destroying
+ * a SPAN and propagating it to other iocoms.
+ */
 void
 dmsg_lnk_span(dmsg_msg_t *msg)
 {
@@ -692,16 +647,23 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                 * Find the node
                 */
                dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
+               bcopy(msg->any.lnk_span.fs_label, dummy_node.fs_label,
+                     sizeof(dummy_node.fs_label));
                node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
                if (node == NULL) {
                        node = dmsg_alloc(sizeof(*node));
                        node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
+                       node->pfs_type = msg->any.lnk_span.pfs_type;
                        bcopy(msg->any.lnk_span.fs_label,
                              node->fs_label,
                              sizeof(node->fs_label));
                        node->cls = cls;
                        RB_INIT(&node->tree);
                        RB_INSERT(h2span_node_tree, &cls->tree, node);
+                       if (dmsg_node_handler) {
+                               dmsg_node_handler(&node->opaque, msg,
+                                                 DMSG_NODEOP_ADD);
+                       }
                }
 
                /*
@@ -712,27 +674,15 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                TAILQ_INIT(&slink->relayq);
                slink->node = node;
                slink->dist = msg->any.lnk_span.dist;
+               slink->rnss = msg->any.lnk_span.rnss;
                slink->state = state;
                state->any.link = slink;
 
-               /*
-                * Embedded router structure in link for message forwarding.
-                *
-                * The spanning id for the router is the message id of
-                * the SPAN link it is embedded in, allowing messages to
-                * be routed via &slink->router.
-                */
-               slink->router = dmsg_router_alloc();
-               slink->router->iocom = state->iocom;
-               slink->router->link = slink;
-               slink->router->target = state->msgid;
-               dmsg_router_connect(slink->router);
-
                RB_INSERT(h2span_link_tree, &node->tree, slink);
 
                fprintf(stderr,
                        "LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n",
-                       msg->router->iocom,
+                       msg->iocom,
                        slink,
                        dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid, &alloc),
                        msg->any.lnk_span.cl_label,
@@ -742,7 +692,7 @@ dmsg_lnk_span(dmsg_msg_t *msg)
 #if 0
                dmsg_relay_scan(NULL, node);
 #endif
-               dmsg_router_signal(msg->router);
+               dmsg_iocom_signal(msg->iocom);
        }
 
        /*
@@ -755,7 +705,7 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                cls = node->cls;
 
                fprintf(stderr, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n",
-                       msg->router->iocom,
+                       msg->iocom,
                        slink,
                        dmsg_uuid_to_str(&cls->pfs_clid, &alloc),
                        state->msg->any.lnk_span.cl_label,
@@ -764,11 +714,6 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                free(alloc);
 
                /*
-                * Remove the router from consideration
-                */
-               dmsg_router_disconnect(&slink->router);
-
-               /*
                 * Clean out all relays.  This requires terminating each
                 * relay transaction.
                 */
@@ -782,6 +727,10 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                RB_REMOVE(h2span_link_tree, &node->tree, slink);
                if (RB_EMPTY(&node->tree)) {
                        RB_REMOVE(h2span_node_tree, &cls->tree, node);
+                       if (dmsg_node_handler) {
+                               dmsg_node_handler(&node->opaque, msg,
+                                                 DMSG_NODEOP_DEL);
+                       }
                        if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
                                RB_REMOVE(h2span_cluster_tree,
                                          &cluster_tree, cls);
@@ -812,35 +761,219 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                        dmsg_relay_scan(NULL, node);
 #endif
                if (node)
-                       dmsg_router_signal(msg->router);
+                       dmsg_iocom_signal(msg->iocom);
        }
 
        pthread_mutex_unlock(&cluster_mtx);
 }
 
 /*
- * Messages received on relay SPANs.  These are open transactions so it is
- * in fact possible for the other end to close the transaction.
+ * LNK_CIRC - Virtual circuit protocol message reception
+ *           (incoming iocom lock not held)
  *
- * XXX MPRACE on state structure
+ * Handles all cases.
  */
-static void
-dmsg_lnk_relay(dmsg_msg_t *msg)
+void
+dmsg_lnk_circ(dmsg_msg_t *msg)
 {
-       dmsg_state_t *state = msg->state;
-       h2span_relay_t *relay;
+       dmsg_circuit_t *circA;
+       dmsg_circuit_t *circB;
+       dmsg_state_t *rx_state;
+       dmsg_state_t *tx_state;
+       dmsg_state_t *state;
+       dmsg_state_t dummy;
+       dmsg_msg_t *fwd_msg;
+       dmsg_iocom_t *iocomA;
+       dmsg_iocom_t *iocomB;
+
+       /*pthread_mutex_lock(&cluster_mtx);*/
+
+       switch (msg->any.head.cmd & (DMSGF_CREATE |
+                                    DMSGF_DELETE |
+                                    DMSGF_REPLY)) {
+       case DMSGF_CREATE:
+       case DMSGF_CREATE | DMSGF_DELETE:
+               /*
+                * (A) wishes to establish a virtual circuit through us to (B).
+                * (B) is specified by lnk_circ.target (the message id for
+                * a LNK_SPAN that (A) received from us which represents (B)).
+                *
+                * Designate the originator of the circuit (the current
+                * remote end) as (A) and the other side as (B).
+                *
+                * Accept the VC but do not reply.  We will wait for the end-
+                * to-end reply to propagate back.
+                */
+               iocomA = msg->iocom;
 
-       assert(msg->any.head.cmd & DMSGF_REPLY);
+               /*
+                * Locate the open transaction state that the other end
+                * specified in <target>.  This will be an open SPAN
+                * transaction that we transmitted (h2span_relay) over
+                * the interface the LNK_CIRC is being received on.
+                *
+                * (all LNK_CIRC's that we transmit are on circuit0)
+                */
+               pthread_mutex_lock(&iocomA->mtx);
+               dummy.msgid = msg->any.lnk_circ.target;
+               tx_state = RB_FIND(dmsg_state_tree,
+                                  &iocomA->circuit0.statewr_tree,
+                                  &dummy);
+               /* XXX state refs */
+               assert(tx_state);
+               pthread_mutex_unlock(&iocomA->mtx);
+
+               /* locate h2span_link */
+               rx_state = tx_state->any.relay->source_rt;
 
-       if (msg->any.head.cmd & DMSGF_DELETE) {
-               pthread_mutex_lock(&cluster_mtx);
-               if ((relay = state->any.relay) != NULL) {
-                       dmsg_relay_delete(relay);
-               } else {
-                       dmsg_state_reply(state, 0);
+               /*
+                * A wishes to establish a VC through us to the
+                * specified target.
+                *
+                * A sends us the msgid of an open SPAN transaction
+                * it received from us as <target>.
+                */
+               circA = dmsg_alloc(sizeof(*circA));
+               circA->iocom = iocomA;
+               circA->state = msg->state;      /* LNK_CIRC state */
+               circA->msgid = msg->state->msgid;
+               circA->span_state = tx_state;   /* H2SPAN_RELAY state */
+               circA->is_relay = 1;
+               circA->refs = 2;                /* state and peer */
+               msg->state->any.circ = circA;
+
+               iocomB = rx_state->iocom;
+
+               circB = dmsg_alloc(sizeof(*circB));
+
+               /*
+                * Create a LNK_CIRC transaction on B
+                */
+               fwd_msg = dmsg_msg_alloc(&iocomB->circuit0,
+                                        0, DMSG_LNK_CIRC | DMSGF_CREATE,
+                                        dmsg_lnk_circ, circB);
+               fwd_msg->state->any.circ = circB;
+               circB->iocom = iocomB;
+               circB->state = fwd_msg->state;  /* LNK_CIRC state */
+               circB->msgid = fwd_msg->any.head.msgid;
+               circB->span_state = rx_state;   /* H2SPAN_LINK state */
+               circB->is_relay = 0;
+               circB->refs = 2;                /* state and peer */
+
+               /*
+                * Link the two circuits together.
+                */
+               circA->peer = circB;
+               circB->peer = circA;
+
+               if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA))
+                       assert(0);
+               if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB))
+                       assert(0);
+
+               dmsg_msg_write(fwd_msg);
+
+               if ((msg->any.head.cmd & DMSGF_DELETE) == 0)
+                       break;
+               /* FALL THROUGH TO DELETE */
+       case DMSGF_DELETE:
+               /*
+                * (A) is deleting the virtual circuit, propagate closure
+                * to (B).
+                */
+               iocomA = msg->iocom;
+               circA = msg->state->any.circ;
+               circB = circA->peer;
+               assert(msg->state == circA->state);
+
+               /*
+                * If we are closing A and the peer B is closed, disconnect.
+                */
+               if (circB && (state = circB->state) != NULL) {
+                       if (state->rxcmd & DMSGF_DELETE) {
+                               circB->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circB);
+                       }
+                       dmsg_state_reply(state, msg->any.head.error);
                }
-               pthread_mutex_unlock(&cluster_mtx);
+
+               /*
+                * If both sides now closed terminate the peer association
+                * and the state association.  This may drop up to two refs
+                * on circA and one on circB.
+                */
+               if (circA->state->txcmd & DMSGF_DELETE) {
+                       if (circB) {
+                               circA->peer = NULL;
+                               circB->peer = NULL;
+                               dmsg_circuit_drop(circA);
+                               dmsg_circuit_drop(circB); /* XXX SMP */
+                       }
+                       circA->state->any.circ = NULL;
+                       circA->state = NULL;
+                       dmsg_circuit_drop(circA);
+               }
+               break;
+       case DMSGF_REPLY | DMSGF_CREATE:
+       case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
+               /*
+                * (B) is acknowledging the creation of the virtual
+                * circuit.  This propagates all the way back to (A), though
+                * it should be noted that (A) can start issuing commands
+                * via the virtual circuit before seeing this reply.
+                */
+               circB = msg->state->any.circ;
+               circA = circB->peer;
+               assert(msg->state == circB->state);
+               if (circA && (msg->any.head.cmd & DMSGF_DELETE) == 0) {
+                       dmsg_state_result(circA->state, msg->any.head.error);
+                       break;
+               }
+               /* FALL THROUGH TO DELETE */
+       case DMSGF_REPLY | DMSGF_DELETE:
+               /*
+                * (B) is deleting the virtual circuit or acknowledging
+                * our deletion of the virtual circuit, propagate closure
+                * to (A).
+                */
+               iocomB = msg->iocom;
+               circB = msg->state->any.circ;
+               circA = circB->peer;
+               assert(msg->state == circB->state);
+
+               /*
+                * If we are closing B and the peer A is closed, disconnect.
+                */
+               if (circA && (state = circA->state) != NULL) {
+                       if (state->rxcmd & DMSGF_DELETE) {
+                               circA->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circA);
+                       }
+                       dmsg_state_reply(state, msg->any.head.error);
+               }
+
+               /*
+                * If both sides are now closed terminate the peer association
+                * and the state association.  This may drop up to two refs
+                * on circB and one on circA.
+                */
+               if (circB->state->txcmd & DMSGF_DELETE) {
+                       if (circA) {
+                               circB->peer = NULL;
+                               circA->peer = NULL;
+                               dmsg_circuit_drop(circB);
+                               dmsg_circuit_drop(circA); /* XXX SMP */
+                       }
+                       circB->state->any.circ = NULL;
+                       circB->state = NULL;
+                       dmsg_circuit_drop(circB);
+               }
+               break;
        }
+
+       /*pthread_mutex_unlock(&cluster_mtx);*/
 }
 
 /*
@@ -911,9 +1044,9 @@ dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg)
 {
        struct relay_scan_info *info = arg;
 
-       if ((intptr_t)relay->link->node < (intptr_t)info->node)
+       if ((intptr_t)relay->source_rt->any.link->node < (intptr_t)info->node)
                return(-1);
-       if ((intptr_t)relay->link->node > (intptr_t)info->node)
+       if ((intptr_t)relay->source_rt->any.link->node > (intptr_t)info->node)
                return(1);
        return(0);
 }
@@ -936,8 +1069,10 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
        h2span_link_t *slink;
        dmsg_lnk_conn_t *lconn;
        dmsg_lnk_span_t *lspan;
-       dmsg_msg_t *msg;
-       int count = 2;
+       int count;
+       int maxcount = 2;
+       uint32_t lastdist = DMSG_SPAN_MAXDIST;
+       uint32_t lastrnss = 0;
 
        info.node = node;
        info.relay = NULL;
@@ -951,7 +1086,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
        relay = info.relay;
        info.relay = NULL;
        if (relay)
-               assert(relay->link->node == node);
+               assert(relay->source_rt->any.link->node == node);
 
        if (DMsgDebugOpt > 8)
                fprintf(stderr, "relay scan for connection %p\n", conn);
@@ -969,15 +1104,39 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
         *  removed the relay, so the relay can only match exactly or
         *  be worse).
         */
+       count = 0;
        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
                /*
+                * Increment count of successful relays.  This isn't
+                * quite accurate if we break out but nothing after
+                * the loop uses (count).
+                *
+                * If count exceeds the maximum number of relays we desire
+                * we normally want to break out.  However, in order to
+                * guarantee a symmetric path we have to continue if both
+                * (dist) and (rnss) continue to match.  Otherwise the SPAN
+                * propagation in the reverse direction may choose different
+                * routes and we will not have a symmetric path.
+                *
+                * NOTE: Spanning tree does not have to be symmetrical so
+                *       this code is not currently enabled.
+                */
+               if (++count >= maxcount) {
+#ifdef REQUIRE_SYMMETRICAL
+                       if (lastdist != slink->dist || lastrnss != slink->rnss)
+                               break;
+#else
+                       break;
+#endif
+                       /* go beyond the nominal maximum desired relays */
+               }
+
+               /*
                 * Match, relay already in-place, get the next
                 * relay to match against the next slink.
                 */
-               if (relay && relay->link == slink) {
+               if (relay && relay->source_rt->any.link == slink) {
                        relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
-                       if (--count == 0)
-                               break;
                        continue;
                }
 
@@ -1004,11 +1163,16 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
                 *
                 * Don't bother transmitting if the remote connection
                 * is not accepting this SPAN's peer_type.
+                *
+                * pfs_mask is typically used so pure clients can filter
+                * out receiving SPANs for other pure clients.
                 */
                lspan = &slink->state->msg->any.lnk_span;
                lconn = &conn->state->msg->any.lnk_conn;
                if (((1LLU << lspan->peer_type) & lconn->peer_mask) == 0)
                        break;
+               if (((1LLU << lspan->pfs_type) & lconn->pfs_mask) == 0)
+                       break;
 
                /*
                 * Do not give pure clients visibility to other pure clients
@@ -1042,7 +1206,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
                }
 
                /*
-                * NOTE! fs_uuid differentiates nodes within the same cluster
+                * NOTE! pfs_fsid differentiates nodes within the same cluster
                 *       so we obviously don't want to match those.  Similarly
                 *       for fs_label.
                 */
@@ -1051,47 +1215,17 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
                 * Ok, we've accepted this SPAN for relaying.
                 */
                assert(relay == NULL ||
-                      relay->link->node != slink->node ||
-                      relay->link->dist >= slink->dist);
-               relay = dmsg_alloc(sizeof(*relay));
-               relay->conn = conn;
-               relay->link = slink;
-
-               msg = dmsg_msg_alloc(conn->state->iocom->router, 0,
-                                       DMSG_LNK_SPAN |
-                                       DMSGF_CREATE,
-                                       dmsg_lnk_relay, relay);
-               relay->state = msg->state;
-               relay->router = dmsg_router_alloc();
-               relay->router->iocom = relay->state->iocom;
-               relay->router->relay = relay;
-               relay->router->target = relay->state->msgid;
-
-               msg->any.lnk_span = slink->state->msg->any.lnk_span;
-               msg->any.lnk_span.dist = slink->dist + 1;
-
-               dmsg_router_connect(relay->router);
-
-               RB_INSERT(h2span_relay_tree, &conn->tree, relay);
-               TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
-
-               dmsg_msg_write(msg);
-
-               fprintf(stderr,
-                       "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
-                       "FD %d state %p\n",
-                       slink,
-                       relay,
-                       node->cls, node, slink->dist,
-                       conn->state->iocom->sock_fd, relay->state);
+                      relay->source_rt->any.link->node != slink->node ||
+                      relay->source_rt->any.link->dist >= slink->dist);
+               relay = dmsg_generate_relay(conn, slink);
+               lastdist = slink->dist;
+               lastrnss = slink->rnss;
 
                /*
                 * Match (created new relay), get the next relay to
                 * match against the next slink.
                 */
                relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
-               if (--count == 0)
-                       break;
        }
 
        /*
@@ -1099,38 +1233,102 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
         * the node are in excess of the current aggregate spanning state
         * and should be removed.
         */
-       while (relay && relay->link->node == node) {
+       while (relay && relay->source_rt->any.link->node == node) {
                next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
                dmsg_relay_delete(relay);
                relay = next_relay;
        }
 }
 
+/*
+ * Helper function to generate a missing relay.
+ *
+ * Allocates an h2span_relay tying (conn) to the SPAN link (slink), opens a
+ * LNK_SPAN transaction on circuit0 of the connection's iocom, and copies
+ * the SPAN payload from slink with dist incremented by one and rnss
+ * incremented by dmsg_rnss().  The relay is inserted into conn->tree and
+ * appended to slink->relayq before the message is written.
+ *
+ * Returns the newly created relay.
+ *
+ * cluster_mtx must be held
+ *
+ * NOTE(review): dmsg_rnss() acquires cluster_mtx itself while DMsgRNSS is
+ *              still zero.  Confirm cluster_mtx is recursive, or that
+ *              DMsgRNSS is primed before this path can run.
+ */
+static
+h2span_relay_t *
+dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink)
+{
+       h2span_relay_t *relay;
+       dmsg_msg_t *msg;
+
+       relay = dmsg_alloc(sizeof(*relay));
+       relay->conn = conn;
+       relay->source_rt = slink->state;
+       /* relay->source_rt->any.link = slink; */
+
+       /*
+        * NOTE: relay->target_rt->any.relay set to relay by alloc.
+        */
+       msg = dmsg_msg_alloc(&conn->state->iocom->circuit0,
+                            0, DMSG_LNK_SPAN | DMSGF_CREATE,
+                            dmsg_lnk_relay, relay);
+       relay->target_rt = msg->state;
+
+       /*
+        * Propagate the original SPAN, one hop further away.
+        */
+       msg->any.lnk_span = slink->state->msg->any.lnk_span;
+       msg->any.lnk_span.dist = slink->dist + 1;
+       msg->any.lnk_span.rnss = slink->rnss + dmsg_rnss();
+
+       /*
+        * Index the relay before transmitting.
+        */
+       RB_INSERT(h2span_relay_tree, &conn->tree, relay);
+       TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
+
+       dmsg_msg_write(msg);
+
+       return (relay);
+}
+
+/*
+ * Receive handler for the LNK_SPAN transactions opened when relaying
+ * (passed to dmsg_msg_alloc() by dmsg_generate_relay()).  These are open
+ * transactions so it is in fact possible for the other end to close the
+ * transaction; only a DELETE is acted upon here.
+ *
+ * XXX MPRACE on state structure
+ */
+static void
+dmsg_lnk_relay(dmsg_msg_t *msg)
+{
+       dmsg_state_t *state;
+       h2span_relay_t *relay;
+
+       assert(msg->any.head.cmd & DMSGF_REPLY);
+
+       /*
+        * Nothing to do unless the other end is closing.
+        */
+       if ((msg->any.head.cmd & DMSGF_DELETE) == 0)
+               return;
+
+       state = msg->state;
+
+       pthread_mutex_lock(&cluster_mtx);
+       relay = state->any.relay;
+       if (relay == NULL) {
+               /* no relay attached, reply on the transaction directly */
+               dmsg_state_reply(state, 0);
+       } else {
+               /* unlinks the relay and replies on its transaction */
+               dmsg_relay_delete(relay);
+       }
+       pthread_mutex_unlock(&cluster_mtx);
+}
+
+
 static
 void
 dmsg_relay_delete(h2span_relay_t *relay)
 {
        fprintf(stderr,
                "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
-               relay->link,
+               relay->source_rt->any.link,
                relay,
-               relay->link->node->cls, relay->link->node,
-               relay->link->dist,
-               relay->conn->state->iocom->sock_fd, relay->state);
-
-       dmsg_router_disconnect(&relay->router);
+               relay->source_rt->any.link->node->cls, relay->source_rt->any.link->node,
+               relay->source_rt->any.link->dist,
+               relay->conn->state->iocom->sock_fd, relay->target_rt);
 
        RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
-       TAILQ_REMOVE(&relay->link->relayq, relay, entry);
+       TAILQ_REMOVE(&relay->source_rt->any.link->relayq, relay, entry);
 
-       if (relay->state) {
-               relay->state->any.relay = NULL;
-               dmsg_state_reply(relay->state, 0);
+       if (relay->target_rt) {
+               relay->target_rt->any.relay = NULL;
+               dmsg_state_reply(relay->target_rt, 0);
                /* state invalid after reply */
-               relay->state = NULL;
+               relay->target_rt = NULL;
        }
        relay->conn = NULL;
-       relay->link = NULL;
+       relay->source_rt = NULL;
        dmsg_free(relay);
 }
 
@@ -1226,6 +1424,48 @@ dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
 }
 
 /************************************************************************
+ *                     MESSAGE ROUTING AND SOURCE VALIDATION           *
+ ************************************************************************/
+
+/*
+ * Forward a message received on a virtual circuit to the circuit's peer.
+ *
+ * The circuit is looked up on the incoming iocom using head.circuit.  The
+ * message is then re-targeted (msg->iocom and head.circuit) at the peer
+ * circuit and written out.  Always returns DMSG_IOQ_ERROR_ROUTED.
+ *
+ * The ids used for the debug print are sampled while cluster_mtx is still
+ * held instead of dereferencing circ/peer again after the unlock.
+ */
+int
+dmsg_circuit_relay(dmsg_msg_t *msg)
+{
+       dmsg_iocom_t *iocom = msg->iocom;
+       dmsg_circuit_t *circ;
+       dmsg_circuit_t *peer;
+       dmsg_circuit_t dummy;
+       uint32_t circ_id;
+       uint32_t peer_id;
+
+       /*
+        * Relay occurs before any state processing, msg state should always
+        * be NULL.
+        */
+       assert(msg->state == NULL);
+
+       /*
+        * Lookup the circuit on the incoming iocom.  Only msgid is filled
+        * in on the dummy, it serves as the lookup key.
+        */
+       pthread_mutex_lock(&cluster_mtx);
+
+       dummy.msgid = msg->any.head.circuit;
+       circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy);
+       assert(circ);
+
+       /*
+        * NOTE(review): dmsg_lnk_circ() clears ->peer during teardown, so
+        *               a NULL peer is possible here.  Assert rather than
+        *               dereference NULL; a graceful error return needs an
+        *               error code that is not visible from this function.
+        */
+       peer = circ->peer;
+       assert(peer);
+
+       msg->iocom = peer->iocom;
+       msg->any.head.circuit = peer->msgid;
+       circ_id = (uint32_t)circ->msgid;        /* brevity */
+       peer_id = (uint32_t)peer->msgid;        /* brevity */
+
+       pthread_mutex_unlock(&cluster_mtx);
+
+       fprintf(stderr, "ROUTE MESSAGE VC %08x to %08x\n",
+               circ_id, peer_id);
+       dmsg_msg_write(msg);
+
+       return (DMSG_IOQ_ERROR_ROUTED);
+}
+
+/************************************************************************
  *                     ROUTER AND MESSAGING HANDLES                    *
  ************************************************************************
  *
@@ -1285,31 +1525,11 @@ dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
 
 #endif
 
-#if 0
-/*
- * Acquire a persistent router structure given the cluster and node ids.
- * Messages can be transacted via this structure while held.  If the route
- * is lost messages will return failure.
- */
-dmsg_router_t *
-dmsg_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
-{
-}
-
-/*
- * Release previously acquired router.
- */
-void
-dmsg_router_put(dmsg_router_t *router)
-{
-}
-#endif
-
 /*
  * Dumps the spanning tree
  */
 void
-dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
+dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
 {
        h2span_cluster_t *cls;
        h2span_node_t *node;
@@ -1318,15 +1538,17 @@ dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
 
        pthread_mutex_lock(&cluster_mtx);
        RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
-               dmsg_router_printf(router, "Cluster %s (%s)\n",
-                                  dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
-                                  cls->cl_label);
+               dmsg_circuit_printf(circuit, "Cluster %s %s (%s)\n",
+                                 dmsg_peer_type_to_str(cls->peer_type),
+                                 dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
+                                 cls->cl_label);
                RB_FOREACH(node, h2span_node_tree, &cls->tree) {
-                       dmsg_router_printf(router, "    Node %s (%s)\n",
+                       dmsg_circuit_printf(circuit, "    Node %s %s (%s)\n",
+                               dmsg_pfs_type_to_str(node->pfs_type),
                                dmsg_uuid_to_str(&node->pfs_fsid, &uustr),
                                node->fs_label);
                        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
-                               dmsg_router_printf(router,
+                               dmsg_circuit_printf(circuit,
                                            "\tLink dist=%d via %d\n",
                                            slink->dist,
                                            slink->state->iocom->sock_fd);
@@ -1341,3 +1563,27 @@ dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
        }
 #endif
 }
+
+/*
+ * Random number sub-sort value to add to SPAN rnss fields on relay.
+ * This allows us to differentiate spans with the same <dist> field
+ * for relaying purposes.  We must normally limit the number of relays
+ * for any given SPAN origination but we must also guarantee that a
+ * symmetric reverse path exists, so we use the rnss field as a sub-sort
+ * (since there can be thousands or millions if we only match on <dist>),
+ * and if there are STILL too many spans we go past the limit.
+ *
+ * The value is stored in DMsgRNSS and drawn at most once; zero means
+ * "not yet chosen", so the draw is repeated until it is non-zero.
+ *
+ * NOTE(review): cluster_mtx is acquired here on first use, while
+ *              dmsg_generate_relay() calls this with cluster_mtx
+ *              documented as already held.  Confirm the mutex is
+ *              recursive or that DMsgRNSS is primed beforehand.
+ */
+static
+uint32_t
+dmsg_rnss(void)
+{
+       /*
+        * Already chosen, no locking needed.
+        */
+       if (DMsgRNSS != 0)
+               return(DMsgRNSS);
+
+       /*
+        * Re-test under the lock before drawing.
+        */
+       pthread_mutex_lock(&cluster_mtx);
+       while (DMsgRNSS == 0) {
+               srandomdev();
+               DMsgRNSS = random();
+       }
+       pthread_mutex_unlock(&cluster_mtx);
+
+       return(DMsgRNSS);
+}
index 387b1cb..43fcb71 100644 (file)
@@ -35,9 +35,9 @@
 
 #include "dmsg_local.h"
 
-static void master_auth_signal(dmsg_router_t *router);
+static void master_auth_signal(dmsg_iocom_t *iocom);
 static void master_auth_rxmsg(dmsg_msg_t *msg);
-static void master_link_signal(dmsg_router_t *router);
+static void master_link_signal(dmsg_iocom_t *iocom);
 static void master_link_rxmsg(dmsg_msg_t *msg);
 
 /*
@@ -87,7 +87,7 @@ static void master_auth_conn_rx(dmsg_msg_t *msg);
 
 static
 void
-master_auth_signal(dmsg_router_t *router)
+master_auth_signal(dmsg_iocom_t *iocom)
 {
        dmsg_msg_t *msg;
 
@@ -97,14 +97,16 @@ master_auth_signal(dmsg_router_t *router)
         *
         * XXX put additional authentication states here?
         */
-       msg = dmsg_msg_alloc(router, 0, DMSG_LNK_CONN | DMSGF_CREATE,
+       msg = dmsg_msg_alloc(&iocom->circuit0, 0,
+                            DMSG_LNK_CONN | DMSGF_CREATE,
                             master_auth_conn_rx, NULL);
        msg->any.lnk_conn.peer_mask = (uint64_t)-1;
        msg->any.lnk_conn.peer_type = DMSG_PEER_CLUSTER;
+       msg->any.lnk_conn.pfs_mask = (uint64_t)-1;
 
        dmsg_msg_write(msg);
 
-       dmsg_router_restate(router,
+       dmsg_iocom_restate(iocom,
                            master_link_signal,
                            master_link_rxmsg,
                            NULL);
@@ -132,9 +134,9 @@ master_auth_rxmsg(dmsg_msg_t *msg __unused)
  */
 static
 void
-master_link_signal(dmsg_router_t *router)
+master_link_signal(dmsg_iocom_t *iocom)
 {
-       dmsg_msg_lnk_signal(router);
+       dmsg_msg_lnk_signal(iocom);
 }
 
 static
@@ -192,7 +194,7 @@ dmsg_msg_dbg(dmsg_msg_t *msg)
                 */
                if (msg->aux_data)
                        msg->aux_data[msg->aux_size - 1] = 0;
-               msg->router->dbgmsg_callback(msg);
+               msg->iocom->dbgmsg_callback(msg);
                dmsg_msg_reply(msg, 0);
                break;
        case DMSG_DBG_SHELL | DMSGF_REPLY:
@@ -216,11 +218,11 @@ dmsg_msg_dbg(dmsg_msg_t *msg)
  * not modified and stays intact.  We use a one-way message with REPLY set
  * to distinguish between a debug command and debug terminal output.
  *
- * To prevent loops router_printf() can filter the message (cmd) related
- * to the router_printf().  We filter out DBG messages.
+ * To prevent loops circuit_printf() can filter the message (cmd) related
+ * to the circuit_printf().  We filter out DBG messages.
  */
 void
-dmsg_router_printf(dmsg_router_t *router, const char *ctl, ...)
+dmsg_circuit_printf(dmsg_circuit_t *circuit, const char *ctl, ...)
 {
        dmsg_msg_t *rmsg;
        va_list va;
@@ -232,8 +234,9 @@ dmsg_router_printf(dmsg_router_t *router, const char *ctl, ...)
        va_end(va);
        len = strlen(buf) + 1;
 
-       rmsg = dmsg_msg_alloc(router, len, DMSG_DBG_SHELL | DMSGF_REPLY,
-                                NULL, NULL);
+       rmsg = dmsg_msg_alloc(circuit, len,
+                             DMSG_DBG_SHELL | DMSGF_REPLY,
+                             NULL, NULL);
        bcopy(buf, rmsg->aux_data, len);
 
        dmsg_msg_write(rmsg);
index fc744d4..492d815 100644 (file)
@@ -67,6 +67,52 @@ dmsg_uuid_to_str(uuid_t *uuid, char **strp)
        return (*strp);
 }
 
+/*
+ * Return a constant, printable name for a DMSG_PEER_* type code.
+ * Codes not listed below yield "?PEERTYPE?".
+ */
+const char *
+dmsg_peer_type_to_str(uint8_t type)
+{
+       const char *str;
+
+       switch(type) {
+       case DMSG_PEER_NONE:
+               str = "NONE";
+               break;
+       case DMSG_PEER_CLUSTER:
+               str = "CLUSTER";
+               break;
+       case DMSG_PEER_BLOCK:
+               str = "BLOCK";
+               break;
+       case DMSG_PEER_HAMMER2:
+               str = "HAMMER2";
+               break;
+       default:
+               str = "?PEERTYPE?";
+               break;
+       }
+       return(str);
+}
+
+/*
+ * Return a constant, printable name for a DMSG_PFSTYPE_* code.
+ * Codes not listed below yield "?PFSTYPE?".
+ */
+const char *
+dmsg_pfs_type_to_str(uint8_t type)
+{
+       const char *str;
+
+       switch(type) {
+       case DMSG_PFSTYPE_NONE:
+               str = "NONE";
+               break;
+       case DMSG_PFSTYPE_ADMIN:
+               str = "ADMIN";
+               break;
+       case DMSG_PFSTYPE_CLIENT:
+               str = "CLIENT";
+               break;
+       case DMSG_PFSTYPE_CACHE:
+               str = "CACHE";
+               break;
+       case DMSG_PFSTYPE_COPY:
+               str = "COPY";
+               break;
+       case DMSG_PFSTYPE_SLAVE:
+               str = "SLAVE";
+               break;
+       case DMSG_PFSTYPE_SOFT_SLAVE:
+               str = "SOFT_SLAVE";
+               break;
+       case DMSG_PFSTYPE_SOFT_MASTER:
+               str = "SOFT_MASTER";
+               break;
+       case DMSG_PFSTYPE_MASTER:
+               str = "MASTER";
+               break;
+       case DMSG_PFSTYPE_SERVER:
+               str = "SERVER";
+               break;
+       default:
+               str = "?PFSTYPE?";
+               break;
+       }
+       return(str);
+}
+
 int
 dmsg_connect(const char *hostname)
 {