hammer2 - Early messaging infrastructure (2)
author Matthew Dillon <dillon@apollo.backplane.com>
Fri, 13 Apr 2012 06:44:30 +0000 (23:44 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Fri, 13 Apr 2012 06:44:30 +0000 (23:44 -0700)
* Code cleanup and streamlining.

sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_node.c
sbin/hammer2/hammer2.h
sbin/hammer2/msg.c
sbin/hammer2/network.h

diff --git a/sbin/hammer2/cmd_debug.c b/sbin/hammer2/cmd_debug.c
index 230d886..240ed29 100644
@@ -38,8 +38,7 @@
 static void debug_recv(hammer2_iocom_t *iocom);
 static void debug_send(hammer2_iocom_t *iocom);
 static void debug_tty(hammer2_iocom_t *iocom);
-static void hammer2_debug_parse(hammer2_iocom_t *iocom,
-                               hammer2_msg_t *msg, char *cmdbuf);
+static void hammer2_debug_parse(hammer2_msg_t *msg, char *cmdbuf);
 
 int
 cmd_debug(void)
@@ -78,8 +77,8 @@ cmd_debug(void)
        hammer2_iocom_init(&iocom, fd, 0);
        printf("debug: connected\n");
 
-       msg = hammer2_iocom_allocmsg(&iocom, HAMMER2_DBG_SHELL, 0);
-       hammer2_ioq_write(&iocom, msg);
+       msg = hammer2_allocmsg(&iocom, HAMMER2_DBG_SHELL, 0);
+       hammer2_ioq_write(msg);
 
        hammer2_iocom_core(&iocom, debug_recv, debug_send, debug_tty);
        fprintf(stderr, "debug: disconnected\n");
@@ -99,6 +98,7 @@ debug_recv(hammer2_iocom_t *iocom)
 
        while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
               (msg = hammer2_ioq_read(iocom)) != NULL) {
+
                switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
                case HAMMER2_LNK_ERROR:
                        fprintf(stderr, "Link Error: %d\n",
@@ -108,7 +108,8 @@ debug_recv(hammer2_iocom_t *iocom)
                        /*
                         * We send the commands, not accept them.
                         */
-                       hammer2_iocom_freemsg(iocom, msg);
+                       hammer2_replymsg(msg, HAMMER2_MSG_ERR_UNKNOWN);
+                       hammer2_freemsg(msg);
                        break;
                case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
                        /*
@@ -117,15 +118,16 @@ debug_recv(hammer2_iocom_t *iocom)
                        if (msg->aux_size) {
                                msg->aux_data[msg->aux_size - 1] = 0;
                                write(1, msg->aux_data, strlen(msg->aux_data));
+                       } else {
+                               write(1, "debug> ", 7);
                        }
-                       hammer2_iocom_freemsg(iocom, msg);
+                       hammer2_freemsg(msg);
                        break;
                default:
                        assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                        fprintf(stderr, "Unknown message: %08x\n",
                                msg->any.head.cmd);
-                       hammer2_ioq_reply_term(iocom, msg,
-                                              HAMMER2_MSG_ERR_UNKNOWN);
+                       hammer2_replymsg(msg, HAMMER2_MSG_ERR_UNKNOWN);
                        break;
                }
        }
@@ -143,7 +145,7 @@ static
 void
 debug_send(hammer2_iocom_t *iocom)
 {
-       hammer2_ioq_write(iocom, NULL);
+       hammer2_iocom_flush(iocom);
 }
 
 static
@@ -159,9 +161,9 @@ debug_tty(hammer2_iocom_t *iocom)
                if (len && buf[len - 1] == '\n')
                        buf[--len] = 0;
                ++len;
-               msg = hammer2_iocom_allocmsg(iocom, HAMMER2_DBG_SHELL, len);
+               msg = hammer2_allocmsg(iocom, HAMMER2_DBG_SHELL, len);
                bcopy(buf, msg->aux_data, len);
-               hammer2_ioq_write(iocom, msg);
+               hammer2_ioq_write(msg);
        } else {
                /*
                 * Set EOF flag without setting any error code for normal
@@ -177,8 +179,10 @@ debug_tty(hammer2_iocom_t *iocom)
  * then finish up by outputting another prompt.
  */
 void
-hammer2_debug_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_debug_remote(hammer2_msg_t *msg)
 {
+       /* hammer2_iocom_t *iocom = msg->iocom; */
+
        if (msg->aux_data)
                msg->aux_data[msg->aux_size - 1] = 0;
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
@@ -188,37 +192,40 @@ hammer2_debug_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                 */
                if (msg->aux_data)
                        write(2, msg->aux_data, strlen(msg->aux_data));
-               hammer2_iocom_freemsg(iocom, msg);
+               hammer2_freemsg(msg);
        } else {
                /*
                 * Otherwise this is a command which we must process.
                 * When we are finished we generate a final reply.
                 */
-               hammer2_debug_parse(iocom, msg, msg->aux_data);
-               iocom_printf(iocom, msg, "debug> ");
-               hammer2_iocom_freemsg(iocom, msg);
+               hammer2_debug_parse(msg, msg->aux_data);
+               hammer2_replymsg(msg, 0);
        }
 }
 
 static void
-hammer2_debug_parse(hammer2_iocom_t *iocom, hammer2_msg_t *msg, char *cmdbuf)
+hammer2_debug_parse(hammer2_msg_t *msg, char *cmdbuf)
 {
+       /* hammer2_iocom_t *iocom = msg->iocom; */
        char *cmd = strsep(&cmdbuf, " \t");
 
        if (cmd == NULL || *cmd == 0) {
                ;
        } else if (strcmp(cmd, "help") == 0 || strcmp(cmd, "?") == 0) {
-               iocom_printf(iocom, msg,
-                            "help        Command help\n"
-               );
+               msg_printf(msg, "help        Command help\n");
        } else {
-               iocom_printf(iocom, msg, "Unrecognized command: %s\n", cmd);
+               msg_printf(msg, "Unrecognized command: %s\n", cmd);
        }
 }
 
+/*
+ * Returns text debug output to the original defined by (msg).  (msg) is
+ * not modified and stays intact.
+ */
 void
-iocom_printf(hammer2_iocom_t *iocom, hammer2_msg_t *msg, const char *ctl, ...)
+msg_printf(hammer2_msg_t *msg, const char *ctl, ...)
 {
+       /* hammer2_iocom_t *iocom = msg->iocom; */
        hammer2_msg_t *rmsg;
        va_list va;
        char buf[1024];
@@ -229,10 +236,8 @@ iocom_printf(hammer2_iocom_t *iocom, hammer2_msg_t *msg, const char *ctl, ...)
        va_end(va);
        len = strlen(buf) + 1;
 
-       rmsg = hammer2_iocom_allocmsg(iocom, HAMMER2_DBG_SHELL, len);
+       rmsg = hammer2_allocreply(msg, HAMMER2_DBG_SHELL, len);
        bcopy(buf, rmsg->aux_data, len);
-       rmsg->any.head = msg->any.head;
-       rmsg->any.head.aux_icrc = 0;
 
-       hammer2_ioq_reply(iocom, rmsg);
+       hammer2_ioq_write(rmsg);
 }
diff --git a/sbin/hammer2/cmd_node.c b/sbin/hammer2/cmd_node.c
index 4ba95cb..0fba7b3 100644
@@ -187,11 +187,10 @@ node_master_recv(hammer2_iocom_t *iocom)
                        break;
                case HAMMER2_DBG_SHELL:
                case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
-                       hammer2_debug_remote(iocom, msg);
+                       hammer2_debug_remote(msg);
                        break;
                default:
-                       hammer2_ioq_reply_term(iocom, msg,
-                                              HAMMER2_MSG_ERR_UNKNOWN);
+                       hammer2_replymsg(msg, HAMMER2_MSG_ERR_UNKNOWN);
                        break;
                }
        }
