static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
/*
+ * Indexed messages are stored in a red-black tree indexed by their
+ * msgid. Only persistent messages are indexed.
+ */
+int
+hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
+{
+ if (state1->spanid < state2->spanid)
+ return(-1);
+ if (state1->spanid > state2->spanid)
+ return(1);
+ if (state1->msgid < state2->msgid)
+ return(-1);
+ if (state1->msgid > state2->msgid)
+ return(1);
+ return(0);
+}
+
+RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
+
+/*
* Initialize a low-level ioq
*/
void
return (msg);
}
+ if (ioq->error)
+ goto skip;
+
/*
* Message read in-progress (msg is NULL at the moment). We don't
* allocate a msg until we have its core header.
* to update them when breaking out.
*/
if (ioq->error) {
+skip:
/*
* An unrecoverable error causes all active receive
* transactions to be terminated with a LNK_ERROR message.
msg->any.head.error = ioq->error;
pthread_mutex_lock(&iocom->mtx);
+ hammer2_iocom_drain(iocom);
if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
/*
* Active remote transactions are still present.
state->rxcmd = HAMMER2_MSGF_REPLY;
state->func = func;
state->any.any = data;
+ pthread_mutex_lock(&iocom->mtx);
RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
+ pthread_mutex_unlock(&iocom->mtx);
state->flags |= HAMMER2_STATE_INSERTED;
msg->state = state;
msg->any.head.msgid = state->msgid;
*
*/
-RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
-
/*
* Process state tracking for a message after reception, prior to
* execution.
fprintf(stderr, "duplicate-trans %s\n",
hammer2_msg_str(msg));
error = HAMMER2_IOQ_ERROR_TRANS;
+ assert(0);
break;
}
state = malloc(sizeof(*state));
state->msg = msg;
state->txcmd = HAMMER2_MSGF_REPLY;
state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
- pthread_mutex_lock(&iocom->mtx);
- RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
- pthread_mutex_unlock(&iocom->mtx);
state->flags |= HAMMER2_STATE_INSERTED;
state->msgid = msg->any.head.msgid;
state->spanid = msg->any.head.spanid;
msg->state = state;
+ pthread_mutex_lock(&iocom->mtx);
+ RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
+ pthread_mutex_unlock(&iocom->mtx);
error = 0;
+ if (DebugOpt) {
+ fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
+ state, (uint32_t)state->msgid, iocom);
+ }
break;
case HAMMER2_MSGF_DELETE:
/*
fprintf(stderr, "missing-state %s\n",
hammer2_msg_str(msg));
error = HAMMER2_IOQ_ERROR_TRANS;
+ assert(0);
}
break;
}
fprintf(stderr, "reused-state %s\n",
hammer2_msg_str(msg));
error = HAMMER2_IOQ_ERROR_TRANS;
+ assert(0);
}
break;
}
fprintf(stderr, "no-state(r) %s\n",
hammer2_msg_str(msg));
error = HAMMER2_IOQ_ERROR_TRANS;
+ assert(0);
break;
}
assert(((state->rxcmd ^ msg->any.head.cmd) &
fprintf(stderr, "no-state(r,d) %s\n",
hammer2_msg_str(msg));
error = HAMMER2_IOQ_ERROR_TRANS;
+ assert(0);
}
break;
}
fprintf(stderr, "reused-state(r,d) %s\n",
hammer2_msg_str(msg));
error = HAMMER2_IOQ_ERROR_TRANS;
+ assert(0);
}
break;
}
char dummy;
if (DebugOpt) {
- fprintf(stderr, "terminate state id=%08x\n",
- (uint32_t)state->msgid);
+ fprintf(stderr, "terminate state %p id=%08x\n",
+ state, (uint32_t)state->msgid);
}
assert(state->any.any == NULL);
msg = state->msg;
}
}
-/*
- * Indexed messages are stored in a red-black tree indexed by their
- * msgid. Only persistent messages are indexed.
- */
-int
-hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
-{
- if (state1->spanid < state2->spanid)
- return(-1);
- if (state1->spanid > state2->spanid)
- return(1);
- if (state1->msgid < state2->msgid)
- return(-1);
- if (state1->msgid > state2->msgid)
- return(1);
- return(0);
-}
-
const char *
hammer2_basecmd_str(uint32_t cmd)
{
#include "hammer2.h"
/*
+ * Maximum spanning tree distance. This has the practical effect of
+ * stopping tail-chasing closed loops when a feeder span is lost.
+ */
+#define HAMMER2_SPAN_MAXDIST 16
+
+/*
* RED-BLACK TREE DEFINITIONS
*
* We need to track:
return(uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL));
}
+/*
+ * NOTE: Sort/subsort must match h2span_relay_cmp() under any given
+ * node.
+ */
static
int
h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
RB_SCAN(h2span_relay_tree, &conn->tree,
hammer2_relay_scan_cmp, hammer2_relay_scan_callback, &info);
relay = info.relay;
+ info.relay = NULL;
+ if (relay)
+ assert(relay->link->node == node);
if (DebugOpt > 8)
fprintf(stderr, "relay scan for connection %p\n", conn);
*/
RB_FOREACH(slink, h2span_link_tree, &node->tree) {
/*
- * PROPAGATE THE BEST RELAYS BY TRANSMITTING SPANs.
- *
- * Check for match against current best relay.
+ * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
*
- * A match failure means that the current best relay is not
- * as good as the link, create a new relay for the link.
+ * Track relays while iterating the best links and construct
+ * missing relays when necessary.
*
* (If some prior better link was removed it would have also
* removed the relay, so the relay can only match exactly or
* be worst).
*/
- info.relay = relay;
- if (relay == NULL || relay->link != slink) {
+ if (relay && relay->link == slink) {
+ /*
+ * Match, get the next relay to match against the
+ * next slink.
+ */
+ relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+ if (--count == 0)
+ break;
+ } else if (slink->dist > HAMMER2_SPAN_MAXDIST) {
+ /*
+ * No match but span distance is too great,
+ * do not relay. This prevents endless closed
+ * loops with ever-incrementing distances when
+ * the seed span is lost in the graph.
+ */
+ /* no code needed */
+ } else {
+ /*
+ * No match, distance is ok, construct a new relay.
+ */
hammer2_msg_t *msg;
assert(relay == NULL ||
relay->conn = conn;
relay->link = slink;
+ RB_INSERT(h2span_relay_tree, &conn->tree, relay);
+ TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
+
msg = hammer2_msg_alloc(conn->state->iocom, 0,
HAMMER2_LNK_SPAN |
HAMMER2_MSGF_CREATE);
hammer2_lnk_relay, relay,
&relay->state);
fprintf(stderr,
- "RELAY SPAN ON CLS=%p NODE=%p FD %d state %p\n",
- node->cls, node,
+ "RELAY SPAN ON CLS=%p NODE=%p DIST=%d "
+ "FD %d state %p\n",
+ node->cls, node, slink->dist,
conn->state->iocom->sock_fd, relay->state);
- RB_INSERT(h2span_relay_tree, &conn->tree, relay);
- TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
- }
-
- /*
- * Iterate, figure out the next relay.
- */
- relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
- if (--count == 0) {
- break;
- continue;
+ /*
+ * Match (created new relay), get the next relay to
+ * match against the next slink.
+ */
+ relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+ if (--count == 0)
+ break;
}
}
hammer2_relay_delete(h2span_relay_t *relay)
{
fprintf(stderr,
- "RELAY DELETE ON CLS=%p NODE=%p FD %d STATE %p\n",
+ "RELAY DELETE ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
relay->link->node->cls, relay->link->node,
+ relay->link->dist,
relay->conn->state->iocom->sock_fd, relay->state);
- fprintf(stderr, "RELAY TX %08x RX %08x\n", relay->state->txcmd, relay->state->rxcmd);
RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
TAILQ_REMOVE(&relay->link->relayq, relay, entry);