hammer2 - Bring in the transaction state code from the hammer2 vfs
author Matthew Dillon <dillon@apollo.backplane.com>
Fri, 15 Jun 2012 03:00:58 +0000 (20:00 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Fri, 15 Jun 2012 03:00:58 +0000 (20:00 -0700)
* Bring in the transaction state management code from the kernel
  hammer2 module and clean up the APIs to use similar mechanics.

* Basic replymsg operations now use the HAMMER2_LNK_ERROR directive
  instead of the original command, for now.

sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_service.c
sbin/hammer2/hammer2.h
sbin/hammer2/msg.c
sbin/hammer2/network.h

index af170f3..6f93281 100644 (file)
@@ -40,7 +40,7 @@
 static void shell_recv(hammer2_iocom_t *iocom);
 static void shell_send(hammer2_iocom_t *iocom);
 static void shell_tty(hammer2_iocom_t *iocom);
-static void hammer2_shell_parse(hammer2_msg_t *msg, char *cmdbuf);
+static void hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
 /************************************************************************
  *                                 SHELL                               *
@@ -97,8 +97,8 @@ cmd_shell(const char *hostname)
        hammer2_iocom_init(&iocom, fd, 0);
        printf("debug: connected\n");
 
-       msg = hammer2_allocmsg(&iocom, HAMMER2_DBG_SHELL, 0);
-       hammer2_ioq_write(msg);
+       msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL);
+       hammer2_ioq_write(&iocom, msg);
 
        hammer2_iocom_core(&iocom, shell_recv, shell_send, shell_tty);
        fprintf(stderr, "debug: disconnected\n");
@@ -121,15 +121,20 @@ shell_recv(hammer2_iocom_t *iocom)
 
                switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
                case HAMMER2_LNK_ERROR:
-                       fprintf(stderr, "Link Error: %d\n",
-                               msg->any.head.error);
+               case HAMMER2_LNK_ERROR | HAMMER2_MSGF_REPLY:
+                       if (msg->any.head.error) {
+                               fprintf(stderr,
+                                       "Link Error: %d\n",
+                                       msg->any.head.error);
+                       } else {
+                               write(1, "debug> ", 7);
+                       }
                        break;
                case HAMMER2_DBG_SHELL:
                        /*
                         * We send the commands, not accept them.
                         */
-                       hammer2_replymsg(msg, HAMMER2_MSG_ERR_UNKNOWN);
-                       hammer2_freemsg(msg);
+                       hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
                        break;
                case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
                        /*
@@ -138,18 +143,16 @@ shell_recv(hammer2_iocom_t *iocom)
                        if (msg->aux_size) {
                                msg->aux_data[msg->aux_size - 1] = 0;
                                write(1, msg->aux_data, strlen(msg->aux_data));
-                       } else {
-                               write(1, "debug> ", 7);
                        }
-                       hammer2_freemsg(msg);
                        break;
                default:
-                       assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                        fprintf(stderr, "Unknown message: %08x\n",
                                msg->any.head.cmd);
-                       hammer2_replymsg(msg, HAMMER2_MSG_ERR_UNKNOWN);
+                       assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
+                       hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
                        break;
                }
+               hammer2_state_cleanuprx(iocom, msg);
        }
        if (iocom->ioq_rx.error) {
                fprintf(stderr, "node_master_recv: comm error %d\n",
@@ -181,9 +184,9 @@ shell_tty(hammer2_iocom_t *iocom)
                if (len && buf[len - 1] == '\n')
                        buf[--len] = 0;
                ++len;
-               msg = hammer2_allocmsg(iocom, HAMMER2_DBG_SHELL, len);
+               msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL);
                bcopy(buf, msg->aux_data, len);
-               hammer2_ioq_write(msg);
+               hammer2_ioq_write(iocom, msg);
        } else {
                /*
                 * Set EOF flag without setting any error code for normal
@@ -199,10 +202,8 @@ shell_tty(hammer2_iocom_t *iocom)
  * then finish up by outputting another prompt.
  */
 void
-hammer2_shell_remote(hammer2_msg_t *msg)
+hammer2_shell_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
-       /* hammer2_iocom_t *iocom = msg->iocom; */
-
        if (msg->aux_data)
                msg->aux_data[msg->aux_size - 1] = 0;
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
@@ -212,54 +213,60 @@ hammer2_shell_remote(hammer2_msg_t *msg)
                 */
                if (msg->aux_data)
                        write(2, msg->aux_data, strlen(msg->aux_data));
-               hammer2_freemsg(msg);
        } else {
                /*
                 * Otherwise this is a command which we must process.
                 * When we are finished we generate a final reply.
                 */
-               hammer2_shell_parse(msg, msg->aux_data);
-               hammer2_replymsg(msg, 0);
+               hammer2_shell_parse(iocom, msg);
+               hammer2_msg_reply(iocom, msg, 0);
        }
 }
 
 static void
-hammer2_shell_parse(hammer2_msg_t *msg, char *cmdbuf)
+hammer2_shell_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
-       /* hammer2_iocom_t *iocom = msg->iocom; */
+       char *cmdbuf = msg->aux_data;
        char *cmd = strsep(&cmdbuf, " \t");
 
        if (cmd == NULL || *cmd == 0) {
                ;
        } else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) {
-               msg_printf(msg, "help        Command help\n");
+               iocom_printf(iocom, 0, "help        Command help\n");
        } else {
-               msg_printf(msg, "Unrecognized command: %s\n", cmd);
+               iocom_printf(iocom, 0, "Unrecognized command: %s\n", cmd);
        }
 }
 
 /*
  * Returns text debug output to the original defined by (msg).  (msg) is
- * not modified and stays intact.
+ * not modified and stays intact.  We use a one-way message with REPLY set
+ * to distinguish between a debug command and debug terminal output.
+ *
+ * To prevent loops, iocom_printf() is passed the (cmd) of the message that
+ * prompted the output and sends nothing if (cmd) is a DBG protocol message.
  */
 void
