hammer2 - SPAN protocol work, router work
authorMatthew Dillon <dillon@apollo.backplane.com>
Thu, 9 Aug 2012 06:25:23 +0000 (23:25 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Thu, 9 Aug 2012 06:25:23 +0000 (23:25 -0700)
* Fix SPAN relay sort and sequencing bugs.

* Start reworking the APIs to accomodate routed messages.  Start by
  creating a hammer2_router structure and adjusting most of the msg
  functions to pass it instead of the iocom.

* Fix hammer2_state races by moving the state allocation to
  hammer2_msg_alloc() instead of hammer2_msg_write(). This gives
  code a chance to assign the state->any.* field without having to
  worry about the state getting ripped out from under us.

sbin/hammer2/TODO [new file with mode: 0644]
sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_service.c
sbin/hammer2/hammer2.h
sbin/hammer2/msg.c
sbin/hammer2/msg_lnk.c
sbin/hammer2/network.h

diff --git a/sbin/hammer2/TODO b/sbin/hammer2/TODO
new file mode 100644 (file)
index 0000000..33e49f9
--- /dev/null
@@ -0,0 +1,9 @@
+
+TODO:
+       msg_lnk.c  msg_lnk_signal rescans all connections.  What we really
+                  need to do is queue the conn (when possible) or the node
+                  (when possible), or just flag it w/ a global, and then
+                  only scan a subset.
+
+       link->dist propagation is still a bit screwy when there are a
+       lot of parallel network links.
index 0c098a6..f79117a 100644 (file)
@@ -37,9 +37,9 @@
 
 #define SHOW_TAB       2
 
 
 #define SHOW_TAB       2
 
-static void shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static void shell_rcvmsg(hammer2_msg_t *msg);
 static void shell_ttymsg(hammer2_iocom_t *iocom);
 static void shell_ttymsg(hammer2_iocom_t *iocom);
-static void hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static void hammer2_shell_parse(hammer2_msg_t *msg);
 
 /************************************************************************
  *                                 SHELL                               *
 
 /************************************************************************
  *                                 SHELL                               *
@@ -66,8 +66,9 @@ cmd_shell(const char *hostname)
        fcntl(0, F_SETFL, O_NONBLOCK);
        printf("debug: connected\n");
 
        fcntl(0, F_SETFL, O_NONBLOCK);
        printf("debug: connected\n");
 
-       msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL);
-       hammer2_msg_write(&iocom, msg, NULL, NULL, NULL);
+       msg = hammer2_msg_alloc(&iocom.router, 0, HAMMER2_DBG_SHELL,
+                               NULL, NULL);
+       hammer2_msg_write(msg);
        hammer2_iocom_core(&iocom);
        fprintf(stderr, "debug: disconnected\n");
        close(fd);
        hammer2_iocom_core(&iocom);
        fprintf(stderr, "debug: disconnected\n");
        close(fd);
@@ -80,7 +81,7 @@ cmd_shell(const char *hostname)
  */
 static
 void
  */
 static
 void
-shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+shell_rcvmsg(hammer2_msg_t *msg)
 {
        switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
        case HAMMER2_LNK_ERROR:
 {
        switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
        case HAMMER2_LNK_ERROR:
@@ -105,7 +106,7 @@ shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                 * We send the commands, not accept them.
                 * (one-way message, not transactional)
                 */
                 * We send the commands, not accept them.
                 * (one-way message, not transactional)
                 */
-               hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+               hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
                break;
        case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
                /*
                break;
        case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
                /*
@@ -119,7 +120,7 @@ shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                break;
        case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:
                fprintf(stderr, "Debug Shell is ignoring received LNK_CONN\n");
                break;
        case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:
                fprintf(stderr, "Debug Shell is ignoring received LNK_CONN\n");
-               hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+               hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
                break;
        case HAMMER2_LNK_CONN | HAMMER2_MSGF_DELETE:
                break;
                break;
        case HAMMER2_LNK_CONN | HAMMER2_MSGF_DELETE:
                break;
@@ -130,9 +131,9 @@ shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                 */
                fprintf(stderr, "Unknown message: %s\n", hammer2_msg_str(msg));
                if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
                 */
                fprintf(stderr, "Unknown message: %s\n", hammer2_msg_str(msg));
                if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
-                       hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+                       hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
                if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
                if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
-                       hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+                       hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
                break;
        }
 }
                break;
        }
 }
@@ -150,9 +151,10 @@ shell_ttymsg(hammer2_iocom_t *iocom)
                if (len && buf[len - 1] == '\n')
                        buf[--len] = 0;
                ++len;
                if (len && buf[len - 1] == '\n')
                        buf[--len] = 0;
                ++len;
-               msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL);
+               msg = hammer2_msg_alloc(&iocom->router, len, HAMMER2_DBG_SHELL,
+                                       NULL, NULL);
                bcopy(buf, msg->aux_data, len);
                bcopy(buf, msg->aux_data, len);
