X-Git-Url: https://gitweb.dragonflybsd.org/~tuxillo/dragonfly.git/blobdiff_plain/0c3a8cd0eb169fa13895a5691f9a1c298c489721..a2179323cae23105c50b96e4fe13169efe445734:/lib/libdmsg/msg_lnk.c

diff --git a/lib/libdmsg/msg_lnk.c b/lib/libdmsg/msg_lnk.c
index 6a48c6bbd8..e2e2b737cf 100644
--- a/lib/libdmsg/msg_lnk.c
+++ b/lib/libdmsg/msg_lnk.c
@@ -32,97 +32,14 @@
  * SUCH DAMAGE.
  */
 /*
- * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
- *
- * This code supports the LNK_SPAN protocol.  Essentially all PFS
- * clients and services rendezvous with the userland hammer2 service and
- * open LNK_SPAN transactions using a message header linkid of 0,
- * registering any PFSs they have connectivity to with us.
- *
- * --
- *
- * Each registration maintains its own open LNK_SPAN message transaction.
- * The SPANs are collected, aggregated, and retransmitted over available
- * connections through the maintenance of additional LNK_SPAN message
- * transactions on each link.
- *
- * The msgid for each active LNK_SPAN transaction we receive allows us to
- * send a message to the target PFS (which might be one of many belonging
- * to the same cluster) by specifying that msgid as the linkid in any
- * message we send to the target PFS.
- *
- * Similarly, the msgid we allocate for any LNK_SPAN transaction we transmit
- * (and remember, we maintain multiple open LNK_SPAN transactions on
- * each connection representing the topology span, so every node sees every
- * other node as a separate open transaction) can be used by the other
- * end to route messages through us to another node, ultimately winding up
- * at the identified hammer2 PFS.  We have to adjust the spanid in the message
- * header at each hop to be representative of the outgoing LNK_SPAN we
- * are forwarding the message through.
- *
- * --
- *
- * If we were to retransmit every LNK_SPAN transaction we receive it would
- * create a huge mess, so we have to aggregate all received LNK_SPAN
- * transactions, sort them by the fsid (the cluster) and sub-sort them by
- * the pfs_fsid (individual nodes in the cluster), and only retransmit
- * (create outgoing transactions) for a subset of the nearest distance-hops
- * for each individual node.
- *
- * The higher level protocols can then issue transactions to the nodes making
- * up a cluster to perform all actions required.
- *
- * --
- *
- * Since this is a large topology and a spanning tree protocol, links can
- * go up and down all the time.  Any time a link goes down its transaction
- * is closed.  The transaction has to be closed on both ends before we can
- * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
- * closed may have been propagated out to other connections and those related
- * LNK_SPANs are also closed.  Ultimately all routes via the lost LNK_SPAN
- * go away, ultimately reaching all sources and all targets.
- *
- * Any messages in-transit using a route that goes away will be thrown away.
- * Open transactions are only tracked at the two end-points.  When a link
- * failure propagates to an end-point the related open transactions lose
- * their spanid and are automatically aborted.
- *
- * It is important to note that internal route nodes cannot just associate
- * a lost LNK_SPAN transaction with another route to the same destination.
- * Message transactions MUST be serialized and MUST be ordered.  All messages
- * for a transaction must run over the same route.  So if the route used
- * by an active transaction is lost, the related messages will be fully
- * aborted and the higher protocol levels will retry as appropriate.
- *
- * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
- * back to the originator.  Only the originator keeps track of a message.
- * Routers just pass it through.  If a route is lost during transit the
- * message is simply thrown away.
- *
- * It is also important to note that several paths to the same PFS can be
- * propagated along the same link, which allows concurrency and even
- * redundancy over several network interfaces or via different routes through
- * the topology.  Any given transaction will use only a single route but busy
- * servers will often have hundreds of transactions active simultaneously,
- * so having multiple active paths through the network topology for A<->B
- * will improve performance.
- *
- * --
- *
- * Most protocols consolidate operations rather than simply relaying them.
- * This is particularly true of LEAF protocols (such as strict HAMMER2
- * clients), of which there can be millions connecting into the cluster at
- * various points.  The SPAN protocol is not used for these LEAF elements.
- *
- * Instead the primary service they connect to implements a proxy for the
- * client protocols so the core topology only has to propagate a couple of
- * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
- * core master nodes and satellite slaves and cache nodes.
+ * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS - Please see sys/dmsg.h for an
+ * involved explanation of the protocol.
  */

 #include "dmsg_local.h"

+void (*dmsg_node_handler)(void **opaquep, struct dmsg_msg *msg, int op);
+
 /*
  * Maximum spanning tree distance.  This has the practical effect of
  * stopping tail-chasing closed loops when a feeder span is lost.
@@ -193,6 +110,7 @@ RB_HEAD(h2span_cluster_tree, h2span_cluster);
 RB_HEAD(h2span_node_tree, h2span_node);
 RB_HEAD(h2span_link_tree, h2span_link);
 RB_HEAD(h2span_relay_tree, h2span_relay);
+uint32_t DMsgRNSS;

 /*
  * This represents a media
@@ -239,6 +157,8 @@ struct h2span_cluster {
 	RB_ENTRY(h2span_cluster) rbnode;
 	struct h2span_node_tree tree;
 	uuid_t	pfs_clid;		/* shared fsid */
+	uint8_t	peer_type;
+	char	cl_label[128];		/* cluster label (typ PEER_BLOCK) */
 	int	refs;			/* prevents destruction */
 };

@@ -246,48 +166,45 @@ struct h2span_node {
 	RB_ENTRY(h2span_node) rbnode;
 	struct h2span_link_tree tree;
 	struct h2span_cluster *cls;
+	uint8_t	pfs_type;
 	uuid_t	pfs_fsid;		/* unique fsid */
-	char	label[64];
+	char	fs_label[128];		/* fs label (typ PEER_HAMMER2) */
+	void	*opaque;
 };

 struct h2span_link {
 	RB_ENTRY(h2span_link) rbnode;
 	dmsg_state_t	*state;		/* state<->link */
 	struct h2span_node *node;	/* related node */
-	int32_t	dist;
+	uint32_t	dist;
+	uint32_t	rnss;
 	struct h2span_relay_queue relayq; /* relay out */
-	struct dmsg_router *router;	/* route out this link */
 };

 /*
  * Any LNK_SPAN transactions we receive which are relayed out other
- * connections utilize this structure to track the LNK_SPAN transaction
- * we initiate on the other connections, if selected for relay.
+ * connections utilize this structure to track the LNK_SPAN transactions
+ * we initiate (relay out) on other connections.  We only relay out
+ * LNK_SPANs on connections we have an open CONN transaction for.
+ *
+ * The relay structure points to the outgoing LNK_SPAN transaction
+ * (target_rt) and to the incoming LNK_SPAN transaction (source_rt).
+ * The relay structure holds refs on the related states.
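+ *
+ * An editorial sketch of the linkage, using the field names declared
+ * in struct h2span_relay below (this mirrors the code; it is not part
+ * of the original comment):
+ *
+ *	relay->source_rt --> state of the LNK_SPAN we received (h2span_link)
+ *	relay->target_rt --> state of the LNK_SPAN we transmitted
+ *	relay->conn      --> the open LNK_CONN connection we relay across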
 *
 * In many respects this is the core of the protocol... actually figuring
 * out what LNK_SPANs to relay.  The spanid used for relaying is the
 * address of the 'state' structure, which is why h2span_relay has to
 * be entered into a RB-TREE based at h2span_conn (so we can look
 * up the spanid to validate it).
- *
- * NOTE: Messages can be received via the LNK_SPAN transaction the
- *	 relay maintains, and can be replied via relay->router, but
- *	 messages are NOT initiated via a relay.  Messages are initiated
- *	 via incoming links (h2span_link's).
- *
- *	 relay->link represents the link being relayed, NOT the LNK_SPAN
- *	 transaction the relay is holding open.
  */
 struct h2span_relay {
-	RB_ENTRY(h2span_relay) rbnode;	/* from h2span_conn */
-	TAILQ_ENTRY(h2span_relay) entry; /* from link */
-	struct h2span_conn *conn;
-	dmsg_state_t	*state;		/* transmitted LNK_SPAN */
-	struct h2span_link *link;	/* LNK_SPAN being relayed */
-	struct dmsg_router *router;	/* route out this relay */
+	TAILQ_ENTRY(h2span_relay) entry;	/* from link */
+	RB_ENTRY(h2span_relay) rbnode;		/* from h2span_conn */
+	struct h2span_conn	*conn;		/* related CONN transaction */
+	dmsg_state_t		*source_rt;	/* h2span_link state */
+	dmsg_state_t		*target_rt;	/* h2span_relay state */
 };
-
 typedef struct h2span_media h2span_media_t;
 typedef struct h2span_conn h2span_conn_t;
 typedef struct h2span_cluster h2span_cluster_t;
@@ -295,18 +212,54 @@ typedef struct h2span_node h2span_node_t;
 typedef struct h2span_link h2span_link_t;
 typedef struct h2span_relay h2span_relay_t;

+#define dmsg_termstr(array)	_dmsg_termstr((array), sizeof(array))
+
+static h2span_relay_t *dmsg_generate_relay(h2span_conn_t *conn,
+					h2span_link_t *slink);
+static uint32_t dmsg_rnss(void);
+
+static __inline
+void
+_dmsg_termstr(char *base, size_t size)
+{
+	base[size-1] = 0;
+}
+
+/*
+ * Cluster peer_type, uuid, AND label must all match for two clusters
+ * to compare equal.
+ */
 static
 int
 h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
 {
-	return(uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL));
+	int r;
+
+	if (cls1->peer_type < cls2->peer_type)
+		return(-1);
+	if (cls1->peer_type > cls2->peer_type)
+		return(1);
+	r = uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL);
+	if (r == 0)
+		r = strcmp(cls1->cl_label, cls2->cl_label);
+
+	return r;
 }

+/*
+ * Match against fs_label/pfs_fsid.  Together these two items represent a
+ * unique node.  In most cases the primary differentiator is pfs_fsid but
+ * we also string-match fs_label.
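+ *
+ * For example (editorial illustration of the comparator below): two
+ * nodes labeled "ROOT" with different pfs_fsid values compare equal on
+ * the strcmp() step and are then ordered by the uuid comparison,
+ * roughly:
+ *
+ *	r = strcmp("ROOT", "ROOT");		   -> 0, fall through
+ *	r = uuid_compare(&fsid1, &fsid2, NULL);	   -> decides the order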
+ */
 static
 int
 h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
 {
-	return(uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL));
+	int r;
+
+	r = strcmp(node1->fs_label, node2->fs_label);
+	if (r == 0)
+		r = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
+	return (r);
 }

 /*
@@ -325,6 +278,10 @@ h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
 		return(-1);
 	if (link1->dist > link2->dist)
 		return(1);
+	if (link1->rnss < link2->rnss)
+		return(-1);
+	if (link1->rnss > link2->rnss)
+		return(1);
 #if 1
 	if ((uintptr_t)link1->state < (uintptr_t)link2->state)
 		return(-1);
@@ -348,8 +305,8 @@
 static
 int
 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 {
-	h2span_link_t *link1 = relay1->link;
-	h2span_link_t *link2 = relay2->link;
+	h2span_link_t *link1 = relay1->source_rt->any.link;
+	h2span_link_t *link2 = relay2->source_rt->any.link;

 	if ((intptr_t)link1->node < (intptr_t)link2->node)
 		return(-1);
@@ -359,6 +316,10 @@ h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 		return(-1);
 	if (link1->dist > link2->dist)
 		return(1);
+	if (link1->rnss < link2->rnss)
+		return(-1);
+	if (link1->rnss > link2->rnss)
+		return(1);
 #if 1
 	if ((uintptr_t)link1->state < (uintptr_t)link2->state)
 		return(-1);
@@ -401,6 +362,7 @@ static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);

 static void dmsg_lnk_span(dmsg_msg_t *msg);
 static void dmsg_lnk_conn(dmsg_msg_t *msg);
+static void dmsg_lnk_circ(dmsg_msg_t *msg);
 static void dmsg_lnk_relay(dmsg_msg_t *msg);
 static void dmsg_relay_scan(h2span_conn_t *conn, h2span_node_t *node);
 static void dmsg_relay_delete(h2span_relay_t *relay);
@@ -411,7 +373,7 @@ static void dmsg_volconf_start(h2span_media_config_t *conf,
 			const char *hostname);

 void
-dmsg_msg_lnk_signal(dmsg_router_t *router __unused)
+dmsg_msg_lnk_signal(dmsg_iocom_t *iocom __unused)
 {
 	pthread_mutex_lock(&cluster_mtx);
 	dmsg_relay_scan(NULL, NULL);
@@ -419,20 +381,28 @@ dmsg_msg_lnk_signal(dmsg_iocom_t *iocom __unused)
 }

 /*
- * Receive a DMSG_PROTO_LNK message.  This is only called for
- * one-way and opening-transactions since state->func will be assigned
- * in all other cases.
+ * DMSG_PROTO_LNK - Generic DMSG_PROTO_LNK.
+ *	(incoming iocom lock not held)
+ *
+ * This function is typically called for one-way and opening-transactions
+ * since state->func is assigned after that, but it will also be called
+ * if no state->func is assigned on transaction-open.
  */
 void
 dmsg_msg_lnk(dmsg_msg_t *msg)
 {
-	switch(msg->any.head.cmd & DMSGF_BASECMDMASK) {
+	uint32_t icmd = msg->state ? msg->state->icmd : msg->any.head.cmd;
+
+	switch(icmd & DMSGF_BASECMDMASK) {
 	case DMSG_LNK_CONN:
 		dmsg_lnk_conn(msg);
 		break;
 	case DMSG_LNK_SPAN:
 		dmsg_lnk_span(msg);
 		break;
+	case DMSG_LNK_CIRC:
+		dmsg_lnk_circ(msg);
+		break;
 	default:
 		fprintf(stderr,
			"MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
@@ -442,6 +412,13 @@ dmsg_msg_lnk(dmsg_msg_t *msg)
 	}
 }

+/*
+ * LNK_CONN - iocom identify message reception.
+ *	(incoming iocom lock not held)
+ *
+ * Remote node identifies itself to us, sets up a SPAN filter, and gives us
+ * the ok to start transmitting SPANs.
+ */
 void
 dmsg_lnk_conn(dmsg_msg_t *msg)
 {
@@ -455,6 +432,9 @@ dmsg_lnk_conn(dmsg_msg_t *msg)

 	pthread_mutex_lock(&cluster_mtx);

+	fprintf(stderr,
+		"dmsg_lnk_conn: msg %p cmd %08x state %p txcmd %08x rxcmd %08x\n",
+		msg, msg->any.head.cmd, state, state->txcmd, state->rxcmd);
+
 	switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
 	case DMSG_LNK_CONN | DMSGF_CREATE:
 	case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
 		/*
 		 * On transaction start we allocate a new h2span_conn and
 		 * acknowledge the request, leaving the transaction open.
 		 * We then relay priority-selected SPANs.
 		 */
-		fprintf(stderr, "LNK_CONN(%08x): %s/%s\n",
+		fprintf(stderr, "LNK_CONN(%08x): %s/%s/%s\n",
			(uint32_t)msg->any.head.msgid,
			dmsg_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
					 &alloc),
-			msg->any.lnk_conn.label);
+			msg->any.lnk_conn.cl_label,
+			msg->any.lnk_conn.fs_label);
 		free(alloc);

 		conn = dmsg_alloc(sizeof(*conn));

 		RB_INIT(&conn->tree);
+		state->iocom->conn = conn;	/* XXX only one */
 		conn->state = state;
 		state->func = dmsg_lnk_conn;
 		state->any.conn = conn;
@@ -497,7 +479,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg)

 		if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
 			dmsg_msg_result(msg, 0);
-			dmsg_router_signal(msg->router);
+			dmsg_iocom_signal(msg->iocom);
 			break;
 		}
 		/* FALL THROUGH */
@@ -558,6 +540,7 @@ deleteconn:
 		conn->media = NULL;
 		conn->state = NULL;
 		msg->state->any.conn = NULL;
+		msg->state->iocom->conn = NULL;
 		TAILQ_REMOVE(&connq, conn, entry);
 		dmsg_free(conn);

@@ -610,6 +593,13 @@ deleteconn:
 	pthread_mutex_unlock(&cluster_mtx);
 }

+/*
+ * LNK_SPAN - Spanning tree protocol message reception
+ *	(incoming iocom lock not held)
+ *
+ * Receive a spanning tree transactional message, creating or destroying
+ * a SPAN and propagating it to other iocoms.
+ */
 void
 dmsg_lnk_span(dmsg_msg_t *msg)
 {
@@ -633,16 +623,25 @@ dmsg_lnk_span(dmsg_msg_t *msg)
 		assert(state->func == NULL);
 		state->func = dmsg_lnk_span;

-		msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0;
+		dmsg_termstr(msg->any.lnk_span.cl_label);
+		dmsg_termstr(msg->any.lnk_span.fs_label);

 		/*
 		 * Find the cluster
 		 */
 		dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
+		dummy_cls.peer_type = msg->any.lnk_span.peer_type;
+		bcopy(msg->any.lnk_span.cl_label,
+		      dummy_cls.cl_label,
+		      sizeof(dummy_cls.cl_label));
 		cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
 		if (cls == NULL) {
 			cls = dmsg_alloc(sizeof(*cls));
 			cls->pfs_clid = msg->any.lnk_span.pfs_clid;
+			cls->peer_type = msg->any.lnk_span.peer_type;
+			bcopy(msg->any.lnk_span.cl_label,
+			      cls->cl_label,
+			      sizeof(cls->cl_label));
 			RB_INIT(&cls->tree);
 			RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
 		}
@@ -651,15 +650,23 @@ dmsg_lnk_span(dmsg_msg_t *msg)
 		 * Find the node
 		 */
 		dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
+		bcopy(msg->any.lnk_span.fs_label, dummy_node.fs_label,
+		      sizeof(dummy_node.fs_label));
 		node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
 		if (node == NULL) {
 			node = dmsg_alloc(sizeof(*node));
 			node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
+			node->pfs_type = msg->any.lnk_span.pfs_type;
+			bcopy(msg->any.lnk_span.fs_label,
+			      node->fs_label,
+			      sizeof(node->fs_label));
 			node->cls = cls;
 			RB_INIT(&node->tree);
 			RB_INSERT(h2span_node_tree, &cls->tree, node);
-			snprintf(node->label, sizeof(node->label),
-				 "%s", msg->any.lnk_span.label);
+			if (dmsg_node_handler) {
+				dmsg_node_handler(&node->opaque, msg,
+						  DMSG_NODEOP_ADD);
+			}
 		}

 		/*
@@ -670,36 +677,25 @@ dmsg_lnk_span(dmsg_msg_t *msg)
 		TAILQ_INIT(&slink->relayq);
 		slink->node = node;
 		slink->dist = msg->any.lnk_span.dist;
+		slink->rnss = msg->any.lnk_span.rnss;
 		slink->state = state;
 		state->any.link = slink;

-		/*
-		 * Embedded router structure in link for message forwarding.
-		 *
-		 * The spanning id for the router is the message id of
-		 * the SPAN link it is embedded in, allowing messages to
-		 * be routed via &slink->router.
-		 */
-		slink->router = dmsg_router_alloc();
-		slink->router->iocom = state->iocom;
-		slink->router->link = slink;
-		slink->router->target = state->msgid;
-		dmsg_router_connect(slink->router);
-
 		RB_INSERT(h2span_link_tree, &node->tree, slink);

-		fprintf(stderr, "LNK_SPAN(thr %p): %p %s/%s dist=%d\n",
-			msg->router->iocom,
+		fprintf(stderr,
+			"LNK_SPAN(thr %p): %p %s cl=%s fs=%s dist=%d\n",
+			msg->iocom,
 			slink,
-			dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid,
-					 &alloc),
-			msg->any.lnk_span.label,
+			dmsg_uuid_to_str(&msg->any.lnk_span.pfs_clid, &alloc),
+			msg->any.lnk_span.cl_label,
+			msg->any.lnk_span.fs_label,
			msg->any.lnk_span.dist);
 		free(alloc);
 #if 0
 		dmsg_relay_scan(NULL, node);
 #endif
-		dmsg_router_signal(msg->router);
+		dmsg_iocom_signal(msg->iocom);
 	}

 	/*
@@ -711,19 +707,15 @@ dmsg_lnk_span(dmsg_msg_t *msg)
 		node = slink->node;
 		cls = node->cls;

-		fprintf(stderr, "LNK_DELE(thr %p): %p %s/%s dist=%d\n",
-			msg->router->iocom,
+		fprintf(stderr, "LNK_DELE(thr %p): %p %s cl=%s fs=%s dist=%d\n",
+			msg->iocom,
 			slink,
 			dmsg_uuid_to_str(&cls->pfs_clid, &alloc),
-			state->msg->any.lnk_span.label,
+			state->msg->any.lnk_span.cl_label,
+			state->msg->any.lnk_span.fs_label,
 			state->msg->any.lnk_span.dist);
 		free(alloc);

-		/*
-		 * Remove the router from consideration
-		 */
-		dmsg_router_disconnect(&slink->router);
-
 		/*
 		 * Clean out all relays.  This requires terminating each
 		 * relay transaction.
@@ -738,6 +730,10 @@ dmsg_lnk_span(dmsg_msg_t *msg)
 		RB_REMOVE(h2span_link_tree, &node->tree, slink);
 		if (RB_EMPTY(&node->tree)) {
 			RB_REMOVE(h2span_node_tree, &cls->tree, node);
+			if (dmsg_node_handler) {
+				dmsg_node_handler(&node->opaque, msg,
+						  DMSG_NODEOP_DEL);
+			}
 			if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
 				RB_REMOVE(h2span_cluster_tree,
 					  &cluster_tree, cls);
@@ -768,35 +764,279 @@ dmsg_lnk_span(dmsg_msg_t *msg)
 		dmsg_relay_scan(NULL, node);
 #endif
 		if (node)
-			dmsg_router_signal(msg->router);
+			dmsg_iocom_signal(msg->iocom);
 	}

 	pthread_mutex_unlock(&cluster_mtx);
 }

 /*
- * Messages received on relay SPANs.  These are open transactions so it is
- * in fact possible for the other end to close the transaction.
+ * LNK_CIRC - Virtual circuit protocol message reception
+ *	(incoming iocom lock not held)
  *
- * XXX MPRACE on state structure
+ * Handles all cases.
  */
-static void
-dmsg_lnk_relay(dmsg_msg_t *msg)
+void
+dmsg_lnk_circ(dmsg_msg_t *msg)
 {
-	dmsg_state_t *state = msg->state;
-	h2span_relay_t *relay;
+	dmsg_circuit_t *circA;
+	dmsg_circuit_t *circB;
+	dmsg_state_t *rx_state;
+	dmsg_state_t *tx_state;
+	dmsg_state_t *state;
+	dmsg_state_t dummy;
+	dmsg_msg_t *fwd_msg;
+	dmsg_iocom_t *iocomA;
+	dmsg_iocom_t *iocomB;
+	int disconnect;
+
+	/*pthread_mutex_lock(&cluster_mtx);*/
+
+	if (DMsgDebugOpt >= 4)
+		fprintf(stderr, "CIRC receive cmd=%08x\n", msg->any.head.cmd);
+
+	switch (msg->any.head.cmd & (DMSGF_CREATE |
+				     DMSGF_DELETE |
+				     DMSGF_REPLY)) {
+	case DMSGF_CREATE:
+	case DMSGF_CREATE | DMSGF_DELETE:
+		/*
+		 * (A) wishes to establish a virtual circuit through us to (B).
+		 * (B) is specified by lnk_circ.target (the message id for
+		 * a LNK_SPAN that (A) received from us which represents (B)).
+		 *
+		 * Designate the originator of the circuit (the current
+		 * remote end) as (A) and the other side as (B).
+		 *
+		 * Accept the VC but do not reply.  We will wait for the
+		 * end-to-end reply to propagate back.
+		 */
+		iocomA = msg->iocom;

-	assert(msg->any.head.cmd & DMSGF_REPLY);
+		/*
+		 * Locate the open transaction state that the other end
+		 * specified in lnk_circ.target.  This will be an open SPAN
+		 * transaction that we transmitted (h2span_relay) over
+		 * the interface the LNK_CIRC is being received on.
+		 *
+		 * (all LNK_CIRC's that we transmit are on circuit0)
+		 */
+		pthread_mutex_lock(&iocomA->mtx);
+		dummy.msgid = msg->any.lnk_circ.target;
+		tx_state = RB_FIND(dmsg_state_tree,
+				   &iocomA->circuit0.statewr_tree,
+				   &dummy);
+		pthread_mutex_unlock(&iocomA->mtx);
+		if (tx_state == NULL) {
+			/* XXX SMP race */
+			fprintf(stderr, "dmsg_lnk_circ: no circuit\n");
+			dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+			break;
+		}
+		if (tx_state->icmd != DMSG_LNK_SPAN) {
+			/* XXX SMP race */
+			fprintf(stderr, "dmsg_lnk_circ: not LNK_SPAN\n");
+			dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+			break;
+		}

-	if (msg->any.head.cmd & DMSGF_DELETE) {
-		pthread_mutex_lock(&cluster_mtx);
-		if ((relay = state->any.relay) != NULL) {
-			dmsg_relay_delete(relay);
+		/* locate h2span_link */
+		rx_state = tx_state->any.relay->source_rt;
+
+		/*
+		 * (A) wishes to establish a VC through us to the
+		 * specified target.
+		 *
+		 * (A) sends us the msgid of an open SPAN transaction
+		 * it received from us as lnk_circ.target.
+		 */
+		circA = dmsg_alloc(sizeof(*circA));
+		dmsg_circuit_init(iocomA, circA);
+		circA->state = msg->state;	/* LNK_CIRC state */
+		circA->msgid = msg->state->msgid;
+		circA->span_state = tx_state;	/* H2SPAN_RELAY state */
+		circA->is_relay = 1;
+		circA->refs = 2;		/* state and peer */
+		msg->state->any.circ = circA;
+
+		iocomB = rx_state->iocom;
+
+		circB = dmsg_alloc(sizeof(*circB));
+		dmsg_circuit_init(iocomB, circB);
+
+		/*
+		 * Create a LNK_CIRC transaction on B
+		 */
+		fwd_msg = dmsg_msg_alloc(&iocomB->circuit0,
+					 0, DMSG_LNK_CIRC | DMSGF_CREATE,
+					 dmsg_lnk_circ, circB);
+		fwd_msg->state->any.circ = circB;
+		fwd_msg->any.lnk_circ.target = rx_state->msgid;
+		circB->state = fwd_msg->state;	/* LNK_CIRC state */
+		circB->msgid = fwd_msg->any.head.msgid;
+		circB->span_state = rx_state;	/* H2SPAN_LINK state */
+		circB->is_relay = 0;
+		circB->refs = 2;		/* state and peer */
+
+		if (DMsgDebugOpt >= 4)
+			fprintf(stderr, "CIRC forward %p->%p\n", circA, circB);
+
+		/*
+		 * Link the two circuits together.
+		 */
+		circA->peer = circB;
+		circB->peer = circA;
+
+		if (iocomA < iocomB) {
+			pthread_mutex_lock(&iocomA->mtx);
+			pthread_mutex_lock(&iocomB->mtx);
 		} else {
-			dmsg_state_reply(state, 0);
+			pthread_mutex_lock(&iocomB->mtx);
+			pthread_mutex_lock(&iocomA->mtx);
 		}
-		pthread_mutex_unlock(&cluster_mtx);
+		if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA))
+			assert(0);
+		if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB))
+			assert(0);
+		if (iocomA < iocomB) {
+			pthread_mutex_unlock(&iocomB->mtx);
+			pthread_mutex_unlock(&iocomA->mtx);
+		} else {
+			pthread_mutex_unlock(&iocomA->mtx);
+			pthread_mutex_unlock(&iocomB->mtx);
+		}
+
+		dmsg_msg_write(fwd_msg);
+
+		if ((msg->any.head.cmd & DMSGF_DELETE) == 0)
+			break;
+		/* FALL THROUGH TO DELETE */
+	case DMSGF_DELETE:
+		/*
+		 * (A) is deleting the virtual circuit, propagate closure
+		 * to (B).
+		 */
+		iocomA = msg->iocom;
+		if (msg->state->any.circ == NULL) {
+			/* already returned an error/deleted */
+			break;
+		}
+		circA = msg->state->any.circ;
+		circB = circA->peer;
+		assert(msg->state == circA->state);
+
+		/*
+		 * We are closing B's send side.  If B's receive side is
+		 * already closed we disconnect the circuit from B's state.
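+		 *
+		 * (Editorial note: each circuit was created above with
+		 * refs = 2, one for its LNK_CIRC state and one for its
+		 * peer; the dmsg_circuit_drop() calls below release those
+		 * refs as each association is cut.)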
+		 */
+		disconnect = 0;
+		if (circB && (state = circB->state) != NULL) {
+			if (state->rxcmd & DMSGF_DELETE) {
+				circB->state = NULL;
+				state->any.circ = NULL;
+				dmsg_circuit_drop(circB);
+			}
+			dmsg_state_reply(state, msg->any.head.error);
+			disconnect = 1;
+		}
+
+		/*
+		 * We received a close on A.  If A's send side is already
+		 * closed we disconnect the circuit from A's state.
+		 */
+		if (circA && (state = circA->state) != NULL) {
+			if (state->txcmd & DMSGF_DELETE) {
+				circA->state = NULL;
+				state->any.circ = NULL;
+				dmsg_circuit_drop(circA);
+			}
+			disconnect = 1;
+		}
+
+		/*
+		 * Disconnect the peer<->peer association
+		 */
+		if (disconnect) {
+			if (circB) {
+				circA->peer = NULL;
+				circB->peer = NULL;
+				dmsg_circuit_drop(circA);
+				dmsg_circuit_drop(circB); /* XXX SMP */
+			}
+		}
+		break;
+	case DMSGF_REPLY | DMSGF_CREATE:
+	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
+		/*
+		 * (B) is acknowledging the creation of the virtual
+		 * circuit.  This propagates all the way back to (A), though
+		 * it should be noted that (A) can start issuing commands
+		 * via the virtual circuit before seeing this reply.
+		 */
+		circB = msg->state->any.circ;
+		assert(circB);
+		circA = circB->peer;
+		assert(msg->state == circB->state);
+		assert(circA);
+		if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
+			dmsg_state_result(circA->state, msg->any.head.error);
+			break;
+		}
+		/* FALL THROUGH TO DELETE */
+	case DMSGF_REPLY | DMSGF_DELETE:
+		/*
+		 * (B) is deleting the virtual circuit or acknowledging
+		 * our deletion of the virtual circuit, propagate closure
+		 * to (A).
+		 */
+		iocomB = msg->iocom;
+		circB = msg->state->any.circ;
+		circA = circB->peer;
+		assert(msg->state == circB->state);
+
+		/*
+		 * We received a close on (B), propagate to (A).  If we have
+		 * already received the close from (A) we disconnect the state.
+		 */
+		disconnect = 0;
+		if (circA && (state = circA->state) != NULL) {
+			if (state->rxcmd & DMSGF_DELETE) {
+				circA->state = NULL;
+				state->any.circ = NULL;
+				dmsg_circuit_drop(circA);
+			}
+			dmsg_state_reply(state, msg->any.head.error);
+			disconnect = 1;
+		}
+
+		/*
+		 * We received a close on (B).  If (B)'s send side is already
+		 * closed we disconnect the state.
+		 */
+		if (circB && (state = circB->state) != NULL) {
+			if (state->txcmd & DMSGF_DELETE) {
+				circB->state = NULL;
+				state->any.circ = NULL;
+				dmsg_circuit_drop(circB);
+			}
+			disconnect = 1;
+		}
+
+		/*
+		 * Disconnect the peer<->peer association
+		 */
+		if (disconnect) {
+			if (circA) {
+				circB->peer = NULL;
+				circA->peer = NULL;
+				dmsg_circuit_drop(circB);
+				dmsg_circuit_drop(circA); /* XXX SMP */
+			}
+		}
+		break;
 	}
+
+	/*pthread_mutex_unlock(&cluster_mtx);*/
 }

@@ -867,9 +1107,9 @@ dmsg_relay_scan_cmp(h2span_relay_t *relay, void *arg)
 {
 	struct relay_scan_info *info = arg;

-	if ((intptr_t)relay->link->node < (intptr_t)info->node)
+	if ((intptr_t)relay->source_rt->any.link->node < (intptr_t)info->node)
 		return(-1);
-	if ((intptr_t)relay->link->node > (intptr_t)info->node)
+	if ((intptr_t)relay->source_rt->any.link->node > (intptr_t)info->node)
 		return(1);
 	return(0);
 }
@@ -891,9 +1131,11 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
 	h2span_relay_t *next_relay;
 	h2span_link_t *slink;
 	dmsg_lnk_conn_t *lconn;
-	dmsg_msg_t *msg;
-	int count = 2;
-	uint8_t peer_type;
+	dmsg_lnk_span_t *lspan;
+	int count;
+	int maxcount = 2;
+	uint32_t lastdist = DMSG_SPAN_MAXDIST;
+	uint32_t lastrnss = 0;

 	info.node = node;
 	info.relay = NULL;
@@ -907,7 +1149,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
 	relay = info.relay;
 	info.relay = NULL;
 	if (relay)
-		assert(relay->link->node == node);
+		assert(relay->source_rt->any.link->node == node);

 	if (DMsgDebugOpt > 8)
 		fprintf(stderr, "relay scan for connection %p\n", conn);
@@ -925,15 +1167,39 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
 	 * removed the relay, so the relay can only match exactly or
 	 * be worse).
 	 */
+	count = 0;
 	RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+		/*
+		 * Increment count of successful relays.  This isn't
+		 * quite accurate if we break out but nothing after
+		 * the loop uses (count).
+		 *
+		 * If count exceeds the maximum number of relays we desire
+		 * we normally want to break out.  However, in order to
+		 * guarantee a symmetric path we have to continue if both
+		 * (dist) and (rnss) continue to match.  Otherwise the SPAN
+		 * propagation in the reverse direction may choose different
+		 * routes and we will not have a symmetric path.
+		 *
+		 * NOTE: Spanning tree does not have to be symmetrical so
+		 *	 this code is not currently enabled.
+		 */
+		if (++count >= maxcount) {
+#ifdef REQUIRE_SYMMETRICAL
+			if (lastdist != slink->dist || lastrnss != slink->rnss)
+				break;
+#else
+			break;
+#endif
+			/* go beyond the nominal maximum desired relays */
+		}
+
 		/*
 		 * Match, relay already in-place, get the next
 		 * relay to match against the next slink.
 		 */
-		if (relay && relay->link == slink) {
+		if (relay && relay->source_rt->any.link == slink) {
 			relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
-			if (--count == 0)
-				break;
 			continue;
 		}

@@ -960,73 +1226,69 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
 		 *
 		 * Don't bother transmitting if the remote connection
 		 * is not accepting this SPAN's peer_type.
+		 *
+		 * pfs_mask is typically used so pure clients can filter
+		 * out receiving SPANs for other pure clients.
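+		 *
+		 * As an editorial illustration: a conn advertising
+		 * peer_mask = (1LLU << DMSG_PEER_HAMMER2) accepts only
+		 * SPANs whose lnk_span.peer_type is DMSG_PEER_HAMMER2;
+		 * the mask-and-test below implements exactly that.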
*/ - peer_type = slink->state->msg->any.lnk_span.peer_type; + lspan = &slink->state->msg->any.lnk_span; lconn = &conn->state->msg->any.lnk_conn; - if (((1LLU << peer_type) & lconn->peer_mask) == 0) + if (((1LLU << lspan->peer_type) & lconn->peer_mask) == 0) + break; + if (((1LLU << lspan->pfs_type) & lconn->pfs_mask) == 0) break; /* - * Filter based on pfs_clid or label (XXX). This typically - * reduces the amount of SPAN traffic that a mount end-point - * sees by only passing along SPANs related to the cluster id - * (that is, it will see all PFS's associated with the - * particular cluster it represents). + * Do not give pure clients visibility to other pure clients */ - if (peer_type == lconn->peer_type && - peer_type == DMSG_PEER_HAMMER2) { - if (!uuid_is_nil(&slink->node->cls->pfs_clid, NULL) && - uuid_compare(&slink->node->cls->pfs_clid, - &lconn->pfs_clid, NULL) != 0) { - break; - } + if (lconn->pfs_type == DMSG_PFSTYPE_CLIENT && + lspan->pfs_type == DMSG_PFSTYPE_CLIENT) { + break; } /* - * Ok, we've accepted this SPAN for relaying. + * Connection filter, if cluster uuid is not NULL it must + * match the span cluster uuid. Only applies when the + * peer_type matches. */ - assert(relay == NULL || - relay->link->node != slink->node || - relay->link->dist >= slink->dist); - relay = dmsg_alloc(sizeof(*relay)); - relay->conn = conn; - relay->link = slink; - - msg = dmsg_msg_alloc(conn->state->iocom->router, 0, - DMSG_LNK_SPAN | - DMSGF_CREATE, - dmsg_lnk_relay, relay); - relay->state = msg->state; - relay->router = dmsg_router_alloc(); - relay->router->iocom = relay->state->iocom; - relay->router->relay = relay; - relay->router->target = relay->state->msgid; - - msg->any.lnk_span = slink->state->msg->any.lnk_span; - msg->any.lnk_span.dist = slink->dist + 1; - - dmsg_router_connect(relay->router); + if (lspan->peer_type == lconn->peer_type && + !uuid_is_nil(&lconn->pfs_clid, NULL) && + uuid_compare(&slink->node->cls->pfs_clid, + &lconn->pfs_clid, NULL)) { + break; + } - RB_INSERT(h2span_relay_tree, &conn->tree, relay); - TAILQ_INSERT_TAIL(&slink->relayq, relay, entry); + /* + * Connection filter, if cluster label is not empty it must + * match the span cluster label. Only applies when the + * peer_type matches. + */ + if (lspan->peer_type == lconn->peer_type && + lconn->cl_label[0] && + strcmp(lconn->cl_label, slink->node->cls->cl_label)) { + break; + } - dmsg_msg_write(msg); + /* + * NOTE! pfs_fsid differentiates nodes within the same cluster + * so we obviously don't want to match those. Similarly + * for fs_label. + */ - fprintf(stderr, - "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d " - "FD %d state %p\n", - slink, - relay, - node->cls, node, slink->dist, - conn->state->iocom->sock_fd, relay->state); + /* + * Ok, we've accepted this SPAN for relaying. + */ + assert(relay == NULL || + relay->source_rt->any.link->node != slink->node || + relay->source_rt->any.link->dist >= slink->dist); + relay = dmsg_generate_relay(conn, slink); + lastdist = slink->dist; + lastrnss = slink->rnss; /* * Match (created new relay), get the next relay to * match against the next slink. */ relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); - if (--count == 0) - break; } /* @@ -1034,38 +1296,106 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) * the node are in excess of the current aggregate spanning state * and should be removed. 
 	 */
-	while (relay && relay->link->node == node) {
+	while (relay && relay->source_rt->any.link->node == node) {
 		next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+		fprintf(stderr, "RELAY DELETE FROM EXTRAS\n");
 		dmsg_relay_delete(relay);
 		relay = next_relay;
 	}
 }

+/*
+ * Helper function to generate missing relay.
+ *
+ * cluster_mtx must be held
+ */
+static
+h2span_relay_t *
+dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink)
+{
+	h2span_relay_t *relay;
+	h2span_node_t *node;
+	dmsg_msg_t *msg;
+
+	node = slink->node;
+
+	relay = dmsg_alloc(sizeof(*relay));
+	relay->conn = conn;
+	relay->source_rt = slink->state;
+	/* relay->source_rt->any.link = slink; */
+
+	/*
+	 * NOTE: relay->target_rt->any.relay set to relay by alloc.
+	 */
+	msg = dmsg_msg_alloc(&conn->state->iocom->circuit0,
+			     0, DMSG_LNK_SPAN | DMSGF_CREATE,
+			     dmsg_lnk_relay, relay);
+	relay->target_rt = msg->state;
+
+	msg->any.lnk_span = slink->state->msg->any.lnk_span;
+	msg->any.lnk_span.dist = slink->dist + 1;
+	msg->any.lnk_span.rnss = slink->rnss + dmsg_rnss();
+
+	RB_INSERT(h2span_relay_tree, &conn->tree, relay);
+	TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
+
+	dmsg_msg_write(msg);
+
+	return (relay);
+}
+
+/*
+ * Messages received on relay SPANs.  These are open transactions so it is
+ * in fact possible for the other end to close the transaction.
+ *
+ * XXX MPRACE on state structure
+ */
+static void
+dmsg_lnk_relay(dmsg_msg_t *msg)
+{
+	dmsg_state_t *state = msg->state;
+	h2span_relay_t *relay;
+
+	assert(msg->any.head.cmd & DMSGF_REPLY);
+
+	if (msg->any.head.cmd & DMSGF_DELETE) {
+		pthread_mutex_lock(&cluster_mtx);
+		fprintf(stderr, "RELAY DELETE FROM LNK_RELAY MSG\n");
+		if ((relay = state->any.relay) != NULL) {
+			dmsg_relay_delete(relay);
+		} else {
+			dmsg_state_reply(state, 0);
+		}
+		pthread_mutex_unlock(&cluster_mtx);
+	}
+}
+
+/*
+ * cluster_mtx held by caller
+ */
 static
 void
 dmsg_relay_delete(h2span_relay_t *relay)
 {
 	fprintf(stderr,
 		"RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
-		relay->link,
+		relay->source_rt->any.link,
 		relay,
-		relay->link->node->cls, relay->link->node,
-		relay->link->dist,
-		relay->conn->state->iocom->sock_fd, relay->state);
-
-	dmsg_router_disconnect(&relay->router);
+		relay->source_rt->any.link->node->cls,
+		relay->source_rt->any.link->node,
+		relay->source_rt->any.link->dist,
+		relay->conn->state->iocom->sock_fd, relay->target_rt);

 	RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
-	TAILQ_REMOVE(&relay->link->relayq, relay, entry);
+	TAILQ_REMOVE(&relay->source_rt->any.link->relayq, relay, entry);

-	if (relay->state) {
-		relay->state->any.relay = NULL;
-		dmsg_state_reply(relay->state, 0);
+	if (relay->target_rt) {
+		relay->target_rt->any.relay = NULL;
+		dmsg_state_reply(relay->target_rt, 0);
 		/* state invalid after reply */
-		relay->state = NULL;
+		relay->target_rt = NULL;
 	}
 	relay->conn = NULL;
-	relay->link = NULL;
+	relay->source_rt = NULL;
 	dmsg_free(relay);
 }

@@ -1160,6 +1490,55 @@ dmsg_volconf_start(h2span_media_config_t *conf, const char *hostname)
 	}
 }

+/************************************************************************
+ *		MESSAGE ROUTING AND SOURCE VALIDATION			*
+ ************************************************************************/
+
+int
+dmsg_circuit_relay(dmsg_msg_t *msg)
+{
+	dmsg_iocom_t *iocom = msg->iocom;
+	dmsg_circuit_t *circ;
+	dmsg_circuit_t *peer;
+	dmsg_circuit_t dummy;
+	int error = 0;
+
+	/*
+	 * Relay occurs before any state processing, msg state should always
+	 * be NULL.
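+	 *
+	 * In short (editorial summary of the code below): look up the
+	 * incoming circuit id in this iocom's circuit tree, rewrite the
+	 * header to the peer circuit's msgid and iocom, and re-queue the
+	 * message via dmsg_msg_write().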
+	 */
+	assert(msg->state == NULL);
+
+	/*
+	 * Lookup the circuit on the incoming iocom.
+	 */
+	pthread_mutex_lock(&iocom->mtx);
+
+	dummy.msgid = msg->any.head.circuit;
+	circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy);
+	assert(circ);
+	peer = circ->peer;
+	dmsg_circuit_hold(peer);
+
+	if (DMsgDebugOpt >= 4) {
+		fprintf(stderr,
+			"CIRC relay %08x %p->%p\n",
+			msg->any.head.cmd, circ, peer);
+	}
+
+	msg->iocom = peer->iocom;
+	msg->any.head.circuit = peer->msgid;
+	dmsg_circuit_drop_locked(msg->circuit);
+	msg->circuit = peer;
+
+	pthread_mutex_unlock(&iocom->mtx);
+
+	dmsg_msg_write(msg);
+	error = DMSG_IOQ_ERROR_ROUTED;
+
+	return error;
+}
+
 /************************************************************************
  *		ROUTER AND MESSAGING HANDLES				*
  ************************************************************************
@@ -1220,50 +1599,45 @@ dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)

 #endif

-#if 0
-/*
- * Acquire a persistent router structure given the cluster and node ids.
- * Messages can be transacted via this structure while held.  If the route
- * is lost messages will return failure.
- */
-dmsg_router_t *
-dmsg_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
-{
-}
-
-/*
- * Release previously acquired router.
- */
-void
-dmsg_router_put(dmsg_router_t *router)
-{
-}
-#endif
-
 /*
  * Dumps the spanning tree
+ *
+ * DEBUG ONLY
  */
 void
-dmsg_shell_tree(dmsg_router_t *router, char *cmdbuf __unused)
+dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
 {
 	h2span_cluster_t *cls;
 	h2span_node_t *node;
 	h2span_link_t *slink;
+	h2span_relay_t *relay;
 	char *uustr = NULL;

 	pthread_mutex_lock(&cluster_mtx);
 	RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
-		dmsg_router_printf(router, "Cluster %s\n",
-				   dmsg_uuid_to_str(&cls->pfs_clid, &uustr));
+		dmsg_circuit_printf(circuit, "Cluster %s %s (%s)\n",
+				  dmsg_peer_type_to_str(cls->peer_type),
+				  dmsg_uuid_to_str(&cls->pfs_clid, &uustr),
+				  cls->cl_label);
 		RB_FOREACH(node, h2span_node_tree, &cls->tree) {
-			dmsg_router_printf(router, "    Node %s (%s)\n",
+			dmsg_circuit_printf(circuit, "    Node %s %s (%s)\n",
+				dmsg_pfs_type_to_str(node->pfs_type),
 				dmsg_uuid_to_str(&node->pfs_fsid, &uustr),
-				node->label);
+				node->fs_label);
 			RB_FOREACH(slink, h2span_link_tree, &node->tree) {
-				dmsg_router_printf(router,
-					    "\tLink dist=%d via %d\n",
+				dmsg_circuit_printf(circuit,
+					    "\tSLink msgid %016jx "
+					    "dist=%d via %d\n",
+					    (intmax_t)slink->state->msgid,
 					    slink->dist,
 					    slink->state->iocom->sock_fd);
+				TAILQ_FOREACH(relay, &slink->relayq, entry) {
+					dmsg_circuit_printf(circuit,
+					    "\t    Relay-out msgid %016jx "
+					    "via %d\n",
+					    (intmax_t)relay->target_rt->msgid,
+					    relay->target_rt->iocom->sock_fd);
+				}
 			}
 		}
 	}
@@ -1275,3 +1649,60 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
 	}
 #endif
 }

+/*
+ * DEBUG ONLY
+ *
+ * Locate the state representing an incoming LNK_SPAN given its msgid.
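+ *
+ * Hypothetical usage from a debug hook (editorial, not in the tree):
+ *
+ *	dmsg_state_t *state;
+ *	if (dmsg_debug_findspan(msgid, &state) == 0)
+ *		printf("span %016jx found\n", (intmax_t)msgid);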
+ */
+int
+dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep)
+{
+	h2span_cluster_t *cls;
+	h2span_node_t *node;
+	h2span_link_t *slink;
+	h2span_relay_t *relay;
+
+	pthread_mutex_lock(&cluster_mtx);
+	relay = NULL;
+	RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
+		RB_FOREACH(node, h2span_node_tree, &cls->tree) {
+			RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+				if (slink->state->msgid == msgid) {
+					*statep = slink->state;
+					goto found;
+				}
+			}
+		}
+	}
+	pthread_mutex_unlock(&cluster_mtx);
+	*statep = NULL;
+	return(ENOENT);
+found:
+	pthread_mutex_unlock(&cluster_mtx);
+	return(0);
+}
+
+/*
+ * Random number sub-sort value to add to SPAN rnss fields on relay.
+ * This allows us to differentiate spans with the same dist field
+ * for relaying purposes.  We must normally limit the number of relays
+ * for any given SPAN origination but we must also guarantee that a
+ * symmetric reverse path exists, so we use the rnss field as a sub-sort
+ * (since there can be thousands or millions if we only match on dist),
+ * and if there are STILL too many spans we go past the limit.
+ */
+static
+uint32_t
+dmsg_rnss(void)
+{
+	if (DMsgRNSS == 0) {
+		pthread_mutex_lock(&cluster_mtx);
+		while (DMsgRNSS == 0) {
+			srandomdev();
+			DMsgRNSS = random();
+		}
+		pthread_mutex_unlock(&cluster_mtx);
+	}
+	return(DMsgRNSS);
+}
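As a closing illustration, the sketch below models how the (dist, rnss) pair
accumulates as a SPAN is relayed: dmsg_generate_relay() above adds 1 to dist
and the node-local dmsg_rnss() constant to rnss at each hop.  This program is
editorial and self-contained; the struct and helper names are illustrative
stand-ins, not part of libdmsg.

	/*
	 * Editorial model of SPAN (dist, rnss) accumulation across relays.
	 * Mirrors dmsg_generate_relay(): dist increments per hop, rnss adds
	 * a fixed per-node random constant (like DMsgRNSS in dmsg_rnss()).
	 */
	#include <stdio.h>
	#include <stdint.h>
	#include <stdlib.h>

	#define MAXNODES	8

	struct span {			/* stand-in for dmsg_lnk_span fields */
		uint32_t dist;
		uint32_t rnss;
	};

	static uint32_t
	node_rnss(int node)
	{
		/* one constant per node, chosen once, as dmsg_rnss() does */
		static uint32_t v[MAXNODES];

		while (v[node] == 0)
			v[node] = (uint32_t)random();
		return v[node];
	}

	static struct span
	relay_span(struct span in, int node)
	{
		struct span out = in;

		out.dist += 1;		/* lnk_span.dist = slink->dist + 1 */
		out.rnss += node_rnss(node); /* rnss = slink->rnss + dmsg_rnss() */
		return out;
	}

	int
	main(void)
	{
		struct span s = { 0, 0 };	/* SPAN as originated */
		int hop;

		srandom(42);	/* deterministic demo; libdmsg uses srandomdev() */
		for (hop = 1; hop <= 3; ++hop) {
			s = relay_span(s, hop);
			printf("hop %d: dist=%u rnss=%u\n", hop, s.dist, s.rnss);
		}
		return 0;
	}

Two equal-dist paths through different intermediate nodes thus accumulate
different rnss sums, which is what lets h2span_link_cmp() keep them ordered
deterministically while still bounding the number of relays per node.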