hammer2 - cluster / libdmsg circuit work
[dragonfly.git] / lib / libdmsg / msg.c
index ee9a8e9..8c0b3ae 100644 (file)
@@ -40,53 +40,37 @@ int DMsgDebugOpt;
 static int dmsg_state_msgrx(dmsg_msg_t *msg);
 static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
 
+RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
+RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
+
 /*
- * ROUTER TREE - Represents available routes for message routing, indexed
- *              by their span transaction id.  The router structure is
- *              embedded in either an iocom, h2span_link (incoming),
- *              or h2span_relay (outgoing) (see msg_lnk.c).
+ * STATE TREE - Represents open transactions which are indexed by their
+ *             { msgid } within the owning circuit's staterd/statewr trees.
  */
 int
-dmsg_router_cmp(dmsg_router_t *router1, dmsg_router_t *router2)
+dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
 {
-       if (router1->target < router2->target)
+       if (state1->msgid < state2->msgid)
                return(-1);
-       if (router1->target > router2->target)
+       if (state1->msgid > state2->msgid)
                return(1);
        return(0);
 }
 
-RB_GENERATE(dmsg_router_tree, dmsg_router, rbnode, dmsg_router_cmp);
-
-static pthread_mutex_t router_mtx;
-static struct dmsg_router_tree router_ltree = RB_INITIALIZER(router_ltree);
-static struct dmsg_router_tree router_rtree = RB_INITIALIZER(router_rtree);
-
 /*
- * STATE TREE - Represents open transactions which are indexed by their
- *             {router,msgid} relative to the governing iocom.
- *
- *             router is usually iocom->router since state isn't stored
- *             for relayed messages.
+ * CIRCUIT TREE - Represents open circuits (circuit0 excluded) which are
+ *               indexed by their { msgid } relative to the governing iocom.
  */
 int
-dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
+dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
 {
-#if 0
-       if (state1->router < state2->router)
-               return(-1);
-       if (state1->router > state2->router)
-               return(1);
-#endif
-       if (state1->msgid < state2->msgid)
+       if (circuit1->msgid < circuit2->msgid)
                return(-1);
-       if (state1->msgid > state2->msgid)
+       if (circuit1->msgid > circuit2->msgid)
                return(1);
        return(0);
 }
 
-RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
-
 /*
  * Initialize a low-level ioq
  */
@@ -127,7 +111,7 @@ dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
  */
 void
 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
-                  void (*signal_func)(dmsg_router_t *),
+                  void (*signal_func)(dmsg_iocom_t *),
                   void (*rcvmsg_func)(dmsg_msg_t *),
                   void (*dbgmsg_func)(dmsg_msg_t *),
                   void (*altmsg_func)(dmsg_iocom_t *))
@@ -136,20 +120,16 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
 
        bzero(iocom, sizeof(*iocom));
 
-       iocom->router = dmsg_router_alloc();
-       iocom->router->signal_callback = signal_func;
-       iocom->router->rcvmsg_callback = rcvmsg_func;
-       iocom->router->altmsg_callback = altmsg_func;
-       iocom->router->dbgmsg_callback = dbgmsg_func;
-       /* we do not call dmsg_router_connect() for iocom routers */
+       iocom->signal_callback = signal_func;
+       iocom->rcvmsg_callback = rcvmsg_func;
+       iocom->altmsg_callback = altmsg_func;
+       iocom->dbgmsg_callback = dbgmsg_func;
 
        pthread_mutex_init(&iocom->mtx, NULL);
-       RB_INIT(&iocom->router->staterd_tree);
-       RB_INIT(&iocom->router->statewr_tree);
+       RB_INIT(&iocom->circuit_tree);       /* circuits other than circuit0 */
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
-       TAILQ_INIT(&iocom->router->txmsgq);
-       iocom->router->iocom = iocom;
+       TAILQ_INIT(&iocom->txmsgq);
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
        iocom->flags = DMSG_IOCOMF_RREQ;
@@ -162,6 +142,8 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
        fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
        fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
 
