static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
/*
- * Indexed messages are stored in a red-black tree indexed by their
- * msgid. Only persistent messages are indexed.
+ * ROUTER TREE - Represents available routes for message routing, indexed
+ * by their spanid. The router structure is embedded in
+ * either an iocom, h2span_link, or h2span_relay (see msg_lnk.c).
+ */
+int
+hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2)
+{
+ if (router1->spanid < router2->spanid)
+ return(-1);
+ if (router1->spanid > router2->spanid)
+ return(1);
+ return(0);
+}
+
+RB_GENERATE(hammer2_router_tree, hammer2_router, rbnode, hammer2_router_cmp);
+
+static pthread_mutex_t router_mtx;
+static struct hammer2_router_tree router_ltree = RB_INITIALIZER(router_ltree);
+static struct hammer2_router_tree router_rtree = RB_INITIALIZER(router_rtree);
+
+/*
+ * STATE TREE - Represents open transactions which are indexed by their
+ * {spanid,msgid} relative to the governing iocom. spanid
+ * will usually be 0 since a non-zero spanid would have been
+ * routed elsewhere.
*/
int
hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
{
bzero(iocom, sizeof(*iocom));
- iocom->router.signal_callback = signal_func;
- iocom->router.rcvmsg_callback = rcvmsg_func;
- iocom->router.altmsg_callback = altmsg_func;
+ iocom->router = hammer2_router_alloc();
+ iocom->router->signal_callback = signal_func;
+ iocom->router->rcvmsg_callback = rcvmsg_func;
+ iocom->router->altmsg_callback = altmsg_func;
+ /* we do not call hammer2_router_connect() for iocom routers */
pthread_mutex_init(&iocom->mtx, NULL);
- RB_INIT(&iocom->router.staterd_tree);
- RB_INIT(&iocom->router.statewr_tree);
+ RB_INIT(&iocom->router->staterd_tree);
+ RB_INIT(&iocom->router->statewr_tree);
TAILQ_INIT(&iocom->freeq);
TAILQ_INIT(&iocom->freeq_aux);
TAILQ_INIT(&iocom->addrq);
- TAILQ_INIT(&iocom->router.txmsgq);
- iocom->router.iocom = iocom;
+ TAILQ_INIT(&iocom->router->txmsgq);
+ iocom->router->iocom = iocom;
iocom->sock_fd = sock_fd;
iocom->alt_fd = alt_fd;
iocom->flags = HAMMER2_IOCOMF_RREQ;
state->iocom = iocom;
state->flags = HAMMER2_STATE_DYNAMIC;
state->msgid = (uint64_t)(uintptr_t)state;
- /* XXX set state->spanid from router */
+ if (router->link)
+ state->spanid = router->spanid;
state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
HAMMER2_MSGF_DELETE);
state->rxcmd = HAMMER2_MSGF_REPLY;
state->func = func;
state->any.any = data;
pthread_mutex_lock(&iocom->mtx);
- RB_INSERT(hammer2_state_tree, &iocom->router.statewr_tree, state);
+ RB_INSERT(hammer2_state_tree,
+ &iocom->router->statewr_tree,
+ state);
pthread_mutex_unlock(&iocom->mtx);
state->flags |= HAMMER2_STATE_INSERTED;
}
if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
- iocom->router.signal_callback(&iocom->router);
+ iocom->router->signal_callback(iocom->router);
}
/*
read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
iocom->flags |= HAMMER2_IOCOMF_RWORK;
iocom->flags |= HAMMER2_IOCOMF_WWORK;
- if (TAILQ_FIRST(&iocom->router.txmsgq))
+ if (TAILQ_FIRST(&iocom->router->txmsgq))
hammer2_iocom_flush1(iocom);
}
fprintf(stderr, "receive %s\n",
hammer2_msg_str(msg));
}
- iocom->router.rcvmsg_callback(msg);
+ iocom->router->rcvmsg_callback(msg);
hammer2_state_cleanuprx(iocom, msg);
}
}
if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
- iocom->router.altmsg_callback(iocom);
+ iocom->router->altmsg_callback(iocom);
}
}
}
/*
* If a message is already pending we can just remove and
* return it. Message state has already been processed.
+ * (currently not implemented)
*/
if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
TAILQ_REMOVE(&ioq->msgq, msg, qentry);
return (msg);
}
+ /*
+ * If the stream is errored out we stop processing it.
+ */
if (ioq->error)
goto skip;
/*
* Allocate the message, the next state will fill it in.
*/
- msg = hammer2_msg_alloc(&iocom->router, ioq->abytes, 0,
+ msg = hammer2_msg_alloc(iocom->router, ioq->abytes, 0,
NULL, NULL);
ioq->msg = msg;
* LNK_ERROR (that the session can detect) when no
* transactions remain.
*/
- msg = hammer2_msg_alloc(&iocom->router, 0, 0, NULL, NULL);
+ msg = hammer2_msg_alloc(iocom->router, 0, 0, NULL, NULL);
bzero(&msg->any.head, sizeof(msg->any.head));
msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
msg->any.head.cmd = HAMMER2_LNK_ERROR;
pthread_mutex_lock(&iocom->mtx);
hammer2_iocom_drain(iocom);
- if ((state = RB_ROOT(&iocom->router.staterd_tree)) != NULL) {
+ if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) {
/*
* Active remote transactions are still present.
* Simulate the other end sending us a DELETE.
msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
HAMMER2_MSGF_DELETE;
}
- } else if ((state = RB_ROOT(&iocom->router.statewr_tree)) != NULL) {
+ } else if ((state = RB_ROOT(&iocom->router->statewr_tree)) !=
+ NULL) {
/*
* Active local transactions are still present.
* Simulate the other end sending us a DELETE.
iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
TAILQ_INIT(&tmpq);
pthread_mutex_lock(&iocom->mtx);
- while ((msg = TAILQ_FIRST(&iocom->router.txmsgq)) != NULL) {
- TAILQ_REMOVE(&iocom->router.txmsgq, msg, qentry);
+ while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) {
+ TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry);
TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
}
pthread_mutex_unlock(&iocom->mtx);
* Queue it for output, wake up the I/O pthread. Note that the
* I/O thread is responsible for generating the CRCs and encryption.
*/
- TAILQ_INSERT_TAIL(&iocom->router.txmsgq, msg, qentry);
+ TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry);
dummy = 0;
write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */
pthread_mutex_unlock(&iocom->mtx);
* We cannot pass MSGF_CREATE to msg_alloc() because that may
* allocate new state. We have our state already.
*/
- nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
+ nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
if (state) {
if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
cmd |= HAMMER2_MSGF_REPLY;
}
- nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
+ nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
if (state) {
if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
if (state->txcmd & HAMMER2_MSGF_REPLY)
cmd |= HAMMER2_MSGF_REPLY;
- nmsg = hammer2_msg_alloc(&state->iocom->router, 0, cmd, NULL, NULL);
+ nmsg = hammer2_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL);
if (state) {
if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
pthread_mutex_lock(&iocom->mtx);
if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
state = RB_FIND(hammer2_state_tree,
- &iocom->router.statewr_tree, &dummy);
+ &iocom->router->statewr_tree, &dummy);
} else {
state = RB_FIND(hammer2_state_tree,
- &iocom->router.staterd_tree, &dummy);
+ &iocom->router->staterd_tree, &dummy);
}
msg->state = state;
pthread_mutex_unlock(&iocom->mtx);
msg->state = state;
pthread_mutex_lock(&iocom->mtx);
RB_INSERT(hammer2_state_tree,
- &iocom->router.staterd_tree, state);
+ &iocom->router->staterd_tree, state);
pthread_mutex_unlock(&iocom->mtx);
error = 0;
if (DebugOpt) {
if (state->rxcmd & HAMMER2_MSGF_REPLY) {
assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
RB_REMOVE(hammer2_state_tree,
- &iocom->router.statewr_tree, state);
+ &iocom->router->statewr_tree, state);
} else {
assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
RB_REMOVE(hammer2_state_tree,
- &iocom->router.staterd_tree, state);
+ &iocom->router->staterd_tree, state);
}
state->flags &= ~HAMMER2_STATE_INSERTED;
hammer2_state_free(state);
if (state->txcmd & HAMMER2_MSGF_REPLY) {
assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
RB_REMOVE(hammer2_state_tree,
- &iocom->router.staterd_tree, state);
+ &iocom->router->staterd_tree, state);
} else {
assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
RB_REMOVE(hammer2_state_tree,
- &iocom->router.statewr_tree, state);
+ &iocom->router->statewr_tree, state);
}
state->flags &= ~HAMMER2_STATE_INSERTED;
hammer2_state_free(state);
* We may have to wake up the rx code.
*/
if (iocom->ioq_rx.error &&
- RB_EMPTY(&iocom->router.staterd_tree) &&
- RB_EMPTY(&iocom->router.statewr_tree)) {
+ RB_EMPTY(&iocom->router->staterd_tree) &&
+ RB_EMPTY(&iocom->router->statewr_tree)) {
dummy = 0;
write(iocom->wakeupfds[1], &dummy, 1);
}
}
+/************************************************************************
+ * ROUTING *
+ ************************************************************************
+ *
+ * Incoming messages are routed by their spanid, matched up against
+ * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c).
+ * Any replies run through the same router.
+ *
+ * Originated messages are routed by their spanid, matched up against
+ * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c).
+ * Replies come back through the same route.
+ *
+ * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular
+ * transaction runs through the same route. Commands and replies both.
+ *
+ * An originated message will use a different routing spanid to
+ * reach a target node than a message which originates from that node.
+ * They might use the same physical pipes (each pipe can have multiple
+ * SPANs and RELAYs), but the routes are distinct from the perspective
+ * of the router.
+ */
+hammer2_router_t *
+hammer2_router_alloc(void)
+{
+ hammer2_router_t *router;
+
+ router = hammer2_alloc(sizeof(*router));
+ TAILQ_INIT(&router->txmsgq);
+ return (router);
+}
+
+void
+hammer2_router_connect(hammer2_router_t *router)
+{
+ hammer2_router_t *tmp;
+
+ assert(router->link || router->relay);
+ assert((router->flags & HAMMER2_ROUTER_CONNECTED) == 0);
+
+ pthread_mutex_lock(&router_mtx);
+ if (router->link)
+ tmp = RB_INSERT(hammer2_router_tree, &router_ltree, router);
+ else
+ tmp = RB_INSERT(hammer2_router_tree, &router_rtree, router);
+ assert(tmp == NULL);
+ router->flags |= HAMMER2_ROUTER_CONNECTED;
+ pthread_mutex_unlock(&router_mtx);
+}
+
+void
+hammer2_router_disconnect(hammer2_router_t **routerp)
+{
+ hammer2_router_t *router;
+
+ router = *routerp;
+ assert(router->link || router->relay);
+ assert(router->flags & HAMMER2_ROUTER_CONNECTED);
+
+ pthread_mutex_lock(&router_mtx);
+ if (router->link)
+ RB_REMOVE(hammer2_router_tree, &router_ltree, router);
+ else
+ RB_REMOVE(hammer2_router_tree, &router_rtree, router);
+ router->flags &= ~HAMMER2_ROUTER_CONNECTED;
+ *routerp = NULL;
+ pthread_mutex_unlock(&router_mtx);
+}
+
+#if 0
+/*
+ * XXX
+ */
+hammer2_router_t *
+hammer2_route_msg(hammer2_msg_t *msg)
+{
+}
+#endif
+
+/************************************************************************
+ * DEBUGGING *
+ ************************************************************************/
+
const char *
hammer2_basecmd_str(uint32_t cmd)
{
RB_ENTRY(h2span_cluster) rbnode;
struct h2span_node_tree tree;
uuid_t pfs_clid; /* shared fsid */
+ int refs; /* prevents destruction */
};
struct h2span_node {
struct h2span_node *node; /* related node */
int32_t dist;
struct h2span_relay_queue relayq; /* relay out */
- struct hammer2_router router;
+ struct hammer2_router *router; /* route out this link */
};
/*
* address of the 'state' structure, which is why h2span_relay has to
* be entered into a RB-TREE based at h2span_connect (so we can look
* up the spanid to validate it).
+ *
+ * NOTE: Messages can be received via the LNK_SPAN transaction the
+ * relay maintains, and can be replied via relay->router, but
+ * messages are NOT initiated via a relay. Messages are initiated
+ * via incoming links (h2span_link's).
+ *
+ * relay->link represents the link being relayed, NOT the LNK_SPAN
+ * transaction the relay is holding open.
*/
struct h2span_relay {
RB_ENTRY(h2span_relay) rbnode; /* from h2span_connect */
TAILQ_ENTRY(h2span_relay) entry; /* from link */
struct h2span_connect *conn;
hammer2_state_t *state; /* transmitted LNK_SPAN */
- struct h2span_link *link; /* received LNK_SPAN */
+ struct h2span_link *link; /* LNK_SPAN being relayed */
+ struct hammer2_router *router;/* route out this relay */
};
/*
* Embedded router structure in link for message forwarding.
+ *
+ * The spanning id for the router is the message id of
+ * the SPAN link it is embedded in, allowing messages to
+ * be routed via &slink->router.
*/
- TAILQ_INIT(&slink->router.txmsgq);
- slink->router.iocom = state->iocom;
- slink->router.link = slink;
+ slink->router = hammer2_router_alloc();
+ slink->router->iocom = state->iocom;
+ slink->router->link = slink;
+ slink->router->spanid = state->msgid;
+ hammer2_router_connect(slink->router);
RB_INSERT(h2span_link_tree, &node->tree, slink);
msg->any.lnk_span.label,
msg->any.lnk_span.dist);
free(alloc);
-
#if 0
hammer2_relay_scan(NULL, node);
#endif
free(alloc);
/*
+ * Remove the router from consideration
+ */
+ hammer2_router_disconnect(&slink->router);
+
+ /*
* Clean out all relays. This requires terminating each
* relay transaction.
*/
RB_REMOVE(h2span_link_tree, &node->tree, slink);
if (RB_EMPTY(&node->tree)) {
RB_REMOVE(h2span_node_tree, &cls->tree, node);
- if (RB_EMPTY(&cls->tree)) {
+ if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
RB_REMOVE(h2span_cluster_tree,
&cluster_tree, cls);
hammer2_free(cls);
relay->conn = conn;
relay->link = slink;
- msg = hammer2_msg_alloc(&conn->state->iocom->router, 0,
+ msg = hammer2_msg_alloc(conn->state->iocom->router, 0,
HAMMER2_LNK_SPAN |
HAMMER2_MSGF_CREATE,
hammer2_lnk_relay, relay);
relay->state = msg->state;
+ relay->router = hammer2_router_alloc();
+ relay->router->iocom = relay->state->iocom;
+ relay->router->relay = relay;
+ relay->router->spanid = relay->state->msgid;
+
msg->any.lnk_span = slink->state->msg->any.lnk_span;
msg->any.lnk_span.dist = slink->dist + 1;
+ hammer2_router_connect(relay->router);
+
RB_INSERT(h2span_relay_tree, &conn->tree, relay);
TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
relay->link->dist,
relay->conn->state->iocom->sock_fd, relay->state);
+ hammer2_router_disconnect(&relay->router);
+
RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
TAILQ_REMOVE(&relay->link->relayq, relay, entry);
}
/************************************************************************
- * ROUTER *
+ * ROUTER AND MESSAGING HANDLES *
************************************************************************
*
- * Provides route functions to msg.c
+ * Basically the idea here is to provide a stable data structure which
+ * can be localized to the caller for higher level protocols to work with.
+ * Depends on the context, these hammer2_handle's can be pooled by use-case
+ * and remain persistent through a client (or mount point's) life.
+ */
+
+#if 0
+/*
+ * Obtain a stable handle on a cluster given its uuid. This ties directly
+ * into the global cluster topology, creating the structure if necessary
+ * (even if the uuid does not exist or does not exist yet), and preventing
+ * the structure from getting ripped out from under us while we hold a
+ * pointer to it.
+ */
+h2span_cluster_t *
+hammer2_cluster_get(uuid_t *pfs_clid)
+{
+ h2span_cluster_t dummy_cls;
+ h2span_cluster_t *cls;
+
+ dummy_cls.pfs_clid = *pfs_clid;
+ pthread_mutex_lock(&cluster_mtx);
+ cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
+ if (cls)
+ ++cls->refs;
+ pthread_mutex_unlock(&cluster_mtx);
+ return (cls);
+}
+
+void
+hammer2_cluster_put(h2span_cluster_t *cls)
+{
+ pthread_mutex_lock(&cluster_mtx);
+ assert(cls->refs > 0);
+ --cls->refs;
+ if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
+ RB_REMOVE(h2span_cluster_tree,
+ &cluster_tree, cls);
+ hammer2_free(cls);
+ }
+ pthread_mutex_unlock(&cluster_mtx);
+}
+
+/*
+ * Obtain a stable handle to a specific cluster node given its uuid.
+ * This handle does NOT lock in the route to the node and is typically
+ * used as part of the hammer2_handle_*() API to obtain a set of
+ * stable nodes.
*/
+h2span_node_t *
+hammer2_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
+{
+}
+
+#endif
#if 0
/*
}
#endif
+/************************************************************************
+ * DEBUGGER *
+ ************************************************************************/
/*
* Dumps the spanning tree
*/