-msg_printf(hammer2_msg_t *msg, const char *ctl, ...)
+iocom_printf(hammer2_iocom_t *iocom, uint32_t cmd, const char *ctl, ...)
 {
-       /* hammer2_iocom_t *iocom = msg->iocom; */
        hammer2_msg_t *rmsg;
        va_list va;
        char buf[1024];
        size_t len;
 
+       if ((cmd & HAMMER2_MSGF_PROTOS) == HAMMER2_MSG_PROTO_DBG)
+               return;
+
        va_start(va, ctl);
        vsnprintf(buf, sizeof(buf), ctl, va);
        va_end(va);
        len = strlen(buf) + 1;
 
-       rmsg = hammer2_allocreply(msg, HAMMER2_DBG_SHELL, len);
+       rmsg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL |
+                                            HAMMER2_MSGF_REPLY);
        bcopy(buf, rmsg->aux_data, len);
 
-       hammer2_ioq_write(rmsg);
+       hammer2_ioq_write(iocom, rmsg);
 }
 
 /************************************************************************
index d6f9e2f..97753db 100644 (file)
@@ -219,12 +219,13 @@ master_link_rx(hammer2_iocom_t *iocom)
                        break;
                case HAMMER2_DBG_SHELL:
                case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
-                       hammer2_shell_remote(msg);
+                       hammer2_shell_remote(iocom, msg);
                        break;
                default:
-                       hammer2_replymsg(msg, HAMMER2_MSG_ERR_UNKNOWN);
+                       hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
                        break;
                }
+               hammer2_state_cleanuprx(iocom, msg);
        }
        if (iocom->ioq_rx.error) {
                fprintf(stderr,
index ed71e6c..dd3e0cc 100644 (file)
@@ -110,24 +110,25 @@ void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd);
 void hammer2_iocom_done(hammer2_iocom_t *iocom);
-hammer2_msg_t *hammer2_allocmsg(hammer2_iocom_t *iocom,
-                       uint32_t cmd, int aux_size);
-hammer2_msg_t *hammer2_allocreply(hammer2_msg_t *msg,
-                       uint32_t cmd, int aux_size);
-void hammer2_replymsg(hammer2_msg_t *msg, uint16_t error);
-void hammer2_freemsg(hammer2_msg_t *msg);
+hammer2_msg_t *hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size,
+                       uint32_t cmd);
+void hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
+                       uint16_t error);
+void hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
 void hammer2_iocom_core(hammer2_iocom_t *iocom,
                        void (*iocom_recvmsg)(hammer2_iocom_t *),
                        void (*iocom_sendmsg)(hammer2_iocom_t *),
                        void (*iocom_altmsg)(hammer2_iocom_t *));
 hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
-void hammer2_ioq_write(hammer2_msg_t *msg);
+void hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
-void hammer2_ioq_stream(hammer2_msg_t *msg, int reply);
 void hammer2_iocom_drain(hammer2_iocom_t *iocom);
 void hammer2_iocom_flush(hammer2_iocom_t *iocom);
 
+void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_state_free(hammer2_state_t *state);
+
 void hammer2_crypto_negotiate(hammer2_iocom_t *iocom);
 void hammer2_crypto_decrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_crypto_decrypt_aux(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
@@ -143,5 +144,5 @@ const char *hammer2_iptype_to_str(uint8_t type);
 const char *hammer2_pfstype_to_str(uint8_t type);
 const char *sizetostr(hammer2_off_t size);
 
-void hammer2_shell_remote(hammer2_msg_t *msg);
-void msg_printf(hammer2_msg_t *msg, const char *ctl, ...);
+void hammer2_shell_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void iocom_printf(hammer2_iocom_t *iocom, uint32_t cmd, const char *ctl, ...);
index f17855a..aab35f8 100644 (file)
 
 #include "hammer2.h"
 
+static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static int hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+
 /*
  * Initialize a low-level ioq
  */
@@ -52,12 +56,12 @@ hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
        hammer2_msg_t *msg;
 
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
-               TAILQ_REMOVE(&ioq->msgq, msg, entry);
-               hammer2_freemsg(msg);
+               TAILQ_REMOVE(&ioq->msgq, msg, qentry);
+               hammer2_msg_free(iocom, msg);
        }
        if ((msg = ioq->msg) != NULL) {
                ioq->msg = NULL;
-               hammer2_freemsg(msg);
+               hammer2_msg_free(iocom, msg);
        }
 }
 
@@ -69,6 +73,8 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
 {
        bzero(iocom, sizeof(*iocom));
 
+       RB_INIT(&iocom->staterd_tree);
+       RB_INIT(&iocom->statewr_tree);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
        iocom->sock_fd = sock_fd;
@@ -104,11 +110,11 @@ hammer2_iocom_done(hammer2_iocom_t *iocom)
        hammer2_ioq_done(iocom, &iocom->ioq_rx);
        hammer2_ioq_done(iocom, &iocom->ioq_tx);
        if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
-               TAILQ_REMOVE(&iocom->freeq, msg, entry);
+               TAILQ_REMOVE(&iocom->freeq, msg, qentry);
                free(msg);
        }
        if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
-               TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
+               TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
                free(msg->aux_data);
                msg->aux_data = NULL;
                free(msg);
@@ -119,7 +125,7 @@ hammer2_iocom_done(hammer2_iocom_t *iocom)
  * Allocate a new one-way message.
  */
 hammer2_msg_t *
-hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
+hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
 {
        hammer2_msg_t *msg;
        int hbytes;
@@ -128,14 +134,14 @@ hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
                aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
                           ~HAMMER2_MSG_ALIGNMASK;
                if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
-                       TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
+                       TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
        } else {
                if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
-                       TAILQ_REMOVE(&iocom->freeq, msg, entry);
+                       TAILQ_REMOVE(&iocom->freeq, msg, qentry);
        }
        if (msg == NULL) {
                msg = malloc(sizeof(*msg));
-               msg->iocom = iocom;
+               bzero(msg, sizeof(*msg));
                msg->aux_data = NULL;
                msg->aux_size = 0;
        }
@@ -150,10 +156,10 @@ hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
                        msg->aux_size = aux_size;
                }
        }
-       msg->flags = 0;
        hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
        if (hbytes)
                bzero(&msg->any.head, hbytes);
+       msg->hdr_size = hbytes;
        msg->any.head.aux_icrc = 0;
        msg->any.head.cmd = cmd;
 