-               hammer2_msg_write(iocom, msg, NULL, NULL, NULL);
+               hammer2_msg_write(msg);
        } else if (feof(stdin)) {
                /*
                 * Set EOF flag without setting any error code for normal
        } else if (feof(stdin)) {
                /*
                 * Set EOF flag without setting any error code for normal
@@ -170,7 +172,7 @@ shell_ttymsg(hammer2_iocom_t *iocom)
  * then finish up by outputting another prompt.
  */
 void
  * then finish up by outputting another prompt.
  */
 void
-hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_dbg(hammer2_msg_t *msg)
 {
        switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
        case HAMMER2_DBG_SHELL:
 {
        switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
        case HAMMER2_DBG_SHELL:
@@ -180,8 +182,8 @@ hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                 */
                if (msg->aux_data)
                        msg->aux_data[msg->aux_size - 1] = 0;
                 */
                if (msg->aux_data)
                        msg->aux_data[msg->aux_size - 1] = 0;
-               hammer2_shell_parse(iocom, msg);
-               hammer2_msg_reply(iocom, msg, 0);
+               hammer2_shell_parse(msg);
+               hammer2_msg_reply(msg, 0);
                break;
        case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
                /*
                break;
        case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
                /*
@@ -194,37 +196,38 @@ hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        write(2, msg->aux_data, strlen(msg->aux_data));
                break;
        default:
                        write(2, msg->aux_data, strlen(msg->aux_data));
                break;
        default:
-               hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+               hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
                break;
        }
 }
 
                break;
        }
 }
 
-static void shell_span(hammer2_iocom_t *iocom, char *cmdbuf);
-/*static void shell_tree(hammer2_iocom_t *iocom, char *cmdbuf);*/
+static void shell_span(hammer2_router_t *router, char *cmdbuf);
+/*static void shell_tree(hammer2_router_t *router, char *cmdbuf);*/
 
 static void
 
 static void
-hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_shell_parse(hammer2_msg_t *msg)
 {
 {
+       hammer2_router_t *router = msg->router;
        char *cmdbuf = msg->aux_data;
        char *cmd = strsep(&cmdbuf, " \t");
 
        if (cmd == NULL || *cmd == 0) {
                ;
        } else if (strcmp(cmd, "span") == 0) {
        char *cmdbuf = msg->aux_data;
        char *cmd = strsep(&cmdbuf, " \t");
 
        if (cmd == NULL || *cmd == 0) {
                ;
        } else if (strcmp(cmd, "span") == 0) {
-               shell_span(iocom, cmdbuf);
+               shell_span(router, cmdbuf);
        } else if (strcmp(cmd, "tree") == 0) {
        } else if (strcmp(cmd, "tree") == 0) {
-               shell_tree(iocom, cmdbuf);
+               shell_tree(router, cmdbuf);
        } else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) {
        } else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) {
-               iocom_printf(iocom, "help            Command help\n");
-               iocom_printf(iocom, "span <host>     Span to target host\n");
-               iocom_printf(iocom, "tree            Dump spanning tree\n");
+               router_printf(router, "help            Command help\n");
+               router_printf(router, "span <host>     Span to target host\n");
+               router_printf(router, "tree            Dump spanning tree\n");
        } else {
        } else {
-               iocom_printf(iocom, "Unrecognized command: %s\n", cmd);
+               router_printf(router, "Unrecognized command: %s\n", cmd);
        }
 }
 
 static void
        }
 }
 
 static void
-shell_span(hammer2_iocom_t *iocom, char *cmdbuf)
+shell_span(hammer2_router_t *router, char *cmdbuf)
 {
        const char *hostname = strsep(&cmdbuf, " \t");
        pthread_t thread;
 {
        const char *hostname = strsep(&cmdbuf, " \t");
        pthread_t thread;
@@ -243,9 +246,9 @@ shell_span(hammer2_iocom_t *iocom, char *cmdbuf)
         * Start master service
         */
        if (fd < 0) {
         * Start master service
         */
        if (fd < 0) {
-               iocom_printf(iocom, "Connection to %s failed\n", hostname);
+               router_printf(router, "Connection to %s failed\n", hostname);
        } else {
        } else {
-               iocom_printf(iocom, "Connected to %s\n", hostname);
+               router_printf(router, "Connected to %s\n", hostname);
                pthread_create(&thread, NULL,
                               master_service, (void *)(intptr_t)fd);
                /*pthread_join(thread, &res);*/
                pthread_create(&thread, NULL,
                               master_service, (void *)(intptr_t)fd);
                /*pthread_join(thread, &res);*/
@@ -257,11 +260,11 @@ shell_span(hammer2_iocom_t *iocom, char *cmdbuf)
  * not modified and stays intact.  We use a one-way message with REPLY set
  * to distinguish between a debug command and debug terminal output.
  *
  * not modified and stays intact.  We use a one-way message with REPLY set
  * to distinguish between a debug command and debug terminal output.
  *
- * To prevent loops iocom_printf() can filter the message (cmd) related
- * to the iocom_printf().  We filter out DBG messages.
+ * To prevent loops router_printf() can filter the message (cmd) related
+ * to the router_printf().  We filter out DBG messages.
  */
 void
  */
 void
-iocom_printf(hammer2_iocom_t *iocom, const char *ctl, ...)
+router_printf(hammer2_router_t *router, const char *ctl, ...)
 {
        hammer2_msg_t *rmsg;
        va_list va;
 {
        hammer2_msg_t *rmsg;
        va_list va;
@@ -273,11 +276,12 @@ iocom_printf(hammer2_iocom_t *iocom, const char *ctl, ...)
        va_end(va);
        len = strlen(buf) + 1;
 
        va_end(va);
        len = strlen(buf) + 1;
 
-       rmsg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL |
-                                            HAMMER2_MSGF_REPLY);
+       rmsg = hammer2_msg_alloc(router, len, HAMMER2_DBG_SHELL |
+                                             HAMMER2_MSGF_REPLY,
+                                NULL, NULL);
        bcopy(buf, rmsg->aux_data, len);
 
        bcopy(buf, rmsg->aux_data, len);
 
-       hammer2_msg_write(iocom, rmsg, NULL, NULL, NULL);
+       hammer2_msg_write(rmsg);
 }
 
 /************************************************************************
 }
 
 /************************************************************************
index 4b7207c..82ca26b 100644 (file)
 #include "hammer2.h"
 
 static void *master_accept(void *data);
 #include "hammer2.h"
 
 static void *master_accept(void *data);
-static void master_auth_state(hammer2_iocom_t *iocom);
-static void master_auth_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-static void master_link_state(hammer2_iocom_t *iocom);
-static void master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static void master_auth_signal(hammer2_router_t *router);
+static void master_auth_rxmsg(hammer2_msg_t *msg);
+static void master_link_signal(hammer2_router_t *router);
+static void master_link_rxmsg(hammer2_msg_t *msg);
 
 /*
  * Start-up the master listener daemon for the machine.
 
 /*
  * Start-up the master listener daemon for the machine.
@@ -164,7 +164,9 @@ master_service(void *data)
 
        fd = (int)(intptr_t)data;
        hammer2_iocom_init(&iocom, fd, -1,
 
        fd = (int)(intptr_t)data;
        hammer2_iocom_init(&iocom, fd, -1,
-                          master_auth_state, master_auth_rxmsg, NULL);
+                          master_auth_signal,
+                          master_auth_rxmsg,
+                          NULL);
        hammer2_iocom_core(&iocom);
 
        fprintf(stderr,
        hammer2_iocom_core(&iocom);
 
        fprintf(stderr,
@@ -185,11 +187,11 @@ master_service(void *data)
  * message operation.  The connection has already been encrypted at
  * this point.
  */
  * message operation.  The connection has already been encrypted at
  * this point.
  */
-static void master_auth_conn_rx(hammer2_state_t *state, hammer2_msg_t *msg);
+static void master_auth_conn_rx(hammer2_msg_t *msg);
 
 static
 void
 
 static
 void
-master_auth_state(hammer2_iocom_t *iocom __unused)
+master_auth_signal(hammer2_router_t *router)
 {
        hammer2_msg_t *msg;
 
 {
        hammer2_msg_t *msg;
 
@@ -199,26 +201,29 @@ master_auth_state(hammer2_iocom_t *iocom __unused)
         *
         * XXX put additional authentication states here
         */
         *
         * XXX put additional authentication states here
         */
-       msg = hammer2_msg_alloc(iocom, 0, HAMMER2_LNK_CONN |
-                                         HAMMER2_MSGF_CREATE);
+       msg = hammer2_msg_alloc(router, 0, HAMMER2_LNK_CONN |
+                                                  HAMMER2_MSGF_CREATE,
+                               master_auth_conn_rx, NULL);
        snprintf(msg->any.lnk_conn.label, sizeof(msg->any.lnk_conn.label), "*");
        snprintf(msg->any.lnk_conn.label, sizeof(msg->any.lnk_conn.label), "*");
-       hammer2_msg_write(iocom, msg, master_auth_conn_rx, NULL, NULL);
+       hammer2_msg_write(msg);
 
 
-       hammer2_iocom_restate(iocom,
-                             master_link_state, master_link_rxmsg, NULL);
+       hammer2_router_restate(router,
+                             master_link_signal,
+                             master_link_rxmsg,
+                             NULL);
 }
 
 static
 void
 }
 
 static
 void
-master_auth_conn_rx(hammer2_state_t *state, hammer2_msg_t *msg)
+master_auth_conn_rx(hammer2_msg_t *msg)
 {
        if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
 {
        if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
-               hammer2_msg_reply(state->iocom, msg, 0);
+               hammer2_msg_reply(msg, 0);
 }
 
 static
 void
 }
 
 static
 void
-master_auth_rxmsg(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg __unused)
+master_auth_rxmsg(hammer2_msg_t *msg __unused)
 {
 }
 
 {
 }
 
@@ -230,13 +235,14 @@ master_auth_rxmsg(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg __unused)
  */
 static
 void
  */
 static
 void
-master_link_state(hammer2_iocom_t *iocom __unused)
+master_link_signal(hammer2_router_t *router)
 {
 {
+       hammer2_msg_lnk_signal(router);
 }
 
 static
 void
 }
 
 static
 void
-master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+master_link_rxmsg(hammer2_msg_t *msg)
 {
        hammer2_state_t *state;
        uint32_t cmd;
 {
        hammer2_state_t *state;
        uint32_t cmd;
@@ -257,17 +263,17 @@ master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 
        if (state && state->func) {
                assert(state->func != NULL);
 
        if (state && state->func) {
                assert(state->func != NULL);
-               state->func(state, msg);
+               state->func(msg);
        } else {
                switch(cmd & HAMMER2_MSGF_PROTOS) {
                case HAMMER2_MSG_PROTO_LNK:
        } else {
                switch(cmd & HAMMER2_MSGF_PROTOS) {
                case HAMMER2_MSG_PROTO_LNK:
-                       hammer2_msg_lnk(iocom, msg);
+                       hammer2_msg_lnk(msg);
                        break;
                case HAMMER2_MSG_PROTO_DBG:
                        break;
                case HAMMER2_MSG_PROTO_DBG:
-                       hammer2_msg_dbg(iocom, msg);
+                       hammer2_msg_dbg(msg);
                        break;
                default:
                        break;
                default:
-                       hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+                       hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
                        break;
                }
        }
                        break;
                }
        }
index ae261f3..8c69e87 100644 (file)
@@ -125,29 +125,27 @@ void hammer2_bswap_head(hammer2_msg_hdr_t *head);
 void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
 void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
-                       void (*state_func)(hammer2_iocom_t *),
-                       void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *),
+                       void (*state_func)(hammer2_router_t *),
+                       void (*rcvmsg_func)(hammer2_msg_t *),
                        void (*altmsg_func)(hammer2_iocom_t *));
                        void (*altmsg_func)(hammer2_iocom_t *));
-void hammer2_iocom_restate(hammer2_iocom_t *iocom,
-                       void (*state_func)(hammer2_iocom_t *),
-                       void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *),
+void hammer2_router_restate(hammer2_router_t *router,
+                       void (*state_func)(hammer2_router_t *),
+                       void (*rcvmsg_func)(hammer2_msg_t *),
                        void (*altmsg_func)(hammer2_iocom_t *));
                        void (*altmsg_func)(hammer2_iocom_t *));
+void hammer2_router_signal(hammer2_router_t *router);
 void hammer2_iocom_done(hammer2_iocom_t *iocom);
 void hammer2_iocom_done(hammer2_iocom_t *iocom);
-hammer2_msg_t *hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size,
-                       uint32_t cmd);
-void hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
-                       uint32_t error);
-void hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
-                       uint32_t error);
+hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router,
+                       size_t aux_size, uint32_t cmd,
+                       void (*func)(hammer2_msg_t *), void *data);
+void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error);
+void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error);
 void hammer2_state_reply(hammer2_state_t *state, uint32_t error);
 
 void hammer2_state_reply(hammer2_state_t *state, uint32_t error);
 
