From: Matthew Dillon Date: Fri, 15 Jun 2012 03:00:58 +0000 (-0700) Subject: hammer2 - Bring in the transaction state code from the hammer2 vfs X-Git-Tag: v3.4.0rc~1066 X-Git-Url: https://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/784762058b814b08bf62787ce0c48c61871c1354 hammer2 - Bring in the transaction state code from the hammer2 vfs * Bring in the transaction state management code from the kernel hammer2 module and clean up the APIs to use similar mechanics. * Basic replymsg operations now use the HAMMER2_LNK_ERROR directive instead of the original command, for now. --- diff --git a/sbin/hammer2/cmd_debug.c b/sbin/hammer2/cmd_debug.c index af170f3a55..6f93281a5d 100644 --- a/sbin/hammer2/cmd_debug.c +++ b/sbin/hammer2/cmd_debug.c @@ -40,7 +40,7 @@ static void shell_recv(hammer2_iocom_t *iocom); static void shell_send(hammer2_iocom_t *iocom); static void shell_tty(hammer2_iocom_t *iocom); -static void hammer2_shell_parse(hammer2_msg_t *msg, char *cmdbuf); +static void hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg); /************************************************************************ * SHELL * @@ -97,8 +97,8 @@ cmd_shell(const char *hostname) hammer2_iocom_init(&iocom, fd, 0); printf("debug: connected\n"); - msg = hammer2_allocmsg(&iocom, HAMMER2_DBG_SHELL, 0); - hammer2_ioq_write(msg); + msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL); + hammer2_ioq_write(&iocom, msg); hammer2_iocom_core(&iocom, shell_recv, shell_send, shell_tty); fprintf(stderr, "debug: disconnected\n"); @@ -121,15 +121,20 @@ shell_recv(hammer2_iocom_t *iocom) switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) { case HAMMER2_LNK_ERROR: - fprintf(stderr, "Link Error: %d\n", - msg->any.head.error); + case HAMMER2_LNK_ERROR | HAMMER2_MSGF_REPLY: + if (msg->any.head.error) { + fprintf(stderr, + "Link Error: %d\n", + msg->any.head.error); + } else { + write(1, "debug> ", 7); + } break; case HAMMER2_DBG_SHELL: /* * We send the commands, not accept them.
*/ - hammer2_replymsg(msg, HAMMER2_MSG_ERR_UNKNOWN); - hammer2_freemsg(msg); + hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN); break; case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY: /* @@ -138,18 +143,16 @@ shell_recv(hammer2_iocom_t *iocom) if (msg->aux_size) { msg->aux_data[msg->aux_size - 1] = 0; write(1, msg->aux_data, strlen(msg->aux_data)); - } else { - write(1, "debug> ", 7); } - hammer2_freemsg(msg); break; default: - assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); fprintf(stderr, "Unknown message: %08x\n", msg->any.head.cmd); - hammer2_replymsg(msg, HAMMER2_MSG_ERR_UNKNOWN); + assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); + hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN); break; } + hammer2_state_cleanuprx(iocom, msg); } if (iocom->ioq_rx.error) { fprintf(stderr, "node_master_recv: comm error %d\n", @@ -181,9 +184,9 @@ shell_tty(hammer2_iocom_t *iocom) if (len && buf[len - 1] == '\n') buf[--len] = 0; ++len; - msg = hammer2_allocmsg(iocom, HAMMER2_DBG_SHELL, len); + msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL); bcopy(buf, msg->aux_data, len); - hammer2_ioq_write(msg); + hammer2_ioq_write(iocom, msg); } else { /* * Set EOF flag without setting any error code for normal @@ -199,10 +202,8 @@ shell_tty(hammer2_iocom_t *iocom) * then finish up by outputting another prompt. */ void -hammer2_shell_remote(hammer2_msg_t *msg) +hammer2_shell_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg) { - /* hammer2_iocom_t *iocom = msg->iocom; */ - if (msg->aux_data) msg->aux_data[msg->aux_size - 1] = 0; if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) { @@ -212,54 +213,60 @@ hammer2_shell_remote(hammer2_msg_t *msg) */ if (msg->aux_data) write(2, msg->aux_data, strlen(msg->aux_data)); - hammer2_freemsg(msg); } else { /* * Otherwise this is a command which we must process. * When we are finished we generate a final reply. 
*/ - hammer2_shell_parse(msg, msg->aux_data); - hammer2_replymsg(msg, 0); + hammer2_shell_parse(iocom, msg); + hammer2_msg_reply(iocom, msg, 0); } } static void -hammer2_shell_parse(hammer2_msg_t *msg, char *cmdbuf) +hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg) { - /* hammer2_iocom_t *iocom = msg->iocom; */ + char *cmdbuf = msg->aux_data; char *cmd = strsep(&cmdbuf, " \t"); if (cmd == NULL || *cmd == 0) { ; } else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) { - msg_printf(msg, "help Command help\n"); + iocom_printf(iocom, 0, "help Command help\n"); } else { - msg_printf(msg, "Unrecognized command: %s\n", cmd); + iocom_printf(iocom, 0, "Unrecognized command: %s\n", cmd); } } /* * Returns text debug output to the original defined by (msg). (msg) is - * not modified and stays intact. + * not modified and stays intact. We use a one-way message with REPLY set + * to distinguish between a debug command and debug terminal output. + * + * To prevent loops iocom_printf() can filter the message (cmd) related + * to the iocom_printf(). We filter out DBG messages. */ void -msg_printf(hammer2_msg_t *msg, const char *ctl, ...) +iocom_printf(hammer2_iocom_t *iocom, uint32_t cmd, const char *ctl, ...) 
{ - /* hammer2_iocom_t *iocom = msg->iocom; */ hammer2_msg_t *rmsg; va_list va; char buf[1024]; size_t len; + if ((cmd & HAMMER2_MSGF_PROTOS) == HAMMER2_MSG_PROTO_DBG) + return; + va_start(va, ctl); vsnprintf(buf, sizeof(buf), ctl, va); va_end(va); len = strlen(buf) + 1; - rmsg = hammer2_allocreply(msg, HAMMER2_DBG_SHELL, len); + rmsg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL | + HAMMER2_MSGF_REPLY); bcopy(buf, rmsg->aux_data, len); - hammer2_ioq_write(rmsg); + hammer2_ioq_write(iocom, rmsg); } /************************************************************************ diff --git a/sbin/hammer2/cmd_service.c b/sbin/hammer2/cmd_service.c index d6f9e2f78f..97753dbc32 100644 --- a/sbin/hammer2/cmd_service.c +++ b/sbin/hammer2/cmd_service.c @@ -219,12 +219,13 @@ master_link_rx(hammer2_iocom_t *iocom) break; case HAMMER2_DBG_SHELL: case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY: - hammer2_shell_remote(msg); + hammer2_shell_remote(iocom, msg); break; default: - hammer2_replymsg(msg, HAMMER2_MSG_ERR_UNKNOWN); + hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN); break; } + hammer2_state_cleanuprx(iocom, msg); } if (iocom->ioq_rx.error) { fprintf(stderr, diff --git a/sbin/hammer2/hammer2.h b/sbin/hammer2/hammer2.h index ed71e6c345..dd3e0cc86c 100644 --- a/sbin/hammer2/hammer2.h +++ b/sbin/hammer2/hammer2.h @@ -110,24 +110,25 @@ void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq); void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq); void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd); void hammer2_iocom_done(hammer2_iocom_t *iocom); -hammer2_msg_t *hammer2_allocmsg(hammer2_iocom_t *iocom, - uint32_t cmd, int aux_size); -hammer2_msg_t *hammer2_allocreply(hammer2_msg_t *msg, - uint32_t cmd, int aux_size); -void hammer2_replymsg(hammer2_msg_t *msg, uint16_t error); -void hammer2_freemsg(hammer2_msg_t *msg); +hammer2_msg_t *hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, + uint32_t cmd); +void 
hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, + uint16_t error); +void hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg); void hammer2_iocom_core(hammer2_iocom_t *iocom, void (*iocom_recvmsg)(hammer2_iocom_t *), void (*iocom_sendmsg)(hammer2_iocom_t *), void (*iocom_altmsg)(hammer2_iocom_t *)); hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom); -void hammer2_ioq_write(hammer2_msg_t *msg); +void hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg); -void hammer2_ioq_stream(hammer2_msg_t *msg, int reply); void hammer2_iocom_drain(hammer2_iocom_t *iocom); void hammer2_iocom_flush(hammer2_iocom_t *iocom); +void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +void hammer2_state_free(hammer2_state_t *state); + void hammer2_crypto_negotiate(hammer2_iocom_t *iocom); void hammer2_crypto_decrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq); void hammer2_crypto_decrypt_aux(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq, @@ -143,5 +144,5 @@ const char *hammer2_iptype_to_str(uint8_t type); const char *hammer2_pfstype_to_str(uint8_t type); const char *sizetostr(hammer2_off_t size); -void hammer2_shell_remote(hammer2_msg_t *msg); -void msg_printf(hammer2_msg_t *msg, const char *ctl, ...); +void hammer2_shell_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +void iocom_printf(hammer2_iocom_t *iocom, uint32_t cmd, const char *ctl, ...); diff --git a/sbin/hammer2/msg.c b/sbin/hammer2/msg.c index f17855a737..aab35f8a62 100644 --- a/sbin/hammer2/msg.c +++ b/sbin/hammer2/msg.c @@ -35,6 +35,10 @@ #include "hammer2.h" +static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +static int hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg); +static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg); + /* * Initialize a low-level ioq */ @@ -52,12 +56,12 @@ hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq) hammer2_msg_t *msg; while ((msg = 
TAILQ_FIRST(&ioq->msgq)) != NULL) { - TAILQ_REMOVE(&ioq->msgq, msg, entry); - hammer2_freemsg(msg); + TAILQ_REMOVE(&ioq->msgq, msg, qentry); + hammer2_msg_free(iocom, msg); } if ((msg = ioq->msg) != NULL) { ioq->msg = NULL; - hammer2_freemsg(msg); + hammer2_msg_free(iocom, msg); } } @@ -69,6 +73,8 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd) { bzero(iocom, sizeof(*iocom)); + RB_INIT(&iocom->staterd_tree); + RB_INIT(&iocom->statewr_tree); TAILQ_INIT(&iocom->freeq); TAILQ_INIT(&iocom->freeq_aux); iocom->sock_fd = sock_fd; @@ -104,11 +110,11 @@ hammer2_iocom_done(hammer2_iocom_t *iocom) hammer2_ioq_done(iocom, &iocom->ioq_rx); hammer2_ioq_done(iocom, &iocom->ioq_tx); if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) { - TAILQ_REMOVE(&iocom->freeq, msg, entry); + TAILQ_REMOVE(&iocom->freeq, msg, qentry); free(msg); } if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) { - TAILQ_REMOVE(&iocom->freeq_aux, msg, entry); + TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); free(msg->aux_data); msg->aux_data = NULL; free(msg); @@ -119,7 +125,7 @@ hammer2_iocom_done(hammer2_iocom_t *iocom) * Allocate a new one-way message. 
*/ hammer2_msg_t * -hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size) +hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd) { hammer2_msg_t *msg; int hbytes; @@ -128,14 +134,14 @@ hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size) aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) & ~HAMMER2_MSG_ALIGNMASK; if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) - TAILQ_REMOVE(&iocom->freeq_aux, msg, entry); + TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); } else { if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) - TAILQ_REMOVE(&iocom->freeq, msg, entry); + TAILQ_REMOVE(&iocom->freeq, msg, qentry); } if (msg == NULL) { msg = malloc(sizeof(*msg)); - msg->iocom = iocom; + bzero(msg, sizeof(*msg)); msg->aux_data = NULL; msg->aux_size = 0; } @@ -150,61 +156,28 @@ hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size) msg->aux_size = aux_size; } } - msg->flags = 0; hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN; if (hbytes) bzero(&msg->any.head, hbytes); + msg->hdr_size = hbytes; msg->any.head.aux_icrc = 0; msg->any.head.cmd = cmd; return (msg); } -/* - * Allocate a one-way or streaming reply to a message. The message is - * not modified. This function may be used to allocate multiple replies. - * - * If cmd is 0 then msg->any.head.cmd is used to formulate the reply command. 
- */ -hammer2_msg_t * -hammer2_allocreply(hammer2_msg_t *msg, uint32_t cmd, int aux_size) -{ - hammer2_msg_t *rmsg; - hammer2_persist_t *pers; - - assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); - if (cmd == 0) - cmd = msg->any.head.cmd; - - rmsg = hammer2_allocmsg(msg->iocom, cmd, aux_size); - rmsg->any.head = msg->any.head; - rmsg->any.head.cmd = (cmd | HAMMER2_MSGF_REPLY) & - ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE); - rmsg->any.head.aux_icrc = 0; - - if ((pers = msg->persist) != NULL) { - assert(pers->lrep & HAMMER2_MSGF_DELETE); - rmsg->any.head.cmd |= pers->lrep & HAMMER2_MSGF_CREATE; - pers->lrep &= ~HAMMER2_MSGF_CREATE; - /* do not clear DELETE */ - } - return (rmsg); -} - /* * Free a message so it can be reused afresh. * * NOTE: aux_size can be 0 with a non-NULL aux_data. */ void -hammer2_freemsg(hammer2_msg_t *msg) +hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg) { - hammer2_iocom_t *iocom = msg->iocom; - if (msg->aux_data) - TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry); + TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry); else - TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry); + TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry); } /* @@ -278,19 +251,20 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) hammer2_msg_t *msg; hammer2_msg_hdr_t *head; ssize_t n; - int bytes; - int flags; - int nmax; + size_t bytes; + size_t nmax; uint16_t xcrc16; uint32_t xcrc32; + int error; +again: /* * If a message is already pending we can just remove and - * return it. + * return it. Message state has already been processed. */ if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { - TAILQ_REMOVE(&ioq->msgq, msg, entry); - return(msg); + TAILQ_REMOVE(&ioq->msgq, msg, qentry); + return (msg); } /* @@ -346,7 +320,6 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) * not immediately decrypted. 
*/ hammer2_crypto_decrypt(iocom, ioq); - flags = 0; head = (void *)(ioq->buf + ioq->fifo_beg); /* @@ -365,7 +338,6 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) HAMMER2_MSGHDR_CRCBYTES); if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) { hammer2_bswap_head(head); - flags |= HAMMER2_MSGX_BSWAPPED; } xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16); if (xcrc16 != head->icrc1) { @@ -379,8 +351,8 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN; ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN; - if (ioq->hbytes < (int)sizeof(msg->any.head) || - ioq->hbytes > (int)sizeof(msg->any) || + if (ioq->hbytes < sizeof(msg->any.head) || + ioq->hbytes > sizeof(msg->any) || ioq->abytes > HAMMER2_MSGAUX_MAX) { ioq->error = HAMMER2_IOQ_ERROR_FIELD; break; @@ -393,9 +365,7 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) * Initialize msg->aux_size to 0 and use it to track * the amount of data copied from the stream. */ - msg = hammer2_allocmsg(iocom, 0, ioq->abytes); - msg->aux_size = 0; - msg->flags = flags; + msg = hammer2_msg_alloc(iocom, ioq->abytes, 0); ioq->msg = msg; /* @@ -469,7 +439,7 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) /* * Check the crc on the extended header */ - if (ioq->hbytes > (int)sizeof(hammer2_msg_hdr_t)) { + if (ioq->hbytes > sizeof(hammer2_msg_hdr_t)) { xcrc32 = hammer2_icrc32(head + 1, ioq->hbytes - sizeof(*head)); xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16); @@ -507,8 +477,9 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) * Copy the partial or complete payload from remaining * bytes in the FIFO. We have to fall-through either * way so we can check the crc. + * + * Adjust msg->aux_size to the final actual value. 
*/ - assert(msg->aux_size == 0); ioq->already = ioq->fifo_cdx - ioq->fifo_beg; if (ioq->already > ioq->abytes) ioq->already = ioq->abytes; @@ -528,6 +499,8 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) if (ioq->fifo_cdx < ioq->fifo_beg) ioq->fifo_cdx = ioq->fifo_beg; bytes = 0; + } else { + msg->aux_size = 0; } ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2; /* fall through */ @@ -601,6 +574,20 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) } } + /* + * Process transactional state for the message. + */ + if (msg && ioq->error == 0) { + error = hammer2_state_msgrx(iocom, msg); + if (error) { + if (error == HAMMER2_IOQ_ERROR_EALREADY) { + hammer2_msg_free(iocom, msg); + goto again; + } + ioq->error = error; + } + } + /* * Handle error, RREQ, or completion * @@ -620,7 +607,7 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) */ assert(ioq->msg == msg); if (msg == NULL) - msg = hammer2_allocmsg(iocom, 0, 0); + msg = hammer2_msg_alloc(iocom, 0, 0); else ioq->msg = NULL; @@ -681,17 +668,35 @@ hammer2_ioq_read(hammer2_iocom_t *iocom) * Calling this function with msg == NULL will get a flush going. */ void -hammer2_ioq_write(hammer2_msg_t *msg) +hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg) { - hammer2_iocom_t *iocom = msg->iocom; hammer2_ioq_t *ioq = &iocom->ioq_tx; uint16_t xcrc16; uint32_t xcrc32; int hbytes; + int error; assert(msg); + + /* + * Process transactional state. + */ + if (ioq->error == 0) { + error = hammer2_state_msgtx(iocom, msg); + if (error) { + if (error == HAMMER2_IOQ_ERROR_EALREADY) { + hammer2_msg_free(iocom, msg); + } else { + ioq->error = error; + } + } + } + + /* + * Process terminal connection errors. + */ if (ioq->error) { - TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry); + TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry); ++ioq->msgcount; hammer2_iocom_drain(iocom); return; @@ -743,7 +748,7 @@ hammer2_ioq_write(hammer2_msg_t *msg) /* * Enqueue the message. 
*/ - TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry); + TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry); ++ioq->msgcount; iocom->flags &= ~HAMMER2_IOCOMF_WIDLE; @@ -767,8 +772,8 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom) ssize_t nmax; ssize_t nact; struct iovec iov[HAMMER2_IOQ_MAXIOVEC]; - int hbytes; - int abytes; + size_t hbytes; + size_t abytes; int hoff; int aoff; int n; @@ -779,7 +784,7 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom) n = 0; nmax = 0; - TAILQ_FOREACH(msg, &ioq->msgq, entry) { + TAILQ_FOREACH(msg, &ioq->msgq, qentry) { hoff = 0; hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN; @@ -845,26 +850,24 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom) HAMMER2_MSG_ALIGN; abytes = msg->aux_size; - if (nact < hbytes - ioq->hbytes) { + if ((size_t)nact < hbytes - ioq->hbytes) { ioq->hbytes += nact; break; } nact -= hbytes - ioq->hbytes; ioq->hbytes = hbytes; - if (nact < abytes - ioq->abytes) { + if ((size_t)nact < abytes - ioq->abytes) { ioq->abytes += nact; break; } nact -= abytes - ioq->abytes; - TAILQ_REMOVE(&ioq->msgq, msg, entry); + TAILQ_REMOVE(&ioq->msgq, msg, qentry); --ioq->msgcount; ioq->hbytes = 0; ioq->abytes = 0; - if (msg->aux_data) - TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry); - else - TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry); + + hammer2_state_cleanuptx(iocom, msg); } if (msg == NULL) { iocom->flags |= HAMMER2_IOCOMF_WIDLE; @@ -890,42 +893,624 @@ hammer2_iocom_drain(hammer2_iocom_t *iocom) hammer2_msg_t *msg; while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) { - TAILQ_REMOVE(&ioq->msgq, msg, entry); + TAILQ_REMOVE(&ioq->msgq, msg, qentry); --ioq->msgcount; - hammer2_freemsg(msg); + hammer2_msg_free(iocom, msg); } iocom->flags |= HAMMER2_IOCOMF_WIDLE; iocom->flags &= ~HAMMER2_IOCOMF_WREQ; } /* - * This is a shortcut to the normal hammer2_allocreply() mechanic which - * uses the received message to formulate a final reply and error code. - * Can be used to issue a final reply for one-way, one-off, or streaming - * commands. 
+ * This is a shortcut to formulate a reply to msg with a simple error code. + * It can reply to transaction or one-way messages, or terminate one side + * of a stream. A HAMMER2_LNK_ERROR command code is utilized to encode + * the error code (which can be 0). * * Replies to one-way messages are a bit of an oxymoron but the feature * is used by the debug (DBG) protocol. * * The reply contains no data. + */ +void +hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint16_t error) +{ + hammer2_msg_t *nmsg; + uint32_t cmd; + + cmd = HAMMER2_LNK_ERROR; + if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) { + /* + * Reply to received reply, reply direction uses txcmd. + * txcmd will be updated by hammer2_ioq_write(). + */ + if (msg->state) { + if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0) + cmd |= HAMMER2_MSGF_CREATE; + cmd |= HAMMER2_MSGF_DELETE; + } + } else { + /* + * Reply to received command, reply direction uses rxcmd. + * txcmd will be updated by hammer2_ioq_write(). + */ + cmd |= HAMMER2_MSGF_REPLY; + if (msg->state) { + if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0) + cmd |= HAMMER2_MSGF_CREATE; + cmd |= HAMMER2_MSGF_DELETE; + } + } + nmsg = hammer2_msg_alloc(iocom, 0, cmd); + nmsg->any.head.error = error; + hammer2_ioq_write(iocom, nmsg); +} + +/************************************************************************ + * TRANSACTION STATE HANDLING * + ************************************************************************ + * + */ + +RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp); + +/* + * Process state tracking for a message after reception, prior to + * execution. + * + * Called with msglk held and the msg dequeued. + * + * All messages are called with dummy state and return actual state. + * (One-off messages often just return the same dummy state). + * + * May request that caller discard the message by setting *discardp to 1. + * The returned state is not used in this case and is allowed to be NULL. 
+ * + * -- + * + * These routines handle persistent and command/reply message state via the + * CREATE and DELETE flags. The first message in a command or reply sequence + * sets CREATE, the last message in a command or reply sequence sets DELETE. * - * (msg) is eaten up by this function. + * There can be any number of intermediate messages belonging to the same + * sequence sent inbetween the CREATE message and the DELETE message, + * which set neither flag. This represents a streaming command or reply. + * + * Any command message received with CREATE set expects a reply sequence to + * be returned. Reply sequences work the same as command sequences except the + * REPLY bit is also sent. Both the command side and reply side can + * degenerate into a single message with both CREATE and DELETE set. Note + * that one side can be streaming and the other side not, or neither, or both. + * + * The msgid is unique for the initiator. That is, two sides sending a new + * message can use the same msgid without colliding. + * + * -- + * + * ABORT sequences work by setting the ABORT flag along with normal message + * state. However, ABORTs can also be sent on half-closed messages, that is + * even if the command or reply side has already sent a DELETE, as long as + * the message has not been fully closed it can still send an ABORT+DELETE + * to terminate the half-closed message state. + * + * Since ABORT+DELETEs can race we silently discard ABORT's for message + * state which has already been fully closed. REPLY+ABORT+DELETEs can + * also race, and in this situation the other side might have already + * initiated a new unrelated command with the same message id. Since + * the abort has not set the CREATE flag the situation can be detected + * and the message will also be discarded. + * + * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE]. + * The ABORT request is essentially integrated into the command instead + * of being sent later on. 
In this situation the command implementation + * detects that CREATE and ABORT are both set (vs ABORT alone) and can + * special-case non-blocking operation for the command. + * + * NOTE! Messages with ABORT set without CREATE or DELETE are considered + * to be mid-stream aborts for command/reply sequences. ABORTs on + * one-way messages are not supported. + * + * NOTE! If a command sequence does not support aborts the ABORT flag is + * simply ignored. + * + * -- + * + * One-off messages (no reply expected) are sent with neither CREATE or DELETE + * set. One-off messages cannot be aborted and typically aren't processed + * by these routines. The REPLY bit can be used to distinguish whether a + * one-off message is a command or reply. For example, one-off replies + * will typically just contain status updates. */ +static int +hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) +{ + hammer2_state_t *state; + hammer2_state_t dummy; + int error; + + /* + * Lock RB tree and locate existing persistent state, if any. + * + * If received msg is a command state is on staterd_tree. + * If received msg is a reply state is on statewr_tree. + */ + /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/ + + dummy.msgid = msg->any.head.msgid; + dummy.source = msg->any.head.source; + dummy.target = msg->any.head.target; + iocom_printf(iocom, msg->any.head.cmd, + "received msg %08x msgid %u source=%u target=%u\n", + msg->any.head.cmd, msg->any.head.msgid, + msg->any.head.source, msg->any.head.target); + if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) { + state = RB_FIND(hammer2_state_tree, + &iocom->statewr_tree, &dummy); + } else { + state = RB_FIND(hammer2_state_tree, + &iocom->staterd_tree, &dummy); + } + msg->state = state; + + /* + * Short-cut one-off or mid-stream messages (state may be NULL). 
+ */ + if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_ABORT)) == 0) { + /*lockmgr(&pmp->msglk, LK_RELEASE);*/ + return(0); + } + + /* + * Switch on CREATE, DELETE, REPLY, and also handle ABORT from + * inside the case statements. + */ + switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_REPLY)) { + case HAMMER2_MSGF_CREATE: + case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * New persistant command received. + */ + if (state) { + iocom_printf(iocom, msg->any.head.cmd, + "hammer2_state_msgrx: " + "duplicate transaction\n"); + error = HAMMER2_IOQ_ERROR_TRANS; + break; + } + state = malloc(sizeof(*state)); + bzero(state, sizeof(*state)); + state->iocom = iocom; + state->flags = HAMMER2_STATE_DYNAMIC; + state->msg = msg; + state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state); + state->flags |= HAMMER2_STATE_INSERTED; + msg->state = state; + error = 0; + break; + case HAMMER2_MSGF_DELETE: + /* + * Persistent state is expected but might not exist if an + * ABORT+DELETE races the close. + */ + if (state == NULL) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = HAMMER2_IOQ_ERROR_EALREADY; + } else { + iocom_printf(iocom, msg->any.head.cmd, + "hammer2_state_msgrx: " + "no state for DELETE\n"); + error = HAMMER2_IOQ_ERROR_TRANS; + } + break; + } + + /* + * Handle another ABORT+DELETE case if the msgid has already + * been reused. + */ + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = HAMMER2_IOQ_ERROR_EALREADY; + } else { + iocom_printf(iocom, msg->any.head.cmd, + "hammer2_state_msgrx: " + "state reused for DELETE\n"); + error = HAMMER2_IOQ_ERROR_TRANS; + } + break; + } + error = 0; + break; + default: + /* + * Check for mid-stream ABORT command received, otherwise + * allow. 
+ */ + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + if (state == NULL || + (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + error = HAMMER2_IOQ_ERROR_EALREADY; + break; + } + } + error = 0; + break; + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE: + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * When receiving a reply with CREATE set the original + * persistent state message should already exist. + */ + if (state == NULL) { + iocom_printf(iocom, msg->any.head.cmd, + "hammer2_state_msgrx: " + "no state match for REPLY cmd=%08x\n", + msg->any.head.cmd); + error = HAMMER2_IOQ_ERROR_TRANS; + break; + } + state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + error = 0; + break; + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE: + /* + * Received REPLY+ABORT+DELETE in case where msgid has + * already been fully closed, ignore the message. + */ + if (state == NULL) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = HAMMER2_IOQ_ERROR_EALREADY; + } else { + iocom_printf(iocom, msg->any.head.cmd, + "hammer2_state_msgrx: " + "no state match for " + "REPLY|DELETE\n"); + error = HAMMER2_IOQ_ERROR_TRANS; + } + break; + } + + /* + * Received REPLY+ABORT+DELETE in case where msgid has + * already been reused for an unrelated message, + * ignore the message. + */ + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = HAMMER2_IOQ_ERROR_EALREADY; + } else { + iocom_printf(iocom, msg->any.head.cmd, + "hammer2_state_msgrx: " + "state reused for REPLY|DELETE\n"); + error = HAMMER2_IOQ_ERROR_TRANS; + } + break; + } + error = 0; + break; + case HAMMER2_MSGF_REPLY: + /* + * Check for mid-stream ABORT reply received to sent command. 
+ */ + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + if (state == NULL || + (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + error = HAMMER2_IOQ_ERROR_EALREADY; + break; + } + } + error = 0; + break; + } + /*lockmgr(&pmp->msglk, LK_RELEASE);*/ + return (error); +} + void -hammer2_replymsg(hammer2_msg_t *msg, uint16_t error) +hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg) { - hammer2_persist_t *pers; - - assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0); - - msg->any.head.error = error; - msg->any.head.cmd |= HAMMER2_MSGF_REPLY; - msg->aux_size = 0; - if ((pers = msg->persist) != NULL) { - assert(pers->lrep & HAMMER2_MSGF_DELETE); - msg->any.head.cmd |= pers->lrep & (HAMMER2_MSGF_CREATE | - HAMMER2_MSGF_DELETE); - pers->lrep &= ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE); + hammer2_state_t *state; + + if ((state = msg->state) == NULL) { + hammer2_msg_free(iocom, msg); + } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { + /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/ + state->rxcmd |= HAMMER2_MSGF_DELETE; + if (state->txcmd & HAMMER2_MSGF_DELETE) { + if (state->msg == msg) + state->msg = NULL; + assert(state->flags & HAMMER2_STATE_INSERTED); + if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) { + RB_REMOVE(hammer2_state_tree, + &iocom->statewr_tree, state); + } else { + RB_REMOVE(hammer2_state_tree, + &iocom->staterd_tree, state); + } + state->flags &= ~HAMMER2_STATE_INSERTED; + /*lockmgr(&pmp->msglk, LK_RELEASE);*/ + hammer2_state_free(state); + } else { + /*lockmgr(&pmp->msglk, LK_RELEASE);*/ + } + hammer2_msg_free(iocom, msg); + } else if (state->msg != msg) { + hammer2_msg_free(iocom, msg); } - hammer2_ioq_write(msg); +} + +/* + * Process state tracking for a message prior to transmission. + * + * Called with msglk held and the msg dequeued. + * + * One-off messages are usually with dummy state and msg->state may be NULL + * in this situation. + * + * New transactions (when CREATE is set) will insert the state. 
+ *
+ * Returns 0 if the message may be sent, HAMMER2_IOQ_ERROR_EALREADY if it
+ * should be silently ignored, or HAMMER2_IOQ_ERROR_TRANS on a state error.
+ */
+static int
+hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+{
+	hammer2_state_t *state;
+	int error;
+
+	/*
+	 * Lock RB tree.  If persistent state is present it will have already
+	 * been assigned to msg.
+	 */
+	/*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
+	state = msg->state;
+
+	/*
+	 * Short-cut one-off or mid-stream messages (state may be NULL).
+	 */
+	if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+				  HAMMER2_MSGF_ABORT)) == 0) {
+		/*lockmgr(&pmp->msglk, LK_RELEASE);*/
+		return(0);
+	}
+
+
+	/*
+	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
+	 * inside the case statements.
+	 */
+	switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+				    HAMMER2_MSGF_REPLY)) {
+	case HAMMER2_MSGF_CREATE:
+	case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+		/*
+		 * Insert the new persistent message state and mark
+		 * half-closed if DELETE is set.  Since this is a new
+		 * message it isn't possible to transition into the fully
+		 * closed state here.
+		 *
+		 * XXX state must be assigned and inserted by
+		 *     hammer2_msg_write().  txcmd is assigned by us
+		 *     on-transmit.
+		 */
+		assert(state != NULL);
+#if 0
+		if (state == NULL) {
+			state = pmp->freerd_state;
+			pmp->freerd_state = NULL;
+			msg->state = state;
+			state->msg = msg;
+			state->msgid = msg->any.head.msgid;
+			state->source = msg->any.head.source;
+			state->target = msg->any.head.target;
+		}
+		assert((state->flags & HAMMER2_STATE_INSERTED) == 0);
+		if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
+			iocom_printf(iocom, msg->any.head.cmd,
+			    "hammer2_state_msgtx: "
+			    "duplicate transaction\n");
+			error = HAMMER2_IOQ_ERROR_TRANS;
+			break;
+		}
+		state->flags |= HAMMER2_STATE_INSERTED;
+#endif
+		state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+		error = 0;
+		break;
+	case HAMMER2_MSGF_DELETE:
+		/*
+		 * Sent ABORT+DELETE in case where msgid has already
+		 * been fully closed, ignore the message.
+		 */
+		if (state == NULL) {
+			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+				error = HAMMER2_IOQ_ERROR_EALREADY;
+			} else {
+				iocom_printf(iocom, msg->any.head.cmd,
+				    "hammer2_state_msgtx: "
+				    "no state match for DELETE\n");
+				error = HAMMER2_IOQ_ERROR_TRANS;
+			}
+			break;
+		}
+
+		/*
+		 * Sent ABORT+DELETE in case where msgid has
+		 * already been reused for an unrelated message,
+		 * ignore the message.
+		 */
+		if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+				error = HAMMER2_IOQ_ERROR_EALREADY;
+			} else {
+				iocom_printf(iocom, msg->any.head.cmd,
+				    "hammer2_state_msgtx: "
+				    "state reused for DELETE\n");
+				error = HAMMER2_IOQ_ERROR_TRANS;
+			}
+			break;
+		}
+		error = 0;
+		break;
+	default:
+		/*
+		 * Check for mid-stream ABORT command sent
+		 */
+		if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+			if (state == NULL ||
+			    (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+				error = HAMMER2_IOQ_ERROR_EALREADY;
+				break;
+			}
+		}
+		error = 0;
+		break;
+	case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
+	case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+		/*
+		 * When transmitting a reply with CREATE set the original
+		 * persistent state message should already exist.
+		 */
+		if (state == NULL) {
+			iocom_printf(iocom, msg->any.head.cmd,
+			    "hammer2_state_msgtx: no state match "
+			    "for REPLY | CREATE\n");
+			error = HAMMER2_IOQ_ERROR_TRANS;
+			break;
+		}
+		state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+		error = 0;
+		break;
+	case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
+		/*
+		 * When transmitting a reply with DELETE set the original
+		 * persistent state message should already exist.
+		 *
+		 * This is very similar to the REPLY|CREATE|* case except
+		 * txcmd is already stored, so we just add the DELETE flag.
+		 *
+		 * Sent REPLY+ABORT+DELETE in case where msgid has
+		 * already been fully closed, ignore the message.
+		 */
+		if (state == NULL) {
+			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+				error = HAMMER2_IOQ_ERROR_EALREADY;
+			} else {
+				iocom_printf(iocom, msg->any.head.cmd,
+				    "hammer2_state_msgtx: "
+				    "no state match for "
+				    "REPLY | DELETE\n");
+				error = HAMMER2_IOQ_ERROR_TRANS;
+			}
+			break;
+		}
+
+		/*
+		 * Sent REPLY+ABORT+DELETE in case where msgid has already
+		 * been reused for an unrelated message, ignore the message.
+		 */
+		if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+				error = HAMMER2_IOQ_ERROR_EALREADY;
+			} else {
+				iocom_printf(iocom, msg->any.head.cmd,
+				    "hammer2_state_msgtx: "
+				    "state reused for "
+				    "REPLY | DELETE\n");
+				error = HAMMER2_IOQ_ERROR_TRANS;
+			}
+			break;
+		}
+		error = 0;
+		break;
+	case HAMMER2_MSGF_REPLY:
+		/*
+		 * Check for mid-stream ABORT reply sent.
+		 *
+		 * One-off REPLY messages are allowed for e.g. status updates.
+		 */
+		if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+			if (state == NULL ||
+			    (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+				error = HAMMER2_IOQ_ERROR_EALREADY;
+				break;
+			}
+		}
+		error = 0;
+		break;
+	}
+	/*lockmgr(&pmp->msglk, LK_RELEASE);*/
+	return (error);
+}
+
+static void
+hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+{
+	hammer2_state_t *state;
+
+	if ((state = msg->state) == NULL) {
+		hammer2_msg_free(iocom, msg);
+	} else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+		/*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
+		state->txcmd |= HAMMER2_MSGF_DELETE;
+		if (state->msg == msg)	/* msg is freed below, don't dangle */
+			state->msg = NULL;
+		if (state->rxcmd & HAMMER2_MSGF_DELETE) {
+			assert(state->flags & HAMMER2_STATE_INSERTED);
+			if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+				RB_REMOVE(hammer2_state_tree,
+					  &iocom->staterd_tree, state);
+			} else {
+				RB_REMOVE(hammer2_state_tree,
+					  &iocom->statewr_tree, state);
+			}
+			state->flags &= ~HAMMER2_STATE_INSERTED;
+			/*lockmgr(&pmp->msglk, LK_RELEASE);*/
+			hammer2_state_free(state);
+		} else {
+			/*lockmgr(&pmp->msglk, LK_RELEASE);*/
+		}
+		hammer2_msg_free(iocom, msg);
+	} else if (state->msg != msg) {
+		hammer2_msg_free(iocom, msg);
+	}
+}
+
+void
+hammer2_state_free(hammer2_state_t *state)
+{
+	hammer2_iocom_t *iocom = state->iocom;
+	hammer2_msg_t *msg;
+
+	msg = state->msg;
+	state->msg = NULL;
+	/* Release any attached message first, then free the state once. */
+	if (msg)
+		hammer2_msg_free(iocom, msg);
+	free(state);
+}
+
+/*
+ * Indexed messages are stored in a red-black tree keyed by
+ * {source, target, msgid}.
Only persistent messages are indexed.
+ */
+int
+hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
+{
+	if (state1->source < state2->source)
+		return(-1);
+	if (state1->source > state2->source)
+		return(1);
+	if (state1->target < state2->target)
+		return(-1);
+	if (state1->target > state2->target)
+		return(1);
+	if (state1->msgid < state2->msgid)
+		return(-1);
+	if (state1->msgid > state2->msgid)
+		return(1);
+	return(0);
 }
