hammer2 - kernel cluster messaging support API work
author: Matthew Dillon <dillon@apollo.backplane.com>
Fri, 30 Nov 2012 23:33:49 +0000 (15:33 -0800)
committer: Matthew Dillon <dillon@apollo.backplane.com>
Fri, 30 Nov 2012 23:33:49 +0000 (15:33 -0800)
* Rework the API significantly

* Add circuit tracking and sniffing features

* Add flags to automate the LNK_CONN, LNK_SPAN, and LNK_CIRC state machines.

* Misc improvements.

sys/kern/kern_dmsg.c
sys/kern/subr_diskiocom.c
sys/sys/dmsg.h

index 76fec4b..e8ea583 100644 (file)
 
 RB_GENERATE(kdmsg_state_tree, kdmsg_state, rbnode, kdmsg_state_cmp);
 
+static struct lwkt_token kdmsg_token = LWKT_TOKEN_INITIALIZER(kdmsg_token);
+static void kdmsg_circ_free_check(kdmsg_circuit_t *circ);
+
 static void kdmsg_iocom_thread_rd(void *arg);
 static void kdmsg_iocom_thread_wr(void *arg);
+static int kdmsg_autorxmsg(kdmsg_msg_t *msg);
+static void kdmsg_autocirc(kdmsg_msg_t *msg);
+static int kdmsg_autocirc_reply(kdmsg_state_t *state, kdmsg_msg_t *msg);
 
 /*
  * Initialize the roll-up communications structure for a network
  * messaging session.  This function does not install the socket.
  */
 void
-kdmsg_iocom_init(kdmsg_iocom_t *iocom, void *handle,
+kdmsg_iocom_init(kdmsg_iocom_t *iocom, void *handle, uint32_t flags,
                 struct malloc_type *mmsg,
-                int (*lnk_rcvmsg)(kdmsg_msg_t *msg),
-                int (*dbg_rcvmsg)(kdmsg_msg_t *msg),
-                int (*misc_rcvmsg)(kdmsg_msg_t *msg))
+                int (*rcvmsg)(kdmsg_msg_t *msg))
 {
        bzero(iocom, sizeof(*iocom));
        iocom->handle = handle;
        iocom->mmsg = mmsg;
-       iocom->lnk_rcvmsg = lnk_rcvmsg;
-       iocom->dbg_rcvmsg = dbg_rcvmsg;
-       iocom->misc_rcvmsg = misc_rcvmsg;
-       iocom->router.iocom = iocom;
+       iocom->rcvmsg = rcvmsg;
+       iocom->flags = flags;
        lockinit(&iocom->msglk, "h2msg", 0, 0);
        TAILQ_INIT(&iocom->msgq);
        RB_INIT(&iocom->staterd_tree);
@@ -104,7 +106,6 @@ kdmsg_iocom_reconnect(kdmsg_iocom_t *iocom, struct file *fp,
                fdrop(iocom->msg_fp);
                iocom->msg_fp = NULL;
        }
-       kprintf("RESTART CONNECTION\n");
 
        /*
         * Setup new communications descriptor
@@ -120,6 +121,76 @@ kdmsg_iocom_reconnect(kdmsg_iocom_t *iocom, struct file *fp,
 }
 
 /*
+ * Caller sets up iocom->auto_lnk_conn and iocom->auto_lnk_span, then calls
+ * this function to handle the state machine for LNK_CONN and LNK_SPAN.
+ *
+ * NOTE: Caller typically also sets the IOCOMF_AUTOCONN, IOCOMF_AUTOSPAN,
+ *      and IOCOMF_AUTOCIRC in the kdmsg_iocom_init() call.  Clients
+ *      typically set IOCOMF_AUTOFORGE to automatically forge circuits
+ *      for received SPANs.
+ */
+static int kdmsg_lnk_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg);
+static int kdmsg_lnk_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg);
+
+void
+kdmsg_iocom_autoinitiate(kdmsg_iocom_t *iocom,
+                        void (*auto_callback)(kdmsg_msg_t *msg))
+{
+       kdmsg_msg_t *msg;
+
+       iocom->auto_callback = auto_callback;
+
+       /*
+        * Open the LNK_CONN transaction.  Replies from the remote are
+        * handled by kdmsg_lnk_conn_reply().
+        */
+       msg = kdmsg_msg_alloc(iocom, 0,
+                             DMSG_LNK_CONN | DMSGF_CREATE,
+                             kdmsg_lnk_conn_reply, NULL);
+       /*
+        * Copy the caller-supplied LNK_CONN payload into the message.
+        * auto_lnk_conn.head is first overwritten with the freshly built
+        * message header so the struct copy does not clobber it.
+        */
+       iocom->auto_lnk_conn.head = msg->any.head;
+       msg->any.lnk_conn = iocom->auto_lnk_conn;
+       /* Record the LNK_CONN transaction state in the iocom. */
+       iocom->conn_state = msg->state;
+       kdmsg_msg_write(msg);
+}
+
+/*
+ * State callback for the LNK_CONN transaction opened by
+ * kdmsg_iocom_autoinitiate().
+ *
+ * On the remote's first reply (CREATE) our LNK_SPAN transaction is
+ * opened, with its payload copied from iocom->auto_lnk_span.  When the
+ * remote closes the transaction (DELETE) while our side is still open,
+ * iocom->conn_state is cleared and our side is closed as well.  Every
+ * message is also passed to iocom->auto_callback, if set.
+ *
+ * Always returns 0.
+ */
+static
+int
+kdmsg_lnk_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
+{
+       kdmsg_iocom_t *iocom = state->iocom;
+       kdmsg_msg_t *rmsg;
+
+       if (msg->any.head.cmd & DMSGF_CREATE) {
+               rmsg = kdmsg_msg_alloc(iocom, 0,
+                                      DMSG_LNK_SPAN | DMSGF_CREATE,
+                                      kdmsg_lnk_span_reply, NULL);
+               /* preserve the new header across the payload copy */
+               iocom->auto_lnk_span.head = rmsg->any.head;
+               rmsg->any.lnk_span = iocom->auto_lnk_span;
+               kdmsg_msg_write(rmsg);
+       }
+       if ((state->txcmd & DMSGF_DELETE) == 0 &&
+           (msg->any.head.cmd & DMSGF_DELETE)) {
+               iocom->conn_state = NULL;
+               kdmsg_msg_reply(msg, 0);
+       }
+       if (iocom->auto_callback)
+               iocom->auto_callback(msg);
+
+       return (0);
+}
+
+/*
+ * State callback for the LNK_SPAN transaction opened by
+ * kdmsg_lnk_conn_reply().
+ *
+ * Closes our side when the remote deletes the transaction (and our side
+ * is still open), then passes the message to iocom->auto_callback, if
+ * set.  Always returns 0.
+ */
+static
+int
+kdmsg_lnk_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
+{
+       /*kdmsg_iocom_t *iocom = state->iocom;*/
+
+       if ((state->txcmd & DMSGF_DELETE) == 0 &&
+           (msg->any.head.cmd & DMSGF_DELETE)) {
+               kdmsg_msg_reply(msg, 0);
+       }
+       if (state->iocom->auto_callback)
+               state->iocom->auto_callback(msg);
+       return (0);
+}
+
+/*
  * Disconnect and clean up
  */
 void
@@ -180,9 +251,9 @@ kdmsg_iocom_thread_rd(void *arg)
                        break;
                }
                /* XXX messy: mask cmd to avoid allocating state */