-void hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_free(hammer2_msg_t *msg);
 
 void hammer2_iocom_core(hammer2_iocom_t *iocom);
 hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
 
 void hammer2_iocom_core(hammer2_iocom_t *iocom);
 hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
-void hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
-                       void (*func)(hammer2_state_t *, hammer2_msg_t *),
-                       void *data, hammer2_state_t **statep);
+void hammer2_msg_write(hammer2_msg_t *msg);
 
 void hammer2_iocom_drain(hammer2_iocom_t *iocom);
 void hammer2_iocom_flush1(hammer2_iocom_t *iocom);
 
 void hammer2_iocom_drain(hammer2_iocom_t *iocom);
 void hammer2_iocom_flush1(hammer2_iocom_t *iocom);
@@ -159,8 +157,9 @@ void hammer2_state_free(hammer2_state_t *state);
 /*
  * Msg protocol functions
  */
 /*
  * Msg protocol functions
  */
-void hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-void hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_lnk_signal(hammer2_router_t *router);
+void hammer2_msg_lnk(hammer2_msg_t *msg);
+void hammer2_msg_dbg(hammer2_msg_t *msg);
 
 /*
  * Crypto functions
 
 /*
  * Crypto functions
@@ -187,11 +186,11 @@ const char * hammer2_basecmd_str(uint32_t cmd);
 const char *hammer2_msg_str(hammer2_msg_t *msg);
 int hammer2_connect(const char *hostname);
 
 const char *hammer2_msg_str(hammer2_msg_t *msg);
 int hammer2_connect(const char *hostname);
 
-void shell_tree(hammer2_iocom_t *iocom, char *cmdbuf);
+void shell_tree(hammer2_router_t *router, char *cmdbuf);
 
 void *master_service(void *data);
 
 void hammer2_msg_debug(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
 void *master_service(void *data);
 
 void hammer2_msg_debug(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-void iocom_printf(hammer2_iocom_t *iocom, const char *ctl, ...);
+void router_printf(hammer2_router_t *router, const char *ctl, ...);
 void *hammer2_alloc(size_t bytes);
 void hammer2_free(void *ptr);
 void *hammer2_alloc(size_t bytes);
 void hammer2_free(void *ptr);
index 1e33c80..ea97548 100644 (file)
@@ -82,43 +82,44 @@ hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                assert(0);      /* shouldn't happen */
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                assert(0);      /* shouldn't happen */
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
-               hammer2_msg_free(iocom, msg);
+               hammer2_msg_free(msg);
        }
        if ((msg = ioq->msg) != NULL) {
                ioq->msg = NULL;
        }
        if ((msg = ioq->msg) != NULL) {
                ioq->msg = NULL;
-               hammer2_msg_free(iocom, msg);
+               hammer2_msg_free(msg);
        }
 }
 
 /*
  * Initialize a low-level communications channel.
  *
        }
 }
 
 /*
  * Initialize a low-level communications channel.
  *
- * NOTE: The state_func() is called at least once from the loop and can be
+ * NOTE: The signal_func() is called at least once from the loop and can be
  *      re-armed via hammer2_iocom_restate().
  */
 void
 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
  *      re-armed via hammer2_iocom_restate().
  */
 void
 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
-                  void (*state_func)(hammer2_iocom_t *),
-                  void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
+                  void (*signal_func)(hammer2_router_t *),
+                  void (*rcvmsg_func)(hammer2_msg_t *),
                   void (*altmsg_func)(hammer2_iocom_t *))
 {
        bzero(iocom, sizeof(*iocom));
 
                   void (*altmsg_func)(hammer2_iocom_t *))
 {
        bzero(iocom, sizeof(*iocom));
 
-       iocom->state_callback = state_func;
-       iocom->rcvmsg_callback = rcvmsg_func;
-       iocom->altmsg_callback = altmsg_func;
+       iocom->router.signal_callback = signal_func;
+       iocom->router.rcvmsg_callback = rcvmsg_func;
+       iocom->router.altmsg_callback = altmsg_func;
 
        pthread_mutex_init(&iocom->mtx, NULL);
 
        pthread_mutex_init(&iocom->mtx, NULL);
-       RB_INIT(&iocom->staterd_tree);
-       RB_INIT(&iocom->statewr_tree);
+       RB_INIT(&iocom->router.staterd_tree);
+       RB_INIT(&iocom->router.statewr_tree);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
        TAILQ_INIT(&iocom->addrq);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
        TAILQ_INIT(&iocom->addrq);
-       TAILQ_INIT(&iocom->txmsgq);
+       TAILQ_INIT(&iocom->router.txmsgq);
+       iocom->router.iocom = iocom;
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
        iocom->flags = HAMMER2_IOCOMF_RREQ;
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
        iocom->flags = HAMMER2_IOCOMF_RREQ;
-       if (state_func)
+       if (signal_func)
                iocom->flags |= HAMMER2_IOCOMF_SWORK;
        hammer2_ioq_init(iocom, &iocom->ioq_rx);
        hammer2_ioq_init(iocom, &iocom->ioq_tx);
                iocom->flags |= HAMMER2_IOCOMF_SWORK;
        hammer2_ioq_init(iocom, &iocom->ioq_rx);
        hammer2_ioq_init(iocom, &iocom->ioq_tx);
@@ -152,18 +153,25 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
  * the recevmsg_func and the sendmsg_func is called at least once.
  */
 void
  * the recevmsg_func and the sendmsg_func is called at least once.
  */
 void
-hammer2_iocom_restate(hammer2_iocom_t *iocom,
-                  void (*state_func)(hammer2_iocom_t *),
-                  void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
+hammer2_router_restate(hammer2_router_t *router,
+                  void (*signal_func)(hammer2_router_t *),
+                  void (*rcvmsg_func)(hammer2_msg_t *msg),
                   void (*altmsg_func)(hammer2_iocom_t *))
 {
                   void (*altmsg_func)(hammer2_iocom_t *))
 {
-       iocom->state_callback = state_func;
-       iocom->rcvmsg_callback = rcvmsg_func;
-       iocom->altmsg_callback = altmsg_func;
-       if (state_func)
-               iocom->flags |= HAMMER2_IOCOMF_SWORK;
+       router->signal_callback = signal_func;
+       router->rcvmsg_callback = rcvmsg_func;
+       router->altmsg_callback = altmsg_func;
+       if (signal_func)
+               router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
        else
        else
-               iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
+               router->iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
+}
+
+void
+hammer2_router_signal(hammer2_router_t *router)
+{
+       if (router->signal_callback)
+               router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
 }
 
 /*
 }
 
 /*
@@ -212,8 +220,11 @@ hammer2_iocom_done(hammer2_iocom_t *iocom)
  * Allocate a new one-way message.
  */
 hammer2_msg_t *
  * Allocate a new one-way message.
  */
 hammer2_msg_t *
-hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
+hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd,
+                 void (*func)(hammer2_msg_t *), void *data)
 {
 {
+       hammer2_state_t *state = NULL;
+       hammer2_iocom_t *iocom = router->iocom;
        hammer2_msg_t *msg;
        int hbytes;
 
        hammer2_msg_t *msg;
        int hbytes;
 
@@ -227,6 +238,30 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
                if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
                        TAILQ_REMOVE(&iocom->freeq, msg, qentry);
        }
                if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
                        TAILQ_REMOVE(&iocom->freeq, msg, qentry);
        }
+       if ((cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY)) ==
+           HAMMER2_MSGF_CREATE) {
+               /*
+                * Create state when CREATE is set without REPLY.
+                *
+                * NOTE: CREATE in txcmd handled by hammer2_msg_write()
+                * NOTE: DELETE in txcmd handled by hammer2_state_cleanuptx()
+                */
+               state = malloc(sizeof(*state));
+               bzero(state, sizeof(*state));
+               state->iocom = iocom;
+               state->flags = HAMMER2_STATE_DYNAMIC;
+               state->msgid = (uint64_t)(uintptr_t)state;
+               /* XXX set state->spanid from router */
+               state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
+                                      HAMMER2_MSGF_DELETE);
+               state->rxcmd = HAMMER2_MSGF_REPLY;
+               state->func = func;
+               state->any.any = data;
+               pthread_mutex_lock(&iocom->mtx);
+               RB_INSERT(hammer2_state_tree, &iocom->router.statewr_tree, state);
+               pthread_mutex_unlock(&iocom->mtx);
+               state->flags |= HAMMER2_STATE_INSERTED;
+       }
        pthread_mutex_unlock(&iocom->mtx);
        if (msg == NULL) {
                msg = malloc(sizeof(*msg));
        pthread_mutex_unlock(&iocom->mtx);
        if (msg == NULL) {
                msg = malloc(sizeof(*msg));
@@ -252,7 +287,12 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
        msg->any.head.cmd = cmd;
        msg->any.head.aux_descr = 0;
        msg->any.head.aux_crc = 0;
        msg->any.head.cmd = cmd;
        msg->any.head.aux_descr = 0;
        msg->any.head.aux_crc = 0;
-
+       msg->router = router;
+       if (state) {
+               msg->state = state;
+               state->msg = msg;
+               msg->any.head.msgid = state->msgid;
+       }
        return (msg);
 }
 
        return (msg);
 }
 