diff --git a/sbin/hammer2/network.h b/sbin/hammer2/network.h
index eeaeb20ebe..607d2c44ac 100644
--- a/sbin/hammer2/network.h
+++ b/sbin/hammer2/network.h
@@ -108,23 +108,44 @@ typedef struct hammer2_handshake hammer2_handshake_t;
  */
 struct hammer2_iocom;
 struct hammer2_persist;
+struct hammer2_state;
+struct hammer2_msg;
 
-struct hammer2_msg {
+TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
+RB_HEAD(hammer2_state_tree, hammer2_state);
+
+struct hammer2_state {
+	RB_ENTRY(hammer2_state) rbnode;		/* indexed by msgid */
 	struct hammer2_iocom *iocom;
-	struct hammer2_persist *persist;
-	TAILQ_ENTRY(hammer2_msg) entry;		/* queue */
-	char		*aux_data;		/* aux-data if any */
-	int		aux_size;
+	uint32_t	txcmd;			/* mostly for MSGF flags */
+	uint32_t	rxcmd;			/* mostly for MSGF flags */
+	uint16_t	source;			/* command originator */
+	uint16_t	target;			/* reply originator */
+	uint32_t	msgid;			/* {source,target,msgid} uniq */
 	int		flags;
-	hammer2_any_t	any;			/* raw extended msg header */
+	int		error;
+	struct hammer2_msg *msg;
+	int (*func)(struct hammer2_iocom *, struct hammer2_msg *);
 };
 
-typedef struct hammer2_msg hammer2_msg_t;
+#define HAMMER2_STATE_INSERTED	0x0001
+#define HAMMER2_STATE_DYNAMIC	0x0002
 
-TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
+struct hammer2_msg {
+	TAILQ_ENTRY(hammer2_msg) qentry;
+	struct hammer2_state *state;
+	size_t		hdr_size;
+	size_t		aux_size;
+	char		*aux_data;
+	hammer2_any_t	any;
+};
+
+typedef struct hammer2_state hammer2_state_t;
+typedef struct hammer2_msg hammer2_msg_t;
 typedef struct hammer2_msg_queue hammer2_msg_queue_t;
 
-#define HAMMER2_MSGX_BSWAPPED	0x0001
+int hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2);
+RB_PROTOTYPE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
 
 /*
  * hammer2_ioq - An embedded component of hammer2_connect, holds state
@@ -139,9 +160,9 @@ struct hammer2_ioq {
 	int		fifo_beg;		/* buffered data */
 	int		fifo_cdx;		/* encrypt/decrypt index */
 	int		fifo_end;
