From: Matthew Dillon Date: Thu, 9 Aug 2012 06:25:23 +0000 (-0700) Subject: hammer2 - SPAN protocol work, router work X-Git-Tag: v3.4.0rc~1035 X-Git-Url: http://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/29ead430388b5bba37da0a913a4f54a4ba126227 hammer2 - SPAN protocol work, router work * Fix SPAN relay sort and sequencing bugs. * Start reworking the APIs to accomodate routed messages. Start by creating a hammer2_router structure and adjusting most of the msg functions to pass it instead of the iocom. * Fix hammer2_state races by moving the state allocation to hammer2_msg_alloc() instead of hammer2_msg_write(). This gives code a chance to assign the state->any.* field without having to worry about the state getting ripped out from under us. --- diff --git a/sbin/hammer2/TODO b/sbin/hammer2/TODO new file mode 100644 index 0000000..33e49f9 --- /dev/null +++ b/sbin/hammer2/TODO @@ -0,0 +1,9 @@ + +TODO: + msg_lnk.c msg_lnk_signal rescans all connections. What we really + need to do is queue the conn (when possible) or the node + (when possible), or just flag it w/ a global, and then + only scan a subset. + + link->dist propagation is still a bit screwy when there are a + lot of parallel network links. diff --git a/sbin/hammer2/cmd_debug.c b/sbin/hammer2/cmd_debug.c index 0c098a6..f79117a 100644 --- a/sbin/hammer2/cmd_debug.c +++ b/sbin/hammer2/cmd_debug.c @@ -37,9 +37,9 @@ #define SHOW_TAB 2 -static void shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +static void shell_rcvmsg(hammer2_msg_t *msg); static void shell_ttymsg(hammer2_iocom_t *iocom); -static void hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +static void hammer2_shell_parse(hammer2_msg_t *msg); /************************************************************************ * SHELL * @@ -66,8 +66,9 @@ cmd_shell(const char *hostname) fcntl(0, F_SETFL, O_NONBLOCK); printf("debug: connected\n"); - msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL); - hammer2_msg_write(&iocom, msg, NULL, NULL, NULL); + msg = hammer2_msg_alloc(&iocom.router, 0, HAMMER2_DBG_SHELL, + NULL, NULL); + hammer2_msg_write(msg); hammer2_iocom_core(&iocom); fprintf(stderr, "debug: disconnected\n"); close(fd); @@ -80,7 +81,7 @@ cmd_shell(const char *hostname) */ static void -shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg) +shell_rcvmsg(hammer2_msg_t *msg) { switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) { case HAMMER2_LNK_ERROR: @@ -105,7 +106,7 @@ shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg) * We send the commands, not accept them. * (one-way message, not transactional) */ - hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP); + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); break; case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY: /* @@ -119,7 +120,7 @@ shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg) break; case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE: fprintf(stderr, "Debug Shell is ignoring received LNK_CONN\n"); - hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP); + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); break; case HAMMER2_LNK_CONN | HAMMER2_MSGF_DELETE: break; @@ -130,9 +131,9 @@ shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg) */ fprintf(stderr, "Unknown message: %s\n", hammer2_msg_str(msg)); if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) - hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP); + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) - hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP); + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); break; } } @@ -150,9 +151,10 @@ shell_ttymsg(hammer2_iocom_t *iocom) if (len && buf[len - 1] == '\n') buf[--len] = 0; ++len; - msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL); + msg = hammer2_msg_alloc(&iocom->router, len, HAMMER2_DBG_SHELL, + NULL, NULL); bcopy(buf, msg->aux_data, len); - hammer2_msg_write(iocom, msg, NULL, NULL, NULL); + hammer2_msg_write(msg); } else if (feof(stdin)) { /* * Set EOF flag without setting any error code for normal @@ -170,7 +172,7 @@ shell_ttymsg(hammer2_iocom_t *iocom) * then finish up by outputting another prompt. */ void -hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg) +hammer2_msg_dbg(hammer2_msg_t *msg) { switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) { case HAMMER2_DBG_SHELL: @@ -180,8 +182,8 @@ hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg) */ if (msg->aux_data) msg->aux_data[msg->aux_size - 1] = 0; - hammer2_shell_parse(iocom, msg); - hammer2_msg_reply(iocom, msg, 0); + hammer2_shell_parse(msg); + hammer2_msg_reply(msg, 0); break; case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY: /* @@ -194,37 +196,38 @@ hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg) write(2, msg->aux_data, strlen(msg->aux_data)); break; default: - hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP); + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); break; } } -static void shell_span(hammer2_iocom_t *iocom, char *cmdbuf); -/*static void shell_tree(hammer2_iocom_t *iocom, char *cmdbuf);*/ +static void shell_span(hammer2_router_t *router, char *cmdbuf); +/*static void shell_tree(hammer2_router_t *router, char *cmdbuf);*/ static void -hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg) +hammer2_shell_parse(hammer2_msg_t *msg) { + hammer2_router_t *router = msg->router; char *cmdbuf = msg->aux_data; char *cmd = strsep(&cmdbuf, " \t"); if (cmd == NULL || *cmd == 0) { ; } else if (strcmp(cmd, "span") == 0) { - shell_span(iocom, cmdbuf); + shell_span(router, cmdbuf); } else if (strcmp(cmd, "tree") == 0) { - shell_tree(iocom, cmdbuf); + shell_tree(router, cmdbuf); } else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) { - iocom_printf(iocom, "help Command help\n"); - iocom_printf(iocom, "span Span to target host\n"); - iocom_printf(iocom, "tree Dump spanning tree\n"); + router_printf(router, "help Command help\n"); + router_printf(router, "span Span to target host\n"); + router_printf(router, "tree Dump spanning tree\n"); } else { - iocom_printf(iocom, "Unrecognized command: %s\n", cmd); + router_printf(router, "Unrecognized command: %s\n", cmd); } } static void -shell_span(hammer2_iocom_t *iocom, char *cmdbuf) +shell_span(hammer2_router_t *router, char *cmdbuf) { const char *hostname = strsep(&cmdbuf, " \t"); pthread_t thread; @@ -243,9 +246,9 @@ shell_span(hammer2_iocom_t *iocom, char *cmdbuf) * Start master service */ if (fd < 0) { - iocom_printf(iocom, "Connection to %s failed\n", hostname); + router_printf(router, "Connection to %s failed\n", hostname); } else { - iocom_printf(iocom, "Connected to %s\n", hostname); + router_printf(router, "Connected to %s\n", hostname); pthread_create(&thread, NULL, master_service, (void *)(intptr_t)fd); /*pthread_join(thread, &res);*/ @@ -257,11 +260,11 @@ shell_span(hammer2_iocom_t *iocom, char *cmdbuf) * not modified and stays intact. We use a one-way message with REPLY set * to distinguish between a debug command and debug terminal output. * - * To prevent loops iocom_printf() can filter the message (cmd) related - * to the iocom_printf(). We filter out DBG messages. + * To prevent loops router_printf() can filter the message (cmd) related + * to the router_printf(). We filter out DBG messages. */ void -iocom_printf(hammer2_iocom_t *iocom, const char *ctl, ...) +router_printf(hammer2_router_t *router, const char *ctl, ...) { hammer2_msg_t *rmsg; va_list va; @@ -273,11 +276,12 @@ iocom_printf(hammer2_iocom_t *iocom, const char *ctl, ...) va_end(va); len = strlen(buf) + 1; - rmsg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL | - HAMMER2_MSGF_REPLY); + rmsg = hammer2_msg_alloc(router, len, HAMMER2_DBG_SHELL | + HAMMER2_MSGF_REPLY, + NULL, NULL); bcopy(buf, rmsg->aux_data, len); - hammer2_msg_write(iocom, rmsg, NULL, NULL, NULL); + hammer2_msg_write(rmsg); } /************************************************************************ diff --git a/sbin/hammer2/cmd_service.c b/sbin/hammer2/cmd_service.c index 4b7207c..82ca26b 100644 --- a/sbin/hammer2/cmd_service.c +++ b/sbin/hammer2/cmd_service.c @@ -36,10 +36,10 @@ #include "hammer2.h" static void *master_accept(void *data); -static void master_auth_state(hammer2_iocom_t *iocom); -static void master_auth_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg); -static void master_link_state(hammer2_iocom_t *iocom); -static void master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +static void master_auth_signal(hammer2_router_t *router); +static void master_auth_rxmsg(hammer2_msg_t *msg); +static void master_link_signal(hammer2_router_t *router); +static void master_link_rxmsg(hammer2_msg_t *msg); /* * Start-up the master listener daemon for the machine. @@ -164,7 +164,9 @@ master_service(void *data) fd = (int)(intptr_t)data; hammer2_iocom_init(&iocom, fd, -1, - master_auth_state, master_auth_rxmsg, NULL); + master_auth_signal, + master_auth_rxmsg, + NULL); hammer2_iocom_core(&iocom); fprintf(stderr, @@ -185,11 +187,11 @@ master_service(void *data) * message operation. The connection has already been encrypted at * this point. */ -static void master_auth_conn_rx(hammer2_state_t *state, hammer2_msg_t *msg); +static void master_auth_conn_rx(hammer2_msg_t *msg); static void -master_auth_state(hammer2_iocom_t *iocom __unused) +master_auth_signal(hammer2_router_t *router) { hammer2_msg_t *msg; @@ -199,26 +201,29 @@ master_auth_state(hammer2_iocom_t *iocom __unused) * * XXX put additional authentication states here */ - msg = hammer2_msg_alloc(iocom, 0, HAMMER2_LNK_CONN | - HAMMER2_MSGF_CREATE); + msg = hammer2_msg_alloc(router, 0, HAMMER2_LNK_CONN | + HAMMER2_MSGF_CREATE, + master_auth_conn_rx, NULL); snprintf(msg->any.lnk_conn.label, sizeof(msg->any.lnk_conn.label), "*"); - hammer2_msg_write(iocom, msg, master_auth_conn_rx, NULL, NULL); + hammer2_msg_write(msg); - hammer2_iocom_restate(iocom, - master_link_state, master_link_rxmsg, NULL); + hammer2_router_restate(router, + master_link_signal, + master_link_rxmsg, + NULL); } static void -master_auth_conn_rx(hammer2_state_t *state, hammer2_msg_t *msg) +master_auth_conn_rx(hammer2_msg_t *msg) { if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) - hammer2_msg_reply(state->iocom, msg, 0); + hammer2_msg_reply(msg, 0); } static void -master_auth_rxmsg(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg __unused) +master_auth_rxmsg(hammer2_msg_t *msg __unused) { } @@ -230,13 +235,14 @@ master_auth_rxmsg(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg __unused) */ static void -master_link_state(hammer2_iocom_t *iocom __unused) +master_link_signal(hammer2_router_t *router) { + hammer2_msg_lnk_signal(router); } static void -master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg) +master_link_rxmsg(hammer2_msg_t *msg) { hammer2_state_t *state; uint32_t cmd; @@ -257,17 +263,17 @@ master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg) if (state && state->func) { assert(state->func != NULL); - state->func(state, msg); + state->func(msg); } else { switch(cmd & HAMMER2_MSGF_PROTOS) { case HAMMER2_MSG_PROTO_LNK: - hammer2_msg_lnk(iocom, msg); + hammer2_msg_lnk(msg); break; case HAMMER2_MSG_PROTO_DBG: - hammer2_msg_dbg(iocom, msg); + hammer2_msg_dbg(msg); break; default: - hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP); + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); break; } } diff --git a/sbin/hammer2/hammer2.h b/sbin/hammer2/hammer2.h index ae261f3..8c69e87 100644 --- a/sbin/hammer2/hammer2.h +++ b/sbin/hammer2/hammer2.h @@ -125,29 +125,27 @@ void hammer2_bswap_head(hammer2_msg_hdr_t *head); void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq); void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq); void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd, - void (*state_func)(hammer2_iocom_t *), - void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *), + void (*state_func)(hammer2_router_t *), + void (*rcvmsg_func)(hammer2_msg_t *), void (*altmsg_func)(hammer2_iocom_t *)); -void hammer2_iocom_restate(hammer2_iocom_t *iocom, - void (*state_func)(hammer2_iocom_t *), - void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *), +void hammer2_router_restate(hammer2_router_t *router, + void (*state_func)(hammer2_router_t *), + void (*rcvmsg_func)(hammer2_msg_t *), void (*altmsg_func)(hammer2_iocom_t *)); +void hammer2_router_signal(hammer2_router_t *router); void hammer2_iocom_done(hammer2_iocom_t *iocom); -hammer2_msg_t *hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, - uint32_t cmd); -void hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, - uint32_t error); -void hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, - uint32_t error); +hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, + size_t aux_size, uint32_t cmd, + void (*func)(hammer2_msg_t *), void *data); +void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error); +void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error); void hammer2_state_reply(hammer2_state_t *state, uint32_t error); -void hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +void hammer2_msg_free(hammer2_msg_t *msg); void hammer2_iocom_core(hammer2_iocom_t *iocom); hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom); -void hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg, - void (*func)(hammer2_state_t *, hammer2_msg_t *), - void *data, hammer2_state_t **statep); +void hammer2_msg_write(hammer2_msg_t *msg); void hammer2_iocom_drain(hammer2_iocom_t *iocom); void hammer2_iocom_flush1(hammer2_iocom_t *iocom); @@ -159,8 +157,9 @@ void hammer2_state_free(hammer2_state_t *state); /* * Msg protocol functions */ -void hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg); -void hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +void hammer2_msg_lnk_signal(hammer2_router_t *router); +void hammer2_msg_lnk(hammer2_msg_t *msg); +void hammer2_msg_dbg(hammer2_msg_t *msg); /* * Crypto functions @@ -187,11 +186,11 @@ const char * hammer2_basecmd_str(uint32_t cmd); const char *hammer2_msg_str(hammer2_msg_t *msg); int hammer2_connect(const char *hostname); -void shell_tree(hammer2_iocom_t *iocom, char *cmdbuf); +void shell_tree(hammer2_router_t *router, char *cmdbuf); void *master_service(void *data); void hammer2_msg_debug(hammer2_iocom_t *iocom, hammer2_msg_t *msg); -void iocom_printf(hammer2_iocom_t *iocom, const char *ctl, ...); +void router_printf(hammer2_router_t *router, const char *ctl, ...); void *hammer2_alloc(size_t bytes); void hammer2_free(void *ptr); diff --git a/sbin/hammer2/msg.c b/sbin/hammer2/msg.c index 1e33c80..ea97548 100644 --- a/sbin/hammer2/msg.c +++ b/sbin/hammer2/msg.c @@ -82,43 +82,44 @@ hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq) while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { assert(0); /* shouldn't happen */ TAILQ_REMOVE(&ioq->msgq, msg, qentry); - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); } if ((msg = ioq->msg) != NULL) { ioq->msg = NULL; - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); } } /* * Initialize a low-level communications channel. * - * NOTE: The state_func() is called at least once from the loop and can be + * NOTE: The signal_func() is called at least once from the loop and can be * re-armed via hammer2_iocom_restate(). */ void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd, - void (*state_func)(hammer2_iocom_t *), - void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg), + void (*signal_func)(hammer2_router_t *), + void (*rcvmsg_func)(hammer2_msg_t *), void (*altmsg_func)(hammer2_iocom_t *)) { bzero(iocom, sizeof(*iocom)); - iocom->state_callback = state_func; - iocom->rcvmsg_callback = rcvmsg_func; - iocom->altmsg_callback = altmsg_func; + iocom->router.signal_callback = signal_func; + iocom->router.rcvmsg_callback = rcvmsg_func; + iocom->router.altmsg_callback = altmsg_func; pthread_mutex_init(&iocom->mtx, NULL); - RB_INIT(&iocom->staterd_tree); - RB_INIT(&iocom->statewr_tree); + RB_INIT(&iocom->router.staterd_tree); + RB_INIT(&iocom->router.statewr_tree); TAILQ_INIT(&iocom->freeq); TAILQ_INIT(&iocom->freeq_aux); TAILQ_INIT(&iocom->addrq); - TAILQ_INIT(&iocom->txmsgq); + TAILQ_INIT(&iocom->router.txmsgq); + iocom->router.iocom = iocom; iocom->sock_fd = sock_fd; iocom->alt_fd = alt_fd; iocom->flags = HAMMER2_IOCOMF_RREQ; - if (state_func) + if (signal_func) iocom->flags |= HAMMER2_IOCOMF_SWORK; hammer2_ioq_init(iocom, &iocom->ioq_rx); hammer2_ioq_init(iocom, &iocom->ioq_tx); @@ -152,18 +153,25 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd, * the recevmsg_func and the sendmsg_func is called at least once. */ void -hammer2_iocom_restate(hammer2_iocom_t *iocom, - void (*state_func)(hammer2_iocom_t *), - void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg), +hammer2_router_restate(hammer2_router_t *router, + void (*signal_func)(hammer2_router_t *), + void (*rcvmsg_func)(hammer2_msg_t *msg), void (*altmsg_func)(hammer2_iocom_t *)) { - iocom->state_callback = state_func; - iocom->rcvmsg_callback = rcvmsg_func; - iocom->altmsg_callback = altmsg_func; - if (state_func) - iocom->flags |= HAMMER2_IOCOMF_SWORK; + router->signal_callback = signal_func; + router->rcvmsg_callback = rcvmsg_func; + router->altmsg_callback = altmsg_func; + if (signal_func) + router->iocom->flags |= HAMMER2_IOCOMF_SWORK; else - iocom->flags &= ~HAMMER2_IOCOMF_SWORK; + router->iocom->flags &= ~HAMMER2_IOCOMF_SWORK; +} + +void +hammer2_router_signal(hammer2_router_t *router) +{ + if (router->signal_callback) + router->iocom->flags |= HAMMER2_IOCOMF_SWORK; } /* @@ -212,8 +220,11 @@ hammer2_iocom_done(hammer2_iocom_t *iocom) * Allocate a new one-way message. */ hammer2_msg_t * -hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd) +hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd, + void (*func)(hammer2_msg_t *), void *data) { + hammer2_state_t *state = NULL; + hammer2_iocom_t *iocom = router->iocom; hammer2_msg_t *msg; int hbytes; @@ -227,6 +238,30 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd) if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) TAILQ_REMOVE(&iocom->freeq, msg, qentry); } + if ((cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY)) == + HAMMER2_MSGF_CREATE) { + /* + * Create state when CREATE is set without REPLY. + * + * NOTE: CREATE in txcmd handled by hammer2_msg_write() + * NOTE: DELETE in txcmd handled by hammer2_state_cleanuptx() + */ + state = malloc(sizeof(*state)); + bzero(state, sizeof(*state)); + state->iocom = iocom; + state->flags = HAMMER2_STATE_DYNAMIC; + state->msgid = (uint64_t)(uintptr_t)state; + /* XXX set state->spanid from router */ + state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE | + HAMMER2_MSGF_DELETE); + state->rxcmd = HAMMER2_MSGF_REPLY; + state->func = func; + state->any.any = data; + pthread_mutex_lock(&iocom->mtx); + RB_INSERT(hammer2_state_tree, &iocom->router.statewr_tree, state); + pthread_mutex_unlock(&iocom->mtx); + state->flags |= HAMMER2_STATE_INSERTED; + } pthread_mutex_unlock(&iocom->mtx); if (msg == NULL) { msg = malloc(sizeof(*msg)); @@ -252,7 +287,12 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd) msg->any.head.cmd = cmd; msg->any.head.aux_descr = 0; msg->any.head.aux_crc = 0; - + msg->router = router; + if (state) { + msg->state = state; + state->msg = msg; + msg->any.head.msgid = state->msgid; + } return (msg); } @@ -263,8 +303,10 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd) */ static void -hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg) +hammer2_msg_free_locked(hammer2_msg_t *msg) { + hammer2_iocom_t *iocom = msg->router->iocom; + msg->state = NULL; if (msg->aux_data) TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry); @@ -273,10 +315,12 @@ hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg) } void -hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg) +hammer2_msg_free(hammer2_msg_t *msg) { + hammer2_iocom_t *iocom = msg->router->iocom; + pthread_mutex_lock(&iocom->mtx); - hammer2_msg_free_locked(iocom, msg); + hammer2_msg_free_locked(msg); pthread_mutex_unlock(&iocom->mtx); } @@ -372,7 +416,7 @@ hammer2_iocom_core(hammer2_iocom_t *iocom) if (iocom->flags & HAMMER2_IOCOMF_SWORK) { iocom->flags &= ~HAMMER2_IOCOMF_SWORK; - iocom->state_callback(iocom); + iocom->router.signal_callback(&iocom->router); } /* @@ -385,7 +429,7 @@ hammer2_iocom_core(hammer2_iocom_t *iocom) read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf)); iocom->flags |= HAMMER2_IOCOMF_RWORK; iocom->flags |= HAMMER2_IOCOMF_WWORK; - if (TAILQ_FIRST(&iocom->txmsgq)) + if (TAILQ_FIRST(&iocom->router.txmsgq)) hammer2_iocom_flush1(iocom); } @@ -407,14 +451,14 @@ hammer2_iocom_core(hammer2_iocom_t *iocom) fprintf(stderr, "receive %s\n", hammer2_msg_str(msg)); } - iocom->rcvmsg_callback(iocom, msg); + iocom->router.rcvmsg_callback(msg); hammer2_state_cleanuprx(iocom, msg); } } if (iocom->flags & HAMMER2_IOCOMF_ARWORK) { iocom->flags &= ~HAMMER2_IOCOMF_ARWORK; - iocom->altmsg_callback(iocom); + iocom->router.altmsg_callback(iocom); } } } @@ -553,7 +597,8 @@ again: /* * Allocate the message, the next state will fill it in. */ - msg = hammer2_msg_alloc(iocom, ioq->abytes, 0); + msg = hammer2_msg_alloc(&iocom->router, ioq->abytes, 0, + NULL, NULL); ioq->msg = msg; /* @@ -777,7 +822,7 @@ again: error = hammer2_state_msgrx(iocom, msg); if (error) { if (error == HAMMER2_IOQ_ERROR_EALREADY) { - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); goto again; } ioq->error = error; @@ -803,7 +848,7 @@ skip: */ assert(ioq->msg == msg); if (msg) { - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); ioq->msg = NULL; } @@ -818,7 +863,7 @@ skip: * LNK_ERROR (that the session can detect) when no * transactions remain. */ - msg = hammer2_msg_alloc(iocom, 0, 0); + msg = hammer2_msg_alloc(&iocom->router, 0, 0, NULL, NULL); bzero(&msg->any.head, sizeof(msg->any.head)); msg->any.head.magic = HAMMER2_MSGHDR_MAGIC; msg->any.head.cmd = HAMMER2_LNK_ERROR; @@ -826,13 +871,13 @@ skip: pthread_mutex_lock(&iocom->mtx); hammer2_iocom_drain(iocom); - if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) { + if ((state = RB_ROOT(&iocom->router.staterd_tree)) != NULL) { /* * Active remote transactions are still present. * Simulate the other end sending us a DELETE. */ if (state->rxcmd & HAMMER2_MSGF_DELETE) { - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); msg = NULL; } else { /*state->txcmd |= HAMMER2_MSGF_DELETE;*/ @@ -842,13 +887,13 @@ skip: msg->any.head.cmd |= HAMMER2_MSGF_ABORT | HAMMER2_MSGF_DELETE; } - } else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) { + } else if ((state = RB_ROOT(&iocom->router.statewr_tree)) != NULL) { /* * Active local transactions are still present. * Simulate the other end sending us a DELETE. */ if (state->rxcmd & HAMMER2_MSGF_DELETE) { - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); msg = NULL; } else { msg->state = state; @@ -941,8 +986,8 @@ hammer2_iocom_flush1(hammer2_iocom_t *iocom) iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK); TAILQ_INIT(&tmpq); pthread_mutex_lock(&iocom->mtx); - while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) { - TAILQ_REMOVE(&iocom->txmsgq, msg, qentry); + while ((msg = TAILQ_FIRST(&iocom->router.txmsgq)) != NULL) { + TAILQ_REMOVE(&iocom->router.txmsgq, msg, qentry); TAILQ_INSERT_TAIL(&tmpq, msg, qentry); } pthread_mutex_unlock(&iocom->mtx); @@ -1178,11 +1223,9 @@ hammer2_iocom_drain(hammer2_iocom_t *iocom) * Write a message to an iocom, with additional state processing. */ void -hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg, - void (*func)(hammer2_state_t *, hammer2_msg_t *), - void *data, - hammer2_state_t **statep) +hammer2_msg_write(hammer2_msg_t *msg) { + hammer2_iocom_t *iocom = msg->router->iocom; hammer2_state_t *state; char dummy; @@ -1195,57 +1238,34 @@ hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg, * Existing transaction (could be reply). It is also * possible for this to be the first reply (CREATE is set), * in which case we populate state->txcmd. + * + * state->txcmd is adjusted to hold the final message cmd, + * and we also be sure to set the CREATE bit here. We did + * not set it in hammer2_msg_alloc() because that would have + * not been serialized (state could have gotten ripped out + * from under the message prior to it being transmitted). */ + if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | + HAMMER2_MSGF_REPLY)) == + HAMMER2_MSGF_CREATE) { + state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + } msg->any.head.msgid = state->msgid; msg->any.head.spanid = state->spanid; - if (func) { - state->func = func; - state->any.any = data; - } assert(((state->txcmd ^ msg->any.head.cmd) & HAMMER2_MSGF_REPLY) == 0); if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; - } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) { - /* - * No existing state and CREATE is set, create new - * state for outgoing command. This can't happen if - * REPLY is set as the state would already exist for - * a transaction reply. - */ - assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); - - state = malloc(sizeof(*state)); - bzero(state, sizeof(*state)); - state->iocom = iocom; - state->flags = HAMMER2_STATE_DYNAMIC; - state->msg = msg; - state->msgid = (uint64_t)(uintptr_t)state; - state->spanid = msg->any.head.spanid; - state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; - state->rxcmd = HAMMER2_MSGF_REPLY; - state->func = func; - state->any.any = data; - pthread_mutex_lock(&iocom->mtx); - RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state); - pthread_mutex_unlock(&iocom->mtx); - state->flags |= HAMMER2_STATE_INSERTED; - msg->state = state; - msg->any.head.msgid = state->msgid; - /* spanid set by caller */ } else { msg->any.head.msgid = 0; - /* spanid set by caller */ + /* XXX set spanid by router */ } - if (statep) - *statep = state; - /* * Queue it for output, wake up the I/O pthread. Note that the * I/O thread is responsible for generating the CRCs and encryption. */ - TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry); + TAILQ_INSERT_TAIL(&iocom->router.txmsgq, msg, qentry); dummy = 0; write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */ pthread_mutex_unlock(&iocom->mtx); @@ -1265,8 +1285,9 @@ hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg, * The reply contains no extended data. */ void -hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error) +hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error) { + hammer2_iocom_t *iocom = msg->router->iocom; hammer2_state_t *state = msg->state; hammer2_msg_t *nmsg; uint32_t cmd; @@ -1289,8 +1310,6 @@ hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error) if (state) { if (state->txcmd & HAMMER2_MSGF_DELETE) return; - if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) - cmd |= HAMMER2_MSGF_CREATE; if (state->txcmd & HAMMER2_MSGF_REPLY) cmd |= HAMMER2_MSGF_REPLY; cmd |= HAMMER2_MSGF_DELETE; @@ -1299,10 +1318,19 @@ hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error) cmd |= HAMMER2_MSGF_REPLY; } - nmsg = hammer2_msg_alloc(iocom, 0, cmd); + /* + * Allocate the message and associate it with the existing state. + * We cannot pass MSGF_CREATE to msg_alloc() because that may + * allocate new state. We have our state already. + */ + nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL); + if (state) { + if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) + nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE; + } nmsg->any.head.error = error; - nmsg->state = msg->state; - hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL); + nmsg->state = state; + hammer2_msg_write(nmsg); } /* @@ -1312,8 +1340,9 @@ hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error) * later. */ void -hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error) +hammer2_msg_result(hammer2_msg_t *msg, uint32_t error) { + hammer2_iocom_t *iocom = msg->router->iocom; hammer2_state_t *state = msg->state; hammer2_msg_t *nmsg; uint32_t cmd; @@ -1336,8 +1365,6 @@ hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error) if (state) { if (state->txcmd & HAMMER2_MSGF_DELETE) return; - if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) - cmd |= HAMMER2_MSGF_CREATE; if (state->txcmd & HAMMER2_MSGF_REPLY) cmd |= HAMMER2_MSGF_REPLY; /* continuing transaction, do not set MSGF_DELETE */ @@ -1346,10 +1373,14 @@ hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error) cmd |= HAMMER2_MSGF_REPLY; } - nmsg = hammer2_msg_alloc(iocom, 0, cmd); + nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL); + if (state) { + if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) + nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE; + } nmsg->any.head.error = error; nmsg->state = state; - hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL); + hammer2_msg_write(nmsg); } /* @@ -1368,23 +1399,20 @@ hammer2_state_reply(hammer2_state_t *state, uint32_t error) return; /* - * We must also set CREATE if this is our first response to a - * remote command. - */ - if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) - cmd |= HAMMER2_MSGF_CREATE; - - /* * Set REPLY if the other end initiated the command. Otherwise * we are the command direction. */ if (state->txcmd & HAMMER2_MSGF_REPLY) cmd |= HAMMER2_MSGF_REPLY; - nmsg = hammer2_msg_alloc(state->iocom, 0, cmd); + nmsg = hammer2_msg_alloc(&state->iocom->router, 0, cmd, NULL, NULL); + if (state) { + if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) + nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE; + } nmsg->any.head.error = error; nmsg->state = state; - hammer2_msg_write(state->iocom, nmsg, NULL, NULL, NULL); + hammer2_msg_write(nmsg); } /************************************************************************ @@ -1479,10 +1507,10 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) pthread_mutex_lock(&iocom->mtx); if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) { state = RB_FIND(hammer2_state_tree, - &iocom->statewr_tree, &dummy); + &iocom->router.statewr_tree, &dummy); } else { state = RB_FIND(hammer2_state_tree, - &iocom->staterd_tree, &dummy); + &iocom->router.staterd_tree, &dummy); } msg->state = state; pthread_mutex_unlock(&iocom->mtx); @@ -1525,7 +1553,8 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) state->spanid = msg->any.head.spanid; msg->state = state; pthread_mutex_lock(&iocom->mtx); - RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state); + RB_INSERT(hammer2_state_tree, + &iocom->router.staterd_tree, state); pthread_mutex_unlock(&iocom->mtx); error = 0; if (DebugOpt) { @@ -1661,7 +1690,7 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) * Free a non-transactional message, there is no state * to worry about. */ - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { /* * Message terminating transaction, destroy the related @@ -1677,11 +1706,11 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) if (state->rxcmd & HAMMER2_MSGF_REPLY) { assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY); RB_REMOVE(hammer2_state_tree, - &iocom->statewr_tree, state); + &iocom->router.statewr_tree, state); } else { assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); RB_REMOVE(hammer2_state_tree, - &iocom->staterd_tree, state); + &iocom->router.staterd_tree, state); } state->flags &= ~HAMMER2_STATE_INSERTED; hammer2_state_free(state); @@ -1689,13 +1718,13 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) ; } pthread_mutex_unlock(&iocom->mtx); - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); } else if (state->msg != msg) { /* * Message not terminating transaction, leave state intact * and free message if it isn't the CREATE message. */ - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); } } @@ -1705,7 +1734,7 @@ hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) hammer2_state_t *state; if ((state = msg->state) == NULL) { - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { pthread_mutex_lock(&iocom->mtx); state->txcmd |= HAMMER2_MSGF_DELETE; @@ -1716,11 +1745,11 @@ hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) if (state->txcmd & HAMMER2_MSGF_REPLY) { assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY); RB_REMOVE(hammer2_state_tree, - &iocom->staterd_tree, state); + &iocom->router.staterd_tree, state); } else { assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); RB_REMOVE(hammer2_state_tree, - &iocom->statewr_tree, state); + &iocom->router.statewr_tree, state); } state->flags &= ~HAMMER2_STATE_INSERTED; hammer2_state_free(state); @@ -1728,9 +1757,9 @@ hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) ; } pthread_mutex_unlock(&iocom->mtx); - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); } else if (state->msg != msg) { - hammer2_msg_free(iocom, msg); + hammer2_msg_free(msg); } } @@ -1752,7 +1781,7 @@ hammer2_state_free(hammer2_state_t *state) msg = state->msg; state->msg = NULL; if (msg) - hammer2_msg_free_locked(iocom, msg); + hammer2_msg_free_locked(msg); free(state); /* @@ -1765,8 +1794,8 @@ hammer2_state_free(hammer2_state_t *state) * We may have to wake up the rx code. */ if (iocom->ioq_rx.error && - RB_EMPTY(&iocom->staterd_tree) && - RB_EMPTY(&iocom->statewr_tree)) { + RB_EMPTY(&iocom->router.staterd_tree) && + RB_EMPTY(&iocom->router.statewr_tree)) { dummy = 0; write(iocom->wakeupfds[1], &dummy, 1); } diff --git a/sbin/hammer2/msg_lnk.c b/sbin/hammer2/msg_lnk.c index 98b3266..229af80 100644 --- a/sbin/hammer2/msg_lnk.c +++ b/sbin/hammer2/msg_lnk.c @@ -95,6 +95,11 @@ * an active transaction is lost, the related messages will be fully aborted * and the higher protocol levels will retry as appropriate. * + * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation + * back to the originator. Only the originator keeps tracks of a message. + * Routers just pass it through. If a route is lost during transit the + * message is simply thrown away. + * * It is also important to note that several paths to the same PFS can be * propagated along the same link, which allows concurrency and even * redundancy over several network interfaces or via different routes through @@ -222,6 +227,7 @@ struct h2span_link { struct h2span_node *node; /* related node */ int32_t dist; struct h2span_relay_queue relayq; /* relay out */ + struct hammer2_router router; }; /* @@ -276,9 +282,9 @@ h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2) return(-1); if (link1->dist > link2->dist) return(1); - if ((intptr_t)link1 < (intptr_t)link2) + if (link1->state->msgid < link2->state->msgid) return(-1); - if ((intptr_t)link1 > (intptr_t)link2) + if (link1->state->msgid > link2->state->msgid) return(1); return(0); } @@ -292,17 +298,20 @@ static int h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2) { - if ((intptr_t)relay1->link->node < (intptr_t)relay2->link->node) + h2span_link_t *link1 = relay1->link; + h2span_link_t *link2 = relay2->link; + + if ((intptr_t)link1->node < (intptr_t)link2->node) return(-1); - if ((intptr_t)relay1->link->node > (intptr_t)relay2->link->node) + if ((intptr_t)link1->node > (intptr_t)link2->node) return(1); - if ((intptr_t)relay1->link->dist < (intptr_t)relay2->link->dist) + if (link1->dist < link2->dist) return(-1); - if ((intptr_t)relay1->link->dist > (intptr_t)relay2->link->dist) + if (link1->dist > link2->dist) return(1); - if ((intptr_t)relay1->link < (intptr_t)relay2->link) + if (link1->state->msgid < link2->state->msgid) return(-1); - if ((intptr_t)relay1->link > (intptr_t)relay2->link) + if (link1->state->msgid > link2->state->msgid) return(1); return(0); } @@ -332,39 +341,48 @@ static pthread_mutex_t cluster_mtx; static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree); static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq); -static void hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg); -static void hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg); -static void hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg); +static void hammer2_lnk_span(hammer2_msg_t *msg); +static void hammer2_lnk_conn(hammer2_msg_t *msg); +static void hammer2_lnk_relay(hammer2_msg_t *msg); static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node); static void hammer2_relay_delete(h2span_relay_t *relay); +void +hammer2_msg_lnk_signal(hammer2_router_t *router __unused) +{ + pthread_mutex_lock(&cluster_mtx); + hammer2_relay_scan(NULL, NULL); + pthread_mutex_unlock(&cluster_mtx); +} + /* * Receive a HAMMER2_MSG_PROTO_LNK message. This only called for * one-way and opening-transactions since state->func will be assigned * in all other cases. */ void -hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg) +hammer2_msg_lnk(hammer2_msg_t *msg) { switch(msg->any.head.cmd & HAMMER2_MSGF_BASECMDMASK) { case HAMMER2_LNK_CONN: - hammer2_lnk_conn(msg->state, msg); + hammer2_lnk_conn(msg); break; case HAMMER2_LNK_SPAN: - hammer2_lnk_span(msg->state, msg); + hammer2_lnk_span(msg); break; default: fprintf(stderr, "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd); - hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP); + hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP); /* state invalid after reply */ break; } } void -hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg) +hammer2_lnk_conn(hammer2_msg_t *msg) { + hammer2_state_t *state = msg->state; h2span_connect_t *conn; h2span_relay_t *relay; char *alloc = NULL; @@ -393,12 +411,15 @@ hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg) state->any.conn = conn; TAILQ_INSERT_TAIL(&connq, conn, entry); - hammer2_msg_result(state->iocom, msg, 0); + hammer2_msg_result(msg, 0); +#if 0 /* * Span-synchronize all nodes with the new connection */ hammer2_relay_scan(conn, NULL); +#endif + hammer2_router_signal(msg->router); } /* @@ -426,15 +447,16 @@ hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg) TAILQ_REMOVE(&connq, conn, entry); hammer2_free(conn); - hammer2_msg_reply(state->iocom, msg, 0); + hammer2_msg_reply(msg, 0); /* state invalid after reply */ } pthread_mutex_unlock(&cluster_mtx); } void -hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg) +hammer2_lnk_span(hammer2_msg_t *msg) { + hammer2_state_t *state = msg->state; h2span_cluster_t dummy_cls; h2span_node_t dummy_node; h2span_cluster_t *cls; @@ -443,23 +465,19 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg) h2span_relay_t *relay; char *alloc = NULL; + assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); + pthread_mutex_lock(&cluster_mtx); /* * On transaction start we initialize the tracking infrastructure */ if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) { + assert(state->func == NULL); state->func = hammer2_lnk_span; msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0; - fprintf(stderr, "LNK_SPAN: %s/%s dist=%d\n", - hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid, - &alloc), - msg->any.lnk_span.label, - msg->any.lnk_span.dist); - free(alloc); - /* * Find the cluster */ @@ -497,9 +515,29 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg) slink->dist = msg->any.lnk_span.dist; slink->state = state; state->any.link = slink; + + /* + * Embedded router structure in link for message forwarding. + */ + TAILQ_INIT(&slink->router.txmsgq); + slink->router.iocom = state->iocom; + slink->router.link = slink; + RB_INSERT(h2span_link_tree, &node->tree, slink); + fprintf(stderr, "LNK_SPAN(thr %p): %p %s/%s dist=%d\n", + msg->router->iocom, + slink, + hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid, + &alloc), + msg->any.lnk_span.label, + msg->any.lnk_span.dist); + free(alloc); + +#if 0 hammer2_relay_scan(NULL, node); +#endif + hammer2_router_signal(msg->router); } /* @@ -511,6 +549,14 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg) node = slink->node; cls = node->cls; + fprintf(stderr, "LNK_DELE(thr %p): %p %s/%s dist=%d\n", + msg->router->iocom, + slink, + hammer2_uuid_to_str(&cls->pfs_clid, &alloc), + state->msg->any.lnk_span.label, + state->msg->any.lnk_span.dist); + free(alloc); + /* * Clean out all relays. This requires terminating each * relay transaction. @@ -550,8 +596,12 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg) * it doesn't then all related relays have already been * removed and there's nothing left to do. */ +#if 0 if (node) hammer2_relay_scan(NULL, node); +#endif + if (node) + hammer2_router_signal(msg->router); } pthread_mutex_unlock(&cluster_mtx); @@ -564,10 +614,13 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg) * XXX MPRACE on state structure */ static void -hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg) +hammer2_lnk_relay(hammer2_msg_t *msg) { + hammer2_state_t *state = msg->state; h2span_relay_t *relay; + assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY); + if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { pthread_mutex_lock(&cluster_mtx); if ((relay = state->any.relay) != NULL) { @@ -676,8 +729,8 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn) info.relay = NULL; /* - * Locate the first related relay for the connection. relay will - * be NULL if there were none. + * Locate the first related relay for the node on this connection. + * relay will be NULL if there were none. */ RB_SCAN(h2span_relay_tree, &conn->tree, hammer2_relay_scan_cmp, hammer2_relay_scan_callback, &info); @@ -693,8 +746,13 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn) * Iterate the node's links (received SPANs) in distance order, * lowest (best) dist first. */ + /* fprintf(stderr, "LOOP\n"); */ RB_FOREACH(slink, h2span_link_tree, &node->tree) { /* + fprintf(stderr, "SLINK %p RELAY %p(%p)\n", + slink, relay, relay ? relay->link : NULL); + */ + /* * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION. * * Track relays while iterating the best links and construct @@ -702,12 +760,12 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn) * * (If some prior better link was removed it would have also * removed the relay, so the relay can only match exactly or - * be worst). + * be worse). */ if (relay && relay->link == slink) { /* - * Match, get the next relay to match against the - * next slink. + * Match, relay already in-place, get the next + * relay to match against the next slink. */ relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); if (--count == 0) @@ -718,35 +776,43 @@ hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn) * do not relay. This prevents endless closed * loops with ever-incrementing distances when * the seed span is lost in the graph. + * + * All later spans will also be too far away so + * we can break out of the loop. */ - /* no code needed */ + break; } else { /* * No match, distance is ok, construct a new relay. + * (slink is better than relay). */ hammer2_msg_t *msg; assert(relay == NULL || - relay->link->dist <= slink->dist); + relay->link->node != slink->node || + relay->link->dist >= slink->dist); relay = hammer2_alloc(sizeof(*relay)); relay->conn = conn; relay->link = slink; + msg = hammer2_msg_alloc(&conn->state->iocom->router, 0, + HAMMER2_LNK_SPAN | + HAMMER2_MSGF_CREATE, + hammer2_lnk_relay, relay); + relay->state = msg->state; + msg->any.lnk_span = slink->state->msg->any.lnk_span; + msg->any.lnk_span.dist = slink->dist + 1; + RB_INSERT(h2span_relay_tree, &conn->tree, relay); TAILQ_INSERT_TAIL(&slink->relayq, relay, entry); - msg = hammer2_msg_alloc(conn->state->iocom, 0, - HAMMER2_LNK_SPAN | - HAMMER2_MSGF_CREATE); - msg->any.lnk_span = slink->state->msg->any.lnk_span; - ++msg->any.lnk_span.dist; /* XXX add weighting */ + hammer2_msg_write(msg); - hammer2_msg_write(conn->state->iocom, msg, - hammer2_lnk_relay, relay, - &relay->state); fprintf(stderr, - "RELAY SPAN ON CLS=%p NODE=%p DIST=%d " + "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d " "FD %d state %p\n", + slink, + relay, node->cls, node, slink->dist, conn->state->iocom->sock_fd, relay->state); @@ -777,7 +843,9 @@ void hammer2_relay_delete(h2span_relay_t *relay) { fprintf(stderr, - "RELAY DELETE ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n", + "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n", + relay->link, + relay, relay->link->node->cls, relay->link->node, relay->link->dist, relay->conn->state->iocom->sock_fd, relay->state); @@ -796,11 +864,38 @@ hammer2_relay_delete(h2span_relay_t *relay) hammer2_free(relay); } +/************************************************************************ + * ROUTER * + ************************************************************************ + * + * Provides route functions to msg.c + */ + +#if 0 +/* + * Acquire a persistent router structure given the cluster and node ids. + * Messages can be transacted via this structure while held. If the route + * is lost messages will return failure. + */ +hammer2_router_t * +hammer2_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid) +{ +} + +/* + * Release previously acquired router. + */ +void +hammer2_router_put(hammer2_router_t *router) +{ +} +#endif + /* * Dumps the spanning tree */ void -shell_tree(hammer2_iocom_t *iocom, char *cmdbuf __unused) +shell_tree(hammer2_router_t *router, char *cmdbuf __unused) { h2span_cluster_t *cls; h2span_node_t *node; @@ -809,14 +904,14 @@ shell_tree(hammer2_iocom_t *iocom, char *cmdbuf __unused) pthread_mutex_lock(&cluster_mtx); RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) { - iocom_printf(iocom, "Cluster %s\n", + router_printf(router, "Cluster %s\n", hammer2_uuid_to_str(&cls->pfs_clid, &uustr)); RB_FOREACH(node, h2span_node_tree, &cls->tree) { - iocom_printf(iocom, " Node %s (%s)\n", + router_printf(router, " Node %s (%s)\n", hammer2_uuid_to_str(&node->pfs_fsid, &uustr), node->label); RB_FOREACH(slink, h2span_link_tree, &node->tree) { - iocom_printf(iocom, "\tLink dist=%d via %d\n", + router_printf(router, "\tLink dist=%d via %d\n", slink->dist, slink->state->iocom->sock_fd); } diff --git a/sbin/hammer2/network.h b/sbin/hammer2/network.h index e82d9fd..b79f501 100644 --- a/sbin/hammer2/network.h +++ b/sbin/hammer2/network.h @@ -137,7 +137,7 @@ struct hammer2_state { int flags; int error; struct hammer2_msg *msg; - void (*func)(struct hammer2_state *, struct hammer2_msg *); + void (*func)(struct hammer2_msg *); union { void *any; struct h2span_link *link; @@ -160,6 +160,7 @@ struct hammer2_address { struct hammer2_msg { TAILQ_ENTRY(hammer2_msg) qentry; + struct hammer2_router *router; struct hammer2_state *state; size_t hdr_size; size_t aux_size; @@ -224,6 +225,27 @@ typedef struct hammer2_ioq hammer2_ioq_t; #define HAMMER2_IOQ_MAXIOVEC 16 /* + * hammer2_router - governs the routing of a message. Passed into + * hammer2_msg_write. + * + * The router is either connected to an iocom (socket) directly, or + * connected to a SPAN transaction (h2span_link structure). + */ +struct hammer2_router { + struct hammer2_iocom *iocom; + struct h2span_link *link; /* non-NULL if indirect */ + void (*signal_callback)(struct hammer2_router *); + void (*rcvmsg_callback)(struct hammer2_msg *); + void (*altmsg_callback)(struct hammer2_iocom *); + struct hammer2_state_tree staterd_tree; /* active messages */ + struct hammer2_state_tree statewr_tree; /* active messages */ + hammer2_msg_queue_t txmsgq; /* tx msgq from remote */ + int refs; /* refs prevent destruction */ +}; + +typedef struct hammer2_router hammer2_router_t; + +/* * hammer2_iocom - governs a messaging stream connection */ struct hammer2_iocom { @@ -232,10 +254,6 @@ struct hammer2_iocom { hammer2_msg_queue_t freeq; /* free msgs hdr only */ hammer2_msg_queue_t freeq_aux; /* free msgs w/aux_data */ struct hammer2_address_queue addrq; /* source/target addrs */ - void (*state_callback)(struct hammer2_iocom *); - void (*rcvmsg_callback)(struct hammer2_iocom *, - struct hammer2_msg *); - void (*altmsg_callback)(struct hammer2_iocom *); int sock_fd; /* comm socket or pipe */ int alt_fd; /* thread signal, tty, etc */ int wakeupfds[2]; /* pipe wakes up iocom thread */ @@ -243,9 +261,7 @@ struct hammer2_iocom { int rxmisc; int txmisc; char sess[HAMMER2_AES_KEY_SIZE]; /* aes_256_cbc key */ - struct hammer2_state_tree staterd_tree; /* active messages */ - struct hammer2_state_tree statewr_tree; /* active messages */ - hammer2_msg_queue_t txmsgq; /* tx msgq from remote */ + struct hammer2_router router; pthread_mutex_t mtx; /* mutex for state*tree/rmsgq */ };