@@ -263,8 +303,10 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
  */
 static
 void
  */
 static
 void
-hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_free_locked(hammer2_msg_t *msg)
 {
 {
+       hammer2_iocom_t *iocom = msg->router->iocom;
+
        msg->state = NULL;
        if (msg->aux_data)
                TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
        msg->state = NULL;
        if (msg->aux_data)
                TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
@@ -273,10 +315,12 @@ hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 }
 
 void
 }
 
 void
-hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_free(hammer2_msg_t *msg)
 {
 {
+       hammer2_iocom_t *iocom = msg->router->iocom;
+
        pthread_mutex_lock(&iocom->mtx);
        pthread_mutex_lock(&iocom->mtx);
-       hammer2_msg_free_locked(iocom, msg);
+       hammer2_msg_free_locked(msg);
        pthread_mutex_unlock(&iocom->mtx);
 }
 
        pthread_mutex_unlock(&iocom->mtx);
 }
 
@@ -372,7 +416,7 @@ hammer2_iocom_core(hammer2_iocom_t *iocom)
 
                if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
                        iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
 
                if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
                        iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
-                       iocom->state_callback(iocom);
+                       iocom->router.signal_callback(&iocom->router);
                }
 
                /*
                }
 
                /*
@@ -385,7 +429,7 @@ hammer2_iocom_core(hammer2_iocom_t *iocom)
                        read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
                        iocom->flags |= HAMMER2_IOCOMF_RWORK;
                        iocom->flags |= HAMMER2_IOCOMF_WWORK;
                        read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
                        iocom->flags |= HAMMER2_IOCOMF_RWORK;
                        iocom->flags |= HAMMER2_IOCOMF_WWORK;
-                       if (TAILQ_FIRST(&iocom->txmsgq))
+                       if (TAILQ_FIRST(&iocom->router.txmsgq))
                                hammer2_iocom_flush1(iocom);
                }
 
                                hammer2_iocom_flush1(iocom);
                }
 
@@ -407,14 +451,14 @@ hammer2_iocom_core(hammer2_iocom_t *iocom)
                                        fprintf(stderr, "receive %s\n",
                                                hammer2_msg_str(msg));
                                }
                                        fprintf(stderr, "receive %s\n",
                                                hammer2_msg_str(msg));
                                }
-                               iocom->rcvmsg_callback(iocom, msg);
+                               iocom->router.rcvmsg_callback(msg);
                                hammer2_state_cleanuprx(iocom, msg);
                        }
                }
 
                if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
                        iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
                                hammer2_state_cleanuprx(iocom, msg);
                        }
                }
 
                if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
                        iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
-                       iocom->altmsg_callback(iocom);
+                       iocom->router.altmsg_callback(iocom);
                }
        }
 }
                }
        }
 }
@@ -553,7 +597,8 @@ again:
                /*
                 * Allocate the message, the next state will fill it in.
                 */
                /*
                 * Allocate the message, the next state will fill it in.
                 */
-               msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
+               msg = hammer2_msg_alloc(&iocom->router, ioq->abytes, 0,
+                                       NULL, NULL);
                ioq->msg = msg;
 
                /*
                ioq->msg = msg;
 
                /*
@@ -777,7 +822,7 @@ again:
                error = hammer2_state_msgrx(iocom, msg);
                if (error) {
                        if (error == HAMMER2_IOQ_ERROR_EALREADY) {
                error = hammer2_state_msgrx(iocom, msg);
                if (error) {
                        if (error == HAMMER2_IOQ_ERROR_EALREADY) {
-                               hammer2_msg_free(iocom, msg);
+                               hammer2_msg_free(msg);
                                goto again;
                        }
                        ioq->error = error;
                                goto again;
                        }
                        ioq->error = error;
@@ -803,7 +848,7 @@ skip:
                 */
                assert(ioq->msg == msg);
                if (msg) {
                 */
                assert(ioq->msg == msg);
                if (msg) {
-                       hammer2_msg_free(iocom, msg);
+                       hammer2_msg_free(msg);
                        ioq->msg = NULL;
                }
 
                        ioq->msg = NULL;
                }
 
@@ -818,7 +863,7 @@ skip:
                 * LNK_ERROR (that the session can detect) when no
                 * transactions remain.
                 */
                 * LNK_ERROR (that the session can detect) when no
                 * transactions remain.
                 */
-               msg = hammer2_msg_alloc(iocom, 0, 0);
+               msg = hammer2_msg_alloc(&iocom->router, 0, 0, NULL, NULL);
                bzero(&msg->any.head, sizeof(msg->any.head));
                msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
                msg->any.head.cmd = HAMMER2_LNK_ERROR;
                bzero(&msg->any.head, sizeof(msg->any.head));
                msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
                msg->any.head.cmd = HAMMER2_LNK_ERROR;
@@ -826,13 +871,13 @@ skip:
 
                pthread_mutex_lock(&iocom->mtx);
                hammer2_iocom_drain(iocom);
 
                pthread_mutex_lock(&iocom->mtx);
                hammer2_iocom_drain(iocom);
-               if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
+               if ((state = RB_ROOT(&iocom->router.staterd_tree)) != NULL) {
                        /*
                         * Active remote transactions are still present.
                         * Simulate the other end sending us a DELETE.
                         */
                        if (state->rxcmd & HAMMER2_MSGF_DELETE) {
                        /*
                         * Active remote transactions are still present.
                         * Simulate the other end sending us a DELETE.
                         */
                        if (state->rxcmd & HAMMER2_MSGF_DELETE) {
-                               hammer2_msg_free(iocom, msg);
+                               hammer2_msg_free(msg);
                                msg = NULL;
                        } else {
                                /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
                                msg = NULL;
                        } else {
                                /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
@@ -842,13 +887,13 @@ skip:
                                msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
                                                     HAMMER2_MSGF_DELETE;
                        }
                                msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
                                                     HAMMER2_MSGF_DELETE;
                        }
-               } else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) {
+               } else if ((state = RB_ROOT(&iocom->router.statewr_tree)) != NULL) {
                        /*
                         * Active local transactions are still present.
                         * Simulate the other end sending us a DELETE.
                         */
                        if (state->rxcmd & HAMMER2_MSGF_DELETE) {
                        /*
                         * Active local transactions are still present.
                         * Simulate the other end sending us a DELETE.
                         */
                        if (state->rxcmd & HAMMER2_MSGF_DELETE) {
-                               hammer2_msg_free(iocom, msg);
+                               hammer2_msg_free(msg);
                                msg = NULL;
                        } else {
                                msg->state = state;
                                msg = NULL;
                        } else {
                                msg->state = state;
@@ -941,8 +986,8 @@ hammer2_iocom_flush1(hammer2_iocom_t *iocom)
        iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
        TAILQ_INIT(&tmpq);
        pthread_mutex_lock(&iocom->mtx);
        iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
        TAILQ_INIT(&tmpq);
        pthread_mutex_lock(&iocom->mtx);
-       while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
-               TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
+       while ((msg = TAILQ_FIRST(&iocom->router.txmsgq)) != NULL) {
+               TAILQ_REMOVE(&iocom->router.txmsgq, msg, qentry);
                TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
        }
        pthread_mutex_unlock(&iocom->mtx);
                TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
        }
        pthread_mutex_unlock(&iocom->mtx);
@@ -1178,11 +1223,9 @@ hammer2_iocom_drain(hammer2_iocom_t *iocom)
  * Write a message to an iocom, with additional state processing.
  */
 void
  * Write a message to an iocom, with additional state processing.
  */
 void
-hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
-                 void (*func)(hammer2_state_t *, hammer2_msg_t *),
-                 void *data,
-                 hammer2_state_t **statep)
+hammer2_msg_write(hammer2_msg_t *msg)
 {
 {
+       hammer2_iocom_t *iocom = msg->router->iocom;
        hammer2_state_t *state;
        char dummy;
 
        hammer2_state_t *state;
        char dummy;
 
@@ -1195,57 +1238,34 @@ hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
                 * Existing transaction (could be reply).  It is also
                 * possible for this to be the first reply (CREATE is set),
                 * in which case we populate state->txcmd.
                 * Existing transaction (could be reply).  It is also
                 * possible for this to be the first reply (CREATE is set),
                 * in which case we populate state->txcmd.
+                *
+                * state->txcmd is adjusted to hold the final message cmd,
+                * and we also be sure to set the CREATE bit here.  We did
+                * not set it in hammer2_msg_alloc() because that would have
+                * not been serialized (state could have gotten ripped out
+                * from under the message prior to it being transmitted).
                 */
                 */
+               if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE |
+                                         HAMMER2_MSGF_REPLY)) ==
+                   HAMMER2_MSGF_CREATE) {
+                       state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               }
                msg->any.head.msgid = state->msgid;
                msg->any.head.spanid = state->spanid;
                msg->any.head.msgid = state->msgid;
                msg->any.head.spanid = state->spanid;