-	int		hbytes;			/* header size */
-	int		abytes;			/* aux_data size */
-	int		already;		/* aux_data already decrypted */
+	size_t		hbytes;			/* header size */
+	size_t		abytes;			/* aux_data size */
+	size_t		already;		/* aux_data already decrypted */
 	int		error;
 	int		seq;			/* salt sequencer */
 	int		msgcount;
@@ -169,6 +190,8 @@ typedef struct hammer2_ioq hammer2_ioq_t;
 #define HAMMER2_IOQ_ERROR_KEYFMT	13	/* key file format problem */
 #define HAMMER2_IOQ_ERROR_BADURANDOM	14	/* /dev/urandom is bad */
 #define HAMMER2_IOQ_ERROR_MSGSEQ	15	/* message sequence error */
+#define HAMMER2_IOQ_ERROR_EALREADY	16	/* ignore this message */
+#define HAMMER2_IOQ_ERROR_TRANS		17	/* state transaction issue */
 
 #define HAMMER2_IOQ_MAXIOVEC    16
 
@@ -189,6 +212,8 @@ struct hammer2_iocom {
 	int	rxmisc;
 	int	txmisc;
 	char	sess[HAMMER2_AES_KEY_SIZE];	/* aes_256_cbc key */
+	struct hammer2_state_tree staterd_tree; /* active messages */
+	struct hammer2_state_tree statewr_tree; /* active messages */
 };
 
 typedef struct hammer2_iocom hammer2_iocom_t;
