#include "hammer2.h"
-static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static int hammer2_state_msgrx(hammer2_msg_t *msg);
+static void hammer2_state_cleanuptx(hammer2_msg_t *msg);
/*
* ROUTER TREE - Represents available routes for message routing, indexed
int
hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2)
{
- if (router1->spanid < router2->spanid)
+ if (router1->target < router2->target)
return(-1);
- if (router1->spanid > router2->spanid)
+ if (router1->target > router2->target)
return(1);
return(0);
}
/*
* STATE TREE - Represents open transactions which are indexed by their
- * {spanid,msgid} relative to the governing iocom. spanid
- * will usually be 0 since a non-zero spanid would have been
- * routed elsewhere.
+ * {router,msgid} relative to the governing iocom.
+ *
+ * router is usually iocom->router since state isn't stored
+ * for relayed messages.
*/
int
hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
{
- if (state1->spanid < state2->spanid)
+#if 0
+ if (state1->router < state2->router)
return(-1);
- if (state1->spanid > state2->spanid)
+ if (state1->router > state2->router)
return(1);
+#endif
if (state1->msgid < state2->msgid)
return(-1);
if (state1->msgid > state2->msgid)
RB_INIT(&iocom->router->statewr_tree);
TAILQ_INIT(&iocom->freeq);
TAILQ_INIT(&iocom->freeq_aux);
- TAILQ_INIT(&iocom->addrq);
TAILQ_INIT(&iocom->router->txmsgq);
iocom->router->iocom = iocom;
iocom->sock_fd = sock_fd;
state->iocom = iocom;
state->flags = HAMMER2_STATE_DYNAMIC;
state->msgid = (uint64_t)(uintptr_t)state;
- if (router->link)
- state->spanid = router->spanid;
+ state->router = router;
state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
HAMMER2_MSGF_DELETE);
state->rxcmd = HAMMER2_MSGF_REPLY;
* Process transactional state for the message.
*/
if (msg && ioq->error == 0) {
- error = hammer2_state_msgrx(iocom, msg);
+ error = hammer2_state_msgrx(msg);
if (error) {
if (error == HAMMER2_IOQ_ERROR_EALREADY) {
hammer2_msg_free(msg);
} else {
/*state->txcmd |= HAMMER2_MSGF_DELETE;*/
msg->state = state;
- msg->any.head.spanid = state->spanid;
+ msg->router = state->router;
msg->any.head.msgid = state->msgid;
msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
HAMMER2_MSGF_DELETE;
msg = NULL;
} else {
msg->state = state;
- msg->any.head.spanid = state->spanid;
+ msg->router = state->router;
msg->any.head.msgid = state->msgid;
msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
HAMMER2_MSGF_DELETE |
ioq->hbytes = 0;
ioq->abytes = 0;
- hammer2_state_cleanuptx(iocom, msg);
+ hammer2_state_cleanuptx(msg);
}
assert(nact == 0);
while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
TAILQ_REMOVE(&ioq->msgq, msg, qentry);
--ioq->msgcount;
- hammer2_state_cleanuptx(iocom, msg);
+ hammer2_state_cleanuptx(msg);
}
}
state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
}
msg->any.head.msgid = state->msgid;
- msg->any.head.spanid = state->spanid;
assert(((state->txcmd ^ msg->any.head.cmd) &
HAMMER2_MSGF_REPLY) == 0);
if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
msg->any.head.msgid = 0;
/* XXX set spanid by router */
}
+ msg->any.head.source = 0;
+ msg->any.head.target = msg->router->target;
/*
* Queue it for output, wake up the I/O pthread. Note that the
* will typically just contain status updates.
*/
static int
-hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_state_msgrx(hammer2_msg_t *msg)
{
+ hammer2_iocom_t *iocom = msg->router->iocom;
hammer2_state_t *state;
hammer2_state_t dummy;
int error;
* If received msg is a command state is on staterd_tree.
* If received msg is a reply state is on statewr_tree.
*/
-
dummy.msgid = msg->any.head.msgid;
- dummy.spanid = msg->any.head.spanid;
pthread_mutex_lock(&iocom->mtx);
if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
state = RB_FIND(hammer2_state_tree,
state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
state->flags |= HAMMER2_STATE_INSERTED;
state->msgid = msg->any.head.msgid;
- state->spanid = msg->any.head.spanid;
+ state->router = msg->router;
msg->state = state;
pthread_mutex_lock(&iocom->mtx);
RB_INSERT(hammer2_state_tree,
}
static void
-hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_state_cleanuptx(hammer2_msg_t *msg)
{
+ hammer2_iocom_t *iocom = msg->router->iocom;
hammer2_state_t *state;
if ((state = msg->state) == NULL) {
* Generate the buf
*/
snprintf(buf, sizeof(buf),
- "msg=%s%s %s id=%08x span=%08x %s",
+ "msg=%s%s %s id=%08x src=%08x tgt=%08x %s",
hammer2_basecmd_str(msg->any.head.cmd),
flagbuf,
errstr,
(uint32_t)(intmax_t)msg->any.head.msgid, /* for brevity */
- (uint32_t)(intmax_t)msg->any.head.spanid, /* for brevity */
+ (uint32_t)(intmax_t)msg->any.head.source, /* for brevity */
+ (uint32_t)(intmax_t)msg->any.head.target, /* for brevity */
statestr);
return(buf);
}
/*
- * NOTE: Sort/subsort must match h2span_relay_cmp() under any given
- * node.
+ * Sort/subsort must match h2span_relay_cmp() under any given node
+ * to make the aggregation algorithm easier, so the best links are
+ * in the same sorted order as the best relays.
+ *
+ * NOTE: We cannot use link*->state->msgid because this msgid is created
+ * by each remote host and thus might wind up being the same.
*/
static
int
return(-1);
if (link1->dist > link2->dist)
return(1);
+#if 1
+ if ((uintptr_t)link1->state < (uintptr_t)link2->state)
+ return(-1);
+ if ((uintptr_t)link1->state > (uintptr_t)link2->state)
+ return(1);
+#else
if (link1->state->msgid < link2->state->msgid)
return(-1);
if (link1->state->msgid > link2->state->msgid)
return(1);
+#endif
return(0);
}
return(-1);
if (link1->dist > link2->dist)
return(1);
+#if 1
+ if ((uintptr_t)link1->state < (uintptr_t)link2->state)
+ return(-1);
+ if ((uintptr_t)link1->state > (uintptr_t)link2->state)
+ return(1);
+#else
if (link1->state->msgid < link2->state->msgid)
return(-1);
if (link1->state->msgid > link2->state->msgid)
return(1);
+#endif
return(0);
}
slink->router = hammer2_router_alloc();
slink->router->iocom = state->iocom;
slink->router->link = slink;
- slink->router->spanid = state->msgid;
+ slink->router->target = state->msgid;
hammer2_router_connect(slink->router);
RB_INSERT(h2span_link_tree, &node->tree, slink);
relay->router = hammer2_router_alloc();
relay->router->iocom = relay->state->iocom;
relay->router->relay = relay;
- relay->router->spanid = relay->state->msgid;
+ relay->router->target = relay->state->msgid;
msg->any.lnk_span = slink->state->msg->any.lnk_span;
msg->any.lnk_span.dist = slink->dist + 1;
struct hammer2_persist;
struct hammer2_state;
struct hammer2_router;
-struct hammer2_address;
struct hammer2_msg;
TAILQ_HEAD(hammer2_state_queue, hammer2_state);
TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
-TAILQ_HEAD(hammer2_address_queue, hammer2_address);
RB_HEAD(hammer2_state_tree, hammer2_state);
RB_HEAD(hammer2_router_tree, hammer2_router);
struct hammer2_state {
RB_ENTRY(hammer2_state) rbnode; /* indexed by msgid */
- TAILQ_ENTRY(hammer2_state) source_entry;/* if routed */
- TAILQ_ENTRY(hammer2_state) target_entry;/* if routed */
struct hammer2_iocom *iocom;
- struct hammer2_address_t *source_addr; /* if routed */
- struct hammer2_address_t *target_addr; /* if routed */
+ struct hammer2_router *router; /* if routed */
uint32_t txcmd; /* mostly for CMDF flags */
uint32_t rxcmd; /* mostly for CMDF flags */
- uint64_t spanid; /* routing id */
uint64_t msgid; /* {spanid,msgid} uniq */
int flags;
int error;
} any;
};
-struct hammer2_address {
- TAILQ_ENTRY(hammer2_address) entry; /* on-iocom */
- struct hammer2_iocom *iocom; /* related iocom */
- struct hammer2_state_queue sourceq; /* states on source queue */
- struct hammer2_state_queue targetq; /* states on target queue */
- uint16_t id;
-};
-
#define HAMMER2_STATE_INSERTED 0x0001
#define HAMMER2_STATE_DYNAMIC 0x0002
#define HAMMER2_STATE_NODEID 0x0004 /* manages a node id */
};
typedef struct hammer2_state hammer2_state_t;
-typedef struct hammer2_address hammer2_address_t;
typedef struct hammer2_msg hammer2_msg_t;
typedef struct hammer2_msg_queue hammer2_msg_queue_t;
* or to a SPAN transaction (h2span_relay structure for incoming).
*/
struct hammer2_router {
- RB_ENTRY(hammer2_router) rbnode; /* indexed by spanid */
+ RB_ENTRY(hammer2_router) rbnode; /* indexed by target */
struct hammer2_iocom *iocom;
struct h2span_link *link; /* may be NULL */
struct h2span_relay *relay; /* may be NULL */
struct hammer2_state_tree staterd_tree; /* active messages */
struct hammer2_state_tree statewr_tree; /* active messages */
hammer2_msg_queue_t txmsgq; /* tx msgq from remote */
- uint64_t spanid; /* for routing */
+ uint64_t target; /* for routing */
int flags;
int refs; /* refs prevent destruction */
};
hammer2_ioq_t ioq_tx;
hammer2_msg_queue_t freeq; /* free msgs hdr only */
hammer2_msg_queue_t freeq_aux; /* free msgs w/aux_data */
- struct hammer2_address_queue addrq; /* source/target addrs */
int sock_fd; /* comm socket or pipe */
int alt_fd; /* thread signal, tty, etc */
int wakeupfds[2]; /* pipe wakes up iocom thread */
head->salt = bswap32(head->salt);
head->msgid = bswap64(head->msgid);
- head->spanid = bswap64(head->spanid);
+ head->source = bswap64(head->source);
+ head->target = bswap64(head->target);
head->cmd = bswap32(head->cmd);
head->aux_crc = bswap32(head->aux_crc);
head->aux_bytes = bswap32(head->aux_bytes);
head->error = bswap32(head->error);
head->aux_descr = bswap64(head->aux_descr);
- head->reserved30= bswap64(head->reserved30);
head->reserved38= bswap32(head->reserved38);
head->hdr_crc = bswap32(head->hdr_crc);
}
struct hammer2_inode;
struct hammer2_mount;
struct hammer2_pfsmount;
+struct hammer2_span;
struct hammer2_state;
struct hammer2_msg;
typedef struct hammer2_freecache hammer2_freecache_t;
/*
+ * Structure used to represent a virtual circuit for a messaging
+ * route. Typically associated from hammer2_state but the hammer2_pfsmount
+ * structure also has one to represent the point-to-point link.
+ */
+struct hammer2_router {
+ struct hammer2_pfsmount *pmp;
+ struct hammer2_state *state; /* received LNK_SPAN state */
+ uint64_t target; /* target */
+};
+
+typedef struct hammer2_router hammer2_router_t;
+
+/*
* Global (per device) mount structure for device (aka vp->v_mount->hmp)
*/
struct hammer2_mount {
struct hammer2_state *freewr_state; /* allocation cache */
struct hammer2_state_tree staterd_tree; /* active messages */
struct hammer2_state_tree statewr_tree; /* active messages */
+ struct hammer2_router router;
};
typedef struct hammer2_pfsmount hammer2_pfsmount_t;
#define HAMMER2_CLUSTERCTL_KILL 0x0001
/*
- * In-memory message structure for hammer2.
- *
- * Persistent cache state messages will be associated with a hammer2_chain.
- *
- * NOTE! If REPLY is set then the source and target fields in the message
- * are swapped. That is, source and target remain unchanged whether
- * the message is a command from side A or a reply from side B.
- * The message is routed based on target if REPLY is not set, and on
- * source if REPLY is set.
+ * Transactional state structure, representing an open transaction. The
+ * transaction might represent a cache state (and thus have a chain
+ * association), or a VOP op, LNK_SPAN, or other things.
*/
struct hammer2_state {
RB_ENTRY(hammer2_state) rbnode; /* indexed by msgid */
struct hammer2_pfsmount *pmp;
+ struct hammer2_router *router; /* related LNK_SPAN route */
uint32_t txcmd; /* mostly for CMDF flags */
uint32_t rxcmd; /* mostly for CMDF flags */
- uint64_t spanid; /* spanning tree routing */
uint64_t msgid; /* {spanid,msgid} uniq */
int flags;
int error;
struct hammer2_msg {
TAILQ_ENTRY(hammer2_msg) qentry; /* serialized queue */
+ struct hammer2_router *router;
struct hammer2_state *state;
size_t hdr_size;
size_t aux_size;
hammer2_msg_any_t any;
};
+typedef struct hammer2_link hammer2_link_t;
typedef struct hammer2_state hammer2_state_t;
typedef struct hammer2_msg hammer2_msg_t;
/*
* hammer2_msg.c
*/
-int hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-int hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-void hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-void hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-int hammer2_msg_execute(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+int hammer2_state_msgrx(hammer2_msg_t *msg);
+int hammer2_state_msgtx(hammer2_msg_t *msg);
+void hammer2_state_cleanuprx(hammer2_msg_t *msg);
+void hammer2_state_cleanuptx(hammer2_msg_t *msg);
+int hammer2_msg_execute(hammer2_msg_t *msg);
void hammer2_state_free(hammer2_state_t *state);
-void hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-hammer2_msg_t *hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid,
- uint32_t cmd);
-void hammer2_msg_write(hammer2_pfsmount_t *pmp,
- hammer2_msg_t *msg,
+void hammer2_msg_free(hammer2_msg_t *msg);
+hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd);
+void hammer2_msg_write(hammer2_msg_t *msg,
int (*func)(hammer2_state_t *, hammer2_msg_t *),
void *data);
-void hammer2_msg_reply(hammer2_pfsmount_t *pmp,
- hammer2_msg_t *msg, uint32_t error);
-void hammer2_msg_result(hammer2_pfsmount_t *pmp,
- hammer2_msg_t *msg, uint32_t error);
+void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error);
+void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error);
/*
* hammer2_msgops.c
*/
-int hammer2_msg_dbg_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-int hammer2_msg_adhoc_input(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+int hammer2_msg_dbg_rcvmsg(hammer2_msg_t *msg);
+int hammer2_msg_adhoc_input(hammer2_msg_t *msg);
/*
* hammer2_freemap.c
* will typically just contain status updates.
*/
int
-hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_state_msgrx(hammer2_msg_t *msg)
{
+ hammer2_pfsmount_t *pmp;
hammer2_state_t *state;
int error;
+ pmp = msg->router->pmp;
+
+ /*
+ * XXX resolve msg->any.head.source and msg->any.head.target
+ * into LNK_SPAN references.
+ *
+ * XXX replace msg->router
+ */
+
/*
* Make sure a state structure is ready to go in case we need a new
* one. This is the only routine which uses freerd_state so no
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
state->msgid = msg->any.head.msgid;
- state->spanid = msg->any.head.spanid;
- kprintf("received msg %08x msgid %jx spanid=%jx\n",
+ state->router = &pmp->router;
+ kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
msg->any.head.cmd,
- (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.spanid);
+ (intmax_t)msg->any.head.msgid,
+ (intmax_t)msg->any.head.source,
+ (intmax_t)msg->any.head.target);
if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
else
state = pmp->freerd_state;
pmp->freerd_state = NULL;
msg->state = state;
+ state->router = msg->router;
state->msg = msg;
state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
state->txcmd = HAMMER2_MSGF_REPLY;
}
void
-hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_state_cleanuprx(hammer2_msg_t *msg)
{
+ hammer2_pfsmount_t *pmp = msg->router->pmp;
hammer2_state_t *state;
if ((state = msg->state) == NULL) {
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
} else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
state->rxcmd |= HAMMER2_MSGF_DELETE;
} else {
lockmgr(&pmp->msglk, LK_RELEASE);
}
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
} else if (state->msg != msg) {
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
}
}
* A NULL state may be returned in this case.
*/
int
-hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_state_msgtx(hammer2_msg_t *msg)
{
+ hammer2_pfsmount_t *pmp = msg->router->pmp;
hammer2_state_t *state;
int error;
}
void
-hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_state_cleanuptx(hammer2_msg_t *msg)
{
+ hammer2_pfsmount_t *pmp = msg->router->pmp;
hammer2_state_t *state;
if ((state = msg->state) == NULL) {
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
} else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
state->txcmd |= HAMMER2_MSGF_DELETE;
} else {
lockmgr(&pmp->msglk, LK_RELEASE);
}
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
} else if (state->msg != msg) {
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
}
}
state->msg = NULL;
kfree(state, pmp->mmsg);
if (msg)
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
}
hammer2_msg_t *
-hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid, uint32_t cmd)
+hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd)
{
hammer2_msg_t *msg;
size_t hbytes;
hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
- pmp->mmsg, M_WAITOK | M_ZERO);
+ router->pmp->mmsg, M_WAITOK | M_ZERO);
msg->hdr_size = hbytes;
+ msg->router = router;
+ KKASSERT(router != NULL);
msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
- msg->any.head.spanid = spanid;
+ msg->any.head.source = 0;
+ msg->any.head.target = router->target;
msg->any.head.cmd = cmd;
return (msg);
}
void
-hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_free(hammer2_msg_t *msg)
{
+ hammer2_pfsmount_t *pmp = msg->router->pmp;
+
if (msg->aux_data && msg->aux_size) {
kfree(msg->aux_data, pmp->mmsg);
msg->aux_data = NULL;
msg->aux_size = 0;
+ msg->router = NULL;
}
kfree(msg, pmp->mmsg);
}
int
hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
{
- if (state1->spanid < state2->spanid)
+ if (state1->router < state2->router)
return(-1);
- if (state1->spanid > state2->spanid)
+ if (state1->router > state2->router)
return(1);
if (state1->msgid < state2->msgid)
return(-1);
* Write a message. All requisit command flags have been set.
*
* If msg->state is non-NULL the message is written to the existing
- * transaction. Both msgid and spanid will be set accordingly.
+ * transaction. msgid will be set accordingly.
*
* If msg->state is NULL and CREATE is set new state is allocated and
- * (func, data) is installed.
+ * (func, data) is installed. A msgid is assigned.
*
* If msg->state is NULL and CREATE is not set the message is assumed
- * to be a one-way message. The msgid and spanid must be set by the
- * caller (msgid is typically set to 0 for this case).
+ * to be a one-way message. The originator must assign the msgid
+ * (or leave it 0, which is typical.
*
* This function merely queues the message to the management thread, it
* does not write to the message socket/pipe.
*/
void
-hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
+hammer2_msg_write(hammer2_msg_t *msg,
int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
{
+ hammer2_pfsmount_t *pmp = msg->router->pmp;
hammer2_state_t *state;
if (msg->state) {
* (Function callback and aux data for the receive side can
* be replaced or left alone).
*/
- msg->any.head.msgid = msg->state->msgid;
- msg->any.head.spanid = msg->state->spanid;
+ state = msg->state;
+ msg->any.head.msgid = state->msgid;
+ msg->any.head.source = 0;
+ msg->any.head.target = state->router->target;
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
if (func) {
- msg->state->func = func;
- msg->state->any.any = data;
+ state->func = func;
+ state->any.any = data;
}
} else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
/*
state->any.any = data;
state->msg = msg;
state->msgid = (uint64_t)(uintptr_t)state;
- state->spanid = msg->any.head.spanid;
+ state->router = msg->router;
msg->state = state;
+ msg->any.head.source = 0;
+ msg->any.head.target = state->router->target;
+ msg->any.head.msgid = state->msgid;
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
* competing aborts and a real one-off message?)
*/
msg->any.head.msgid = 0;
+ msg->any.head.source = 0;
+ msg->any.head.target = msg->router->target;
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
}
* If msg->state is non-NULL we are replying to a one-way message.
*/
void
-hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
+hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
{
hammer2_state_t *state = msg->state;
+ hammer2_msg_t *nmsg;
uint32_t cmd;
/*
*/
if (state) {
if (state->txcmd & HAMMER2_MSGF_DELETE) {
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
return;
}
if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
cmd |= HAMMER2_MSGF_REPLY;
}
- msg = hammer2_msg_alloc(pmp, 0, cmd);
- msg->any.head.error = error;
- msg->state = state;
- hammer2_msg_write(pmp, msg, NULL, NULL);
+ nmsg = hammer2_msg_alloc(msg->router, cmd);
+ nmsg->any.head.error = error;
+ nmsg->state = state;
+ hammer2_msg_write(nmsg, NULL, NULL);
}
/*
* function degenerates into the same as hammer2_msg_reply().
*/
void
-hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
+hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
{
hammer2_state_t *state = msg->state;
+ hammer2_msg_t *nmsg;
uint32_t cmd;
/*
*/
if (state) {
if (state->txcmd & HAMMER2_MSGF_DELETE) {
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
return;
}
if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
cmd |= HAMMER2_MSGF_REPLY;
}
- msg = hammer2_msg_alloc(pmp, 0, cmd);
- msg->any.head.error = error;
- msg->state = state;
- hammer2_msg_write(pmp, msg, NULL, NULL);
+ nmsg = hammer2_msg_alloc(msg->router, cmd);
+ nmsg->any.head.error = error;
+ nmsg->state = state;
+ hammer2_msg_write(nmsg, NULL, NULL);
}
#include "hammer2.h"
int
-hammer2_msg_adhoc_input(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_adhoc_input(hammer2_msg_t *msg)
{
kprintf("ADHOC INPUT MSG %08x\n", msg->any.head.cmd);
return(0);
}
int
-hammer2_msg_dbg_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_dbg_rcvmsg(hammer2_msg_t *msg)
{
switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
case HAMMER2_DBG_SHELL:
/*
* Execute shell command (not supported atm)
*/
- hammer2_msg_reply(pmp, msg, HAMMER2_MSG_ERR_NOSUPP);
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
break;
case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
if (msg->aux_data) {
}
break;
default:
- hammer2_msg_reply(pmp, msg, HAMMER2_MSG_ERR_NOSUPP);
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
break;
}
return(0);
/*
* Mesh network protocol structures.
*
+ * SPAN PROTOCOL
+ *
* The mesh is constructed from point-to-point streaming links with varying
- * levels of interconnectedness, forming a graph. The spanning tree protocol
- * running on each node transmits a LNK_SPAN transactional message to the
- * other end. The protocol collects LNK_SPAN messages from all sources,
- * aggregates them using a shortest-distance-path algorithm, and transmits
- * them over each link as well, creating a multplication within the topology.
- *
- * Any node in the graph may transmit a message to any other node by using
- * the msgid of the LNK_SPAN open transaction as the message's 'linkid'.
- * This identifies both sides so there is no 'source' and 'target' per-say.
- *
- * Open transactions are recorded by the source and the target, but not by
- * intermediate nodes in the route. Streaming protocols are used. If a
- * span element is lost its transaction will be aborted automatically (even
- * if other routes to the same target are available), and any related
- * messages will be aborted. If the span element was chosen for aggregation
- * this will propagate through the entire topology and thus ultimately reach
- * the target which used the aggregated span element, but does not
- * necessarily effect all paths in the topology.
- *
- * When a link failure occurs all SPANs related to that link are
- * transactionally closed. The SPANs are not deleted until closed in
- * both directions, thus the spanid serves as a placeholder allowing all
- * in-transit messages being routed over that spanid to be properly thrown
- * out. Once completely closed the spanid can be reused.
- *
- * NOTE: Multiple spans for the same physical {fsid,pfs_fsid} can be
- * forwarded, allowing concurrency within the topology.
- *
- * NOTE: It is important that messages in a lost route be aborted because
- * the messaging protocol expects serialization over any given route.
- * Only propagated spans are forwarded as spans to other nodes, so any
- * given open span transaction will represent a specific path.
- *
- * If a portion of the path in the middle of the topology is lost it
- * will propagate in both directions all the way to the ends that used
- * it. Intermediate route nodes DO NOT silently re-route messages to
- * another span. Messages in-flight will meet the updating SPAN and
- * simply be discarded by intermediate nodes. Ultimately the updating
- * SPAN reaches all end-points and auto-aborts the open transaction.
- *
- * If another path is available the transaction can be instantly
- * retried.
- *
- * NOTE: It is possible to route messages virtually using the msgid of any
- * open transaction instead of the msgid of a SPAN transaction, but
- * not recommended and not currently coded.
- *
- * NOTE: Both the msgid and the spanid are 64-bit fields and may be populated
- * with actual memory pointers (which simplifies the end-points).
- * However, all such identifiers must be indexed as appropriate by the
- * nodes and verified as being valid before any memory dereference
- * occurs, for obvious reasons.
- *
- * All message responses follow the SAME PATH that the original message
- * followed, but in reverse. This is an absolute requirement since messages
- * expecting replies record persistent state at each hop. Sequencing must
- * be preserved.
+ * levels of interconnectedness, forming a graph. Terminii in the graph
+ * are entities such as a HAMMER2 PFS or a network mount or other types
+ * of nodes.
+ *
+ * The spanning tree protocol runs symmetrically on every node. Each node
+ * transmits a representitive LNK_SPAN out all available connections. Nodes
+ * also receive LNK_SPANs from other nodes (obviously), and must aggregate,
+ * reduce, and relay those LNK_SPANs out all available connections, thus
+ * propagating the spanning tree. Any connection failure or topology change
+ * causes changes in the LNK_SPAN propagation.
+ *
+ * Each LNK_SPAN or LNK_SPAN relay represents a virtual circuit for routing
+ * purposes. In addition, each relay is chained in one direction,
+ * representing a 1:N fan-out (i.e. one received LNK_SPAN can be relayed out
+ * multiple connections). In order to be able to route a message via a
+ * LNK_SPAN over a deterministic route THE MESSAGE CAN ONLY FLOW FROM A
+ * REMOTE NODE TOWARDS OUR NODE (N:1 fan-in).
+ *
+ * This supports the requirement that we have both message serialization
+ * and positive feedback if a topology change breaks the chain of VCs
+ * the message is flowing over. A remote node sending a message to us
+ * will get positive feedback that the route was broken and can take suitable
+ * action to terminate the transaction with an error.
+ *
+ * TRANSACTIONAL REPLIES
+ *
+ * However, when we receive a command message from a remote node and we want
+ * to reply to it, we have a problem. We want the remote node to have
+ * positive feedback if our reply fails to make it, but if we use a virtual
+ * circuit based on the remote node's LNK_SPAN to us it will be a DIFFERENT
+ * virtual circuit than the one the remote node used to message us. That's
+ * a problem because it means we have no reliable way to notify the remote
+ * node if we get notified that our reply has failed.
+ *
+ * The solution is to first note the fact that the remote chose an optimal
+ * route to get to us, so the reverse should be true. The reason the VC
+ * might not exist over the same route in the reverse is because there may
+ * be multiple paths available with the same distance metric.
+ *
+ * But this also means that we can adjust the messaging protocols to
+ * propagate a LNK_SPAN from the remote to us WHILE the remote's command
+ * message is being sent to us, and it will not only likely be optimal but
+ * it might also already exist, and it will also guarantee that a reply
+ * failure will propagate back to both sides (because even though each
+ * direction is using a different VC chain, the two chains are still
+ * going along the same path).
+ *
+ * We communicate the return VC by having the relay adjust both the target
+ * and the source fields in the message, rather than just the target, on
+ * each relay. As of when the message gets to us the 'source' field will
+ * represent the VC for the return direction (and of course also identify
+ * the node the message came from).
+ *
+ * This way both sides get positive feedback if a topology change disrupts
+ * the VC for the transaction. We also get one additional guarantee, and
+ * that is no spurious messages. Messages simply die when the VC they are
+ * traveling over is broken, in either direction, simple as that.
+ * It makes managing message transactional states very easy.
*
* MESSAGE TRANSACTIONAL STATES
*
*/
struct hammer2_msg_hdr {
uint16_t magic; /* 00 sanity, synchro, endian */
- uint16_t reserved02; /* 02 size of header in bytes */
+ uint16_t reserved02; /* 02 */
uint32_t salt; /* 04 random salt helps w/crypto */
uint64_t msgid; /* 08 message transaction id */
- uint64_t spanid; /* 10 message routing id or 0 */
-
- uint32_t cmd; /* 18 flags | cmd | hdr_size / ALIGN */
- uint32_t aux_crc; /* 1C auxillary data crc */
- uint32_t aux_bytes; /* 20 auxillary data length (bytes) */
- uint32_t error; /* 24 error code or 0 */
- uint64_t aux_descr; /* 28 negotiated OOB data descr */
- uint64_t reserved30; /* 30 */
+ uint64_t source; /* 10 originator or 0 */
+ uint64_t target; /* 18 destination or 0 */
+
+ uint32_t cmd; /* 20 flags | cmd | hdr_size / ALIGN */
+ uint32_t aux_crc; /* 24 auxillary data crc */
+ uint32_t aux_bytes; /* 28 auxillary data length (bytes) */
+ uint32_t error; /* 2C error code or 0 */
+ uint64_t aux_descr; /* 30 negotiated OOB data descr */
uint32_t reserved38; /* 38 */
uint32_t hdr_crc; /* 3C (aligned) extended header crc */
};
static void hammer2_cluster_thread_wr(void *arg);
static int hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg);
static int hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg);
-static int hammer2_msg_lnk_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+static int hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg);
/*
* HAMMER2 vfs operations.
}
ccms_domain_init(&pmp->ccms_dom);
pmp->hmp = hmp;
+ pmp->router.pmp = pmp;
++hmp->pmp_count;
lockmgr(&hammer2_mntlk, LK_RELEASE);
kprintf("hammer2_mount hmp=%p pmpcnt=%d\n", hmp, hmp->pmp_count);
error = EINVAL;
break;
}
- msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
- pmp->mmsg, M_WAITOK | M_ZERO);
+ msg = hammer2_msg_alloc(&pmp->router, hdr.cmd);
msg->any.head = hdr;
msg->hdr_size = hbytes;
if (hbytes > sizeof(hdr)) {
* to the connection except for EALREADY which forces
* a discard without execution.
*/
- error = hammer2_state_msgrx(pmp, msg);
+ error = hammer2_state_msgrx(msg);
if (error) {
/*
* Raw protocol or connection error
*/
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
if (error == EALREADY)
error = 0;
} else if (msg->state && msg->state->func) {
* handling function installed for it.
*/
error = msg->state->func(msg->state, msg);
- hammer2_state_cleanuprx(pmp, msg);
+ hammer2_state_cleanuprx(msg);
} else if ((msg->any.head.cmd & HAMMER2_MSGF_PROTOS) ==
HAMMER2_MSG_PROTO_LNK) {
/*
* Message related to the LNK protocol set
*/
- error = hammer2_msg_lnk_rcvmsg(pmp, msg);
- hammer2_state_cleanuprx(pmp, msg);
+ error = hammer2_msg_lnk_rcvmsg(msg);
+ hammer2_state_cleanuprx(msg);
} else if ((msg->any.head.cmd & HAMMER2_MSGF_PROTOS) ==
HAMMER2_MSG_PROTO_DBG) {
/*
* Message related to the DBG protocol set
*/
- error = hammer2_msg_dbg_rcvmsg(pmp, msg);
- hammer2_state_cleanuprx(pmp, msg);
+ error = hammer2_msg_dbg_rcvmsg(msg);
+ hammer2_state_cleanuprx(msg);
} else {
/*
* Other higher-level messages (e.g. vnops)
*/
- error = hammer2_msg_adhoc_input(pmp, msg);
- hammer2_state_cleanuprx(pmp, msg);
+ error = hammer2_msg_adhoc_input(msg);
+ hammer2_state_cleanuprx(msg);
}
msg = NULL;
}
if (msg) {
if (msg->state && msg->state->msg == msg)
msg->state->msg = NULL;
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
}
if ((state = pmp->freerd_state) != NULL) {
* The transaction remains fully open for the duration of the
* connection.
*/
- msg = hammer2_msg_alloc(pmp, 0, HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE);
+ msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_CONN |
+ HAMMER2_MSGF_CREATE);
msg->any.lnk_conn.pfs_clid = pmp->iroot->ip_data.pfs_clid;
msg->any.lnk_conn.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
msg->any.lnk_conn.pfs_type = pmp->iroot->ip_data.pfs_type;
name_len = sizeof(msg->any.lnk_conn.label) - 1;
bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_conn.label, name_len);
msg->any.lnk_conn.label[name_len] = 0;
- hammer2_msg_write(pmp, msg, hammer2_msg_conn_reply, pmp);
+ hammer2_msg_write(msg, hammer2_msg_conn_reply, pmp);
/*
* Transmit loop
TAILQ_REMOVE(&pmp->msgq, msg, qentry);
lockmgr(&pmp->msglk, LK_RELEASE);
- error = hammer2_state_msgtx(pmp, msg);
+ error = hammer2_state_msgtx(msg);
if (error == EALREADY) {
error = 0;
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
continue;
}
break;
}
}
- hammer2_state_cleanuptx(pmp, msg);
+ hammer2_state_cleanuptx(msg);
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
}
}
if (msg) {
if (msg->state && msg->state->msg == msg)
msg->state->msg = NULL;
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
}
while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
TAILQ_REMOVE(&pmp->msgq, msg, qentry);
if (msg->state && msg->state->msg == msg)
msg->state->msg = NULL;
- hammer2_msg_free(pmp, msg);
+ hammer2_msg_free(msg);
}
if ((state = pmp->freewr_state) != NULL) {
}
static int
-hammer2_msg_lnk_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_lnk_rcvmsg(hammer2_msg_t *msg)
{
switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:
+ /*
+ * reply & leave trans open
+ */
kprintf("CONN RECEIVE - (just ignore it)\n");
- hammer2_msg_result(pmp, msg, 0); /* reply & leave trans open */
+ hammer2_msg_result(msg, 0);
break;
case HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE:
kprintf("SPAN RECEIVE - ADDED FROM CLUSTER\n");
if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
- msg = hammer2_msg_alloc(pmp, 0, HAMMER2_LNK_SPAN |
- HAMMER2_MSGF_CREATE);
+ msg = hammer2_msg_alloc(&pmp->router, HAMMER2_LNK_SPAN |
+ HAMMER2_MSGF_CREATE);
msg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
msg->any.lnk_span.label,
name_len);
msg->any.lnk_span.label[name_len] = 0;
- hammer2_msg_write(pmp, msg, hammer2_msg_span_reply, pmp);
+ hammer2_msg_write(msg, hammer2_msg_span_reply, pmp);
}
if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
kprintf("LNK_CONN transaction terminated by remote\n");
- hammer2_msg_reply(pmp, msg, 0);
+ hammer2_msg_reply(msg, 0);
}
return(0);
}