@@ -210,5 +209,5 @@ static
 void
 node_master_send(hammer2_iocom_t *iocom)
 {
-       hammer2_ioq_write(iocom, NULL);
+       hammer2_iocom_flush(iocom);
 }
diff --git a/sbin/hammer2/hammer2.h b/sbin/hammer2/hammer2.h
index bfacbab..d3a21f8 100644
@@ -98,23 +98,23 @@ void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd);
 void hammer2_iocom_done(hammer2_iocom_t *iocom);
-hammer2_msg_t *hammer2_iocom_allocmsg(hammer2_iocom_t *iocom,
+hammer2_msg_t *hammer2_allocmsg(hammer2_iocom_t *iocom,
                        uint32_t cmd, int aux_size);
-void hammer2_iocom_reallocmsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
-                       int aux_size);
-void hammer2_iocom_freemsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+hammer2_msg_t *hammer2_allocreply(hammer2_msg_t *msg,
+                       uint32_t cmd, int aux_size);
+void hammer2_replymsg(hammer2_msg_t *msg, uint16_t error);
+void hammer2_freemsg(hammer2_msg_t *msg);
 
 void hammer2_iocom_core(hammer2_iocom_t *iocom,
                        void (*iocom_recvmsg)(hammer2_iocom_t *),
                        void (*iocom_sendmsg)(hammer2_iocom_t *),
                        void (*iocom_altmsg)(hammer2_iocom_t *));
-hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocon);
-void hammer2_ioq_write(hammer2_iocom_t *iocon, hammer2_msg_t *msg);
-void hammer2_ioq_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-void hammer2_ioq_reply_term(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
-                       uint16_t error);
-void hammer2_ioq_write_drain(hammer2_iocom_t *iocon);
+hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
+void hammer2_ioq_write(hammer2_msg_t *msg);
+
+void hammer2_ioq_stream(hammer2_msg_t *msg, int reply);
+void hammer2_iocom_drain(hammer2_iocom_t *iocom);
+void hammer2_iocom_flush(hammer2_iocom_t *iocom);
 
-void hammer2_debug_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-void iocom_printf(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
-                       const char *ctl, ...);
+void hammer2_debug_remote(hammer2_msg_t *msg);
+void msg_printf(hammer2_msg_t *msg, const char *ctl, ...);
diff --git a/sbin/hammer2/msg.c b/sbin/hammer2/msg.c
index fb01809..e0ac85c 100644
@@ -47,17 +47,17 @@ hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
 }
 
 void
-hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq)
+hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
 {
        hammer2_msg_t *msg;
 
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                TAILQ_REMOVE(&ioq->msgq, msg, entry);
-               hammer2_iocom_freemsg(iocom, msg);
+               hammer2_freemsg(msg);
        }
        if ((msg = ioq->msg) != NULL) {
                ioq->msg = NULL;
-               hammer2_iocom_freemsg(iocom, msg);
+               hammer2_freemsg(msg);
        }
 }
 
@@ -106,8 +106,11 @@ hammer2_iocom_done(hammer2_iocom_t *iocom)
        }
 }
 
+/*
+ * Allocate a new one-way message.
+ */
 hammer2_msg_t *
-hammer2_iocom_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
+hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
 {
        hammer2_msg_t *msg;
        int hbytes;
@@ -123,6 +126,7 @@ hammer2_iocom_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
        }
        if (msg == NULL) {
                msg = malloc(sizeof(*msg));
+               msg->iocom = iocom;
                msg->aux_data = NULL;
                msg->aux_size = 0;
        }
@@ -139,31 +143,57 @@ hammer2_iocom_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
        }
        msg->flags = 0;
        hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
-       bzero(&msg->any.head, hbytes);
+       if (hbytes)
+               bzero(&msg->any.head, hbytes);
+       msg->any.head.aux_icrc = 0;
        msg->any.head.cmd = cmd;
 
        return (msg);
 }
 
