hammer2 - Message routing work
authorMatthew Dillon <dillon@apollo.backplane.com>
Fri, 10 Aug 2012 01:11:32 +0000 (18:11 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Fri, 10 Aug 2012 01:11:32 +0000 (18:11 -0700)
* Make hammer2_router a separately allocated structure so we can use
  it as a persistent entity when its related hammer2_iocom, h2span_link,
  or h2span_relay structure gets ripped out.

sbin/hammer2/cmd_debug.c
sbin/hammer2/hammer2.h
sbin/hammer2/msg.c
sbin/hammer2/msg_lnk.c
sbin/hammer2/network.h

index f79117a..369e913 100644 (file)
@@ -66,7 +66,7 @@ cmd_shell(const char *hostname)
        fcntl(0, F_SETFL, O_NONBLOCK);
        printf("debug: connected\n");
 
        fcntl(0, F_SETFL, O_NONBLOCK);
        printf("debug: connected\n");
 
-       msg = hammer2_msg_alloc(&iocom.router, 0, HAMMER2_DBG_SHELL,
+       msg = hammer2_msg_alloc(iocom.router, 0, HAMMER2_DBG_SHELL,
                                NULL, NULL);
        hammer2_msg_write(msg);
        hammer2_iocom_core(&iocom);
                                NULL, NULL);
        hammer2_msg_write(msg);
        hammer2_iocom_core(&iocom);
@@ -151,7 +151,7 @@ shell_ttymsg(hammer2_iocom_t *iocom)
                if (len && buf[len - 1] == '\n')
                        buf[--len] = 0;
                ++len;
                if (len && buf[len - 1] == '\n')
                        buf[--len] = 0;
                ++len;
-               msg = hammer2_msg_alloc(&iocom->router, len, HAMMER2_DBG_SHELL,
+               msg = hammer2_msg_alloc(iocom->router, len, HAMMER2_DBG_SHELL,
                                        NULL, NULL);
                bcopy(buf, msg->aux_data, len);
                hammer2_msg_write(msg);
                                        NULL, NULL);
                bcopy(buf, msg->aux_data, len);
                hammer2_msg_write(msg);
index 8049034..5c91865 100644 (file)
@@ -154,6 +154,10 @@ void hammer2_iocom_flush2(hammer2_iocom_t *iocom);
 void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 void hammer2_state_free(hammer2_state_t *state);
 
 void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 void hammer2_state_free(hammer2_state_t *state);
 
+hammer2_router_t *hammer2_router_alloc(void);
+void hammer2_router_connect(hammer2_router_t *router);
+void hammer2_router_disconnect(hammer2_router_t **routerp);
+
 /*
  * Msg protocol functions
  */
 /*
  * Msg protocol functions
  */
index e080598..d898da1 100644 (file)
@@ -39,8 +39,31 @@ static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
 /*
 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
 /*
- * Indexed messages are stored in a red-black tree indexed by their
- * msgid.  Only persistent messages are indexed.
+ * ROUTER TREE - Represents available routes for message routing, indexed
+ *              by their spanid.  The router structure is embedded in
+ *              either an iocom, h2span_link, or h2span_relay (see msg_lnk.c).
+ */
+int
+hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2)
+{
+       if (router1->spanid < router2->spanid)
+               return(-1);
+       if (router1->spanid > router2->spanid)
+               return(1);
+       return(0);
+}
+
+RB_GENERATE(hammer2_router_tree, hammer2_router, rbnode, hammer2_router_cmp);
+
+static pthread_mutex_t router_mtx;
+static struct hammer2_router_tree router_ltree = RB_INITIALIZER(router_ltree);
+static struct hammer2_router_tree router_rtree = RB_INITIALIZER(router_rtree);
+
+/*
+ * STATE TREE - Represents open transactions which are indexed by their
+ *             {spanid,msgid} relative to the governing iocom.  spanid
+ *             will usually be 0 since a non-zero spanid would have been
+ *             routed elsewhere.
  */
 int
 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
  */
 int
 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