-               if (func) {
-                       state->func = func;
-                       state->any.any = data;
-               }
                assert(((state->txcmd ^ msg->any.head.cmd) &
                        HAMMER2_MSGF_REPLY) == 0);
                if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
                        state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                assert(((state->txcmd ^ msg->any.head.cmd) &
                        HAMMER2_MSGF_REPLY) == 0);
                if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
                        state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
-       } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
-               /*
-                * No existing state and CREATE is set, create new
-                * state for outgoing command.  This can't happen if
-                * REPLY is set as the state would already exist for
-                * a transaction reply.
-                */
-               assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
-
-               state = malloc(sizeof(*state));
-               bzero(state, sizeof(*state));
-               state->iocom = iocom;
-               state->flags = HAMMER2_STATE_DYNAMIC;
-               state->msg = msg;
-               state->msgid = (uint64_t)(uintptr_t)state;
-               state->spanid = msg->any.head.spanid;
-               state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
-               state->rxcmd = HAMMER2_MSGF_REPLY;
-               state->func = func;
-               state->any.any = data;
-               pthread_mutex_lock(&iocom->mtx);
-               RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
-               pthread_mutex_unlock(&iocom->mtx);
-               state->flags |= HAMMER2_STATE_INSERTED;
-               msg->state = state;
-               msg->any.head.msgid = state->msgid;
-               /* spanid set by caller */
        } else {
                msg->any.head.msgid = 0;
        } else {
                msg->any.head.msgid = 0;
-               /* spanid set by caller */
+               /* XXX set spanid by router */
        }
 
        }
 
-       if (statep)
-               *statep = state;
-
        /*
         * Queue it for output, wake up the I/O pthread.  Note that the
         * I/O thread is responsible for generating the CRCs and encryption.
         */
        /*
         * Queue it for output, wake up the I/O pthread.  Note that the
         * I/O thread is responsible for generating the CRCs and encryption.
         */
-       TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
+       TAILQ_INSERT_TAIL(&iocom->router.txmsgq, msg, qentry);
        dummy = 0;
        write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
        pthread_mutex_unlock(&iocom->mtx);
        dummy = 0;
        write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
        pthread_mutex_unlock(&iocom->mtx);
@@ -1265,8 +1285,9 @@ hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
  * The reply contains no extended data.
  */
 void
  * The reply contains no extended data.
  */
 void
-hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
+hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
 {
 {
+       hammer2_iocom_t *iocom = msg->router->iocom;
        hammer2_state_t *state = msg->state;
        hammer2_msg_t *nmsg;
        uint32_t cmd;
        hammer2_state_t *state = msg->state;
        hammer2_msg_t *nmsg;
        uint32_t cmd;
@@ -1289,8 +1310,6 @@ hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
        if (state) {
                if (state->txcmd & HAMMER2_MSGF_DELETE)
                        return;
        if (state) {
                if (state->txcmd & HAMMER2_MSGF_DELETE)
                        return;
-               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
-                       cmd |= HAMMER2_MSGF_CREATE;
                if (state->txcmd & HAMMER2_MSGF_REPLY)
                        cmd |= HAMMER2_MSGF_REPLY;
                cmd |= HAMMER2_MSGF_DELETE;
                if (state->txcmd & HAMMER2_MSGF_REPLY)
                        cmd |= HAMMER2_MSGF_REPLY;
                cmd |= HAMMER2_MSGF_DELETE;
@@ -1299,10 +1318,19 @@ hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
-       nmsg = hammer2_msg_alloc(iocom, 0, cmd);
+       /*
+        * Allocate the message and associate it with the existing state.
+        * We cannot pass MSGF_CREATE to msg_alloc() because that may
+        * allocate new state.  We have our state already.
+        */
+       nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
+       if (state) {
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+                       nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+       }
        nmsg->any.head.error = error;
        nmsg->any.head.error = error;
-       nmsg->state = msg->state;
-       hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
+       nmsg->state = state;
+       hammer2_msg_write(nmsg);
 }
 
 /*
 }
 
 /*
@@ -1312,8 +1340,9 @@ hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
  * later.
  */
 void
  * later.
  */
 void
-hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
+hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
 {
 {
+       hammer2_iocom_t *iocom = msg->router->iocom;
        hammer2_state_t *state = msg->state;
        hammer2_msg_t *nmsg;
        uint32_t cmd;
        hammer2_state_t *state = msg->state;
        hammer2_msg_t *nmsg;
        uint32_t cmd;
@@ -1336,8 +1365,6 @@ hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
        if (state) {
                if (state->txcmd & HAMMER2_MSGF_DELETE)
                        return;
        if (state) {
                if (state->txcmd & HAMMER2_MSGF_DELETE)
                        return;
-               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
-                       cmd |= HAMMER2_MSGF_CREATE;
                if (state->txcmd & HAMMER2_MSGF_REPLY)
                        cmd |= HAMMER2_MSGF_REPLY;
                /* continuing transaction, do not set MSGF_DELETE */
                if (state->txcmd & HAMMER2_MSGF_REPLY)
                        cmd |= HAMMER2_MSGF_REPLY;
                /* continuing transaction, do not set MSGF_DELETE */
@@ -1346,10 +1373,14 @@ hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
-       nmsg = hammer2_msg_alloc(iocom, 0, cmd);
+       nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
+       if (state) {
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+                       nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+       }
        nmsg->any.head.error = error;
        nmsg->state = state;
        nmsg->any.head.error = error;
        nmsg->state = state;
-       hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
+       hammer2_msg_write(nmsg);
 }
 
 /*
 }
 
 /*
@@ -1368,23 +1399,20 @@ hammer2_state_reply(hammer2_state_t *state, uint32_t error)
                return;
 
        /*
                return;
 
        /*
-        * We must also set CREATE if this is our first response to a
-        * remote command.
-        */
-       if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
-               cmd |= HAMMER2_MSGF_CREATE;
-
-       /*
         * Set REPLY if the other end initiated the command.  Otherwise
         * we are the command direction.
         */
        if (state->txcmd & HAMMER2_MSGF_REPLY)
                cmd |= HAMMER2_MSGF_REPLY;
 
         * Set REPLY if the other end initiated the command.  Otherwise
         * we are the command direction.
         */
        if (state->txcmd & HAMMER2_MSGF_REPLY)
                cmd |= HAMMER2_MSGF_REPLY;
 
-       nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
+       nmsg = hammer2_msg_alloc(&state->iocom->router, 0, cmd, NULL, NULL);
+       if (state) {
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+                       nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+       }
        nmsg->any.head.error = error;
        nmsg->state = state;
        nmsg->any.head.error = error;
        nmsg->state = state;
-       hammer2_msg_write(state->iocom, nmsg, NULL, NULL, NULL);
+       hammer2_msg_write(nmsg);
 }
 
 /************************************************************************
 }
 
 /************************************************************************
@@ -1479,10 +1507,10 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
        pthread_mutex_lock(&iocom->mtx);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
                state = RB_FIND(hammer2_state_tree,
        pthread_mutex_lock(&iocom->mtx);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
                state = RB_FIND(hammer2_state_tree,
-                               &iocom->statewr_tree, &dummy);
+                               &iocom->router.statewr_tree, &dummy);
        } else {
                state = RB_FIND(hammer2_state_tree,
        } else {
                state = RB_FIND(hammer2_state_tree,
-                               &iocom->staterd_tree, &dummy);
+                               &iocom->router.staterd_tree, &dummy);
        }
        msg->state = state;
        pthread_mutex_unlock(&iocom->mtx);
        }
        msg->state = state;
        pthread_mutex_unlock(&iocom->mtx);
@@ -1525,7 +1553,8 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                state->spanid = msg->any.head.spanid;
                msg->state = state;
                pthread_mutex_lock(&iocom->mtx);
                state->spanid = msg->any.head.spanid;
                msg->state = state;
                pthread_mutex_lock(&iocom->mtx);
-               RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
+               RB_INSERT(hammer2_state_tree,
+                         &iocom->router.staterd_tree, state);
                pthread_mutex_unlock(&iocom->mtx);
                error = 0;
                if (DebugOpt) {
                pthread_mutex_unlock(&iocom->mtx);
                error = 0;
                if (DebugOpt) {
@@ -1661,7 +1690,7 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                 * Free a non-transactional message, there is no state
                 * to worry about.
                 */
                 * Free a non-transactional message, there is no state
                 * to worry about.
                 */
-               hammer2_msg_free(iocom, msg);
+               hammer2_msg_free(msg);
        } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                /*
                 * Message terminating transaction, destroy the related
        } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                /*
                 * Message terminating transaction, destroy the related
@@ -1677,11 +1706,11 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        if (state->rxcmd & HAMMER2_MSGF_REPLY) {
                                assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
                        if (state->rxcmd & HAMMER2_MSGF_REPLY) {
                                assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
-                                         &iocom->statewr_tree, state);
+                                         &iocom->router.statewr_tree, state);
                        } else {
                                assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
                        } else {
                                assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
-                                         &iocom->staterd_tree, state);
+                                         &iocom->router.staterd_tree, state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                        hammer2_state_free(state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                        hammer2_state_free(state);
@@ -1689,13 +1718,13 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        ;
                }
                pthread_mutex_unlock(&iocom->mtx);
                        ;
                }
                pthread_mutex_unlock(&iocom->mtx);
-               hammer2_msg_free(iocom, msg);
+               hammer2_msg_free(msg);
        } else if (state->msg != msg) {
                /*
                 * Message not terminating transaction, leave state intact
                 * and free message if it isn't the CREATE message.
                 */
        } else if (state->msg != msg) {
                /*
                 * Message not terminating transaction, leave state intact
                 * and free message if it isn't the CREATE message.
                 */
-               hammer2_msg_free(iocom, msg);
+               hammer2_msg_free(msg);
        }
 }
 
        }
 }
 