@@ -199,118 +224,3 @@ typedef struct hammer2_iocom hammer2_iocom_t;
 #define HAMMER2_IOCOMF_WIDLE	0x00000008	/* request write-avail event */
 #define HAMMER2_IOCOMF_SIGNAL	0x00000010
 #define HAMMER2_IOCOMF_CRYPTED	0x00000020	/* encrypt enabled */
-
-/***************************************************************************
- *				HIGH LEVEL MESSAGING			   *
- ***************************************************************************
- *
- * Persistent state is stored via the hammer2_persist structure.
- */
-struct hammer2_persist {
-	uint32_t	lcmd;		/* recent command direction */
-	uint32_t	lrep;		/* recent reply direction */
-};
-
-typedef struct hammer2_persist hammer2_persist_t;
-
-#if 0
-
-
-
-/*
- * The global registration structure consolidates information accumulated
- * via the spanning tree algorithm and tells us which connection (link)
- * is the best path to get to any given registration.
- *
- * glob_node	- Splay entry for this registration in the global index
- *		  of all registrations.
- *
- * glob_entry	- tailq entry when this registration's best_span element
- *		  has changed state.
- *
- * span_list	- Head of a simple list of spanning tree entries which
- *		  we use to determine the best link.
- *
- * best_span	- Which of the span structure on span_list is the best
- *		  one.
- *
- * source_root	- Splay tree root indexing all mesasges sent from this
- *		  registration.  The messages are indexed by
- *		  {linkid,msgid} XXX
- *
- * target_root	- Splay tree root indexing all messages being sent to
- *		  this registration.  The messages are indexed by
- *		  {linkid,msgid}. XXX
- *
- *
- * Whenever spanning tree data causes a registration's best_link field to
- * change that registration is transmitted as spanning tree data to every
- * active link.  Note that pure clients to the cluster, of which there can
- * be millions, typically do not transmit spanning tree data to each other.
- *
- * Each registration is assigned a unique linkid local to the node (another
- * node might assign a different linkid to the same registration).  This
- * linkid must be persistent as long as messages are active and is used
- * to identify the message source and target.
- */
-TAILQ_HEAD(hammer2_span_list, hammer2_span);
-typedef struct hammer2_span_list hammer2_span_list_t;
-
-struct hammer2_reg {
-	SPLAY_ENTRY(hammer2_reg) glob_node;	/* index of registrations */
-	TAILQ_ENTRY(hammer2_reg) glob_entry;	/* when modified */
-	hammer2_span_list_t	span_list;	/* list of hammer2_span's */
-	hammer2_span_t		*best_span;	/* best span entry */
-	hammer2_pmsg_splay_head_t source_root;	/* msgs sent from reg */
-	hammer2_pmsg_splay_head_t target_root;	/* msgs sent to reg */
-	uuid_t	pfs_id;				/* key field */
-	uuid_t  pfs_fsid;			/* key field */
-	uint32_t linkid;
-	int	flags;
-	int	refs;
-};
-
-#define HAMMER2_PROTO_REGF_MODIFIED	0x0001
-
-/*
- * Each link (connection) collects spanning tree data received via the
- * link and stores it in these span structures.
- */
-struct hammer2_span {
-	TAILQ_ENTRY(hammer2_span)	span_entry;	/* from hammer2_reg */
-	SPLAY_ENTRY(hammer2_span)	span_node;	/* from hammer2_link */
-	hammer2_reg_t			*reg;
-	hammer2_link_t			*link;
-	int				weight;
-};
-
-/*
- * Most hammer2 messages represent transactions and have persistent state
- * which must be recorded.  Some messages, such as cache states and inode
- * representations are very long-lasting transactions.
- *
- * Each node in the graph must keep track of the message state in order
- * to perform the proper action when a connection is lost.  To do this
- * the message is indexed on the source and target (global) registration,
- * and the actual span element the message was received on and transmitted
- * to is recorded (allowing us to retrieve the physical links involved).
- *
- * The {source_reg, target_reg, msgid} uniquely identifies a message.  Any
- * streaming operations using the same msgid use the same rendezvous.
- *
- * It is important to note that recorded state must use the same physical
- * link (and thus the same chain of links across the graph) as was 'forged'
- * by the initial message for that msgid.  If the source span a message is
- * received on does not match the recorded source, or the recorded target
- * is no longer routeable, the message will be returned or generate an ABORT
- * with LINKFAIL as appropriate.
- */
-struct hammer2_pmsg {
-	SPLAY_ENTRY(hammer2_pmsg) source_reg;
-	SPLAY_ENTRY(hammer2_pmsg) target_reg;
-	hammer2_span_t	*source;
-	hammer2_span_t	*target;
-	uint16_t	msgid;
-};
-
-#endif