@@ -104,18 +127,20 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
 {
        bzero(iocom, sizeof(*iocom));
 
 {
        bzero(iocom, sizeof(*iocom));
 
-       iocom->router.signal_callback = signal_func;
-       iocom->router.rcvmsg_callback = rcvmsg_func;
-       iocom->router.altmsg_callback = altmsg_func;
+       iocom->router = hammer2_router_alloc();
+       iocom->router->signal_callback = signal_func;
+       iocom->router->rcvmsg_callback = rcvmsg_func;
+       iocom->router->altmsg_callback = altmsg_func;
+       /* we do not call hammer2_router_connect() for iocom routers */
 
        pthread_mutex_init(&iocom->mtx, NULL);
 
        pthread_mutex_init(&iocom->mtx, NULL);
-       RB_INIT(&iocom->router.staterd_tree);
-       RB_INIT(&iocom->router.statewr_tree);
+       RB_INIT(&iocom->router->staterd_tree);
+       RB_INIT(&iocom->router->statewr_tree);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
        TAILQ_INIT(&iocom->addrq);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
        TAILQ_INIT(&iocom->addrq);
-       TAILQ_INIT(&iocom->router.txmsgq);
-       iocom->router.iocom = iocom;
+       TAILQ_INIT(&iocom->router->txmsgq);
+       iocom->router->iocom = iocom;
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
        iocom->flags = HAMMER2_IOCOMF_RREQ;
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
        iocom->flags = HAMMER2_IOCOMF_RREQ;
@@ -251,14 +276,17 @@ hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd,
                state->iocom = iocom;
                state->flags = HAMMER2_STATE_DYNAMIC;
                state->msgid = (uint64_t)(uintptr_t)state;
                state->iocom = iocom;
                state->flags = HAMMER2_STATE_DYNAMIC;
                state->msgid = (uint64_t)(uintptr_t)state;
-               /* XXX set state->spanid from router */
+               if (router->link)
+                       state->spanid = router->spanid;
                state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
                                       HAMMER2_MSGF_DELETE);
                state->rxcmd = HAMMER2_MSGF_REPLY;
                state->func = func;
                state->any.any = data;
                pthread_mutex_lock(&iocom->mtx);
                state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
                                       HAMMER2_MSGF_DELETE);
                state->rxcmd = HAMMER2_MSGF_REPLY;
                state->func = func;
                state->any.any = data;
                pthread_mutex_lock(&iocom->mtx);
-               RB_INSERT(hammer2_state_tree, &iocom->router.statewr_tree, state);
+               RB_INSERT(hammer2_state_tree,
+                         &iocom->router->statewr_tree,
+                         state);
                pthread_mutex_unlock(&iocom->mtx);
                state->flags |= HAMMER2_STATE_INSERTED;
        }
                pthread_mutex_unlock(&iocom->mtx);
                state->flags |= HAMMER2_STATE_INSERTED;
        }
@@ -416,7 +444,7 @@ hammer2_iocom_core(hammer2_iocom_t *iocom)
 
                if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
                        iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
 
                if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
                        iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
-                       iocom->router.signal_callback(&iocom->router);
+                       iocom->router->signal_callback(iocom->router);
                }
 
                /*
                }
 
                /*
@@ -429,7 +457,7 @@ hammer2_iocom_core(hammer2_iocom_t *iocom)
                        read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
                        iocom->flags |= HAMMER2_IOCOMF_RWORK;
                        iocom->flags |= HAMMER2_IOCOMF_WWORK;
                        read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
                        iocom->flags |= HAMMER2_IOCOMF_RWORK;
                        iocom->flags |= HAMMER2_IOCOMF_WWORK;
-                       if (TAILQ_FIRST(&iocom->router.txmsgq))
+                       if (TAILQ_FIRST(&iocom->router->txmsgq))
                                hammer2_iocom_flush1(iocom);
                }
 
                                hammer2_iocom_flush1(iocom);
                }
 
@@ -451,14 +479,14 @@ hammer2_iocom_core(hammer2_iocom_t *iocom)
                                        fprintf(stderr, "receive %s\n",
                                                hammer2_msg_str(msg));
                                }
                                        fprintf(stderr, "receive %s\n",
                                                hammer2_msg_str(msg));
                                }
-                               iocom->router.rcvmsg_callback(msg);
+                               iocom->router->rcvmsg_callback(msg);
                                hammer2_state_cleanuprx(iocom, msg);
                        }
                }
 
                if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
                        iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
                                hammer2_state_cleanuprx(iocom, msg);
                        }
                }
 
                if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
                        iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
-                       iocom->router.altmsg_callback(iocom);
+                       iocom->router->altmsg_callback(iocom);
                }
        }
 }
                }
        }
 }
@@ -530,12 +558,16 @@ again:
        /*
         * If a message is already pending we can just remove and
         * return it.  Message state has already been processed.
        /*
         * If a message is already pending we can just remove and
         * return it.  Message state has already been processed.
+        * (currently not implemented)
         */
        if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                return (msg);
        }
 
         */
        if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                return (msg);
        }
 
