* Fix SPAN relay sort and sequencing bugs.
* Start reworking the APIs to accomodate routed messages. Start by
creating a hammer2_router structure and adjusting most of the msg
functions to pass it instead of the iocom.
* Fix hammer2_state races by moving the state allocation to
hammer2_msg_alloc() instead of hammer2_msg_write(). This gives
code a chance to assign the state->any.* field without having to
worry about the state getting ripped out from under us.
--- /dev/null
+
+TODO:
+ msg_lnk.c msg_lnk_signal rescans all connections. What we really
+ need to do is queue the conn (when possible) or the node
+ (when possible), or just flag it w/ a global, and then
+ only scan a subset.
+
+ link->dist propagation is still a bit screwy when there are a
+ lot of parallel network links.
#define SHOW_TAB 2
-static void shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static void shell_rcvmsg(hammer2_msg_t *msg);
static void shell_ttymsg(hammer2_iocom_t *iocom);
-static void hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static void hammer2_shell_parse(hammer2_msg_t *msg);
/************************************************************************
* SHELL *
fcntl(0, F_SETFL, O_NONBLOCK);
printf("debug: connected\n");
- msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL);
- hammer2_msg_write(&iocom, msg, NULL, NULL, NULL);
+ msg = hammer2_msg_alloc(&iocom.router, 0, HAMMER2_DBG_SHELL,
+ NULL, NULL);
+ hammer2_msg_write(msg);
hammer2_iocom_core(&iocom);
fprintf(stderr, "debug: disconnected\n");
close(fd);
*/
static
void
-shell_rcvmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+shell_rcvmsg(hammer2_msg_t *msg)
{
switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
case HAMMER2_LNK_ERROR:
* We send the commands, not accept them.
* (one-way message, not transactional)
*/
- hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
break;
case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
/*
break;
case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:
fprintf(stderr, "Debug Shell is ignoring received LNK_CONN\n");
- hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
break;
case HAMMER2_LNK_CONN | HAMMER2_MSGF_DELETE:
break;
*/
fprintf(stderr, "Unknown message: %s\n", hammer2_msg_str(msg));
if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
- hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
- hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
break;
}
}
if (len && buf[len - 1] == '\n')
buf[--len] = 0;
++len;
- msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL);
+ msg = hammer2_msg_alloc(&iocom->router, len, HAMMER2_DBG_SHELL,
+ NULL, NULL);
bcopy(buf, msg->aux_data, len);
- hammer2_msg_write(iocom, msg, NULL, NULL, NULL);
+ hammer2_msg_write(msg);
} else if (feof(stdin)) {
/*
* Set EOF flag without setting any error code for normal
* then finish up by outputting another prompt.
*/
void
-hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_dbg(hammer2_msg_t *msg)
{
switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
case HAMMER2_DBG_SHELL:
*/
if (msg->aux_data)
msg->aux_data[msg->aux_size - 1] = 0;
- hammer2_shell_parse(iocom, msg);
- hammer2_msg_reply(iocom, msg, 0);
+ hammer2_shell_parse(msg);
+ hammer2_msg_reply(msg, 0);
break;
case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
/*
write(2, msg->aux_data, strlen(msg->aux_data));
break;
default:
- hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
break;
}
}
-static void shell_span(hammer2_iocom_t *iocom, char *cmdbuf);
-/*static void shell_tree(hammer2_iocom_t *iocom, char *cmdbuf);*/
+static void shell_span(hammer2_router_t *router, char *cmdbuf);
+/*static void shell_tree(hammer2_router_t *router, char *cmdbuf);*/
static void
-hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_shell_parse(hammer2_msg_t *msg)
{
+ hammer2_router_t *router = msg->router;
char *cmdbuf = msg->aux_data;
char *cmd = strsep(&cmdbuf, " \t");
if (cmd == NULL || *cmd == 0) {
;
} else if (strcmp(cmd, "span") == 0) {
- shell_span(iocom, cmdbuf);
+ shell_span(router, cmdbuf);
} else if (strcmp(cmd, "tree") == 0) {
- shell_tree(iocom, cmdbuf);
+ shell_tree(router, cmdbuf);
} else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) {
- iocom_printf(iocom, "help Command help\n");
- iocom_printf(iocom, "span <host> Span to target host\n");
- iocom_printf(iocom, "tree Dump spanning tree\n");
+ router_printf(router, "help Command help\n");
+ router_printf(router, "span <host> Span to target host\n");
+ router_printf(router, "tree Dump spanning tree\n");
} else {
- iocom_printf(iocom, "Unrecognized command: %s\n", cmd);
+ router_printf(router, "Unrecognized command: %s\n", cmd);
}
}
static void
-shell_span(hammer2_iocom_t *iocom, char *cmdbuf)
+shell_span(hammer2_router_t *router, char *cmdbuf)
{
const char *hostname = strsep(&cmdbuf, " \t");
pthread_t thread;
* Start master service
*/
if (fd < 0) {
- iocom_printf(iocom, "Connection to %s failed\n", hostname);
+ router_printf(router, "Connection to %s failed\n", hostname);
} else {
- iocom_printf(iocom, "Connected to %s\n", hostname);
+ router_printf(router, "Connected to %s\n", hostname);
pthread_create(&thread, NULL,
master_service, (void *)(intptr_t)fd);
/*pthread_join(thread, &res);*/
* not modified and stays intact. We use a one-way message with REPLY set
* to distinguish between a debug command and debug terminal output.
*
- * To prevent loops iocom_printf() can filter the message (cmd) related
- * to the iocom_printf(). We filter out DBG messages.
+ * To prevent loops router_printf() can filter the message (cmd) related
+ * to the router_printf(). We filter out DBG messages.
*/
void
-iocom_printf(hammer2_iocom_t *iocom, const char *ctl, ...)
+router_printf(hammer2_router_t *router, const char *ctl, ...)
{
hammer2_msg_t *rmsg;
va_list va;
va_end(va);
len = strlen(buf) + 1;
- rmsg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL |
- HAMMER2_MSGF_REPLY);
+ rmsg = hammer2_msg_alloc(router, len, HAMMER2_DBG_SHELL |
+ HAMMER2_MSGF_REPLY,
+ NULL, NULL);
bcopy(buf, rmsg->aux_data, len);
- hammer2_msg_write(iocom, rmsg, NULL, NULL, NULL);
+ hammer2_msg_write(rmsg);
}
/************************************************************************
#include "hammer2.h"
static void *master_accept(void *data);
-static void master_auth_state(hammer2_iocom_t *iocom);
-static void master_auth_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-static void master_link_state(hammer2_iocom_t *iocom);
-static void master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static void master_auth_signal(hammer2_router_t *router);
+static void master_auth_rxmsg(hammer2_msg_t *msg);
+static void master_link_signal(hammer2_router_t *router);
+static void master_link_rxmsg(hammer2_msg_t *msg);
/*
* Start-up the master listener daemon for the machine.
fd = (int)(intptr_t)data;
hammer2_iocom_init(&iocom, fd, -1,
- master_auth_state, master_auth_rxmsg, NULL);
+ master_auth_signal,
+ master_auth_rxmsg,
+ NULL);
hammer2_iocom_core(&iocom);
fprintf(stderr,
* message operation. The connection has already been encrypted at
* this point.
*/
-static void master_auth_conn_rx(hammer2_state_t *state, hammer2_msg_t *msg);
+static void master_auth_conn_rx(hammer2_msg_t *msg);
static
void
-master_auth_state(hammer2_iocom_t *iocom __unused)
+master_auth_signal(hammer2_router_t *router)
{
hammer2_msg_t *msg;
*
* XXX put additional authentication states here
*/
- msg = hammer2_msg_alloc(iocom, 0, HAMMER2_LNK_CONN |
- HAMMER2_MSGF_CREATE);
+ msg = hammer2_msg_alloc(router, 0, HAMMER2_LNK_CONN |
+ HAMMER2_MSGF_CREATE,
+ master_auth_conn_rx, NULL);
snprintf(msg->any.lnk_conn.label, sizeof(msg->any.lnk_conn.label), "*");
- hammer2_msg_write(iocom, msg, master_auth_conn_rx, NULL, NULL);
+ hammer2_msg_write(msg);
- hammer2_iocom_restate(iocom,
- master_link_state, master_link_rxmsg, NULL);
+ hammer2_router_restate(router,
+ master_link_signal,
+ master_link_rxmsg,
+ NULL);
}
static
void
-master_auth_conn_rx(hammer2_state_t *state, hammer2_msg_t *msg)
+master_auth_conn_rx(hammer2_msg_t *msg)
{
if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
- hammer2_msg_reply(state->iocom, msg, 0);
+ hammer2_msg_reply(msg, 0);
}
static
void
-master_auth_rxmsg(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg __unused)
+master_auth_rxmsg(hammer2_msg_t *msg __unused)
{
}
*/
static
void
-master_link_state(hammer2_iocom_t *iocom __unused)
+master_link_signal(hammer2_router_t *router)
{
+ hammer2_msg_lnk_signal(router);
}
static
void
-master_link_rxmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+master_link_rxmsg(hammer2_msg_t *msg)
{
hammer2_state_t *state;
uint32_t cmd;
if (state && state->func) {
assert(state->func != NULL);
- state->func(state, msg);
+ state->func(msg);
} else {
switch(cmd & HAMMER2_MSGF_PROTOS) {
case HAMMER2_MSG_PROTO_LNK:
- hammer2_msg_lnk(iocom, msg);
+ hammer2_msg_lnk(msg);
break;
case HAMMER2_MSG_PROTO_DBG:
- hammer2_msg_dbg(iocom, msg);
+ hammer2_msg_dbg(msg);
break;
default:
- hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
break;
}
}
void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
- void (*state_func)(hammer2_iocom_t *),
- void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *),
+ void (*state_func)(hammer2_router_t *),
+ void (*rcvmsg_func)(hammer2_msg_t *),
void (*altmsg_func)(hammer2_iocom_t *));
-void hammer2_iocom_restate(hammer2_iocom_t *iocom,
- void (*state_func)(hammer2_iocom_t *),
- void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *),
+void hammer2_router_restate(hammer2_router_t *router,
+ void (*state_func)(hammer2_router_t *),
+ void (*rcvmsg_func)(hammer2_msg_t *),
void (*altmsg_func)(hammer2_iocom_t *));
+void hammer2_router_signal(hammer2_router_t *router);
void hammer2_iocom_done(hammer2_iocom_t *iocom);
-hammer2_msg_t *hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size,
- uint32_t cmd);
-void hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
- uint32_t error);
-void hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
- uint32_t error);
+hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router,
+ size_t aux_size, uint32_t cmd,
+ void (*func)(hammer2_msg_t *), void *data);
+void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error);
+void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error);
void hammer2_state_reply(hammer2_state_t *state, uint32_t error);
-void hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_free(hammer2_msg_t *msg);
void hammer2_iocom_core(hammer2_iocom_t *iocom);
hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
-void hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
- void (*func)(hammer2_state_t *, hammer2_msg_t *),
- void *data, hammer2_state_t **statep);
+void hammer2_msg_write(hammer2_msg_t *msg);
void hammer2_iocom_drain(hammer2_iocom_t *iocom);
void hammer2_iocom_flush1(hammer2_iocom_t *iocom);
/*
* Msg protocol functions
*/
-void hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-void hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_lnk_signal(hammer2_router_t *router);
+void hammer2_msg_lnk(hammer2_msg_t *msg);
+void hammer2_msg_dbg(hammer2_msg_t *msg);
/*
* Crypto functions
const char *hammer2_msg_str(hammer2_msg_t *msg);
int hammer2_connect(const char *hostname);
-void shell_tree(hammer2_iocom_t *iocom, char *cmdbuf);
+void shell_tree(hammer2_router_t *router, char *cmdbuf);
void *master_service(void *data);
void hammer2_msg_debug(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-void iocom_printf(hammer2_iocom_t *iocom, const char *ctl, ...);
+void router_printf(hammer2_router_t *router, const char *ctl, ...);
void *hammer2_alloc(size_t bytes);
void hammer2_free(void *ptr);
while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
assert(0); /* shouldn't happen */
TAILQ_REMOVE(&ioq->msgq, msg, qentry);
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
}
if ((msg = ioq->msg) != NULL) {
ioq->msg = NULL;
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
}
}
/*
* Initialize a low-level communications channel.
*
- * NOTE: The state_func() is called at least once from the loop and can be
+ * NOTE: The signal_func() is called at least once from the loop and can be
* re-armed via hammer2_iocom_restate().
*/
void
hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
- void (*state_func)(hammer2_iocom_t *),
- void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
+ void (*signal_func)(hammer2_router_t *),
+ void (*rcvmsg_func)(hammer2_msg_t *),
void (*altmsg_func)(hammer2_iocom_t *))
{
bzero(iocom, sizeof(*iocom));
- iocom->state_callback = state_func;
- iocom->rcvmsg_callback = rcvmsg_func;
- iocom->altmsg_callback = altmsg_func;
+ iocom->router.signal_callback = signal_func;
+ iocom->router.rcvmsg_callback = rcvmsg_func;
+ iocom->router.altmsg_callback = altmsg_func;
pthread_mutex_init(&iocom->mtx, NULL);
- RB_INIT(&iocom->staterd_tree);
- RB_INIT(&iocom->statewr_tree);
+ RB_INIT(&iocom->router.staterd_tree);
+ RB_INIT(&iocom->router.statewr_tree);
TAILQ_INIT(&iocom->freeq);
TAILQ_INIT(&iocom->freeq_aux);
TAILQ_INIT(&iocom->addrq);
- TAILQ_INIT(&iocom->txmsgq);
+ TAILQ_INIT(&iocom->router.txmsgq);
+ iocom->router.iocom = iocom;
iocom->sock_fd = sock_fd;
iocom->alt_fd = alt_fd;
iocom->flags = HAMMER2_IOCOMF_RREQ;
- if (state_func)
+ if (signal_func)
iocom->flags |= HAMMER2_IOCOMF_SWORK;
hammer2_ioq_init(iocom, &iocom->ioq_rx);
hammer2_ioq_init(iocom, &iocom->ioq_tx);
* the recevmsg_func and the sendmsg_func is called at least once.
*/
void
-hammer2_iocom_restate(hammer2_iocom_t *iocom,
- void (*state_func)(hammer2_iocom_t *),
- void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
+hammer2_router_restate(hammer2_router_t *router,
+ void (*signal_func)(hammer2_router_t *),
+ void (*rcvmsg_func)(hammer2_msg_t *msg),
void (*altmsg_func)(hammer2_iocom_t *))
{
- iocom->state_callback = state_func;
- iocom->rcvmsg_callback = rcvmsg_func;
- iocom->altmsg_callback = altmsg_func;
- if (state_func)
- iocom->flags |= HAMMER2_IOCOMF_SWORK;
+ router->signal_callback = signal_func;
+ router->rcvmsg_callback = rcvmsg_func;
+ router->altmsg_callback = altmsg_func;
+ if (signal_func)
+ router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
else
- iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
+ router->iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
+}
+
+void
+hammer2_router_signal(hammer2_router_t *router)
+{
+ if (router->signal_callback)
+ router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
}
/*
* Allocate a new one-way message.
*/
hammer2_msg_t *
-hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
+hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd,
+ void (*func)(hammer2_msg_t *), void *data)
{
+ hammer2_state_t *state = NULL;
+ hammer2_iocom_t *iocom = router->iocom;
hammer2_msg_t *msg;
int hbytes;
if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
TAILQ_REMOVE(&iocom->freeq, msg, qentry);
}
+ if ((cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY)) ==
+ HAMMER2_MSGF_CREATE) {
+ /*
+ * Create state when CREATE is set without REPLY.
+ *
+ * NOTE: CREATE in txcmd handled by hammer2_msg_write()
+ * NOTE: DELETE in txcmd handled by hammer2_state_cleanuptx()
+ */
+ state = malloc(sizeof(*state));
+ bzero(state, sizeof(*state));
+ state->iocom = iocom;
+ state->flags = HAMMER2_STATE_DYNAMIC;
+ state->msgid = (uint64_t)(uintptr_t)state;
+ /* XXX set state->spanid from router */
+ state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
+ HAMMER2_MSGF_DELETE);
+ state->rxcmd = HAMMER2_MSGF_REPLY;
+ state->func = func;
+ state->any.any = data;
+ pthread_mutex_lock(&iocom->mtx);
+ RB_INSERT(hammer2_state_tree, &iocom->router.statewr_tree, state);
+ pthread_mutex_unlock(&iocom->mtx);
+ state->flags |= HAMMER2_STATE_INSERTED;
+ }
pthread_mutex_unlock(&iocom->mtx);
if (msg == NULL) {
msg = malloc(sizeof(*msg));
msg->any.head.cmd = cmd;
msg->any.head.aux_descr = 0;
msg->any.head.aux_crc = 0;
-
+ msg->router = router;
+ if (state) {
+ msg->state = state;
+ state->msg = msg;
+ msg->any.head.msgid = state->msgid;
+ }
return (msg);
}
*/
static
void
-hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_free_locked(hammer2_msg_t *msg)
{
+ hammer2_iocom_t *iocom = msg->router->iocom;
+
msg->state = NULL;
if (msg->aux_data)
TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
}
void
-hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_free(hammer2_msg_t *msg)
{
+ hammer2_iocom_t *iocom = msg->router->iocom;
+
pthread_mutex_lock(&iocom->mtx);
- hammer2_msg_free_locked(iocom, msg);
+ hammer2_msg_free_locked(msg);
pthread_mutex_unlock(&iocom->mtx);
}
if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
- iocom->state_callback(iocom);
+ iocom->router.signal_callback(&iocom->router);
}
/*
read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
iocom->flags |= HAMMER2_IOCOMF_RWORK;
iocom->flags |= HAMMER2_IOCOMF_WWORK;
- if (TAILQ_FIRST(&iocom->txmsgq))
+ if (TAILQ_FIRST(&iocom->router.txmsgq))
hammer2_iocom_flush1(iocom);
}
fprintf(stderr, "receive %s\n",
hammer2_msg_str(msg));
}
- iocom->rcvmsg_callback(iocom, msg);
+ iocom->router.rcvmsg_callback(msg);
hammer2_state_cleanuprx(iocom, msg);
}
}
if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
- iocom->altmsg_callback(iocom);
+ iocom->router.altmsg_callback(iocom);
}
}
}
/*
* Allocate the message, the next state will fill it in.
*/
- msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
+ msg = hammer2_msg_alloc(&iocom->router, ioq->abytes, 0,
+ NULL, NULL);
ioq->msg = msg;
/*
error = hammer2_state_msgrx(iocom, msg);
if (error) {
if (error == HAMMER2_IOQ_ERROR_EALREADY) {
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
goto again;
}
ioq->error = error;
*/
assert(ioq->msg == msg);
if (msg) {
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
ioq->msg = NULL;
}
* LNK_ERROR (that the session can detect) when no
* transactions remain.
*/
- msg = hammer2_msg_alloc(iocom, 0, 0);
+ msg = hammer2_msg_alloc(&iocom->router, 0, 0, NULL, NULL);
bzero(&msg->any.head, sizeof(msg->any.head));
msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
msg->any.head.cmd = HAMMER2_LNK_ERROR;
pthread_mutex_lock(&iocom->mtx);
hammer2_iocom_drain(iocom);
- if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
+ if ((state = RB_ROOT(&iocom->router.staterd_tree)) != NULL) {
/*
* Active remote transactions are still present.
* Simulate the other end sending us a DELETE.
*/
if (state->rxcmd & HAMMER2_MSGF_DELETE) {
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
msg = NULL;
} else {
/*state->txcmd |= HAMMER2_MSGF_DELETE;*/
msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
HAMMER2_MSGF_DELETE;
}
- } else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) {
+ } else if ((state = RB_ROOT(&iocom->router.statewr_tree)) != NULL) {
/*
* Active local transactions are still present.
* Simulate the other end sending us a DELETE.
*/
if (state->rxcmd & HAMMER2_MSGF_DELETE) {
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
msg = NULL;
} else {
msg->state = state;
iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
TAILQ_INIT(&tmpq);
pthread_mutex_lock(&iocom->mtx);
- while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
- TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
+ while ((msg = TAILQ_FIRST(&iocom->router.txmsgq)) != NULL) {
+ TAILQ_REMOVE(&iocom->router.txmsgq, msg, qentry);
TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
}
pthread_mutex_unlock(&iocom->mtx);
* Write a message to an iocom, with additional state processing.
*/
void
-hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
- void (*func)(hammer2_state_t *, hammer2_msg_t *),
- void *data,
- hammer2_state_t **statep)
+hammer2_msg_write(hammer2_msg_t *msg)
{
+ hammer2_iocom_t *iocom = msg->router->iocom;
hammer2_state_t *state;
char dummy;
* Existing transaction (could be reply). It is also
* possible for this to be the first reply (CREATE is set),
* in which case we populate state->txcmd.
+ *
+ * state->txcmd is adjusted to hold the final message cmd,
+ * and we also be sure to set the CREATE bit here. We did
+ * not set it in hammer2_msg_alloc() because that would have
+ * not been serialized (state could have gotten ripped out
+ * from under the message prior to it being transmitted).
*/
+ if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE |
+ HAMMER2_MSGF_REPLY)) ==
+ HAMMER2_MSGF_CREATE) {
+ state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+ }
msg->any.head.msgid = state->msgid;
msg->any.head.spanid = state->spanid;
- if (func) {
- state->func = func;
- state->any.any = data;
- }
assert(((state->txcmd ^ msg->any.head.cmd) &
HAMMER2_MSGF_REPLY) == 0);
if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
- } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
- /*
- * No existing state and CREATE is set, create new
- * state for outgoing command. This can't happen if
- * REPLY is set as the state would already exist for
- * a transaction reply.
- */
- assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
-
- state = malloc(sizeof(*state));
- bzero(state, sizeof(*state));
- state->iocom = iocom;
- state->flags = HAMMER2_STATE_DYNAMIC;
- state->msg = msg;
- state->msgid = (uint64_t)(uintptr_t)state;
- state->spanid = msg->any.head.spanid;
- state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
- state->rxcmd = HAMMER2_MSGF_REPLY;
- state->func = func;
- state->any.any = data;
- pthread_mutex_lock(&iocom->mtx);
- RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
- pthread_mutex_unlock(&iocom->mtx);
- state->flags |= HAMMER2_STATE_INSERTED;
- msg->state = state;
- msg->any.head.msgid = state->msgid;
- /* spanid set by caller */
} else {
msg->any.head.msgid = 0;
- /* spanid set by caller */
+ /* XXX set spanid by router */
}
- if (statep)
- *statep = state;
-
/*
* Queue it for output, wake up the I/O pthread. Note that the
* I/O thread is responsible for generating the CRCs and encryption.
*/
- TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
+ TAILQ_INSERT_TAIL(&iocom->router.txmsgq, msg, qentry);
dummy = 0;
write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */
pthread_mutex_unlock(&iocom->mtx);
* The reply contains no extended data.
*/
void
-hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
+hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
{
+ hammer2_iocom_t *iocom = msg->router->iocom;
hammer2_state_t *state = msg->state;
hammer2_msg_t *nmsg;
uint32_t cmd;
if (state) {
if (state->txcmd & HAMMER2_MSGF_DELETE)
return;
- if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
- cmd |= HAMMER2_MSGF_CREATE;
if (state->txcmd & HAMMER2_MSGF_REPLY)
cmd |= HAMMER2_MSGF_REPLY;
cmd |= HAMMER2_MSGF_DELETE;
cmd |= HAMMER2_MSGF_REPLY;
}
- nmsg = hammer2_msg_alloc(iocom, 0, cmd);
+ /*
+ * Allocate the message and associate it with the existing state.
+ * We cannot pass MSGF_CREATE to msg_alloc() because that may
+ * allocate new state. We have our state already.
+ */
+ nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
+ if (state) {
+ if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+ nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+ }
nmsg->any.head.error = error;
- nmsg->state = msg->state;
- hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
+ nmsg->state = state;
+ hammer2_msg_write(nmsg);
}
/*
* later.
*/
void
-hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
+hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
{
+ hammer2_iocom_t *iocom = msg->router->iocom;
hammer2_state_t *state = msg->state;
hammer2_msg_t *nmsg;
uint32_t cmd;
if (state) {
if (state->txcmd & HAMMER2_MSGF_DELETE)
return;
- if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
- cmd |= HAMMER2_MSGF_CREATE;
if (state->txcmd & HAMMER2_MSGF_REPLY)
cmd |= HAMMER2_MSGF_REPLY;
/* continuing transaction, do not set MSGF_DELETE */
cmd |= HAMMER2_MSGF_REPLY;
}
- nmsg = hammer2_msg_alloc(iocom, 0, cmd);
+ nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
+ if (state) {
+ if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+ nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+ }
nmsg->any.head.error = error;
nmsg->state = state;
- hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
+ hammer2_msg_write(nmsg);
}
/*
return;
/*
- * We must also set CREATE if this is our first response to a
- * remote command.
- */
- if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
- cmd |= HAMMER2_MSGF_CREATE;
-
- /*
* Set REPLY if the other end initiated the command. Otherwise
* we are the command direction.
*/
if (state->txcmd & HAMMER2_MSGF_REPLY)
cmd |= HAMMER2_MSGF_REPLY;
- nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
+ nmsg = hammer2_msg_alloc(&state->iocom->router, 0, cmd, NULL, NULL);
+ if (state) {
+ if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+ nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
+ }
nmsg->any.head.error = error;
nmsg->state = state;
- hammer2_msg_write(state->iocom, nmsg, NULL, NULL, NULL);
+ hammer2_msg_write(nmsg);
}
/************************************************************************
pthread_mutex_lock(&iocom->mtx);
if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
state = RB_FIND(hammer2_state_tree,
- &iocom->statewr_tree, &dummy);
+ &iocom->router.statewr_tree, &dummy);
} else {
state = RB_FIND(hammer2_state_tree,
- &iocom->staterd_tree, &dummy);
+ &iocom->router.staterd_tree, &dummy);
}
msg->state = state;
pthread_mutex_unlock(&iocom->mtx);
state->spanid = msg->any.head.spanid;
msg->state = state;
pthread_mutex_lock(&iocom->mtx);
- RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
+ RB_INSERT(hammer2_state_tree,
+ &iocom->router.staterd_tree, state);
pthread_mutex_unlock(&iocom->mtx);
error = 0;
if (DebugOpt) {
* Free a non-transactional message, there is no state
* to worry about.
*/
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
} else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
/*
* Message terminating transaction, destroy the related
if (state->rxcmd & HAMMER2_MSGF_REPLY) {
assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
RB_REMOVE(hammer2_state_tree,
- &iocom->statewr_tree, state);
+ &iocom->router.statewr_tree, state);
} else {
assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
RB_REMOVE(hammer2_state_tree,
- &iocom->staterd_tree, state);
+ &iocom->router.staterd_tree, state);
}
state->flags &= ~HAMMER2_STATE_INSERTED;
hammer2_state_free(state);
;
}
pthread_mutex_unlock(&iocom->mtx);
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
} else if (state->msg != msg) {
/*
* Message not terminating transaction, leave state intact
* and free message if it isn't the CREATE message.
*/
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
}
}
hammer2_state_t *state;
if ((state = msg->state) == NULL) {
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
} else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
pthread_mutex_lock(&iocom->mtx);
state->txcmd |= HAMMER2_MSGF_DELETE;
if (state->txcmd & HAMMER2_MSGF_REPLY) {
assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
RB_REMOVE(hammer2_state_tree,
- &iocom->staterd_tree, state);
+ &iocom->router.staterd_tree, state);
} else {
assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
RB_REMOVE(hammer2_state_tree,
- &iocom->statewr_tree, state);
+ &iocom->router.statewr_tree, state);
}
state->flags &= ~HAMMER2_STATE_INSERTED;
hammer2_state_free(state);
;
}
pthread_mutex_unlock(&iocom->mtx);
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
} else if (state->msg != msg) {
- hammer2_msg_free(iocom, msg);
+ hammer2_msg_free(msg);
}
}
msg = state->msg;
state->msg = NULL;
if (msg)
- hammer2_msg_free_locked(iocom, msg);
+ hammer2_msg_free_locked(msg);
free(state);
/*
* We may have to wake up the rx code.
*/
if (iocom->ioq_rx.error &&
- RB_EMPTY(&iocom->staterd_tree) &&
- RB_EMPTY(&iocom->statewr_tree)) {
+ RB_EMPTY(&iocom->router.staterd_tree) &&
+ RB_EMPTY(&iocom->router.statewr_tree)) {
dummy = 0;
write(iocom->wakeupfds[1], &dummy, 1);
}
* an active transaction is lost, the related messages will be fully aborted
* and the higher protocol levels will retry as appropriate.
*
+ * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
+ * back to the originator. Only the originator keeps tracks of a message.
+ * Routers just pass it through. If a route is lost during transit the
+ * message is simply thrown away.
+ *
* It is also important to note that several paths to the same PFS can be
* propagated along the same link, which allows concurrency and even
* redundancy over several network interfaces or via different routes through
struct h2span_node *node; /* related node */
int32_t dist;
struct h2span_relay_queue relayq; /* relay out */
+ struct hammer2_router router;
};
/*
return(-1);
if (link1->dist > link2->dist)
return(1);
- if ((intptr_t)link1 < (intptr_t)link2)
+ if (link1->state->msgid < link2->state->msgid)
return(-1);
- if ((intptr_t)link1 > (intptr_t)link2)
+ if (link1->state->msgid > link2->state->msgid)
return(1);
return(0);
}
int
h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
{
- if ((intptr_t)relay1->link->node < (intptr_t)relay2->link->node)
+ h2span_link_t *link1 = relay1->link;
+ h2span_link_t *link2 = relay2->link;
+
+ if ((intptr_t)link1->node < (intptr_t)link2->node)
return(-1);
- if ((intptr_t)relay1->link->node > (intptr_t)relay2->link->node)
+ if ((intptr_t)link1->node > (intptr_t)link2->node)
return(1);
- if ((intptr_t)relay1->link->dist < (intptr_t)relay2->link->dist)
+ if (link1->dist < link2->dist)
return(-1);
- if ((intptr_t)relay1->link->dist > (intptr_t)relay2->link->dist)
+ if (link1->dist > link2->dist)
return(1);
- if ((intptr_t)relay1->link < (intptr_t)relay2->link)
+ if (link1->state->msgid < link2->state->msgid)
return(-1);
- if ((intptr_t)relay1->link > (intptr_t)relay2->link)
+ if (link1->state->msgid > link2->state->msgid)
return(1);
return(0);
}
static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
-static void hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg);
-static void hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg);
-static void hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg);
+static void hammer2_lnk_span(hammer2_msg_t *msg);
+static void hammer2_lnk_conn(hammer2_msg_t *msg);
+static void hammer2_lnk_relay(hammer2_msg_t *msg);
static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node);
static void hammer2_relay_delete(h2span_relay_t *relay);
+void
+hammer2_msg_lnk_signal(hammer2_router_t *router __unused)
+{
+ pthread_mutex_lock(&cluster_mtx);
+ hammer2_relay_scan(NULL, NULL);
+ pthread_mutex_unlock(&cluster_mtx);
+}
+
/*
* Receive a HAMMER2_MSG_PROTO_LNK message. This only called for
* one-way and opening-transactions since state->func will be assigned
* in all other cases.
*/
void
-hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_lnk(hammer2_msg_t *msg)
{
switch(msg->any.head.cmd & HAMMER2_MSGF_BASECMDMASK) {
case HAMMER2_LNK_CONN:
- hammer2_lnk_conn(msg->state, msg);
+ hammer2_lnk_conn(msg);
break;
case HAMMER2_LNK_SPAN:
- hammer2_lnk_span(msg->state, msg);
+ hammer2_lnk_span(msg);
break;
default:
fprintf(stderr,
"MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
- hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_NOSUPP);
+ hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
/* state invalid after reply */
break;
}
}
void
-hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
+hammer2_lnk_conn(hammer2_msg_t *msg)
{
+ hammer2_state_t *state = msg->state;
h2span_connect_t *conn;
h2span_relay_t *relay;
char *alloc = NULL;
state->any.conn = conn;
TAILQ_INSERT_TAIL(&connq, conn, entry);
- hammer2_msg_result(state->iocom, msg, 0);
+ hammer2_msg_result(msg, 0);
+#if 0
/*
* Span-synchronize all nodes with the new connection
*/
hammer2_relay_scan(conn, NULL);
+#endif
+ hammer2_router_signal(msg->router);
}
/*
TAILQ_REMOVE(&connq, conn, entry);
hammer2_free(conn);
- hammer2_msg_reply(state->iocom, msg, 0);
+ hammer2_msg_reply(msg, 0);
/* state invalid after reply */
}
pthread_mutex_unlock(&cluster_mtx);
}
void
-hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
+hammer2_lnk_span(hammer2_msg_t *msg)
{
+ hammer2_state_t *state = msg->state;
h2span_cluster_t dummy_cls;
h2span_node_t dummy_node;
h2span_cluster_t *cls;
h2span_relay_t *relay;
char *alloc = NULL;
+ assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
+
pthread_mutex_lock(&cluster_mtx);
/*
* On transaction start we initialize the tracking infrastructure
*/
if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+ assert(state->func == NULL);
state->func = hammer2_lnk_span;
msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0;
- fprintf(stderr, "LNK_SPAN: %s/%s dist=%d\n",
- hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid,
- &alloc),
- msg->any.lnk_span.label,
- msg->any.lnk_span.dist);
- free(alloc);
-
/*
* Find the cluster
*/
slink->dist = msg->any.lnk_span.dist;
slink->state = state;
state->any.link = slink;
+
+ /*
+ * Embedded router structure in link for message forwarding.
+ */
+ TAILQ_INIT(&slink->router.txmsgq);
+ slink->router.iocom = state->iocom;
+ slink->router.link = slink;
+
RB_INSERT(h2span_link_tree, &node->tree, slink);
+ fprintf(stderr, "LNK_SPAN(thr %p): %p %s/%s dist=%d\n",
+ msg->router->iocom,
+ slink,
+ hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid,
+ &alloc),
+ msg->any.lnk_span.label,
+ msg->any.lnk_span.dist);
+ free(alloc);
+
+#if 0
hammer2_relay_scan(NULL, node);
+#endif
+ hammer2_router_signal(msg->router);
}
/*
node = slink->node;
cls = node->cls;
+ fprintf(stderr, "LNK_DELE(thr %p): %p %s/%s dist=%d\n",
+ msg->router->iocom,
+ slink,
+ hammer2_uuid_to_str(&cls->pfs_clid, &alloc),
+ state->msg->any.lnk_span.label,
+ state->msg->any.lnk_span.dist);
+ free(alloc);
+
/*
* Clean out all relays. This requires terminating each
* relay transaction.
* it doesn't then all related relays have already been
* removed and there's nothing left to do.
*/
+#if 0
if (node)
hammer2_relay_scan(NULL, node);
+#endif
+ if (node)
+ hammer2_router_signal(msg->router);
}
pthread_mutex_unlock(&cluster_mtx);
* XXX MPRACE on state structure
*/
static void
-hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg)
+hammer2_lnk_relay(hammer2_msg_t *msg)
{
+ hammer2_state_t *state = msg->state;
h2span_relay_t *relay;
+ assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
+
if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
pthread_mutex_lock(&cluster_mtx);
if ((relay = state->any.relay) != NULL) {
info.relay = NULL;
/*
- * Locate the first related relay for the connection. relay will
- * be NULL if there were none.
+ * Locate the first related relay for the node on this connection.
+ * relay will be NULL if there were none.
*/
RB_SCAN(h2span_relay_tree, &conn->tree,
hammer2_relay_scan_cmp, hammer2_relay_scan_callback, &info);
* Iterate the node's links (received SPANs) in distance order,
* lowest (best) dist first.
*/
+ /* fprintf(stderr, "LOOP\n"); */
RB_FOREACH(slink, h2span_link_tree, &node->tree) {
/*
+ fprintf(stderr, "SLINK %p RELAY %p(%p)\n",
+ slink, relay, relay ? relay->link : NULL);
+ */
+ /*
* PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
*
* Track relays while iterating the best links and construct
*
* (If some prior better link was removed it would have also
* removed the relay, so the relay can only match exactly or
- * be worst).
+ * be worse).
*/
if (relay && relay->link == slink) {
/*
- * Match, get the next relay to match against the
- * next slink.
+ * Match, relay already in-place, get the next
+ * relay to match against the next slink.
*/
relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
if (--count == 0)
* do not relay. This prevents endless closed
* loops with ever-incrementing distances when
* the seed span is lost in the graph.
+ *
+ * All later spans will also be too far away so
+ * we can break out of the loop.
*/
- /* no code needed */
+ break;
} else {
/*
* No match, distance is ok, construct a new relay.
+ * (slink is better than relay).
*/
hammer2_msg_t *msg;
assert(relay == NULL ||
- relay->link->dist <= slink->dist);
+ relay->link->node != slink->node ||
+ relay->link->dist >= slink->dist);
relay = hammer2_alloc(sizeof(*relay));
relay->conn = conn;
relay->link = slink;
+ msg = hammer2_msg_alloc(&conn->state->iocom->router, 0,
+ HAMMER2_LNK_SPAN |
+ HAMMER2_MSGF_CREATE,
+ hammer2_lnk_relay, relay);
+ relay->state = msg->state;
+ msg->any.lnk_span = slink->state->msg->any.lnk_span;
+ msg->any.lnk_span.dist = slink->dist + 1;
+
RB_INSERT(h2span_relay_tree, &conn->tree, relay);
TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
- msg = hammer2_msg_alloc(conn->state->iocom, 0,
- HAMMER2_LNK_SPAN |
- HAMMER2_MSGF_CREATE);
- msg->any.lnk_span = slink->state->msg->any.lnk_span;
- ++msg->any.lnk_span.dist; /* XXX add weighting */
+ hammer2_msg_write(msg);
- hammer2_msg_write(conn->state->iocom, msg,
- hammer2_lnk_relay, relay,
- &relay->state);
fprintf(stderr,
- "RELAY SPAN ON CLS=%p NODE=%p DIST=%d "
+ "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
"FD %d state %p\n",
+ slink,
+ relay,
node->cls, node, slink->dist,
conn->state->iocom->sock_fd, relay->state);
hammer2_relay_delete(h2span_relay_t *relay)
{
fprintf(stderr,
- "RELAY DELETE ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
+ "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
+ relay->link,
+ relay,
relay->link->node->cls, relay->link->node,
relay->link->dist,
relay->conn->state->iocom->sock_fd, relay->state);
hammer2_free(relay);
}
+/************************************************************************
+ * ROUTER *
+ ************************************************************************
+ *
+ * Provides route functions to msg.c
+ */
+
+#if 0
+/*
+ * Acquire a persistent router structure given the cluster and node ids.
+ * Messages can be transacted via this structure while held. If the route
+ * is lost messages will return failure.
+ */
+hammer2_router_t *
+hammer2_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
+{
+}
+
+/*
+ * Release previously acquired router.
+ */
+void
+hammer2_router_put(hammer2_router_t *router)
+{
+}
+#endif
+
/*
* Dumps the spanning tree
*/
void
-shell_tree(hammer2_iocom_t *iocom, char *cmdbuf __unused)
+shell_tree(hammer2_router_t *router, char *cmdbuf __unused)
{
h2span_cluster_t *cls;
h2span_node_t *node;
pthread_mutex_lock(&cluster_mtx);
RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
- iocom_printf(iocom, "Cluster %s\n",
+ router_printf(router, "Cluster %s\n",
hammer2_uuid_to_str(&cls->pfs_clid, &uustr));
RB_FOREACH(node, h2span_node_tree, &cls->tree) {
- iocom_printf(iocom, " Node %s (%s)\n",
+ router_printf(router, " Node %s (%s)\n",
hammer2_uuid_to_str(&node->pfs_fsid, &uustr),
node->label);
RB_FOREACH(slink, h2span_link_tree, &node->tree) {
- iocom_printf(iocom, "\tLink dist=%d via %d\n",
+ router_printf(router, "\tLink dist=%d via %d\n",
slink->dist,
slink->state->iocom->sock_fd);
}
int flags;
int error;
struct hammer2_msg *msg;
- void (*func)(struct hammer2_state *, struct hammer2_msg *);
+ void (*func)(struct hammer2_msg *);
union {
void *any;
struct h2span_link *link;
struct hammer2_msg {
TAILQ_ENTRY(hammer2_msg) qentry;
+ struct hammer2_router *router;
struct hammer2_state *state;
size_t hdr_size;
size_t aux_size;
#define HAMMER2_IOQ_MAXIOVEC 16
/*
+ * hammer2_router - governs the routing of a message. Passed into
+ * hammer2_msg_write.
+ *
+ * The router is either connected to an iocom (socket) directly, or
+ * connected to a SPAN transaction (h2span_link structure).
+ */
+struct hammer2_router {
+ struct hammer2_iocom *iocom;
+ struct h2span_link *link; /* non-NULL if indirect */
+ void (*signal_callback)(struct hammer2_router *);
+ void (*rcvmsg_callback)(struct hammer2_msg *);
+ void (*altmsg_callback)(struct hammer2_iocom *);
+ struct hammer2_state_tree staterd_tree; /* active messages */
+ struct hammer2_state_tree statewr_tree; /* active messages */
+ hammer2_msg_queue_t txmsgq; /* tx msgq from remote */
+ int refs; /* refs prevent destruction */
+};
+
+typedef struct hammer2_router hammer2_router_t;
+
+/*
* hammer2_iocom - governs a messaging stream connection
*/
struct hammer2_iocom {
hammer2_msg_queue_t freeq; /* free msgs hdr only */
hammer2_msg_queue_t freeq_aux; /* free msgs w/aux_data */
struct hammer2_address_queue addrq; /* source/target addrs */
- void (*state_callback)(struct hammer2_iocom *);
- void (*rcvmsg_callback)(struct hammer2_iocom *,
- struct hammer2_msg *);
- void (*altmsg_callback)(struct hammer2_iocom *);
int sock_fd; /* comm socket or pipe */
int alt_fd; /* thread signal, tty, etc */
int wakeupfds[2]; /* pipe wakes up iocom thread */
int rxmisc;
int txmisc;
char sess[HAMMER2_AES_KEY_SIZE]; /* aes_256_cbc key */
- struct hammer2_state_tree staterd_tree; /* active messages */
- struct hammer2_state_tree statewr_tree; /* active messages */
- hammer2_msg_queue_t txmsgq; /* tx msgq from remote */
+ struct hammer2_router router;
pthread_mutex_t mtx; /* mutex for state*tree/rmsgq */
};