@@ -161,50 +167,17 @@ hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
 }
 
 /*
- * Allocate a one-way or streaming reply to a message.  The message is
- * not modified.  This function may be used to allocate multiple replies.
- *
- * If cmd is 0 then msg->any.head.cmd is used to formulate the reply command.
- */
-hammer2_msg_t *
-hammer2_allocreply(hammer2_msg_t *msg, uint32_t cmd, int aux_size)
-{
-       hammer2_msg_t *rmsg;
-       hammer2_persist_t *pers;
-
-       assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
-       if (cmd == 0)
-               cmd = msg->any.head.cmd;
-
-       rmsg = hammer2_allocmsg(msg->iocom, cmd, aux_size);
-       rmsg->any.head = msg->any.head;
-       rmsg->any.head.cmd = (cmd | HAMMER2_MSGF_REPLY) &
-                            ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
-       rmsg->any.head.aux_icrc = 0;
-
-       if ((pers = msg->persist) != NULL) {
-               assert(pers->lrep & HAMMER2_MSGF_DELETE);
-               rmsg->any.head.cmd |= pers->lrep & HAMMER2_MSGF_CREATE;
-               pers->lrep &= ~HAMMER2_MSGF_CREATE;
-               /* do not clear DELETE */
-       }
-       return (rmsg);
-}
-
-/*
  * Free a message so it can be reused afresh.
  *
  * NOTE: aux_size can be 0 with a non-NULL aux_data.
  */
 void
-hammer2_freemsg(hammer2_msg_t *msg)
+hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
-       hammer2_iocom_t *iocom = msg->iocom;
-
        if (msg->aux_data)
-               TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
+               TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
        else
-               TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
+               TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
 }
 
 /*
@@ -278,19 +251,20 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
        hammer2_msg_t *msg;
        hammer2_msg_hdr_t *head;
        ssize_t n;
-       int bytes;
-       int flags;
-       int nmax;
+       size_t bytes;
+       size_t nmax;
        uint16_t xcrc16;
        uint32_t xcrc32;
+       int error;
 
+again:
        /*
         * If a message is already pending we can just remove and
-        * return it.
+        * return it.  Message state has already been processed.
         */
        if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
-               TAILQ_REMOVE(&ioq->msgq, msg, entry);
-               return(msg);
+               TAILQ_REMOVE(&ioq->msgq, msg, qentry);
+               return (msg);
        }
 
        /*
@@ -346,7 +320,6 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                 * not immediately decrypted.
                 */
                hammer2_crypto_decrypt(iocom, ioq);
-               flags = 0;
                head = (void *)(ioq->buf + ioq->fifo_beg);
 
                /*
@@ -365,7 +338,6 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                                        HAMMER2_MSGHDR_CRCBYTES);
                if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
                        hammer2_bswap_head(head);
-                       flags |= HAMMER2_MSGX_BSWAPPED;
                }
                xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
                if (xcrc16 != head->icrc1) {
@@ -379,8 +351,8 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
                              HAMMER2_MSG_ALIGN;
                ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
-               if (ioq->hbytes < (int)sizeof(msg->any.head) ||
-                   ioq->hbytes > (int)sizeof(msg->any) ||
+               if (ioq->hbytes < sizeof(msg->any.head) ||
+                   ioq->hbytes > sizeof(msg->any) ||
                    ioq->abytes > HAMMER2_MSGAUX_MAX) {
                        ioq->error = HAMMER2_IOQ_ERROR_FIELD;
                        break;
@@ -393,9 +365,7 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                 * Initialize msg->aux_size to 0 and use it to track
                 * the amount of data copied from the stream.
                 */
-               msg = hammer2_allocmsg(iocom, 0, ioq->abytes);
-               msg->aux_size = 0;
-               msg->flags = flags;
+               msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
                ioq->msg = msg;
 
                /*
@@ -469,7 +439,7 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                /*
                 * Check the crc on the extended header
                 */
-               if (ioq->hbytes > (int)sizeof(hammer2_msg_hdr_t)) {
+               if (ioq->hbytes > sizeof(hammer2_msg_hdr_t)) {
                        xcrc32 = hammer2_icrc32(head + 1,
                                                ioq->hbytes - sizeof(*head));
                        xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
@@ -507,8 +477,9 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                 * Copy the partial or complete payload from remaining
                 * bytes in the FIFO.  We have to fall-through either
                 * way so we can check the crc.
+                *
+                * Adjust msg->aux_size to the final actual value.
                 */
-               assert(msg->aux_size == 0);
                ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
                if (ioq->already > ioq->abytes)
                        ioq->already = ioq->abytes;
@@ -528,6 +499,8 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                        if (ioq->fifo_cdx < ioq->fifo_beg)
                                ioq->fifo_cdx = ioq->fifo_beg;
                        bytes = 0;
+               } else {
+                       msg->aux_size = 0;
                }
                ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
                /* fall through */
@@ -602,6 +575,20 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
        }
 
        /*
+        * Process transactional state for the message.
+        */
+       if (msg && ioq->error == 0) {
+               error = hammer2_state_msgrx(iocom, msg);
+               if (error) {
+                       if (error == HAMMER2_IOQ_ERROR_EALREADY) {
+                               hammer2_msg_free(iocom, msg);
+                               goto again;
+                       }
+                       ioq->error = error;
+               }
+       }
+
+       /*
         * Handle error, RREQ, or completion
         *
         * NOTE: nmax and bytes are invalid at this point, we don't bother
@@ -620,7 +607,7 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                 */
                assert(ioq->msg == msg);
                if (msg == NULL)
-                       msg = hammer2_allocmsg(iocom, 0, 0);
+                       msg = hammer2_msg_alloc(iocom, 0, 0);
                else
                        ioq->msg = NULL;
 
@@ -681,17 +668,35 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
  * Calling this function with msg == NULL will get a flush going.
  */
 void
-hammer2_ioq_write(hammer2_msg_t *msg)
+hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
-       hammer2_iocom_t *iocom = msg->iocom;
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
        uint16_t xcrc16;
        uint32_t xcrc32;
        int hbytes;
+       int error;
 
        assert(msg);