+       dmsg_circuit_init(iocom, &iocom->circuit0);  /* msgid 0 (bzero above): not put in circuit_tree */
+
        /*
         * Negotiate session crypto synchronously.  This will mark the
         * connection as error'd if it fails.  If this is a pipe it's
@@ -192,25 +174,25 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
  * the recevmsg_func and the sendmsg_func is called at least once.
  */
 void
-dmsg_router_restate(dmsg_router_t *router,
-                  void (*signal_func)(dmsg_router_t *),
+dmsg_iocom_restate(dmsg_iocom_t *iocom,
+                  void (*signal_func)(dmsg_iocom_t *),
                   void (*rcvmsg_func)(dmsg_msg_t *msg),
                   void (*altmsg_func)(dmsg_iocom_t *))
 {
-       router->signal_callback = signal_func;
-       router->rcvmsg_callback = rcvmsg_func;
-       router->altmsg_callback = altmsg_func;
+       iocom->signal_callback = signal_func;
+       iocom->rcvmsg_callback = rcvmsg_func;
+       iocom->altmsg_callback = altmsg_func;
        if (signal_func)
-               router->iocom->flags |= DMSG_IOCOMF_SWORK;
+               iocom->flags |= DMSG_IOCOMF_SWORK;   /* dmsg_iocom_core() then calls signal_callback */
        else
-               router->iocom->flags &= ~DMSG_IOCOMF_SWORK;
+               iocom->flags &= ~DMSG_IOCOMF_SWORK;
 }
 
 void
-dmsg_router_signal(dmsg_router_t *router)
+dmsg_iocom_signal(dmsg_iocom_t *iocom)
 {
-       if (router->signal_callback)
-               router->iocom->flags |= DMSG_IOCOMF_SWORK;
+       if (iocom->signal_callback)     /* no callback, nothing to schedule */
+               iocom->flags |= DMSG_IOCOMF_SWORK;
 }
 
 /*
@@ -255,15 +237,30 @@ dmsg_iocom_done(dmsg_iocom_t *iocom)
        pthread_mutex_destroy(&iocom->mtx);
 }
 
+/*
+ * Initialize a circuit structure; if circuit->msgid is non-zero, also add it
+ * to the iocom's circuit_tree.  circuit0 (msgid 0) is never in the tree.
+ */
+void
+dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
+{
+       circuit->iocom = iocom;
+       RB_INIT(&circuit->staterd_tree);
+       RB_INIT(&circuit->statewr_tree);
+       if (circuit->msgid)
+               RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit);    /* NOTE(review): no iocom->mtx taken; a duplicate msgid is silently not inserted */
+}
+
 /*
  * Allocate a new one-way message.
  */
 dmsg_msg_t *
-dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd,
-                 void (*func)(dmsg_msg_t *), void *data)
+dmsg_msg_alloc(dmsg_circuit_t *circuit,
+              size_t aux_size, uint32_t cmd,
+              void (*func)(dmsg_msg_t *), void *data)
 {
+       dmsg_iocom_t *iocom = circuit->iocom;
        dmsg_state_t *state = NULL;
-       dmsg_iocom_t *iocom = router->iocom;
        dmsg_msg_t *msg;
        int hbytes;
 
@@ -280,24 +277,30 @@ dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd,
        if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
                /*
                 * Create state when CREATE is set without REPLY.
+                * Assign a unique msgid, in this case simply using
+                * the pointer value for 'state'.
                 *
                 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
                 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
+                *
+                * NOTE: state initiated by us and state initiated by
+                *       a remote create are placed in different RB trees.
+                *       Routing uses head.circuit, which is matched
+                *       against circuit msgids (see dmsg_state_msgrx()).
                 */
                state = malloc(sizeof(*state));
                bzero(state, sizeof(*state));
                state->iocom = iocom;
+               state->circuit = circuit;
                state->flags = DMSG_STATE_DYNAMIC;
                state->msgid = (uint64_t)(uintptr_t)state;
-               state->router = router;
                state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
                state->rxcmd = DMSGF_REPLY;
+               state->icmd = state->txcmd & DMSGF_BASECMDMASK;
                state->func = func;
                state->any.any = data;
                pthread_mutex_lock(&iocom->mtx);
-               RB_INSERT(dmsg_state_tree,
-                         &iocom->router->statewr_tree,
-                         state);
+               RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
                pthread_mutex_unlock(&iocom->mtx);
                state->flags |= DMSG_STATE_INSERTED;
        }
