PROG= hammer2
-SRCS= main.c subs.c icrc.c msg.c crypto.c
+SRCS= main.c subs.c icrc.c crypto.c
SRCS+= cmd_remote.c cmd_snapshot.c cmd_pfs.c
SRCS+= cmd_service.c cmd_leaf.c cmd_debug.c
SRCS+= cmd_rsa.c cmd_stat.c
+SRCS+= msg.c msg_lnk.c
#MAN= hammer2.8
NOMAN= TRUE
DEBUG_FLAGS=-g
printf("debug: connected\n");
msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL);
- hammer2_ioq_write(&iocom, msg);
+ hammer2_msg_write(&iocom, msg, NULL, NULL);
hammer2_iocom_core(&iocom, shell_recv, shell_send, shell_tty);
fprintf(stderr, "debug: disconnected\n");
++len;
msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL);
bcopy(buf, msg->aux_data, len);
- hammer2_ioq_write(iocom, msg);
+ hammer2_msg_write(iocom, msg, NULL, NULL);
} else {
/*
* Set EOF flag without setting any error code for normal
* then finish up by outputting another prompt.
*/
void
-hammer2_shell_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
{
- if (msg->aux_data)
- msg->aux_data[msg->aux_size - 1] = 0;
- if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+ switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
+ case HAMMER2_DBG_SHELL:
+ /*
+ * This is a command which we must process: the aux data is
+ * NUL-terminated in place and handed to hammer2_shell_parse().
+ * When we are finished we generate a final reply (error 0).
+ *
+ * NOTE(review): the termination writes aux_data[aux_size - 1];
+ * presumably aux_size is never 0 when aux_data is set - confirm.
+ */
+ if (msg->aux_data)
+ msg->aux_data[msg->aux_size - 1] = 0;
+ hammer2_shell_parse(iocom, msg);
+ hammer2_msg_reply(iocom, msg, 0);
+ break;
+ case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
/*
* A reply just prints out the string. No newline is added
* (it is expected to be embedded if desired).
*/
if (msg->aux_data)
+ msg->aux_data[msg->aux_size - 1] = 0;
+ if (msg->aux_data)
write(2, msg->aux_data, strlen(msg->aux_data));
- } else {
- /*
- * Otherwise this is a command which we must process.
- * When we are finished we generate a final reply.
- */
- hammer2_shell_parse(iocom, msg);
- hammer2_msg_reply(iocom, msg, 0);
+ break;
+ default:
+ hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
+ break;
}
}
HAMMER2_MSGF_REPLY);
bcopy(buf, rmsg->aux_data, len);
- hammer2_ioq_write(iocom, rmsg);
+ hammer2_msg_write(iocom, rmsg, NULL, NULL);
}
/************************************************************************
hammer2_pfstype_to_str(media.ipdata.pfs_type));
tabprintf(tab, "pfs_inum 0x%016jx\n",
(uintmax_t)media.ipdata.pfs_inum);
- tabprintf(tab, "pfs_id %s\n",
- hammer2_uuid_to_str(&media.ipdata.pfs_id,
+ tabprintf(tab, "pfs_clid %s\n",
+ hammer2_uuid_to_str(&media.ipdata.pfs_clid,
&str));
tabprintf(tab, "pfs_fsid %s\n",
hammer2_uuid_to_str(&media.ipdata.pfs_fsid,
printf("%02x ", pfs.pfs_type);
break;
}
- uuid_to_string(&pfs.pfs_id, &pfs_id_str, &status);
+ uuid_to_string(&pfs.pfs_clid, &pfs_id_str, &status);
printf("%s ", pfs_id_str);
free(pfs_id_str);
pfs_id_str = NULL;
snprintf(pfs.name, sizeof(pfs.name), "%s", name);
pfs.pfs_type = pfs_type;
if (uuid_str) {
- uuid_from_string(uuid_str, &pfs.pfs_id, &status);
+ uuid_from_string(uuid_str, &pfs.pfs_clid, &status);
} else {
- uuid_create(&pfs.pfs_id, &status);
+ uuid_create(&pfs.pfs_clid, &status);
}
if (status == uuid_s_ok)
uuid_create(&pfs.pfs_fsid, &status);
static void master_link_rx(hammer2_iocom_t *iocom);
static void master_link_tx(hammer2_iocom_t *iocom);
-static void hammer2_lnk_span(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-
/*
* Start-up the master listener daemon for the machine.
*
master_link_rx(hammer2_iocom_t *iocom)
{
hammer2_msg_t *msg;
+ hammer2_state_t *state;
uint32_t cmd;
while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
(msg = hammer2_ioq_read(iocom)) != NULL) {
/*
- * Switch on the transactional cmd, that is the original
- * msg->any.head.cmd that opened the transaction. The actual
- * msg might be different. The original msg cannot have
- * REPLY set by definition (but of course the currenet msg
- * might), so we don't bother with case statements for REPLY
- * for command sequences we expet to be transactional.
+ * If the message state has a function established we just
+ * call the function, otherwise we call the appropriate
+ * link-level protocol related to the original command and
+ * let it sort it out.
*
* Non-transactional one-off messages, on the otherhand,
* might have REPLY set.
*/
- if (msg->state) {
- cmd = msg->state->msg->any.head.cmd;
+ state = msg->state;
+ if (state) {
+ cmd = state->msg->any.head.cmd;
fprintf(stderr,
"MSGRX persist=%08x cmd=%08x error %d\n",
cmd, msg->any.head.cmd, msg->any.head.error);
"MSGRX persist=-------- cmd=%08x error %d\n",
cmd, msg->any.head.error);
}
-
- switch(cmd & HAMMER2_MSGF_CMDSWMASK) {
- case HAMMER2_LNK_ERROR:
- /*
- * A non-transactional error is formulated when
- * the socket or pipe disconnects. Ignore it.
- */
- break;
- case HAMMER2_LNK_SPAN:
- /*
- * Messages related to the LNK_SPAN transaction.
- */
- hammer2_lnk_span(iocom, msg);
- break;
- case HAMMER2_DBG_SHELL:
- case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
- /*
- * Non-transactional DBG messages.
- */
- hammer2_shell_remote(iocom, msg);
- break;
- default:
- hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
- break;
+ if (state && state->func) {
+ state->func(state, msg);
+ } else {
+ switch(cmd & HAMMER2_MSGF_PROTOS) {
+ case HAMMER2_MSG_PROTO_LNK:
+ hammer2_msg_lnk(iocom, msg);
+ break;
+ case HAMMER2_MSG_PROTO_DBG:
+ hammer2_msg_dbg(iocom, msg);
+ break;
+ default:
+ hammer2_msg_reply(iocom, msg,
+ HAMMER2_MSG_ERR_UNKNOWN);
+ break;
+ }
}
hammer2_state_cleanuprx(iocom, msg);
}
{
hammer2_iocom_flush(iocom);
}
-
-/*
- * Receive a message which is part of a LNK_SPAN transaction. Keep in
- * mind that only the original CREATE is utilizing the lnk_span message
- * header.
- *
- * We will get called for CREATE, DELETE, and intermediate states (including
- * errors), and in particular we will get called with an error if the link
- * is lost in the middle of the transaction.
- */
-static
-void
-hammer2_lnk_span(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg)
-{
- char *alloc = NULL;
-
- switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
- case HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE:
- fprintf(stderr,
- "LNK_SPAN: %s/%s\n",
- hammer2_uuid_to_str(&msg->any.lnk_span.pfs_id, &alloc),
- msg->any.lnk_span.label);
- free(alloc);
- break;
- case HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE:
- fprintf(stderr, "LNK_SPAN: Terminated with error\n");
- break;
- default:
- fprintf(stderr,
- "LNK_SPAN: Unknown msg %08x\n", msg->any.head.cmd);
- break;
- }
-}
#define HAMMER2_DEFAULT_DIR "/etc/hammer2"
#define HAMMER2_PATH_REMOTE HAMMER2_DEFAULT_DIR "/remote"
+struct hammer2_idmap {
+ struct hammer2_idmap *next;
+ uint32_t ran_beg; /* inclusive */
+ uint32_t ran_end; /* inclusive */
+};
+
+typedef struct hammer2_idmap hammer2_idmap_t;
+
extern int DebugOpt;
extern int VerboseOpt;
extern int QuietOpt;
extern int NormalExit;
+/*
+ * Hammer2 command APIs
+ */
int hammer2_ioctl_handle(const char *sel_path);
void hammer2_demon(void *(*func)(void *), void *arg);
-void hammer2_bswap_head(hammer2_msg_hdr_t *head);
int cmd_remote_connect(const char *sel_path, const char *url);
int cmd_remote_disconnect(const char *sel_path, const char *url);
int cmd_rsaenc(const char **keys, int nkeys);
int cmd_rsadec(const char **keys, int nkeys);
+/*
+ * Msg support functions
+ */
+void hammer2_bswap_head(hammer2_msg_hdr_t *head);
void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd);
hammer2_msg_t *hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size,
uint32_t cmd);
void hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
- uint16_t error);
+ uint32_t error);
+void hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
+ uint32_t error);
+void hammer2_state_reply(hammer2_state_t *state, uint32_t error);
+
void hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
void hammer2_iocom_core(hammer2_iocom_t *iocom,
void (*iocom_sendmsg)(hammer2_iocom_t *),
void (*iocom_altmsg)(hammer2_iocom_t *));
hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
-void hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
+ void (*func)(hammer2_state_t *, hammer2_msg_t *),
+ void *data);
void hammer2_iocom_drain(hammer2_iocom_t *iocom);
void hammer2_iocom_flush(hammer2_iocom_t *iocom);
void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
void hammer2_state_free(hammer2_state_t *state);
+/*
+ * Msg protocol functions
+ */
+void hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+
+/*
+ * Crypto functions
+ */
void hammer2_crypto_negotiate(hammer2_iocom_t *iocom);
void hammer2_crypto_decrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
void hammer2_crypto_decrypt_aux(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
void hammer2_crypto_encrypt_wrote(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
int nact);
+/*
+ * Misc functions
+ */
const char *hammer2_time64_to_str(uint64_t htime64, char **strp);
const char *hammer2_uuid_to_str(uuid_t *uuid, char **strp);
const char *hammer2_iptype_to_str(uint8_t type);
const char *hammer2_pfstype_to_str(uint8_t type);
const char *sizetostr(hammer2_off_t size);
-void hammer2_shell_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_debug(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
void iocom_printf(hammer2_iocom_t *iocom, uint32_t cmd, const char *ctl, ...);
+void *hammer2_alloc(size_t bytes);
+void hammer2_free(void *ptr);
#include "hammer2.h"
static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-static int hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
/*
RB_INIT(&iocom->statewr_tree);
TAILQ_INIT(&iocom->freeq);
TAILQ_INIT(&iocom->freeq_aux);
+ TAILQ_INIT(&iocom->addrq);
iocom->sock_fd = sock_fd;
iocom->alt_fd = alt_fd;
iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
if (hbytes)
bzero(&msg->any.head, hbytes);
msg->hdr_size = hbytes;
- msg->any.head.aux_icrc = 0;
msg->any.head.cmd = cmd;
+ msg->any.head.aux_descr = 0;
+ msg->any.head.aux_crc = 0;
return (msg);
}
ssize_t n;
size_t bytes;
size_t nmax;
- uint16_t xcrc16;
uint32_t xcrc32;
int error;
* Calculate the header, decrypt data received so far.
* Data will be decrypted in-place. Partial blocks are
* not immediately decrypted.
+ *
+ * WARNING! The header might be in the wrong byte order; we
+ * do not fix it up until we have the entire
+ * extended header.
*/
hammer2_crypto_decrypt(iocom, ioq);
head = (void *)(ioq->buf + ioq->fifo_beg);
break;
}
- xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
- HAMMER2_MSGHDR_CRCBYTES);
- if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
- hammer2_bswap_head(head);
- }
- xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
- if (xcrc16 != head->icrc1) {
- ioq->error = HAMMER2_IOQ_ERROR_HCRC;
- break;
- }
-
/*
* Calculate the full header size and aux data size
*/
- ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
- HAMMER2_MSG_ALIGN;
- ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
+ if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
+ ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
+ HAMMER2_MSG_ALIGN;
+ ioq->abytes = bswap32(head->aux_bytes) *
+ HAMMER2_MSG_ALIGN;
+ } else {
+ ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
+ HAMMER2_MSG_ALIGN;
+ ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
+ }
if (ioq->hbytes < sizeof(msg->any.head) ||
ioq->hbytes > sizeof(msg->any) ||
ioq->abytes > HAMMER2_MSGAUX_MAX) {
/*
* Calculate the extended header, decrypt data received
- * so far.
+ * so far. Handle endian-conversion for the entire extended
+ * header.
*/
hammer2_crypto_decrypt(iocom, ioq);
head = (void *)(ioq->buf + ioq->fifo_beg);
/*
- * Check the crc on the extended header
+ * Check the CRC.
*/
- if (ioq->hbytes > sizeof(hammer2_msg_hdr_t)) {
- xcrc32 = hammer2_icrc32(head + 1,
- ioq->hbytes - sizeof(*head));
- xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
- if (head->icrc2 != xcrc16) {
- ioq->error = HAMMER2_IOQ_ERROR_XCRC;
- break;
- }
+ if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
+ xcrc32 = bswap32(head->hdr_crc);
+ else
+ xcrc32 = head->hdr_crc;
+ head->hdr_crc = 0;
+ if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
+ ioq->error = HAMMER2_IOQ_ERROR_XCRC;
+ break;
+ }
+ head->hdr_crc = xcrc32;
+
+ if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
+ hammer2_bswap_head(head);
}
/*
hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
/*
- * Check aux_icrc, then we are done.
+ * Check aux_crc, then we are done.
*/
xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
- if (xcrc32 != msg->any.head.aux_icrc) {
+ if (xcrc32 != msg->any.head.aux_crc) {
ioq->error = HAMMER2_IOQ_ERROR_ACRC;
break;
}
*/
state->txcmd |= HAMMER2_MSGF_DELETE;
msg->state = state;
- msg->any.head.source = state->source;
- msg->any.head.target = state->target;
+ msg->any.head.spanid = state->spanid;
msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
HAMMER2_MSGF_DELETE;
} else {
/*
* Calculate the header and data crc's and write a low-level message to
- * the connection. If aux_icrc is non-zero the aux_data crc is already
+ * the connection. If aux_crc is non-zero the aux_data crc is already
* assumed to have been set.
*
* A non-NULL msg is added to the queue but not necessarily flushed.
* Calling this function with msg == NULL will get a flush going.
*/
-void
+static void
hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
{
hammer2_ioq_t *ioq = &iocom->ioq_tx;
- uint16_t xcrc16;
uint32_t xcrc32;
int hbytes;
- int error;
assert(msg);
/*
- * Process transactional state.
- */
- if (ioq->error == 0) {
- error = hammer2_state_msgtx(iocom, msg);
- if (error) {
- if (error == HAMMER2_IOQ_ERROR_EALREADY) {
- hammer2_msg_free(iocom, msg);
- } else {
- ioq->error = error;
- }
- }
- }
-
- /*
* Process terminal connection errors.
*/
if (ioq->error) {
srandomdev();
/*
- * Calculate aux_icrc if 0, calculate icrc2, and finally
- * calculate icrc1.
+ * Calculate aux_crc if 0, then calculate hdr_crc.
*/
- if (msg->aux_size && msg->any.head.aux_icrc == 0) {
+ if (msg->aux_size && msg->any.head.aux_crc == 0) {
assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
- msg->any.head.aux_icrc = xcrc32;
+ msg->any.head.aux_crc = xcrc32;
}
msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
- if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
- sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
- hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
- HAMMER2_MSG_ALIGN;
- hbytes -= sizeof(msg->any.head);
- xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
- xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
- msg->any.head.icrc2 = xcrc16;
- } else {
- msg->any.head.icrc2 = 0;
- }
- xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
- HAMMER2_MSGHDR_CRCBYTES);
- xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
- msg->any.head.icrc1 = xcrc16;
-
- /*
- * XXX Encrypt the message
- */
+ hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
+ msg->any.head.hdr_crc = 0;
+ msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
/*
- * Enqueue the message.
+ * Enqueue the message (the flush code handles stream encryption).
*/
TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
++ioq->msgcount;
}
/*
- * This is a shortcut to formulate a reply to msg with a simple error code.
- * It can reply to transaction or one-way messages, or terminate one side
- * of a stream. A HAMMER2_LNK_ERROR command code is utilized to encode
- * the error code (which can be 0).
+ * Write a message to an iocom, with additional state processing.
+ *
+ * One of three paths is taken before the message is handed to
+ * hammer2_ioq_write():
+ *
+ * - msg->state is set: msgid and spanid are copied from the state
+ * into the header, func/data are recorded on the state when func
+ * is non-NULL, and txcmd is populated if CREATE is set.
+ * - No state but CREATE is set: a new dynamic state is allocated,
+ * its own address is used as the msgid, func/data are stored on
+ * it, and it is inserted into iocom->statewr_tree.
+ * - Otherwise the message is a one-off: msgid is forced to 0 and
+ * func/data are not used.
+ *
+ * func is the callback master_link_rx() invokes as func(state, msg) for
+ * later messages on the transaction; data is stored in state->any.any.
*
- * Replies to one-way messages are a bit of an oxymoron but the feature
- * is used by the debug (DBG) protocol.
- *
- * The reply contains no data.
+ * The iocom lock must be held by the caller. XXX
*/
void
-hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint16_t error)
+hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
+ void (*func)(hammer2_state_t *, hammer2_msg_t *),
+ void *data)
{
- hammer2_msg_t *nmsg;
- uint32_t cmd;
+ hammer2_state_t *state;
- cmd = HAMMER2_LNK_ERROR;
- if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+ /*
+ * Handle state processing, create state if necessary.
+ */
+ if ((state = msg->state) != NULL) {
/*
- * Reply to received reply, reply direction uses txcmd.
- * txcmd will be updated by hammer2_ioq_write().
+ * Existing transaction (could be reply). It is also
+ * possible for this to be the first reply (CREATE is set),
+ * in which case we populate state->txcmd.
*/
- if (msg->state) {
- if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
- cmd |= HAMMER2_MSGF_CREATE;
- cmd |= HAMMER2_MSGF_DELETE;
+ msg->any.head.msgid = state->msgid;
+ msg->any.head.spanid = state->spanid;
+ if (func) {
+ state->func = func;
+ state->any.any = data;
}
+ if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
+ state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+ fprintf(stderr, "MSGWRITE IN REPLY msgid %016jx\n",
+ (intmax_t)msg->any.head.msgid);
+ } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+ fprintf(stderr, "MSGWRITE NEW MSG\n");
+ /*
+ * No existing state and CREATE is set, create new
+ * state for outgoing command. This can't happen if
+ * REPLY is set as the state would already exist for
+ * a transaction reply.
+ *
+ * NOTE(review): the malloc() result below is passed to
+ * bzero() without a NULL check, and the RB_INSERT()
+ * result is not examined.
+ */
+ assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
+
+ state = malloc(sizeof(*state));
+ bzero(state, sizeof(*state));
+ state->iocom = iocom;
+ state->flags = HAMMER2_STATE_DYNAMIC;
+ state->msg = msg;
+ state->msgid = (uint64_t)(uintptr_t)state;
+ state->spanid = msg->any.head.spanid;
+ state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+ state->func = func;
+ state->any.any = data;
+ RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
+ state->flags |= HAMMER2_STATE_INSERTED;
+ msg->state = state;
+ msg->any.head.msgid = state->msgid;
+ /* spanid set by caller */
} else {
+ fprintf(stderr, "MSGWRITE ONE-OFF\n");
+ msg->any.head.msgid = 0;
+ /* spanid set by caller */
+ }
+
+ /*
+ * Queue it for output
+ */
+ hammer2_ioq_write(iocom, msg);
+}
+
+#if 0
+
+ case HAMMER2_MSGF_DELETE:
/*
- * Reply to received command, reply direction uses rxcmd.
- * txcmd will be updated by hammer2_ioq_write().
+ * Sent ABORT+DELETE in case where msgid has already
+ * been fully closed, ignore the message.
*/
- cmd |= HAMMER2_MSGF_REPLY;
- if (msg->state) {
- if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
- cmd |= HAMMER2_MSGF_CREATE;
- cmd |= HAMMER2_MSGF_DELETE;
+ if (state == NULL) {
+ if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+ error = HAMMER2_IOQ_ERROR_EALREADY;
+ } else {
+ iocom_printf(iocom, msg->any.head.cmd,
+ "hammer2_state_msgtx: "
+ "no state match for DELETE\n");
+ error = HAMMER2_IOQ_ERROR_TRANS;
+ }
+ break;
}
+
+ /*
+ * Sent ABORT+DELETE in case where msgid has
+ * already been reused for an unrelated message,
+ * ignore the message.
+ */
+ if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+ if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+ error = HAMMER2_IOQ_ERROR_EALREADY;
+ } else {
+ iocom_printf(iocom, msg->any.head.cmd,
+ "hammer2_state_msgtx: "
+ "state reused for DELETE\n");
+ error = HAMMER2_IOQ_ERROR_TRANS;
+ }
+ break;
+ }
+ error = 0;
+
+
+ case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
+ /*
+ * When transmitting a reply with DELETE set the original
+ * persistent state message should already exist.
+ *
+ * This is very similar to the REPLY|CREATE|* case except
+ * txcmd is already stored, so we just add the DELETE flag.
+ *
+ * Sent REPLY+ABORT+DELETE in case where msgid has
+ * already been fully closed, ignore the message.
+ */
+ if (state == NULL) {
+ if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+ error = HAMMER2_IOQ_ERROR_EALREADY;
+ } else {
+ iocom_printf(iocom, msg->any.head.cmd,
+ "hammer2_state_msgtx: "
+ "no state match for "
+ "REPLY | DELETE\n");
+ error = HAMMER2_IOQ_ERROR_TRANS;
+ }
+ break;
+ }
+
+ /*
+ * Sent REPLY+ABORT+DELETE in case where msgid has already
+ * been reused for an unrelated message, ignore the message.
+ */
+ if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+ if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+ error = HAMMER2_IOQ_ERROR_EALREADY;
+ } else {
+ iocom_printf(iocom, msg->any.head.cmd,
+ "hammer2_state_msgtx: "
+ "state reused for "
+ "REPLY | DELETE\n");
+ error = HAMMER2_IOQ_ERROR_TRANS;
+ }
+ break;
+ }
+ error = 0;
+ break;
+ case HAMMER2_MSGF_REPLY:
+ /*
+ * Check for mid-stream ABORT reply sent.
+ *
+ * One-off REPLY messages are allowed for e.g. status updates.
+ */
+ if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+ if (state == NULL ||
+ (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+ error = HAMMER2_IOQ_ERROR_EALREADY;
+ break;
+ }
+ }
+ error = 0;
+ break;
+ }
+ /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+ return (error);
+#endif
+
+
+/*
+ * This is a shortcut to formulate a reply to msg with a simple error code.
+ * It can reply to and terminate a transaction, or it can reply to a one-way
+ * message. A HAMMER2_LNK_ERROR command code is utilized to encode
+ * the error code (which can be 0). Not all transactions are terminated
+ * with HAMMER2_LNK_ERROR status (the low level only cares about the
+ * MSGF_DELETE flag), but most are.
+ *
+ * Replies to one-way messages are a bit of an oxymoron but the feature
+ * is used by the debug (DBG) protocol.
+ *
+ * The reply contains no extended data.
+ *
+ * iocom - connection the reply is allocated from and written to.
+ * msg - message being answered; msg->state (may be NULL) selects the
+ * transactional or the one-way path.
+ * error - value stored in the reply's head.error field.
+ */
+void
+hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
+{
+ hammer2_state_t *state = msg->state;
+ hammer2_msg_t *nmsg;
+ uint32_t cmd;
+
+
+ /*
+ * Reply with a simple error code and terminate the transaction.
+ */
+ cmd = HAMMER2_LNK_ERROR;
+
+ /*
+ * Check if our direction has even been initiated yet, set CREATE.
+ *
+ * Check what direction this is (command or reply direction). Note
+ * that txcmd might not have been initiated yet.
+ *
+ * If our direction has already been closed we just return without
+ * doing anything.
+ */
+ if (state) {
+ if (state->txcmd & HAMMER2_MSGF_DELETE)
+ return;
+ if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+ cmd |= HAMMER2_MSGF_CREATE;
+ if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+ cmd |= HAMMER2_MSGF_REPLY;
+ cmd |= HAMMER2_MSGF_DELETE;
+ } else {
+ if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
+ cmd |= HAMMER2_MSGF_REPLY;
}
+
nmsg = hammer2_msg_alloc(iocom, 0, cmd);
nmsg->any.head.error = error;
- hammer2_ioq_write(iocom, nmsg);
+ nmsg->state = msg->state;
+ hammer2_msg_write(iocom, nmsg, NULL, 0);
+}
+
+/*
+ * Similar to hammer2_msg_reply() but leave the transaction open. That is,
+ * we are generating a streaming reply or an intermediate acknowledgement
+ * of some sort as part of the higher level protocol, with more to come
+ * later.
+ *
+ * The message sent is a HAMMER2_LNK_ERROR with head.error set to error
+ * and no aux data; MSGF_DELETE is never added here.
+ */
+void
+hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
+{
+ hammer2_state_t *state = msg->state;
+ hammer2_msg_t *nmsg;
+ uint32_t cmd;
+
+
+ /*
+ * Reply with a simple error code, leaving the transaction open.
+ */
+ cmd = HAMMER2_LNK_ERROR;
+
+ /*
+ * Check if our direction has even been initiated yet, set CREATE.
+ *
+ * Check what direction this is (command or reply direction). Note
+ * that txcmd might not have been initiated yet.
+ *
+ * If our direction has already been closed we just return without
+ * doing anything.
+ */
+ if (state) {
+ if (state->txcmd & HAMMER2_MSGF_DELETE)
+ return;
+ if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+ cmd |= HAMMER2_MSGF_CREATE;
+ if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+ cmd |= HAMMER2_MSGF_REPLY;
+ /* continuing transaction, do not set MSGF_DELETE */
+ } else {
+ if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
+ cmd |= HAMMER2_MSGF_REPLY;
+ }
+
+ nmsg = hammer2_msg_alloc(iocom, 0, cmd);
+ nmsg->any.head.error = error;
+ nmsg->state = state;
+ hammer2_msg_write(iocom, nmsg, NULL, 0);
+}
+
+/*
+ * Terminate a transaction given a state structure by issuing a DELETE.
+ *
+ * The message sent is a HAMMER2_LNK_ERROR with head.error set to error
+ * and no aux data, written through state->iocom. Nothing is sent if
+ * our side of the transaction has already transmitted a DELETE.
+ */
+void
+hammer2_state_reply(hammer2_state_t *state, uint32_t error)
+{
+ hammer2_msg_t *nmsg;
+ uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
+
+ /*
+ * Nothing to do if we already transmitted a delete
+ */
+ if (state->txcmd & HAMMER2_MSGF_DELETE)
+ return;
+
+ /*
+ * We must also set CREATE if this is our first response to a
+ * remote command.
+ */
+ if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+ cmd |= HAMMER2_MSGF_CREATE;
+
+ /*
+ * Set REPLY if the other end initiated the command. Otherwise
+ * we are the command direction.
+ */
+ if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+ cmd |= HAMMER2_MSGF_REPLY;
+
+ nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
+ nmsg->any.head.error = error;
+ nmsg->state = state;
+ hammer2_msg_write(state->iocom, nmsg, NULL, 0);
}
/************************************************************************
/*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
dummy.msgid = msg->any.head.msgid;
- dummy.source = msg->any.head.source;
- dummy.target = msg->any.head.target;
+ dummy.spanid = msg->any.head.spanid;
+#if 0
iocom_printf(iocom, msg->any.head.cmd,
- "received msg %08x msgid %u source=%u target=%u\n",
- msg->any.head.cmd, msg->any.head.msgid,
- msg->any.head.source, msg->any.head.target);
+ "received msg %08x msgid %jx spanid=%jx\n",
+ msg->any.head.cmd,
+ (intmax_t)msg->any.head.msgid,
+ (intmax_t)msg->any.head.spanid);
+#endif
if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
state = RB_FIND(hammer2_state_tree,
&iocom->statewr_tree, &dummy);
state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
state->flags |= HAMMER2_STATE_INSERTED;
+ state->msgid = msg->any.head.msgid;
+ state->spanid = msg->any.head.spanid;
msg->state = state;
error = 0;
break;
}
}
-/*
- * Process state tracking for a message prior to transmission.
- *
- * Called with msglk held and the msg dequeued.
- *
- * One-off messages are usually with dummy state and msg->state may be NULL
- * in this situation.
- *
- * New transactions (when CREATE is set) will insert the state.
- *
- * May request that caller discard the message by setting *discardp to 1.
- * A NULL state may be returned in this case.
- */
-static int
-hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
-{
- hammer2_state_t *state;
- int error;
-
- /*
- * Lock RB tree. If persistent state is present it will have already
- * been assigned to msg.
- */
- /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
- state = msg->state;
-
- /*
- * Short-cut one-off or mid-stream messages (state may be NULL).
- */
- if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
- HAMMER2_MSGF_ABORT)) == 0) {
- /*lockmgr(&pmp->msglk, LK_RELEASE);*/
- return(0);
- }
-
-
- /*
- * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
- * inside the case statements.
- */
- switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
- HAMMER2_MSGF_REPLY)) {
- case HAMMER2_MSGF_CREATE:
- case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
- /*
- * Insert the new persistent message state and mark
- * half-closed if DELETE is set. Since this is a new
- * message it isn't possible to transition into the fully
- * closed state here.
- *
- * XXX state must be assigned and inserted by
- * hammer2_msg_write(). txcmd is assigned by us
- * on-transmit.
- */
- assert(state != NULL);
-#if 0
- if (state == NULL) {
- state = pmp->freerd_state;
- pmp->freerd_state = NULL;
- msg->state = state;
- state->msg = msg;
- state->msgid = msg->any.head.msgid;
- state->source = msg->any.head.source;
- state->target = msg->any.head.target;
- }
- assert((state->flags & HAMMER2_STATE_INSERTED) == 0);
- if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
- iocom_printf(iocom, msg->any.head.cmd,
- "hammer2_state_msgtx: "
- "duplicate transaction\n");
- error = HAMMER2_IOQ_ERROR_TRANS;
- break;
- }
- state->flags |= HAMMER2_STATE_INSERTED;
-#endif
- state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
- error = 0;
- break;
- case HAMMER2_MSGF_DELETE:
- /*
- * Sent ABORT+DELETE in case where msgid has already
- * been fully closed, ignore the message.
- */
- if (state == NULL) {
- if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
- error = HAMMER2_IOQ_ERROR_EALREADY;
- } else {
- iocom_printf(iocom, msg->any.head.cmd,
- "hammer2_state_msgtx: "
- "no state match for DELETE\n");
- error = HAMMER2_IOQ_ERROR_TRANS;
- }
- break;
- }
-
- /*
- * Sent ABORT+DELETE in case where msgid has
- * already been reused for an unrelated message,
- * ignore the message.
- */
- if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
- if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
- error = HAMMER2_IOQ_ERROR_EALREADY;
- } else {
- iocom_printf(iocom, msg->any.head.cmd,
- "hammer2_state_msgtx: "
- "state reused for DELETE\n");
- error = HAMMER2_IOQ_ERROR_TRANS;
- }
- break;
- }
- error = 0;
- break;
- default:
- /*
- * Check for mid-stream ABORT command sent
- */
- if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
- if (state == NULL ||
- (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
- error = HAMMER2_IOQ_ERROR_EALREADY;
- break;
- }
- }
- error = 0;
- break;
- case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
- case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
- /*
- * When transmitting a reply with CREATE set the original
- * persistent state message should already exist.
- */
- if (state == NULL) {
- iocom_printf(iocom, msg->any.head.cmd,
- "hammer2_state_msgtx: no state match "
- "for REPLY | CREATE\n");
- error = HAMMER2_IOQ_ERROR_TRANS;
- break;
- }
- state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
- error = 0;
- break;
- case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
- /*
- * When transmitting a reply with DELETE set the original
- * persistent state message should already exist.
- *
- * This is very similar to the REPLY|CREATE|* case except
- * txcmd is already stored, so we just add the DELETE flag.
- *
- * Sent REPLY+ABORT+DELETE in case where msgid has
- * already been fully closed, ignore the message.
- */
- if (state == NULL) {
- if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
- error = HAMMER2_IOQ_ERROR_EALREADY;
- } else {
- iocom_printf(iocom, msg->any.head.cmd,
- "hammer2_state_msgtx: "
- "no state match for "
- "REPLY | DELETE\n");
- error = HAMMER2_IOQ_ERROR_TRANS;
- }
- break;
- }
-
- /*
- * Sent REPLY+ABORT+DELETE in case where msgid has already
- * been reused for an unrelated message, ignore the message.
- */
- if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
- if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
- error = HAMMER2_IOQ_ERROR_EALREADY;
- } else {
- iocom_printf(iocom, msg->any.head.cmd,
- "hammer2_state_msgtx: "
- "state reused for "
- "REPLY | DELETE\n");
- error = HAMMER2_IOQ_ERROR_TRANS;
- }
- break;
- }
- error = 0;
- break;
- case HAMMER2_MSGF_REPLY:
- /*
- * Check for mid-stream ABORT reply sent.
- *
- * One-off REPLY messages are allowed for e.g. status updates.
- */
- if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
- if (state == NULL ||
- (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
- error = HAMMER2_IOQ_ERROR_EALREADY;
- break;
- }
- }
- error = 0;
- break;
- }
- /*lockmgr(&pmp->msglk, LK_RELEASE);*/
- return (error);
-}
-
static void
hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
{
int
hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
{
- if (state1->source < state2->source)
- return(-1);
- if (state1->source > state2->source)
- return(1);
- if (state1->target < state2->target)
+ if (state1->spanid < state2->spanid)
return(-1);
- if (state1->target > state2->target)
+ if (state1->spanid > state2->spanid)
return(1);
if (state1->msgid < state2->msgid)
return(-1);
--- /dev/null
+/*
+ * Copyright (c) 2012 The DragonFly Project. All rights reserved.
+ *
+ * This code is derived from software contributed to The DragonFly Project
+ * by Matthew Dillon <dillon@dragonflybsd.org>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ * notice, this list of conditions and the following disclaimer in
+ * the documentation and/or other materials provided with the
+ * distribution.
+ * 3. Neither the name of The DragonFly Project nor the names of its
+ * contributors may be used to endorse or promote products derived
+ * from this software without specific, prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
+ * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+/*
+ * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
+ *
+ * This code supports the LNK_SPAN protocol. Essentially all PFS's
+ * clients and services rendezvous with the userland hammer2 service and
+ * open LNK_SPAN transactions using a message header linkid of 0,
+ * registering any PFS's they have connectivity to with us.
+ *
+ * --
+ *
+ * Each registration maintains its own open LNK_SPAN message transaction.
+ * The SPANs are collected, aggregated, and retransmitted over available
+ * connections through the maintenance of additional LNK_SPAN message
+ * transactions on each link.
+ *
+ * The msgid for each active LNK_SPAN transaction we receive allows us to
+ * send a message to the target PFS (which might be one of many belonging
+ * to the same cluster), by specifying that msgid as the linkid in any
+ * message we send to the target PFS.
+ *
+ * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit
+ * (and remember we will maintain multiple open LNK_SPAN transactions on
+ * each connection representing the topology span, so every node sees every
+ * other node as a separate open transaction). So, similarly the msgid for
+ * these active transactions which we initiated can be used by the other
+ * end to route messages through us to another node, ultimately winding up
+ * at the identified hammer2 PFS. We have to adjust the spanid in the message
+ * header at each hop to be representative of the outgoing LNK_SPAN we
+ * are forwarding the message through.
+ *
+ * --
+ *
+ * If we were to retransmit every LNK_SPAN transaction we receive it would
+ * create a huge mess, so we have to aggregate all received LNK_SPAN
+ * transactions, sort them by the fsid (the cluster) and sub-sort them by
+ * the pfs_fsid (individual nodes in the cluster), and only retransmit
+ * (create outgoing transactions) for a subset of the nearest weighted-hops
+ * for each individual node.
+ *
+ * The higher level protocols can then issue transactions to the nodes making
+ * up a cluster to perform all actions required.
+ *
+ * --
+ *
+ * Since this is a large topology and a spanning tree protocol, links can
+ * go up and down all the time. Any time a link goes down its transaction
+ * is closed. The transaction has to be closed on both ends before we can
+ * delete (and potentially reuse) the related spanid. The LNK_SPAN being
+ * closed may have been propagated out to other connections and those related
+ * LNK_SPANs are also closed. Ultimately all routes via the lost LNK_SPAN
+ * go away, ultimately reaching all sources and all targets.
+ *
+ * Any messages in-transit using a route that goes away will be thrown away.
+ * Open transactions are only tracked at the two end-points. When a link
+ * failure propagates to an end-point the related open transactions lose
+ * their spanid and are automatically aborted.
+ *
+ * It is important to note that internal route nodes cannot just associate
+ * a lost LNK_SPAN transaction with another route to the same destination.
+ * Message transactions MUST be serialized and MUST be ordered. All messages
+ * for a transaction must run over the same route. So if the route used by
+ * an active transaction is lost, the related messages will be fully aborted
+ * and the higher protocol levels will retry as appropriate.
+ *
+ * It is also important to note that several paths to the same PFS can be
+ * propagated along the same link, which allows concurrency and even
+ * redundancy over several network interfaces or via different routes through
+ * the topology. Any given transaction will use only a single route but busy
+ * servers will often have hundreds of transactions active simultaneously,
+ * so having multiple active paths through the network topology for A<->B
+ * will improve performance.
+ *
+ * --
+ *
+ * Most protocols consolidate operations rather than simply relaying them.
+ * This is particularly true of LEAF protocols (such as strict HAMMER2
+ * clients), of which there can be millions connecting into the cluster at
+ * various points. The SPAN protocol is not used for these LEAF elements.
+ *
+ * Instead the primary service they connect to implements a proxy for the
+ * client protocols so the core topology only has to propagate a couple of
+ * LNK_SPANs and not millions. LNK_SPANs are meant to be used only for
+ * core master nodes and satellite slaves and cache nodes.
+ */
+
+#include "hammer2.h"
+
+/*
+ * RED-BLACK TREE DEFINITIONS
+ *
+ * We need to track
+ *
+ * (1) shared fsid's (a cluster).
+ * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
+ *
+ * We need to collect all active LNK_SPANs, aggregate them, and create our own
+ * outgoing LNK_SPAN transactions on each of our connections representing
+ * the aggregated state.
+ *
+ * h2span_connect - list of iocom connections who wish to receive SPAN
+ * propagation from other connections. Might contain
+ * a filter string. Only iocoms with an open
+ * LNK_CONN transaction are applicable for SPAN
+ * propagation.
+ *
+ * h2span_relay - List of links relayed (via SPAN). Essentially
+ * each relay structure represents a LNK_SPAN
+ * transaction that we initiated, versus h2span_link
+ * which is a LNK_SPAN transaction that we received.
+ *
+ * --
+ *
+ * h2span_cluster - Organizes the shared fsid's. One structure for
+ * each cluster.
+ *
+ * h2span_node - Organizes the nodes in a cluster. One structure
+ * for each unique {cluster,node}, aka {fsid, pfs_fsid}.
+ *
+ * h2span_link - Organizes all incoming and outgoing LNK_SPAN message
+ * transactions related to a node.
+ *
+ * One h2span_link structure for each incoming LNK_SPAN
+ * transaction. Links selected for propagation back
+ * out are also where the outgoing LNK_SPAN messages
+ * are indexed into (so we can propagate changes).
+ *
+ * The h2span_link's use a red-black tree to sort the
+ * weighted hop metric for the incoming LNK_SPAN. We
+ * then select the top N for outgoing. When the
+ * topology changes the top N may also change and cause
+ * new outgoing LNK_SPAN transactions to be opened
+ * and less desirable ones to be closed, causing
+ * transactional aborts within the message flow in
+ * the process.
+ *
+ * Also note - All outgoing LNK_SPAN message transactions are also
+ * entered into a red-black tree for use by the routing
+ * function. This is handled by msg.c in the state
+ * code, not here.
+ */
+
+/*
+ * Forward declarations plus the queue and red-black tree head types
+ * used by the h2span structures defined below.
+ */
+struct h2span_link;
+struct h2span_relay;
+TAILQ_HEAD(h2span_connect_queue, h2span_connect);
+TAILQ_HEAD(h2span_relay_queue, h2span_relay);
+
+RB_HEAD(h2span_cluster_tree, h2span_cluster);
+RB_HEAD(h2span_node_tree, h2span_node);
+RB_HEAD(h2span_link_tree, h2span_link);
+RB_HEAD(h2span_relay_tree, h2span_relay);
+
+/*
+ * Received LNK_CONN transaction enables SPAN protocol over connection.
+ * (may contain filter).
+ */
+struct h2span_connect {
+ TAILQ_ENTRY(h2span_connect) entry; /* linkage on the global connq */
+ struct h2span_relay_tree tree; /* relays on this connection, keyed by state */
+ hammer2_state_t *state; /* state of the LNK_CONN transaction */
+};
+
+/*
+ * All received LNK_SPANs are organized by cluster (pfs_clid),
+ * node (pfs_fsid), and link (received LNK_SPAN transaction).
+ */
+struct h2span_cluster {
+ RB_ENTRY(h2span_cluster) rbnode; /* linkage in the global cluster_tree */
+ struct h2span_node_tree tree; /* nodes of this cluster, keyed by pfs_fsid */
+ uuid_t pfs_clid; /* shared fsid */
+};
+
+struct h2span_node {
+ RB_ENTRY(h2span_node) rbnode; /* linkage in cls->tree */
+ struct h2span_link_tree tree; /* links to this node, sorted by weight */
+ struct h2span_cluster *cls; /* owning cluster */
+ uuid_t pfs_fsid; /* unique fsid */
+};
+
+struct h2span_link {
+ RB_ENTRY(h2span_link) rbnode; /* linkage in node->tree */
+ hammer2_state_t *state; /* state<->link */
+ struct h2span_node *node; /* related node */
+ int32_t weight; /* from lnk_span.weight; primary sort key */
+ struct h2span_relay_queue relayq; /* relay out */
+};
+
+/*
+ * Any LNK_SPAN transactions we receive which are relayed out other
+ * connections utilize this structure to track the LNK_SPAN transaction
+ * we initiate on the other connections, if selected for relay.
+ *
+ * In many respects this is the core of the protocol... actually figuring
+ * out what LNK_SPANs to relay. The spanid used for relaying is the
+ * address of the 'state' structure, which is why h2span_relay has to
+ * be entered into a RB-TREE based at h2span_connect (so we can look
+ * up the spanid to validate it).
+ */
+struct h2span_relay {
+ RB_ENTRY(h2span_relay) rbnode; /* from h2span_connect */
+ TAILQ_ENTRY(h2span_relay) entry; /* from link */
+ struct h2span_connect *conn; /* connection whose tree indexes us */
+ hammer2_state_t *state; /* transmitted LNK_SPAN */
+ struct h2span_link *link; /* received LNK_SPAN */
+};
+
+
+/* Shorthand type names for the h2span structures. */
+typedef struct h2span_connect h2span_connect_t;
+typedef struct h2span_cluster h2span_cluster_t;
+typedef struct h2span_node h2span_node_t;
+typedef struct h2span_link h2span_link_t;
+typedef struct h2span_relay h2span_relay_t;
+
+/*
+ * Red-black tree comparator for clusters: orders by the shared
+ * cluster uuid (pfs_clid) as reported by uuid_compare().
+ */
+static
+int
+h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
+{
+	int order;
+
+	order = uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL);
+	return(order);
+}
+
+/*
+ * Red-black tree comparator for nodes within a cluster: orders by the
+ * unique node uuid (pfs_fsid) as reported by uuid_compare().
+ */
+static
+int
+h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
+{
+	int order;
+
+	order = uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL);
+	return(order);
+}
+
+/*
+ * Red-black tree comparator for links: the weight is the primary key
+ * and the structure address breaks ties, so two distinct links never
+ * compare equal.
+ */
+static
+int
+h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
+{
+	intptr_t addr1 = (intptr_t)link1;
+	intptr_t addr2 = (intptr_t)link2;
+
+	if (link1->weight != link2->weight)
+		return((link1->weight < link2->weight) ? -1 : 1);
+	if (addr1 != addr2)
+		return((addr1 < addr2) ? -1 : 1);
+	return(0);
+}
+
+/*
+ * Red-black tree comparator for relays: orders by the address of the
+ * relay's state structure.
+ */
+static
+int
+h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
+{
+	intptr_t key1 = (intptr_t)relay1->state;
+	intptr_t key2 = (intptr_t)relay2->state;
+
+	if (key1 == key2)
+		return(0);
+	return((key1 < key2) ? -1 : 1);
+}
+
+/*
+ * Declare, then generate, the file-local red-black tree functions for
+ * each of the four tree types.  Every tree links its elements through
+ * the 'rbnode' field and uses the matching h2span_*_cmp comparator.
+ */
+RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster,
+ rbnode, h2span_cluster_cmp);
+RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node,
+ rbnode, h2span_node_cmp);
+RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link,
+ rbnode, h2span_link_cmp);
+RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay,
+ rbnode, h2span_relay_cmp);
+
+RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster,
+ rbnode, h2span_cluster_cmp);
+RB_GENERATE_STATIC(h2span_node_tree, h2span_node,
+ rbnode, h2span_node_cmp);
+RB_GENERATE_STATIC(h2span_link_tree, h2span_link,
+ rbnode, h2span_link_cmp);
+RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay,
+ rbnode, h2span_relay_cmp);
+
+/*
+ * Global mutex protects cluster_tree lookups.
+ *
+ * hammer2_lnk_conn() and hammer2_lnk_span() hold it for their entire
+ * body, so it also covers connq and the per-cluster/node/link trees
+ * they modify.  The mutex is statically initialized because nothing
+ * in this file calls pthread_mutex_init() on it.
+ */
+static pthread_mutex_t cluster_mtx = PTHREAD_MUTEX_INITIALIZER;
+static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
+static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
+
+static void hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg);
+static void hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg);
+static void hammer2_lnk_conn_update(h2span_connect_t *conn);
+
+/*
+ * Receive a HAMMER2_MSG_PROTO_LNK message and hand it to the matching
+ * LNK handler.  This is only called for one-way and opening
+ * transactions since state->func will be assigned in all other cases.
+ *
+ * Unrecognized base commands are logged to stderr and answered with
+ * HAMMER2_MSG_ERR_UNKNOWN.
+ */
+void
+hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+{
+	uint32_t basecmd;
+
+	basecmd = msg->any.head.cmd & HAMMER2_MSGF_BASECMDMASK;
+
+	if (basecmd == HAMMER2_LNK_CONN) {
+		hammer2_lnk_conn(msg->state, msg);
+		return;
+	}
+	if (basecmd == HAMMER2_LNK_SPAN) {
+		hammer2_lnk_span(msg->state, msg);
+		return;
+	}
+
+	fprintf(stderr,
+		"MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
+	hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
+	/* state invalid after reply */
+}
+
+/*
+ * Handle a LNK_CONN transaction message.  Called from hammer2_msg_lnk()
+ * for the opening message and installed as state->func on CREATE.
+ *
+ * CREATE: allocate an h2span_connect, tie it to the state, append it to
+ * connq and send a result with error 0, leaving the transaction open.
+ *
+ * DELETE: tear down every relay indexed on the connection, free the
+ * h2span_connect and reply with error 0.
+ *
+ * The two flags are tested independently, so a message carrying both
+ * runs both paths in order.  cluster_mtx is held for the whole call.
+ * Internal linkage comes from the earlier static prototype.
+ */
+void
+hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
+{
+ h2span_connect_t *conn;
+ h2span_relay_t *relay;
+ char *alloc = NULL;
+
+ pthread_mutex_lock(&cluster_mtx);
+
+ /*
+ * On transaction start we allocate a new h2span_connect and
+ * acknowledge the request, leaving the transaction open.
+ */
+ if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+ state->func = hammer2_lnk_conn;
+
+ fprintf(stderr, "LNK_CONN(%016jx): %s/%s\n",
+ (intmax_t)msg->any.head.msgid,
+ hammer2_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
+ &alloc),
+ msg->any.lnk_conn.label);
+ /* presumably hammer2_uuid_to_str() allocated 'alloc' -- confirm */
+ free(alloc);
+
+ conn = hammer2_alloc(sizeof(*conn));
+
+ RB_INIT(&conn->tree);
+ conn->state = state;
+ state->any.conn = conn;
+ TAILQ_INSERT_TAIL(&connq, conn, entry);
+
+ hammer2_lnk_conn_update(conn);
+ hammer2_msg_result(state->iocom, msg, 0);
+ }
+
+ /*
+ * On transaction terminate we clean out our h2span_connect
+ * and acknowledge the request, closing the transaction.
+ */
+ if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+ fprintf(stderr, "LNK_CONN: Terminated\n");
+ conn = state->any.conn;
+ assert(conn);
+ /*
+ * Each relay is unlinked from both the connection's tree and
+ * the relay queue of the link it relays before being freed.
+ */
+ while ((relay = RB_ROOT(&conn->tree)) != NULL) {
+ RB_REMOVE(h2span_relay_tree, &conn->tree, relay);
+ TAILQ_REMOVE(&relay->link->relayq, relay, entry);
+
+ if (relay->state) {
+ /* break the state->relay back pointer before replying */
+ relay->state->any.relay = NULL;
+ hammer2_state_reply(relay->state, 0);
+ /* state invalid after reply */
+ relay->state = NULL;
+ }
+ relay->conn = NULL;
+ relay->link = NULL;
+ hammer2_free(relay);
+ }
+
+ /*
+ * Clean out conn
+ */
+ conn->state = NULL;
+ msg->state->any.conn = NULL;
+ TAILQ_REMOVE(&connq, conn, entry);
+ hammer2_free(conn);
+
+ hammer2_msg_reply(state->iocom, msg, 0);
+ /* state invalid after reply */
+ }
+ pthread_mutex_unlock(&cluster_mtx);
+}
+
+/*
+ * Handle a LNK_SPAN transaction message.  Called from hammer2_msg_lnk()
+ * for the opening message and installed as state->func on CREATE.
+ *
+ * CREATE: find (or create) the cluster keyed by pfs_clid and the node
+ * keyed by pfs_fsid, then allocate an h2span_link for this transaction
+ * and insert it into the node's weight-sorted link tree.
+ *
+ * DELETE: close out every relay queued on the link, remove the link
+ * from its node and free the node and cluster if they became empty.
+ *
+ * The two flags are tested independently, so a message carrying both
+ * runs both paths in order.  cluster_mtx is held for the whole call.
+ * Internal linkage comes from the earlier static prototype.
+ */
+void
+hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
+{
+	h2span_cluster_t dummy_cls;
+	h2span_node_t dummy_node;
+	h2span_cluster_t *cls;
+	h2span_node_t *node;
+	h2span_link_t *slink;
+	h2span_relay_t *relay;
+	char *alloc = NULL;
+
+	pthread_mutex_lock(&cluster_mtx);
+
+	/*
+	 * On transaction start we initialize the tracking infrastructure
+	 */
+	if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+		state->func = hammer2_lnk_span;
+
+		fprintf(stderr, "LNK_SPAN: %s/%s\n",
+			hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid,
+					    &alloc),
+			msg->any.lnk_span.label);
+		/* presumably hammer2_uuid_to_str() allocated 'alloc' -- confirm */
+		free(alloc);
+
+		/*
+		 * Find the cluster.  dummy_cls is only a lookup key; the
+		 * comparator reads nothing but pfs_clid.
+		 */
+		dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
+		cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
+		if (cls == NULL) {
+			cls = hammer2_alloc(sizeof(*cls));
+			cls->pfs_clid = msg->any.lnk_span.pfs_clid;
+			RB_INIT(&cls->tree);
+			RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
+		}
+
+		/*
+		 * Find the node.  dummy_node is only a lookup key; the
+		 * comparator reads nothing but pfs_fsid.
+		 */
+		dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
+		node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
+		if (node == NULL) {
+			node = hammer2_alloc(sizeof(*node));
+			node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
+			node->cls = cls;
+			RB_INIT(&node->tree);
+			RB_INSERT(h2span_node_tree, &cls->tree, node);
+		}
+
+		/*
+		 * Create the link.
+		 *
+		 * The relay queue must be explicitly initialized: a
+		 * zero-filled TAILQ head has a NULL tqh_last, which
+		 * TAILQ_INSERT_TAIL would dereference on first insert.
+		 */
+		assert(state->any.link == NULL);
+		slink = hammer2_alloc(sizeof(*slink));
+		TAILQ_INIT(&slink->relayq);
+		slink->node = node;
+		slink->weight = msg->any.lnk_span.weight;
+		slink->state = state;
+		state->any.link = slink;
+		RB_INSERT(h2span_link_tree, &node->tree, slink);
+
+		/*
+		 * Now filter and relay the span to all other iocoms. XXX
+		 */
+	}
+
+	/*
+	 * On transaction terminate we remove the tracking infrastructure.
+	 *
+	 * NOTE(review): unlike hammer2_lnk_conn(), no reply is sent for
+	 * the terminating message here -- confirm whether our side of the
+	 * transaction is closed elsewhere.
+	 */
+	if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+		slink = state->any.link;
+		assert(slink != NULL);
+		node = slink->node;
+		cls = node->cls;
+
+		/*
+		 * Clean out all relays.  Each one is unlinked from its
+		 * connection's tree and from our relay queue before it
+		 * is freed.
+		 */
+		while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
+			RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
+			TAILQ_REMOVE(&slink->relayq, relay, entry);
+
+			if (relay->state) {
+				relay->state->any.relay = NULL;
+				hammer2_state_reply(relay->state, 0);
+				/* state invalid after reply */
+				relay->state = NULL;
+			}
+			relay->conn = NULL;
+			relay->link = NULL;
+			hammer2_free(relay);
+		}
+
+		/*
+		 * Clean out the topology, pruning the node and then the
+		 * cluster once they no longer contain anything.
+		 */
+		RB_REMOVE(h2span_link_tree, &node->tree, slink);
+		if (RB_EMPTY(&node->tree)) {
+			RB_REMOVE(h2span_node_tree, &cls->tree, node);
+			if (RB_EMPTY(&cls->tree)) {
+				RB_REMOVE(h2span_cluster_tree,
+					  &cluster_tree, cls);
+				hammer2_free(cls);
+			}
+			node->cls = NULL;
+			hammer2_free(node);
+		}
+		state->any.link = NULL;
+		slink->state = NULL;
+		slink->node = NULL;
+		hammer2_free(slink);
+	}
+
+	pthread_mutex_unlock(&cluster_mtx);
+}
+
+/*
+ * Initiate/Update the relayed spans associated with a connection.
+ *
+ * Placeholder: the body is currently empty and 'conn' is marked
+ * __unused, so calling this has no effect yet.
+ */
+static void
+hammer2_lnk_conn_update(h2span_connect_t *conn __unused)
+{
+}
*/
struct hammer2_iocom;
struct hammer2_persist;
-struct hammre2_state;
-struct hammre2_msg;
+struct hammer2_state;
+struct hammer2_address;
+struct hammer2_msg;
+TAILQ_HEAD(hammer2_state_queue, hammer2_state);
TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
+TAILQ_HEAD(hammer2_address_queue, hammer2_address);
RB_HEAD(hammer2_state_tree, hammer2_state);
+struct h2span_link;
+struct h2span_connect;
+
struct hammer2_state {
RB_ENTRY(hammer2_state) rbnode; /* indexed by msgid */
+ TAILQ_ENTRY(hammer2_state) source_entry;/* if routed */
+ TAILQ_ENTRY(hammer2_state) target_entry;/* if routed */
struct hammer2_iocom *iocom;
+ struct hammer2_address_t *source_addr; /* if routed */
+ struct hammer2_address_t *target_addr; /* if routed */
uint32_t txcmd; /* mostly for CMDF flags */
uint32_t rxcmd; /* mostly for CMDF flags */
- uint16_t source; /* command originator */
- uint16_t target; /* reply originator */
- uint32_t msgid; /* {source,target,msgid} uniq */
+ uint64_t spanid; /* routing id */
+ uint64_t msgid; /* {spanid,msgid} uniq */
int flags;
int error;
struct hammer2_msg *msg;
- int (*func)(struct hammer2_iocom *, struct hammer2_msg *);
+ void (*func)(struct hammer2_state *, struct hammer2_msg *);
+ union {
+ void *any;
+ struct h2span_link *link;
+ struct h2span_connect *conn;
+ struct h2span_relay *relay;
+ } any;
+};
+
+/*
+ * Address record kept on an iocom, with queues of the routed states
+ * that reference it as their source or as their target.
+ */
+struct hammer2_address {
+ TAILQ_ENTRY(hammer2_address) entry; /* on-iocom */
+ struct hammer2_iocom *iocom; /* related iocom */
+ struct hammer2_state_queue sourceq; /* states on source queue */
+ struct hammer2_state_queue targetq; /* states on target queue */
+ uint16_t id;
};
#define HAMMER2_STATE_INSERTED 0x0001
#define HAMMER2_STATE_DYNAMIC 0x0002
+#define HAMMER2_STATE_NODEID 0x0004 /* manages a node id */
struct hammer2_msg {
TAILQ_ENTRY(hammer2_msg) qentry;
};
typedef struct hammer2_state hammer2_state_t;
+typedef struct hammer2_address hammer2_address_t;
typedef struct hammer2_msg hammer2_msg_t;
typedef struct hammer2_msg_queue hammer2_msg_queue_t;
hammer2_ioq_t ioq_tx;
hammer2_msg_queue_t freeq; /* free msgs hdr only */
hammer2_msg_queue_t freeq_aux; /* free msgs w/aux_data */
+ struct hammer2_address_queue addrq; /* source/target addrs */
void (*recvmsg_callback)(struct hammer2_iocom *);
void (*sendmsg_callback)(struct hammer2_iocom *);
void (*altmsg_callback)(struct hammer2_iocom *);
hammer2_bswap_head(hammer2_msg_hdr_t *head)
{
head->magic = bswap16(head->magic);
- head->icrc1 = bswap16(head->icrc1);
+ head->reserved02 = bswap16(head->reserved02);
head->salt = bswap32(head->salt);
- head->source = bswap16(head->source);
- head->target = bswap16(head->target);
- head->msgid = bswap32(head->msgid);
+
+ head->msgid = bswap64(head->msgid);
+ head->spanid = bswap64(head->spanid);
+
head->cmd = bswap32(head->cmd);
- head->error = bswap16(head->error);
- head->resv05 = bswap16(head->resv05);
- head->icrc2 = bswap16(head->icrc2);
- head->aux_bytes = bswap16(head->aux_bytes);
- head->aux_icrc = bswap32(head->aux_icrc);
+ head->aux_crc = bswap32(head->aux_crc);
+ head->aux_bytes = bswap32(head->aux_bytes);
+ head->error = bswap32(head->error);
+ head->aux_descr = bswap64(head->aux_descr);
+ head->reserved30= bswap64(head->reserved30);
+ head->reserved38= bswap32(head->reserved38);
+ head->hdr_crc = bswap32(head->hdr_crc);
}
const char *
}
return(buf);
}
+
+/*
+ * Allocation wrappers give us shims for possible future use
+ */
+
+/*
+ * Allocate 'bytes' bytes of zero-filled memory.  Never returns NULL:
+ * allocation failure terminates the process.
+ *
+ * A zero-byte request is rounded up to one byte so that a NULL return
+ * from the allocator always means failure.
+ */
+void *
+hammer2_alloc(size_t bytes)
+{
+	void *ptr;
+
+	ptr = calloc(1, bytes ? bytes : 1);
+	assert(ptr);
+	if (ptr == NULL)
+		abort();	/* still fail hard when NDEBUG removes assert */
+	return (ptr);
+}
+
+/*
+ * Release memory obtained from hammer2_alloc().  A NULL pointer is
+ * accepted, as with free().
+ */
+void
+hammer2_free(void *ptr)
+{
+	free(ptr);
+}
*/
rawip->comp_algo = HAMMER2_COMP_AUTOZERO;
- rawip->pfs_id = Hammer2_RPFSId;
+ rawip->pfs_clid = Hammer2_RPFSId;
rawip->pfs_type = HAMMER2_PFSTYPE_MASTER;
rawip->op_flags |= HAMMER2_OPFLAG_PFSROOT;
* snapshots and all if desired. PFS ids are used to match up
* mirror sources and targets and cluster copy sources and targets.
*/
- rawip->pfs_id = Hammer2_SPFSId;
+ rawip->pfs_clid = Hammer2_SPFSId;
rawip->pfs_type = HAMMER2_PFSTYPE_MASTER;
rawip->op_flags |= HAMMER2_OPFLAG_PFSROOT;
+
+* Kernel-side needs to clean up transaction queues and make appropriate
+ callbacks.
+
+* Userland side needs to do the same for any initiated transactions.
+
* Nesting problems in the flusher.
* Inefficient vfsync due to thousands of file buffers, one per-vnode.
thread_t msgrd_td; /* cluster thread */
thread_t msgwr_td; /* cluster thread */
int msg_ctl; /* wakeup flags */
+ int msg_seq; /* cluster msg sequence id */
uint32_t msgid_iterator;
struct lock msglk; /* lockmgr lock */
TAILQ_HEAD(, hammer2_msg) msgq; /* transmit queue */
struct hammer2_pfsmount *pmp;
uint32_t txcmd; /* mostly for CMDF flags */
uint32_t rxcmd; /* mostly for CMDF flags */
- uint16_t source; /* command originator */
- uint16_t target; /* reply originator */
- uint32_t msgid; /* {source,target,msgid} uniq */
+ uint64_t spanid; /* spanning tree routing */
+ uint64_t msgid; /* {spanid,msgid} uniq */
int flags;
int error;
struct hammer2_chain *chain; /* msg associated w/chain */
struct hammer2_msg *msg;
- int (*func)(struct hammer2_pfsmount *, struct hammer2_msg *);
+ int (*func)(struct hammer2_state *, struct hammer2_msg *);
+ union {
+ void *any;
+ hammer2_pfsmount_t *pmp;
+ } any;
};
#define HAMMER2_STATE_INSERTED 0x0001
int hammer2_msg_execute(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
void hammer2_state_free(hammer2_state_t *state);
void hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-hammer2_msg_t *hammer2_msg_alloc(hammer2_pfsmount_t *pmp,
- uint16_t source, uint16_t target,
+hammer2_msg_t *hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid,
uint32_t cmd);
-hammer2_state_t *hammer2_msg_write(hammer2_pfsmount_t *pmp,
+void hammer2_msg_write(hammer2_pfsmount_t *pmp,
hammer2_msg_t *msg,
- int (*func)(hammer2_pfsmount_t *,
- hammer2_msg_t *));
+ int (*func)(hammer2_state_t *, hammer2_msg_t *),
+ void *data);
+void hammer2_msg_reply(hammer2_pfsmount_t *pmp,
+ hammer2_msg_t *msg, uint32_t error);
+void hammer2_msg_result(hammer2_pfsmount_t *pmp,
+ hammer2_msg_t *msg, uint32_t error);
/*
* hammer2_msgops.c
*/
+int hammer2_msg_dbg_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
int hammer2_msg_adhoc_input(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
/*
/*
* These fields are currently only applicable to PFSROOTs.
*
- * NOTE: We can't use {volume_data->fsid, pfs_id} to uniquely
+ * NOTE: We can't use {volume_data->fsid, pfs_clid} to uniquely
* identify an instance of a PFS in the cluster because
* a mount may contain more than one copy of the PFS as
- * a separate node. {pfs_fsid, pfs_id} must be used for
+ * a separate node. {pfs_clid, pfs_fsid} must be used for
* registration in the cluster.
*/
uint8_t target_type; /* 0084 hardlink target type */
uint8_t reserved86; /* 0086 */
uint8_t pfs_type; /* 0087 (if PFSROOT) node type */
uint64_t pfs_inum; /* 0088 (if PFSROOT) inum allocator */
- uuid_t pfs_id; /* 0090 (if PFSROOT) pfs uuid */
- uuid_t pfs_fsid; /* 00A0 (if PFSROOT) unique pfs uuid */
+ uuid_t pfs_clid; /* 0090 (if PFSROOT) cluster uuid */
+ uuid_t pfs_fsid; /* 00A0 (if PFSROOT) unique uuid */
/*
* Quotas and cumulative sub-tree counters.
#define HAMMER2_PFSTYPE_SOFT_SLAVE 6
#define HAMMER2_PFSTYPE_SOFT_MASTER 7
#define HAMMER2_PFSTYPE_MASTER 8
+#define HAMMER2_PFSTYPE_MAX 9 /* 0-8 */
/*
* The allocref structure represents the allocation table. One 64K block
uint8_t priority; /* 07 priority and round-robin flag */
uint8_t remote_pfs_type;/* 08 probed direct remote PFS type */
uint8_t reserved08[23]; /* 09-1F */
- uuid_t pfs_id; /* 20-2F copy target must match this uuid */
+ uuid_t pfs_clid; /* 20-2F copy target must match this uuid */
uint8_t label[16]; /* 30-3F import/export label */
uint8_t path[64]; /* 40-7F target specification string or key */
};
oip->ip_data.iparent = 0; /* XXX */
oip->ip_data.pfs_type = 0;
oip->ip_data.pfs_inum = 0;
- bzero(&oip->ip_data.pfs_id,
- sizeof(oip->ip_data.pfs_id));
+ bzero(&oip->ip_data.pfs_clid,
+ sizeof(oip->ip_data.pfs_clid));
bzero(&oip->ip_data.pfs_fsid,
sizeof(oip->ip_data.pfs_fsid));
oip->ip_data.data_quota = 0;
xip = chain->u.ip;
pfs->name_key = xip->ip_data.name_key;
pfs->pfs_type = xip->ip_data.pfs_type;
- pfs->pfs_id = xip->ip_data.pfs_id;
+ pfs->pfs_clid = xip->ip_data.pfs_clid;
pfs->pfs_fsid = xip->ip_data.pfs_fsid;
KKASSERT(xip->ip_data.name_len < sizeof(pfs->name));
bcopy(xip->ip_data.filename, pfs->name,
if (error == 0) {
hammer2_chain_modify(hmp, &nip->chain, 0);
nip->ip_data.pfs_type = pfs->pfs_type;
- nip->ip_data.pfs_id = pfs->pfs_id;
+ nip->ip_data.pfs_clid = pfs->pfs_clid;
nip->ip_data.pfs_fsid = pfs->pfs_fsid;
hammer2_chain_unlock(hmp, &nip->chain);
}
/*
* Ioctls to manage PFSs
*
- * PFSs can be clustered by matching their pfs_id, and the PFSs making up
+ * PFSs can be clustered by matching their pfs_clid, and the PFSs making up
* a cluster can be uniquely identified by combining the vol_id with
- * the pfs_id.
+ * the pfs_clid.
*/
struct hammer2_ioc_pfs {
hammer2_key_t name_key; /* super-root directory scan */
uint32_t reserved0014;
uint64_t reserved0018;
uuid_t pfs_fsid; /* identifies PFS instance */
- uuid_t pfs_id; /* identifies PFS cluster */
+ uuid_t pfs_clid; /* identifies PFS cluster */
char name[NAME_MAX+1]; /* device@name mtpt */
};
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
state->msgid = msg->any.head.msgid;
- state->source = msg->any.head.source;
- state->target = msg->any.head.target;
- kprintf("received msg %08x msgid %u source=%u target=%u\n",
- msg->any.head.cmd, msg->any.head.msgid, msg->any.head.source,
- msg->any.head.target);
+ state->spanid = msg->any.head.spanid;
+ kprintf("received msg %08x msgid %jx spanid=%jx\n",
+ msg->any.head.cmd,
+ (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.spanid);
if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
else
msg->state = state;
state->msg = msg;
state->msgid = msg->any.head.msgid;
- state->source = msg->any.head.source;
- state->target = msg->any.head.target;
+ state->spanid = msg->any.head.spanid;
}
KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
}
hammer2_msg_t *
-hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint16_t source, uint16_t target,
- uint32_t cmd)
+hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid, uint32_t cmd)
{
hammer2_msg_t *msg;
size_t hbytes;
pmp->mmsg, M_WAITOK | M_ZERO);
msg->hdr_size = hbytes;
msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
- msg->any.head.source = source;
- msg->any.head.target = target;
+ msg->any.head.spanid = spanid;
msg->any.head.cmd = cmd;
return (msg);
int
hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
{
- if (state1->source < state2->source)
+ if (state1->spanid < state2->spanid)
return(-1);
- if (state1->source > state2->source)
- return(1);
- if (state1->target < state2->target)
- return(-1);
- if (state1->target > state2->target)
+ if (state1->spanid > state2->spanid)
return(1);
if (state1->msgid < state2->msgid)
return(-1);
}
/*
- * Write a message. {source, target, cmd} have been set. This function
- * merely queues the message to the management thread, it does not write
- * to the message socket/pipe.
+ * Write a message. All requisite command flags have been set.
+ *
+ * If msg->state is non-NULL the message is written to the existing
+ * transaction. Both msgid and spanid will be set accordingly.
+ *
+ * If msg->state is NULL and CREATE is set new state is allocated and
+ * (func, data) is installed.
*
- * If CREATE is set we allocate the state and msgid and do the insertion.
- * If CREATE is not set the state and msgid must already be assigned.
+ * If msg->state is NULL and CREATE is not set the message is assumed
+ * to be a one-way message. The msgid and spanid must be set by the
+ * caller (msgid is typically set to 0 for this case).
+ *
+ * This function merely queues the message to the management thread, it
+ * does not write to the message socket/pipe.
*/
-hammer2_state_t *
+void
hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
- int (*func)(hammer2_pfsmount_t *, hammer2_msg_t *))
+ int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
{
hammer2_state_t *state;
- uint16_t xcrc16;
- uint32_t xcrc32;
- /*
- * Setup transaction (if applicable). One-off messages always
- * use a msgid of 0.
- */
- if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+ if (msg->state) {
+ /*
+ * Continuance or termination of existing transaction.
+ * The transaction could have been initiated by either end.
+ *
+ * (Function callback and aux data for the receive side can
+ * be replaced or left alone).
+ */
+ msg->any.head.msgid = msg->state->msgid;
+ msg->any.head.spanid = msg->state->spanid;
+ lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+ if (func) {
+ msg->state->func = func;
+ msg->state->any.any = data;
+ }
+ } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
/*
* New transaction, requires tracking state and a unique
- * msgid.
+ * msgid to be allocated.
*/
KKASSERT(msg->state == NULL);
state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
state->pmp = pmp;
state->flags = HAMMER2_STATE_DYNAMIC;
state->func = func;
+ state->any.any = data;
state->msg = msg;
- state->source = msg->any.head.source;
- state->target = msg->any.head.target;
+ state->msgid = (uint64_t)(uintptr_t)state;
+ state->spanid = msg->any.head.spanid;
msg->state = state;
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
- if ((state->msgid = pmp->msgid_iterator++) == 0)
- state->msgid = pmp->msgid_iterator++;
- while (RB_INSERT(hammer2_state_tree,
- &pmp->statewr_tree, state)) {
- if ((state->msgid = pmp->msgid_iterator++) == 0)
- state->msgid = pmp->msgid_iterator++;
- }
+ if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
+ panic("duplicate msgid allocated");
msg->any.head.msgid = state->msgid;
- } else if (msg->state) {
- /*
- * Continuance or termination
- */
- lockmgr(&pmp->msglk, LK_EXCLUSIVE);
} else {
/*
- * One-off message (always uses msgid 0)
+ * One-off message (always uses msgid 0 to distinguish
+ * between a possibly lost in-transaction message due to
+ * competing aborts and a real one-off message?)
*/
msg->any.head.msgid = 0;
lockmgr(&pmp->msglk, LK_EXCLUSIVE);
}
/*
- * Set icrc2 and icrc1
+ * Finish up the msg fields
*/
- if (msg->hdr_size > sizeof(msg->any.head)) {
- xcrc32 = hammer2_icrc32(&msg->any.head + 1,
- msg->hdr_size - sizeof(msg->any.head));
- xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
- msg->any.head.icrc2 = xcrc16;
- }
- xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
- HAMMER2_MSGHDR_CRCBYTES);
- xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
- msg->any.head.icrc1 = xcrc16;
+ msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
+ ++pmp->msg_seq;
+
+ msg->any.head.hdr_crc = 0;
+ msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
lockmgr(&pmp->msglk, LK_RELEASE);
+}
+
+/*
+ * Reply to a message and terminate our side of the transaction.
+ *
+ * If msg->state is NULL we are replying to a one-way message.
+ *
+ * pmp   - mount whose message queue receives the reply
+ * msg   - the received message being answered; it is only inspected
+ *         here, not freed or queued
+ * error - error code stored in the reply header
+ */
+void
+hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
+{
+	hammer2_state_t *state = msg->state;
+	hammer2_msg_t *nmsg;
+	uint32_t cmd;
+
+	/*
+	 * Reply with a simple error code and terminate the transaction.
+	 */
+	cmd = HAMMER2_LNK_ERROR;
+
+	/*
+	 * Check if our direction has even been initiated yet, set CREATE.
+	 *
+	 * Check what direction this is (command or reply direction).  Note
+	 * that txcmd might not have been initiated yet.
+	 *
+	 * If our direction has already been closed we just return without
+	 * doing anything.
+	 */
+	if (state) {
+		if (state->txcmd & HAMMER2_MSGF_DELETE)
+			return;
+		if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+			cmd |= HAMMER2_MSGF_CREATE;
+		if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+			cmd |= HAMMER2_MSGF_REPLY;
+		cmd |= HAMMER2_MSGF_DELETE;
+	} else {
+		if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
+			cmd |= HAMMER2_MSGF_REPLY;
+	}
+
+	/*
+	 * Attach the reply to the transaction being answered.  Without
+	 * this hammer2_msg_write() sees a NULL state and either allocates
+	 * a brand new transaction (CREATE set) or sends a one-off with
+	 * msgid 0, so the reply would not carry the msgid/spanid of the
+	 * transaction it belongs to.  For one-way messages state is NULL
+	 * and the assignment is a no-op.
+	 */
+	nmsg = hammer2_msg_alloc(pmp, 0, cmd);
+	nmsg->any.head.error = error;
+	nmsg->state = state;
+	hammer2_msg_write(pmp, nmsg, NULL, NULL);
+}
+
+/*
+ * Reply to a message and continue our side of the transaction.
+ *
+ * If msg->state is NULL we are replying to a one-way message and this
+ * function degenerates into the same as hammer2_msg_reply().
+ *
+ * pmp   - mount whose message queue receives the result
+ * msg   - the received message being answered; it is only inspected
+ *         here, not freed or queued
+ * error - result code stored in the reply header
+ */
+void
+hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
+{
+	hammer2_state_t *state = msg->state;
+	hammer2_msg_t *nmsg;
+	uint32_t cmd;
+
+	/*
+	 * Return a simple result code, do NOT terminate the transaction.
+	 */
+	cmd = HAMMER2_LNK_ERROR;
+
+	/*
+	 * Check if our direction has even been initiated yet, set CREATE.
+	 *
+	 * Check what direction this is (command or reply direction).  Note
+	 * that txcmd might not have been initiated yet.
+	 *
+	 * If our direction has already been closed we just return without
+	 * doing anything.
+	 */
+	if (state) {
+		if (state->txcmd & HAMMER2_MSGF_DELETE)
+			return;
+		if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+			cmd |= HAMMER2_MSGF_CREATE;
+		if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+			cmd |= HAMMER2_MSGF_REPLY;
+		/* continuing transaction, do not set MSGF_DELETE */
+	} else {
+		if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
+			cmd |= HAMMER2_MSGF_REPLY;
+	}
- return (msg->state);
+
+	/*
+	 * Attach the result to the transaction being answered so that
+	 * hammer2_msg_write() stamps it with that transaction's msgid and
+	 * spanid instead of allocating new state or sending a one-off.
+	 * For one-way messages state is NULL and this is a no-op.
+	 */
+	nmsg = hammer2_msg_alloc(pmp, 0, cmd);
+	nmsg->any.head.error = error;
+	nmsg->state = state;
+	hammer2_msg_write(pmp, nmsg, NULL, NULL);
}
kprintf("ADHOC INPUT MSG %08x\n", msg->any.head.cmd);
return(0);
}
+
+/*
+ * Receive handler for DBG protocol messages.
+ *
+ * Shell commands are not executed here; they, and any unrecognized
+ * command, are answered with HAMMER2_MSG_ERR_UNKNOWN.  A shell reply has
+ * its auxiliary data NUL-terminated in place and is printed with
+ * kprintf().  Always returns 0.
+ */
+int
+hammer2_msg_dbg_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+{
+	switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
+	case HAMMER2_DBG_SHELL:
+		/*
+		 * Execute shell command (not supported atm)
+		 */
+		hammer2_msg_reply(pmp, msg, HAMMER2_MSG_ERR_UNKNOWN);
+		break;
+	case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
+		/*
+		 * The final byte is overwritten to force termination, so
+		 * a zero-length buffer must be skipped or the store would
+		 * land before the start of aux_data.
+		 */
+		if (msg->aux_data && msg->aux_size > 0) {
+			msg->aux_data[msg->aux_size - 1] = 0;
+			kprintf("DEBUGMSG: %s\n", msg->aux_data);
+		}
+		break;
+	default:
+		hammer2_msg_reply(pmp, msg, HAMMER2_MSG_ERR_UNKNOWN);
+		break;
+	}
+	return(0);
+}
* Mesh network protocol structures.
*
* The mesh is constructed from point-to-point streaming links with varying
- * levels of interconnectedness, forming a graph. When a link is established
- * link id #0 is reserved for link-level communications. This link is used
- * for authentication, registration, ping, further link id negotiations,
- * spanning tree, and so on.
- *
- * The spanning tree forms a weighted shortest-path-first graph amongst
- * those nodes with sufficient administrative rights to relay between
- * registrations. Each link maintains a full reachability set, aggregates
- * it, and retransmits via the shortest path. However, leaf nodes (even leaf
- * nodes with multiple connections) can opt not to be part of the spanning
- * tree and typically (due to administrative rights) their registrations
- * are not reported to other leafs.
+ * levels of interconnectedness, forming a graph. The spanning tree protocol
+ * running on each node transmits a LNK_SPAN transactional message to the
+ * other end. The protocol collects LNK_SPAN messages from all sources,
+ * aggregates them using a shortest-weighted-path algorithm, and transmits
+ * them over each link as well, creating a multiplication within the topology.
+ *
+ * Any node in the graph may transmit a message to any other node by using
+ * the msgid of the LNK_SPAN open transaction as the message's 'spanid'.
+ * This identifies both sides so there is no 'source' and 'target' per se.
+ *
+ * Open transactions are recorded by the source and the target, but not by
+ * intermediate nodes in the route. Streaming protocols are used. If a
+ * span element is lost its transaction will be aborted automatically (even
+ * if other routes to the same target are available), and any related
+ * messages will be aborted. If the span element was chosen for aggregation
+ * this will propagate through the entire topology and thus ultimately reach
+ * the target which used the aggregated span element, but does not
+ * necessarily affect all paths in the topology.
+ *
+ * When a link failure occurs all SPANs related to that link are
+ * transactionally closed. The SPANs are not deleted until closed in
+ * both directions, thus the spanid serves as a placeholder allowing all
+ * in-transit messages being routed over that spanid to be properly thrown
+ * out. Once completely closed the spanid can be reused.
+ *
+ * NOTE: Multiple spans for the same physical {fsid,pfs_fsid} can be
+ * forwarded, allowing concurrency within the topology.
+ *
+ * NOTE: It is important that messages in a lost route be aborted because
+ * the messaging protocol expects serialization over any given route.
+ * Only propagated spans are forwarded as spans to other nodes, so any
+ * given open span transaction will represent a specific path.
+ *
+ * If a portion of the path in the middle of the topology is lost it
+ * will propagate in both directions all the way to the ends that used
+ * it. Intermediate route nodes DO NOT silently re-route messages to
+ * another span. Messages in-flight will meet the updating SPAN and
+ * simply be discarded by intermediate nodes. Ultimately the updating
+ * SPAN reaches all end-points and auto-aborts the open transaction.
+ *
+ * If another path is available the transaction can be instantly
+ * retried.
+ *
+ * NOTE: It is possible to route messages virtually using the msgid of any
+ * open transaction instead of the msgid of a SPAN transaction, but
+ * not recommended and not currently coded.
+ *
+ * NOTE: Both the msgid and the spanid are 64-bit fields and may be populated
+ * with actual memory pointers (which simplifies the end-points).
+ * However, all such identifiers must be indexed as appropriate by the
+ * nodes and verified as being valid before any memory dereference
+ * occurs, for obvious reasons.
*
* All message responses follow the SAME PATH that the original message
* followed, but in reverse. This is an absolute requirement since messages
- * expecting replies record persistent state at each hop.
+ * expecting replies record persistent state at each hop. Sequencing must
+ * be preserved.
+ *
+ * MESSAGE TRANSACTIONAL STATES
*
* Message state is handled by the CREATE, DELETE, REPLY, and ABORT
* flags. Message state is typically recorded at the end points and
* fully closed state. ABORT|DELETE messages which race the fully closed
* state are expected to be discarded by the other end.
*
- *
- * NEGOTIATION OF {source} AND {target}
- *
- * In this discussion 'originator' describes the original sender of a message
- * and not the relays inbetween, while 'sender' describes the last relay.
- * The two mean the same thing only when the originator IS the last relay.
- *
- * The {source} field is sender-localized. The sender assigns this field
- * based on which connection the message originally came from. The initial
- * message as sent by the originator sets source=0. This also means that a
- * leaf connection will always send messages with source=0.
- *
- * The {source} field must be re-localized at each hop, since messages
- * coming from multiple connections to a node will use conflicting
- * {source} values. This can lead to linkid exhaustion which is discussed
- * a few paragraphs down.
- *
- * The {target} field is sender-allocated. Messages sent to {target} are
- * preceeded by a FORGE message to {target} which associates a registration
- * with {target}, or UNFORGE to delete the associtation.
- *
- * The msgid field is 32 bits (remember some messages have long-lived
- * persistent state so this is important!). One-way messages always use
- * msgid=0.
- *
- * LINKID EXHAUSTION
- *
- * Because {source} must be re-localized at each hop it is possible to run
- * out of link identifiers. At the same time we want to allow millions of
- * client/leaf connections, and 'millions' is a lot bigger than 65535.
- *
- * We also have a problem with the persistent message state... If a single
- * client's vnode cache has a million vnodes that can represent a million
- * persistent cache states. Multiply by a million clients and ... oops!
- *
- * To solve these problems leafs connect into protocol-aggregators rather
- * than directly to the cluster. The linkid and core message protocols only
- * occur within the cluster and not by the leafs. A leaf can still connect
- * to multiple aggregators for redundancy if it desires but may have to
- * pick and choose which inodes go where since acquiring a cache state lock
- * over one connection will cause conflicts to be invalidated on the other.
- * In otherwords, there are limitations to this approach.
- *
- * A protocol aggregator takes any number of connections and aggregates
- * the operations down to a single linkid. For example, this means that
- * the protocol aggregator is responsible for maintaining all the cache
- * state and performing crunches to reduce the overall amount of state
- * down to something the cluster core can handle.
- *
* --
*
- * All message headers are 32-byte aligned and sized (all command and
- * response structures must be 32-byte aligned), and all transports must
- * support message headers up to HAMMER2_MSGHDR_MAX. The msg structure
- * can handle up to 8160 bytes but to keep things fairly clean we limit
- * message headers to 2048 bytes.
+ * All base and extended message headers are 64-byte aligned, and all
+ * transports must support extended message headers up to HAMMER2_MSGHDR_MAX.
+ * Currently we allow extended message headers up to 2048 bytes. Note
+ * that the extended header size is encoded in the 'cmd' field of the header.
*
- * Any in-band data is padded to a 32-byte alignment and placed directly
+ * Any in-band data is padded to a 64-byte alignment and placed directly
* after the extended header (after the higher-level cmd/rep structure).
* The actual unaligned size of the in-band data is encoded in the aux_bytes
* field in this case. Maximum data sizes are negotiated during registration.
*
- * Use of out-of-band data must be negotiated. In this case bit 31 of
- * aux_bytes will be set and the remaining bits will contain information
- * specific to the out-of-band transfer (such as DMA channel, slot, etc).
- *
- * (must be 32 bytes exactly to match the alignment requirement and to
- * support pad records in shared-memory FIFO schemes)
+ * Auxiliary data can be in-band or out-of-band. In-band data sets aux_descr
+ * equal to 0. Any out-of-band data must be negotiated by the SPAN protocol.
+ *
+ * Auxiliary data, whether in-band or out-of-band, must be at least 64-byte
+ * aligned. The aux_bytes field contains the actual byte-granular length
+ * and not the aligned length.
+ *
+ * hdr_crc is calculated over the entire, ALIGNED extended header. For
+ * the purposes of calculating the crc, the hdr_crc field is 0. That is,
+ * if calculating the crc in HW a 32-bit '0' must be inserted in place of
+ * the hdr_crc field when reading the entire header and compared at the
+ * end (but the actual hdr_crc must be left intact in memory). A simple
+ * counter to replace the field going into the CRC generator does the job
+ * in HW. The CRC endian is based on the magic number field and may have
+ * to be byte-swapped, too (which is also easy to do in HW).
+ *
+ * aux_crc is calculated over the entire, ALIGNED auxiliary data.
+ *
+ * SHARED MEMORY IMPLEMENTATIONS
+ *
+ * Shared-memory implementations typically use a pipe to transmit the extended
+ * message header and shared memory to store any auxiliary data. Auxiliary
+ * data in one-way (non-transactional) messages is typically required to be
+ * inline. CRCs are still recommended and required at the beginning, but
+ * may be negotiated away later.
+ *
+ * MULTI-PATH MESSAGE DUPLICATION
+ *
+ * Redundancy can be negotiated but is not required in the current spec.
+ * Basically you send the same message, with the same msgid, via several
+ * paths to the target. The msgid is the rendezvous. The first copy that
+ * makes it to the target is used, the second is ignored. Similarly for
+ * replies. This can improve performance during span flapping. Only
+ * transactional messages will be serialized. The target might receive
+ * multiple copies of one-way messages in higher protocol layers (potentially
+ * out of order, too).
*/
struct hammer2_msg_hdr {
- uint16_t magic; /* sanity, synchronization, endian */
- uint16_t icrc1; /* base header crc &salt on */
- uint32_t salt; /* random salt helps crypto/replay */
-
- uint16_t source; /* command originator linkid */
- uint16_t target; /* reply originator linkid */
- uint32_t msgid; /* {source,target,msgid} unique */
-
- uint32_t cmd; /* flags | cmd | hdr_size / 32 */
- uint16_t error; /* error field */
- uint16_t resv05;
-
- uint16_t icrc2; /* extended header crc (after base) */
- uint16_t aux_bytes; /* aux data descriptor or size / 32 */
- uint32_t aux_icrc; /* aux data iscsi crc */
+ uint16_t magic; /* 00 sanity, synchro, endian */
+ uint16_t reserved02; /* 02 size of header in bytes */
+ uint32_t salt; /* 04 random salt helps w/crypto */
+
+ uint64_t msgid; /* 08 message transaction id */
+ uint64_t spanid; /* 10 message routing id or 0 */
+
+ uint32_t cmd; /* 18 flags | cmd | hdr_size / ALIGN */
+ uint32_t aux_crc; /* 1C auxillary data crc */
+ uint32_t aux_bytes; /* 20 auxillary data length (bytes) */
+ uint32_t error; /* 24 error code or 0 */
+ uint64_t aux_descr; /* 28 negotiated OOB data descr */
+ uint64_t reserved30; /* 30 */
+ uint32_t reserved38; /* 38 */
+ uint32_t hdr_crc; /* 3C (aligned) extended header crc */
};
typedef struct hammer2_msg_hdr hammer2_msg_hdr_t;
/*
* Administrative protocol limits.
*/
-#define HAMMER2_MSGHDR_MAX 2048 /* msg struct max is 8192-32 */
-#define HAMMER2_MSGAUX_MAX 65536 /* msg struct max is 2MB-32 */
+#define HAMMER2_MSGHDR_MAX 2048 /* <= 65535 */
+#define HAMMER2_MSGAUX_MAX 65536 /* <= 1MB */
#define HAMMER2_MSGBUF_SIZE (HAMMER2_MSGHDR_MAX * 4)
#define HAMMER2_MSGBUF_MASK (HAMMER2_MSGBUF_SIZE - 1)
/*
* Message command constructors, sans flags
*/
-#define HAMMER2_MSG_ALIGN 32
+#define HAMMER2_MSG_ALIGN 64
#define HAMMER2_MSG_ALIGNMASK (HAMMER2_MSG_ALIGN - 1)
#define HAMMER2_MSG_DOALIGN(bytes) (((bytes) + HAMMER2_MSG_ALIGNMASK) & \
~HAMMER2_MSG_ALIGNMASK)
* pad message buffers on shared-memory transports. Not
* typically used with TCP.
*
+ * PING - One-way message on link-0, keep-alive, run by both sides
+ * typically 1/sec on idle link, link is lost after 10 seconds
+ * of inactivity.
+ *
* AUTH - Authenticate the connection, negotiate administrative
* rights & encryption, protocol class, etc. Only PAD and
* AUTH messages (not even PING) are accepted until
* authentication is complete. This message also identifies
* the host.
*
- * PING - One-way message on link-0, keep-alive, run by both sides
- * typically 1/sec on idle link, link is lost after 10 seconds
- * of inactivity.
- *
- * STATUS - One-way message on link-0, host-spanning tree message.
- * Connection and authentication status is propagated using
- * these messages on a per-connection basis. Works like SPAN
- * but is only used for general status. See the hammer2
- * 'rinfo' command.
+ * CONN - Enable the SPAN protocol on link-0, possibly also installing
+ * a PFS filter (by cluster id, unique id, and/or wildcarded
+ * name).
*
- * SPAN - One-way message on link-0, spanning tree message adds,
- * drops, or updates a remote registration. Sent by both
- * sides, delta changes only. Visbility into remote
- * registrations may be limited and received registrations
- * may be filtered depending on administrative controls.
- *
- * A multiply-connected node maintains SPAN information on
- * each link independently and then retransmits an aggregation
- * of the shortest-weighted path for each registration to
- * all links when a received change adjusts the path.
+ * SPAN - A SPAN transaction on link-0 enables messages to be relayed
+ * to/from a particular cluster node. SPANs are received,
+ * sorted, aggregated, and retransmitted back out across all
+ * applicable connections.
*
* The leaf protocol also uses this to make a PFS available
* to the cluster (e.g. on-mount).
#define HAMMER2_LNK_PAD HAMMER2_MSG_LNK(0x000, hammer2_msg_hdr)
#define HAMMER2_LNK_PING HAMMER2_MSG_LNK(0x001, hammer2_msg_hdr)
#define HAMMER2_LNK_AUTH HAMMER2_MSG_LNK(0x010, hammer2_lnk_auth)
-#define HAMMER2_LNK_SPAN HAMMER2_MSG_LNK(0x011, hammer2_lnk_span)
+#define HAMMER2_LNK_CONN HAMMER2_MSG_LNK(0x011, hammer2_lnk_conn)
+#define HAMMER2_LNK_SPAN HAMMER2_MSG_LNK(0x012, hammer2_lnk_span)
#define HAMMER2_LNK_ERROR HAMMER2_MSG_LNK(0xFFF, hammer2_msg_hdr)
/*
- * SPAN - Registration (transaction, left open)
+ * LNK_CONN - Register connection for SPAN (transaction, left open)
+ *
+ * One LNK_CONN transaction may be opened on a stream connection, registering
+ * the connection with the SPAN subsystem and allowing the subsystem to
+ * accept and relay SPANs to this connection.
+ *
+ * The LNK_CONN message may contain a filter, limiting the desirable SPANs.
+ *
+ * This message contains a lot of the same info that a SPAN message contains,
+ * but is not a SPAN. That is, without this message the SPAN subprotocol will
+ * not be executed on the connection, nor is this message a promise that the
+ * sending end is a client or node of a cluster.
+ */
+struct hammer2_lnk_conn {
+ hammer2_msg_hdr_t head;
+ uuid_t pfs_clid; /* rendezvous pfs uuid */
+ uuid_t pfs_fsid; /* unique pfs uuid */
+ uint8_t pfs_type; /* peer type */
+ uint8_t reserved01;
+ uint16_t proto_version; /* high level protocol support */
+ uint32_t status; /* status flags */
+ uint8_t reserved02[8];
+ int32_t weight; /* span weight */
+ uint32_t reserved03[15];
+ char label[256]; /* PFS label (can be wildcard) */
+};
+
+typedef struct hammer2_lnk_conn hammer2_lnk_conn_t;
+
+/*
+ * LNK_SPAN - Relay a SPAN (transaction, left open)
*
* This message registers a PFS/PFS_TYPE with the other end of the connection,
* telling the other end who we are and what we can provide or what we want
*/
struct hammer2_lnk_span {
hammer2_msg_hdr_t head;
- uuid_t pfs_id; /* rendezvous pfs uuid */
+ uuid_t pfs_clid; /* rendezvous pfs uuid */
uuid_t pfs_fsid; /* unique pfs uuid */
uint8_t pfs_type; /* peer type */
uint8_t reserved01;
uint16_t proto_version; /* high level protocol support */
uint32_t status; /* status flags */
uint8_t reserved02[8];
- uint32_t reserved03[16];
+ int32_t weight; /* span weight */
+ uint32_t reserved03[15];
char label[256]; /* PFS label (can be wildcard) */
};
#define HAMMER2_QRM_COMMIT HAMMER2_MSG_QRM(0x001, hammer2_qrm_commit)
/*
+ * NOTE!!!! ALL EXTENDED HEADER STRUCTURES MUST BE 64-BYTE ALIGNED!!!
+ *
* General message errors
*
* 0x00 - 0x1F Local iocomm errors
char buf[HAMMER2_MSGHDR_MAX];
hammer2_msg_hdr_t head;
hammer2_lnk_span_t lnk_span;
+ hammer2_lnk_conn_t lnk_conn;
};
typedef union hammer2_msg_any hammer2_msg_any_t;
static void hammer2_cluster_thread_rd(void *arg);
static void hammer2_cluster_thread_wr(void *arg);
-static int hammer2_msg_span_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+static int hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg);
+static int hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg);
+static int hammer2_msg_lnk_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
/*
* HAMMER2 vfs operations.
*/
error = hammer2_state_msgrx(pmp, msg);
if (error) {
+ /*
+ * Raw protocol or connection error
+ */
hammer2_msg_free(pmp, msg);
if (error == EALREADY)
error = 0;
- } else if (msg->state) {
- error = msg->state->func(pmp, msg);
+ } else if (msg->state && msg->state->func) {
+ /*
+ * Message related to state which already has a
+ * handling function installed for it.
+ */
+ error = msg->state->func(msg->state, msg);
+ hammer2_state_cleanuprx(pmp, msg);
+ } else if ((msg->any.head.cmd & HAMMER2_MSGF_PROTOS) ==
+ HAMMER2_MSG_PROTO_LNK) {
+ /*
+ * Message related to the LNK protocol set
+ */
+ error = hammer2_msg_lnk_rcvmsg(pmp, msg);
+ hammer2_state_cleanuprx(pmp, msg);
+ } else if ((msg->any.head.cmd & HAMMER2_MSGF_PROTOS) ==
+ HAMMER2_MSG_PROTO_DBG) {
+ /*
+ * Message related to the DBG protocol set
+ */
+ error = hammer2_msg_dbg_rcvmsg(pmp, msg);
hammer2_state_cleanuprx(pmp, msg);
} else {
+ /*
+ * Other higher-level messages (e.g. vnops)
+ */
error = hammer2_msg_adhoc_input(pmp, msg);
hammer2_state_cleanuprx(pmp, msg);
}
int error = 0;
/*
- * Initiate a SPAN transaction registering our PFS with the other
- * end using {source}=1. The transaction is left open.
+ * Open a LNK_CONN transaction indicating that we want to take part
+ * in the spanning tree algorithm. Filter explicitly on the PFS
+ * info in the iroot.
+ *
+ * We do not transmit our (only) LNK_SPAN until the other end has
+ * acknowledged our link connection request.
*
- * The hammer2_msg_write() function will queue the message, and we
- * pick it off and write it in our transmit loop.
+ * The transaction remains fully open for the duration of the
+ * connection.
*/
- msg = hammer2_msg_alloc(pmp, 1, 0,
- HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE);
- msg->any.lnk_span.pfs_id = pmp->iroot->ip_data.pfs_id;
- msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
- msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
- msg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1;
+ msg = hammer2_msg_alloc(pmp, 0, HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE);
+ msg->any.lnk_conn.pfs_clid = pmp->iroot->ip_data.pfs_clid;
+ msg->any.lnk_conn.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
+ msg->any.lnk_conn.pfs_type = pmp->iroot->ip_data.pfs_type;
+ msg->any.lnk_conn.proto_version = HAMMER2_SPAN_PROTO_1;
name_len = pmp->iroot->ip_data.name_len;
- if (name_len >= sizeof(msg->any.lnk_span.label))
- name_len = sizeof(msg->any.lnk_span.label) - 1;
- bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_span.label, name_len);
- msg->any.lnk_span.label[name_len] = 0;
-
- hammer2_msg_write(pmp, msg, hammer2_msg_span_reply);
+ if (name_len >= sizeof(msg->any.lnk_conn.label))
+ name_len = sizeof(msg->any.lnk_conn.label) - 1;
+ bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_conn.label, name_len);
+ msg->any.lnk_conn.label[name_len] = 0;
+ hammer2_msg_write(pmp, msg, hammer2_msg_conn_reply, pmp);
/*
* Transmit loop
}
static int
-hammer2_msg_span_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_lnk_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
{
- kprintf("SPAN REPLY\n");
+ switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
+ case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:
+ kprintf("CONN RECEIVE - (just ignore it)\n");
+ hammer2_msg_result(pmp, msg, 0);
+ break;
+ case HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE:
+ kprintf("SPAN RECEIVE - ADDED FROM CLUSTER\n");
+ break;
+ case HAMMER2_LNK_SPAN | HAMMER2_MSGF_DELETE:
+ kprintf("SPAN RECEIVE - DELETED FROM CLUSTER\n");
+ break;
+ default:
+ break;
+ }
+ return(0);
+}
+
+/*
+ * This function is called when the other end replies to our LNK_CONN
+ * request.
+ *
+ * We transmit our (single) SPAN on the initial reply, leaving that
+ * transaction open too.  If the remote closes the LNK_CONN transaction
+ * we close our side with hammer2_msg_reply().
+ *
+ * state - LNK_CONN transaction state; state->any.pmp supplies the mount
+ * msg   - the received reply
+ *
+ * Always returns 0.
+ */
+static int
+hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
+{
+	hammer2_pfsmount_t *pmp = state->any.pmp;
+	hammer2_msg_t *rmsg;
+	size_t name_len;
+
+	if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+		kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
+		/*
+		 * Build the SPAN in its own variable.  The received msg
+		 * must stay intact for the DELETE test below, and the
+		 * outgoing message must not be touched again once it has
+		 * been handed to hammer2_msg_write().
+		 */
+		rmsg = hammer2_msg_alloc(pmp, 0, HAMMER2_LNK_SPAN |
+						 HAMMER2_MSGF_CREATE);
+		rmsg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
+		rmsg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
+		rmsg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
+		rmsg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1;
+		name_len = pmp->iroot->ip_data.name_len;
+		/* truncate so the label always fits with its terminator */
+		if (name_len >= sizeof(rmsg->any.lnk_span.label))
+			name_len = sizeof(rmsg->any.lnk_span.label) - 1;
+		bcopy(pmp->iroot->ip_data.filename,
+		      rmsg->any.lnk_span.label,
+		      name_len);
+		rmsg->any.lnk_span.label[name_len] = 0;
+		hammer2_msg_write(pmp, rmsg, hammer2_msg_span_reply, pmp);
+	}
+	if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+		kprintf("LNK_CONN transaction terminated by remote\n");
+		hammer2_msg_reply(pmp, msg, 0);
+	}
+	return(0);
+}
+
+static int
+hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg)
+{
+ hammer2_pfsmount_t *pmp = state->any.pmp;
+
+ kprintf("SPAN REPLY - Our span was terminated? %p\n", pmp);
return(0);
}