+
+       /*
+        * Process transactional state.
+        */
+       if (ioq->error == 0) {
+               error = hammer2_state_msgtx(iocom, msg);
+               if (error) {
+                       if (error == HAMMER2_IOQ_ERROR_EALREADY) {
+                               hammer2_msg_free(iocom, msg);
+                       } else {
+                               ioq->error = error;
+                       }
+               }
+       }
+
+       /*
+        * Process terminal connection errors.
+        */
        if (ioq->error) {
-               TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
+               TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
                ++ioq->msgcount;
                hammer2_iocom_drain(iocom);
                return;
@@ -743,7 +748,7 @@ hammer2_ioq_write(hammer2_msg_t *msg)
        /*
         * Enqueue the message.
         */
-       TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
+       TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
        ++ioq->msgcount;
        iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
 
@@ -767,8 +772,8 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom)
        ssize_t nmax;
        ssize_t nact;
        struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
-       int hbytes;
-       int abytes;
+       size_t hbytes;
+       size_t abytes;
        int hoff;
        int aoff;
        int n;
@@ -779,7 +784,7 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom)
        n = 0;
        nmax = 0;
 
-       TAILQ_FOREACH(msg, &ioq->msgq, entry) {
+       TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
                hoff = 0;
                hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
                         HAMMER2_MSG_ALIGN;
@@ -845,26 +850,24 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom)
                         HAMMER2_MSG_ALIGN;
                abytes = msg->aux_size;
 
-               if (nact < hbytes - ioq->hbytes) {
+               if ((size_t)nact < hbytes - ioq->hbytes) {
                        ioq->hbytes += nact;
                        break;
                }
                nact -= hbytes - ioq->hbytes;
                ioq->hbytes = hbytes;
-               if (nact < abytes - ioq->abytes) {
+               if ((size_t)nact < abytes - ioq->abytes) {
                        ioq->abytes += nact;
                        break;
                }
                nact -= abytes - ioq->abytes;
 
-               TAILQ_REMOVE(&ioq->msgq, msg, entry);
+               TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                --ioq->msgcount;
                ioq->hbytes = 0;
                ioq->abytes = 0;
-               if (msg->aux_data)
-                       TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
-               else
-                       TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
+
+               hammer2_state_cleanuptx(iocom, msg);
        }
        if (msg == NULL) {
                iocom->flags |= HAMMER2_IOCOMF_WIDLE;
@@ -890,42 +893,624 @@ hammer2_iocom_drain(hammer2_iocom_t *iocom)
        hammer2_msg_t *msg;
 
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
-               TAILQ_REMOVE(&ioq->msgq, msg, entry);
+               TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                --ioq->msgcount;
-               hammer2_freemsg(msg);
+               hammer2_msg_free(iocom, msg);
        }
        iocom->flags |= HAMMER2_IOCOMF_WIDLE;
        iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
 }
 
 /*
- * This is a shortcut to the normal hammer2_allocreply() mechanic which
- * uses the received message to formulate a final reply and error code.
- * Can be used to issue a final reply for one-way, one-off, or streaming
- * commands.
+ * This is a shortcut to formulate a reply to msg with a simple error code.
+ * It can reply to transaction or one-way messages, or terminate one side
+ * of a stream.  A HAMMER2_LNK_ERROR command code is utilized to encode
+ * the error code (which can be 0).
  *
  * Replies to one-way messages are a bit of an oxymoron but the feature
  * is used by the debug (DBG) protocol.
  *
  * The reply contains no data.
+ */
+void
+hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint16_t error)
+{
+       hammer2_msg_t *nmsg;
+       uint32_t cmd;
+
+       cmd = HAMMER2_LNK_ERROR;
+       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+               /*
+                * Reply to received reply, reply direction uses txcmd.
+                * txcmd will be updated by hammer2_ioq_write().
+                */
+               if (msg->state) {
+                       if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
+                               cmd |= HAMMER2_MSGF_CREATE;
+                       cmd |= HAMMER2_MSGF_DELETE;
+               }
+       } else {
+               /*
+                * Reply to received command, reply direction uses rxcmd.
+                * txcmd will be updated by hammer2_ioq_write().
+                */
+               cmd |= HAMMER2_MSGF_REPLY;
+               if (msg->state) {
+                       if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
+                               cmd |= HAMMER2_MSGF_CREATE;
+                       cmd |= HAMMER2_MSGF_DELETE;
+               }
+       }
+       nmsg = hammer2_msg_alloc(iocom, 0, cmd);
+       nmsg->any.head.error = error;
+       hammer2_ioq_write(iocom, nmsg);
+}
+
+/************************************************************************
+ *                     TRANSACTION STATE HANDLING                      *
+ ************************************************************************
+ *
+ */
+
+RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
+
+/*
+ * Process state tracking for a message after reception, prior to
+ * execution.
+ *
+ * Called with msglk held and the msg dequeued.
+ *
+ * All messages are called with dummy state and return actual state.
+ * (One-off messages often just return the same dummy state).
+ *
+ * May request that the caller discard the message by returning
+ * HAMMER2_IOQ_ERROR_EALREADY, in which case msg->state may be NULL.
+ *
+ * --
+ *
+ * These routines handle persistent and command/reply message state via the
+ * CREATE and DELETE flags.  The first message in a command or reply sequence
+ * sets CREATE, the last message in a command or reply sequence sets DELETE.
  *
- * (msg) is eaten up by this function.
+ * There can be any number of intermediate messages belonging to the same
+ * sequence sent in between the CREATE message and the DELETE message,
+ * which set neither flag.  This represents a streaming command or reply.
+ *
+ * Any command message received with CREATE set expects a reply sequence to
+ * be returned.  Reply sequences work the same as command sequences except the
+ * REPLY bit is also set.  Both the command side and reply side can
+ * degenerate into a single message with both CREATE and DELETE set.  Note
+ * that one side can be streaming and the other side not, or neither, or both.
+ *
+ * The msgid is unique for the initiator.  That is, two sides sending a new
+ * message can use the same msgid without colliding.
+ *
+ * --
+ *
+ * ABORT sequences work by setting the ABORT flag along with normal message
+ * state.  However, ABORTs can also be sent on half-closed messages, that is
+ * even if the command or reply side has already sent a DELETE, as long as
+ * the message has not been fully closed it can still send an ABORT+DELETE
+ * to terminate the half-closed message state.
+ *
+ * Since ABORT+DELETEs can race we silently discard ABORTs for message
+ * state which has already been fully closed.  REPLY+ABORT+DELETEs can
+ * also race, and in this situation the other side might have already
+ * initiated a new unrelated command with the same message id.  Since
+ * the abort has not set the CREATE flag the situation can be detected
+ * and the message will also be discarded.
+ *
+ * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
+ * The ABORT request is essentially integrated into the command instead
+ * of being sent later on.  In this situation the command implementation
+ * detects that CREATE and ABORT are both set (vs ABORT alone) and can
+ * special-case non-blocking operation for the command.
+ *
+ * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
+ *       to be mid-stream aborts for command/reply sequences.  ABORTs on
+ *       one-way messages are not supported.
+ *
+ * NOTE!  If a command sequence does not support aborts the ABORT flag is
+ *       simply ignored.
+ *
+ * --
+ *
+ * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
+ * set.  One-off messages cannot be aborted and typically aren't processed
+ * by these routines.  The REPLY bit can be used to distinguish whether a
+ * one-off message is a command or reply.  For example, one-off replies
+ * will typically just contain status updates.
  */