@@ -1705,7 +1734,7 @@ hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
        hammer2_state_t *state;
 
        if ((state = msg->state) == NULL) {
        hammer2_state_t *state;
 
        if ((state = msg->state) == NULL) {
-               hammer2_msg_free(iocom, msg);
+               hammer2_msg_free(msg);
        } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                pthread_mutex_lock(&iocom->mtx);
                state->txcmd |= HAMMER2_MSGF_DELETE;
        } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                pthread_mutex_lock(&iocom->mtx);
                state->txcmd |= HAMMER2_MSGF_DELETE;
@@ -1716,11 +1745,11 @@ hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        if (state->txcmd & HAMMER2_MSGF_REPLY) {
                                assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
                        if (state->txcmd & HAMMER2_MSGF_REPLY) {
                                assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
-                                         &iocom->staterd_tree, state);
+                                         &iocom->router.staterd_tree, state);
                        } else {
                                assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
                        } else {
                                assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
-                                         &iocom->statewr_tree, state);
+                                         &iocom->router.statewr_tree, state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                        hammer2_state_free(state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                        hammer2_state_free(state);
@@ -1728,9 +1757,9 @@ hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        ;
                }
                pthread_mutex_unlock(&iocom->mtx);
                        ;
                }
                pthread_mutex_unlock(&iocom->mtx);
-               hammer2_msg_free(iocom, msg);
+               hammer2_msg_free(msg);
        } else if (state->msg != msg) {
        } else if (state->msg != msg) {
-               hammer2_msg_free(iocom, msg);
+               hammer2_msg_free(msg);
        }
 }
 
        }
 }
 
@@ -1752,7 +1781,7 @@ hammer2_state_free(hammer2_state_t *state)
        msg = state->msg;
        state->msg = NULL;
        if (msg)
        msg = state->msg;
        state->msg = NULL;
        if (msg)
-               hammer2_msg_free_locked(iocom, msg);
+               hammer2_msg_free_locked(msg);
        free(state);
 
        /*
        free(state);
 
        /*
@@ -1765,8 +1794,8 @@ hammer2_state_free(hammer2_state_t *state)
         * We may have to wake up the rx code.
         */
        if (iocom->ioq_rx.error &&
         * We may have to wake up the rx code.
         */
        if (iocom->ioq_rx.error &&
-           RB_EMPTY(&iocom->staterd_tree) &&
-           RB_EMPTY(&iocom->statewr_tree)) {
+           RB_EMPTY(&iocom->router.staterd_tree) &&
+           RB_EMPTY(&iocom->router.statewr_tree)) {
                dummy = 0;
                write(iocom->wakeupfds[1], &dummy, 1);
        }
                dummy = 0;
                write(iocom->wakeupfds[1], &dummy, 1);
        }
index 98b3266..229af80 100644 (file)
  * an active transaction is lost, the related messages will be fully aborted
  * and the higher protocol levels will retry as appropriate.
  *
  * an active transaction is lost, the related messages will be fully aborted
  * and the higher protocol levels will retry as appropriate.
  *
+ * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
+ * back to the originator.  Only the originator keeps tracks of a message.
+ * Routers just pass it through.  If a route is lost during transit the
+ * message is simply thrown away.
+ *
  * It is also important to note that several paths to the same PFS can be
  * propagated along the same link, which allows concurrency and even
  * redundancy over several network interfaces or via different routes through
  * It is also important to note that several paths to the same PFS can be
  * propagated along the same link, which allows concurrency and even
  * redundancy over several network interfaces or via different routes through
@@ -222,6 +227,7 @@ struct h2span_link {
        struct h2span_node *node;       /* related node */
        int32_t dist;
        struct h2span_relay_queue relayq; /* relay out */
        struct h2span_node *node;       /* related node */
        int32_t dist;
        struct h2span_relay_queue relayq; /* relay out */
+       struct hammer2_router   router;
 };
 
 /*
 };
 
 /*
@@ -276,9 +282,9 @@ h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
                return(-1);
        if (link1->dist > link2->dist)
                return(1);
-       if ((intptr_t)link1 < (intptr_t)link2)
+       if (link1->state->msgid < link2->state->msgid)
                return(-1);
                return(-1);
-       if ((intptr_t)link1 > (intptr_t)link2)
+       if (link1->state->msgid > link2->state->msgid)
                return(1);
        return(0);
 }
                return(1);
        return(0);
 }
@@ -292,17 +298,20 @@ static
 int
 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 {
 int
 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 {
-       if ((intptr_t)relay1->link->node < (intptr_t)relay2->link->node)
+       h2span_link_t *link1 = relay1->link;
+       h2span_link_t *link2 = relay2->link;
+
+       if ((intptr_t)link1->node < (intptr_t)link2->node)
                return(-1);
                return(-1);
-       if ((intptr_t)relay1->link->node > (intptr_t)relay2->link->node)
+       if ((intptr_t)link1->node > (intptr_t)link2->node)
                return(1);
                return(1);
-       if ((intptr_t)relay1->link->dist < (intptr_t)relay2->link->dist)
+       if (link1->dist < link2->dist)
                return(-1);
                return(-1);
-       if ((intptr_t)relay1->link->dist > (intptr_t)relay2->link->dist)
+       if (link1->dist > link2->dist)
                return(1);
                return(1);
-       if ((intptr_t)relay1->link < (intptr_t)relay2->link)
+       if (link1->state->msgid < link2->state->msgid)
                return(-1);
                return(-1);
-       if ((intptr_t)relay1->link > (intptr_t)relay2->link)
+       if (link1->state->msgid > link2->state->msgid)
                return(1);
        return(0);
 }
                return(1);
        return(0);
 }
@@ -332,39 +341,48 @@ static pthread_mutex_t cluster_mtx;
 static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
 static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
 
 static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
 static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
 
-static void hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg);
-static void hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg);
-static void hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg);
+static void hammer2_lnk_span(hammer2_msg_t *msg);
+static void hammer2_lnk_conn(hammer2_msg_t *msg);
+static void hammer2_lnk_relay(hammer2_msg_t *msg);
 static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node);
 static void hammer2_relay_delete(h2span_relay_t *relay);
 
 static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node);
 static void hammer2_relay_delete(h2span_relay_t *relay);
 
+void
+hammer2_msg_lnk_signal(hammer2_router_t *router __unused)
+{
+       pthread_mutex_lock(&cluster_mtx);
+       hammer2_relay_scan(NULL, NULL);
+       pthread_mutex_unlock(&cluster_mtx);
+}
+
 /*
  * Receive a HAMMER2_MSG_PROTO_LNK message.  This only called for
  * one-way and opening-transactions since state->func will be assigned
  * in all other cases.
  */
 void
 /*
  * Receive a HAMMER2_MSG_PROTO_LNK message.  This only called for
  * one-way and opening-transactions since state->func will be assigned
  * in all other cases.
  */
 void
-hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_lnk(hammer2_msg_t *msg)
 {
        switch(msg->any.head.cmd & HAMMER2_MSGF_BASECMDMASK) {
        case HAMMER2_LNK_CONN:
 {
        switch(msg->any.head.cmd & HAMMER2_MSGF_BASECMDMASK) {
        case HAMMER2_LNK_CONN:
-               hammer2_lnk_conn(msg->state, msg);
+               hammer2_lnk_conn(msg);
                break;
        case HAMMER2_LNK_SPAN:
                break;
        case HAMMER2_LNK_SPAN:
-               hammer2_lnk_span(msg->state, msg);
+               hammer2_lnk_span(msg);
                break;
        default:
                fprintf(stderr,
                        "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
                break;
        default:
                fprintf(stderr,
                        "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
-               hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+               hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
                /* state invalid after reply */
                break;
        }
 }
 
 void
                /* state invalid after reply */
                break;
        }
 }
 
 void
-hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
+hammer2_lnk_conn(hammer2_msg_t *msg)
 {
 {
+       hammer2_state_t *state = msg->state;
        h2span_connect_t *conn;
        h2span_relay_t *relay;
        char *alloc = NULL;
        h2span_connect_t *conn;
        h2span_relay_t *relay;
        char *alloc = NULL;
@@ -393,12 +411,15 @@ hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
                state->any.conn = conn;
                TAILQ_INSERT_TAIL(&connq, conn, entry);
 
                state->any.conn = conn;
                TAILQ_INSERT_TAIL(&connq, conn, entry);
 
-               hammer2_msg_result(state->iocom, msg, 0);
+               hammer2_msg_result(msg, 0);
 
 
+#if 0
                /*
                 * Span-synchronize all nodes with the new connection
                 */
                hammer2_relay_scan(conn, NULL);
                /*
                 * Span-synchronize all nodes with the new connection
                 */
                hammer2_relay_scan(conn, NULL);
+#endif
+               hammer2_router_signal(msg->router);
        }
 
        /*
        }
 
        /*
@@ -426,15 +447,16 @@ hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
                TAILQ_REMOVE(&connq, conn, entry);
                hammer2_free(conn);
 
                TAILQ_REMOVE(&connq, conn, entry);
                hammer2_free(conn);
 
-               hammer2_msg_reply(state->iocom, msg, 0);
+               hammer2_msg_reply(msg, 0);
                /* state invalid after reply */
        }
        pthread_mutex_unlock(&cluster_mtx);
 }
 
 void
                /* state invalid after reply */
        }
        pthread_mutex_unlock(&cluster_mtx);
 }
 
 void
-hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
+hammer2_lnk_span(hammer2_msg_t *msg)
 {
 {
+       hammer2_state_t *state = msg->state;
        h2span_cluster_t dummy_cls;
        h2span_node_t dummy_node;
        h2span_cluster_t *cls;
        h2span_cluster_t dummy_cls;
        h2span_node_t dummy_node;
        h2span_cluster_t *cls;
@@ -443,23 +465,19 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
        h2span_relay_t *relay;
        char *alloc = NULL;
 
        h2span_relay_t *relay;
        char *alloc = NULL;
 
+       assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
+
        pthread_mutex_lock(&cluster_mtx);
 
        /*
         * On transaction start we initialize the tracking infrastructure
         */
        if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
        pthread_mutex_lock(&cluster_mtx);
 
        /*
         * On transaction start we initialize the tracking infrastructure
         */
        if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+               assert(state->func == NULL);
                state->func = hammer2_lnk_span;
 
                msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0;
 
                state->func = hammer2_lnk_span;
 
                msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0;
 
-               fprintf(stderr, "LNK_SPAN: %s/%s dist=%d\n",
-                       hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid,
-                                           &alloc),
-                       msg->any.lnk_span.label,
-                       msg->any.lnk_span.dist);
-               free(alloc);
-
                /*
                 * Find the cluster
                 */
                /*
                 * Find the cluster
                 */
@@ -497,9 +515,29 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
                slink->dist = msg->any.lnk_span.dist;
                slink->state = state;
                state->any.link = slink;
                slink->dist = msg->any.lnk_span.dist;
                slink->state = state;
                state->any.link = slink;
+
+               /*
+                * Embedded router structure in link for message forwarding.
+                */
+               TAILQ_INIT(&slink->router.txmsgq);
+               slink->router.iocom = state->iocom;
+               slink->router.link = slink;
+
                RB_INSERT(h2span_link_tree, &node->tree, slink);
 
                RB_INSERT(h2span_link_tree, &node->tree, slink);
 
+               fprintf(stderr, "LNK_SPAN(thr %p): %p %s/%s dist=%d\n",
+                       msg->router->iocom,
+                       slink,
+                       hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid,
+                                           &alloc),
+                       msg->any.lnk_span.label,
+                       msg->any.lnk_span.dist);
+               free(alloc);
+
+#if 0
                hammer2_relay_scan(NULL, node);
                hammer2_relay_scan(NULL, node);
+#endif
+               hammer2_router_signal(msg->router);
        }
 
        /*
        }
 
        /*
@@ -511,6 +549,14 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
                node = slink->node;
                cls = node->cls;
 
                node = slink->node;
                cls = node->cls;
 
+               fprintf(stderr, "LNK_DELE(thr %p): %p %s/%s dist=%d\n",
+                       msg->router->iocom,
+                       slink,
+                       hammer2_uuid_to_str(&cls->pfs_clid, &alloc),
+                       state->msg->any.lnk_span.label,
+                       state->msg->any.lnk_span.dist);
+               free(alloc);
+
                /*
                 * Clean out all relays.  This requires terminating each
                 * relay transaction.
                /*
                 * Clean out all relays.  This requires terminating each
                 * relay transaction.
@@ -550,8 +596,12 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
                 * it doesn't then all related relays have already been
                 * removed and there's nothing left to do.
                 */
                 * it doesn't then all related relays have already been
                 * removed and there's nothing left to do.
                 */
+#if 0
                if (node)
                        hammer2_relay_scan(NULL, node);
                if (node)
                        hammer2_relay_scan(NULL, node);
+#endif
+               if (node)
+                       hammer2_router_signal(msg->router);
        }
 
        pthread_mutex_unlock(&cluster_mtx);
        }
 
        pthread_mutex_unlock(&cluster_mtx);
@@ -564,10 +614,13 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
  * XXX MPRACE on state structure
  */
 static void
  * XXX MPRACE on state structure
  */
 static void
-hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg)
+hammer2_lnk_relay(hammer2_msg_t *msg)
 {
 {
+       hammer2_state_t *state = msg->state;
        h2span_relay_t *relay;
 
        h2span_relay_t *relay;
 
+       assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
+
        if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                pthread_mutex_lock(&cluster_mtx);
                if ((relay = state->any.relay) != NULL) {
        if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
                pthread_mutex_lock(&cluster_mtx);
                if ((relay = state->any.relay) != NULL) {
@@ -676,8 +729,8 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn)
        info.relay = NULL;
 
        /*
        info.relay = NULL;
 
        /*
-        * Locate the first related relay for the connection.  relay will
-        * be NULL if there were none.
+        * Locate the first related relay for the node on this connection.
+        * relay will be NULL if there were none.
         */
        RB_SCAN(h2span_relay_tree, &conn->tree,
                hammer2_relay_scan_cmp, hammer2_relay_scan_callback, &info);
         */
        RB_SCAN(h2span_relay_tree, &conn->tree,
                hammer2_relay_scan_cmp, hammer2_relay_scan_callback, &info);
@@ -693,8 +746,13 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn)
         * Iterate the node's links (received SPANs) in distance order,
         * lowest (best) dist first.
         */
         * Iterate the node's links (received SPANs) in distance order,
         * lowest (best) dist first.
         */
+       /* fprintf(stderr, "LOOP\n"); */
        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
                /*
        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
                /*
+               fprintf(stderr, "SLINK %p RELAY %p(%p)\n",
+                       slink, relay, relay ? relay->link : NULL);
+               */
+               /*
                 * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
                 *
                 * Track relays while iterating the best links and construct
                 * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
                 *
                 * Track relays while iterating the best links and construct
@@ -702,12 +760,12 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn)
                 *
                 * (If some prior better link was removed it would have also
                 *  removed the relay, so the relay can only match exactly or
                 *
                 * (If some prior better link was removed it would have also
                 *  removed the relay, so the relay can only match exactly or
-                *  be worst).
+                *  be worse).
                 */
                if (relay && relay->link == slink) {
                        /*
                 */
                if (relay && relay->link == slink) {
                        /*
-                        * Match, get the next relay to match against the
-                        * next slink.
+                        * Match, relay already in-place, get the next
+                        * relay to match against the next slink.
                         */
                        relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
                        if (--count == 0)
                         */
                        relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
                        if (--count == 0)
@@ -718,35 +776,43 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn)
                         * do not relay.  This prevents endless closed
                         * loops with ever-incrementing distances when
                         * the seed span is lost in the graph.
                         * do not relay.  This prevents endless closed
                         * loops with ever-incrementing distances when
                         * the seed span is lost in the graph.
+                        *
+                        * All later spans will also be too far away so
+                        * we can break out of the loop.
                         */
                         */
-                       /* no code needed */
+                       break;
                } else {
                        /*
                         * No match, distance is ok, construct a new relay.
                } else {
                        /*
                         * No match, distance is ok, construct a new relay.
+                        * (slink is better than relay).
                         */
                        hammer2_msg_t *msg;
 
                        assert(relay == NULL ||
                         */
                        hammer2_msg_t *msg;
 
                        assert(relay == NULL ||
-                              relay->link->dist <= slink->dist);
+                              relay->link->node != slink->node ||
+                              relay->link->dist >= slink->dist);
                        relay = hammer2_alloc(sizeof(*relay));
                        relay->conn = conn;
                        relay->link = slink;
 
                        relay = hammer2_alloc(sizeof(*relay));
                        relay->conn = conn;
                        relay->link = slink;
 
+                       msg = hammer2_msg_alloc(&conn->state->iocom->router, 0,
+                                               HAMMER2_LNK_SPAN |
+                                               HAMMER2_MSGF_CREATE,
+                                               hammer2_lnk_relay, relay);
+                       relay->state = msg->state;
+                       msg->any.lnk_span = slink->state->msg->any.lnk_span;
+                       msg->any.lnk_span.dist = slink->dist + 1;
+
                        RB_INSERT(h2span_relay_tree, &conn->tree, relay);
                        TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
 
                        RB_INSERT(h2span_relay_tree, &conn->tree, relay);
                        TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
 
-                       msg = hammer2_msg_alloc(conn->state->iocom, 0,
-                                               HAMMER2_LNK_SPAN |
-                                               HAMMER2_MSGF_CREATE);
-                       msg->any.lnk_span = slink->state->msg->any.lnk_span;
-                       ++msg->any.lnk_span.dist; /* XXX add weighting */
+                       hammer2_msg_write(msg);
 
 
-                       hammer2_msg_write(conn->state->iocom, msg,
-                                         hammer2_lnk_relay, relay,
-                                         &relay->state);
                        fprintf(stderr,
                        fprintf(stderr,
-                               "RELAY SPAN ON CLS=%p NODE=%p DIST=%d "
+                               "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
                                "FD %d state %p\n",
                                "FD %d state %p\n",
+                               slink,
+                               relay,
                                node->cls, node, slink->dist,
                                conn->state->iocom->sock_fd, relay->state);
 
                                node->cls, node, slink->dist,
                                conn->state->iocom->sock_fd, relay->state);
 
@@ -777,7 +843,9 @@ void
 hammer2_relay_delete(h2span_relay_t *relay)
 {
        fprintf(stderr,
 hammer2_relay_delete(h2span_relay_t *relay)
 {
        fprintf(stderr,
-               "RELAY DELETE ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
+               "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
+               relay->link,
+               relay,
                relay->link->node->cls, relay->link->node,
                relay->link->dist,
                relay->conn->state->iocom->sock_fd, relay->state);
                relay->link->node->cls, relay->link->node,
                relay->link->dist,
                relay->conn->state->iocom->sock_fd, relay->state);
@@ -796,11 +864,38 @@ hammer2_relay_delete(h2span_relay_t *relay)
        hammer2_free(relay);
 }
 
        hammer2_free(relay);
 }
 
+/************************************************************************
+ *                             ROUTER                                  *
+ ************************************************************************
+ *
+ * Provides route functions to msg.c
+ */
+
+#if 0
+/*
+ * Acquire a persistent router structure given the cluster and node ids.
+ * Messages can be transacted via this structure while held.  If the route
+ * is lost messages will return failure.
+ */
+hammer2_router_t *
+hammer2_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
+{
+}
+
+/*
+ * Release previously acquired router.
+ */
+void
+hammer2_router_put(hammer2_router_t *router)
+{
+}
+#endif
+
 /*
  * Dumps the spanning tree
  */
 void
 /*
  * Dumps the spanning tree
  */
 void
-shell_tree(hammer2_iocom_t *iocom, char *cmdbuf __unused)
+shell_tree(hammer2_router_t *router, char *cmdbuf __unused)
 {
        h2span_cluster_t *cls;
        h2span_node_t *node;
 {
        h2span_cluster_t *cls;
        h2span_node_t *node;
@@ -809,14 +904,14 @@ shell_tree(hammer2_iocom_t *iocom, char *cmdbuf __unused)
 
        pthread_mutex_lock(&cluster_mtx);
        RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
 
        pthread_mutex_lock(&cluster_mtx);
        RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
-               iocom_printf(iocom, "Cluster %s\n",
+               router_printf(router, "Cluster %s\n",
                             hammer2_uuid_to_str(&cls->pfs_clid, &uustr));
                RB_FOREACH(node, h2span_node_tree, &cls->tree) {
                             hammer2_uuid_to_str(&cls->pfs_clid, &uustr));
                RB_FOREACH(node, h2span_node_tree, &cls->tree) {
-                       iocom_printf(iocom, "    Node %s (%s)\n",
+                       router_printf(router, "    Node %s (%s)\n",
                                 hammer2_uuid_to_str(&node->pfs_fsid, &uustr),
                                 node->label);
                        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
                                 hammer2_uuid_to_str(&node->pfs_fsid, &uustr),
                                 node->label);
                        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
-                               iocom_printf(iocom, "\tLink dist=%d via %d\n",
+                               router_printf(router, "\tLink dist=%d via %d\n",
                                             slink->dist,
                                             slink->state->iocom->sock_fd);
                        }
                                             slink->dist,
                                             slink->state->iocom->sock_fd);
                        }
index e82d9fd..b79f501 100644 (file)
@@ -137,7 +137,7 @@ struct hammer2_state {
        int             flags;
        int             error;
        struct hammer2_msg *msg;
        int             flags;
        int             error;
        struct hammer2_msg *msg;
-       void (*func)(struct hammer2_state *, struct hammer2_msg *);
+       void (*func)(struct hammer2_msg *);
        union {
                void *any;
                struct h2span_link *link;
        union {
                void *any;
                struct h2span_link *link;
@@ -160,6 +160,7 @@ struct hammer2_address {
 
 struct hammer2_msg {
        TAILQ_ENTRY(hammer2_msg) qentry;
 
 struct hammer2_msg {
        TAILQ_ENTRY(hammer2_msg) qentry;
+       struct hammer2_router *router;
        struct hammer2_state *state;
        size_t          hdr_size;
        size_t          aux_size;
        struct hammer2_state *state;
        size_t          hdr_size;
        size_t          aux_size;
@@ -224,6 +225,27 @@ typedef struct hammer2_ioq hammer2_ioq_t;
 #define HAMMER2_IOQ_MAXIOVEC    16
 
 /*
 #define HAMMER2_IOQ_MAXIOVEC    16
 
 /*
+ * hammer2_router - governs the routing of a message.  Passed into
+ *                 hammer2_msg_write.
+ *
+ * The router is either connected to an iocom (socket) directly, or
+ * connected to a SPAN transaction (h2span_link structure).
+ */
+struct hammer2_router {
+       struct hammer2_iocom *iocom;
+       struct h2span_link   *link;             /* non-NULL if indirect */
+       void    (*signal_callback)(struct hammer2_router *);
+       void    (*rcvmsg_callback)(struct hammer2_msg *);
+       void    (*altmsg_callback)(struct hammer2_iocom *);
+       struct hammer2_state_tree staterd_tree; /* active messages */
+       struct hammer2_state_tree statewr_tree; /* active messages */
+       hammer2_msg_queue_t txmsgq;             /* tx msgq from remote */
+       int     refs;                           /* refs prevent destruction */
+};
+
+typedef struct hammer2_router hammer2_router_t;
+
+/*
  * hammer2_iocom - governs a messaging stream connection
  */
 struct hammer2_iocom {
  * hammer2_iocom - governs a messaging stream connection
  */
 struct hammer2_iocom {
@@ -232,10 +254,6 @@ struct hammer2_iocom {
        hammer2_msg_queue_t freeq;              /* free msgs hdr only */
        hammer2_msg_queue_t freeq_aux;          /* free msgs w/aux_data */
        struct hammer2_address_queue  addrq;    /* source/target addrs */
        hammer2_msg_queue_t freeq;              /* free msgs hdr only */
        hammer2_msg_queue_t freeq_aux;          /* free msgs w/aux_data */
        struct hammer2_address_queue  addrq;    /* source/target addrs */
-       void    (*state_callback)(struct hammer2_iocom *);
-       void    (*rcvmsg_callback)(struct hammer2_iocom *,
-                                  struct hammer2_msg *);
-       void    (*altmsg_callback)(struct hammer2_iocom *);
        int     sock_fd;                        /* comm socket or pipe */
        int     alt_fd;                         /* thread signal, tty, etc */
        int     wakeupfds[2];                   /* pipe wakes up iocom thread */
        int     sock_fd;                        /* comm socket or pipe */
        int     alt_fd;                         /* thread signal, tty, etc */
        int     wakeupfds[2];                   /* pipe wakes up iocom thread */
@@ -243,9 +261,7 @@ struct hammer2_iocom {
        int     rxmisc;
        int     txmisc;
        char    sess[HAMMER2_AES_KEY_SIZE];     /* aes_256_cbc key */
        int     rxmisc;
        int     txmisc;
        char    sess[HAMMER2_AES_KEY_SIZE];     /* aes_256_cbc key */
-       struct hammer2_state_tree staterd_tree; /* active messages */
-       struct hammer2_state_tree statewr_tree; /* active messages */
-       hammer2_msg_queue_t txmsgq;             /* tx msgq from remote */
+       struct hammer2_router router;
        pthread_mutex_t mtx;                    /* mutex for state*tree/rmsgq */
 };
 
        pthread_mutex_t mtx;                    /* mutex for state*tree/rmsgq */
 };