-               msg = kdmsg_msg_alloc(&iocom->router,
-                                       hdr.cmd & DMSGF_BASECMDMASK,
-                                       NULL, NULL);
+               msg = kdmsg_msg_alloc(iocom, 0,
+                                     hdr.cmd & DMSGF_BASECMDMASK,
+                                     NULL, NULL);
                msg->any.head = hdr;
                msg->hdr_size = hbytes;
                if (hbytes > sizeof(hdr)) {
@@ -205,6 +276,7 @@ kdmsg_iocom_thread_rd(void *arg)
                if (msg->aux_size) {
                        msg->aux_data = kmalloc(msg->aux_size, iocom->mmsg,
                                                M_WAITOK | M_ZERO);
+                       msg->flags |= KDMSG_FLAG_AUXALLOC;
                        error = fp_read(iocom->msg_fp, msg->aux_data,
                                        msg->aux_size,
                                        NULL, 1, UIO_SYSSPACE);
@@ -235,25 +307,11 @@ kdmsg_iocom_thread_rd(void *arg)
                         */
                        error = msg->state->func(msg->state, msg);
                        kdmsg_state_cleanuprx(msg);
-               } else if ((msg->any.head.cmd & DMSGF_PROTOS) ==
-                          DMSG_PROTO_LNK) {
-                       /*
-                        * Message related to the LNK protocol set
-                        */
-                       error = iocom->lnk_rcvmsg(msg);
-                       kdmsg_state_cleanuprx(msg);
-               } else if ((msg->any.head.cmd & DMSGF_PROTOS) ==
-                          DMSG_PROTO_DBG) {
-                       /*
-                        * Message related to the DBG protocol set
-                        */
-                       error = iocom->dbg_rcvmsg(msg);
+               } else if (iocom->flags & KDMSG_IOCOMF_AUTOANY) {
+                       error = kdmsg_autorxmsg(msg);
                        kdmsg_state_cleanuprx(msg);
                } else {
-                       /*
-                        * Other higher-level messages (e.g. vnops)
-                        */
-                       error = iocom->misc_rcvmsg(msg);
+                       error = iocom->rcvmsg(msg);
                        kdmsg_state_cleanuprx(msg);
                }
                msg = NULL;
@@ -430,11 +488,13 @@ cleanuprd:
                if (state->func &&
                    (state->rxcmd & DMSGF_DELETE) == 0) {
                        lockmgr(&iocom->msglk, LK_RELEASE);
-                       msg = kdmsg_msg_alloc(&iocom->router, DMSG_LNK_ERROR,
+                       msg = kdmsg_msg_alloc(iocom, state->circuit,
+                                             DMSG_LNK_ERROR,
                                              NULL, NULL);
                        if ((state->rxcmd & DMSGF_CREATE) == 0)
                                msg->any.head.cmd |= DMSGF_CREATE;
                        msg->any.head.cmd |= DMSGF_DELETE;
+                       msg->any.head.error = DMSG_ERR_LOSTLINK;
                        msg->state = state;
                        state->rxcmd = msg->any.head.cmd &
                                       ~DMSGF_DELETE;
@@ -464,12 +524,14 @@ cleanupwr:
                if (state->func &&
                    (state->rxcmd & DMSGF_DELETE) == 0) {
                        lockmgr(&iocom->msglk, LK_RELEASE);
-                       msg = kdmsg_msg_alloc(&iocom->router, DMSG_LNK_ERROR,
+                       msg = kdmsg_msg_alloc(iocom, state->circuit,
+                                             DMSG_LNK_ERROR,
                                              NULL, NULL);
                        if ((state->rxcmd & DMSGF_CREATE) == 0)
                                msg->any.head.cmd |= DMSGF_CREATE;
                        msg->any.head.cmd |= DMSGF_DELETE |
                                             DMSGF_REPLY;
+                       msg->any.head.error = DMSG_ERR_LOSTLINK;
                        msg->state = state;
                        state->rxcmd = msg->any.head.cmd &
                                       ~DMSGF_DELETE;
@@ -624,17 +686,13 @@ kdmsg_drain_msgq(kdmsg_iocom_t *iocom)
 int
 kdmsg_state_msgrx(kdmsg_msg_t *msg)
 {
-       kdmsg_iocom_t *iocom;
+       kdmsg_iocom_t *iocom = msg->iocom;
        kdmsg_state_t *state;
        int error;
 
-       iocom = msg->router->iocom;
-
        /*
         * XXX resolve msg->any.head.source and msg->any.head.target
         *     into LNK_SPAN references.
-        *
-        * XXX replace msg->router
         */
 
        /*
@@ -657,12 +715,8 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
        lockmgr(&iocom->msglk, LK_EXCLUSIVE);
 
        state->msgid = msg->any.head.msgid;
-       state->router = &iocom->router;
-       kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
-               msg->any.head.cmd,
-               (intmax_t)msg->any.head.msgid,
-               (intmax_t)msg->any.head.source,
-               (intmax_t)msg->any.head.target);
+       state->circuit = msg->any.head.circuit;
+       state->iocom = iocom;
        if (msg->any.head.cmd & DMSGF_REPLY)
                state = RB_FIND(kdmsg_state_tree, &iocom->statewr_tree, state);
        else
@@ -682,7 +736,7 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
         * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
         * inside the case statements.
         */
-       switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY)) {
+       switch(msg->any.head.cmd & (DMSGF_CREATE|DMSGF_DELETE|DMSGF_REPLY)) {
        case DMSGF_CREATE:
        case DMSGF_CREATE | DMSGF_DELETE:
                /*
@@ -696,10 +750,12 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
                state = iocom->freerd_state;
                iocom->freerd_state = NULL;
                msg->state = state;
-               state->router = msg->router;
                state->msg = msg;
+               state->icmd = msg->any.head.cmd & DMSGF_BASECMDMASK;
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                state->txcmd = DMSGF_REPLY;
+               state->msgid = msg->any.head.msgid;
+               state->circuit = msg->any.head.circuit;
                RB_INSERT(kdmsg_state_tree, &iocom->staterd_tree, state);
                state->flags |= KDMSG_STATE_INSERTED;
                error = 0;
@@ -818,14 +874,226 @@ kdmsg_state_msgrx(kdmsg_msg_t *msg)
        return (error);
 }
 
+/*
+ * Called by the receive thread instead of iocom->rcvmsg() when any of
+ * the AUTO flags are set (KDMSG_IOCOMF_AUTOANY).  LNK_CONN, LNK_SPAN and
+ * LNK_CIRC transactions are handled here when the matching AUTOCONN,
+ * AUTOSPAN or AUTOCIRC flag is set.  This routine must call
+ * iocom->rcvmsg() for anything not automatically handled.
+ *
+ * Returns the result of iocom->rcvmsg() when it is called, otherwise 0.
+ */
+static int
+kdmsg_autorxmsg(kdmsg_msg_t *msg)
+{
+       kdmsg_iocom_t *iocom = msg->iocom;
+       int error = 0;
+
+       switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
+       case DMSG_LNK_CONN | DMSGF_CREATE:
+               /*
+                * Received LNK_CONN transaction.  Transmit response and
+                * leave transaction open, which allows the other end to
+                * start the SPAN protocol.
+                */
+               if (iocom->flags & KDMSG_IOCOMF_AUTOCONN) {
+                       kdmsg_msg_result(msg, 0);
+                       if (iocom->auto_callback)
+                               iocom->auto_callback(msg);
+               } else {
+                       error = iocom->rcvmsg(msg);
+               }
+               break;
+       case DMSG_LNK_SPAN | DMSGF_CREATE:
+       case DMSG_LNK_SPAN | DMSGF_CREATE | DMSGF_DELETE:
+               /*
+                * Received LNK_SPAN transaction.  We do not have to respond
+                * but we must leave the transaction open.
+                *
+                * If AUTOFORGE is set, automatically forge a virtual circuit
+                * to the span (kdmsg_autocirc()).  This will attach a
+                * kdmsg_circuit to the SPAN state.
+                *
+                * A CREATE|DELETE (one-shot) SPAN falls through to the
+                * DELETE handling below.
+                */
+               if (iocom->flags & KDMSG_IOCOMF_AUTOSPAN) {
+                       if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+                               if (iocom->flags & KDMSG_IOCOMF_AUTOFORGE)
+                                       kdmsg_autocirc(msg);
+                               if (iocom->auto_callback)
+                                       iocom->auto_callback(msg);
+                               break;
+                       }
+                       /* fall through */
+               } else {
+                       error = iocom->rcvmsg(msg);
+                       break;
+               }
+               /* fall through */
+       case DMSG_LNK_SPAN | DMSGF_DELETE:
+               /*
+                * Remote is closing the SPAN.  kdmsg_autocirc() detaches any
+                * auto-forged circuit from the SPAN state, then our side of
+                * the transaction is closed.
+                */
+               if (iocom->flags & KDMSG_IOCOMF_AUTOSPAN) {
+                       if (iocom->flags & KDMSG_IOCOMF_AUTOFORGE)
+                               kdmsg_autocirc(msg);
+                       kdmsg_msg_reply(msg, 0);
+                       if (iocom->auto_callback)
+                               iocom->auto_callback(msg);
+               } else {
+                       error = iocom->rcvmsg(msg);
+               }
+               break;
+       case DMSG_LNK_CIRC | DMSGF_CREATE:
+       case DMSG_LNK_CIRC | DMSGF_CREATE | DMSGF_DELETE:
+               /*
+                * Received LNK_CIRC transaction.  We must respond and should
+                * leave the transaction open, allowing the circuit.  The
+                * remote can start issuing commands to us over the circuit
+                * even before we respond.
+                *
+                * Theoretically we should track the circuit id but it
+                * should not be necessary, kernel devices don't usually
+                * care how many circuits are forged to a device, only how
+                * many OPENs are active.
+                *
+                * A CREATE|DELETE (one-shot) CIRC falls through to the
+                * DELETE handling below.
+                */
+               if (iocom->flags & KDMSG_IOCOMF_AUTOCIRC) {
+                       kprintf("kdmsg: CREATE LINK_CIRC\n");
+                       kdmsg_msg_result(msg, 0);
+                       if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+                               if (iocom->auto_callback)
+                                       iocom->auto_callback(msg);
+                               break;
+                       }
+                       /* fall through */
+               } else {
+                       error = iocom->rcvmsg(msg);
+                       break;
+               }
+               /* fall through */
+       case DMSG_LNK_CIRC | DMSGF_DELETE:
+               /*
+                * Remote is closing the circuit; close our side as well.
+                */
+               if (iocom->flags & KDMSG_IOCOMF_AUTOCIRC) {
+                       kprintf("kdmsg: DELETE LINK_CIRC\n");
+                       if (iocom->auto_callback)
+                               iocom->auto_callback(msg);
+                       kdmsg_msg_reply(msg, 0);
+               } else {
+                       error = iocom->rcvmsg(msg);
+               }
+               break;
+       default:
+               /*
+                * Anything unhandled goes into rcvmsg.
+                *
+                * NOTE: Replies to link-level messages initiated by our side
+                *       are handled by the state callback, they are NOT
+                *       handled here.
+                */
+               error = iocom->rcvmsg(msg);
+               break;
+       }
+       return (error);
+}
+
+/*
+ * Handle automatic management of virtual circuits for received SPANs.
+ *
+ * Messages without transaction state (msg->state == NULL) are ignored.
+ * msg->state->icmd is used to test the transactional command, because
+ * once past the CREATE the individual message commands are not usually
+ * the icmd.
+ *
+ * On a SPAN CREATE a kdmsg_circuit is allocated and attached to the SPAN
+ * state (state->any.circ), and a LNK_CIRC transaction targeting the
+ * SPAN's msgid is transmitted.  On a SPAN DELETE the circuit is detached
+ * from the SPAN state under kdmsg_token and handed to
+ * kdmsg_circ_free_check().
+ */
+static
+void
+kdmsg_autocirc(kdmsg_msg_t *msg)
+{
+       kdmsg_iocom_t *iocom = msg->iocom;
+       kdmsg_circuit_t *circ;
+       kdmsg_msg_t *xmsg;      /* CIRC */
+
+       if (msg->state == NULL)
+               return;
+
+       /*
+        * Gaining the SPAN, automatically forge a circuit to the target.
+        * The circuit is passed as the CIRC transaction's data so that
+        * kdmsg_autocirc_reply() can find it via state->any.circ.
+        */
+       if (msg->state->icmd == DMSG_LNK_SPAN &&
+           (msg->any.head.cmd & DMSGF_CREATE)) {
+               kprintf("KDMSG VC: CREATE SPAN->CIRC MSGID %016jx\n",
+                       (intmax_t)msg->any.head.msgid);
+               circ = kmalloc(sizeof(*circ), iocom->mmsg, M_WAITOK | M_ZERO);
+               msg->state->any.circ = circ;
+               circ->iocom = iocom;
+               circ->span_state = msg->state;
+               xmsg = kdmsg_msg_alloc(iocom, 0,
+                                      DMSG_LNK_CIRC | DMSGF_CREATE,
+                                      kdmsg_autocirc_reply, circ);
+               circ->circ_state = xmsg->state;
+               circ->weight = msg->any.lnk_span.dist;
+               xmsg->any.lnk_circ.target = msg->any.head.msgid;
+               kdmsg_msg_write(xmsg);
+       }
+
+       /*
+        * Losing the SPAN
+        *
+        * NOTE: When losing a SPAN, any circuits using the span should be
+        *       deleted by the remote end first.  XXX might not be ordered
+        *       on actual loss of connection.
+        */
+       if (msg->state->icmd == DMSG_LNK_SPAN &&
+           (msg->any.head.cmd & DMSGF_DELETE) &&
+           msg->state->any.circ) {
+               kprintf("KDMSG VC: DELETE SPAN->CIRC\n");
+               circ = msg->state->any.circ;
+               lwkt_gettoken(&kdmsg_token);
+               circ->span_state = NULL;
+               msg->state->any.circ = NULL;
+               kdmsg_circ_free_check(circ);
+               lwkt_reltoken(&kdmsg_token);
+       }
+}
+
+/*
+ * State callback for the LNK_CIRC transaction forged by kdmsg_autocirc().
+ * state->any.circ is the kdmsg_circuit that kdmsg_autocirc() passed as the
+ * data argument to kdmsg_msg_alloc().
+ *
+ * Every message is first passed to iocom->auto_callback, if set.  When
+ * the remote deletes the transaction while our side is still open, the
+ * circuit is detached from the CIRC state under kdmsg_token, handed to
+ * kdmsg_circ_free_check(), and our side is closed.  Always returns 0.
+ */
+static
+int
+kdmsg_autocirc_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
+{
+       kdmsg_iocom_t *iocom = state->iocom;
+       kdmsg_circuit_t *circ = state->any.circ;
+
+       /*
+        * Shim is typically used by the end point to record the circuit
+        * on CREATE and erase it on DELETE.
+        */
+       if (iocom->auto_callback)
+               iocom->auto_callback(msg);
+
+       /*
+        * If the remote is terminating the VC we terminate our side
+        */
+       if ((state->txcmd & DMSGF_DELETE) == 0 &&
+           (msg->any.head.cmd & DMSGF_DELETE)) {
+               kprintf("KDMSG VC: DELETE CIRC FROM REMOTE\n");
+               lwkt_gettoken(&kdmsg_token);
+               circ->circ_state = NULL;
+               state->any.circ = NULL;
+               kdmsg_circ_free_check(circ);
+               lwkt_reltoken(&kdmsg_token);
+               kdmsg_msg_reply(msg, 0);
+       }
+       return (0);
+}
+
+/*
+ * Free circ once neither its SPAN state nor its CIRC state references it
+ * and circ->recorded is 0.  The memory is returned to the iocom's malloc
+ * type, so circ->iocom is read before it is cleared.
+ *
+ * Callers in this file hold kdmsg_token around this call.
+ * NOTE(review): circ->recorded is presumably set by the auto_callback
+ *               consumer while it tracks the circuit - TODO confirm.
+ */
+static
+void
+kdmsg_circ_free_check(kdmsg_circuit_t *circ)
+{
+       kdmsg_iocom_t *iocom = circ->iocom;
+
+       if (circ->span_state == NULL &&
+           circ->circ_state == NULL &&
+           circ->recorded == 0) {
+               circ->iocom = NULL;
+               kfree(circ, iocom->mmsg);
+       }
+}
+
 void
 kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
 {
-       kdmsg_iocom_t *iocom;
+       kdmsg_iocom_t *iocom = msg->iocom;
        kdmsg_state_t *state;
 
-       iocom = msg->router->iocom;
-
        if ((state = msg->state) == NULL) {
                kdmsg_msg_free(msg);
        } else if (msg->any.head.cmd & DMSGF_DELETE) {
@@ -874,12 +1142,10 @@ kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
 int
 kdmsg_state_msgtx(kdmsg_msg_t *msg)
 {
-       kdmsg_iocom_t *iocom;
+       kdmsg_iocom_t *iocom = msg->iocom;
        kdmsg_state_t *state;
        int error;
 
-       iocom = msg->router->iocom;
-
        /*
         * Make sure a state structure is ready to go in case we need a new
         * one.  This is the only routine which uses freewr_state so no
@@ -888,7 +1154,7 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
        if ((state = iocom->freewr_state) == NULL) {
                state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
                state->flags = KDMSG_STATE_DYNAMIC;
-               state->router = &iocom->router;
+               state->iocom = iocom;
                iocom->freewr_state = state;
        }
 
@@ -928,6 +1194,7 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
                 *     on-transmit.
                 */
                KKASSERT(state != NULL);
+               state->icmd = msg->any.head.cmd & DMSGF_BASECMDMASK;
                state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                state->rxcmd = DMSGF_REPLY;
                error = 0;
@@ -1056,11 +1323,9 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
 void
 kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
 {
-       kdmsg_iocom_t *iocom;
+       kdmsg_iocom_t *iocom = msg->iocom;
        kdmsg_state_t *state;
 
-       iocom = msg->router->iocom;
-
        if ((state = msg->state) == NULL) {
                kdmsg_msg_free(msg);
        } else if (msg->any.head.cmd & DMSGF_DELETE) {
@@ -1096,11 +1361,9 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
 void
 kdmsg_state_free(kdmsg_state_t *state)
 {
-       kdmsg_iocom_t *iocom;
+       kdmsg_iocom_t *iocom = state->iocom;
        kdmsg_msg_t *msg;
 
-       iocom = state->router->iocom;
-
        KKASSERT((state->flags & KDMSG_STATE_INSERTED) == 0);
        msg = state->msg;
        state->msg = NULL;
@@ -1110,24 +1373,21 @@ kdmsg_state_free(kdmsg_state_t *state)
 }
 
 kdmsg_msg_t *
-kdmsg_msg_alloc(kdmsg_router_t *router, uint32_t cmd,
+kdmsg_msg_alloc(kdmsg_iocom_t *iocom, uint64_t circuit, uint32_t cmd,
                int (*func)(kdmsg_state_t *, kdmsg_msg_t *), void *data)
 {
-       kdmsg_iocom_t *iocom;
        kdmsg_msg_t *msg;
        kdmsg_state_t *state;
        size_t hbytes;
 
-       iocom = router->iocom;
+       KKASSERT(iocom != NULL);
        hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
        msg = kmalloc(offsetof(struct kdmsg_msg, any) + hbytes,
                      iocom->mmsg, M_WAITOK | M_ZERO);
        msg->hdr_size = hbytes;
-       msg->router = router;
-       KKASSERT(router != NULL);
+       msg->iocom = iocom;
        msg->any.head.magic = DMSG_HDR_MAGIC;
-       msg->any.head.source = 0;
-       msg->any.head.target = router->target;
+       msg->any.head.circuit = circuit;
        msg->any.head.cmd = cmd;
 
        if (cmd & DMSGF_CREATE) {
@@ -1142,11 +1402,10 @@ kdmsg_msg_alloc(kdmsg_router_t *router, uint32_t cmd,
                state->any.any = data;
                state->msg = msg;
                state->msgid = (uint64_t)(uintptr_t)state;
-               state->router = msg->router;
+               state->circuit = circuit;
+               state->iocom = iocom;
                msg->state = state;
-               msg->any.head.source = 0;
-               msg->any.head.target = state->router->target;
-               msg->any.head.msgid = state->msgid;
+               /*msg->any.head.msgid = state->msgid;XXX*/
 
                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                if (RB_INSERT(kdmsg_state_tree, &iocom->statewr_tree, state))
@@ -1162,16 +1421,17 @@ kdmsg_msg_alloc(kdmsg_router_t *router, uint32_t cmd,
 void
 kdmsg_msg_free(kdmsg_msg_t *msg)
 {
-       kdmsg_iocom_t *iocom;
-
-       iocom = msg->router->iocom;
+       /*
+        * Cache iocom: msg->iocom is cleared below but iocom->mmsg is still
+        * needed for the final kfree().
+        */
+       kdmsg_iocom_t *iocom = msg->iocom;
 
-       if (msg->aux_data && msg->aux_size) {
+       /*
+        * Only kfree aux_data when KDMSG_FLAG_AUXALLOC is set (the receive
+        * thread sets it after allocating aux_data).  Unflagged aux_data is
+        * not freed here (presumably owned by whoever attached it - TODO
+        * confirm).
+        */
+       if ((msg->flags & KDMSG_FLAG_AUXALLOC) &&
+           msg->aux_data && msg->aux_size) {
                kfree(msg->aux_data, iocom->mmsg);
-               msg->aux_data = NULL;
-               msg->aux_size = 0;
-               msg->router = NULL;
+               msg->flags &= ~KDMSG_FLAG_AUXALLOC;
        }
+       msg->aux_data = NULL;
+       msg->aux_size = 0;
+       msg->iocom = NULL;
+       msg->any.head.circuit = -1;
        kfree(msg, iocom->mmsg);
 }
 
@@ -1182,9 +1442,13 @@ kdmsg_msg_free(kdmsg_msg_t *msg)
 int
 kdmsg_state_cmp(kdmsg_state_t *state1, kdmsg_state_t *state2)
 {
-       if (state1->router < state2->router)
+       if (state1->iocom < state2->iocom)
                return(-1);
-       if (state1->router > state2->router)
+       if (state1->iocom > state2->iocom)
+               return(1);
+       if (state1->circuit < state2->circuit)
+               return(-1);
+       if (state1->circuit > state2->circuit)
                return(1);
        if (state1->msgid < state2->msgid)
                return(-1);
@@ -1212,11 +1476,9 @@ kdmsg_state_cmp(kdmsg_state_t *state1, kdmsg_state_t *state2)
 void
 kdmsg_msg_write(kdmsg_msg_t *msg)
 {
-       kdmsg_iocom_t *iocom;
+       kdmsg_iocom_t *iocom = msg->iocom;
        kdmsg_state_t *state;
 
-       iocom = msg->router->iocom;
-
        if (msg->state) {
                /*
                 * Continuance or termination of existing transaction.
@@ -1227,8 +1489,6 @@ kdmsg_msg_write(kdmsg_msg_t *msg)
                 */
                state = msg->state;
                msg->any.head.msgid = state->msgid;
-               msg->any.head.source = 0;
-               msg->any.head.target = state->router->target;
                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        } else {
                /*
@@ -1237,8 +1497,6 @@ kdmsg_msg_write(kdmsg_msg_t *msg)
                 * competing aborts and a real one-off message?)
                 */
                msg->any.head.msgid = 0;
-               msg->any.head.source = 0;
-               msg->any.head.target = msg->router->target;
                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
        }
 
@@ -1300,10 +1558,10 @@ kdmsg_msg_reply(kdmsg_msg_t *msg, uint32_t error)
                if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
                        cmd |= DMSGF_REPLY;
        }
-       kprintf("MSG_REPLY state=%p msg %08x\n", state, cmd);
 
        /* XXX messy mask cmd to avoid allocating state */
-       nmsg = kdmsg_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
+       nmsg = kdmsg_msg_alloc(msg->iocom, msg->any.head.circuit,
+                              cmd & DMSGF_BASECMDMASK,
                               NULL, NULL);
        nmsg->any.head.cmd = cmd;
        nmsg->any.head.error = error;
@@ -1352,7 +1610,105 @@ kdmsg_msg_result(kdmsg_msg_t *msg, uint32_t error)
        }
 
        /* XXX messy mask cmd to avoid allocating state */
-       nmsg = kdmsg_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
+       nmsg = kdmsg_msg_alloc(msg->iocom, msg->any.head.circuit,
+                              cmd & DMSGF_BASECMDMASK,
+                              NULL, NULL);
+       nmsg->any.head.cmd = cmd;
+       nmsg->any.head.error = error;
+       nmsg->state = state;
+       kdmsg_msg_write(nmsg);
+}
+
+/*
+ * Reply on the transaction described by state with a simple LNK_ERROR
+ * result code and terminate our side of the transaction.
+ *
+ * state must be non-NULL: the reply is routed using state->iocom and
+ * state->circuit.  Unlike kdmsg_msg_reply() there is no received message
+ * from which a one-way reply could be derived, so a NULL state cannot be
+ * handled.  If our direction has already been closed this is a no-op.
+ */
+void
+kdmsg_state_reply(kdmsg_state_t *state, uint32_t error)
+{
+       kdmsg_msg_t *nmsg;
+       uint32_t cmd;
+
+       KKASSERT(state != NULL);
+
+       /*
+        * If our direction has already been closed we just return without
+        * doing anything.
+        */
+       if (state->txcmd & DMSGF_DELETE)
+               return;
+
+       /*
+        * Reply with a simple error code and terminate the transaction.
+        *
+        * Set CREATE if our direction has not been initiated yet (txcmd
+        * might not have been initiated yet), and mirror the REPLY bit of
+        * our transmit direction (command or reply direction).
+        */
+       cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
+       if ((state->txcmd & DMSGF_CREATE) == 0)
+               cmd |= DMSGF_CREATE;
+       if (state->txcmd & DMSGF_REPLY)
+               cmd |= DMSGF_REPLY;
+
+       /* XXX messy mask cmd to avoid allocating state */
+       nmsg = kdmsg_msg_alloc(state->iocom, state->circuit,
+                              cmd & DMSGF_BASECMDMASK,
+                              NULL, NULL);
+       nmsg->any.head.cmd = cmd;
+       nmsg->any.head.error = error;
+       nmsg->state = state;
+       kdmsg_msg_write(nmsg);
+}
+
+/*
+ * Send a simple LNK_ERROR result code on the transaction described by
+ * state and continue (do not terminate) our side of the transaction.
+ *
+ * state must be non-NULL; its iocom and circuit are used to build the
+ * message.
+ */
+void
+kdmsg_state_result(kdmsg_state_t *state, uint32_t error)
+{
+       kdmsg_msg_t *nmsg;
+       uint32_t cmd;
+
+       /*
+        * Return a simple result code, do NOT terminate the transaction.
+        */
+       cmd = DMSG_LNK_ERROR;
+
+       /*
+        * Check if our direction has even been initiated yet, set CREATE.
+        *
+        * Check what direction this is (command or reply direction).  Note
+        * that txcmd might not have been initiated yet.
+        *
+        * If our direction has already been closed we just return without
+        * doing anything.
+        */
+       if (state) {
+               if (state->txcmd & DMSGF_DELETE)
+                       return;
+               if ((state->txcmd & DMSGF_CREATE) == 0)
+                       cmd |= DMSGF_CREATE;
+               if (state->txcmd & DMSGF_REPLY)
+                       cmd |= DMSGF_REPLY;
+               /* continuing transaction, do not set MSGF_DELETE */
+       } else {
+               if ((state->txcmd & DMSGF_REPLY) == 0)
+                       cmd |= DMSGF_REPLY;
+       }
+
+       /* XXX messy mask cmd to avoid allocating state */
+       nmsg = kdmsg_msg_alloc(state->iocom, state->circuit,
+                              cmd & DMSGF_BASECMDMASK,
                               NULL, NULL);
        nmsg->any.head.cmd = cmd;
        nmsg->any.head.error = error;
index 0300aeb..7694866 100644 (file)
 static MALLOC_DEFINE(M_DMSG_DISK, "dmsg_disk", "disk dmsg");
 
 static int disk_iocom_reconnect(struct disk *dp, struct file *fp);
-static int disk_msg_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg);
-static int disk_msg_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg);
+static int disk_rcvdmsg(kdmsg_msg_t *msg);
 
 void
 disk_iocom_init(struct disk *dp)
 {
-       kdmsg_iocom_init(&dp->d_iocom, dp, M_DMSG_DISK,
-                        disk_lnk_rcvmsg,
-                        disk_dbg_rcvmsg,
-                        disk_adhoc_input);
+       /*
+        * Let kdmsg run the LNK_CONN, LNK_SPAN and LNK_CIRC state machines
+        * automatically.  AUTOFORGE is not requested, so no circuits are
+        * forged for received SPANs.  Messages not handled automatically
+        * are passed to disk_rcvdmsg().
+        */
+       kdmsg_iocom_init(&dp->d_iocom, dp,
+                        KDMSG_IOCOMF_AUTOCONN |
+                        KDMSG_IOCOMF_AUTOSPAN |
+                        KDMSG_IOCOMF_AUTOCIRC,
+                        M_DMSG_DISK, disk_rcvdmsg);
 }
 
 void
@@ -113,7 +113,6 @@ static
 int
 disk_iocom_reconnect(struct disk *dp, struct file *fp)
 {
-       kdmsg_msg_t *msg;
        char devname[64];
 
        ksnprintf(devname, sizeof(devname), "%s%d",
@@ -121,107 +120,47 @@ disk_iocom_reconnect(struct disk *dp, struct file *fp)
 
        kdmsg_iocom_reconnect(&dp->d_iocom, fp, devname);
 
-       msg = kdmsg_msg_alloc(&dp->d_iocom.router, DMSG_LNK_CONN | DMSGF_CREATE,
-                             disk_msg_conn_reply, dp);
-       msg->any.lnk_conn.pfs_type = 0;
-       msg->any.lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
-       msg->any.lnk_conn.peer_type = DMSG_PEER_BLOCK;
-       msg->any.lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
-
-       ksnprintf(msg->any.lnk_conn.cl_label,
-                 sizeof(msg->any.lnk_conn.cl_label),
+       dp->d_iocom.auto_lnk_conn.pfs_type = DMSG_PFSTYPE_SERVER;
+       dp->d_iocom.auto_lnk_conn.proto_version = DMSG_SPAN_PROTO_1;
+       dp->d_iocom.auto_lnk_conn.peer_type = DMSG_PEER_BLOCK;
+       dp->d_iocom.auto_lnk_conn.peer_mask = 1LLU << DMSG_PEER_BLOCK;
+       dp->d_iocom.auto_lnk_conn.pfs_mask = (uint64_t)-1;
+       ksnprintf(dp->d_iocom.auto_lnk_conn.cl_label,
+                 sizeof(dp->d_iocom.auto_lnk_conn.cl_label),
                  "%s/%s", hostname, devname);
-       dp->d_iocom.conn_state = msg->state;
-       kdmsg_msg_write(msg);
-
-       return (0);
-}
-
-/*
- * Received reply to our LNK_CONN transaction, indicating LNK_SPAN support.
- * Issue LNK_SPAN.
- */
-static
-int
-disk_msg_conn_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
-{
-       struct disk *dp = state->any.any;
-       kdmsg_msg_t *rmsg;
-
-       if (msg->any.head.cmd & DMSGF_CREATE) {
-               kprintf("DISK LNK_CONN received reply\n");
-               rmsg = kdmsg_msg_alloc(&dp->d_iocom.router,
-                                      DMSG_LNK_SPAN | DMSGF_CREATE,
-                                      disk_msg_span_reply, dp);
-               rmsg->any.lnk_span.pfs_type = 0;
-               rmsg->any.lnk_span.proto_version = DMSG_SPAN_PROTO_1;
-               rmsg->any.lnk_span.peer_type = DMSG_PEER_BLOCK;
-
-               ksnprintf(rmsg->any.lnk_span.cl_label,
-                         sizeof(rmsg->any.lnk_span.cl_label),
-                         "%s/%s%d",
-                         hostname,
-                         dev_dname(dp->d_rawdev),
-                         dkunit(dp->d_rawdev));
-               kdmsg_msg_write(rmsg);
-       }
-       if ((state->txcmd & DMSGF_DELETE) == 0 &&
-           (msg->any.head.cmd & DMSGF_DELETE)) {
-               kprintf("DISK LNK_CONN terminated by remote\n");
-               dp->d_iocom.conn_state = NULL;
-               kdmsg_msg_reply(msg, 0);
+       if (dp->d_info.d_serialno) {
+               ksnprintf(dp->d_iocom.auto_lnk_conn.fs_label,
+                         sizeof(dp->d_iocom.auto_lnk_conn.fs_label),
+                         "%s", dp->d_info.d_serialno);
        }
-       return(0);
-}
-
-/*
- * Reply to our LNK_SPAN.  The transaction is left open.
- */
-static
-int
-disk_msg_span_reply(kdmsg_state_t *state, kdmsg_msg_t *msg)
-{
-       /*struct disk *dp = state->any.any;*/
 
-       kprintf("DISK LNK_SPAN reply received\n");
-       if ((state->txcmd & DMSGF_DELETE) == 0 &&
-           (msg->any.head.cmd & DMSGF_DELETE)) {
-               kdmsg_msg_reply(msg, 0);
+       dp->d_iocom.auto_lnk_span.pfs_type = DMSG_PFSTYPE_SERVER;
+       dp->d_iocom.auto_lnk_span.proto_version = DMSG_SPAN_PROTO_1;
+       dp->d_iocom.auto_lnk_span.peer_type = DMSG_PEER_BLOCK;
+       dp->d_iocom.auto_lnk_span.media.block.bytes =
+                                               dp->d_info.d_media_size;
+       dp->d_iocom.auto_lnk_span.media.block.blksize =
+                                               dp->d_info.d_media_blksize;
+       ksnprintf(dp->d_iocom.auto_lnk_span.cl_label,
+                 sizeof(dp->d_iocom.auto_lnk_span.cl_label),
+                 "%s/%s", hostname, devname);
+       if (dp->d_info.d_serialno) {
+               ksnprintf(dp->d_iocom.auto_lnk_span.fs_label,
+                         sizeof(dp->d_iocom.auto_lnk_span.fs_label),
+                         "%s", dp->d_info.d_serialno);
        }
-       return (0);
-}
 
-int
-disk_lnk_rcvmsg(kdmsg_msg_t *msg)
-{
-       /*struct disk *dp = msg->router->iocom->handle;*/
+       kdmsg_iocom_autoinitiate(&dp->d_iocom, NULL);
 
-       switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
-       case DMSG_LNK_CONN | DMSGF_CREATE:
-               /*
-                * reply & leave trans open
-                */
-               kprintf("DISK CONN RECEIVE - (just ignore it)\n");
-               kdmsg_msg_result(msg, 0);
-               break;
-       case DMSG_LNK_SPAN | DMSGF_CREATE:
-               kprintf("DISK SPAN RECEIVE - ADDED FROM CLUSTER\n");
-               break;
-       case DMSG_LNK_SPAN | DMSGF_DELETE:
-               kprintf("DISK SPAN RECEIVE - DELETED FROM CLUSTER\n");
-               break;
-       default:
-               break;
-       }
        return (0);
 }
 
 int
-disk_dbg_rcvmsg(kdmsg_msg_t *msg)
+disk_rcvdmsg(kdmsg_msg_t *msg)
 {
-       /*struct disk *dp = msg->router->iocom->handle;*/
+       struct disk *dp = msg->iocom->handle;
 
-       switch(msg->any.head.cmd & DMSGF_CMDSWMASK) {
+       switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
        case DMSG_DBG_SHELL:
                /*
                 * Execute shell command (not supported atm)
@@ -231,23 +170,21 @@ disk_dbg_rcvmsg(kdmsg_msg_t *msg)
        case DMSG_DBG_SHELL | DMSGF_REPLY:
                if (msg->aux_data) {
                        msg->aux_data[msg->aux_size - 1] = 0;
-                       kprintf("DEBUGMSG: %s\n", msg->aux_data);
+                       kprintf("diskiocom: DEBUGMSG: %s\n", msg->aux_data);
                }
                break;
+       case DMSG_BLK_OPEN | DMSGF_CREATE:
+       case DMSG_BLK_READ | DMSGF_CREATE:
+       case DMSG_BLK_WRITE | DMSGF_CREATE:
+       case DMSG_BLK_FLUSH | DMSGF_CREATE:
+       case DMSG_BLK_FREEBLKS | DMSGF_CREATE:
        default:
-               kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
+               kprintf("diskiocom: DISK ADHOC INPUT %s%d cmd %08x\n",
+                       dev_dname(dp->d_rawdev), dkunit(dp->d_rawdev),
+                       msg->any.head.cmd);
+               if (msg->any.head.cmd & DMSGF_CREATE)
+                       kdmsg_msg_reply(msg, DMSG_ERR_NOSUPP);
                break;
        }
        return (0);
 }
-
-int
-disk_adhoc_input(kdmsg_msg_t *msg)
-{
-       struct disk *dp = msg->router->iocom->handle;
-
-       kprintf("DISK ADHOC INPUT %s%d\n",
-               dev_dname(dp->d_rawdev), dkunit(dp->d_rawdev));
-
-       return (0);
-}
index f721aa9..2726460 100644 (file)
 /*
  * Mesh network protocol structures.
  *
- *                             SPAN PROTOCOL
+ *                             CONN PROTOCOL
  *
  * The mesh is constructed from point-to-point streaming links with varying
  * levels of interconnectedness, forming a graph.  Terminii in the graph
  * are entities such as a HAMMER2 PFS or a network mount or other types
  * of nodes.
  *
- * The spanning tree protocol runs symmetrically on every node. Each node
- * transmits a representitive LNK_SPAN out all available connections.  Nodes
- * also receive LNK_SPANs from other nodes (obviously), and must aggregate,
- * reduce, and relay those LNK_SPANs out all available connections, thus
- * propagating the spanning tree.  Any connection failure or topology change
- * causes changes in the LNK_SPAN propagation.
- *
- * Each LNK_SPAN or LNK_SPAN relay represents a virtual circuit for routing
- * purposes.  In addition, each relay is chained in one direction,
- * representing a 1:N fan-out (i.e. one received LNK_SPAN can be relayed out
- * multiple connections).  In order to be able to route a message via a
- * LNK_SPAN over a deterministic route THE MESSAGE CAN ONLY FLOW FROM A
- * REMOTE NODE TOWARDS OUR NODE (N:1 fan-in).
- *
- * This supports the requirement that we have both message serialization
- * and positive feedback if a topology change breaks the chain of VCs
- * the message is flowing over.  A remote node sending a message to us
- * will get positive feedback that the route was broken and can take suitable
- * action to terminate the transaction with an error.
- *
- *                             TRANSACTIONAL REPLIES
- *
- * However, when we receive a command message from a remote node and we want
- * to reply to it, we have a problem.  We want the remote node to have
- * positive feedback if our reply fails to make it, but if we use a virtual
- * circuit based on the remote node's LNK_SPAN to us it will be a DIFFERENT
- * virtual circuit than the one the remote node used to message us.  That's
- * a problem because it means we have no reliable way to notify the remote
- * node if we get notified that our reply has failed.
- *
- * The solution is to first note the fact that the remote chose an optimal
- * route to get to us, so the reverse should be true. The reason the VC
- * might not exist over the same route in the reverse is because there may
- * be multiple paths available with the same distance metric.
- *
- * But this also means that we can adjust the messaging protocols to
- * propagate a LNK_SPAN from the remote to us WHILE the remote's command
- * message is being sent to us, and it will not only likely be optimal but
- * it might also already exist, and it will also guarantee that a reply
- * failure will propagate back to both sides (because even though each
- * direction is using a different VC chain, the two chains are still
- * going along the same path).
- *
- * We communicate the return VC by having the relay adjust both the target
- * and the source fields in the message, rather than just the target, on
- * each relay.  As of when the message gets to us the 'source' field will
- * represent the VC for the return direction (and of course also identify
- * the node the message came from).
- *
- * This way both sides get positive feedback if a topology change disrupts
- * the VC for the transaction.  We also get one additional guarantee, and
- * that is no spurious messages.  Messages simply die when the VC they are
- * traveling over is broken, in either direction, simple as that.
- * It makes managing message transactional states very easy.
+ * Upon connecting and after authentication, a LNK_CONN transaction is opened
+ * on circuit 0 by both ends.  This configures and enables the SPAN protocol.
+ * The LNK_CONN transaction remains open for the life of the connection.
+ *
+ *                             SPAN PROTOCOL
+ *
+ * Once enabled, each terminus transmits a representative LNK_SPAN out all
+ * available connections advertising what it is.  Nodes maintaining multiple
+ * connections will relay received LNK_SPANs out available connections
+ * with some filtering based on the CONN configuration.  A distance metric
+ * and per-node random value (rnss) are aggregated.
+ *
+ * Since LNK_SPANs can rapidly multiply in a complex graph, not all incoming
+ * LNK_SPANs will be relayed.  Only the top N of all collected LNK_SPANs for
+ * any given advertisement are relayed.
+ *
+ * It is possible to code the SPANning tree algorithm to guarantee that
+ * symmetrical spans will be generated after stabilization.  The RNSS field
+ * is used to help distinguish and reduce paths in complex graphs when
+ * symmetric spans are desired.  We always generate RNSS but we currently do
+ * not implement symmetrical SPAN guarantees.
+ *
+ *                             CIRC PROTOCOL
+ *
+ * We aren't done yet.  Before transactions can be relayed, symmetric paths
+ * must be formed via the LNK_CIRC protocol.  The LNK_CIRC protocol
+ * establishes a virtual circuit from any node to any other node, creating
+ * a circuit id which is stored in dmsg_hdr.circuit.  Messages received on
+ * one side are forwarded to the other.  Forwarded messages bypass normal
+ * state tracking.
+ *
+ * A virtual circuit is forged by working the propagated SPANs backwards.
+ * Each node in the graph helps propagate the virtual circuit by attaching
+ * the LNK_CIRC transaction it receives to a LNK_CIRC transaction it
+ * initiates out the other interface.
+ *
+ * Since SPANs are link-state transactions any change in related span(s)
+ * will also force-terminate VCs using those spans.
  *
  *                     MESSAGE TRANSACTIONAL STATES
  *
  * data in one-way (non-transactional) messages is typically required to be
  * inline.  CRCs are still recommended and required at the beginning, but
  * may be negotiated away later.
- *
- *                      MULTI-PATH MESSAGE DUPLICATION
- *
- * Redundancy can be negotiated but is not required in the current spec.
- * Basically you send the same message, with the same msgid, via several
- * paths to the target.  The msgid is the rendezvous.  The first copy that
- * makes it to the target is used, the second is ignored.  Similarly for
- * replies.  This can improve performance during span flapping.  Only
- * transactional messages will be serialized.  The target might receive
- * multiple copies of one-way messages in higher protocol layers (potentially
- * out of order, too).
  */
 struct dmsg_hdr {
        uint16_t        magic;          /* 00 sanity, synchro, endian */
@@ -208,8 +181,8 @@ struct dmsg_hdr {
        uint32_t        salt;           /* 04 random salt helps w/crypto */
 
        uint64_t        msgid;          /* 08 message transaction id */
-       uint64_t        source;         /* 10 originator or 0   */
-       uint64_t        target;         /* 18 destination or 0  */
+       uint64_t        circuit;        /* 10 circuit id or 0   */
+       uint64_t        reserved18;     /* 18 */
 
        uint32_t        cmd;            /* 20 flags | cmd | hdr_size / ALIGN */
        uint32_t        aux_crc;        /* 24 auxillary data crc */
@@ -323,11 +296,11 @@ typedef struct dmsg_hdr dmsg_hdr_t;
  * Link layer ops basically talk to just the other side of a direct
  * connection.
  *
- * LNK_PAD     - One-way message on link-0, ignored by target.  Used to
+ * LNK_PAD     - One-way message on circuit 0, ignored by target.  Used to
  *               pad message buffers on shared-memory transports.  Not
  *               typically used with TCP.
  *
- * LNK_PING    - One-way message on link-0, keep-alive, run by both sides
+ * LNK_PING    - One-way message on circuit-0, keep-alive, run by both sides
  *               typically 1/sec on idle link, link is lost after 10 seconds
  *               of inactivity.
  *
@@ -337,21 +310,25 @@ typedef struct dmsg_hdr dmsg_hdr_t;
  *               authentication is complete.  This message also identifies
  *               the host.
  *
- * LNK_CONN    - Enable the SPAN protocol on link-0, possibly also installing
- *               a PFS filter (by cluster id, unique id, and/or wildcarded
- *               name).
+ * LNK_CONN    - Enable the SPAN protocol on circuit-0, possibly also
+ *               installing a PFS filter (by cluster id, unique id, and/or
+ *               wildcarded name).
  *
- * LNK_SPAN    - A SPAN transaction on link-0 enables messages to be relayed
- *               to/from a particular cluster node.  SPANs are received,
- *               sorted, aggregated, and retransmitted back out across all
- *               applicable connections.
+ * LNK_SPAN    - A SPAN transaction on circuit-0 enables messages to be
+ *               relayed to/from a particular cluster node.  SPANs are
+ *               received, sorted, aggregated, filtered, and retransmitted
+ *               back out across all applicable connections.
  *
  *               The leaf protocol also uses this to make a PFS available
  *               to the cluster (e.g. on-mount).
  *
+ * LNK_CIRC    - a CIRC transaction establishes a circuit from source to
+ *               target by creating pairs of open transactions across each
+ *               hop.
+ *
  * LNK_VOLCONF - Volume header configuration change.  All hammer2
  *               connections (hammer2 connect ...) stored in the volume
- *               header are spammed at the link level to the hammer2
+ *               header are spammed on circuit 0 to the hammer2
  *               service daemon, and any live configuration change
  *               thereafter.
  */
@@ -360,22 +337,12 @@ typedef struct dmsg_hdr dmsg_hdr_t;
 #define DMSG_LNK_AUTH          DMSG_LNK(0x010, dmsg_lnk_auth)
 #define DMSG_LNK_CONN          DMSG_LNK(0x011, dmsg_lnk_conn)
 #define DMSG_LNK_SPAN          DMSG_LNK(0x012, dmsg_lnk_span)
+#define DMSG_LNK_CIRC          DMSG_LNK(0x013, dmsg_lnk_circ)  /* circuit setup, circuit 0 only */
 #define DMSG_LNK_VOLCONF       DMSG_LNK(0x020, dmsg_lnk_volconf)
 #define DMSG_LNK_ERROR         DMSG_LNK(0xFFF, dmsg_hdr)
 
 /*
- * LNK_CONN - Register connection for SPAN (transaction, left open)
- *
- * One LNK_CONN transaction may be opened on a stream connection, registering
- * the connection with the SPAN subsystem and allowing the subsystem to
- * accept and relay SPANs to this connection.
- *
- * The LNK_CONN message may contain a filter, limiting the desireable SPANs.
- *
- * This message contains a lot of the same info that a SPAN message contains,
- * but is not a SPAN.  That is, without this message the SPAN subprotocol will
- * not be executed on the connection, nor is this message a promise that the
- * sending end is a client or node of a cluster.
+ * LNK_AUTH - Authentication (often omitted)
  */
 struct dmsg_lnk_auth {
        dmsg_hdr_t      head;
@@ -383,17 +350,20 @@ struct dmsg_lnk_auth {
 };
 
 /*
- * LNK_CONN identifies a streaming connection into the cluster.  The other
- * fields serve as a filter when supported for a particular peer and are
- * not necessarily all used.
+ * LNK_CONN - Register connection info for SPAN protocol
+ *           (transaction, left open, circuit 0 only).
  *
- * peer_mask serves to filter the SPANs we receive by peer.  A cluster
- * controller typically sets this to (uint64_t)-1, a block devfs
- * interface might set it to 1 << DMSG_PEER_DISK, and a hammer2
- * mount might set it to 1 << DMSG_PEER_HAMMER2.
+ * LNK_CONN identifies a streaming connection into the cluster and serves
+ * to identify, enable, and specify filters for the SPAN protocol.
+ *
+ * peer_mask serves to filter the SPANs we receive by peer_type.  A cluster
+ * controller typically sets this to (uint64_t)-1, indicating that it wants
+ * everything.  A block devfs interface might set it to 1 << DMSG_PEER_BLOCK,
+ * and a hammer2 mount might set it to 1 << DMSG_PEER_HAMMER2.
  *
  * mediaid allows multiple (e.g. HAMMER2) connections belonging to the same
- * media, in terms of LNK_VOLCONF updates.
+ * media to transmit duplicative LNK_VOLCONF updates without causing
+ * confusion in the cluster controller.
  *
  * pfs_clid, pfs_fsid, pfs_type, and label are peer-specific and must be
  * left empty (zero-fill) if not supported by a particular peer.
@@ -412,9 +382,10 @@ struct dmsg_lnk_conn {
        uint8_t         pfs_type;       /* pfs type */
        uint16_t        proto_version;  /* high level protocol support */
        uint32_t        status;         /* status flags */
+       uint32_t        rnss;           /* node's generated rnss */
        uint8_t         reserved02[8];
-       int32_t         dist;           /* span distance */
-       uint32_t        reserved03[14];
+       uint32_t        reserved03[12];
+       uint64_t        pfs_mask;       /* PFS mask for SPAN filtering */
        char            cl_label[128];  /* cluster label (for PEER_BLOCK) */
        char            fs_label[128];  /* PFS label (for PEER_HAMMER2) */
 };
@@ -430,7 +401,8 @@ typedef struct dmsg_lnk_conn dmsg_lnk_conn_t;
 #define DMSG_PFSTYPE_SOFT_SLAVE        6
 #define DMSG_PFSTYPE_SOFT_MASTER 7
 #define DMSG_PFSTYPE_MASTER    8
-#define DMSG_PFSTYPE_MAX       9       /* 0-8 */
+#define DMSG_PFSTYPE_SERVER    9       /* e.g. disk_iocom (PEER_BLOCK) */
+#define DMSG_PFSTYPE_MAX       10      /* 0-9 */
 
 #define DMSG_PEER_NONE         0
 #define DMSG_PEER_CLUSTER      1       /* a cluster controller */
@@ -438,12 +410,23 @@ typedef struct dmsg_lnk_conn dmsg_lnk_conn_t;
 #define DMSG_PEER_HAMMER2      3       /* hammer2-mounted volumes */
 
 /*
- * LNK_SPAN - Relay a SPAN (transaction, left open)
+ * Structures embedded in LNK_SPAN
+ */
+struct dmsg_media_block {      /* LNK_SPAN media.block (PEER_BLOCK) */
+       uint64_t        bytes;          /* media size in bytes */
+       uint32_t        blksize;        /* media block size */
+};
+
+typedef struct dmsg_media_block dmsg_media_block_t;
+
+/*
+ * LNK_SPAN - Initiate or relay a SPAN
+ *           (transaction, left open, circuit 0 only)
  *
- * This message registers a PFS/PFS_TYPE with the other end of the connection,
- * telling the other end who we are and what we can provide or what we want
- * to consume.  Multiple registrations can be maintained as open transactions
- * with each one specifying a unique {source} linkid.
+ * This message registers an end-point with the other end of the connection,
+ * telling the other end who we are and what we can provide or intend to
+ * consume.  Multiple registrations can be maintained as open transactions
+ * with each one specifying a unique end-point.
  *
  * Registrations are sent from {source}=S {1...n} to {target}=0 and maintained
  * as open transactions.  Registrations are also received and maintains as
@@ -472,26 +455,42 @@ typedef struct dmsg_lnk_conn dmsg_lnk_conn_t;
  * server-style PFS_TYPE and rendezvous at a cluster controller.
  *
  * The cluster controller does not aggregate/pass-on all received
- * registrations.  It typically filters what gets passed on based on
- * what it receives.
+ * registrations.  It typically filters what gets passed on based on what it
+ * receives, passing on only the best candidates.
+ *
+ * If a symmetric spanning tree is desired, additional candidates whose
+ * {dist, rnss} fields match the last best candidate must also be propagated.
+ * This feature is not currently enabled.
  *
  * STATUS UPDATES: Status updates use the same structure but typically
- *                only contain incremental changes to pfs_type, with the
- *                label field containing a text status.
+ *                only contain incremental changes to e.g. pfs_type, with
+ *                a text description sent as out-of-band data.
  */
 struct dmsg_lnk_span {
        dmsg_hdr_t      head;
        uuid_t          pfs_clid;       /* rendezvous pfs uuid */
-       uuid_t          pfs_fsid;       /* unique pfs uuid */
+       uuid_t          pfs_fsid;       /* unique pfs id (differentiate node) */
        uint8_t         pfs_type;       /* PFS type */
        uint8_t         peer_type;      /* PEER type */
        uint16_t        proto_version;  /* high level protocol support */
        uint32_t        status;         /* status flags */
        uint8_t         reserved02[8];
-       int32_t         dist;           /* span distance */
-       uint32_t        reserved03[15];
-       char            cl_label[128];  /* cluster label (for PEER_BLOCK) */
-       char            fs_label[128];  /* PFS label (for PEER_HAMMER2) */
+       uint32_t        dist;           /* span distance */
+       uint32_t        rnss;           /* random number sub-sort */
+       union {
+               uint32_t        reserved03[14]; /* sizes union at 56 bytes */
+               dmsg_media_block_t block;       /* PEER_BLOCK media info */
+       } media;                                /* peer-specific media info */
+
+       /*
+        * NOTE: for PEER_HAMMER2 cl_label is typically empty and fs_label
+        *       is the superroot directory name.
+        *
+        *       for PEER_BLOCK cl_label is typically host/device and
+        *       fs_label is typically the serial number string.
+        */
+       char            cl_label[128];  /* cluster label */
+       char            fs_label[128];  /* PFS label */
 };
 
 typedef struct dmsg_lnk_span dmsg_lnk_span_t;
@@ -499,10 +498,29 @@ typedef struct dmsg_lnk_span dmsg_lnk_span_t;
 #define DMSG_SPAN_PROTO_1      1
 
 /*
- * LNK_VOLCONF
+ * LNK_CIRC - Establish a circuit
+ *           (transaction, left open, circuit 0 only)
+ *
+ * Establish a circuit to the specified target.  The msgid for the open
+ * transaction is used to transit messages in both directions.
+ *
+ * For circuit establishment the receiving entity looks up the outgoing
+ * relayed SPAN on the incoming iocom based on the target field and then
+ * creates a peer circuit on the interface the SPAN originally came in on.
+ * Messages received on one side are forwarded to the other and vice-versa.
+ * Any link state loss causes all related circuits to be lost.
  */
+struct dmsg_lnk_circ {
+       dmsg_hdr_t      head;
+       uint64_t        reserved01;
+       uint64_t        target;         /* selects the relayed SPAN to route to */
+};
+
+typedef struct dmsg_lnk_circ dmsg_lnk_circ_t;
 
 /*
+ * LNK_VOLCONF
+ *
  * All HAMMER2 directories directly under the super-root on your local
  * media can be mounted separately, even if they share the same physical
  * device.
@@ -648,6 +666,94 @@ typedef struct dmsg_dbg_shell dmsg_dbg_shell_t;
 #define DMSG_QRM_COMMIT                DMSG_QRM(0x001, dmsg_qrm_commit)
 
 /*
+ * DMSG_PROTO_BLK Protocol
+ *
+ * BLK_OPEN    - Open device.  This transaction must be left open for the
+ *               duration and the returned keyid passed in all associated
+ *               BLK commands.
+ *
+ * BLK_READ    - Strategy read
+ * BLK_WRITE   - Strategy write
+ * BLK_FLUSH   - Strategy flush
+ * BLK_FREEBLKS - Strategy free blocks
+ * BLK_ERROR   - Extended result (see dmsg_blk_error below)
+ */
+#define DMSG_BLK_OPEN          DMSG_BLK(0x001, dmsg_blk_open)
+#define DMSG_BLK_READ          DMSG_BLK(0x002, dmsg_blk_read)
+#define DMSG_BLK_WRITE         DMSG_BLK(0x003, dmsg_blk_write)
+#define DMSG_BLK_FLUSH         DMSG_BLK(0x004, dmsg_blk_flush)
+#define DMSG_BLK_FREEBLKS      DMSG_BLK(0x005, dmsg_blk_freeblks)
+#define DMSG_BLK_ERROR         DMSG_BLK(0xFFF, dmsg_blk_error) /* extended result */
+
+struct dmsg_blk_open {
+       dmsg_hdr_t      head;
+       uint32_t        modes;          /* DMSG_BLKOPEN_* bits */
+       uint32_t        reserved01;
+};
+
+#define DMSG_BLKOPEN_RD                0x0001
+#define DMSG_BLKOPEN_WR                0x0002
+
+/*
+ * DMSG_LNK_ERROR is returned for simple results,
+ * DMSG_BLK_ERROR is returned for extended results.
+ */
+struct dmsg_blk_error {
+       dmsg_hdr_t      head;
+       uint64_t        keyid;
+       uint32_t        resid;
+       uint32_t        reserved02;
+       char            buf[64];
+};
+
+struct dmsg_blk_read {
+       dmsg_hdr_t      head;
+       uint64_t        keyid;          /* keyid returned by BLK_OPEN */
+       uint64_t        offset;
+       uint32_t        bytes;
+       uint32_t        flags;
+       uint32_t        reserved01;
+       uint32_t        reserved02;
+};
+
+struct dmsg_blk_write {
+       dmsg_hdr_t      head;
+       uint64_t        keyid;          /* keyid returned by BLK_OPEN */
+       uint64_t        offset;
+       uint32_t        bytes;
+       uint32_t        flags;
+       uint32_t        reserved01;
+       uint32_t        reserved02;
+};
+
+struct dmsg_blk_flush {
+       dmsg_hdr_t      head;
+       uint64_t        keyid;          /* keyid returned by BLK_OPEN */
+       uint64_t        offset;
+       uint32_t        bytes;
+       uint32_t        flags;
+       uint32_t        reserved01;
+       uint32_t        reserved02;
+};
+
+struct dmsg_blk_freeblks {
+       dmsg_hdr_t      head;
+       uint64_t        keyid;          /* keyid returned by BLK_OPEN */
+       uint64_t        offset;
+       uint32_t        bytes;
+       uint32_t        flags;
+       uint32_t        reserved01;
+       uint32_t        reserved02;
+};
+
+typedef struct dmsg_blk_open           dmsg_blk_open_t;
+typedef struct dmsg_blk_read           dmsg_blk_read_t;
+typedef struct dmsg_blk_write          dmsg_blk_write_t;
+typedef struct dmsg_blk_flush          dmsg_blk_flush_t;
+typedef struct dmsg_blk_freeblks       dmsg_blk_freeblks_t;
+typedef struct dmsg_blk_error          dmsg_blk_error_t;
+
+/*
  * NOTE!!!! ALL EXTENDED HEADER STRUCTURES MUST BE 64-BYTE ALIGNED!!!
  *
  * General message errors
@@ -656,13 +762,23 @@ typedef struct dmsg_dbg_shell dmsg_dbg_shell_t;
  *     0x20 - 0x2F     Global errors
  */
 #define DMSG_ERR_NOSUPP                0x20
+#define DMSG_ERR_LOSTLINK      0x21    /* global error (0x20-0x2F range) */
 
 union dmsg_any {
        char                    buf[DMSG_HDR_MAX];
        dmsg_hdr_t              head;
-       dmsg_lnk_span_t         lnk_span;
+       /* DMSG_PROTO_LNK (link-level) messages */
        dmsg_lnk_conn_t         lnk_conn;
+       dmsg_lnk_span_t         lnk_span;
+       dmsg_lnk_circ_t         lnk_circ;
        dmsg_lnk_volconf_t      lnk_volconf;
+       /* DMSG_PROTO_BLK messages */
+       dmsg_blk_open_t         blk_open;
+       dmsg_blk_error_t        blk_error;
+       dmsg_blk_read_t         blk_read;
+       dmsg_blk_write_t        blk_write;
+       dmsg_blk_flush_t        blk_flush;
+       dmsg_blk_freeblks_t     blk_freeblks;
 };
 
 typedef union dmsg_any dmsg_any_t;
@@ -673,25 +789,11 @@ typedef union dmsg_any dmsg_any_t;
 #if defined(_KERNEL) || defined(_KERNEL_STRUCTURES)
 
 struct hammer2_pfsmount;
-struct kdmsg_router;
 struct kdmsg_iocom;
 struct kdmsg_state;
 struct kdmsg_msg;
 
 /*
- * Structure used to represent a virtual circuit for a messaging
- * route.  Typically associated from hammer2_state but the hammer2_pfsmount
- * structure also has one to represent the point-to-point link.
- */
-struct kdmsg_router {
-       struct kdmsg_iocom      *iocom;
-       struct kdmsg_state      *state;         /* received LNK_SPAN state */
-       uint64_t                target;         /* target */
-};
-
-typedef struct kdmsg_router kdmsg_router_t;
-
-/*
  * msg_ctl flags (atomic)
  */
 #define KDMSG_CLUSTERCTL_KILL          0x00000001
@@ -700,16 +802,38 @@ typedef struct kdmsg_router kdmsg_router_t;
 #define KDMSG_CLUSTERCTL_SLEEPING      0x00000008 /* interlocked w/msglk */
 
 /*
+ * When the KDMSG_IOCOMF_AUTOCIRC flag is set the kdmsg code in
+ * the kernel automatically tries to forge a virtual circuit for
+ * any active SPAN state received.
+ *
+ * This is only done when the received SPANs are significantly filtered
+ * by the transmitted LNK_CONN.  That is, it is done only by clients who
+ * connect to specific services over the cluster.
+ */
+struct kdmsg_circuit {
+       TAILQ_ENTRY(kdmsg_circuit) entry;
+       struct kdmsg_iocom      *iocom;
+       struct kdmsg_state      *span_state;    /* received LNK_SPAN state */
+       struct kdmsg_state      *circ_state;    /* LNK_CIRC transaction state */
+       int                     recorded;       /* used by shim */
+       int                     weight;
+};
+
+typedef struct kdmsg_circuit kdmsg_circuit_t;
+
+/*
  * Transactional state structure, representing an open transaction.  The
  * transaction might represent a cache state (and thus have a chain
  * association), or a VOP op, LNK_SPAN, or other things.
  */
 struct kdmsg_state {
        RB_ENTRY(kdmsg_state) rbnode;           /* indexed by msgid */
-       struct kdmsg_router *router;            /* related LNK_SPAN route */
+       struct kdmsg_iocom *iocom;
+       uint64_t        circuit;
+       uint32_t        icmd;                   /* record cmd creating state */
        uint32_t        txcmd;                  /* mostly for CMDF flags */
        uint32_t        rxcmd;                  /* mostly for CMDF flags */
-       uint64_t        msgid;                  /* {spanid,msgid} uniq */
+       uint64_t        msgid;                  /* {circuit,msgid} uniq */
        int             flags;
        int             error;
        void            *chain;                 /* (caller's state) */
@@ -718,6 +842,7 @@ struct kdmsg_state {
        union {
                void *any;
                struct hammer2_pfsmount *pmp;
+               struct kdmsg_circuit *circ;
        } any;
 };
 
@@ -727,14 +852,17 @@ struct kdmsg_state {
 
 struct kdmsg_msg {
        TAILQ_ENTRY(kdmsg_msg) qentry;          /* serialized queue */
-       struct kdmsg_router *router;
+       struct kdmsg_iocom *iocom;              /* iocom the msg is associated with */
        struct kdmsg_state *state;
        size_t          hdr_size;
        size_t          aux_size;
        char            *aux_data;
+       int             flags;                  /* KDMSG_FLAG_* bits */
        dmsg_any_t      any;
 };
 
+#define KDMSG_FLAG_AUXALLOC    0x0001  /* NOTE(review): presumably aux_data was allocated by kdmsg */
+
 typedef struct kdmsg_link kdmsg_link_t;
 typedef struct kdmsg_state kdmsg_state_t;
 typedef struct kdmsg_msg kdmsg_msg_t;
@@ -755,38 +883,46 @@ struct kdmsg_iocom {
        thread_t                msgwr_td;       /* cluster thread */
        int                     msg_ctl;        /* wakeup flags */
        int                     msg_seq;        /* cluster msg sequence id */
-       uint32_t                reserved01;
+       uint32_t                flags;
        struct lock             msglk;          /* lockmgr lock */
        TAILQ_HEAD(, kdmsg_msg) msgq;           /* transmit queue */
        void                    *handle;
-       int                     (*lnk_rcvmsg)(kdmsg_msg_t *msg);
-       int                     (*dbg_rcvmsg)(kdmsg_msg_t *msg);
-       int                     (*misc_rcvmsg)(kdmsg_msg_t *msg);
+       void                    (*auto_callback)(kdmsg_msg_t *);
+       int                     (*rcvmsg)(kdmsg_msg_t *);
        void                    (*exit_func)(struct kdmsg_iocom *);
        struct kdmsg_state      *conn_state;    /* active LNK_CONN state */
        struct kdmsg_state      *freerd_state;  /* allocation cache */
        struct kdmsg_state      *freewr_state;  /* allocation cache */
        struct kdmsg_state_tree staterd_tree;   /* active messages */
        struct kdmsg_state_tree statewr_tree;   /* active messages */
-       struct kdmsg_router     router;
+       dmsg_lnk_conn_t         auto_lnk_conn;
+       dmsg_lnk_span_t         auto_lnk_span;
 };
 
 typedef struct kdmsg_iocom     kdmsg_iocom_t;
 
+#define KDMSG_IOCOMF_AUTOCONN  0x0001  /* handle received LNK_CONN */
+#define KDMSG_IOCOMF_AUTOSPAN  0x0002  /* handle received LNK_SPAN */
+#define KDMSG_IOCOMF_AUTOCIRC  0x0004  /* handle received LNK_CIRC */
+#define KDMSG_IOCOMF_AUTOFORGE 0x0008  /* auto initiate LNK_CIRC */
+
+#define KDMSG_IOCOMF_AUTOANY   (KDMSG_IOCOMF_AUTOCONN |        \
+                                KDMSG_IOCOMF_AUTOSPAN |        \
+                                KDMSG_IOCOMF_AUTOCIRC)
+
 uint32_t kdmsg_icrc32(const void *buf, size_t size);
 uint32_t kdmsg_icrc32c(const void *buf, size_t size, uint32_t crc);
 
 /*
  * kern_dmsg.c
  */
-void kdmsg_iocom_init(kdmsg_iocom_t *iocom,
-                       void *handle,
+void kdmsg_iocom_init(kdmsg_iocom_t *iocom, void *handle, uint32_t flags,
                        struct malloc_type *mmsg,
-                       int (*lnk_rcvmsg)(kdmsg_msg_t *msg),
-                       int (*dbg_rcvmsg)(kdmsg_msg_t *msg),
-                       int (*misc_rcvmsg)(kdmsg_msg_t *msg));
+                       int (*rcvmsg)(kdmsg_msg_t *msg));
 void kdmsg_iocom_reconnect(kdmsg_iocom_t *iocom, struct file *fp,
                        const char *subsysname);
+void kdmsg_iocom_autoinitiate(kdmsg_iocom_t *iocom,
+                       void (*conn_callback)(kdmsg_msg_t *msg));
 void kdmsg_iocom_uninit(kdmsg_iocom_t *iocom);
 void kdmsg_drain_msgq(kdmsg_iocom_t *iocom);
 
@@ -797,12 +933,15 @@ void kdmsg_state_cleanuptx(kdmsg_msg_t *msg);
 int kdmsg_msg_execute(kdmsg_msg_t *msg);
 void kdmsg_state_free(kdmsg_state_t *state);
 void kdmsg_msg_free(kdmsg_msg_t *msg);
-kdmsg_msg_t *kdmsg_msg_alloc(kdmsg_router_t *router, uint32_t cmd,
+kdmsg_msg_t *kdmsg_msg_alloc(kdmsg_iocom_t *iocom, uint64_t circuit,
+                               uint32_t cmd,
                                int (*func)(kdmsg_state_t *, kdmsg_msg_t *),
                                void *data);
 void kdmsg_msg_write(kdmsg_msg_t *msg);
 void kdmsg_msg_reply(kdmsg_msg_t *msg, uint32_t error);
 void kdmsg_msg_result(kdmsg_msg_t *msg, uint32_t error);
+void kdmsg_state_reply(kdmsg_state_t *state, uint32_t error);
+void kdmsg_state_result(kdmsg_state_t *state, uint32_t error);
 
 #endif