+static int
+hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+{
+       hammer2_state_t *state;
+       hammer2_state_t dummy;
+       int error;
+
+       /*
+        * Lock RB tree and locate existing persistent state, if any.
+        *
+        * If received msg is a command state is on staterd_tree.
+        * If received msg is a reply state is on statewr_tree.
+        *
+        * Only the key fields of dummy are filled in; it is used purely
+        * as the lookup key for RB_FIND().
+        *
+        * Returns 0 on success, HAMMER2_IOQ_ERROR_EALREADY when an ABORT
+        * refers to a transaction that has no usable state, or
+        * HAMMER2_IOQ_ERROR_TRANS on any other state mismatch.  msg->state
+        * is set to the located (or newly created) state, or NULL.
+        */
+       /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
+
+       dummy.msgid = msg->any.head.msgid;
+       dummy.source = msg->any.head.source;
+       dummy.target = msg->any.head.target;
+       iocom_printf(iocom, msg->any.head.cmd,
+                    "received msg %08x msgid %u source=%u target=%u\n",
+                     msg->any.head.cmd, msg->any.head.msgid,
+                     msg->any.head.source, msg->any.head.target);
+       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+               state = RB_FIND(hammer2_state_tree,
+                               &iocom->statewr_tree, &dummy);
+       } else {
+               state = RB_FIND(hammer2_state_tree,
+                               &iocom->staterd_tree, &dummy);
+       }
+       msg->state = state;
+
+       /*
+        * Short-cut one-off or mid-stream messages (state may be NULL).
+        */
+       if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+                                 HAMMER2_MSGF_ABORT)) == 0) {
+               /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+               return(0);
+       }
+
+       /*
+        * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
+        * inside the case statements.
+        */
+       switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+                                   HAMMER2_MSGF_REPLY)) {
+       case HAMMER2_MSGF_CREATE:
+       case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+               /*
+                * New persistent command received.
+                */
+               if (state) {
+                       iocom_printf(iocom, msg->any.head.cmd,
+                                    "hammer2_state_msgrx: "
+                                    "duplicate transaction\n");
+                       error = HAMMER2_IOQ_ERROR_TRANS;
+                       break;
+               }
+               state = malloc(sizeof(*state));
+               bzero(state, sizeof(*state));
+               state->iocom = iocom;
+               state->flags = HAMMER2_STATE_DYNAMIC;
+               state->msg = msg;
+               state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+
+               /*
+                * The tree is keyed on {source,target,msgid} (see
+                * hammer2_state_cmp()), so the keys must be filled in
+                * before the insert.  Leaving them zeroed would make
+                * every later RB_FIND() for this transaction miss.
+                */
+               state->msgid = msg->any.head.msgid;
+               state->source = msg->any.head.source;
+               state->target = msg->any.head.target;
+               RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
+               state->flags |= HAMMER2_STATE_INSERTED;
+               msg->state = state;
+               error = 0;
+               break;
+       case HAMMER2_MSGF_DELETE:
+               /*
+                * Persistent state is expected but might not exist if an
+                * ABORT+DELETE races the close.
+                */
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgrx: "
+                                            "no state for DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+
+               /*
+                * Handle another ABORT+DELETE case if the msgid has already
+                * been reused.
+                */
+               if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgrx: "
+                                            "state reused for DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+               error = 0;
+               break;
+       default:
+               /*
+                * Check for mid-stream ABORT command received, otherwise
+                * allow.
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                       if (state == NULL ||
+                           (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                               break;
+                       }
+               }
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+               /*
+                * When receiving a reply with CREATE set the original
+                * persistent state message should already exist.
+                */
+               if (state == NULL) {
+                       iocom_printf(iocom, msg->any.head.cmd,
+                                    "hammer2_state_msgrx: "
+                                    "no state match for REPLY cmd=%08x\n",
+                                    msg->any.head.cmd);
+                       error = HAMMER2_IOQ_ERROR_TRANS;
+                       break;
+               }
+               state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
+               /*
+                * Received REPLY+ABORT+DELETE in case where msgid has
+                * already been fully closed, ignore the message.
+                */
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgrx: "
+                                            "no state match for "
+                                            "REPLY|DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+
+               /*
+                * Received REPLY+ABORT+DELETE in case where msgid has
+                * already been reused for an unrelated message,
+                * ignore the message.
+                */
+               if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgrx: "
+                                            "state reused for REPLY|DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY:
+               /*
+                * Check for mid-stream ABORT reply received to sent command.
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                       if (state == NULL ||
+                           (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                               break;
+                       }
+               }
+               error = 0;
+               break;
+       }
+       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+       return (error);
+}
+
+/*
+ * Dispose of a received message once processing of it is complete and
+ * advance the receive side of its transaction state.
+ *
+ * - No state attached: the message is simply freed.
+ * - DELETE set: rxcmd is marked DELETE.  If txcmd already carries DELETE
+ *   (both directions closed) the state is removed from its tree
+ *   (statewr_tree for a REPLY, staterd_tree otherwise) and released with
+ *   hammer2_state_free().  The message is then freed.
+ * - Otherwise: the message is freed unless it is the one referenced by
+ *   state->msg, which stays attached to the state.
+ */
 void
-hammer2_replymsg(hammer2_msg_t *msg, uint16_t error)
+hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
-       hammer2_persist_t *pers;
-
-       assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
-
-       msg->any.head.error = error;
-       msg->any.head.cmd |= HAMMER2_MSGF_REPLY;
-       msg->aux_size = 0;
-       if ((pers = msg->persist) != NULL) {
-               assert(pers->lrep & HAMMER2_MSGF_DELETE);
-               msg->any.head.cmd |= pers->lrep & (HAMMER2_MSGF_CREATE |
-                                                  HAMMER2_MSGF_DELETE);
-               pers->lrep &= ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
+       hammer2_state_t *state;
+
+       if ((state = msg->state) == NULL) {
+               hammer2_msg_free(iocom, msg);
+       } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+               /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
+               state->rxcmd |= HAMMER2_MSGF_DELETE;
+               if (state->txcmd & HAMMER2_MSGF_DELETE) {
+                       /*
+                        * Detach msg from the state first so that
+                        * hammer2_state_free() does not free it; it is
+                        * freed exactly once at the bottom of this branch.
+                        */
+                       if (state->msg == msg)
+                               state->msg = NULL;
+                       assert(state->flags & HAMMER2_STATE_INSERTED);
+                       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+                               RB_REMOVE(hammer2_state_tree,
+                                         &iocom->statewr_tree, state);
+                       } else {
+                               RB_REMOVE(hammer2_state_tree,
+                                         &iocom->staterd_tree, state);
+                       }
+                       state->flags &= ~HAMMER2_STATE_INSERTED;
+                       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+                       hammer2_state_free(state);
+               } else {
+                       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+               }
+               hammer2_msg_free(iocom, msg);
+       } else if (state->msg != msg) {
+               hammer2_msg_free(iocom, msg);
        }