-void
-hammer2_iocom_reallocmsg(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg,
-                        int aux_size)
+/*
+ * Allocate a one-way or streaming reply to a message.  The message is
+ * not modified.  This function may be used to allocate multiple replies.
+ *
+ * If cmd is 0 then msg->any.head.cmd is used to formulate the reply command.
+ */
+hammer2_msg_t *
+hammer2_allocreply(hammer2_msg_t *msg, uint32_t cmd, int aux_size)
 {
-       aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) & ~HAMMER2_MSG_ALIGNMASK;
-       if (aux_size && msg->aux_size != aux_size) {
-               if (msg->aux_data) {
-                       free(msg->aux_data);
-                       msg->aux_data = NULL;
-               }
-               msg->aux_data = malloc(aux_size);
-               msg->aux_size = aux_size;
+       hammer2_msg_t *rmsg;
+       hammer2_persist_t *pers;
+
+       assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
+       if (cmd == 0)
+               cmd = msg->any.head.cmd;
+
+       rmsg = hammer2_allocmsg(msg->iocom, cmd, aux_size);
+       rmsg->any.head = msg->any.head;
+       rmsg->any.head.source = msg->any.head.target;
+       rmsg->any.head.target = msg->any.head.source;
+       rmsg->any.head.cmd = (cmd | HAMMER2_MSGF_REPLY) &
+                            ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
+       rmsg->any.head.aux_icrc = 0;
+
+       if ((pers = msg->persist) != NULL) {
+               assert(pers->lrep & HAMMER2_MSGF_DELETE);
+               rmsg->any.head.cmd |= pers->lrep & HAMMER2_MSGF_CREATE;
+               pers->lrep &= ~HAMMER2_MSGF_CREATE;
+               /* do not clear DELETE */
        }
-       msg->flags = 0;
+       return (rmsg);
 }
 
+/*
+ * Free a message so it can be reused afresh.
+ *
+ * NOTE: aux_size can be 0 with a non-NULL aux_data.
+ */
 void
-hammer2_iocom_freemsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_freemsg(hammer2_msg_t *msg)
 {
+       hammer2_iocom_t *iocom = msg->iocom;
+
        if (msg->aux_data)
                TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
        else
@@ -180,13 +210,15 @@ hammer2_iocom_core(hammer2_iocom_t *iocom,
                   void (*altmsg_func)(hammer2_iocom_t *))
 {
        struct pollfd fds[2];
-       int timeout = 5000;
+       int timeout;
 
        iocom->recvmsg_callback = recvmsg_func;
        iocom->sendmsg_callback = sendmsg_func;
        iocom->altmsg_callback = altmsg_func;
 
        while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
+               timeout = 5000;
+
                fds[0].fd = iocom->sock_fd;
                fds[0].events = 0;
                fds[0].revents = 0;
@@ -348,33 +380,12 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                /*
                 * Finally allocate the message and copy the core header
                 * to the embedded extended header.
+                *
+                * Initialize msg->aux_size to 0 and use it to track
+                * the amount of data copied from the stream.
                 */
-               if (ioq->abytes) {
-                       if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
-                               TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
-                       } else {
-                               msg = malloc(sizeof(*msg));
-                               msg->aux_data = NULL;
-                               msg->aux_size = 0;
-                       }
-                       if (msg->aux_size != ioq->abytes) {
-                               if (msg->aux_data) {
-                                       free(msg->aux_data);
-                                       msg->aux_data = NULL;
-                               }
-                               msg->aux_data = malloc(ioq->abytes);
-                               /* msg->aux_size = ioq->abytes; */
-                       }
-               } else {
-                       if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
-                               TAILQ_REMOVE(&iocom->freeq, msg, entry);
-                       } else {
-                               msg = malloc(sizeof(*msg));
-                               msg->aux_data = NULL;
-                               /* msg->aux_size = 0; */
-                       }
-               }
-               msg->aux_size = 0;      /* data copied so far */
+               msg = hammer2_allocmsg(iocom, 0, ioq->abytes);
+               msg->aux_size = 0;
                msg->flags = flags;
                ioq->msg = msg;
 
@@ -580,19 +591,12 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
                 * msgid is 0 (link-level).  All we really need to do is
                 * set up magic, cmd, and error.
                 */
