From: Matthew Dillon Date: Fri, 10 Aug 2012 01:11:32 +0000 (-0700) Subject: hammer2 - Message routing work X-Git-Tag: v3.4.0rc~1026 X-Git-Url: https://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/90e8cd1d796ee6df36f9f968cd5ced3b14819b09?ds=sidebyside hammer2 - Message routing work * Make hammer2_router a separately allocated structure so we can use it as a persistent entity when its related hammer2_iocom, h2span_link, or h2span_relay structure gets ripped out. --- diff --git a/sbin/hammer2/cmd_debug.c b/sbin/hammer2/cmd_debug.c index f79117adf2..369e9138a8 100644 --- a/sbin/hammer2/cmd_debug.c +++ b/sbin/hammer2/cmd_debug.c @@ -66,7 +66,7 @@ cmd_shell(const char *hostname) fcntl(0, F_SETFL, O_NONBLOCK); printf("debug: connected\n"); - msg = hammer2_msg_alloc(&iocom.router, 0, HAMMER2_DBG_SHELL, + msg = hammer2_msg_alloc(iocom.router, 0, HAMMER2_DBG_SHELL, NULL, NULL); hammer2_msg_write(msg); hammer2_iocom_core(&iocom); @@ -151,7 +151,7 @@ shell_ttymsg(hammer2_iocom_t *iocom) if (len && buf[len - 1] == '\n') buf[--len] = 0; ++len; - msg = hammer2_msg_alloc(&iocom->router, len, HAMMER2_DBG_SHELL, + msg = hammer2_msg_alloc(iocom->router, len, HAMMER2_DBG_SHELL, NULL, NULL); bcopy(buf, msg->aux_data, len); hammer2_msg_write(msg); diff --git a/sbin/hammer2/hammer2.h b/sbin/hammer2/hammer2.h index 8049034a75..5c91865004 100644 --- a/sbin/hammer2/hammer2.h +++ b/sbin/hammer2/hammer2.h @@ -154,6 +154,10 @@ void hammer2_iocom_flush2(hammer2_iocom_t *iocom); void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg); void hammer2_state_free(hammer2_state_t *state); +hammer2_router_t *hammer2_router_alloc(void); +void hammer2_router_connect(hammer2_router_t *router); +void hammer2_router_disconnect(hammer2_router_t **routerp); + /* * Msg protocol functions */ diff --git a/sbin/hammer2/msg.c b/sbin/hammer2/msg.c index e0805982fd..d898da1dc4 100644 --- a/sbin/hammer2/msg.c +++ b/sbin/hammer2/msg.c @@ -39,8 +39,31 @@ static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg); static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg); /* - * Indexed messages are stored in a red-black tree indexed by their - * msgid. Only persistent messages are indexed. + * ROUTER TREE - Represents available routes for message routing, indexed + * by their spanid. The router structure is embedded in + * either an iocom, h2span_link, or h2span_relay (see msg_lnk.c). + */ +int +hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2) +{ + if (router1->spanid < router2->spanid) + return(-1); + if (router1->spanid > router2->spanid) + return(1); + return(0); +} + +RB_GENERATE(hammer2_router_tree, hammer2_router, rbnode, hammer2_router_cmp); + +static pthread_mutex_t router_mtx; +static struct hammer2_router_tree router_ltree = RB_INITIALIZER(router_ltree); +static struct hammer2_router_tree router_rtree = RB_INITIALIZER(router_rtree); + +/* + * STATE TREE - Represents open transactions which are indexed by their + * {spanid,msgid} relative to the governing iocom. spanid + * will usually be 0 since a non-zero spanid would have been + * routed elsewhere. */ int hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2) @@ -104,18 +127,20 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd, { bzero(iocom, sizeof(*iocom)); - iocom->router.signal_callback = signal_func; - iocom->router.rcvmsg_callback = rcvmsg_func; - iocom->router.altmsg_callback = altmsg_func; + iocom->router = hammer2_router_alloc(); + iocom->router->signal_callback = signal_func; + iocom->router->rcvmsg_callback = rcvmsg_func; + iocom->router->altmsg_callback = altmsg_func; + /* we do not call hammer2_router_connect() for iocom routers */ pthread_mutex_init(&iocom->mtx, NULL); - RB_INIT(&iocom->router.staterd_tree); - RB_INIT(&iocom->router.statewr_tree); + RB_INIT(&iocom->router->staterd_tree); + RB_INIT(&iocom->router->statewr_tree); TAILQ_INIT(&iocom->freeq); TAILQ_INIT(&iocom->freeq_aux); TAILQ_INIT(&iocom->addrq); - TAILQ_INIT(&iocom->router.txmsgq); - iocom->router.iocom = iocom; + TAILQ_INIT(&iocom->router->txmsgq); + iocom->router->iocom = iocom; iocom->sock_fd = sock_fd; iocom->alt_fd = alt_fd; iocom->flags = HAMMER2_IOCOMF_RREQ; @@ -251,14 +276,17 @@ hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd, state->iocom = iocom; state->flags = HAMMER2_STATE_DYNAMIC; state->msgid = (uint64_t)(uintptr_t)state; - /* XXX set state->spanid from router */ + if (router->link) + state->spanid = router->spanid; state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE); state->rxcmd = HAMMER2_MSGF_REPLY; state->func = func; state->any.any = data; pthread_mutex_lock(&iocom->mtx); - RB_INSERT(hammer2_state_tree, &iocom->router.statewr_tree, state); + RB_INSERT(hammer2_state_tree, + &iocom->router->statewr_tree, + state); pthread_mutex_unlock(&iocom->mtx); state->flags |= HAMMER2_STATE_INSERTED; } @@ -416,7 +444,7 @@ hammer2_iocom_core(hammer2_iocom_t *iocom) if (iocom->flags & HAMMER2_IOCOMF_SWORK) { iocom->flags &= ~HAMMER2_IOCOMF_SWORK; - iocom->router.signal_callback(&iocom->router); + iocom->router->signal_callback(iocom->router); } /* @@ -429,7 +457,7 @@ hammer2_iocom_core(hammer2_iocom_t *iocom) read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf)); iocom->flags |= HAMMER2_IOCOMF_RWORK; iocom->flags |= HAMMER2_IOCOMF_WWORK; - if (TAILQ_FIRST(&iocom->router.txmsgq)) + if (TAILQ_FIRST(&iocom->router->txmsgq)) hammer2_iocom_flush1(iocom); } @@ -451,14 +479,14 @@ hammer2_iocom_core(hammer2_iocom_t *iocom) fprintf(stderr, "receive %s\n", hammer2_msg_str(msg)); } - iocom->router.rcvmsg_callback(msg); + iocom->router->rcvmsg_callback(msg); hammer2_state_cleanuprx(iocom, msg); } } if (iocom->flags & HAMMER2_IOCOMF_ARWORK) { iocom->flags &= ~HAMMER2_IOCOMF_ARWORK; - iocom->router.altmsg_callback(iocom); + iocom->router->altmsg_callback(iocom); } } } @@ -530,12 +558,16 @@ again: /* * If a message is already pending we can just remove and * return it. Message state has already been processed. + * (currently not implemented) */ if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { TAILQ_REMOVE(&ioq->msgq, msg, qentry); return (msg); } + /* + * If the stream is errored out we stop processing it. + */ if (ioq->error) goto skip; @@ -639,7 +671,7 @@ again: /* * Allocate the message, the next state will fill it in. */ - msg = hammer2_msg_alloc(&iocom->router, ioq->abytes, 0, + msg = hammer2_msg_alloc(iocom->router, ioq->abytes, 0, NULL, NULL); ioq->msg = msg; @@ -929,7 +961,7 @@ skip: * LNK_ERROR (that the session can detect) when no * transactions remain. */ - msg = hammer2_msg_alloc(&iocom->router, 0, 0, NULL, NULL); + msg = hammer2_msg_alloc(iocom->router, 0, 0, NULL, NULL); bzero(&msg->any.head, sizeof(msg->any.head)); msg->any.head.magic = HAMMER2_MSGHDR_MAGIC; msg->any.head.cmd = HAMMER2_LNK_ERROR; @@ -937,7 +969,7 @@ skip: pthread_mutex_lock(&iocom->mtx); hammer2_iocom_drain(iocom); - if ((state = RB_ROOT(&iocom->router.staterd_tree)) != NULL) { + if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) { /* * Active remote transactions are still present. * Simulate the other end sending us a DELETE. @@ -953,7 +985,8 @@ skip: msg->any.head.cmd |= HAMMER2_MSGF_ABORT | HAMMER2_MSGF_DELETE; } - } else if ((state = RB_ROOT(&iocom->router.statewr_tree)) != NULL) { + } else if ((state = RB_ROOT(&iocom->router->statewr_tree)) != + NULL) { /* * Active local transactions are still present. * Simulate the other end sending us a DELETE. @@ -1054,8 +1087,8 @@ hammer2_iocom_flush1(hammer2_iocom_t *iocom) iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK); TAILQ_INIT(&tmpq); pthread_mutex_lock(&iocom->mtx); - while ((msg = TAILQ_FIRST(&iocom->router.txmsgq)) != NULL) { - TAILQ_REMOVE(&iocom->router.txmsgq, msg, qentry); + while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) { + TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry); TAILQ_INSERT_TAIL(&tmpq, msg, qentry); } pthread_mutex_unlock(&iocom->mtx); @@ -1353,7 +1386,7 @@ hammer2_msg_write(hammer2_msg_t *msg) * Queue it for output, wake up the I/O pthread. Note that the * I/O thread is responsible for generating the CRCs and encryption. */ - TAILQ_INSERT_TAIL(&iocom->router.txmsgq, msg, qentry); + TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry); dummy = 0; write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */ pthread_mutex_unlock(&iocom->mtx); @@ -1411,7 +1444,7 @@ hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error) * We cannot pass MSGF_CREATE to msg_alloc() because that may * allocate new state. We have our state already. */ - nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL); + nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL); if (state) { if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE; @@ -1461,7 +1494,7 @@ hammer2_msg_result(hammer2_msg_t *msg, uint32_t error) cmd |= HAMMER2_MSGF_REPLY; } - nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL); + nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL); if (state) { if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE; @@ -1493,7 +1526,7 @@ hammer2_state_reply(hammer2_state_t *state, uint32_t error) if (state->txcmd & HAMMER2_MSGF_REPLY) cmd |= HAMMER2_MSGF_REPLY; - nmsg = hammer2_msg_alloc(&state->iocom->router, 0, cmd, NULL, NULL); + nmsg = hammer2_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL); if (state) { if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE; @@ -1595,10 +1628,10 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) pthread_mutex_lock(&iocom->mtx); if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) { state = RB_FIND(hammer2_state_tree, - &iocom->router.statewr_tree, &dummy); + &iocom->router->statewr_tree, &dummy); } else { state = RB_FIND(hammer2_state_tree, - &iocom->router.staterd_tree, &dummy); + &iocom->router->staterd_tree, &dummy); } msg->state = state; pthread_mutex_unlock(&iocom->mtx); @@ -1642,7 +1675,7 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) msg->state = state; pthread_mutex_lock(&iocom->mtx); RB_INSERT(hammer2_state_tree, - &iocom->router.staterd_tree, state); + &iocom->router->staterd_tree, state); pthread_mutex_unlock(&iocom->mtx); error = 0; if (DebugOpt) { @@ -1794,11 +1827,11 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) if (state->rxcmd & HAMMER2_MSGF_REPLY) { assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY); RB_REMOVE(hammer2_state_tree, - &iocom->router.statewr_tree, state); + &iocom->router->statewr_tree, state); } else { assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); RB_REMOVE(hammer2_state_tree, - &iocom->router.staterd_tree, state); + &iocom->router->staterd_tree, state); } state->flags &= ~HAMMER2_STATE_INSERTED; hammer2_state_free(state); @@ -1833,11 +1866,11 @@ hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) if (state->txcmd & HAMMER2_MSGF_REPLY) { assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY); RB_REMOVE(hammer2_state_tree, - &iocom->router.staterd_tree, state); + &iocom->router->staterd_tree, state); } else { assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); RB_REMOVE(hammer2_state_tree, - &iocom->router.statewr_tree, state); + &iocom->router->statewr_tree, state); } state->flags &= ~HAMMER2_STATE_INSERTED; hammer2_state_free(state); @@ -1882,13 +1915,95 @@ hammer2_state_free(hammer2_state_t *state) * We may have to wake up the rx code. */ if (iocom->ioq_rx.error && - RB_EMPTY(&iocom->router.staterd_tree) && - RB_EMPTY(&iocom->router.statewr_tree)) { + RB_EMPTY(&iocom->router->staterd_tree) && + RB_EMPTY(&iocom->router->statewr_tree)) { dummy = 0; write(iocom->wakeupfds[1], &dummy, 1); } } +/************************************************************************ + * ROUTING * + ************************************************************************ + * + * Incoming messages are routed by their spanid, matched up against + * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c). + * Any replies run through the same router. + * + * Originated messages are routed by their spanid, matched up against + * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c). + * Replies come back through the same route. + * + * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular + * transaction runs through the same route. Commands and replies both. + * + * An originated message will use a different routing spanid to + * reach a target node than a message which originates from that node. + * They might use the same physical pipes (each pipe can have multiple + * SPANs and RELAYs), but the routes are distinct from the perspective + * of the router. + */ +hammer2_router_t * +hammer2_router_alloc(void) +{ + hammer2_router_t *router; + + router = hammer2_alloc(sizeof(*router)); + TAILQ_INIT(&router->txmsgq); + return (router); +} + +void +hammer2_router_connect(hammer2_router_t *router) +{ + hammer2_router_t *tmp; + + assert(router->link || router->relay); + assert((router->flags & HAMMER2_ROUTER_CONNECTED) == 0); + + pthread_mutex_lock(&router_mtx); + if (router->link) + tmp = RB_INSERT(hammer2_router_tree, &router_ltree, router); + else + tmp = RB_INSERT(hammer2_router_tree, &router_rtree, router); + assert(tmp == NULL); + router->flags |= HAMMER2_ROUTER_CONNECTED; + pthread_mutex_unlock(&router_mtx); +} + +void +hammer2_router_disconnect(hammer2_router_t **routerp) +{ + hammer2_router_t *router; + + router = *routerp; + assert(router->link || router->relay); + assert(router->flags & HAMMER2_ROUTER_CONNECTED); + + pthread_mutex_lock(&router_mtx); + if (router->link) + RB_REMOVE(hammer2_router_tree, &router_ltree, router); + else + RB_REMOVE(hammer2_router_tree, &router_rtree, router); + router->flags &= ~HAMMER2_ROUTER_CONNECTED; + *routerp = NULL; + pthread_mutex_unlock(&router_mtx); +} + +#if 0 +/* + * XXX + */ +hammer2_router_t * +hammer2_route_msg(hammer2_msg_t *msg) +{ +} +#endif + +/************************************************************************ + * DEBUGGING * + ************************************************************************/ + const char * hammer2_basecmd_str(uint32_t cmd) { diff --git a/sbin/hammer2/msg_lnk.c b/sbin/hammer2/msg_lnk.c index 229af80298..12d5399999 100644 --- a/sbin/hammer2/msg_lnk.c +++ b/sbin/hammer2/msg_lnk.c @@ -211,6 +211,7 @@ struct h2span_cluster { RB_ENTRY(h2span_cluster) rbnode; struct h2span_node_tree tree; uuid_t pfs_clid; /* shared fsid */ + int refs; /* prevents destruction */ }; struct h2span_node { @@ -227,7 +228,7 @@ struct h2span_link { struct h2span_node *node; /* related node */ int32_t dist; struct h2span_relay_queue relayq; /* relay out */ - struct hammer2_router router; + struct hammer2_router *router; /* route out this link */ }; /* @@ -240,13 +241,22 @@ struct h2span_link { * address of the 'state' structure, which is why h2span_relay has to * be entered into a RB-TREE based at h2span_connect (so we can look * up the spanid to validate it). + * + * NOTE: Messages can be received via the LNK_SPAN transaction the + * relay maintains, and can be replied via relay->router, but + * messages are NOT initiated via a relay. Messages are initiated + * via incoming links (h2span_link's). + * + * relay->link represents the link being relayed, NOT the LNK_SPAN + * transaction the relay is holding open. */ struct h2span_relay { RB_ENTRY(h2span_relay) rbnode; /* from h2span_connect */ TAILQ_ENTRY(h2span_relay) entry; /* from link */ struct h2span_connect *conn; hammer2_state_t *state; /* transmitted LNK_SPAN */ - struct h2span_link *link; /* received LNK_SPAN */ + struct h2span_link *link; /* LNK_SPAN being relayed */ + struct hammer2_router *router;/* route out this relay */ }; @@ -518,10 +528,16 @@ hammer2_lnk_span(hammer2_msg_t *msg) /* * Embedded router structure in link for message forwarding. + * + * The spanning id for the router is the message id of + * the SPAN link it is embedded in, allowing messages to + * be routed via &slink->router. */ - TAILQ_INIT(&slink->router.txmsgq); - slink->router.iocom = state->iocom; - slink->router.link = slink; + slink->router = hammer2_router_alloc(); + slink->router->iocom = state->iocom; + slink->router->link = slink; + slink->router->spanid = state->msgid; + hammer2_router_connect(slink->router); RB_INSERT(h2span_link_tree, &node->tree, slink); @@ -533,7 +549,6 @@ hammer2_lnk_span(hammer2_msg_t *msg) msg->any.lnk_span.label, msg->any.lnk_span.dist); free(alloc); - #if 0 hammer2_relay_scan(NULL, node); #endif @@ -557,6 +572,11 @@ hammer2_lnk_span(hammer2_msg_t *msg) state->msg->any.lnk_span.dist); free(alloc); + /* + * Remove the router from consideration + */ + hammer2_router_disconnect(&slink->router); + /* * Clean out all relays. This requires terminating each * relay transaction. @@ -571,7 +591,7 @@ hammer2_lnk_span(hammer2_msg_t *msg) RB_REMOVE(h2span_link_tree, &node->tree, slink); if (RB_EMPTY(&node->tree)) { RB_REMOVE(h2span_node_tree, &cls->tree, node); - if (RB_EMPTY(&cls->tree)) { + if (RB_EMPTY(&cls->tree) && cls->refs == 0) { RB_REMOVE(h2span_cluster_tree, &cluster_tree, cls); hammer2_free(cls); @@ -795,14 +815,21 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn) relay->conn = conn; relay->link = slink; - msg = hammer2_msg_alloc(&conn->state->iocom->router, 0, + msg = hammer2_msg_alloc(conn->state->iocom->router, 0, HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE, hammer2_lnk_relay, relay); relay->state = msg->state; + relay->router = hammer2_router_alloc(); + relay->router->iocom = relay->state->iocom; + relay->router->relay = relay; + relay->router->spanid = relay->state->msgid; + msg->any.lnk_span = slink->state->msg->any.lnk_span; msg->any.lnk_span.dist = slink->dist + 1; + hammer2_router_connect(relay->router); + RB_INSERT(h2span_relay_tree, &conn->tree, relay); TAILQ_INSERT_TAIL(&slink->relayq, relay, entry); @@ -850,6 +877,8 @@ hammer2_relay_delete(h2span_relay_t *relay) relay->link->dist, relay->conn->state->iocom->sock_fd, relay->state); + hammer2_router_disconnect(&relay->router); + RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay); TAILQ_REMOVE(&relay->link->relayq, relay, entry); @@ -865,11 +894,64 @@ hammer2_relay_delete(h2span_relay_t *relay) } /************************************************************************ - * ROUTER * + * ROUTER AND MESSAGING HANDLES * ************************************************************************ * - * Provides route functions to msg.c + * Basically the idea here is to provide a stable data structure which + * can be localized to the caller for higher level protocols to work with. + * Depends on the context, these hammer2_handle's can be pooled by use-case + * and remain persistent through a client (or mount point's) life. + */ + +#if 0 +/* + * Obtain a stable handle on a cluster given its uuid. This ties directly + * into the global cluster topology, creating the structure if necessary + * (even if the uuid does not exist or does not exist yet), and preventing + * the structure from getting ripped out from under us while we hold a + * pointer to it. + */ +h2span_cluster_t * +hammer2_cluster_get(uuid_t *pfs_clid) +{ + h2span_cluster_t dummy_cls; + h2span_cluster_t *cls; + + dummy_cls.pfs_clid = *pfs_clid; + pthread_mutex_lock(&cluster_mtx); + cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls); + if (cls) + ++cls->refs; + pthread_mutex_unlock(&cluster_mtx); + return (cls); +} + +void +hammer2_cluster_put(h2span_cluster_t *cls) +{ + pthread_mutex_lock(&cluster_mtx); + assert(cls->refs > 0); + --cls->refs; + if (RB_EMPTY(&cls->tree) && cls->refs == 0) { + RB_REMOVE(h2span_cluster_tree, + &cluster_tree, cls); + hammer2_free(cls); + } + pthread_mutex_unlock(&cluster_mtx); +} + +/* + * Obtain a stable handle to a specific cluster node given its uuid. + * This handle does NOT lock in the route to the node and is typically + * used as part of the hammer2_handle_*() API to obtain a set of + * stable nodes. */ +h2span_node_t * +hammer2_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid) +{ +} + +#endif #if 0 /* @@ -891,6 +973,9 @@ hammer2_router_put(hammer2_router_t *router) } #endif +/************************************************************************ + * DEBUGGER * + ************************************************************************/ /* * Dumps the spanning tree */ diff --git a/sbin/hammer2/network.h b/sbin/hammer2/network.h index 7e86481a01..6b1b16603e 100644 --- a/sbin/hammer2/network.h +++ b/sbin/hammer2/network.h @@ -116,6 +116,7 @@ typedef struct hammer2_handshake hammer2_handshake_t; struct hammer2_iocom; struct hammer2_persist; struct hammer2_state; +struct hammer2_router; struct hammer2_address; struct hammer2_msg; @@ -123,8 +124,10 @@ TAILQ_HEAD(hammer2_state_queue, hammer2_state); TAILQ_HEAD(hammer2_msg_queue, hammer2_msg); TAILQ_HEAD(hammer2_address_queue, hammer2_address); RB_HEAD(hammer2_state_tree, hammer2_state); +RB_HEAD(hammer2_router_tree, hammer2_router); struct h2span_link; +struct h2span_relay; struct h2span_connect; struct hammer2_state { @@ -239,22 +242,33 @@ typedef struct hammer2_ioq hammer2_ioq_t; * hammer2_msg_write. * * The router is either connected to an iocom (socket) directly, or - * connected to a SPAN transaction (h2span_link structure). + * connected to a SPAN transaction (h2span_link structure for outgoing) + * or to a SPAN transaction (h2span_relay structure for incoming). */ struct hammer2_router { + RB_ENTRY(hammer2_router) rbnode; /* indexed by spanid */ struct hammer2_iocom *iocom; - struct h2span_link *link; /* non-NULL if indirect */ + struct h2span_link *link; /* may be NULL */ + struct h2span_relay *relay; /* may be NULL */ void (*signal_callback)(struct hammer2_router *); void (*rcvmsg_callback)(struct hammer2_msg *); void (*altmsg_callback)(struct hammer2_iocom *); struct hammer2_state_tree staterd_tree; /* active messages */ struct hammer2_state_tree statewr_tree; /* active messages */ hammer2_msg_queue_t txmsgq; /* tx msgq from remote */ + uint64_t spanid; /* for routing */ + int flags; int refs; /* refs prevent destruction */ }; +#define HAMMER2_ROUTER_CONNECTED 0x0001 /* on global RB tree */ +#define HAMMER2_ROUTER_DELETED 0x0002 /* parent structure destroyed */ + typedef struct hammer2_router hammer2_router_t; +int hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2); +RB_PROTOTYPE(hammer2_router_tree, hammer2_router, rbnode, hammer2_router_cmp); + /* * hammer2_iocom - governs a messaging stream connection */ @@ -271,7 +285,7 @@ struct hammer2_iocom { int rxmisc; int txmisc; char sess[HAMMER2_AES_KEY_SIZE]; /* aes_256_cbc key */ - struct hammer2_router router; + struct hammer2_router *router; pthread_mutex_t mtx; /* mutex for state*tree/rmsgq */ };