-       hammer2_ioq_write(msg);
+}
+
+/*
+ * Process state tracking for a message prior to transmission.
+ *
+ * The original design calls for msglk to be held and the msg dequeued;
+ * in this version the lockmgr() calls are commented out.
+ *
+ * One-off messages are usually with dummy state and msg->state may be NULL
+ * in this situation.
+ *
+ * New transactions (when CREATE is set) must arrive with msg->state
+ * already assigned (this is asserted); the code that would insert the
+ * state here is disabled under #if 0, so only txcmd is recorded.
+ *
+ * Returns 0 if the message is acceptable for transmission,
+ * HAMMER2_IOQ_ERROR_EALREADY when an ABORT refers to a transaction with no
+ * usable state, or HAMMER2_IOQ_ERROR_TRANS on any other state mismatch.
+ */
+static int
+hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+{
+       hammer2_state_t *state;
+       int error;
+
+       /*
+        * Lock RB tree.  If persistent state is present it will have already
+        * been assigned to msg.
+        */
+       /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
+       state = msg->state;
+
+       /*
+        * Short-cut one-off or mid-stream messages (state may be NULL).
+        */
+       if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+                                 HAMMER2_MSGF_ABORT)) == 0) {
+               /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+               return(0);
+       }
+
+
+       /*
+        * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
+        * inside the case statements.
+        */
+       switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+                                   HAMMER2_MSGF_REPLY)) {
+       case HAMMER2_MSGF_CREATE:
+       case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+               /*
+                * Insert the new persistent message state and mark
+                * half-closed if DELETE is set.  Since this is a new
+                * message it isn't possible to transition into the fully
+                * closed state here.
+                *
+                * XXX state must be assigned and inserted by
+                *     hammer2_msg_write().  txcmd is assigned by us
+                *     on-transmit.
+                *
+                * The #if 0 block below references pmp, which is not
+                * declared in this function, so it cannot be enabled
+                * as-is.
+                */
+               assert(state != NULL);
+#if 0
+               if (state == NULL) {
+                       state = pmp->freerd_state;
+                       pmp->freerd_state = NULL;
+                       msg->state = state;
+                       state->msg = msg;
+                       state->msgid = msg->any.head.msgid;
+                       state->source = msg->any.head.source;
+                       state->target = msg->any.head.target;
+               }
+               assert((state->flags & HAMMER2_STATE_INSERTED) == 0);
+               if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
+                       iocom_printf(iocom, msg->any.head.cmd,
+                                   "hammer2_state_msgtx: "
+                                   "duplicate transaction\n");
+                       error = HAMMER2_IOQ_ERROR_TRANS;
+                       break;
+               }
+               state->flags |= HAMMER2_STATE_INSERTED;
+#endif
+               state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               error = 0;
+               break;
+       case HAMMER2_MSGF_DELETE:
+               /*
+                * Sent ABORT+DELETE in case where msgid has already
+                * been fully closed, ignore the message.
+                */
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgtx: "
+                                            "no state match for DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+
+               /*
+                * Sent ABORT+DELETE in case where msgid has
+                * already been reused for an unrelated message,
+                * ignore the message.
+                */
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgtx: "
+                                            "state reused for DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+               error = 0;
+               break;
+       default:
+               /*
+                * Check for mid-stream ABORT command sent
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                       if (state == NULL ||
+                           (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                               break;
+                       }
+               }
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+               /*
+                * When transmitting a reply with CREATE set the original
+                * persistent state message should already exist.
+                */
+               if (state == NULL) {
+                       iocom_printf(iocom, msg->any.head.cmd,
+                                    "hammer2_state_msgtx: no state match "
+                                    "for REPLY | CREATE\n");
+                       error = HAMMER2_IOQ_ERROR_TRANS;
+                       break;
+               }
+               state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
+               /*
+                * When transmitting a reply with DELETE set the original
+                * persistent state message should already exist.
+                *
+                * This is very similar to the REPLY|CREATE|* case except
+                * txcmd is already stored, so we just add the DELETE flag.
+                *
+                * Sent REPLY+ABORT+DELETE in case where msgid has
+                * already been fully closed, ignore the message.
+                */
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgtx: "
+                                            "no state match for "
+                                            "REPLY | DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+
+               /*
+                * Sent REPLY+ABORT+DELETE in case where msgid has already
+                * been reused for an unrelated message, ignore the message.
+                */
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgtx: "
+                                            "state reused for "
+                                            "REPLY | DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY:
+               /*
+                * Check for mid-stream ABORT reply sent.
+                *
+                * One-off REPLY messages are allowed for e.g. status updates.
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                       if (state == NULL ||
+                           (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                               break;
+                       }
+               }
+               error = 0;
+               break;
+       }
+       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+       return (error);
+}
+
+/*
+ * Dispose of a message after transmission and advance the transmit side
+ * of its transaction state.  Mirror image of hammer2_state_cleanuprx():
+ * txcmd is marked instead of rxcmd, and a REPLY is removed from
+ * staterd_tree while a command is removed from statewr_tree.
+ */
+static void
+hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+{
+       hammer2_state_t *state = msg->state;
+       struct hammer2_state_tree *tree;
+
+       /*
+        * No transaction state attached, just release the message.
+        */
+       if (state == NULL) {
+               hammer2_msg_free(iocom, msg);
+               return;
+       }
+
+       /*
+        * Transaction stays open.  Release the message unless it is the
+        * one the state itself references.
+        */
+       if ((msg->any.head.cmd & HAMMER2_MSGF_DELETE) == 0) {
+               if (state->msg != msg)
+                       hammer2_msg_free(iocom, msg);
+               return;
+       }
+
+       /*
+        * Transmit side is now closed.  If the receive side is closed as
+        * well the state is unlinked from its tree and destroyed.
+        */
+       /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
+       state->txcmd |= HAMMER2_MSGF_DELETE;
+       if (state->rxcmd & HAMMER2_MSGF_DELETE) {
+               /* keep hammer2_state_free() from freeing msg as well */
+               if (state->msg == msg)
+                       state->msg = NULL;
+               assert(state->flags & HAMMER2_STATE_INSERTED);
+               if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
+                       tree = &iocom->staterd_tree;
+               else
+                       tree = &iocom->statewr_tree;
+               RB_REMOVE(hammer2_state_tree, tree, state);
+               state->flags &= ~HAMMER2_STATE_INSERTED;
+               /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+               hammer2_state_free(state);
+       } else {
+               /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+       }
+       hammer2_msg_free(iocom, msg);
+}
+
+/*
+ * Release a state structure along with the message it still references,
+ * if any.  The caller must already have removed the state from its RB
+ * tree.
+ */
+void
+hammer2_state_free(hammer2_state_t *state)
+{
+       hammer2_iocom_t *iocom = state->iocom;
+       hammer2_msg_t *msg;
+
+       msg = state->msg;
+       state->msg = NULL;
+       if (msg)
+               hammer2_msg_free(iocom, msg);
+
+       /*
+        * Free the state exactly once, and only after the message has
+        * been released (the previous code called free() twice).
+        */
+       free(state);
+}
+
+/*
+ * RB tree comparator for transaction states.  States are ordered by
+ * source, then target, then msgid, so the lookup key is the full
+ * {source, target, msgid} triple.  Only persistent messages are indexed.
+ *
+ * Returns -1, 0 or 1 in the usual comparator fashion.
+ */
+int
+hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
+{
+       if (state1->source != state2->source)
+               return((state1->source < state2->source) ? -1 : 1);
+       if (state1->target != state2->target)
+               return((state1->target < state2->target) ? -1 : 1);
+       if (state1->msgid != state2->msgid)
+               return((state1->msgid < state2->msgid) ? -1 : 1);
+       return(0);
 }