+       /*
+        * If the stream is errored out we stop processing it.
+        */
        if (ioq->error)
                goto skip;
 
        if (ioq->error)
                goto skip;
 
@@ -639,7 +671,7 @@ again:
                /*
                 * Allocate the message, the next state will fill it in.
                 */
                /*
                 * Allocate the message, the next state will fill it in.
                 */
-               msg = hammer2_msg_alloc(&iocom->router, ioq->abytes, 0,
+               msg = hammer2_msg_alloc(iocom->router, ioq->abytes, 0,
                                        NULL, NULL);
                ioq->msg = msg;
 
                                        NULL, NULL);
                ioq->msg = msg;
 
@@ -929,7 +961,7 @@ skip:
                 * LNK_ERROR (that the session can detect) when no
                 * transactions remain.
                 */
                 * LNK_ERROR (that the session can detect) when no
                 * transactions remain.
                 */
-               msg = hammer2_msg_alloc(&iocom->router, 0, 0, NULL, NULL);
+               msg = hammer2_msg_alloc(iocom->router, 0, 0, NULL, NULL);
                bzero(&msg->any.head, sizeof(msg->any.head));
                msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
                msg->any.head.cmd = HAMMER2_LNK_ERROR;
                bzero(&msg->any.head, sizeof(msg->any.head));
                msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
                msg->any.head.cmd = HAMMER2_LNK_ERROR;
@@ -937,7 +969,7 @@ skip:
 
                pthread_mutex_lock(&iocom->mtx);
                hammer2_iocom_drain(iocom);
 
                pthread_mutex_lock(&iocom->mtx);
                hammer2_iocom_drain(iocom);
-               if ((state = RB_ROOT(&iocom->router.staterd_tree)) != NULL) {
+               if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) {
                        /*
                         * Active remote transactions are still present.
                         * Simulate the other end sending us a DELETE.
                        /*
                         * Active remote transactions are still present.
                         * Simulate the other end sending us a DELETE.
@@ -953,7 +985,8 @@ skip:
                                msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
                                                     HAMMER2_MSGF_DELETE;
                        }
                                msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
                                                     HAMMER2_MSGF_DELETE;
                        }
-               } else if ((state = RB_ROOT(&iocom->router.statewr_tree)) != NULL) {
+               } else if ((state = RB_ROOT(&iocom->router->statewr_tree)) !=
+                          NULL) {
                        /*
                         * Active local transactions are still present.
                         * Simulate the other end sending us a DELETE.
                        /*
                         * Active local transactions are still present.
                         * Simulate the other end sending us a DELETE.
@@ -1054,8 +1087,8 @@ hammer2_iocom_flush1(hammer2_iocom_t *iocom)
        iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
        TAILQ_INIT(&tmpq);
        pthread_mutex_lock(&iocom->mtx);
        iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
        TAILQ_INIT(&tmpq);
        pthread_mutex_lock(&iocom->mtx);
-       while ((msg = TAILQ_FIRST(&iocom->router.txmsgq)) != NULL) {
-               TAILQ_REMOVE(&iocom->router.txmsgq, msg, qentry);
+       while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) {
+               TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry);
                TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
        }
        pthread_mutex_unlock(&iocom->mtx);
                TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
        }
        pthread_mutex_unlock(&iocom->mtx);
@@ -1353,7 +1386,7 @@ hammer2_msg_write(hammer2_msg_t *msg)
         * Queue it for output, wake up the I/O pthread.  Note that the
         * I/O thread is responsible for generating the CRCs and encryption.
         */
         * Queue it for output, wake up the I/O pthread.  Note that the
         * I/O thread is responsible for generating the CRCs and encryption.
         */
-       TAILQ_INSERT_TAIL(&iocom->router.txmsgq, msg, qentry);
+       TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry);
        dummy = 0;
        write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
        pthread_mutex_unlock(&iocom->mtx);
        dummy = 0;
        write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
        pthread_mutex_unlock(&iocom->mtx);