-               if (msg == NULL) {
-                       if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
-                               TAILQ_REMOVE(&iocom->freeq, msg, entry);
-                       } else {
-                               msg = malloc(sizeof(*msg));
-                               msg->aux_data = NULL;
-                               msg->aux_size = 0;
-                       }
-                       assert(ioq->msg == NULL);
-               } else {
-                       assert(ioq->msg == msg);
+               assert(ioq->msg == msg);
+               if (msg == NULL)
+                       msg = hammer2_allocmsg(iocom, 0, 0);
+               else
                        ioq->msg = NULL;
-               }
+
                if (msg->aux_data) {
                        free(msg->aux_data);
                        msg->aux_data = NULL;
@@ -646,92 +650,93 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
  * Calling this function with msg == NULL will get a flush going.
  */
 void
-hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_ioq_write(hammer2_msg_t *msg)
 {
+       hammer2_iocom_t *iocom = msg->iocom;
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
-       ssize_t nmax;
-       ssize_t nact;
-       int hbytes;
-       int abytes;
-       int hoff;
-       int aoff;
        uint16_t xcrc16;
        uint32_t xcrc32;
-       struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
-       int n;
+       int hbytes;
 
+       assert(msg);
        if (ioq->error) {
-               if (msg) {
-                       TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
-                       ++ioq->msgcount;
-               }
-               hammer2_ioq_write_drain(iocom);
+               TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
+               ++ioq->msgcount;
+               hammer2_iocom_drain(iocom);
                return;
        }
 
-       if (msg) {
-               /*
-                * Finish populating the msg fields
-                */
-               msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
-               msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
-               ++ioq->seq;
+       /*
+        * Finish populating the msg fields
+        */
+       msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
+       msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
+       ++ioq->seq;
 
-               /*
-                * Calculate aux_icrc if 0, calculate icrc2, and finally
-                * calculate icrc1.
-                */
-               if (msg->aux_size && msg->any.head.aux_icrc == 0) {
-                       assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
-                       xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
-                       msg->any.head.aux_icrc = xcrc32;
-               }
-               msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
+       /*
+        * Calculate aux_icrc if 0, calculate icrc2, and finally
+        * calculate icrc1.
+        */
+       if (msg->aux_size && msg->any.head.aux_icrc == 0) {
                assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
+               xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
+               msg->any.head.aux_icrc = xcrc32;
+       }
+       msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
+       assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
 
-               if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
-                   sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
-                       hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
-                               HAMMER2_MSG_ALIGN;
-                       hbytes -= sizeof(msg->any.head);
-                       xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
-                       xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
-                       msg->any.head.icrc2 = xcrc16;
-               } else {
-                       msg->any.head.icrc2 = 0;
-               }
-               xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
-                                       HAMMER2_MSGHDR_CRCBYTES);
+       if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
+           sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
+               hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
+                       HAMMER2_MSG_ALIGN;
+               hbytes -= sizeof(msg->any.head);
+               xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
                xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
-               msg->any.head.icrc1 = xcrc16;
+               msg->any.head.icrc2 = xcrc16;
+       } else {
+               msg->any.head.icrc2 = 0;
+       }
+       xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
+                               HAMMER2_MSGHDR_CRCBYTES);
+       xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
+       msg->any.head.icrc1 = xcrc16;
 
-               /*
-                * XXX Encrypt the message
-                */
+       /*
+        * XXX Encrypt the message
+        */
 
-               /*
-                * Enqueue the message, stop now if we already know that
-                * we can't write.
-                */
-               TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
-               ++ioq->msgcount;
-               iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
-               if (iocom->flags & HAMMER2_IOCOMF_WREQ)
-                       return;
+       /*
+        * Enqueue the message.
+        */
+       TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
+       ++ioq->msgcount;
+       iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
 