index eeaeb20..607d2c4 100644 (file)
@@ -108,23 +108,44 @@ typedef struct hammer2_handshake hammer2_handshake_t;
  */
 struct hammer2_iocom;
 struct hammer2_persist;
+struct hammre2_state;
+struct hammre2_msg;
 
-struct hammer2_msg {
+TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
+RB_HEAD(hammer2_state_tree, hammer2_state);
+
+/*
+ * Per-transaction state.  A state lives in one of an iocom's RB trees
+ * (staterd_tree or statewr_tree) and is ordered by hammer2_state_cmp()
+ * on {source, target, msgid}.
+ *
+ * txcmd/rxcmd hold the command word recorded when the transaction was
+ * opened in that direction (with DELETE masked off); DELETE is or'd in
+ * later by the cleanup routines once that direction is closed.
+ */
+struct hammer2_state {
+       RB_ENTRY(hammer2_state) rbnode;         /* see hammer2_state_cmp() */
        struct hammer2_iocom *iocom;
-       struct hammer2_persist  *persist;
-       TAILQ_ENTRY(hammer2_msg) entry; /* queue */
-       char            *aux_data;      /* aux-data if any */
-       int             aux_size;
+       uint32_t        txcmd;                  /* mostly for CMDF flags */
+       uint32_t        rxcmd;                  /* mostly for CMDF flags */
+       uint16_t        source;                 /* command originator */
+       uint16_t        target;                 /* reply originator */
+       uint32_t        msgid;                  /* {source,target,msgid} uniq */
        int             flags;
-       hammer2_any_t   any;            /* raw extended msg header */
+       int             error;
+       struct hammer2_msg *msg;                /* freed by hammer2_state_free() */
+       int (*func)(struct hammer2_iocom *, struct hammer2_msg *);
 };
 
-typedef struct hammer2_msg hammer2_msg_t;
+/*
+ * hammer2_state.flags
+ *
+ * INSERTED - state is currently linked into an iocom state tree.
+ * DYNAMIC  - set on states malloc()'d by hammer2_state_msgrx().
+ */
+#define HAMMER2_STATE_INSERTED 0x0001
+#define HAMMER2_STATE_DYNAMIC  0x0002
 
-TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
+/*
+ * A single message.  state is NULL for one-off messages which are not
+ * part of a tracked transaction.
+ */
+struct hammer2_msg {
+       TAILQ_ENTRY(hammer2_msg) qentry;
+       struct hammer2_state *state;
+       size_t          hdr_size;
+       size_t          aux_size;
+       char            *aux_data;      /* aux-data if any */
+       hammer2_any_t   any;            /* raw extended msg header */
+};
+
+typedef struct hammer2_state hammer2_state_t;
+typedef struct hammer2_msg hammer2_msg_t;
 typedef struct hammer2_msg_queue hammer2_msg_queue_t;
 
-#define HAMMER2_MSGX_BSWAPPED  0x0001
+int hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2);
+RB_PROTOTYPE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
 
 /*
  * hammer2_ioq - An embedded component of hammer2_connect, holds state
@@ -139,9 +160,9 @@ struct hammer2_ioq {
        int             fifo_beg;               /* buffered data */
        int             fifo_cdx;               /* encrypt/decrypt index */
        int             fifo_end;