@@ -323,14 +326,19 @@ dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd,
        if (hbytes)
                bzero(&msg->any.head, hbytes);
        msg->hdr_size = hbytes;
+       msg->any.head.magic = DMSG_HDR_MAGIC;
        msg->any.head.cmd = cmd;
        msg->any.head.aux_descr = 0;
        msg->any.head.aux_crc = 0;
-       msg->router = router;
+       msg->any.head.circuit = 0;      /* 0 selects circuit0; reply paths overwrite it */
+       msg->circuit = circuit;
+       msg->iocom = iocom;
        if (state) {
                msg->state = state;
                state->msg = msg;
                msg->any.head.msgid = state->msgid;
+       } else {
+               msg->any.head.msgid = 0;        /* stateless (one-way) message */
        }
        return (msg);
 }
@@ -344,7 +352,7 @@ static
 void
 dmsg_msg_free_locked(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;       /* set by dmsg_msg_alloc() */
 
        msg->state = NULL;
        if (msg->aux_data)
@@ -356,7 +364,7 @@ dmsg_msg_free_locked(dmsg_msg_t *msg)
 void
 dmsg_msg_free(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;       /* set by dmsg_msg_alloc() */
 
        pthread_mutex_lock(&iocom->mtx);
        dmsg_msg_free_locked(msg);
@@ -455,7 +463,7 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
 
                if (iocom->flags & DMSG_IOCOMF_SWORK) {
                        iocom->flags &= ~DMSG_IOCOMF_SWORK;
-                       iocom->router->signal_callback(iocom->router);
+                       iocom->signal_callback(iocom);  /* NOTE(review): assumes SWORK is only set when a callback exists */
                }
 
                /*
@@ -468,7 +476,7 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
                        read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
                        iocom->flags |= DMSG_IOCOMF_RWORK;
                        iocom->flags |= DMSG_IOCOMF_WWORK;
-                       if (TAILQ_FIRST(&iocom->router->txmsgq))
+                       if (TAILQ_FIRST(&iocom->txmsgq))       /* pending output queued by dmsg_msg_write() */
                                dmsg_iocom_flush1(iocom);
                }
 
@@ -490,14 +498,14 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
                                        fprintf(stderr, "receive %s\n",
                                                dmsg_msg_str(msg));
                                }
-                               iocom->router->rcvmsg_callback(msg);
+                               iocom->rcvmsg_callback(msg);
                                dmsg_state_cleanuprx(iocom, msg);
                        }
                }
 
                if (iocom->flags & DMSG_IOCOMF_ARWORK) {
                        iocom->flags &= ~DMSG_IOCOMF_ARWORK;
-                       iocom->router->altmsg_callback(iocom);
+                       iocom->altmsg_callback(iocom);
                }
        }
 }
@@ -556,6 +564,7 @@ dmsg_ioq_read(dmsg_iocom_t *iocom)
        dmsg_ioq_t *ioq = &iocom->ioq_rx;
        dmsg_msg_t *msg;
        dmsg_state_t *state;
+       dmsg_circuit_t *circuit0;       /* used on the error (skip:) path */
        dmsg_hdr_t *head;
        ssize_t n;
        size_t bytes;
@@ -682,8 +691,8 @@ again:
                /*
                 * Allocate the message, the next state will fill it in.
                 */