-               /*
-                * Flush if we can aggregate several msgs, otherwise
-                * we will wait for the global flush (msg == NULL).
-                */
-               if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
-                       return;
-       } else if (iocom->flags &= HAMMER2_IOCOMF_WIDLE) {
-               /*
-                * Nothing to do if WIDLE is set.
-                */
-               assert(TAILQ_FIRST(&ioq->msgq) == NULL);
+       /*
+        * Flush if we know we can write (WREQ not set) and if
+        * sufficient messages have accumulated.  Otherwise hold
+        * off to avoid piecemeal system calls.
+        */
+       if (iocom->flags & HAMMER2_IOCOMF_WREQ)
                return;
-       }
+       if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
+               return;
+       hammer2_iocom_flush(iocom);
+}
+
+void
+hammer2_iocom_flush(hammer2_iocom_t *iocom)
+{
+       hammer2_ioq_t *ioq = &iocom->ioq_tx;
+       hammer2_msg_t *msg;
+       ssize_t nmax;
+       ssize_t nact;
+       struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
+       int hbytes;
+       int abytes;
+       int hoff;
+       int aoff;
+       int n;
 
        /*
         * Pump messages out the connection by building an iovec.
@@ -779,7 +784,7 @@ hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                    errno != EINPROGRESS &&
                    errno != EAGAIN) {
                        ioq->error = HAMMER2_IOQ_ERROR_SOCK;
-                       hammer2_ioq_write_drain(iocom);
+                       hammer2_iocom_drain(iocom);
                } else {
                        iocom->flags |= HAMMER2_IOCOMF_WREQ;
                }
@@ -834,7 +839,7 @@ hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
  * the connection failure.
  */
 void
-hammer2_ioq_write_drain(hammer2_iocom_t *iocom)
+hammer2_iocom_drain(hammer2_iocom_t *iocom)
 {
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
        hammer2_msg_t *msg;
@@ -842,36 +847,44 @@ hammer2_ioq_write_drain(hammer2_iocom_t *iocom)
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                TAILQ_REMOVE(&ioq->msgq, msg, entry);
                --ioq->msgcount;
-               hammer2_iocom_freemsg(iocom, msg);
+               hammer2_freemsg(msg);
        }
        iocom->flags |= HAMMER2_IOCOMF_WIDLE;
        iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
 }
 
 /*
- * Reply to a message after setting various fields appropriately.
- * This function will swap (source) and (target) and enqueue the
- * message for transmission.
+ * This is a shortcut to the normal hammer2_allocreply() mechanic which
+ * uses the received message to formulate a final reply and error code.
+ * Can be used to issue a final reply for one-way, one-off, or streaming
+ * commands.
+ *
+ * Replies to one-way messages are a bit of an oxymoron but the feature
+ * is used by the debug (DBG) protocol.
+ *
+ * The reply contains no data.
+ *
+ * (msg) is eaten up by this function.
  */
 void
-hammer2_ioq_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_replymsg(hammer2_msg_t *msg, uint16_t error)
 {
+       hammer2_persist_t *pers;
        uint16_t t16;
 
+       assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
+
        t16 = msg->any.head.source;
        msg->any.head.source = msg->any.head.target;
        msg->any.head.target = t16;
-       msg->any.head.cmd ^= HAMMER2_MSGF_REPLY;
-       hammer2_ioq_write(iocom, msg);
-}
-
-void
-hammer2_ioq_reply_term(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
-                      uint16_t error)
-{
-       if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
-               msg->any.head.cmd |= HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE;
-               msg->any.head.error = error;
-               hammer2_ioq_reply(iocom, msg);
+       msg->any.head.error = error;
+       msg->any.head.cmd |= HAMMER2_MSGF_REPLY;
+       msg->aux_size = 0;
+       if ((pers = msg->persist) != NULL) {
+               assert(pers->lrep & HAMMER2_MSGF_DELETE);
+               msg->any.head.cmd |= pers->lrep & (HAMMER2_MSGF_CREATE |
+                                                  HAMMER2_MSGF_DELETE);
+               pers->lrep &= ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
        }
+       hammer2_ioq_write(msg);
 }
diff --git a/sbin/hammer2/network.h b/sbin/hammer2/network.h
index d599cd3..a970783 100644
  * by a connection.
  *
  * This structure is typically not used for storing persistent message
- * state (see hammer2_pmsg for that).
+ * state (see hammer2_persist for that).
  */
+struct hammer2_iocom;
+struct hammer2_persist;
+
 struct hammer2_msg {
+       struct hammer2_iocom *iocom;
+       struct hammer2_persist  *persist;
        TAILQ_ENTRY(hammer2_msg) entry; /* queue */
        char            *aux_data;      /* aux-data if any */
        int             aux_size;
@@ -127,7 +132,14 @@ typedef struct hammer2_iocom hammer2_iocom_t;
  *                             HIGH LEVEL MESSAGING                       *
  ***************************************************************************
  *
+ * Persistent state is stored via the hammer2_persist structure.
  */
+struct hammer2_persist {
+       uint32_t        lcmd;           /* recent command direction */
+       uint32_t        lrep;           /* recent reply direction */
+};
+
+typedef struct hammer2_persist hammer2_persist_t;
 
 #if 0
 
@@ -227,8 +239,6 @@ struct hammer2_pmsg {
        hammer2_span_t  *source;
        hammer2_span_t  *target;
        uint16_t        msgid;
-       void            *aux_data;              /* allocated aux data */
-       hammer2_msg_any_t any;                  /* dynamically allocated */
 };
 
 #endif