-       int             hbytes;                 /* header size */
-       int             abytes;                 /* aux_data size */
-       int             already;                /* aux_data already decrypted */
+       size_t          hbytes;                 /* header size */
+       size_t          abytes;                 /* aux_data size */
+       size_t          already;                /* aux_data already decrypted */
        int             error;
        int             seq;                    /* salt sequencer */
        int             msgcount;
@@ -169,6 +190,8 @@ typedef struct hammer2_ioq hammer2_ioq_t;
 #define HAMMER2_IOQ_ERROR_KEYFMT       13      /* key file format problem */
 #define HAMMER2_IOQ_ERROR_BADURANDOM   14      /* /dev/urandom is bad */
 #define HAMMER2_IOQ_ERROR_MSGSEQ       15      /* message sequence error */
+#define HAMMER2_IOQ_ERROR_EALREADY     16      /* ignore this message */
+#define HAMMER2_IOQ_ERROR_TRANS                17      /* state transaction issue */
 
 #define HAMMER2_IOQ_MAXIOVEC    16
 
@@ -189,6 +212,8 @@ struct hammer2_iocom {
        int     rxmisc;
        int     txmisc;
        char    sess[HAMMER2_AES_KEY_SIZE];     /* aes_256_cbc key */
+       struct hammer2_state_tree staterd_tree; /* active messages */
+       struct hammer2_state_tree statewr_tree; /* active messages */
 };
 
 typedef struct hammer2_iocom hammer2_iocom_t;
@@ -199,118 +224,3 @@ typedef struct hammer2_iocom hammer2_iocom_t;
 #define HAMMER2_IOCOMF_WIDLE   0x00000008      /* request write-avail event */
 #define HAMMER2_IOCOMF_SIGNAL  0x00000010
 #define HAMMER2_IOCOMF_CRYPTED 0x00000020      /* encrypt enabled */
-
-/***************************************************************************
- *                             HIGH LEVEL MESSAGING                       *
- ***************************************************************************
- *
- * Persistent state is stored via the hammer2_persist structure.
- */
-struct hammer2_persist {
-       uint32_t        lcmd;           /* recent command direction */
-       uint32_t        lrep;           /* recent reply direction */
-};
-
-typedef struct hammer2_persist hammer2_persist_t;
-
-#if 0
-
-
-
-/*
- * The global registration structure consolidates information accumulated
- * via the spanning tree algorithm and tells us which connection (link)
- * is the best path to get to any given registration.
- *
- * glob_node   - Splay entry for this registration in the global index
- *               of all registrations.
- *
- * glob_entry  - tailq entry when this registration's best_span element
- *               has changed state.
- *
- * span_list   - Head of a simple list of spanning tree entries which
- *               we use to determine the best link.
- *
- * best_span   - Which of the span structure on span_list is the best
- *               one.
- *
- * source_root - Splay tree root indexing all mesasges sent from this
- *               registration.  The messages are indexed by
- *               {linkid,msgid} XXX
- *
- * target_root - Splay tree root indexing all messages being sent to
- *               this registration.  The messages are indexed by
- *               {linkid,msgid}. XXX
- *
- *
- * Whenever spanning tree data causes a registration's best_link field to
- * change that registration is transmitted as spanning tree data to every
- * active link.  Note that pure clients to the cluster, of which there can
- * be millions, typically do not transmit spanning tree data to each other.
- *
- * Each registration is assigned a unique linkid local to the node (another
- * node might assign a different linkid to the same registration).  This
- * linkid must be persistent as long as messages are active and is used
- * to identify the message source and target.
- */
-TAILQ_HEAD(hammer2_span_list, hammer2_span);
-typedef struct hammer2_span_list hammer2_span_list_t;
-
-struct hammer2_reg {
-       SPLAY_ENTRY(hammer2_reg) glob_node;     /* index of registrations */
-       TAILQ_ENTRY(hammer2_reg) glob_entry;    /* when modified */
-       hammer2_span_list_t     span_list;      /* list of hammer2_span's */
-       hammer2_span_t          *best_span;     /* best span entry */
-       hammer2_pmsg_splay_head_t source_root;  /* msgs sent from reg */
-       hammer2_pmsg_splay_head_t target_root;  /* msgs sent to reg */
-       uuid_t  pfs_id;                         /* key field */
-       uuid_t  pfs_fsid;                       /* key field */
-       uint32_t linkid;
-       int     flags;
-       int     refs;
-};
-
-#define HAMMER2_PROTO_REGF_MODIFIED    0x0001
-
-/*
- * Each link (connection) collects spanning tree data received via the
- * link and stores it in these span structures.
- */
-struct hammer2_span {
-       TAILQ_ENTRY(hammer2_span)       span_entry;     /* from hammer2_reg */
-       SPLAY_ENTRY(hammer2_span)       span_node;      /* from hammer2_link */
-       hammer2_reg_t                   *reg;
-       hammer2_link_t                  *link;
-       int                             weight;
-};
-
-/*
- * Most hammer2 messages represent transactions and have persistent state
- * which must be recorded.  Some messages, such as cache states and inode
- * representations are very long-lasting transactions.
- *
- * Each node in the graph must keep track of the message state in order
- * to perform the proper action when a connection is lost.  To do this
- * the message is indexed on the source and target (global) registration,
- * and the actual span element the message was received on and transmitted
- * to is recorded (allowing us to retrieve the physical links involved).
- *
- * The {source_reg, target_reg, msgid} uniquely identifies a message.  Any
- * streaming operations using the same msgid use the same rendezvous.
- *
- * It is important to note that recorded state must use the same physical
- * link (and thus the same chain of links across the graph) as was 'forged'
- * by the initial message for that msgid.  If the source span a message is
- * received on does not match the recorded source, or the recorded target
- * is no longer routeable, the message will be returned or generate an ABORT
- * with LINKFAIL as appropriate.
- */
-struct hammer2_pmsg {
-       SPLAY_ENTRY(hammer2_pmsg) source_reg;
-       SPLAY_ENTRY(hammer2_pmsg) target_reg;
-       hammer2_span_t  *source;
-       hammer2_span_t  *target;
-       uint16_t        msgid;
-};
-
-#endif