-               msg = dmsg_msg_alloc(iocom->router, ioq->abytes, 0,
-                                       NULL, NULL);
+               msg = dmsg_msg_alloc(&iocom->circuit0, ioq->abytes, 0,
+                                    NULL, NULL);  /* dmsg_state_msgrx() may re-set msg->circuit */
                ioq->msg = msg;
 
                /*
@@ -924,24 +933,6 @@ again:
                }
        }
 
-       /*
-        * Handle relaying.  Transactional state is not recorded XXX
-        */
-
-       /*
-        * Process transactional state for the message.
-        */
-       if (msg && ioq->error == 0) {
-               error = dmsg_state_msgrx(msg);
-               if (error) {
-                       if (error == DMSG_IOQ_ERROR_EALREADY) {
-                               dmsg_msg_free(msg);
-                               goto again;
-                       }
-                       ioq->error = error;
-               }
-       }
-
        /*
         * Handle error, RREQ, or completion
         *
@@ -975,16 +966,20 @@ skip:
                 * transactions, ending with a final non-transactional
                 * LNK_ERROR (that the session can detect) when no
                 * transactions remain.
+                *
+                * We only need to scan transactions on circuit0 as these
+                * will contain all circuit forges, and terminating circuit
+                * forges will automatically terminate the transactions on
+                * any other circuits as well as those circuits.
                 */
-               msg = dmsg_msg_alloc(iocom->router, 0, 0, NULL, NULL);
-               bzero(&msg->any.head, sizeof(msg->any.head));
-               msg->any.head.magic = DMSG_HDR_MAGIC;
-               msg->any.head.cmd = DMSG_LNK_ERROR;
+               circuit0 = &iocom->circuit0;
+               msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);  /* no CREATE, so no state allocated */
                msg->any.head.error = ioq->error;
 
                pthread_mutex_lock(&iocom->mtx);
                dmsg_iocom_drain(iocom);
-               if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) {
+
+               if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
                        /*
                         * Active remote transactions are still present.
                         * Simulate the other end sending us a DELETE.
@@ -995,13 +990,12 @@ skip:
                        } else {
                                /*state->txcmd |= DMSGF_DELETE;*/
                                msg->state = state;
-                               msg->router = state->router;
+                               msg->iocom = iocom;     /* redundant: dmsg_msg_alloc() already set it */
                                msg->any.head.msgid = state->msgid;
                                msg->any.head.cmd |= DMSGF_ABORT |
                                                     DMSGF_DELETE;
                        }
-               } else if ((state = RB_ROOT(&iocom->router->statewr_tree)) !=
-                          NULL) {
+               } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
                        /*
                         * Active local transactions are still present.
                         * Simulate the other end sending us a DELETE.
@@ -1011,7 +1005,7 @@ skip:
                                msg = NULL;
                        } else {
                                msg->state = state;
-                               msg->router = state->router;
+                               msg->iocom = iocom;     /* redundant: dmsg_msg_alloc() already set it */
                                msg->any.head.msgid = state->msgid;
                                msg->any.head.cmd |= DMSGF_ABORT |
                                                     DMSGF_DELETE |
@@ -1055,7 +1049,7 @@ skip:
                iocom->flags |= DMSG_IOCOMF_RREQ;
        } else {
                /*
-                * Return msg.
+                * Continue processing msg.
                 *
                 * The fifo has already been advanced past the message.
                 * Trivially reset the FIFO indices if possible.
@@ -1076,6 +1070,43 @@ skip:
                }
                ioq->state = DMSG_MSGQ_STATE_HEADER1;
                ioq->msg = NULL;
+
+               /*
+                * Handle message routing.  Messages on a non-zero circuit
+                * go through dmsg_circuit_relay(), circuit 0 messages through
+                * dmsg_state_msgrx().  error is 0 if the msg is for us.
+                *
+                * State processing only occurs for messages destined for us.
+                */
+               if (msg->any.head.circuit)
+                       error = dmsg_circuit_relay(msg);
+               else
+                       error = dmsg_state_msgrx(msg);
+
+               if (error) {
+                       /*
+                        * Abort-after-closure, throw message away and
+                        * start reading another.
+                        */
+                       if (error == DMSG_IOQ_ERROR_EALREADY) {
+                               dmsg_msg_free(msg);
+                               goto again;
+                       }
+
+                       /*
+                        * msg routed, msg pointer no longer owned by us.
+                        * Go to the top and start reading another.
+                        */
+                       if (error == DMSG_IOQ_ERROR_ROUTED)
+                               goto again;
+
+                       /*
+                        * Process real error and throw away message.
+                        */
+                       ioq->error = error;
+                       goto skip;
+               }
+               /* no error, not routed.  Fall through and return msg */
        }
        return (msg);
 }
