From: Matthew Dillon Date: Fri, 30 Nov 2012 23:39:12 +0000 (-0800) Subject: hammer2 - cluster / libdmsg circuit work X-Git-Tag: v3.4.0rc~767 X-Git-Url: https://gitweb.dragonflybsd.org/~tuxillo/dragonfly.git/commitdiff_plain/0d20ec8a5563b308a87bb20142331cbcef0174c6?hp=537d97bcf27d2bdb3bc8521c287713d78edd026c hammer2 - cluster / libdmsg circuit work * Major work on the virtual circuit code. Note there are still some thread races. * Major work on the spanning tree code to allow for future symmetric pathing support. * fs_label is now part of the RB tree compare, along with pfs_fsid. (e.g. the serial number for BLK protocol services). * Rip out the 'router' structure, replace with the 'circuit' structure. Embed circuit0 in the iocom. --- diff --git a/lib/libdmsg/debug.c b/lib/libdmsg/debug.c index 9c69ef0170..9251975ed2 100644 --- a/lib/libdmsg/debug.c +++ b/lib/libdmsg/debug.c @@ -247,13 +247,11 @@ dmsg_msg_str(dmsg_msg_t *msg) * Generate the buf */ snprintf(buf, sizeof(buf), - "msg=%s%s %s id=%08x src=%08x tgt=%08x %s", + "msg=%s%s %s msgid=%08x %s", dmsg_basecmd_str(msg->any.head.cmd), flagbuf, errstr, (uint32_t)(intmax_t)msg->any.head.msgid, /* for brevity */ - (uint32_t)(intmax_t)msg->any.head.source, /* for brevity */ - (uint32_t)(intmax_t)msg->any.head.target, /* for brevity */ statestr); return(buf); diff --git a/lib/libdmsg/dmsg.h b/lib/libdmsg/dmsg.h index 9e99129ec2..2a3eb7be40 100644 --- a/lib/libdmsg/dmsg.h +++ b/lib/libdmsg/dmsg.h @@ -122,41 +122,59 @@ typedef struct dmsg_handshake dmsg_handshake_t; * directly embedded (any), and the message may contain a reference * to allocated auxillary data. The structure is recycled quite often * by a connection. - * - * This structure is typically not used for storing persistent message - * state (see dmsg_persist for that). 
*/ struct dmsg_iocom; -struct dmsg_persist; +struct dmsg_circuit; struct dmsg_state; -struct dmsg_router; struct dmsg_msg; TAILQ_HEAD(dmsg_state_queue, dmsg_state); TAILQ_HEAD(dmsg_msg_queue, dmsg_msg); RB_HEAD(dmsg_state_tree, dmsg_state); -RB_HEAD(dmsg_router_tree, dmsg_router); +RB_HEAD(dmsg_circuit_tree, dmsg_circuit); struct h2span_link; struct h2span_relay; struct h2span_conn; +struct dmsg_circuit { + RB_ENTRY(dmsg_circuit) rbnode; + uint64_t msgid; + struct dmsg_iocom *iocom; + struct dmsg_state_tree staterd_tree; /* active transactions */ + struct dmsg_state_tree statewr_tree; /* active transactions */ + struct dmsg_circuit *peer; /* (if circuit relay) */ + struct dmsg_state *state; /* open VC transaction state */ + struct dmsg_state *span_state; /* span, relay or link */ + int is_relay; /* span is h2span_relay */ + int refs; +}; + +/* + * The state structure is ref-counted. The iocom cannot go away while + * state structures are active. However, the related h2span_* linkages + * can be destroyed and NULL'd out if the state is terminated in both + * directions. + */ struct dmsg_state { RB_ENTRY(dmsg_state) rbnode; /* indexed by msgid */ struct dmsg_iocom *iocom; - struct dmsg_router *router; /* if routed */ + struct dmsg_circuit *circuit; /* associated circuit */ + uint32_t icmd; /* command creating state */ uint32_t txcmd; /* mostly for CMDF flags */ uint32_t rxcmd; /* mostly for CMDF flags */ uint64_t msgid; /* {spanid,msgid} uniq */ int flags; int error; - struct dmsg_msg *msg; + int refs; /* prevent destruction */ + struct dmsg_msg *msg; /* msg creating orig state */ void (*func)(struct dmsg_msg *); union { void *any; struct h2span_link *link; struct h2span_conn *conn; struct h2span_relay *relay; + struct dmsg_circuit *circ; } any; }; @@ -164,22 +182,33 @@ struct dmsg_state { #define DMSG_STATE_DYNAMIC 0x0002 #define DMSG_STATE_NODEID 0x0004 /* manages a node id */ +/* + * This is the core in-memory representation of a message structure. 
+ * The iocom represents the incoming or outgoing iocom. Various state + * pointers are calculated based on the message's raw source and target + * fields, and will ref the underlying state. Message headers are embedded + * while auxillary data is separately allocated. + */ struct dmsg_msg { TAILQ_ENTRY(dmsg_msg) qentry; - struct dmsg_router *router; - struct dmsg_state *state; + struct dmsg_iocom *iocom; /* incoming/outgoing iocom */ + struct dmsg_circuit *circuit; /* associated circuit */ + struct dmsg_state *state; /* message state */ size_t hdr_size; size_t aux_size; char *aux_data; dmsg_any_t any; }; +typedef struct dmsg_circuit dmsg_circuit_t; typedef struct dmsg_state dmsg_state_t; typedef struct dmsg_msg dmsg_msg_t; typedef struct dmsg_msg_queue dmsg_msg_queue_t; int dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2); RB_PROTOTYPE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp); +int dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2); +RB_PROTOTYPE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp); /* * dmsg_ioq - An embedded component of dmsg_conn, holds state @@ -232,42 +261,13 @@ typedef struct dmsg_ioq dmsg_ioq_t; #define DMSG_IOQ_ERROR_IVWRAP 18 /* IVs exhaused */ #define DMSG_IOQ_ERROR_MACFAIL 19 /* MAC of encr alg failed */ #define DMSG_IOQ_ERROR_ALGO 20 /* Misc. encr alg error */ +#define DMSG_IOQ_ERROR_ROUTED 21 /* ignore routed message */ +#define DMSG_IOQ_ERROR_BAD_CIRCUIT 22 /* unconfigured circuit */ +#define DMSG_IOQ_ERROR_UNUSED23 23 +#define DMSG_IOQ_ERROR_ASSYM 24 /* Assymetric path */ #define DMSG_IOQ_MAXIOVEC 16 -/* - * dmsg_router - governs the routing of a message. Passed into - * dmsg_msg_write. - * - * The router is either connected to an iocom (socket) directly, or - * connected to a SPAN transaction (h2span_link structure for outgoing) - * or to a SPAN transaction (h2span_relay structure for incoming). 
- */ -struct dmsg_router { - RB_ENTRY(dmsg_router) rbnode; /* indexed by target */ - struct dmsg_iocom *iocom; - struct h2span_link *link; /* may be NULL */ - struct h2span_relay *relay; /* may be NULL */ - void (*signal_callback)(struct dmsg_router *); - void (*rcvmsg_callback)(struct dmsg_msg *); - void (*altmsg_callback)(struct dmsg_iocom *); - void (*dbgmsg_callback)(dmsg_msg_t *msg); - struct dmsg_state_tree staterd_tree; /* active messages */ - struct dmsg_state_tree statewr_tree; /* active messages */ - dmsg_msg_queue_t txmsgq; /* tx msgq from remote */ - uint64_t target; /* for routing */ - int flags; - int refs; /* refs prevent destruction */ -}; - -#define DMSG_ROUTER_CONNECTED 0x0001 /* on global RB tree */ -#define DMSG_ROUTER_DELETED 0x0002 /* parent structure destroyed */ - -typedef struct dmsg_router dmsg_router_t; - -int dmsg_router_cmp(dmsg_router_t *router1, dmsg_router_t *router2); -RB_PROTOTYPE(dmsg_router_tree, dmsg_router, rbnode, dmsg_router_cmp); - /* * dmsg_iocom - governs a messaging stream connection */ @@ -282,7 +282,14 @@ struct dmsg_iocom { int flags; int rxmisc; int txmisc; - struct dmsg_router *router; + void (*signal_callback)(struct dmsg_iocom *); + void (*rcvmsg_callback)(struct dmsg_msg *); + void (*altmsg_callback)(struct dmsg_iocom *); + void (*dbgmsg_callback)(dmsg_msg_t *msg); + struct dmsg_circuit_tree circuit_tree; /* active circuits */ + struct dmsg_circuit circuit0; /* embedded circuit0 */ + dmsg_msg_queue_t txmsgq; /* tx msgq from remote */ + struct h2span_conn *conn; /* if LNK_CONN active */ pthread_mutex_t mtx; /* mutex for state*tree/rmsgq */ }; @@ -328,6 +335,14 @@ struct dmsg_master_service_info { typedef struct dmsg_master_service_info dmsg_master_service_info_t; +/* + * node callbacks + */ +#define DMSG_NODEOP_ADD 1 +#define DMSG_NODEOP_DEL 2 + +extern void (*dmsg_node_handler)(void **opaquep, struct dmsg_msg *msg, int op); + /* * icrc @@ -347,6 +362,8 @@ const char *dmsg_msg_str(dmsg_msg_t *msg); void 
*dmsg_alloc(size_t bytes); void dmsg_free(void *ptr); const char *dmsg_uuid_to_str(uuid_t *uuid, char **strp); +const char *dmsg_peer_type_to_str(uint8_t type); +const char *dmsg_pfs_type_to_str(uint8_t type); int dmsg_connect(const char *hostname); /* @@ -356,22 +373,24 @@ void dmsg_bswap_head(dmsg_hdr_t *head); void dmsg_ioq_init(dmsg_iocom_t *iocom, dmsg_ioq_t *ioq); void dmsg_ioq_done(dmsg_iocom_t *iocom, dmsg_ioq_t *ioq); void dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd, - void (*state_func)(dmsg_router_t *), + void (*state_func)(dmsg_iocom_t *), void (*rcvmsg_func)(dmsg_msg_t *), void (*dbgmsg_func)(dmsg_msg_t *), void (*altmsg_func)(dmsg_iocom_t *)); -void dmsg_router_restate(dmsg_router_t *router, - void (*state_func)(dmsg_router_t *), +void dmsg_iocom_restate(dmsg_iocom_t *iocom, + void (*state_func)(dmsg_iocom_t *), void (*rcvmsg_func)(dmsg_msg_t *), void (*altmsg_func)(dmsg_iocom_t *)); -void dmsg_router_signal(dmsg_router_t *router); +void dmsg_iocom_signal(dmsg_iocom_t *iocom); void dmsg_iocom_done(dmsg_iocom_t *iocom); -dmsg_msg_t *dmsg_msg_alloc(dmsg_router_t *router, +void dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit); +dmsg_msg_t *dmsg_msg_alloc(dmsg_circuit_t *circuit, size_t aux_size, uint32_t cmd, void (*func)(dmsg_msg_t *), void *data); void dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error); void dmsg_msg_result(dmsg_msg_t *msg, uint32_t error); void dmsg_state_reply(dmsg_state_t *state, uint32_t error); +void dmsg_state_result(dmsg_state_t *state, uint32_t error); void dmsg_msg_free(dmsg_msg_t *msg); @@ -385,18 +404,17 @@ void dmsg_iocom_flush2(dmsg_iocom_t *iocom); void dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg); void dmsg_state_free(dmsg_state_t *state); +void dmsg_circuit_drop(dmsg_circuit_t *circuit); -dmsg_router_t *dmsg_router_alloc(void); -void dmsg_router_connect(dmsg_router_t *router); -void dmsg_router_disconnect(dmsg_router_t **routerp); +int dmsg_circuit_relay(dmsg_msg_t *msg); /* * 
Msg protocol functions */ -void dmsg_msg_lnk_signal(dmsg_router_t *router); +void dmsg_msg_lnk_signal(dmsg_iocom_t *iocom); void dmsg_msg_lnk(dmsg_msg_t *msg); void dmsg_msg_dbg(dmsg_msg_t *msg); -void dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused); +void dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused); /* * Crypto functions @@ -411,7 +429,7 @@ int dmsg_crypto_encrypt(dmsg_iocom_t *iocom, dmsg_ioq_t *ioq, * Service daemon functions */ void *dmsg_master_service(void *data); -void dmsg_router_printf(dmsg_router_t *router, const char *ctl, ...) +void dmsg_circuit_printf(dmsg_circuit_t *circuit, const char *ctl, ...) __printflike(2, 3); extern int DMsgDebugOpt; diff --git a/lib/libdmsg/dmsg_local.h b/lib/libdmsg/dmsg_local.h index a9875321d7..99aac894dd 100644 --- a/lib/libdmsg/dmsg_local.h +++ b/lib/libdmsg/dmsg_local.h @@ -62,4 +62,6 @@ #include #include /* aes_256_cbc functions */ +#include + #include "dmsg.h" diff --git a/lib/libdmsg/msg.c b/lib/libdmsg/msg.c index ee9a8e9b7c..8c0b3aecf8 100644 --- a/lib/libdmsg/msg.c +++ b/lib/libdmsg/msg.c @@ -40,53 +40,37 @@ int DMsgDebugOpt; static int dmsg_state_msgrx(dmsg_msg_t *msg); static void dmsg_state_cleanuptx(dmsg_msg_t *msg); +RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp); +RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp); + /* - * ROUTER TREE - Represents available routes for message routing, indexed - * by their span transaction id. The router structure is - * embedded in either an iocom, h2span_link (incoming), - * or h2span_relay (outgoing) (see msg_lnk.c). + * STATE TREE - Represents open transactions which are indexed by their + * { msgid } relative to the governing iocom. 
*/ int -dmsg_router_cmp(dmsg_router_t *router1, dmsg_router_t *router2) +dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2) { - if (router1->target < router2->target) + if (state1->msgid < state2->msgid) return(-1); - if (router1->target > router2->target) + if (state1->msgid > state2->msgid) return(1); return(0); } -RB_GENERATE(dmsg_router_tree, dmsg_router, rbnode, dmsg_router_cmp); - -static pthread_mutex_t router_mtx; -static struct dmsg_router_tree router_ltree = RB_INITIALIZER(router_ltree); -static struct dmsg_router_tree router_rtree = RB_INITIALIZER(router_rtree); - /* - * STATE TREE - Represents open transactions which are indexed by their - * {router,msgid} relative to the governing iocom. - * - * router is usually iocom->router since state isn't stored - * for relayed messages. + * CIRCUIT TREE - Represents open circuits which are indexed by their + * { msgid } relative to the governing iocom. */ int -dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2) +dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2) { -#if 0 - if (state1->router < state2->router) - return(-1); - if (state1->router > state2->router) - return(1); -#endif - if (state1->msgid < state2->msgid) + if (circuit1->msgid < circuit2->msgid) return(-1); - if (state1->msgid > state2->msgid) + if (circuit1->msgid > circuit2->msgid) return(1); return(0); } -RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp); - /* * Initialize a low-level ioq */ @@ -127,7 +111,7 @@ dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq) */ void dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd, - void (*signal_func)(dmsg_router_t *), + void (*signal_func)(dmsg_iocom_t *), void (*rcvmsg_func)(dmsg_msg_t *), void (*dbgmsg_func)(dmsg_msg_t *), void (*altmsg_func)(dmsg_iocom_t *)) @@ -136,20 +120,16 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd, bzero(iocom, sizeof(*iocom)); - iocom->router = dmsg_router_alloc(); - 
iocom->router->signal_callback = signal_func; - iocom->router->rcvmsg_callback = rcvmsg_func; - iocom->router->altmsg_callback = altmsg_func; - iocom->router->dbgmsg_callback = dbgmsg_func; - /* we do not call dmsg_router_connect() for iocom routers */ + iocom->signal_callback = signal_func; + iocom->rcvmsg_callback = rcvmsg_func; + iocom->altmsg_callback = altmsg_func; + iocom->dbgmsg_callback = dbgmsg_func; pthread_mutex_init(&iocom->mtx, NULL); - RB_INIT(&iocom->router->staterd_tree); - RB_INIT(&iocom->router->statewr_tree); + RB_INIT(&iocom->circuit_tree); TAILQ_INIT(&iocom->freeq); TAILQ_INIT(&iocom->freeq_aux); - TAILQ_INIT(&iocom->router->txmsgq); - iocom->router->iocom = iocom; + TAILQ_INIT(&iocom->txmsgq); iocom->sock_fd = sock_fd; iocom->alt_fd = alt_fd; iocom->flags = DMSG_IOCOMF_RREQ; @@ -162,6 +142,8 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd, fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK); fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK); + dmsg_circuit_init(iocom, &iocom->circuit0); + /* * Negotiate session crypto synchronously. This will mark the * connection as error'd if it fails. If this is a pipe it's @@ -192,25 +174,25 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd, * the recevmsg_func and the sendmsg_func is called at least once. 
*/ void -dmsg_router_restate(dmsg_router_t *router, - void (*signal_func)(dmsg_router_t *), +dmsg_iocom_restate(dmsg_iocom_t *iocom, + void (*signal_func)(dmsg_iocom_t *), void (*rcvmsg_func)(dmsg_msg_t *msg), void (*altmsg_func)(dmsg_iocom_t *)) { - router->signal_callback = signal_func; - router->rcvmsg_callback = rcvmsg_func; - router->altmsg_callback = altmsg_func; + iocom->signal_callback = signal_func; + iocom->rcvmsg_callback = rcvmsg_func; + iocom->altmsg_callback = altmsg_func; if (signal_func) - router->iocom->flags |= DMSG_IOCOMF_SWORK; + iocom->flags |= DMSG_IOCOMF_SWORK; else - router->iocom->flags &= ~DMSG_IOCOMF_SWORK; + iocom->flags &= ~DMSG_IOCOMF_SWORK; } void -dmsg_router_signal(dmsg_router_t *router) +dmsg_iocom_signal(dmsg_iocom_t *iocom) { - if (router->signal_callback) - router->iocom->flags |= DMSG_IOCOMF_SWORK; + if (iocom->signal_callback) + iocom->flags |= DMSG_IOCOMF_SWORK; } /* @@ -255,15 +237,30 @@ dmsg_iocom_done(dmsg_iocom_t *iocom) pthread_mutex_destroy(&iocom->mtx); } +/* + * Initialize a circuit structure and add it to the iocom's circuit_tree. + * circuit0 is left out and will not be added to the tree. + */ +void +dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit) +{ + circuit->iocom = iocom; + RB_INIT(&circuit->staterd_tree); + RB_INIT(&circuit->statewr_tree); + if (circuit->msgid) + RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit); +} + /* * Allocate a new one-way message. 
*/ dmsg_msg_t * -dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd, - void (*func)(dmsg_msg_t *), void *data) +dmsg_msg_alloc(dmsg_circuit_t *circuit, + size_t aux_size, uint32_t cmd, + void (*func)(dmsg_msg_t *), void *data) { + dmsg_iocom_t *iocom = circuit->iocom; dmsg_state_t *state = NULL; - dmsg_iocom_t *iocom = router->iocom; dmsg_msg_t *msg; int hbytes; @@ -280,24 +277,30 @@ dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd, if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) { /* * Create state when CREATE is set without REPLY. + * Assign a unique msgid, in this case simply using + * the pointer value for 'state'. * * NOTE: CREATE in txcmd handled by dmsg_msg_write() * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx() + * + * NOTE: state initiated by us and state initiated by + * a remote create are placed in different RB trees. + * The msgid for SPAN state is used in source/target + * for message routing as appropriate. */ state = malloc(sizeof(*state)); bzero(state, sizeof(*state)); state->iocom = iocom; + state->circuit = circuit; state->flags = DMSG_STATE_DYNAMIC; state->msgid = (uint64_t)(uintptr_t)state; - state->router = router; state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE); state->rxcmd = DMSGF_REPLY; + state->icmd = state->txcmd & DMSGF_BASECMDMASK; state->func = func; state->any.any = data; pthread_mutex_lock(&iocom->mtx); - RB_INSERT(dmsg_state_tree, - &iocom->router->statewr_tree, - state); + RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state); pthread_mutex_unlock(&iocom->mtx); state->flags |= DMSG_STATE_INSERTED; } @@ -323,14 +326,19 @@ dmsg_msg_alloc(dmsg_router_t *router, size_t aux_size, uint32_t cmd, if (hbytes) bzero(&msg->any.head, hbytes); msg->hdr_size = hbytes; + msg->any.head.magic = DMSG_HDR_MAGIC; msg->any.head.cmd = cmd; msg->any.head.aux_descr = 0; msg->any.head.aux_crc = 0; - msg->router = router; + msg->any.head.circuit = 0; + msg->circuit = circuit; + msg->iocom = 
iocom; if (state) { msg->state = state; state->msg = msg; msg->any.head.msgid = state->msgid; + } else { + msg->any.head.msgid = 0; } return (msg); } @@ -344,7 +352,7 @@ static void dmsg_msg_free_locked(dmsg_msg_t *msg) { - dmsg_iocom_t *iocom = msg->router->iocom; + dmsg_iocom_t *iocom = msg->iocom; msg->state = NULL; if (msg->aux_data) @@ -356,7 +364,7 @@ dmsg_msg_free_locked(dmsg_msg_t *msg) void dmsg_msg_free(dmsg_msg_t *msg) { - dmsg_iocom_t *iocom = msg->router->iocom; + dmsg_iocom_t *iocom = msg->iocom; pthread_mutex_lock(&iocom->mtx); dmsg_msg_free_locked(msg); @@ -455,7 +463,7 @@ dmsg_iocom_core(dmsg_iocom_t *iocom) if (iocom->flags & DMSG_IOCOMF_SWORK) { iocom->flags &= ~DMSG_IOCOMF_SWORK; - iocom->router->signal_callback(iocom->router); + iocom->signal_callback(iocom); } /* @@ -468,7 +476,7 @@ dmsg_iocom_core(dmsg_iocom_t *iocom) read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf)); iocom->flags |= DMSG_IOCOMF_RWORK; iocom->flags |= DMSG_IOCOMF_WWORK; - if (TAILQ_FIRST(&iocom->router->txmsgq)) + if (TAILQ_FIRST(&iocom->txmsgq)) dmsg_iocom_flush1(iocom); } @@ -490,14 +498,14 @@ dmsg_iocom_core(dmsg_iocom_t *iocom) fprintf(stderr, "receive %s\n", dmsg_msg_str(msg)); } - iocom->router->rcvmsg_callback(msg); + iocom->rcvmsg_callback(msg); dmsg_state_cleanuprx(iocom, msg); } } if (iocom->flags & DMSG_IOCOMF_ARWORK) { iocom->flags &= ~DMSG_IOCOMF_ARWORK; - iocom->router->altmsg_callback(iocom); + iocom->altmsg_callback(iocom); } } } @@ -556,6 +564,7 @@ dmsg_ioq_read(dmsg_iocom_t *iocom) dmsg_ioq_t *ioq = &iocom->ioq_rx; dmsg_msg_t *msg; dmsg_state_t *state; + dmsg_circuit_t *circuit0; dmsg_hdr_t *head; ssize_t n; size_t bytes; @@ -682,8 +691,8 @@ again: /* * Allocate the message, the next state will fill it in. */ - msg = dmsg_msg_alloc(iocom->router, ioq->abytes, 0, - NULL, NULL); + msg = dmsg_msg_alloc(&iocom->circuit0, ioq->abytes, 0, + NULL, NULL); ioq->msg = msg; /* @@ -924,24 +933,6 @@ again: } } - /* - * Handle relaying. 
Transactional state is not recorded XXX - */ - - /* - * Process transactional state for the message. - */ - if (msg && ioq->error == 0) { - error = dmsg_state_msgrx(msg); - if (error) { - if (error == DMSG_IOQ_ERROR_EALREADY) { - dmsg_msg_free(msg); - goto again; - } - ioq->error = error; - } - } - /* * Handle error, RREQ, or completion * @@ -975,16 +966,20 @@ skip: * transactions, ending with a final non-transactional * LNK_ERROR (that the session can detect) when no * transactions remain. + * + * We only need to scan transactions on circuit0 as these + * will contain all circuit forges, and terminating circuit + * forges will automatically terminate the transactions on + * any other circuits as well as those circuits. */ - msg = dmsg_msg_alloc(iocom->router, 0, 0, NULL, NULL); - bzero(&msg->any.head, sizeof(msg->any.head)); - msg->any.head.magic = DMSG_HDR_MAGIC; - msg->any.head.cmd = DMSG_LNK_ERROR; + circuit0 = &iocom->circuit0; + msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL); msg->any.head.error = ioq->error; pthread_mutex_lock(&iocom->mtx); dmsg_iocom_drain(iocom); - if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) { + + if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) { /* * Active remote transactions are still present. * Simulate the other end sending us a DELETE. @@ -995,13 +990,12 @@ skip: } else { /*state->txcmd |= DMSGF_DELETE;*/ msg->state = state; - msg->router = state->router; + msg->iocom = iocom; msg->any.head.msgid = state->msgid; msg->any.head.cmd |= DMSGF_ABORT | DMSGF_DELETE; } - } else if ((state = RB_ROOT(&iocom->router->statewr_tree)) != - NULL) { + } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) { /* * Active local transactions are still present. * Simulate the other end sending us a DELETE. 
@@ -1011,7 +1005,7 @@ skip: msg = NULL; } else { msg->state = state; - msg->router = state->router; + msg->iocom = iocom; msg->any.head.msgid = state->msgid; msg->any.head.cmd |= DMSGF_ABORT | DMSGF_DELETE | @@ -1055,7 +1049,7 @@ skip: iocom->flags |= DMSG_IOCOMF_RREQ; } else { /* - * Return msg. + * Continue processing msg. * * The fifo has already been advanced past the message. * Trivially reset the FIFO indices if possible. @@ -1076,6 +1070,43 @@ skip: } ioq->state = DMSG_MSGQ_STATE_HEADER1; ioq->msg = NULL; + + /* + * Handle message routing. Validates non-zero sources + * and routes message. Error will be 0 if the message is + * destined for us. + * + * State processing only occurs for messages destined for us. + */ + if (msg->any.head.circuit) + error = dmsg_circuit_relay(msg); + else + error = dmsg_state_msgrx(msg); + + if (error) { + /* + * Abort-after-closure, throw message away and + * start reading another. + */ + if (error == DMSG_IOQ_ERROR_EALREADY) { + dmsg_msg_free(msg); + goto again; + } + + /* + * msg routed, msg pointer no longer owned by us. + * Go to the top and start reading another. + */ + if (error == DMSG_IOQ_ERROR_ROUTED) + goto again; + + /* + * Process real error and throw away message. + */ + ioq->error = error; + goto skip; + } + /* no error, not routed. 
Fall through and return msg */ } return (msg); } @@ -1102,8 +1133,8 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom) iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK); TAILQ_INIT(&tmpq); pthread_mutex_lock(&iocom->mtx); - while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) { - TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry); + while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) { + TAILQ_REMOVE(&iocom->txmsgq, msg, qentry); TAILQ_INSERT_TAIL(&tmpq, msg, qentry); } pthread_mutex_unlock(&iocom->mtx); @@ -1361,7 +1392,7 @@ dmsg_iocom_drain(dmsg_iocom_t *iocom) void dmsg_msg_write(dmsg_msg_t *msg) { - dmsg_iocom_t *iocom = msg->router->iocom; + dmsg_iocom_t *iocom = msg->iocom; dmsg_state_t *state; char dummy; @@ -1384,23 +1415,21 @@ dmsg_msg_write(dmsg_msg_t *msg) if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) { state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; + state->icmd = state->txcmd & DMSGF_BASECMDMASK; } msg->any.head.msgid = state->msgid; assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0); - if (msg->any.head.cmd & DMSGF_CREATE) + if (msg->any.head.cmd & DMSGF_CREATE) { state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; - } else { - msg->any.head.msgid = 0; - /* XXX set spanid by router */ + state->icmd = state->txcmd & DMSGF_BASECMDMASK; + } } - msg->any.head.source = 0; - msg->any.head.target = msg->router->target; /* * Queue it for output, wake up the I/O pthread. Note that the * I/O thread is responsible for generating the CRCs and encryption. 
*/ - TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry); + TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry); dummy = 0; write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */ pthread_mutex_unlock(&iocom->mtx); @@ -1422,7 +1451,6 @@ dmsg_msg_write(dmsg_msg_t *msg) void dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error) { - dmsg_iocom_t *iocom = msg->router->iocom; dmsg_state_t *state = msg->state; dmsg_msg_t *nmsg; uint32_t cmd; @@ -1455,15 +1483,17 @@ dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error) /* * Allocate the message and associate it with the existing state. - * We cannot pass MSGF_CREATE to msg_alloc() because that may + * We cannot pass DMSGF_CREATE to msg_alloc() because that may * allocate new state. We have our state already. */ - nmsg = dmsg_msg_alloc(iocom->router, 0, cmd, NULL, NULL); + nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL); if (state) { if ((state->txcmd & DMSGF_CREATE) == 0) nmsg->any.head.cmd |= DMSGF_CREATE; } nmsg->any.head.error = error; + nmsg->any.head.msgid = msg->any.head.msgid; + nmsg->any.head.circuit = msg->any.head.circuit; nmsg->state = state; dmsg_msg_write(nmsg); } @@ -1477,7 +1507,6 @@ dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error) void dmsg_msg_result(dmsg_msg_t *msg, uint32_t error) { - dmsg_iocom_t *iocom = msg->router->iocom; dmsg_state_t *state = msg->state; dmsg_msg_t *nmsg; uint32_t cmd; @@ -1508,12 +1537,14 @@ dmsg_msg_result(dmsg_msg_t *msg, uint32_t error) cmd |= DMSGF_REPLY; } - nmsg = dmsg_msg_alloc(iocom->router, 0, cmd, NULL, NULL); + nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL); if (state) { if ((state->txcmd & DMSGF_CREATE) == 0) nmsg->any.head.cmd |= DMSGF_CREATE; } nmsg->any.head.error = error; + nmsg->any.head.msgid = msg->any.head.msgid; + nmsg->any.head.circuit = msg->any.head.circuit; nmsg->state = state; dmsg_msg_write(nmsg); } @@ -1540,12 +1571,48 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error) if (state->txcmd & DMSGF_REPLY) cmd |= DMSGF_REPLY; - nmsg = 
dmsg_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL); + nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL); if (state) { if ((state->txcmd & DMSGF_CREATE) == 0) nmsg->any.head.cmd |= DMSGF_CREATE; } nmsg->any.head.error = error; + nmsg->any.head.msgid = state->msgid; + nmsg->any.head.circuit = state->msg->any.head.circuit; + nmsg->state = state; + dmsg_msg_write(nmsg); +} + +/* + * Terminate a transaction given a state structure by issuing a DELETE. + */ +void +dmsg_state_result(dmsg_state_t *state, uint32_t error) +{ + dmsg_msg_t *nmsg; + uint32_t cmd = DMSG_LNK_ERROR; + + /* + * Nothing to do if we already transmitted a delete + */ + if (state->txcmd & DMSGF_DELETE) + return; + + /* + * Set REPLY if the other end initiated the command. Otherwise + * we are the command direction. + */ + if (state->txcmd & DMSGF_REPLY) + cmd |= DMSGF_REPLY; + + nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL); + if (state) { + if ((state->txcmd & DMSGF_CREATE) == 0) + nmsg->any.head.cmd |= DMSGF_CREATE; + } + nmsg->any.head.error = error; + nmsg->any.head.msgid = state->msgid; + nmsg->any.head.circuit = state->msg->any.head.circuit; nmsg->state = state; dmsg_msg_write(nmsg); } @@ -1557,8 +1624,8 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error) */ /* - * Process state tracking for a message after reception, prior to - * execution. + * Process circuit and state tracking for a message after reception, prior + * to execution. * * Called with msglk held and the msg dequeued. * @@ -1626,25 +1693,41 @@ dmsg_state_reply(dmsg_state_t *state, uint32_t error) static int dmsg_state_msgrx(dmsg_msg_t *msg) { - dmsg_iocom_t *iocom = msg->router->iocom; + dmsg_iocom_t *iocom = msg->iocom; + dmsg_circuit_t *circuit; dmsg_state_t *state; - dmsg_state_t dummy; + dmsg_state_t sdummy; + dmsg_circuit_t cdummy; int error; + pthread_mutex_lock(&iocom->mtx); + + /* + * Locate existing persistent circuit and state, if any. 
*/ + cdummy.msgid = msg->any.head.circuit; + if (cdummy.msgid == 0) + circuit = &iocom->circuit0; + else + circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &cdummy); + if (circuit == NULL) { + pthread_mutex_unlock(&iocom->mtx); /* drop lock taken above */ + return (DMSG_IOQ_ERROR_BAD_CIRCUIT); + } + msg->circuit = circuit; + ++circuit->refs; + /* - * Lock RB tree and locate existing persistent state, if any. - * * If received msg is a command state is on staterd_tree. * If received msg is a reply state is on statewr_tree. */ - dummy.msgid = msg->any.head.msgid; - pthread_mutex_lock(&iocom->mtx); + sdummy.msgid = msg->any.head.msgid; if (msg->any.head.cmd & DMSGF_REPLY) { - state = RB_FIND(dmsg_state_tree, - &iocom->router->statewr_tree, &dummy); + state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree, + &sdummy); } else { - state = RB_FIND(dmsg_state_tree, - &iocom->router->staterd_tree, &dummy); + state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree, + &sdummy); } msg->state = state; pthread_mutex_unlock(&iocom->mtx); @@ -1678,17 +1761,17 @@ dmsg_state_msgrx(dmsg_msg_t *msg) state = malloc(sizeof(*state)); bzero(state, sizeof(*state)); state->iocom = iocom; + state->circuit = circuit; state->flags = DMSG_STATE_DYNAMIC; state->msg = msg; state->txcmd = DMSGF_REPLY; state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE; + state->icmd = state->rxcmd & DMSGF_BASECMDMASK; state->flags |= DMSG_STATE_INSERTED; state->msgid = msg->any.head.msgid; - state->router = msg->router; msg->state = state; pthread_mutex_lock(&iocom->mtx); - RB_INSERT(dmsg_state_tree, - &iocom->router->staterd_tree, state); + RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state); pthread_mutex_unlock(&iocom->mtx); error = 0; if (DMsgDebugOpt) { @@ -1840,11 +1923,11 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg) if (state->rxcmd & DMSGF_REPLY) { assert(msg->any.head.cmd & DMSGF_REPLY); RB_REMOVE(dmsg_state_tree, - &iocom->router->statewr_tree, state); + &msg->circuit->statewr_tree, state); } else {
assert((msg->any.head.cmd & DMSGF_REPLY) == 0); RB_REMOVE(dmsg_state_tree, - &iocom->router->staterd_tree, state); + &msg->circuit->staterd_tree, state); } state->flags &= ~DMSG_STATE_INSERTED; dmsg_state_free(state); @@ -1865,13 +1948,14 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg) static void dmsg_state_cleanuptx(dmsg_msg_t *msg) { - dmsg_iocom_t *iocom = msg->router->iocom; + dmsg_iocom_t *iocom = msg->iocom; dmsg_state_t *state; if ((state = msg->state) == NULL) { dmsg_msg_free(msg); } else if (msg->any.head.cmd & DMSGF_DELETE) { pthread_mutex_lock(&iocom->mtx); + assert((state->txcmd & DMSGF_DELETE) == 0); state->txcmd |= DMSGF_DELETE; if (state->rxcmd & DMSGF_DELETE) { if (state->msg == msg) @@ -1880,11 +1964,11 @@ dmsg_state_cleanuptx(dmsg_msg_t *msg) if (state->txcmd & DMSGF_REPLY) { assert(msg->any.head.cmd & DMSGF_REPLY); RB_REMOVE(dmsg_state_tree, - &iocom->router->staterd_tree, state); + &msg->circuit->staterd_tree, state); } else { assert((msg->any.head.cmd & DMSGF_REPLY) == 0); RB_REMOVE(dmsg_state_tree, - &iocom->router->statewr_tree, state); + &msg->circuit->statewr_tree, state); } state->flags &= ~DMSG_STATE_INSERTED; dmsg_state_free(state); @@ -1904,9 +1988,7 @@ dmsg_state_cleanuptx(dmsg_msg_t *msg) void dmsg_state_free(dmsg_state_t *state) { - dmsg_iocom_t *iocom = state->iocom; dmsg_msg_t *msg; - char dummy; if (DMsgDebugOpt) { fprintf(stderr, "terminate state %p id=%08x\n", @@ -1918,101 +2000,48 @@ dmsg_state_free(dmsg_state_t *state) if (msg) dmsg_msg_free_locked(msg); free(state); - - /* - * When an iocom error is present we are trying to close down the - * iocom, but we have to wait for all states to terminate before - * we can do so. The iocom rx code will terminate the receive side - * for all transactions by simulating incoming DELETE messages, - * but the state doesn't go away until both sides are terminated. - * - * We may have to wake up the rx code. 
- */ - if (iocom->ioq_rx.error && - RB_EMPTY(&iocom->router->staterd_tree) && - RB_EMPTY(&iocom->router->statewr_tree)) { - dummy = 0; - write(iocom->wakeupfds[1], &dummy, 1); - } } -/************************************************************************ - * ROUTING * - ************************************************************************ - * - * Incoming messages are routed by their spanid, matched up against - * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c). - * Any replies run through the same router. - * - * Originated messages are routed by their spanid, matched up against - * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c). - * Replies come back through the same route. - * - * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular - * transaction runs through the same route. Commands and replies both. - * - * An originated message will use a different routing spanid to - * reach a target node than a message which originates from that node. - * They might use the same physical pipes (each pipe can have multiple - * SPANs and RELAYs), but the routes are distinct from the perspective - * of the router. 
+/* + * Called with iocom locked */ -dmsg_router_t * -dmsg_router_alloc(void) -{ - dmsg_router_t *router; - - router = dmsg_alloc(sizeof(*router)); - TAILQ_INIT(&router->txmsgq); - return (router); -} - void -dmsg_router_connect(dmsg_router_t *router) +dmsg_circuit_drop(dmsg_circuit_t *circuit) { - dmsg_router_t *tmp; - - assert(router->link || router->relay); - assert((router->flags & DMSG_ROUTER_CONNECTED) == 0); - - pthread_mutex_lock(&router_mtx); - if (router->link) - tmp = RB_INSERT(dmsg_router_tree, &router_ltree, router); - else - tmp = RB_INSERT(dmsg_router_tree, &router_rtree, router); - assert(tmp == NULL); - router->flags |= DMSG_ROUTER_CONNECTED; - pthread_mutex_unlock(&router_mtx); -} + dmsg_iocom_t *iocom = circuit->iocom; + char dummy; -void -dmsg_router_disconnect(dmsg_router_t **routerp) -{ - dmsg_router_t *router; + assert(circuit->refs > 0); + assert(iocom); - router = *routerp; - assert(router->link || router->relay); - assert(router->flags & DMSG_ROUTER_CONNECTED); + /* + * Decrement circuit refs, destroy circuit when refs drops to 0. + */ + if (--circuit->refs > 0) + return; - pthread_mutex_lock(&router_mtx); - if (router->link) - RB_REMOVE(dmsg_router_tree, &router_ltree, router); - else - RB_REMOVE(dmsg_router_tree, &router_rtree, router); - router->flags &= ~DMSG_ROUTER_CONNECTED; - *routerp = NULL; - pthread_mutex_unlock(&router_mtx); -} + assert(RB_EMPTY(&circuit->staterd_tree)); + assert(RB_EMPTY(&circuit->statewr_tree)); + RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit); + circuit->iocom = NULL; + dmsg_free(circuit); -#if 0 -/* - * XXX - */ -dmsg_router_t * -dmsg_route_msg(dmsg_msg_t *msg) -{ + /* + * When an iocom error is present the rx code will terminate the + * receive side for all transactions and (indirectly) all circuits + * by simulating DELETE messages. 
The state and related circuits + * don't disappear until the related states are closed in both + * directions + * + * Detect the case where the last circuit is now gone (and thus all + * states for all circuits are gone), and wakeup the rx thread to + * complete the termination. + */ + if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) { + dummy = 0; + write(iocom->wakeupfds[1], &dummy, 1); + } } -#endif /* * This swaps endian for a hammer2_msg_hdr. Note that the extended @@ -2026,8 +2055,8 @@ dmsg_bswap_head(dmsg_hdr_t *head) head->salt = bswap32(head->salt); head->msgid = bswap64(head->msgid); - head->source = bswap64(head->source); - head->target = bswap64(head->target); + head->circuit = bswap64(head->circuit); + head->reserved18= bswap64(head->reserved18); head->cmd = bswap32(head->cmd); head->aux_crc = bswap32(head->aux_crc); diff --git a/lib/libdmsg/msg_lnk.c b/lib/libdmsg/msg_lnk.c index 5e7eb27ba7..c76da5548a 100644 --- a/lib/libdmsg/msg_lnk.c +++ b/lib/libdmsg/msg_lnk.c @@ -32,97 +32,14 @@ * SUCH DAMAGE. */ /* - * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS - * - * This code supports the LNK_SPAN protocol. Essentially all PFS's - * clients and services rendezvous with the userland hammer2 service and - * open LNK_SPAN transactions using a message header linkid of 0, - * registering any PFS's they have connectivity to with us. - * - * -- - * - * Each registration maintains its own open LNK_SPAN message transaction. - * The SPANs are collected, aggregated, and retransmitted over available - * connections through the maintainance of additional LNK_SPAN message - * transactions on each link. - * - * The msgid for each active LNK_SPAN transaction we receive allows us to - * send a message to the target PFS (which might be one of many belonging - * to the same cluster), by specifying that msgid as the linkid in any - * message we send to the target PFS. 
- * - * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit - * (and remember we will maintain multiple open LNK_SPAN transactions on - * each connection representing the topology span, so every node sees every - * other node as a separate open transaction). So, similarly the msgid for - * these active transactions which we initiated can be used by the other - * end to route messages through us to another node, ultimately winding up - * at the identified hammer2 PFS. We have to adjust the spanid in the message - * header at each hop to be representative of the outgoing LNK_SPAN we - * are forwarding the message through. - * - * -- - * - * If we were to retransmit every LNK_SPAN transaction we receive it would - * create a huge mess, so we have to aggregate all received LNK_SPAN - * transactions, sort them by the fsid (the cluster) and sub-sort them by - * the pfs_fsid (individual nodes in the cluster), and only retransmit - * (create outgoing transactions) for a subset of the nearest distance-hops - * for each individual node. - * - * The higher level protocols can then issue transactions to the nodes making - * up a cluster to perform all actions required. - * - * -- - * - * Since this is a large topology and a spanning tree protocol, links can - * go up and down all the time. Any time a link goes down its transaction - * is closed. The transaction has to be closed on both ends before we can - * delete (and potentially reuse) the related spanid. The LNK_SPAN being - * closed may have been propagated out to other connections and those related - * LNK_SPANs are also closed. Ultimately all routes via the lost LNK_SPAN - * go away, ultimately reaching all sources and all targets. - * - * Any messages in-transit using a route that goes away will be thrown away. - * Open transactions are only tracked at the two end-points. 
When a link - * failure propagates to an end-point the related open transactions lose - * their spanid and are automatically aborted. - * - * It is important to note that internal route nodes cannot just associate - * a lost LNK_SPAN transaction with another route to the same destination. - * Message transactions MUST be serialized and MUST be ordered. All messages - * for a transaction must run over the same route. So if the route used by - * an active transaction is lost, the related messages will be fully aborted - * and the higher protocol levels will retry as appropriate. - * - * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation - * back to the originator. Only the originator keeps tracks of a message. - * Routers just pass it through. If a route is lost during transit the - * message is simply thrown away. - * - * It is also important to note that several paths to the same PFS can be - * propagated along the same link, which allows concurrency and even - * redundancy over several network interfaces or via different routes through - * the topology. Any given transaction will use only a single route but busy - * servers will often have hundreds of transactions active simultaniously, - * so having multiple active paths through the network topology for A<->B - * will improve performance. - * - * -- - * - * Most protocols consolidate operations rather than simply relaying them. - * This is particularly true of LEAF protocols (such as strict HAMMER2 - * clients), of which there can be millions connecting into the cluster at - * various points. The SPAN protocol is not used for these LEAF elements. - * - * Instead the primary service they connect to implements a proxy for the - * client protocols so the core topology only has to propagate a couple of - * LNK_SPANs and not millions. LNK_SPANs are meant to be used only for - * core master nodes and satellite slaves and cache nodes. 
+ * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS - Please see sys/dmsg.h for an + * involved explanation of the protocol. */ #include "dmsg_local.h" +void (*dmsg_node_handler)(void **opaquep, struct dmsg_msg *msg, int op); + /* * Maximum spanning tree distance. This has the practical effect of * stopping tail-chasing closed loops when a feeder span is lost. @@ -193,6 +110,7 @@ RB_HEAD(h2span_cluster_tree, h2span_cluster); RB_HEAD(h2span_node_tree, h2span_node); RB_HEAD(h2span_link_tree, h2span_link); RB_HEAD(h2span_relay_tree, h2span_relay); +uint32_t DMsgRNSS; /* * This represents a media @@ -251,46 +169,42 @@ struct h2span_node { uint8_t pfs_type; uuid_t pfs_fsid; /* unique fsid */ char fs_label[128]; /* fs label (typ PEER_HAMMER2) */ + void *opaque; }; struct h2span_link { RB_ENTRY(h2span_link) rbnode; dmsg_state_t *state; /* state<->link */ struct h2span_node *node; /* related node */ - int32_t dist; + uint32_t dist; + uint32_t rnss; struct h2span_relay_queue relayq; /* relay out */ - struct dmsg_router *router; /* route out this link */ }; /* * Any LNK_SPAN transactions we receive which are relayed out other - * connections utilize this structure to track the LNK_SPAN transaction - * we initiate on the other connections, if selected for relay. + * connections utilize this structure to track the LNK_SPAN transactions + * we initiate (relay out) on other connections. We only relay out + * LNK_SPANs on connections we have an open CONN transaction for. + * + * The relay structure points to the outgoing LNK_SPAN trans (out_state) + * and to the incoming LNK_SPAN transaction (in_state). The relay + * structure holds refs on the related states. * * In many respects this is the core of the protocol... actually figuring * out what LNK_SPANs to relay. The spanid used for relaying is the * address of the 'state' structure, which is why h2span_relay has to * be entered into a RB-TREE based at h2span_conn (so we can look * up the spanid to validate it). 
- * - * NOTE: Messages can be received via the LNK_SPAN transaction the - * relay maintains, and can be replied via relay->router, but - * messages are NOT initiated via a relay. Messages are initiated - * via incoming links (h2span_link's). - * - * relay->link represents the link being relayed, NOT the LNK_SPAN - * transaction the relay is holding open. */ struct h2span_relay { - RB_ENTRY(h2span_relay) rbnode; /* from h2span_conn */ - TAILQ_ENTRY(h2span_relay) entry; /* from link */ - struct h2span_conn *conn; - dmsg_state_t *state; /* transmitted LNK_SPAN */ - struct h2span_link *link; /* LNK_SPAN being relayed */ - struct dmsg_router *router;/* route out this relay */ + TAILQ_ENTRY(h2span_relay) entry; /* from link */ + RB_ENTRY(h2span_relay) rbnode; /* from h2span_conn */ + struct h2span_conn *conn; /* related CONN transaction */ + dmsg_state_t *source_rt; /* h2span_link state */ + dmsg_state_t *target_rt; /* h2span_relay state */ }; - typedef struct h2span_media h2span_media_t; typedef struct h2span_conn h2span_conn_t; typedef struct h2span_cluster h2span_cluster_t; @@ -300,6 +214,10 @@ typedef struct h2span_relay h2span_relay_t; #define dmsg_termstr(array) _dmsg_termstr((array), sizeof(array)) +static h2span_relay_t *dmsg_generate_relay(h2span_conn_t *conn, + h2span_link_t *slink); +static uint32_t dmsg_rnss(void); + static __inline void _dmsg_termstr(char *base, size_t size) @@ -328,7 +246,9 @@ h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2) } /* - * Match against the uuid. Currently we never match against the label. + * Match against fs_label/pfs_fsid. Together these two items represent a + * unique node. In most cases the primary differentiator is pfs_fsid but + * we also string-match fs_label. 
*/ static int @@ -336,7 +256,9 @@ h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2) { int r; - r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL); + r = strcmp(node1->fs_label, node2->fs_label); + if (r == 0) + r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL); return (r); } @@ -356,6 +278,10 @@ h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2) return(-1); if (link1->dist > link2->dist) return(1); + if (link1->rnss < link2->rnss) + return(-1); + if (link1->rnss > link2->rnss) + return(1); #if 1 if ((uintptr_t)link1->state < (uintptr_t)link2->state) return(-1); @@ -379,8 +305,8 @@ static int h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2) { - h2span_link_t *link1 = relay1->link; - h2span_link_t *link2 = relay2->link; + h2span_link_t *link1 = relay1->source_rt->any.link; + h2span_link_t *link2 = relay2->source_rt->any.link; if ((intptr_t)link1->node < (intptr_t)link2->node) return(-1); @@ -390,6 +316,10 @@ h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2) return(-1); if (link1->dist > link2->dist) return(1); + if (link1->rnss < link2->rnss) + return(-1); + if (link1->rnss > link2->rnss) + return(1); #if 1 if ((uintptr_t)link1->state < (uintptr_t)link2->state) return(-1); @@ -432,6 +362,7 @@ static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq); static void dmsg_lnk_span(dmsg_msg_t *msg); static void dmsg_lnk_conn(dmsg_msg_t *msg); +static void dmsg_lnk_circ(dmsg_msg_t *msg); static void dmsg_lnk_relay(dmsg_msg_t *msg); static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node); static void dmsg_relay_delete(h2span_relay_t *relay); @@ -442,7 +373,7 @@ static void dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname); void -dmsg_msg_lnk_signal(dmsg_router_t *router __unused) +dmsg_msg_lnk_signal(dmsg_iocom_t *iocom __unused) { pthread_mutex_lock(&cluster_mtx); dmsg_relay_scan(NULL, NULL); @@ -450,20 +381,28 @@ dmsg_msg_lnk_signal(dmsg_router_t *router 
__unused) } /* - * Receive a DMSG_PROTO_LNK message. This only called for - * one-way and opening-transactions since state->func will be assigned - * in all other cases. + * DMSG_PROTO_LNK - Generic DMSG_PROTO_LNK. + * (incoming iocom lock not held) + * + * This function is typically called for one-way and opening-transactions + * since state->func is assigned after that, but it will also be called + * if no state->func is assigned on transaction-open. */ void dmsg_msg_lnk(dmsg_msg_t *msg) { - switch(msg->any.head.cmd & DMSGF_BASECMDMASK) { + uint32_t icmd = msg->state ? msg->state->icmd : msg->any.head.cmd; + + switch(icmd & DMSGF_BASECMDMASK) { case DMSG_LNK_CONN: dmsg_lnk_conn(msg); break; case DMSG_LNK_SPAN: dmsg_lnk_span(msg); break; + case DMSG_LNK_CIRC: + dmsg_lnk_circ(msg); + break; default: fprintf(stderr, "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd); @@ -473,6 +412,13 @@ dmsg_msg_lnk(dmsg_msg_t *msg) } } +/* + * LNK_CONN - iocom identify message reception. + * (incoming iocom lock not held) + * + * Remote node identifies itself to us, sets up a SPAN filter, and gives us + * the ok to start transmitting SPANs. 
+ */ void dmsg_lnk_conn(dmsg_msg_t *msg) { @@ -505,6 +451,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg) conn = dmsg_alloc(sizeof(*conn)); RB_INIT(&conn->tree); + state->iocom->conn = conn; /* XXX only one */ conn->state = state; state->func = dmsg_lnk_conn; state->any.conn = conn; @@ -529,7 +476,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg) if ((msg->any.head.cmd & DMSGF_DELETE) == 0) { dmsg_msg_result(msg, 0); - dmsg_router_signal(msg->router); + dmsg_iocom_signal(msg->iocom); break; } /* FALL THROUGH */ @@ -590,6 +537,7 @@ deleteconn: conn->media = NULL; conn->state = NULL; msg->state->any.conn = NULL; + msg->state->iocom->conn = NULL; TAILQ_REMOVE(&connq, conn, entry); dmsg_free(conn); @@ -642,6 +590,13 @@ deleteconn: pthread_mutex_unlock(&cluster_mtx); } +/* + * LNK_SPAN - Spanning tree protocol message reception + * (incoming iocom lock not held) + * + * Receive a spanning tree transactional message, creating or destroying + * a SPAN and propagating it to other iocoms. + */ void dmsg_lnk_span(dmsg_msg_t *msg) { @@ -692,16 +647,23 @@ dmsg_lnk_span(dmsg_msg_t *msg) * Find the node */ dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid; + bcopy(msg->any.lnk_span.fs_label, dummy_node.fs_label, + sizeof(dummy_node.fs_label)); node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node); if (node == NULL) { node = dmsg_alloc(sizeof(*node)); node->pfs_fsid = msg->any.lnk_span.pfs_fsid; + node->pfs_type = msg->any.lnk_span.pfs_type; bcopy(msg->any.lnk_span.fs_label, node->fs_label, sizeof(node->fs_label)); node->cls = cls; RB_INIT(&node->tree); RB_INSERT(h2span_node_tree, &cls->tree, node); + if (dmsg_node_handler) { + dmsg_node_handler(&node->opaque, msg, + DMSG_NODEOP_ADD); + } } /* @@ -712,27 +674,15 @@ dmsg_lnk_span(dmsg_msg_t *msg) TAILQ_INIT(&slink->relayq); slink->node = node; slink->dist = msg->any.lnk_span.dist; + slink->rnss = msg->any.lnk_span.rnss; slink->state = state; state->any.link = slink; - /* - * Embedded router structure in link for message forwarding. 
- * - * The spanning id for the router is the message id of - * the SPAN link it is embedded in, allowing messages to - * be routed via &slink->router. - */ - slink->router = dmsg_router_alloc(); - slink->router->iocom = state->iocom; - slink->router->link = slink; - slink->router->target = state->msgid; - dmsg_router_connect(slink->router); - RB_INSERT(h2span_link_tree, &node->tree, slink); fprintf(stderr, "LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n", - msg->router->iocom, + msg->iocom, slink, dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid, &alloc), msg->any.lnk_span.cl_label, @@ -742,7 +692,7 @@ dmsg_lnk_span(dmsg_msg_t *msg) #if 0 dmsg_relay_scan(NULL, node); #endif - dmsg_router_signal(msg->router); + dmsg_iocom_signal(msg->iocom); } /* @@ -755,7 +705,7 @@ dmsg_lnk_span(dmsg_msg_t *msg) cls = node->cls; fprintf(stderr, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n", - msg->router->iocom, + msg->iocom, slink, dmsg_uuid_to_str(&cls->pfs_clid, &alloc), state->msg->any.lnk_span.cl_label, @@ -763,11 +713,6 @@ dmsg_lnk_span(dmsg_msg_t *msg) state->msg->any.lnk_span.dist); free(alloc); - /* - * Remove the router from consideration - */ - dmsg_router_disconnect(&slink->router); - /* * Clean out all relays. This requires terminating each * relay transaction. @@ -782,6 +727,10 @@ dmsg_lnk_span(dmsg_msg_t *msg) RB_REMOVE(h2span_link_tree, &node->tree, slink); if (RB_EMPTY(&node->tree)) { RB_REMOVE(h2span_node_tree, &cls->tree, node); + if (dmsg_node_handler) { + dmsg_node_handler(&node->opaque, msg, + DMSG_NODEOP_DEL); + } if (RB_EMPTY(&cls->tree) && cls->refs == 0) { RB_REMOVE(h2span_cluster_tree, &cluster_tree, cls); @@ -812,35 +761,219 @@ dmsg_lnk_span(dmsg_msg_t *msg) dmsg_relay_scan(NULL, node); #endif if (node) - dmsg_router_signal(msg->router); + dmsg_iocom_signal(msg->iocom); } pthread_mutex_unlock(&cluster_mtx); } /* - * Messages received on relay SPANs. These are open transactions so it is - * in fact possible for the other end to close the transaction. 
+ * LNK_CIRC - Virtual circuit protocol message reception + * (incoming iocom lock not held) * - * XXX MPRACE on state structure + * Handles all cases. */ -static void -dmsg_lnk_relay(dmsg_msg_t *msg) +void +dmsg_lnk_circ(dmsg_msg_t *msg) { - dmsg_state_t *state = msg->state; - h2span_relay_t *relay; + dmsg_circuit_t *circA; + dmsg_circuit_t *circB; + dmsg_state_t *rx_state; + dmsg_state_t *tx_state; + dmsg_state_t *state; + dmsg_state_t dummy; + dmsg_msg_t *fwd_msg; + dmsg_iocom_t *iocomA; + dmsg_iocom_t *iocomB; + + /*pthread_mutex_lock(&cluster_mtx);*/ + + switch (msg->any.head.cmd & (DMSGF_CREATE | + DMSGF_DELETE | + DMSGF_REPLY)) { + case DMSGF_CREATE: + case DMSGF_CREATE | DMSGF_DELETE: + /* + * (A) wishes to establish a virtual circuit through us to (B). + * (B) is specified by lnk_circ.target (the message id for + * a LNK_SPAN that (A) received from us which represents (B)). + * + * Designate the originator of the circuit (the current + * remote end) as (A) and the other side as (B). + * + * Accept the VC but do not reply. We will wait for the end- + * to-end reply to propagate back. + */ + iocomA = msg->iocom; - assert(msg->any.head.cmd & DMSGF_REPLY); + /* + * Locate the open transaction state that the other end + * specified in . This will be an open SPAN + * transaction that we transmitted (h2span_relay) over + * the interface the LNK_CIRC is being received on. 
+ * + * (all LNK_CIRC's that we transmit are on circuit0) + */ + pthread_mutex_lock(&iocomA->mtx); + dummy.msgid = msg->any.lnk_circ.target; + tx_state = RB_FIND(dmsg_state_tree, + &iocomA->circuit0.statewr_tree, + &dummy); + /* XXX state refs */ + assert(tx_state); + pthread_mutex_unlock(&iocomA->mtx); + + /* locate h2span_link */ + rx_state = tx_state->any.relay->source_rt; - if (msg->any.head.cmd & DMSGF_DELETE) { - pthread_mutex_lock(&cluster_mtx); - if ((relay = state->any.relay) != NULL) { - dmsg_relay_delete(relay); - } else { - dmsg_state_reply(state, 0); + /* + * A wishes to establish a VC through us to the + * specified target. + * + * A sends us the msgid of an open SPAN transaction + * it received from us as . + */ + circA = dmsg_alloc(sizeof(*circA)); + circA->iocom = iocomA; + circA->state = msg->state; /* LNK_CIRC state */ + circA->msgid = msg->state->msgid; + circA->span_state = tx_state; /* H2SPAN_RELAY state */ + circA->is_relay = 1; + circA->refs = 2; /* state and peer */ + msg->state->any.circ = circA; + + iocomB = rx_state->iocom; + + circB = dmsg_alloc(sizeof(*circB)); + + /* + * Create a LNK_CIRC transaction on B + */ + fwd_msg = dmsg_msg_alloc(&iocomB->circuit0, + 0, DMSG_LNK_CIRC | DMSGF_CREATE, + dmsg_lnk_circ, circB); + fwd_msg->state->any.circ = circB; + circB->iocom = iocomB; + circB->state = fwd_msg->state; /* LNK_CIRC state */ + circB->msgid = fwd_msg->any.head.msgid; + circB->span_state = rx_state; /* H2SPAN_LINK state */ + circB->is_relay = 0; + circB->refs = 2; /* state and peer */ + + /* + * Link the two circuits together. 
+ */ + circA->peer = circB; + circB->peer = circA; + + if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA)) + assert(0); + if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB)) + assert(0); + + dmsg_msg_write(fwd_msg); + + if ((msg->any.head.cmd & DMSGF_DELETE) == 0) + break; + /* FALL THROUGH TO DELETE */ + case DMSGF_DELETE: + /* + * (A) Is deleting the virtual circuit, propogate closure + * to (B). + */ + iocomA = msg->iocom; + circA = msg->state->any.circ; + circB = circA->peer; + assert(msg->state == circA->state); + + /* + * If we are closing A and the peer B is closed, disconnect. + */ + if (circB && (state = circB->state) != NULL) { + if (state->rxcmd & DMSGF_DELETE) { + circB->state = NULL; + state->any.circ = NULL; + dmsg_circuit_drop(circB); + } + dmsg_state_reply(state, msg->any.head.error); } - pthread_mutex_unlock(&cluster_mtx); + + /* + * If both sides now closed terminate the peer association + * and the state association. This may drop up to two refs + * on circA and one on circB. + */ + if (circA->state->txcmd & DMSGF_DELETE) { + if (circB) { + circA->peer = NULL; + circB->peer = NULL; + dmsg_circuit_drop(circA); + dmsg_circuit_drop(circB); /* XXX SMP */ + } + circA->state->any.circ = NULL; + circA->state = NULL; + dmsg_circuit_drop(circA); + } + break; + case DMSGF_REPLY | DMSGF_CREATE: + case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE: + /* + * (B) is acknowledging the creation of the virtual + * circuit. This propagates all the way back to (A), though + * it should be noted that (A) can start issuing commands + * via the virtual circuit before seeing this reply. 
+ */ + circB = msg->state->any.circ; + circA = circB->peer; + assert(msg->state == circB->state); + if (circA && (msg->any.head.cmd & DMSGF_DELETE) == 0) { + dmsg_state_result(circA->state, msg->any.head.error); + break; + } + /* FALL THROUGH TO DELETE */ + case DMSGF_REPLY | DMSGF_DELETE: + /* + * (B) Is deleting the virtual circuit or acknowledging + * our deletion of the virtual circuit, propogate closure + * to (A). + */ + iocomB = msg->iocom; + circB = msg->state->any.circ; + circA = circB->peer; + assert(msg->state == circB->state); + + /* + * If we are closing A and the peer B is closed, disconnect. + */ + if (circA && (state = circA->state) != NULL) { + if (state->rxcmd & DMSGF_DELETE) { + circA->state = NULL; + state->any.circ = NULL; + dmsg_circuit_drop(circA); + } + dmsg_state_reply(state, msg->any.head.error); + } + + /* + * If both sides now closed terminate the peer association + * and the state association. This may drop up to two refs + * on circA and one on circB. + */ + if (circB->state->txcmd & DMSGF_DELETE) { + if (circA) { + circB->peer = NULL; + circA->peer = NULL; + dmsg_circuit_drop(circB); + dmsg_circuit_drop(circA); /* XXX SMP */ + } + circB->state->any.circ = NULL; + circB->state = NULL; + dmsg_circuit_drop(circB); + } + break; } + + /*pthread_mutex_lock(&cluster_mtx);*/ } /* @@ -911,9 +1044,9 @@ dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg) { struct relay_scan_info *info = arg; - if ((intptr_t)relay->link->node < (intptr_t)info->node) + if ((intptr_t)relay->source_rt->any.link->node < (intptr_t)info->node) return(-1); - if ((intptr_t)relay->link->node > (intptr_t)info->node) + if ((intptr_t)relay->source_rt->any.link->node > (intptr_t)info->node) return(1); return(0); } @@ -936,8 +1069,10 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) h2span_link_t *slink; dmsg_lnk_conn_t *lconn; dmsg_lnk_span_t *lspan; - dmsg_msg_t *msg; - int count = 2; + int count; + int maxcount = 2; + uint32_t lastdist = 
DMSG_SPAN_MAXDIST; + uint32_t lastrnss = 0; info.node = node; info.relay = NULL; @@ -951,7 +1086,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) relay = info.relay; info.relay = NULL; if (relay) - assert(relay->link->node == node); + assert(relay->source_rt->any.link->node == node); if (DMsgDebugOpt > 8) fprintf(stderr, "relay scan for connection %p\n", conn); @@ -969,15 +1104,39 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) * removed the relay, so the relay can only match exactly or * be worse). */ + count = 0; RB_FOREACH(slink, h2span_link_tree, &node->tree) { + /* + * Increment count of successful relays. This isn't + * quite accurate if we break out but nothing after + * the loop uses (count). + * + * If count exceeds the maximum number of relays we desire + * we normally want to break out. However, in order to + * guarantee a symmetric path we have to continue if both + * (dist) and (rnss) continue to match. Otherwise the SPAN + * propagation in the reverse direction may choose different + * routes and we will not have a symmetric path. + * + * NOTE: Spanning tree does not have to be symmetrical so + * this code is not currently enabled. + */ + if (++count >= maxcount) { +#ifdef REQUIRE_SYMMETRICAL + if (lastdist != slink->dist || lastrnss != slink->rnss) + break; +#else + break; +#endif + /* go beyond the nominal maximum desired relays */ + } + /* * Match, relay already in-place, get the next * relay to match against the next slink. */ - if (relay && relay->link == slink) { + if (relay && relay->source_rt->any.link == slink) { relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); - if (--count == 0) - break; continue; } @@ -1004,11 +1163,16 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) * * Don't bother transmitting if the remote connection * is not accepting this SPAN's peer_type. 
+ * + * pfs_mask is typically used so pure clients can filter + * out receiving SPANs for other pure clients. */ lspan = &slink->state->msg->any.lnk_span; lconn = &conn->state->msg->any.lnk_conn; if (((1LLU << lspan->peer_type) & lconn->peer_mask) == 0) break; + if (((1LLU << lspan->pfs_type) & lconn->pfs_mask) == 0) + break; /* * Do not give pure clients visibility to other pure clients @@ -1042,7 +1206,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) } /* - * NOTE! fs_uuid differentiates nodes within the same cluster + * NOTE! pfs_fsid differentiates nodes within the same cluster * so we obviously don't want to match those. Similarly * for fs_label. */ @@ -1051,47 +1215,17 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) * Ok, we've accepted this SPAN for relaying. */ assert(relay == NULL || - relay->link->node != slink->node || - relay->link->dist >= slink->dist); - relay = dmsg_alloc(sizeof(*relay)); - relay->conn = conn; - relay->link = slink; - - msg = dmsg_msg_alloc(conn->state->iocom->router, 0, - DMSG_LNK_SPAN | - DMSGF_CREATE, - dmsg_lnk_relay, relay); - relay->state = msg->state; - relay->router = dmsg_router_alloc(); - relay->router->iocom = relay->state->iocom; - relay->router->relay = relay; - relay->router->target = relay->state->msgid; - - msg->any.lnk_span = slink->state->msg->any.lnk_span; - msg->any.lnk_span.dist = slink->dist + 1; - - dmsg_router_connect(relay->router); - - RB_INSERT(h2span_relay_tree, &conn->tree, relay); - TAILQ_INSERT_TAIL(&slink->relayq, relay, entry); - - dmsg_msg_write(msg); - - fprintf(stderr, - "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d " - "FD %d state %p\n", - slink, - relay, - node->cls, node, slink->dist, - conn->state->iocom->sock_fd, relay->state); + relay->source_rt->any.link->node != slink->node || + relay->source_rt->any.link->dist >= slink->dist); + relay = dmsg_generate_relay(conn, slink); + lastdist = slink->dist; + lastrnss = slink->rnss; /* * Match (created 
new relay), get the next relay to * match against the next slink. */ relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); - if (--count == 0) - break; } /* @@ -1099,38 +1233,102 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) * the node are in excess of the current aggregate spanning state * and should be removed. */ - while (relay && relay->link->node == node) { + while (relay && relay->source_rt->any.link->node == node) { next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); dmsg_relay_delete(relay); relay = next_relay; } } +/* + * Helper function to generate missing relay. + * + * cluster_mtx must be held + */ +static +h2span_relay_t * +dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink) +{ + h2span_relay_t *relay; + h2span_node_t *node; + dmsg_msg_t *msg; + + node = slink->node; + + relay = dmsg_alloc(sizeof(*relay)); + relay->conn = conn; + relay->source_rt = slink->state; + /* relay->source_rt->any.link = slink; */ + + /* + * NOTE: relay->target_rt->any.relay set to relay by alloc. + */ + msg = dmsg_msg_alloc(&conn->state->iocom->circuit0, + 0, DMSG_LNK_SPAN | DMSGF_CREATE, + dmsg_lnk_relay, relay); + relay->target_rt = msg->state; + + msg->any.lnk_span = slink->state->msg->any.lnk_span; + msg->any.lnk_span.dist = slink->dist + 1; + msg->any.lnk_span.rnss = slink->rnss + dmsg_rnss(); + + RB_INSERT(h2span_relay_tree, &conn->tree, relay); + TAILQ_INSERT_TAIL(&slink->relayq, relay, entry); + + dmsg_msg_write(msg); + + return (relay); +} + +/* + * Messages received on relay SPANs. These are open transactions so it is + * in fact possible for the other end to close the transaction. 
+ * + * XXX MPRACE on state structure + */ +static void +dmsg_lnk_relay(dmsg_msg_t *msg) +{ + dmsg_state_t *state = msg->state; + h2span_relay_t *relay; + + assert(msg->any.head.cmd & DMSGF_REPLY); + + if (msg->any.head.cmd & DMSGF_DELETE) { + pthread_mutex_lock(&cluster_mtx); + if ((relay = state->any.relay) != NULL) { + dmsg_relay_delete(relay); + } else { + dmsg_state_reply(state, 0); + } + pthread_mutex_unlock(&cluster_mtx); + } +} + + static void dmsg_relay_delete(h2span_relay_t *relay) { fprintf(stderr, "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n", - relay->link, + relay->source_rt->any.link, relay, - relay->link->node->cls, relay->link->node, - relay->link->dist, - relay->conn->state->iocom->sock_fd, relay->state); - - dmsg_router_disconnect(&relay->router); + relay->source_rt->any.link->node->cls, relay->source_rt->any.link->node, + relay->source_rt->any.link->dist, + relay->conn->state->iocom->sock_fd, relay->target_rt); RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay); - TAILQ_REMOVE(&relay->link->relayq, relay, entry); + TAILQ_REMOVE(&relay->source_rt->any.link->relayq, relay, entry); - if (relay->state) { - relay->state->any.relay = NULL; - dmsg_state_reply(relay->state, 0); + if (relay->target_rt) { + relay->target_rt->any.relay = NULL; + dmsg_state_reply(relay->target_rt, 0); /* state invalid after reply */ - relay->state = NULL; + relay->target_rt = NULL; } relay->conn = NULL; - relay->link = NULL; + relay->source_rt = NULL; dmsg_free(relay); } @@ -1225,6 +1423,48 @@ dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname) } } +/************************************************************************ + * MESSAGE ROUTING AND SOURCE VALIDATION * + ************************************************************************/ + +int +dmsg_circuit_relay(dmsg_msg_t *msg) +{ + dmsg_iocom_t *iocom = msg->iocom; + dmsg_circuit_t *circ; + dmsg_circuit_t *peer; + dmsg_circuit_t dummy; + int error = 0; + + /* + * Relay 
occurs before any state processing, msg state should always + * be NULL. + */ + assert(msg->state == NULL); + + /* + * Lookup the circuit on the incoming iocom. + */ + pthread_mutex_lock(&cluster_mtx); + + dummy.msgid = msg->any.head.circuit; + circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy); + assert(circ); + peer = circ->peer; + + msg->iocom = peer->iocom; + msg->any.head.circuit = peer->msgid; + + pthread_mutex_unlock(&cluster_mtx); + + fprintf(stderr, "ROUTE MESSAGE VC %08x to %08x\n", + (uint32_t)circ->msgid, (uint32_t)peer->msgid); /* brevity */ + dmsg_msg_write(msg); + error = DMSG_IOQ_ERROR_ROUTED; + + return error; +} + /************************************************************************ * ROUTER AND MESSAGING HANDLES * ************************************************************************ @@ -1285,31 +1525,11 @@ dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid) #endif -#if 0 -/* - * Acquire a persistent router structure given the cluster and node ids. - * Messages can be transacted via this structure while held. If the route - * is lost messages will return failure. - */ -dmsg_router_t * -dmsg_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid) -{ -} - -/* - * Release previously acquired router. 
- */ -void -dmsg_router_put(dmsg_router_t *router) -{ -} -#endif - /* * Dumps the spanning tree */ void -dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused) +dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused) { h2span_cluster_t *cls; h2span_node_t *node; @@ -1318,15 +1538,17 @@ dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused) pthread_mutex_lock(&cluster_mtx); RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) { - dmsg_router_printf(router, "Cluster %s (%s)\n", - dmsg_uuid_to_str(&cls->pfs_clid, &uustr), - cls->cl_label); + dmsg_circuit_printf(circuit, "Cluster %s %s (%s)\n", + dmsg_peer_type_to_str(cls->peer_type), + dmsg_uuid_to_str(&cls->pfs_clid, &uustr), + cls->cl_label); RB_FOREACH(node, h2span_node_tree, &cls->tree) { - dmsg_router_printf(router, " Node %s (%s)\n", + dmsg_circuit_printf(circuit, " Node %s %s (%s)\n", + dmsg_pfs_type_to_str(node->pfs_type), dmsg_uuid_to_str(&node->pfs_fsid, &uustr), node->fs_label); RB_FOREACH(slink, h2span_link_tree, &node->tree) { - dmsg_router_printf(router, + dmsg_circuit_printf(circuit, "\tLink dist=%d via %d\n", slink->dist, slink->state->iocom->sock_fd); @@ -1341,3 +1563,27 @@ dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused) } #endif } + +/* + * Random number sub-sort value to add to SPAN rnss fields on relay. + * This allows us to differentiate spans with the same dist field + * for relaying purposes. We must normally limit the number of relays + * for any given SPAN origination but we must also guarantee that a + * symmetric reverse path exists, so we use the rnss field as a sub-sort + * (since there can be thousands or millions if we only match on dist), + * and if there are STILL too many spans we go past the limit.
+ */ +static +uint32_t +dmsg_rnss(void) +{ + if (DMsgRNSS == 0) { + pthread_mutex_lock(&cluster_mtx); + while (DMsgRNSS == 0) { + srandomdev(); + DMsgRNSS = random(); + } + pthread_mutex_unlock(&cluster_mtx); + } + return(DMsgRNSS); +} diff --git a/lib/libdmsg/service.c b/lib/libdmsg/service.c index 387b1cbcd6..43fcb71133 100644 --- a/lib/libdmsg/service.c +++ b/lib/libdmsg/service.c @@ -35,9 +35,9 @@ #include "dmsg_local.h" -static void master_auth_signal(dmsg_router_t *router); +static void master_auth_signal(dmsg_iocom_t *iocom); static void master_auth_rxmsg(dmsg_msg_t *msg); -static void master_link_signal(dmsg_router_t *router); +static void master_link_signal(dmsg_iocom_t *iocom); static void master_link_rxmsg(dmsg_msg_t *msg); /* @@ -87,7 +87,7 @@ static void master_auth_conn_rx(dmsg_msg_t *msg); static void -master_auth_signal(dmsg_router_t *router) +master_auth_signal(dmsg_iocom_t *iocom) { dmsg_msg_t *msg; @@ -97,14 +97,16 @@ master_auth_signal(dmsg_router_t *router) * * XXX put additional authentication states here? 
*/ - msg = dmsg_msg_alloc(router, 0, DMSG_LNK_CONN | DMSGF_CREATE, + msg = dmsg_msg_alloc(&iocom->circuit0, 0, + DMSG_LNK_CONN | DMSGF_CREATE, master_auth_conn_rx, NULL); msg->any.lnk_conn.peer_mask = (uint64_t)-1; msg->any.lnk_conn.peer_type = DMSG_PEER_CLUSTER; + msg->any.lnk_conn.pfs_mask = (uint64_t)-1; dmsg_msg_write(msg); - dmsg_router_restate(router, + dmsg_iocom_restate(iocom, master_link_signal, master_link_rxmsg, NULL); @@ -132,9 +134,9 @@ master_auth_rxmsg(dmsg_msg_t *msg __unused) */ static void -master_link_signal(dmsg_router_t *router) +master_link_signal(dmsg_iocom_t *iocom) { - dmsg_msg_lnk_signal(router); + dmsg_msg_lnk_signal(iocom); } static @@ -192,7 +194,7 @@ dmsg_msg_dbg(dmsg_msg_t *msg) */ if (msg->aux_data) msg->aux_data[msg->aux_size - 1] = 0; - msg->router->dbgmsg_callback(msg); + msg->iocom->dbgmsg_callback(msg); dmsg_msg_reply(msg, 0); break; case DMSG_DBG_SHELL | DMSGF_REPLY: @@ -216,11 +218,11 @@ dmsg_msg_dbg(dmsg_msg_t *msg) * not modified and stays intact. We use a one-way message with REPLY set * to distinguish between a debug command and debug terminal output. * - * To prevent loops router_printf() can filter the message (cmd) related - * to the router_printf(). We filter out DBG messages. + * To prevent loops circuit_printf() can filter the message (cmd) related + * to the circuit_printf(). We filter out DBG messages. */ void -dmsg_router_printf(dmsg_router_t *router, const char *ctl, ...) +dmsg_circuit_printf(dmsg_circuit_t *circuit, const char *ctl, ...) { dmsg_msg_t *rmsg; va_list va; @@ -232,8 +234,9 @@ dmsg_router_printf(dmsg_router_t *router, const char *ctl, ...) 
va_end(va); len = strlen(buf) + 1; - rmsg = dmsg_msg_alloc(router, len, DMSG_DBG_SHELL | DMSGF_REPLY, - NULL, NULL); + rmsg = dmsg_msg_alloc(circuit, len, + DMSG_DBG_SHELL | DMSGF_REPLY, + NULL, NULL); bcopy(buf, rmsg->aux_data, len); dmsg_msg_write(rmsg); diff --git a/lib/libdmsg/subs.c b/lib/libdmsg/subs.c index fc744d493b..492d815bc3 100644 --- a/lib/libdmsg/subs.c +++ b/lib/libdmsg/subs.c @@ -67,6 +67,52 @@ dmsg_uuid_to_str(uuid_t *uuid, char **strp) return (*strp); } +const char * +dmsg_peer_type_to_str(uint8_t type) +{ + switch(type) { + case DMSG_PEER_NONE: + return("NONE"); + case DMSG_PEER_CLUSTER: + return("CLUSTER"); + case DMSG_PEER_BLOCK: + return("BLOCK"); + case DMSG_PEER_HAMMER2: + return("HAMMER2"); + default: + return("?PEERTYPE?"); + } +} + +const char * +dmsg_pfs_type_to_str(uint8_t type) +{ + switch(type) { + case DMSG_PFSTYPE_NONE: + return("NONE"); + case DMSG_PFSTYPE_ADMIN: + return("ADMIN"); + case DMSG_PFSTYPE_CLIENT: + return("CLIENT"); + case DMSG_PFSTYPE_CACHE: + return("CACHE"); + case DMSG_PFSTYPE_COPY: + return("COPY"); + case DMSG_PFSTYPE_SLAVE: + return("SLAVE"); + case DMSG_PFSTYPE_SOFT_SLAVE: + return("SOFT_SLAVE"); + case DMSG_PFSTYPE_SOFT_MASTER: + return("SOFT_MASTER"); + case DMSG_PFSTYPE_MASTER: + return("MASTER"); + case DMSG_PFSTYPE_SERVER: + return("SERVER"); + default: + return("?PFSTYPE?"); + } +} + int dmsg_connect(const char *hostname) {