@@ -1411,7 +1444,7 @@ hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
         * We cannot pass MSGF_CREATE to msg_alloc() because that may
         * allocate new state.  We have our state already.
         */
         * We cannot pass MSGF_CREATE to msg_alloc() because that may
         * allocate new state.  We have our state already.
         */
-       nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
+       nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
        if (state) {
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
        if (state) {
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
@@ -1461,7 +1494,7 @@ hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
                        cmd |= HAMMER2_MSGF_REPLY;
        }
 
-       nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
+       nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
        if (state) {
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
        if (state) {
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
@@ -1493,7 +1526,7 @@ hammer2_state_reply(hammer2_state_t *state, uint32_t error)
        if (state->txcmd & HAMMER2_MSGF_REPLY)
                cmd |= HAMMER2_MSGF_REPLY;
 
        if (state->txcmd & HAMMER2_MSGF_REPLY)
                cmd |= HAMMER2_MSGF_REPLY;
 
-       nmsg = hammer2_msg_alloc(&state->iocom->router, 0, cmd, NULL, NULL);
+       nmsg = hammer2_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL);
        if (state) {
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
        if (state) {
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
@@ -1595,10 +1628,10 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
        pthread_mutex_lock(&iocom->mtx);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
                state = RB_FIND(hammer2_state_tree,
        pthread_mutex_lock(&iocom->mtx);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
                state = RB_FIND(hammer2_state_tree,
-                               &iocom->router.statewr_tree, &dummy);
+                               &iocom->router->statewr_tree, &dummy);
        } else {
                state = RB_FIND(hammer2_state_tree,
        } else {
                state = RB_FIND(hammer2_state_tree,
-                               &iocom->router.staterd_tree, &dummy);
+                               &iocom->router->staterd_tree, &dummy);
        }
        msg->state = state;
        pthread_mutex_unlock(&iocom->mtx);
        }
        msg->state = state;
        pthread_mutex_unlock(&iocom->mtx);
@@ -1642,7 +1675,7 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                msg->state = state;
                pthread_mutex_lock(&iocom->mtx);
                RB_INSERT(hammer2_state_tree,
                msg->state = state;
                pthread_mutex_lock(&iocom->mtx);
                RB_INSERT(hammer2_state_tree,
-                         &iocom->router.staterd_tree, state);
+                         &iocom->router->staterd_tree, state);
                pthread_mutex_unlock(&iocom->mtx);
                error = 0;
                if (DebugOpt) {
                pthread_mutex_unlock(&iocom->mtx);
                error = 0;
                if (DebugOpt) {
@@ -1794,11 +1827,11 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        if (state->rxcmd & HAMMER2_MSGF_REPLY) {
                                assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
                        if (state->rxcmd & HAMMER2_MSGF_REPLY) {
                                assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
-                                         &iocom->router.statewr_tree, state);
+                                         &iocom->router->statewr_tree, state);
                        } else {
                                assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
                        } else {
                                assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
-                                         &iocom->router.staterd_tree, state);
+                                         &iocom->router->staterd_tree, state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                        hammer2_state_free(state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                        hammer2_state_free(state);
@@ -1833,11 +1866,11 @@ hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        if (state->txcmd & HAMMER2_MSGF_REPLY) {
                                assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
                        if (state->txcmd & HAMMER2_MSGF_REPLY) {
                                assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
-                                         &iocom->router.staterd_tree, state);
+                                         &iocom->router->staterd_tree, state);
                        } else {
                                assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
                        } else {
                                assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
-                                         &iocom->router.statewr_tree, state);
+                                         &iocom->router->statewr_tree, state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                        hammer2_state_free(state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                        hammer2_state_free(state);
@@ -1882,13 +1915,95 @@ hammer2_state_free(hammer2_state_t *state)
         * We may have to wake up the rx code.
         */
        if (iocom->ioq_rx.error &&
         * We may have to wake up the rx code.
         */
        if (iocom->ioq_rx.error &&
-           RB_EMPTY(&iocom->router.staterd_tree) &&
-           RB_EMPTY(&iocom->router.statewr_tree)) {
+           RB_EMPTY(&iocom->router->staterd_tree) &&
+           RB_EMPTY(&iocom->router->statewr_tree)) {
                dummy = 0;
                write(iocom->wakeupfds[1], &dummy, 1);
        }
 }
 
                dummy = 0;
                write(iocom->wakeupfds[1], &dummy, 1);
        }
 }
 
+/************************************************************************
+ *                             ROUTING                                 *
+ ************************************************************************
+ *
+ * Incoming messages are routed by their spanid, matched up against
+ * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c).
+ * Any replies run through the same router.
+ *
+ * Originated messages are routed by their spanid, matched up against
+ * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c).
+ * Replies come back through the same route.
+ *
+ * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular
+ * transaction runs through the same route.  Commands and replies both.
+ *
+ * An originated message will use a different routing spanid to
+ * reach a target node than a message which originates from that node.
+ * They might use the same physical pipes (each pipe can have multiple
+ * SPANs and RELAYs), but the routes are distinct from the perspective
+ * of the router.
+ */
+hammer2_router_t *
+hammer2_router_alloc(void)
+{
+       hammer2_router_t *router;
+
+       router = hammer2_alloc(sizeof(*router));
+       TAILQ_INIT(&router->txmsgq);
+       return (router);
+}
+
+void
+hammer2_router_connect(hammer2_router_t *router)
+{
+       hammer2_router_t *tmp;
+
+       assert(router->link || router->relay);
+       assert((router->flags & HAMMER2_ROUTER_CONNECTED) == 0);
+
+       pthread_mutex_lock(&router_mtx);
+       if (router->link)
+               tmp = RB_INSERT(hammer2_router_tree, &router_ltree, router);
+       else
+               tmp = RB_INSERT(hammer2_router_tree, &router_rtree, router);
+       assert(tmp == NULL);
+       router->flags |= HAMMER2_ROUTER_CONNECTED;
+       pthread_mutex_unlock(&router_mtx);
+}
+
+void
+hammer2_router_disconnect(hammer2_router_t **routerp)
+{
+       hammer2_router_t *router;
+
+       router = *routerp;
+       assert(router->link || router->relay);
+       assert(router->flags & HAMMER2_ROUTER_CONNECTED);
+
+       pthread_mutex_lock(&router_mtx);
+       if (router->link)
+               RB_REMOVE(hammer2_router_tree, &router_ltree, router);
+       else
+               RB_REMOVE(hammer2_router_tree, &router_rtree, router);
+       router->flags &= ~HAMMER2_ROUTER_CONNECTED;
+       *routerp = NULL;
+       pthread_mutex_unlock(&router_mtx);
+}
+
+#if 0
+/*
+ * XXX
+ */
+hammer2_router_t *
+hammer2_route_msg(hammer2_msg_t *msg)
+{
+}
+#endif
+
+/************************************************************************
+ *                             DEBUGGING                               *
+ ************************************************************************/
+
 const char *
 hammer2_basecmd_str(uint32_t cmd)
 {
 const char *
 hammer2_basecmd_str(uint32_t cmd)
 {
index 229af80..12d5399 100644 (file)
@@ -211,6 +211,7 @@ struct h2span_cluster {
        RB_ENTRY(h2span_cluster) rbnode;
        struct h2span_node_tree tree;
        uuid_t  pfs_clid;               /* shared fsid */
        RB_ENTRY(h2span_cluster) rbnode;
        struct h2span_node_tree tree;
        uuid_t  pfs_clid;               /* shared fsid */
+       int     refs;                   /* prevents destruction */
 };
 
 struct h2span_node {
 };
 
 struct h2span_node {
@@ -227,7 +228,7 @@ struct h2span_link {
        struct h2span_node *node;       /* related node */
        int32_t dist;
        struct h2span_relay_queue relayq; /* relay out */
        struct h2span_node *node;       /* related node */
        int32_t dist;
        struct h2span_relay_queue relayq; /* relay out */
-       struct hammer2_router   router;
+       struct hammer2_router *router;  /* route out this link */
 };
 
 /*
 };
 
 /*
@@ -240,13 +241,22 @@ struct h2span_link {
  * address of the 'state' structure, which is why h2span_relay has to
  * be entered into a RB-TREE based at h2span_connect (so we can look
  * up the spanid to validate it).
  * address of the 'state' structure, which is why h2span_relay has to
  * be entered into a RB-TREE based at h2span_connect (so we can look
  * up the spanid to validate it).
+ *
+ * NOTE: Messages can be received via the LNK_SPAN transaction the
+ *      relay maintains, and can be replied via relay->router, but
+ *      messages are NOT initiated via a relay.  Messages are initiated
+ *      via incoming links (h2span_link's).
+ *
+ *      relay->link represents the link being relayed, NOT the LNK_SPAN
+ *      transaction the relay is holding open.
  */
 struct h2span_relay {
        RB_ENTRY(h2span_relay) rbnode;  /* from h2span_connect */
        TAILQ_ENTRY(h2span_relay) entry; /* from link */
        struct h2span_connect *conn;
        hammer2_state_t *state;         /* transmitted LNK_SPAN */
  */
 struct h2span_relay {
        RB_ENTRY(h2span_relay) rbnode;  /* from h2span_connect */
        TAILQ_ENTRY(h2span_relay) entry; /* from link */
        struct h2span_connect *conn;
        hammer2_state_t *state;         /* transmitted LNK_SPAN */
-       struct h2span_link *link;       /* received LNK_SPAN */
+       struct h2span_link *link;       /* LNK_SPAN being relayed */
+       struct hammer2_router   *router;/* route out this relay */
 };
 
 
 };
 
 
@@ -518,10 +528,16 @@ hammer2_lnk_span(hammer2_msg_t *msg)
 
                /*
                 * Embedded router structure in link for message forwarding.
 
                /*
                 * Embedded router structure in link for message forwarding.
+                *
+                * The spanning id for the router is the message id of
+                * the SPAN link it is embedded in, allowing messages to
+                * be routed via &slink->router.
                 */
                 */
-               TAILQ_INIT(&slink->router.txmsgq);
-               slink->router.iocom = state->iocom;
-               slink->router.link = slink;
+               slink->router = hammer2_router_alloc();
+               slink->router->iocom = state->iocom;
+               slink->router->link = slink;
+               slink->router->spanid = state->msgid;
+               hammer2_router_connect(slink->router);
 
                RB_INSERT(h2span_link_tree, &node->tree, slink);
 
 
                RB_INSERT(h2span_link_tree, &node->tree, slink);
 
@@ -533,7 +549,6 @@ hammer2_lnk_span(hammer2_msg_t *msg)
                        msg->any.lnk_span.label,
                        msg->any.lnk_span.dist);
                free(alloc);
                        msg->any.lnk_span.label,
                        msg->any.lnk_span.dist);
                free(alloc);
-
 #if 0
                hammer2_relay_scan(NULL, node);
 #endif
 #if 0
                hammer2_relay_scan(NULL, node);
 #endif
@@ -558,6 +573,11 @@ hammer2_lnk_span(hammer2_msg_t *msg)
                free(alloc);
 
                /*
                free(alloc);
 
                /*
+                * Remove the router from consideration
+                */
+               hammer2_router_disconnect(&slink->router);
+
+               /*
                 * Clean out all relays.  This requires terminating each
                 * relay transaction.
                 */
                 * Clean out all relays.  This requires terminating each
                 * relay transaction.
                 */
@@ -571,7 +591,7 @@ hammer2_lnk_span(hammer2_msg_t *msg)
                RB_REMOVE(h2span_link_tree, &node->tree, slink);
                if (RB_EMPTY(&node->tree)) {
                        RB_REMOVE(h2span_node_tree, &cls->tree, node);
                RB_REMOVE(h2span_link_tree, &node->tree, slink);
                if (RB_EMPTY(&node->tree)) {
                        RB_REMOVE(h2span_node_tree, &cls->tree, node);
-                       if (RB_EMPTY(&cls->tree)) {
+                       if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
                                RB_REMOVE(h2span_cluster_tree,
                                          &cluster_tree, cls);
                                hammer2_free(cls);
                                RB_REMOVE(h2span_cluster_tree,
                                          &cluster_tree, cls);
                                hammer2_free(cls);
@@ -795,14 +815,21 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn)
                        relay->conn = conn;
                        relay->link = slink;
 
                        relay->conn = conn;
                        relay->link = slink;
 
-                       msg = hammer2_msg_alloc(&conn->state->iocom->router, 0,
+                       msg = hammer2_msg_alloc(conn->state->iocom->router, 0,
                                                HAMMER2_LNK_SPAN |
                                                HAMMER2_MSGF_CREATE,
                                                hammer2_lnk_relay, relay);
                        relay->state = msg->state;
                                                HAMMER2_LNK_SPAN |
                                                HAMMER2_MSGF_CREATE,
                                                hammer2_lnk_relay, relay);
                        relay->state = msg->state;
+                       relay->router = hammer2_router_alloc();
+                       relay->router->iocom = relay->state->iocom;
+                       relay->router->relay = relay;
+                       relay->router->spanid = relay->state->msgid;
+
                        msg->any.lnk_span = slink->state->msg->any.lnk_span;
                        msg->any.lnk_span.dist = slink->dist + 1;
 
                        msg->any.lnk_span = slink->state->msg->any.lnk_span;
                        msg->any.lnk_span.dist = slink->dist + 1;
 
+                       hammer2_router_connect(relay->router);
+
                        RB_INSERT(h2span_relay_tree, &conn->tree, relay);
                        TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
 
                        RB_INSERT(h2span_relay_tree, &conn->tree, relay);
                        TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
 
@@ -850,6 +877,8 @@ hammer2_relay_delete(h2span_relay_t *relay)
                relay->link->dist,
                relay->conn->state->iocom->sock_fd, relay->state);
 
                relay->link->dist,
                relay->conn->state->iocom->sock_fd, relay->state);
 
+       hammer2_router_disconnect(&relay->router);
+
        RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
        TAILQ_REMOVE(&relay->link->relayq, relay, entry);
 
        RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
        TAILQ_REMOVE(&relay->link->relayq, relay, entry);
 
@@ -865,11 +894,64 @@ hammer2_relay_delete(h2span_relay_t *relay)
 }
 
 /************************************************************************
 }
 
 /************************************************************************
- *                             ROUTER                                  *
+ *                     ROUTER AND MESSAGING HANDLES                    *
  ************************************************************************
  *
  ************************************************************************
  *
- * Provides route functions to msg.c
+ * Basically the idea here is to provide a stable data structure which
+ * can be localized to the caller for higher level protocols to work with.
+ * Depends on the context, these hammer2_handle's can be pooled by use-case
+ * and remain persistent through a client (or mount point's) life.
+ */
+
+#if 0
+/*
+ * Obtain a stable handle on a cluster given its uuid.  This ties directly
+ * into the global cluster topology, creating the structure if necessary
+ * (even if the uuid does not exist or does not exist yet), and preventing
+ * the structure from getting ripped out from under us while we hold a
+ * pointer to it.
+ */
+h2span_cluster_t *
+hammer2_cluster_get(uuid_t *pfs_clid)
+{
+       h2span_cluster_t dummy_cls;
+       h2span_cluster_t *cls;
+
+       dummy_cls.pfs_clid = *pfs_clid;
+       pthread_mutex_lock(&cluster_mtx);
+       cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
+       if (cls)
+               ++cls->refs;
+       pthread_mutex_unlock(&cluster_mtx);
+       return (cls);
+}
+
+void
+hammer2_cluster_put(h2span_cluster_t *cls)
+{
+       pthread_mutex_lock(&cluster_mtx);
+       assert(cls->refs > 0);
+       --cls->refs;
+       if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
+               RB_REMOVE(h2span_cluster_tree,
+                         &cluster_tree, cls);
+               hammer2_free(cls);
+       }
+       pthread_mutex_unlock(&cluster_mtx);
+}
+
+/*
+ * Obtain a stable handle to a specific cluster node given its uuid.
+ * This handle does NOT lock in the route to the node and is typically
+ * used as part of the hammer2_handle_*() API to obtain a set of
+ * stable nodes.
  */
  */
+h2span_node_t *
+hammer2_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
+{
+}
+
+#endif
 
 #if 0
 /*
 
 #if 0
 /*
@@ -891,6 +973,9 @@ hammer2_router_put(hammer2_router_t *router)
 }
 #endif
 
 }
 #endif
 
+/************************************************************************
+ *                             DEBUGGER                                *
+ ************************************************************************/
 /*
  * Dumps the spanning tree
  */
 /*
  * Dumps the spanning tree
  */
index 7e86481..6b1b166 100644 (file)
@@ -116,6 +116,7 @@ typedef struct hammer2_handshake hammer2_handshake_t;
 struct hammer2_iocom;
 struct hammer2_persist;
 struct hammer2_state;
 struct hammer2_iocom;
 struct hammer2_persist;
 struct hammer2_state;
+struct hammer2_router;
 struct hammer2_address;
 struct hammer2_msg;
 
 struct hammer2_address;
 struct hammer2_msg;
 
@@ -123,8 +124,10 @@ TAILQ_HEAD(hammer2_state_queue, hammer2_state);
 TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
 TAILQ_HEAD(hammer2_address_queue, hammer2_address);
 RB_HEAD(hammer2_state_tree, hammer2_state);
 TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
 TAILQ_HEAD(hammer2_address_queue, hammer2_address);
 RB_HEAD(hammer2_state_tree, hammer2_state);
+RB_HEAD(hammer2_router_tree, hammer2_router);
 
 struct h2span_link;
 
 struct h2span_link;
+struct h2span_relay;
 struct h2span_connect;
 
 struct hammer2_state {
 struct h2span_connect;
 
 struct hammer2_state {
@@ -239,22 +242,33 @@ typedef struct hammer2_ioq hammer2_ioq_t;
  *                 hammer2_msg_write.
  *
  * The router is either connected to an iocom (socket) directly, or
  *                 hammer2_msg_write.
  *
  * The router is either connected to an iocom (socket) directly, or
- * connected to a SPAN transaction (h2span_link structure).
+ * connected to a SPAN transaction (h2span_link structure for outgoing)
+ * or to a SPAN transaction (h2span_relay structure for incoming).
  */
 struct hammer2_router {
  */
 struct hammer2_router {
+       RB_ENTRY(hammer2_router) rbnode;        /* indexed by spanid */
        struct hammer2_iocom *iocom;
        struct hammer2_iocom *iocom;
-       struct h2span_link   *link;             /* non-NULL if indirect */
+       struct h2span_link   *link;             /* may be NULL */
+       struct h2span_relay  *relay;            /* may be NULL */
        void    (*signal_callback)(struct hammer2_router *);
        void    (*rcvmsg_callback)(struct hammer2_msg *);
        void    (*altmsg_callback)(struct hammer2_iocom *);
        struct hammer2_state_tree staterd_tree; /* active messages */
        struct hammer2_state_tree statewr_tree; /* active messages */
        hammer2_msg_queue_t txmsgq;             /* tx msgq from remote */
        void    (*signal_callback)(struct hammer2_router *);
        void    (*rcvmsg_callback)(struct hammer2_msg *);
        void    (*altmsg_callback)(struct hammer2_iocom *);
        struct hammer2_state_tree staterd_tree; /* active messages */
        struct hammer2_state_tree statewr_tree; /* active messages */
        hammer2_msg_queue_t txmsgq;             /* tx msgq from remote */
+       uint64_t        spanid;                 /* for routing */
+       int     flags;
        int     refs;                           /* refs prevent destruction */
 };
 
        int     refs;                           /* refs prevent destruction */
 };
 
+#define HAMMER2_ROUTER_CONNECTED       0x0001  /* on global RB tree */
+#define HAMMER2_ROUTER_DELETED         0x0002  /* parent structure destroyed */
+
 typedef struct hammer2_router hammer2_router_t;
 
 typedef struct hammer2_router hammer2_router_t;
 
+int hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2);
+RB_PROTOTYPE(hammer2_router_tree, hammer2_router, rbnode, hammer2_router_cmp);
+
 /*
  * hammer2_iocom - governs a messaging stream connection
  */
 /*
  * hammer2_iocom - governs a messaging stream connection
  */
@@ -271,7 +285,7 @@ struct hammer2_iocom {
        int     rxmisc;
        int     txmisc;
        char    sess[HAMMER2_AES_KEY_SIZE];     /* aes_256_cbc key */
        int     rxmisc;
        int     txmisc;
        char    sess[HAMMER2_AES_KEY_SIZE];     /* aes_256_cbc key */
-       struct hammer2_router router;
+       struct hammer2_router *router;
        pthread_mutex_t mtx;                    /* mutex for state*tree/rmsgq */
 };
 
        pthread_mutex_t mtx;                    /* mutex for state*tree/rmsgq */
 };