@@ -1102,8 +1133,8 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom)
        iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
        TAILQ_INIT(&tmpq);
        pthread_mutex_lock(&iocom->mtx);
-       while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) {
-               TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry);
+       while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {  /* move all queued msgs to tmpq under mtx */
+               TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
                TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
        }
        pthread_mutex_unlock(&iocom->mtx);
@@ -1361,7 +1392,7 @@ dmsg_iocom_drain(dmsg_iocom_t *iocom)
 void
 dmsg_msg_write(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;       /* set by dmsg_msg_alloc() */
        dmsg_state_t *state;
        char dummy;
 
@@ -1384,23 +1415,21 @@ dmsg_msg_write(dmsg_msg_t *msg)
                if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
                    DMSGF_CREATE) {
                        state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
+                       state->icmd = state->txcmd & DMSGF_BASECMDMASK;  /* keep icmd in sync with txcmd */
                }
                msg->any.head.msgid = state->msgid;
                assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
-               if (msg->any.head.cmd & DMSGF_CREATE)
+               if (msg->any.head.cmd & DMSGF_CREATE) {
                        state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
-       } else {
-               msg->any.head.msgid = 0;
-               /* XXX set spanid by router */
+                       state->icmd = state->txcmd & DMSGF_BASECMDMASK;  /* keep icmd in sync with txcmd */
+               }
        }
-       msg->any.head.source = 0;
-       msg->any.head.target = msg->router->target;
 
        /*
         * Queue it for output, wake up the I/O pthread.  Note that the
         * I/O thread is responsible for generating the CRCs and encryption.
         */
-       TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry);
+       TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
        dummy = 0;
        write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
        pthread_mutex_unlock(&iocom->mtx);
@@ -1422,7 +1451,6 @@ dmsg_msg_write(dmsg_msg_t *msg)
 void
 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
        dmsg_state_t *state = msg->state;
        dmsg_msg_t *nmsg;
        uint32_t cmd;
@@ -1455,15 +1483,17 @@ dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
 
        /*
         * Allocate the message and associate it with the existing state.
-        * We cannot pass MSGF_CREATE to msg_alloc() because that may
+        * We cannot pass DMSGF_CREATE to msg_alloc() because that may
         * allocate new state.  We have our state already.
         */
-       nmsg = dmsg_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
+       nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
        if (state) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
        nmsg->any.head.error = error;
+       nmsg->any.head.msgid = msg->any.head.msgid;     /* answer the same transaction */
+       nmsg->any.head.circuit = msg->any.head.circuit; /* ... on the same circuit */
        nmsg->state = state;
        dmsg_msg_write(nmsg);
 }
@@ -1477,7 +1507,6 @@ dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
 void
 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
        dmsg_state_t *state = msg->state;
        dmsg_msg_t *nmsg;
        uint32_t cmd;
@@ -1508,12 +1537,14 @@ dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
                        cmd |= DMSGF_REPLY;
        }
 
-       nmsg = dmsg_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
+       nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
        if (state) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
        nmsg->any.head.error = error;
+       nmsg->any.head.msgid = msg->any.head.msgid;     /* answer the same transaction */
+       nmsg->any.head.circuit = msg->any.head.circuit; /* ... on the same circuit */
        nmsg->state = state;
        dmsg_msg_write(nmsg);
 }
@@ -1540,12 +1571,48 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error)
        if (state->txcmd & DMSGF_REPLY)
                cmd |= DMSGF_REPLY;
 
-       nmsg = dmsg_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL);
+       nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
        if (state) {
                if ((state->txcmd & DMSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= DMSGF_CREATE;
        }
        nmsg->any.head.error = error;
+       nmsg->any.head.msgid = state->msgid;
+       nmsg->any.head.circuit = state->msg->any.head.circuit; /* NOTE(review): assumes state->msg is still attached (non-NULL) */
+       nmsg->state = state;
+       dmsg_msg_write(nmsg);
+}
+
+/*
+ * Send a LNK_ERROR result for a state without DELETE; the transaction stays open.
+ */
+void
+dmsg_state_result(dmsg_state_t *state, uint32_t error)
+{
+       dmsg_msg_t *nmsg;
+       uint32_t cmd = DMSG_LNK_ERROR;
+
+       /*
+        * Nothing to do if we already transmitted a delete
+        */
+       if (state->txcmd & DMSGF_DELETE)
+               return;
+
+       /*
+        * Set REPLY if the other end initiated the command.  Otherwise
+        * we are the command direction.
+        */
+       if (state->txcmd & DMSGF_REPLY)
+               cmd |= DMSGF_REPLY;
+
+       nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
+       if (state) {    /* always true here: state was dereferenced above */
+               if ((state->txcmd & DMSGF_CREATE) == 0)
+                       nmsg->any.head.cmd |= DMSGF_CREATE;
+       }
+       nmsg->any.head.error = error;
+       nmsg->any.head.msgid = state->msgid;
+       nmsg->any.head.circuit = state->msg->any.head.circuit; /* NOTE(review): assumes state->msg is still attached (non-NULL) */
        nmsg->state = state;
        dmsg_msg_write(nmsg);
 }
@@ -1557,8 +1624,8 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error)
  */
 
 /*
- * Process state tracking for a message after reception, prior to
- * execution.
+ * Process circuit and state tracking for a message after reception, prior
+ * to execution (an unknown circuit fails with DMSG_IOQ_ERROR_BAD_CIRCUIT).
  *
  * Called with msglk held and the msg dequeued.
  *
@@ -1626,25 +1693,44 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error)
 static int
 dmsg_state_msgrx(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;
+       dmsg_circuit_t *circuit;
        dmsg_state_t *state;
-       dmsg_state_t dummy;
+       dmsg_state_t sdummy;
+       dmsg_circuit_t cdummy;
        int error;
 
+       pthread_mutex_lock(&iocom->mtx);
+
+       /*
+        * Locate existing persistent circuit and state, if any.
+        */
+       if (msg->any.head.circuit == 0) {
+               circuit = &iocom->circuit0;
+       } else {
+               cdummy.msgid = msg->any.head.circuit;
+               circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
+                                 &cdummy);
+               if (circuit == NULL) {
+                       /* do not return with iocom->mtx still held */
+                       pthread_mutex_unlock(&iocom->mtx);
+                       return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
+               }
+       }
+       msg->circuit = circuit;
+       ++circuit->refs;        /* NOTE(review): presumably released via dmsg_circuit_drop(); confirm */
+
        /*
-        * Lock RB tree and locate existing persistent state, if any.
-        *
         * If received msg is a command state is on staterd_tree.
         * If received msg is a reply state is on statewr_tree.
         */
-       dummy.msgid = msg->any.head.msgid;
-       pthread_mutex_lock(&iocom->mtx);
+       sdummy.msgid = msg->any.head.msgid;
        if (msg->any.head.cmd & DMSGF_REPLY) {
-               state = RB_FIND(dmsg_state_tree,
-                               &iocom->router->statewr_tree, &dummy);
+               state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
+                               &sdummy);
        } else {
-               state = RB_FIND(dmsg_state_tree,
-                               &iocom->router->staterd_tree, &dummy);
+               state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
+                               &sdummy);
        }
        msg->state = state;
        pthread_mutex_unlock(&iocom->mtx);
@@ -1678,17 +1761,17 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                state = malloc(sizeof(*state));
                bzero(state, sizeof(*state));
                state->iocom = iocom;
+               state->circuit = circuit;       /* circuit resolved earlier in dmsg_state_msgrx() */
                state->flags = DMSG_STATE_DYNAMIC;
                state->msg = msg;
                state->txcmd = DMSGF_REPLY;
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
+               state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
                state->flags |= DMSG_STATE_INSERTED;
                state->msgid = msg->any.head.msgid;
-               state->router = msg->router;
                msg->state = state;
                pthread_mutex_lock(&iocom->mtx);
-               RB_INSERT(dmsg_state_tree,
-                         &iocom->router->staterd_tree, state);
+               RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);    /* remote-initiated: read tree */
                pthread_mutex_unlock(&iocom->mtx);
                error = 0;
                if (DMsgDebugOpt) {
@@ -1840,11 +1923,11 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                        if (state->rxcmd & DMSGF_REPLY) {
                                assert(msg->any.head.cmd & DMSGF_REPLY);
                                RB_REMOVE(dmsg_state_tree,
-                                         &iocom->router->statewr_tree, state);
+                                         &msg->circuit->statewr_tree, state);
                        } else {
                                assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
                                RB_REMOVE(dmsg_state_tree,
-                                         &iocom->router->staterd_tree, state);
+                                         &msg->circuit->staterd_tree, state);
                        }
                        state->flags &= ~DMSG_STATE_INSERTED;
                        dmsg_state_free(state);
@@ -1865,13 +1948,14 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
 static void
 dmsg_state_cleanuptx(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->router->iocom;
+       dmsg_iocom_t *iocom = msg->iocom;
        dmsg_state_t *state;
 
        if ((state = msg->state) == NULL) {
                dmsg_msg_free(msg);
        } else if (msg->any.head.cmd & DMSGF_DELETE) {
                pthread_mutex_lock(&iocom->mtx);
+               assert((state->txcmd & DMSGF_DELETE) == 0);     /* a DELETE may only be transmitted once */
                state->txcmd |= DMSGF_DELETE;
                if (state->rxcmd & DMSGF_DELETE) {
                        if (state->msg == msg)
@@ -1880,11 +1964,11 @@ dmsg_state_cleanuptx(dmsg_msg_t *msg)
                        if (state->txcmd & DMSGF_REPLY) {
                                assert(msg->any.head.cmd & DMSGF_REPLY);
                                RB_REMOVE(dmsg_state_tree,
-                                         &iocom->router->staterd_tree, state);
+                                         &msg->circuit->staterd_tree, state);
                        } else {
                                assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
                                RB_REMOVE(dmsg_state_tree,
-                                         &iocom->router->statewr_tree, state);
+                                         &msg->circuit->statewr_tree, state);
                        }
                        state->flags &= ~DMSG_STATE_INSERTED;
                        dmsg_state_free(state);
@@ -1904,9 +1988,7 @@ dmsg_state_cleanuptx(dmsg_msg_t *msg)
 void
 dmsg_state_free(dmsg_state_t *state)
 {
-       dmsg_iocom_t *iocom = state->iocom;
        dmsg_msg_t *msg;
-       char dummy;
 
        if (DMsgDebugOpt) {
                fprintf(stderr, "terminate state %p id=%08x\n",
@@ -1918,101 +2000,55 @@ dmsg_state_free(dmsg_state_t *state)
        if (msg)
                dmsg_msg_free_locked(msg);
        free(state);
-
-       /*
-        * When an iocom error is present we are trying to close down the
-        * iocom, but we have to wait for all states to terminate before
-        * we can do so.  The iocom rx code will terminate the receive side
-        * for all transactions by simulating incoming DELETE messages,
-        * but the state doesn't go away until both sides are terminated.
-        *
-        * We may have to wake up the rx code.
-        */
-       if (iocom->ioq_rx.error &&
-           RB_EMPTY(&iocom->router->staterd_tree) &&
-           RB_EMPTY(&iocom->router->statewr_tree)) {
-               dummy = 0;
-               write(iocom->wakeupfds[1], &dummy, 1);
-       }
 }
 
-/************************************************************************
- *                             ROUTING                                 *
- ************************************************************************
- *
- * Incoming messages are routed by their spanid, matched up against
- * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c).
- * Any replies run through the same router.
- *
- * Originated messages are routed by their spanid, matched up against
- * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c).
- * Replies come back through the same route.
- *
- * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular
- * transaction runs through the same route.  Commands and replies both.
- *
- * An originated message will use a different routing spanid to
- * reach a target node than a message which originates from that node.
- * They might use the same physical pipes (each pipe can have multiple
- * SPANs and RELAYs), but the routes are distinct from the perspective
- * of the router.
+/*
+ * Drop a reference on a circuit; the last reference destroys it (except
+ * circuit0, which is embedded in the iocom).  Called with iocom locked.
  */
-dmsg_router_t *
-dmsg_router_alloc(void)
-{
-       dmsg_router_t *router;
-
-       router = dmsg_alloc(sizeof(*router));
-       TAILQ_INIT(&router->txmsgq);
-       return (router);
-}
-
 void
-dmsg_router_connect(dmsg_router_t *router)
+dmsg_circuit_drop(dmsg_circuit_t *circuit)
 {
-       dmsg_router_t *tmp;
-
-       assert(router->link || router->relay);
-       assert((router->flags & DMSG_ROUTER_CONNECTED) == 0);
-
-       pthread_mutex_lock(&router_mtx);
-       if (router->link)
-               tmp = RB_INSERT(dmsg_router_tree, &router_ltree, router);
-       else
-               tmp = RB_INSERT(dmsg_router_tree, &router_rtree, router);
-       assert(tmp == NULL);
-       router->flags |= DMSG_ROUTER_CONNECTED;
-       pthread_mutex_unlock(&router_mtx);
-}
+       dmsg_iocom_t *iocom = circuit->iocom;
+       char dummy;
 
-void
-dmsg_router_disconnect(dmsg_router_t **routerp)
-{
-       dmsg_router_t *router;
+       assert(circuit->refs > 0);
+       assert(iocom);
 
-       router = *routerp;
-       assert(router->link || router->relay);
-       assert(router->flags & DMSG_ROUTER_CONNECTED);
+       /*
+        * Decrement circuit refs, destroy circuit when refs drops to 0.
+        */
+       if (--circuit->refs > 0)
+               return;
 
-       pthread_mutex_lock(&router_mtx);
-       if (router->link)
-               RB_REMOVE(dmsg_router_tree, &router_ltree, router);
-       else
-               RB_REMOVE(dmsg_router_tree, &router_rtree, router);
-       router->flags &= ~DMSG_ROUTER_CONNECTED;
-       *routerp = NULL;
-       pthread_mutex_unlock(&router_mtx);
-}
+       /*
+        * circuit0 is embedded in the iocom and never inserted into
+        * circuit_tree (msgid 0), so it must not be removed or freed.
+        */
+       if (circuit != &iocom->circuit0) {
+               assert(RB_EMPTY(&circuit->staterd_tree));
+               assert(RB_EMPTY(&circuit->statewr_tree));
+               RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
+               circuit->iocom = NULL;
+               dmsg_free(circuit);
+       }
 
-#if 0
-/*
- * XXX
- */
-dmsg_router_t *
-dmsg_route_msg(dmsg_msg_t *msg)
-{
+       /*
+        * When an iocom error is present the rx code will terminate the
+        * receive side for all transactions and (indirectly) all circuits
+        * by simulating DELETE messages.  The state and related circuits
+        * don't disappear until the related states are closed in both
+        * directions.
+        *
+        * Detect the case where the last circuit is now gone (and thus all
+        * states for all circuits are gone), and wakeup the rx thread to
+        * complete the termination.
+        */
+       if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
+               dummy = 0;
+               write(iocom->wakeupfds[1], &dummy, 1);
+       }
 }
-#endif
 
 /*
  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
@@ -2026,8 +2055,8 @@ dmsg_bswap_head(dmsg_hdr_t *head)
        head->salt      = bswap32(head->salt);
 
        head->msgid     = bswap64(head->msgid);
-       head->source    = bswap64(head->source);
-       head->target    = bswap64(head->target);
+       head->circuit   = bswap64(head->circuit);      /* 0 = circuit0 */
+       head->reserved18= bswap64(head->reserved18);   /* reserved; swapped for completeness */
 
        head->cmd       = bswap32(head->cmd);
        head->aux_crc   = bswap32(head->aux_crc);