hammer2 - spanning tree and messaging work
author Matthew Dillon <dillon@apollo.backplane.com>
Tue, 7 Aug 2012 19:40:24 +0000 (12:40 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Tue, 7 Aug 2012 19:40:24 +0000 (12:40 -0700)
* Fix numerous bugs and clean up the messaging infrastructure further.

  Fix issues with state tracking and incorrect message flags, assert that
  flags are correct.

* Fix issues with connection termination.  All active transactions must be
  completely closed from both ends before the iocom can be destroyed.

  Fix bugs in the MSGF_DELETE message simulator when a socket error occurs
  (simulating the other end closing any active transactions going over
  the iocom).

* Implement the spanning tree relay code.  The relay code is even relatively
  optimal, though ultimately we need to add additional filters to make
  client<->service rendezvous less CPU-intensive.

sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_service.c
sbin/hammer2/hammer2.h
sbin/hammer2/main.c
sbin/hammer2/msg.c
sbin/hammer2/msg_lnk.c
sbin/hammer2/network.h

index e2169e3..6dda52b 100644 (file)
@@ -98,7 +98,7 @@ cmd_shell(const char *hostname)
        printf("debug: connected\n");
 
        msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL);
        printf("debug: connected\n");
 
        msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL);
-       hammer2_msg_write(&iocom, msg, NULL, NULL);
+       hammer2_msg_write(&iocom, msg, NULL, NULL, NULL);
 
        hammer2_iocom_core(&iocom, shell_recv, shell_send, shell_tty);
        fprintf(stderr, "debug: disconnected\n");
 
        hammer2_iocom_core(&iocom, shell_recv, shell_send, shell_tty);
        fprintf(stderr, "debug: disconnected\n");
@@ -168,7 +168,7 @@ static
 void
 shell_send(hammer2_iocom_t *iocom)
 {
 void
 shell_send(hammer2_iocom_t *iocom)
 {
-       hammer2_iocom_flush(iocom);
+       hammer2_iocom_flush1(iocom);
 }
 
 static
 }
 
 static
@@ -186,7 +186,7 @@ shell_tty(hammer2_iocom_t *iocom)
                ++len;
                msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL);
                bcopy(buf, msg->aux_data, len);
                ++len;
                msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL);
                bcopy(buf, msg->aux_data, len);
-               hammer2_msg_write(iocom, msg, NULL, NULL);
+               hammer2_msg_write(iocom, msg, NULL, NULL, NULL);
        } else {
                /*
                 * Set EOF flag without setting any error code for normal
        } else {
                /*
                 * Set EOF flag without setting any error code for normal
@@ -274,7 +274,7 @@ iocom_printf(hammer2_iocom_t *iocom, uint32_t cmd, const char *ctl, ...)
                                             HAMMER2_MSGF_REPLY);
        bcopy(buf, rmsg->aux_data, len);
 
                                             HAMMER2_MSGF_REPLY);
        bcopy(buf, rmsg->aux_data, len);
 
-       hammer2_msg_write(iocom, rmsg, NULL, NULL);
+       hammer2_msg_write(iocom, rmsg, NULL, NULL, NULL);
 }
 
 /************************************************************************
 }
 
 /************************************************************************
@@ -349,14 +349,12 @@ show_bref(int fd, int tab, int bi, hammer2_blockref_t *bref)
        bytes = (size_t)1 << (bref->data_off & HAMMER2_OFF_MASK_RADIX);
        if (bytes < HAMMER2_MINIOSIZE || bytes > sizeof(media)) {
                printf("(bad block size %zd)\n", bytes);
        bytes = (size_t)1 << (bref->data_off & HAMMER2_OFF_MASK_RADIX);
        if (bytes < HAMMER2_MINIOSIZE || bytes > sizeof(media)) {
                printf("(bad block size %zd)\n", bytes);
-               sleep(1);
                return;
        }
        if (bref->type != HAMMER2_BREF_TYPE_DATA || VerboseOpt >= 1) {
                lseek(fd, bref->data_off & ~HAMMER2_OFF_MASK_RADIX, 0);
                if (read(fd, &media, bytes) != (ssize_t)bytes) {
                        printf("(media read failed)\n");
                return;
        }
        if (bref->type != HAMMER2_BREF_TYPE_DATA || VerboseOpt >= 1) {
                lseek(fd, bref->data_off & ~HAMMER2_OFF_MASK_RADIX, 0);
                if (read(fd, &media, bytes) != (ssize_t)bytes) {
                        printf("(media read failed)\n");
-                       sleep(1);
                        return;
                }
        }
                        return;
                }
        }
index 9e708a7..848a1e6 100644 (file)
@@ -272,5 +272,5 @@ static
 void
 master_link_tx(hammer2_iocom_t *iocom)
 {
 void
 master_link_tx(hammer2_iocom_t *iocom)
 {
-       hammer2_iocom_flush(iocom);
+       hammer2_iocom_flush1(iocom);
 }
 }
index 3e21588..62a8833 100644 (file)
@@ -102,6 +102,7 @@ int cmd_remote_connect(const char *sel_path, const char *url);
 int cmd_remote_disconnect(const char *sel_path, const char *url);
 int cmd_remote_status(const char *sel_path, int all_opt);
 
 int cmd_remote_disconnect(const char *sel_path, const char *url);
 int cmd_remote_status(const char *sel_path, int all_opt);
 
+int cmd_pfs_getid(const char *sel_path, const char *name, int privateid);
 int cmd_pfs_list(const char *sel_path);
 int cmd_pfs_create(const char *sel_path, const char *name,
                        uint8_t pfs_type, const char *uuid_str);
 int cmd_pfs_list(const char *sel_path);
 int cmd_pfs_create(const char *sel_path, const char *name,
                        uint8_t pfs_type, const char *uuid_str);
@@ -141,10 +142,11 @@ void hammer2_iocom_core(hammer2_iocom_t *iocom,
 hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
 void hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
                        void (*func)(hammer2_state_t *, hammer2_msg_t *),
 hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
 void hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
                        void (*func)(hammer2_state_t *, hammer2_msg_t *),
-                       void *data);
+                       void *data, hammer2_state_t **statep);
 
 void hammer2_iocom_drain(hammer2_iocom_t *iocom);
 
 void hammer2_iocom_drain(hammer2_iocom_t *iocom);
-void hammer2_iocom_flush(hammer2_iocom_t *iocom);
+void hammer2_iocom_flush1(hammer2_iocom_t *iocom);
+void hammer2_iocom_flush2(hammer2_iocom_t *iocom);
 
 void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 void hammer2_state_free(hammer2_state_t *state);
 
 void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 void hammer2_state_free(hammer2_state_t *state);
index 75caad5..b0dd2fb 100644 (file)
@@ -54,6 +54,7 @@ main(int ac, char **av)
        int ch;
 
        srandomdev();
        int ch;
 
        srandomdev();
+       signal(SIGPIPE, SIG_IGN);
 
        /*
         * Core options
 
        /*
         * Core options
@@ -153,6 +154,24 @@ main(int ac, char **av)
                 * Get status of PFS and its connections (-a for all PFSs)
                 */
                ecode = cmd_remote_status(sel_path, all_opt);
                 * Get status of PFS and its connections (-a for all PFSs)
                 */
                ecode = cmd_remote_status(sel_path, all_opt);
+       } else if (strcmp(av[0], "pfs-clid") == 0) {
+               /*
+                * Print cluster id (uuid) for specific PFS
+                */
+               if (ac < 2) {
+                       fprintf(stderr, "pfs-clid: requires name\n");
+                       usage(1);
+               }
+               ecode = cmd_pfs_getid(sel_path, av[1], 0);
+       } else if (strcmp(av[0], "pfs-fsid") == 0) {
+               /*
+                * Print private id (uuid) for specific PFS
+                */
+               if (ac < 2) {
+                       fprintf(stderr, "pfs-fsid: requires name\n");
+                       usage(1);
+               }
+               ecode = cmd_pfs_getid(sel_path, av[1], 1);
        } else if (strcmp(av[0], "pfs-list") == 0) {
                /*
                 * List all PFSs
        } else if (strcmp(av[0], "pfs-list") == 0) {
                /*
                 * List all PFSs
@@ -309,6 +328,8 @@ usage(int code)
                "    disconnect <target> Del cluster link\n"
                "    status             Report cluster status\n"
                "    pfs-list           List PFSs\n"
                "    disconnect <target> Del cluster link\n"
                "    status             Report cluster status\n"
                "    pfs-list           List PFSs\n"
+               "    pfs-clid <label>   Print cluster id for specific PFS\n"
+               "    pfs-fsid <label>   Print private id for specific PFS\n"
                "    pfs-create <label> Create a PFS\n"
                "    pfs-delete <label> Destroy a PFS\n"
                "    snapshot           Snapshot a PFS\n"
                "    pfs-create <label> Create a PFS\n"
                "    pfs-delete <label> Destroy a PFS\n"
                "    snapshot           Snapshot a PFS\n"
index 53cec34..e406821 100644 (file)
@@ -49,12 +49,18 @@ hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
        TAILQ_INIT(&ioq->msgq);
 }
 
        TAILQ_INIT(&ioq->msgq);
 }
 
+/*
+ * Cleanup queue.
+ *
+ * caller holds iocom->mtx.
+ */
 void
 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
 {
        hammer2_msg_t *msg;
 
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
 void
 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
 {
        hammer2_msg_t *msg;
 
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
+               assert(0);      /* shouldn't happen */
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                hammer2_msg_free(iocom, msg);
        }
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                hammer2_msg_free(iocom, msg);
        }
@@ -72,16 +78,22 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
 {
        bzero(iocom, sizeof(*iocom));
 
 {
        bzero(iocom, sizeof(*iocom));
 
+       pthread_mutex_init(&iocom->mtx, NULL);
        RB_INIT(&iocom->staterd_tree);
        RB_INIT(&iocom->statewr_tree);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
        TAILQ_INIT(&iocom->addrq);
        RB_INIT(&iocom->staterd_tree);
        RB_INIT(&iocom->statewr_tree);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
        TAILQ_INIT(&iocom->addrq);
+       TAILQ_INIT(&iocom->txmsgq);
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
-       iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
+       iocom->flags = HAMMER2_IOCOMF_RREQ;
        hammer2_ioq_init(iocom, &iocom->ioq_rx);
        hammer2_ioq_init(iocom, &iocom->ioq_tx);
        hammer2_ioq_init(iocom, &iocom->ioq_rx);
        hammer2_ioq_init(iocom, &iocom->ioq_tx);
+       if (pipe(iocom->wakeupfds) < 0)
+               assert(0);
+       fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
+       fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
 
        /*
         * Negotiate session crypto synchronously.  This will mark the
 
        /*
         * Negotiate session crypto synchronously.  This will mark the
@@ -101,12 +113,25 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
 #endif
 }
 
 #endif
 }
 
+/*
+ * Cleanup a terminating iocom.
+ *
+ * Caller should not hold iocom->mtx.  The iocom has already been disconnected
+ * from all possible references to it.
+ */
 void
 hammer2_iocom_done(hammer2_iocom_t *iocom)
 {
        hammer2_msg_t *msg;
 
 void
 hammer2_iocom_done(hammer2_iocom_t *iocom)
 {
        hammer2_msg_t *msg;
 
-       iocom->sock_fd = -1;
+       if (iocom->sock_fd >= 0) {
+               close(iocom->sock_fd);
+               iocom->sock_fd = -1;
+       }
+       if (iocom->alt_fd >= 0) {
+               close(iocom->alt_fd);
+               iocom->alt_fd = -1;
+       }
        hammer2_ioq_done(iocom, &iocom->ioq_rx);
        hammer2_ioq_done(iocom, &iocom->ioq_tx);
        if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
        hammer2_ioq_done(iocom, &iocom->ioq_rx);
        hammer2_ioq_done(iocom, &iocom->ioq_tx);
        if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
@@ -119,6 +144,15 @@ hammer2_iocom_done(hammer2_iocom_t *iocom)
                msg->aux_data = NULL;
                free(msg);
        }
                msg->aux_data = NULL;
                free(msg);
        }
+       if (iocom->wakeupfds[0] >= 0) {
+               close(iocom->wakeupfds[0]);
+               iocom->wakeupfds[0] = -1;
+       }
+       if (iocom->wakeupfds[1] >= 0) {
+               close(iocom->wakeupfds[1]);
+               iocom->wakeupfds[1] = -1;
+       }
+       pthread_mutex_destroy(&iocom->mtx);
 }
 
 /*
 }
 
 /*
@@ -130,6 +164,7 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
        hammer2_msg_t *msg;
        int hbytes;
 
        hammer2_msg_t *msg;
        int hbytes;
 
+       pthread_mutex_lock(&iocom->mtx);
        if (aux_size) {
                aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
                           ~HAMMER2_MSG_ALIGNMASK;
        if (aux_size) {
                aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
                           ~HAMMER2_MSG_ALIGNMASK;
@@ -139,6 +174,7 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
                if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
                        TAILQ_REMOVE(&iocom->freeq, msg, qentry);
        }
                if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
                        TAILQ_REMOVE(&iocom->freeq, msg, qentry);
        }
+       pthread_mutex_unlock(&iocom->mtx);
        if (msg == NULL) {
                msg = malloc(sizeof(*msg));
                bzero(msg, sizeof(*msg));
        if (msg == NULL) {
                msg = malloc(sizeof(*msg));
                bzero(msg, sizeof(*msg));
@@ -172,17 +208,29 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
  *
  * NOTE: aux_size can be 0 with a non-NULL aux_data.
  */
  *
  * NOTE: aux_size can be 0 with a non-NULL aux_data.
  */
+static
 void
 void
-hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
 {
+       msg->state = NULL;
        if (msg->aux_data)
                TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
        else
                TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
 }
 
        if (msg->aux_data)
                TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
        else
                TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
 }
 
+void
+hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+{
+       pthread_mutex_lock(&iocom->mtx);
+       hammer2_msg_free_locked(iocom, msg);
+       pthread_mutex_unlock(&iocom->mtx);
+}
+
 /*
  * I/O core loop for an iocom.
 /*
  * I/O core loop for an iocom.
+ *
+ * Thread localized, iocom->mtx not held.
  */
 void
 hammer2_iocom_core(hammer2_iocom_t *iocom,
  */
 void
 hammer2_iocom_core(hammer2_iocom_t *iocom,
@@ -190,50 +238,119 @@ hammer2_iocom_core(hammer2_iocom_t *iocom,
                   void (*sendmsg_func)(hammer2_iocom_t *),
                   void (*altmsg_func)(hammer2_iocom_t *))
 {
                   void (*sendmsg_func)(hammer2_iocom_t *),
                   void (*altmsg_func)(hammer2_iocom_t *))
 {
-       struct pollfd fds[2];
+       struct pollfd fds[3];
+       char dummybuf[256];
        int timeout;
        int timeout;
+       int count;
+       int wi; /* wakeup pipe */
+       int si; /* socket */
+       int ai; /* alt bulk path socket */
 
        iocom->recvmsg_callback = recvmsg_func;
        iocom->sendmsg_callback = sendmsg_func;
        iocom->altmsg_callback = altmsg_func;
 
        while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
 
        iocom->recvmsg_callback = recvmsg_func;
        iocom->sendmsg_callback = sendmsg_func;
        iocom->altmsg_callback = altmsg_func;
 
        while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
-               timeout = 5000;
+               if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
+                                    HAMMER2_IOCOMF_WWORK |
+                                    HAMMER2_IOCOMF_PWORK |
+                                    HAMMER2_IOCOMF_ARWORK |
+                                    HAMMER2_IOCOMF_AWWORK)) == 0) {
+                       /*
+                        * Only poll if no immediate work is pending.
+                        * Otherwise we are just wasting our time calling
+                        * poll.
+                        */
+                       timeout = 5000;
 
 
-               fds[0].fd = iocom->sock_fd;
-               fds[0].events = 0;
-               fds[0].revents = 0;
+                       count = 0;
+                       wi = -1;
+                       si = -1;
+                       ai = -1;
 
 
-               if (iocom->flags & HAMMER2_IOCOMF_RREQ)
-                       fds[0].events |= POLLIN;
-               else
-                       timeout = 0;
-               if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
-                       if (iocom->flags & HAMMER2_IOCOMF_WREQ)
-                               fds[0].events |= POLLOUT;
-                       else
-                               timeout = 0;
-               }
+                       /*
+                        * Always check the inter-thread pipe, e.g.
+                        * for iocom->txmsgq work.
+                        */
+                       wi = count++;
+                       fds[wi].fd = iocom->wakeupfds[0];
+                       fds[wi].events = POLLIN;
+                       fds[wi].revents = 0;
+
+                       /*
+                        * Check the socket input/output direction as
+                        * requested
+                        */
+                       if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
+                                           HAMMER2_IOCOMF_WREQ)) {
+                               si = count++;
+                               fds[si].fd = iocom->sock_fd;
+                               fds[si].events = 0;
+                               fds[si].revents = 0;
+
+                               if (iocom->flags & HAMMER2_IOCOMF_RREQ)
+                                       fds[si].events |= POLLIN;
+                               if (iocom->flags & HAMMER2_IOCOMF_WREQ)
+                                       fds[si].events |= POLLOUT;
+                       }
 
 
-               if (iocom->alt_fd >= 0) {
-                       fds[1].fd = iocom->alt_fd;
-                       fds[1].events |= POLLIN;
-                       fds[1].revents = 0;
-                       poll(fds, 2, timeout);
+                       /*
+                        * Check the alternative fd for work.
+                        */
+                       if (iocom->alt_fd >= 0) {
+                               ai = count++;
+                               fds[ai].fd = iocom->alt_fd;
+                               fds[ai].events = POLLIN;
+                               fds[ai].revents = 0;
+                       }
+                       poll(fds, count, timeout);
+
+                       if (wi >= 0 && (fds[wi].revents & POLLIN))
+                               iocom->flags |= HAMMER2_IOCOMF_PWORK;
+                       if (si >= 0 && (fds[si].revents & POLLIN))
+                               iocom->flags |= HAMMER2_IOCOMF_RWORK;
+                       if (si >= 0 && (fds[si].revents & POLLOUT))
+                               iocom->flags |= HAMMER2_IOCOMF_WWORK;
+                       if (wi >= 0 && (fds[wi].revents & POLLOUT))
+                               iocom->flags |= HAMMER2_IOCOMF_WWORK;
+                       if (ai >= 0 && (fds[ai].revents & POLLIN))
+                               iocom->flags |= HAMMER2_IOCOMF_ARWORK;
                } else {
                } else {
-                       poll(fds, 1, timeout);
-               }
-               if ((fds[0].revents & POLLIN) ||
-                   (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
-                       iocom->recvmsg_callback(iocom);
+                       /*
+                        * Always check the pipe
+                        */
+                       iocom->flags |= HAMMER2_IOCOMF_PWORK;
                }
                }
-               if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
-                       if ((fds[0].revents & POLLOUT) ||
-                           (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
+
+               /*
+                * Pending message queues from other threads wake us up
+                * with a write to the wakeupfds[] pipe.  We have to clear
+                * the pipe with a dummy read.
+                */
+               if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
+                       iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
+                       read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
+                       iocom->flags |= HAMMER2_IOCOMF_RWORK;
+                       iocom->flags |= HAMMER2_IOCOMF_WWORK;
+                       if (TAILQ_FIRST(&iocom->txmsgq))
                                iocom->sendmsg_callback(iocom);
                                iocom->sendmsg_callback(iocom);
-                       }
                }
                }
-               if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
+
+               /*
+                * Message write sequencing
+                */
+               if (iocom->flags & HAMMER2_IOCOMF_WWORK)
+                       iocom->sendmsg_callback(iocom);
+
+               /*
+                * Message read sequencing.  Run this after the write
+                * sequencing in case the write sequencing allowed another
+                * auto-DELETE to occur on the read side.
+                */
+               if (iocom->flags & HAMMER2_IOCOMF_RWORK)
+                       iocom->recvmsg_callback(iocom);
+
+               if (iocom->flags & HAMMER2_IOCOMF_ARWORK)
                        iocom->altmsg_callback(iocom);
        }
 }
                        iocom->altmsg_callback(iocom);
        }
 }
@@ -247,6 +364,8 @@ hammer2_iocom_core(hammer2_iocom_t *iocom,
  * will be errored out and a non-transactional HAMMER2_LNK_ERROR
  * msg will be returned as the final message.  The caller should not call
  * us again after the final message is returned.
  * will be errored out and a non-transactional HAMMER2_LNK_ERROR
  * msg will be returned as the final message.  The caller should not call
  * us again after the final message is returned.
+ *
+ * Thread localized, iocom->mtx not held.
  */
 hammer2_msg_t *
 hammer2_ioq_read(hammer2_iocom_t *iocom)
  */
 hammer2_msg_t *
 hammer2_ioq_read(hammer2_iocom_t *iocom)
@@ -262,6 +381,8 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
        int error;
 
 again:
        int error;
 
 again:
+       iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
+
        /*
         * If a message is already pending we can just remove and
         * return it.  Message state has already been processed.
        /*
         * If a message is already pending we can just remove and
         * return it.  Message state has already been processed.
@@ -633,9 +754,10 @@ again:
                ioq->state = HAMMER2_MSGQ_STATE_ERROR;
 
                /*
                ioq->state = HAMMER2_MSGQ_STATE_ERROR;
 
                /*
-                * Return LNK_ERROR for any open transaction, and finally
-                * as a non-transactional message when no transactions are
-                * left.
+                * Simulate a remote LNK_ERROR DELETE msg for any open
+                * transactions, ending with a final non-transactional
+                * LNK_ERROR (that the session can detect) when no
+                * transactions remain.
                 */
                msg = hammer2_msg_alloc(iocom, 0, 0);
                bzero(&msg->any.head, sizeof(msg->any.head));
                 */
                msg = hammer2_msg_alloc(iocom, 0, 0);
                bzero(&msg->any.head, sizeof(msg->any.head));
@@ -643,23 +765,71 @@ again:
                msg->any.head.cmd = HAMMER2_LNK_ERROR;
                msg->any.head.error = ioq->error;
 
                msg->any.head.cmd = HAMMER2_LNK_ERROR;
                msg->any.head.error = ioq->error;
 
+               pthread_mutex_lock(&iocom->mtx);
                if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
                        /*
                if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
                        /*
-                        * Active transactions are still present.  Simulate
-                        * the other end sending us a DELETE.
+                        * Active remote transactions are still present.
+                        * Simulate the other end sending us a DELETE.
                         */
                         */
-                       state->txcmd |= HAMMER2_MSGF_DELETE;
-                       msg->state = state;
-                       msg->any.head.spanid = state->spanid;
-                       msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
-                                            HAMMER2_MSGF_DELETE;
+                       if (state->rxcmd & HAMMER2_MSGF_DELETE) {
+                               fprintf(stderr, "SIMULATE DELETION RCONT %p\n", state);
+                               hammer2_msg_free(iocom, msg);
+                               msg = NULL;
+                       } else {
+                               fprintf(stderr, "SIMULATE DELETION %p RD RXCMD %08x\n", state, state->rxcmd);
+                               /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
+                               msg->state = state;
+                               msg->any.head.spanid = state->spanid;
+                               msg->any.head.msgid = state->msgid;
+                               msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
+                                                    HAMMER2_MSGF_DELETE;
+                       }
+               } else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) {
+                       /*
+                        * Active local transactions are still present.
+                        * Simulate the other end sending us a DELETE.
+                        */
+                       if (state->rxcmd & HAMMER2_MSGF_DELETE) {
+                               fprintf(stderr, "SIMULATE DELETION WCONT\n");
+                               hammer2_msg_free(iocom, msg);
+                               msg = NULL;
+                       } else {
+                               fprintf(stderr, "SIMULATE DELETION WD RXCMD %08x\n", state->txcmd);
+                               /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
+                               msg->state = state;
+                               msg->any.head.spanid = state->spanid;
+                               msg->any.head.msgid = state->msgid;
+                               msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
+                                                    HAMMER2_MSGF_DELETE |
+                                                    HAMMER2_MSGF_REPLY;
+                               if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
+                                       msg->any.head.cmd |=
+                                                    HAMMER2_MSGF_CREATE;
+                               }
+                       }
                } else {
                        /*
                } else {
                        /*
-                        * No active transactions remain
+                        * No active local or remote transactions remain.
+                        * Generate a final LNK_ERROR and flag EOF.
                         */
                        msg->state = NULL;
                        iocom->flags |= HAMMER2_IOCOMF_EOF;
                         */
                        msg->state = NULL;
                        iocom->flags |= HAMMER2_IOCOMF_EOF;
+                       fprintf(stderr, "EOF ON SOCKET\n");
                }
                }
+               pthread_mutex_unlock(&iocom->mtx);
+
+               /*
+                * For the iocom error case we want to set RWORK to indicate
+                * that more messages might be pending.
+                *
+                * It is possible to return NULL when there is more work to
+                * do because each message has to be DELETEd in both
+                * directions before we continue on with the next (though
+                * this could be optimized).  The transmit direction will
+                * re-set RWORK.
+                */
+               if (msg)
+                       iocom->flags |= HAMMER2_IOCOMF_RWORK;
        } else if (msg == NULL) {
                /*
                 * Insufficient data received to finish building the message,
        } else if (msg == NULL) {
                /*
                 * Insufficient data received to finish building the message,
@@ -669,19 +839,16 @@ again:
                 * Leave the FIFO intact.
                 */
                iocom->flags |= HAMMER2_IOCOMF_RREQ;
                 * Leave the FIFO intact.
                 */
                iocom->flags |= HAMMER2_IOCOMF_RREQ;
-#if 0
-               ioq->fifo_cdx = 0;
-               ioq->fifo_beg = 0;
-               ioq->fifo_end = 0;
-#endif
        } else {
                /*
        } else {
                /*
-                * Return msg, clear the FIFO if it is now empty.
-                * Flag RREQ if the caller needs to wait for a read-event
-                * or not.
+                * Return msg.
                 *
                 * The fifo has already been advanced past the message.
                 * Trivially reset the FIFO indices if possible.
                 *
                 * The fifo has already been advanced past the message.
                 * Trivially reset the FIFO indices if possible.
+                *
+                * Clear the FIFO if it is now empty and set RREQ to wait
+                * for more from the socket.  If the FIFO is not empty set
+                * RWORK to bypass the poll so we loop immediately.
                 */
                if (ioq->fifo_beg == ioq->fifo_end) {
                        iocom->flags |= HAMMER2_IOCOMF_RREQ;
                 */
                if (ioq->fifo_beg == ioq->fifo_end) {
                        iocom->flags |= HAMMER2_IOCOMF_RREQ;
@@ -689,7 +856,7 @@ again:
                        ioq->fifo_beg = 0;
                        ioq->fifo_end = 0;
                } else {
                        ioq->fifo_beg = 0;
                        ioq->fifo_end = 0;
                } else {
-                       iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
+                       iocom->flags |= HAMMER2_IOCOMF_RWORK;
                }
                ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
                ioq->msg = NULL;
                }
                ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
                ioq->msg = NULL;
@@ -704,73 +871,80 @@ again:
  *
  * A non-NULL msg is added to the queue but not necessarily flushed.
  * Calling this function with msg == NULL will get a flush going.
  *
  * A non-NULL msg is added to the queue but not necessarily flushed.
  * Calling this function with msg == NULL will get a flush going.
+ *
+ * Caller must not hold iocom->mtx (it is acquired here to drain txmsgq).
  */
  */
-static void
-hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+void
+hammer2_iocom_flush1(hammer2_iocom_t *iocom)
 {
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
 {
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
+       hammer2_msg_t *msg;
        uint32_t xcrc32;
        int hbytes;
        uint32_t xcrc32;
        int hbytes;
-
-       assert(msg);
-
-       /*
-        * Process terminal connection errors.
-        */
-       if (ioq->error) {
-               TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
-               ++ioq->msgcount;
-               hammer2_iocom_drain(iocom);
-               return;
+       hammer2_msg_queue_t tmpq;
+
+       iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
+       TAILQ_INIT(&tmpq);
+       pthread_mutex_lock(&iocom->mtx);
+       while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
+               TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
+               TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
        }
        }
+       pthread_mutex_unlock(&iocom->mtx);
 
 
-       /*
-        * Finish populating the msg fields.  The salt ensures that the iv[]
-        * array is ridiculously randomized and we also re-seed our PRNG
-        * every 32768 messages just to be sure.
-        */
-       msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
-       msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
-       ++ioq->seq;
-       if ((ioq->seq & 32767) == 0)
-               srandomdev();
+       while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
+               /*
+                * Process terminal connection errors.
+                */
+               TAILQ_REMOVE(&tmpq, msg, qentry);
+               if (ioq->error) {
+                       TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
+                       ++ioq->msgcount;
+                       continue;
+               }
 
 
-       /*
-        * Calculate aux_crc if 0, then calculate hdr_crc.
-        */
-       if (msg->aux_size && msg->any.head.aux_crc == 0) {
-               assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
-               xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
-               msg->any.head.aux_crc = xcrc32;
-       }
-       msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
-       assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
+               /*
+                * Finish populating the msg fields.  The salt ensures that
+                * the iv[] array is ridiculously randomized and we also
+                * re-seed our PRNG every 32768 messages just to be sure.
+                */
+               msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
+               msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
+               ++ioq->seq;
+               if ((ioq->seq & 32767) == 0)
+                       srandomdev();
 
 
-       hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
-       msg->any.head.hdr_crc = 0;
-       msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
+               /*
+                * Calculate aux_crc if 0, then calculate hdr_crc.
+                */
+               if (msg->aux_size && msg->any.head.aux_crc == 0) {
+                       assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
+                       xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
+                       msg->any.head.aux_crc = xcrc32;
+               }
+               msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
+               assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
 
 
-       /*
-        * Enqueue the message (the flush codes handles stream encryption).
-        */
-       TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
-       ++ioq->msgcount;
-       iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
+               hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
+                        HAMMER2_MSG_ALIGN;
+               msg->any.head.hdr_crc = 0;
+               msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
 
 
-       /*
-        * Flush if we know we can write (WREQ not set) and if
-        * sufficient messages have accumulated.  Otherwise hold
-        * off to avoid piecemeal system calls.
-        */
-       if (iocom->flags & HAMMER2_IOCOMF_WREQ)
-               return;
-       if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
-               return;
-       hammer2_iocom_flush(iocom);
+               /*
+                * Enqueue the message (the flush code handles stream
+                * encryption).
+                */
+               TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
+               ++ioq->msgcount;
+       }
+       hammer2_iocom_flush2(iocom);
 }
 
 }
 
+/*
+ * Thread localized, iocom->mtx not held by caller.
+ */
 void
 void
-hammer2_iocom_flush(hammer2_iocom_t *iocom)
+hammer2_iocom_flush2(hammer2_iocom_t *iocom)
 {
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
        hammer2_msg_t *msg;
 {
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
        hammer2_msg_t *msg;
@@ -783,6 +957,11 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom)
        int aoff;
        int n;
 
        int aoff;
        int n;
 
+       if (ioq->error) {
+               hammer2_iocom_drain(iocom);
+               return;
+       }
+
        /*
         * Pump messages out the connection by building an iovec.
         */
        /*
         * Pump messages out the connection by building an iovec.
         */
@@ -837,19 +1016,33 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom)
                if (errno != EINTR &&
                    errno != EINPROGRESS &&
                    errno != EAGAIN) {
                if (errno != EINTR &&
                    errno != EINPROGRESS &&
                    errno != EAGAIN) {
+                       /*
+                        * Fatal write error
+                        */
                        ioq->error = HAMMER2_IOQ_ERROR_SOCK;
                        hammer2_iocom_drain(iocom);
                } else {
                        ioq->error = HAMMER2_IOQ_ERROR_SOCK;
                        hammer2_iocom_drain(iocom);
                } else {
+                       /*
+                        * Wait for socket buffer space
+                        */
                        iocom->flags |= HAMMER2_IOCOMF_WREQ;
                }
                return;
        }
                        iocom->flags |= HAMMER2_IOCOMF_WREQ;
                }
                return;
        }
+
+       /*
+        * Indicate bytes written successfully.  If we were unable to
+        * write the entire iov array then set WREQ to wait for more
+        * socket buffer space.
+        */
        hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
        hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
-       if (nact == nmax)
-               iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
-       else
+       if (nact != nmax)
                iocom->flags |= HAMMER2_IOCOMF_WREQ;
 
                iocom->flags |= HAMMER2_IOCOMF_WREQ;
 
+       /*
+        * Clean out the transmit queue based on what we successfully
+        * sent.
+        */
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
                         HAMMER2_MSG_ALIGN;
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
                         HAMMER2_MSG_ALIGN;
@@ -874,14 +1067,8 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom)
 
                hammer2_state_cleanuptx(iocom, msg);
        }
 
                hammer2_state_cleanuptx(iocom, msg);
        }
-       if (msg == NULL) {
-               iocom->flags |= HAMMER2_IOCOMF_WIDLE;
-               iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
-       }
        if (ioq->error) {
        if (ioq->error) {
-               iocom->flags |= HAMMER2_IOCOMF_EOF |
-                               HAMMER2_IOCOMF_WIDLE;
-               iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
+               hammer2_iocom_drain(iocom);
        }
 }
 
        }
 }
 
@@ -890,6 +1077,8 @@ hammer2_iocom_flush(hammer2_iocom_t *iocom)
  * write events will occur.  We don't kill read msgs because we want
  * the caller to pull off our contrived terminal error msg to detect
  * the connection failure.
  * write events will occur.  We don't kill read msgs because we want
  * the caller to pull off our contrived terminal error msg to detect
  * the connection failure.
+ *
+ * Thread localized, iocom->mtx not held by caller.
  */
 void
 hammer2_iocom_drain(hammer2_iocom_t *iocom)
  */
 void
 hammer2_iocom_drain(hammer2_iocom_t *iocom)
@@ -897,30 +1086,31 @@ hammer2_iocom_drain(hammer2_iocom_t *iocom)
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
        hammer2_msg_t *msg;
 
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
        hammer2_msg_t *msg;
 
+       iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
+
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                --ioq->msgcount;
        while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                --ioq->msgcount;
-               hammer2_msg_free(iocom, msg);
+               hammer2_state_cleanuptx(iocom, msg);
        }
        }
-       iocom->flags |= HAMMER2_IOCOMF_WIDLE;
-       iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
 }
 
 /*
  * Write a message to an iocom, with additional state processing.
 }
 
 /*
  * Write a message to an iocom, with additional state processing.
- *
- * The iocom lock must be held by the caller. XXX
  */
 void
 hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
                  void (*func)(hammer2_state_t *, hammer2_msg_t *),
  */
 void
 hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
                  void (*func)(hammer2_state_t *, hammer2_msg_t *),
-                 void *data)
+                 void *data,
+                 hammer2_state_t **statep)
 {
        hammer2_state_t *state;
 {
        hammer2_state_t *state;
+       char dummy;
 
        /*
         * Handle state processing, create state if necessary.
         */
 
        /*
         * Handle state processing, create state if necessary.
         */
+       pthread_mutex_lock(&iocom->mtx);
        if ((state = msg->state) != NULL) {
                /*
                 * Existing transaction (could be reply).  It is also
        if ((state = msg->state) != NULL) {
                /*
                 * Existing transaction (could be reply).  It is also
@@ -933,12 +1123,11 @@ hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
                        state->func = func;
                        state->any.any = data;
                }
                        state->func = func;
                        state->any.any = data;
                }
+               assert(((state->txcmd ^ msg->any.head.cmd) &
+                       HAMMER2_MSGF_REPLY) == 0);
                if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
                        state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
                        state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
-               fprintf(stderr, "MSGWRITE IN REPLY msgid %016jx\n",
-                       (intmax_t)msg->any.head.msgid);
        } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
        } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
-               fprintf(stderr, "MSGWRITE NEW MSG\n");
                /*
                 * No existing state and CREATE is set, create new
                 * state for outgoing command.  This can't happen if
                /*
                 * No existing state and CREATE is set, create new
                 * state for outgoing command.  This can't happen if
@@ -955,6 +1144,7 @@ hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
                state->msgid = (uint64_t)(uintptr_t)state;
                state->spanid = msg->any.head.spanid;
                state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                state->msgid = (uint64_t)(uintptr_t)state;
                state->spanid = msg->any.head.spanid;
                state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               state->rxcmd = HAMMER2_MSGF_REPLY;
                state->func = func;
                state->any.any = data;
                RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
                state->func = func;
                state->any.any = data;
                RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
@@ -963,118 +1153,23 @@ hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
                msg->any.head.msgid = state->msgid;
                /* spanid set by caller */
        } else {
                msg->any.head.msgid = state->msgid;
                /* spanid set by caller */
        } else {
-               fprintf(stderr, "MSGWRITE ONE-OFF\n");
                msg->any.head.msgid = 0;
                /* spanid set by caller */
        }
 
                msg->any.head.msgid = 0;
                /* spanid set by caller */
        }
 
+       if (statep)
+               *statep = state;
+
        /*
        /*
-        * Queue it for output
+        * Queue it for output, wake up the I/O pthread.  Note that the
+        * I/O thread is responsible for generating the CRCs and encryption.
         */
         */
-       hammer2_ioq_write(iocom, msg);
+       TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
+       dummy = 0;
+       write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
+       pthread_mutex_unlock(&iocom->mtx);
 }
 
 }
 
-#if 0
-
-       case HAMMER2_MSGF_DELETE:
-               /*
-                * Sent ABORT+DELETE in case where msgid has already
-                * been fully closed, ignore the message.
-                */
-               if (state == NULL) {
-                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                       } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgtx: "
-                                            "no state match for DELETE\n");
-                               error = HAMMER2_IOQ_ERROR_TRANS;
-                       }
-                       break;
-               }
-
-               /*
-                * Sent ABORT+DELETE in case where msgid has
-                * already been reused for an unrelated message,
-                * ignore the message.
-                */
-               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
-                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                       } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgtx: "
-                                            "state reused for DELETE\n");
-                               error = HAMMER2_IOQ_ERROR_TRANS;
-                       }
-                       break;
-               }
-               error = 0;
-
-
-       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
-               /*
-                * When transmitting a reply with DELETE set the original
-                * persistent state message should already exist.
-                *
-                * This is very similar to the REPLY|CREATE|* case except
-                * txcmd is already stored, so we just add the DELETE flag.
-                *
-                * Sent REPLY+ABORT+DELETE in case where msgid has
-                * already been fully closed, ignore the message.
-                */
-               if (state == NULL) {
-                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                       } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgtx: "
-                                            "no state match for "
-                                            "REPLY | DELETE\n");
-                               error = HAMMER2_IOQ_ERROR_TRANS;
-                       }
-                       break;
-               }
-
-               /*
-                * Sent REPLY+ABORT+DELETE in case where msgid has already
-                * been reused for an unrelated message, ignore the message.
-                */
-               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
-                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                       } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgtx: "
-                                            "state reused for "
-                                            "REPLY | DELETE\n");
-                               error = HAMMER2_IOQ_ERROR_TRANS;
-                       }
-                       break;
-               }
-               error = 0;
-               break;
-       case HAMMER2_MSGF_REPLY:
-               /*
-                * Check for mid-stream ABORT reply sent.
-                *
-                * One-off REPLY messages are allowed for e.g. status updates.
-                */
-               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                       if (state == NULL ||
-                           (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                               break;
-                       }
-               }
-               error = 0;
-               break;
-       }
-       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
-       return (error);
-#endif
-
-
 /*
  * This is a shortcut to formulate a reply to msg with a simple error code,
  * It can reply to and terminate a transaction, or it can reply to a one-way
 /*
  * This is a shortcut to formulate a reply to msg with a simple error code,
  * It can reply to and terminate a transaction, or it can reply to a one-way
@@ -1115,7 +1210,7 @@ hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
                        return;
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        cmd |= HAMMER2_MSGF_CREATE;
                        return;
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        cmd |= HAMMER2_MSGF_CREATE;
-               if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+               if (state->txcmd & HAMMER2_MSGF_REPLY)
                        cmd |= HAMMER2_MSGF_REPLY;
                cmd |= HAMMER2_MSGF_DELETE;
        } else {
                        cmd |= HAMMER2_MSGF_REPLY;
                cmd |= HAMMER2_MSGF_DELETE;
        } else {
@@ -1126,7 +1221,7 @@ hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
        nmsg = hammer2_msg_alloc(iocom, 0, cmd);
        nmsg->any.head.error = error;
        nmsg->state = msg->state;
        nmsg = hammer2_msg_alloc(iocom, 0, cmd);
        nmsg->any.head.error = error;
        nmsg->state = msg->state;
-       hammer2_msg_write(iocom, nmsg, NULL, 0);
+       hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
 }
 
 /*
 }
 
 /*
@@ -1162,7 +1257,7 @@ hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
                        return;
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        cmd |= HAMMER2_MSGF_CREATE;
                        return;
                if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
                        cmd |= HAMMER2_MSGF_CREATE;
-               if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+               if (state->txcmd & HAMMER2_MSGF_REPLY)
                        cmd |= HAMMER2_MSGF_REPLY;
                /* continuing transaction, do not set MSGF_DELETE */
        } else {
                        cmd |= HAMMER2_MSGF_REPLY;
                /* continuing transaction, do not set MSGF_DELETE */
        } else {
@@ -1173,7 +1268,7 @@ hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
        nmsg = hammer2_msg_alloc(iocom, 0, cmd);
        nmsg->any.head.error = error;
        nmsg->state = state;
        nmsg = hammer2_msg_alloc(iocom, 0, cmd);
        nmsg->any.head.error = error;
        nmsg->state = state;
-       hammer2_msg_write(iocom, nmsg, NULL, 0);
+       hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
 }
 
 /*
 }
 
 /*
@@ -1202,13 +1297,13 @@ hammer2_state_reply(hammer2_state_t *state, uint32_t error)
         * Set REPLY if the other end initiated the command.  Otherwise
         * we are the command direction.
         */
         * Set REPLY if the other end initiated the command.  Otherwise
         * we are the command direction.
         */
-       if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+       if (state->txcmd & HAMMER2_MSGF_REPLY)
                cmd |= HAMMER2_MSGF_REPLY;
 
        nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
        nmsg->any.head.error = error;
        nmsg->state = state;
                cmd |= HAMMER2_MSGF_REPLY;
 
        nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
        nmsg->any.head.error = error;
        nmsg->state = state;
-       hammer2_msg_write(state->iocom, nmsg, NULL, 0);
+       hammer2_msg_write(state->iocom, nmsg, NULL, NULL, NULL);
 }
 
 /************************************************************************
 }
 
 /************************************************************************
@@ -1299,7 +1394,6 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
         * If received msg is a command state is on staterd_tree.
         * If received msg is a reply state is on statewr_tree.
         */
         * If received msg is a command state is on staterd_tree.
         * If received msg is a reply state is on statewr_tree.
         */
-       /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
 
        dummy.msgid = msg->any.head.msgid;
        dummy.spanid = msg->any.head.spanid;
 
        dummy.msgid = msg->any.head.msgid;
        dummy.spanid = msg->any.head.spanid;
@@ -1310,6 +1404,7 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                      (intmax_t)msg->any.head.msgid,
                      (intmax_t)msg->any.head.spanid);
 #endif
                      (intmax_t)msg->any.head.msgid,
                      (intmax_t)msg->any.head.spanid);
 #endif
+       pthread_mutex_lock(&iocom->mtx);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
                state = RB_FIND(hammer2_state_tree,
                                &iocom->statewr_tree, &dummy);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
                state = RB_FIND(hammer2_state_tree,
                                &iocom->statewr_tree, &dummy);
@@ -1318,13 +1413,13 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                                &iocom->staterd_tree, &dummy);
        }
        msg->state = state;
                                &iocom->staterd_tree, &dummy);
        }
        msg->state = state;
+       pthread_mutex_unlock(&iocom->mtx);
 
        /*
         * Short-cut one-off or mid-stream messages (state may be NULL).
         */
        if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
                                  HAMMER2_MSGF_ABORT)) == 0) {
 
        /*
         * Short-cut one-off or mid-stream messages (state may be NULL).
         */
        if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
                                  HAMMER2_MSGF_ABORT)) == 0) {
-               /*lockmgr(&pmp->msglk, LK_RELEASE);*/
                return(0);
        }
 
                return(0);
        }
 
@@ -1351,8 +1446,11 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                state->iocom = iocom;
                state->flags = HAMMER2_STATE_DYNAMIC;
                state->msg = msg;
                state->iocom = iocom;
                state->flags = HAMMER2_STATE_DYNAMIC;
                state->msg = msg;
+               state->txcmd = HAMMER2_MSGF_REPLY;
                state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               pthread_mutex_lock(&iocom->mtx);
                RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
                RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
+               pthread_mutex_unlock(&iocom->mtx);
                state->flags |= HAMMER2_STATE_INSERTED;
                state->msgid = msg->any.head.msgid;
                state->spanid = msg->any.head.spanid;
                state->flags |= HAMMER2_STATE_INSERTED;
                state->msgid = msg->any.head.msgid;
                state->spanid = msg->any.head.spanid;
@@ -1421,6 +1519,8 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                        error = HAMMER2_IOQ_ERROR_TRANS;
                        break;
                }
                        error = HAMMER2_IOQ_ERROR_TRANS;
                        break;
                }
+               assert(((state->rxcmd ^ msg->any.head.cmd) &
+                       HAMMER2_MSGF_REPLY) == 0);
                state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                error = 0;
                break;
                state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                error = 0;
                break;
@@ -1474,7 +1574,6 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                error = 0;
                break;
        }
                error = 0;
                break;
        }
-       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
        return (error);
 }
 
        return (error);
 }
 
@@ -1495,25 +1594,27 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                 * state, the original message, and this message (if it
                 * isn't the original message due to a CREATE|DELETE).
                 */
                 * state, the original message, and this message (if it
                 * isn't the original message due to a CREATE|DELETE).
                 */
-               /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
+               pthread_mutex_lock(&iocom->mtx);
                state->rxcmd |= HAMMER2_MSGF_DELETE;
                if (state->txcmd & HAMMER2_MSGF_DELETE) {
                        if (state->msg == msg)
                                state->msg = NULL;
                        assert(state->flags & HAMMER2_STATE_INSERTED);
                state->rxcmd |= HAMMER2_MSGF_DELETE;
                if (state->txcmd & HAMMER2_MSGF_DELETE) {
                        if (state->msg == msg)
                                state->msg = NULL;
                        assert(state->flags & HAMMER2_STATE_INSERTED);
-                       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+                       if (state->rxcmd & HAMMER2_MSGF_REPLY) {
+                               assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
                                          &iocom->statewr_tree, state);
                        } else {
                                RB_REMOVE(hammer2_state_tree,
                                          &iocom->statewr_tree, state);
                        } else {
+                               assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
                                          &iocom->staterd_tree, state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                                RB_REMOVE(hammer2_state_tree,
                                          &iocom->staterd_tree, state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
-                       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
                        hammer2_state_free(state);
                } else {
                        hammer2_state_free(state);
                } else {
-                       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+                       ;
                }
                }
+               pthread_mutex_unlock(&iocom->mtx);
                hammer2_msg_free(iocom, msg);
        } else if (state->msg != msg) {
                /*
                hammer2_msg_free(iocom, msg);
        } else if (state->msg != msg) {
                /*
@@ -1532,42 +1633,67 @@ hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
        if ((state = msg->state) == NULL) {
                hammer2_msg_free(iocom, msg);
        } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
        if ((state = msg->state) == NULL) {
                hammer2_msg_free(iocom, msg);
        } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
-               /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
+               pthread_mutex_lock(&iocom->mtx);
                state->txcmd |= HAMMER2_MSGF_DELETE;
                if (state->rxcmd & HAMMER2_MSGF_DELETE) {
                        if (state->msg == msg)
                                state->msg = NULL;
                        assert(state->flags & HAMMER2_STATE_INSERTED);
                state->txcmd |= HAMMER2_MSGF_DELETE;
                if (state->rxcmd & HAMMER2_MSGF_DELETE) {
                        if (state->msg == msg)
                                state->msg = NULL;
                        assert(state->flags & HAMMER2_STATE_INSERTED);
-                       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+                       if (state->txcmd & HAMMER2_MSGF_REPLY) {
+                               assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
                                RB_REMOVE(hammer2_state_tree,
                                          &iocom->staterd_tree, state);
                        } else {
                                RB_REMOVE(hammer2_state_tree,
                                          &iocom->staterd_tree, state);
                        } else {
+                               assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
                                RB_REMOVE(hammer2_state_tree,
                                          &iocom->statewr_tree, state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
                                RB_REMOVE(hammer2_state_tree,
                                          &iocom->statewr_tree, state);
                        }
                        state->flags &= ~HAMMER2_STATE_INSERTED;
-                       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
                        hammer2_state_free(state);
                } else {
                        hammer2_state_free(state);
                } else {
-                       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+                       ;
                }
                }
+               pthread_mutex_unlock(&iocom->mtx);
                hammer2_msg_free(iocom, msg);
        } else if (state->msg != msg) {
                hammer2_msg_free(iocom, msg);
        }
 }
 
                hammer2_msg_free(iocom, msg);
        } else if (state->msg != msg) {
                hammer2_msg_free(iocom, msg);
        }
 }
 
+/*
+ * Called with iocom locked
+ */
 void
 hammer2_state_free(hammer2_state_t *state)
 {
        hammer2_iocom_t *iocom = state->iocom;
        hammer2_msg_t *msg;
 void
 hammer2_state_free(hammer2_state_t *state)
 {
        hammer2_iocom_t *iocom = state->iocom;
        hammer2_msg_t *msg;
+       char dummy;
+
+       fprintf(stderr, "STATE FREE %p\n", state);
 
 
+       assert(state->any.any == NULL);
        msg = state->msg;
        state->msg = NULL;
        if (msg)
        msg = state->msg;
        state->msg = NULL;
        if (msg)
-               hammer2_msg_free(iocom, msg);
+               hammer2_msg_free_locked(iocom, msg);
        free(state);
        free(state);
+
+       /*
+        * When an iocom error is present we are trying to close down the
+        * iocom, but we have to wait for all states to terminate before
+        * we can do so.  The iocom rx code will terminate the receive side
+        * for all transactions by simulating incoming DELETE messages,
+        * but the state doesn't go away until both sides are terminated.
+        *
+        * We may have to wake up the rx code.
+        */
+       if (iocom->ioq_rx.error &&
+           RB_EMPTY(&iocom->staterd_tree) &&
+           RB_EMPTY(&iocom->statewr_tree)) {
+               dummy = 0;
+               write(iocom->wakeupfds[1], &dummy, 1);
+       }
 }
 
 /*
 }
 
 /*
index b689880..5b06ce6 100644 (file)
@@ -67,7 +67,7 @@
  * create a huge mess, so we have to aggregate all received LNK_SPAN
  * transactions, sort them by the fsid (the cluster) and sub-sort them by
  * the pfs_fsid (individual nodes in the cluster), and only retransmit
  * create a huge mess, so we have to aggregate all received LNK_SPAN
  * transactions, sort them by the fsid (the cluster) and sub-sort them by
  * the pfs_fsid (individual nodes in the cluster), and only retransmit
- * (create outgoing transactions) for a subset of the nearest weighted-hops
+ * (create outgoing transactions) for a subset of the nearest distance-hops
  * for each individual node.
  *
  * The higher level protocols can then issue transactions to the nodes making
  * for each individual node.
  *
  * The higher level protocols can then issue transactions to the nodes making
 /*
  * RED-BLACK TREE DEFINITIONS
  *
 /*
  * RED-BLACK TREE DEFINITIONS
  *
- * We need to track
+ * We need to track:
  *
  * (1) shared fsid's (a cluster).
  * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
  *
  * (1) shared fsid's (a cluster).
  * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
  *                       are indexed into (so we can propagate changes).
  *
  *                       The h2span_link's use a red-black tree to sort the
  *                       are indexed into (so we can propagate changes).
  *
  *                       The h2span_link's use a red-black tree to sort the
- *                       weighted hop metric for the incoming LNK_SPAN.  We
+ *                       distance hop metric for the incoming LNK_SPAN.  We
  *                       then select the top N for outgoing.  When the
  *                       topology changes the top N may also change and cause
  *                       new outgoing LNK_SPAN transactions to be opened
  *                       then select the top N for outgoing.  When the
  *                       topology changes the top N may also change and cause
  *                       new outgoing LNK_SPAN transactions to be opened
@@ -202,7 +202,7 @@ struct h2span_cluster {
        uuid_t  pfs_clid;               /* shared fsid */
 };
 
        uuid_t  pfs_clid;               /* shared fsid */
 };
 
-struct h2span_node  {
+struct h2span_node {
        RB_ENTRY(h2span_node) rbnode;
        struct h2span_link_tree tree;
        struct h2span_cluster *cls;
        RB_ENTRY(h2span_node) rbnode;
        struct h2span_link_tree tree;
        struct h2span_cluster *cls;
@@ -213,7 +213,7 @@ struct h2span_link {
        RB_ENTRY(h2span_link) rbnode;
        hammer2_state_t *state;         /* state<->link */
        struct h2span_node *node;       /* related node */
        RB_ENTRY(h2span_link) rbnode;
        hammer2_state_t *state;         /* state<->link */
        struct h2span_node *node;       /* related node */
-       int32_t weight;
+       int32_t dist;
        struct h2span_relay_queue relayq; /* relay out */
 };
 
        struct h2span_relay_queue relayq; /* relay out */
 };
 
@@ -261,9 +261,9 @@ static
 int
 h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
 {
 int
 h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
 {
-       if (link1->weight < link2->weight)
+       if (link1->dist < link2->dist)
                return(-1);
                return(-1);
-       if (link1->weight > link2->weight)
+       if (link1->dist > link2->dist)
                return(1);
        if ((intptr_t)link1 < (intptr_t)link2)
                return(-1);
                return(1);
        if ((intptr_t)link1 < (intptr_t)link2)
                return(-1);
@@ -272,13 +272,26 @@ h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
        return(0);
 }
 
        return(0);
 }
 
+/*
+ * Relay entries are sorted by node, subsorted by distance and link
+ * address (so we can match up the conn->tree relay topology with
+ * a node's link topology).
+ */
 static
 int
 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 {
 static
 int
 h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 {
-       if ((intptr_t)relay1->state < (intptr_t)relay2->state)
+       if ((intptr_t)relay1->link->node < (intptr_t)relay2->link->node)
+               return(-1);
+       if ((intptr_t)relay1->link->node > (intptr_t)relay2->link->node)
+               return(1);
+       if ((intptr_t)relay1->link->dist < (intptr_t)relay2->link->dist)
                return(-1);
                return(-1);
-       if ((intptr_t)relay1->state > (intptr_t)relay2->state)
+       if ((intptr_t)relay1->link->dist > (intptr_t)relay2->link->dist)
+               return(1);
+       if ((intptr_t)relay1->link < (intptr_t)relay2->link)
+               return(-1);
+       if ((intptr_t)relay1->link > (intptr_t)relay2->link)
                return(1);
        return(0);
 }
                return(1);
        return(0);
 }
@@ -310,7 +323,9 @@ static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
 
 static void hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg);
 static void hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg);
 
 static void hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg);
 static void hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg);
-static void hammer2_lnk_conn_update(h2span_connect_t *conn);
+static void hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg);
+static void hammer2_relay_scan(h2span_node_t *node);
+static void hammer2_relay_delete(h2span_relay_t *relay);
 
 /*
  * Receive a HAMMER2_MSG_PROTO_LNK message.  This is only called for
 
 /*
  * Receive a HAMMER2_MSG_PROTO_LNK message.  This is only called for
@@ -348,6 +363,7 @@ hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
        /*
         * On transaction start we allocate a new h2span_connect and
         * acknowledge the request, leaving the transaction open.
        /*
         * On transaction start we allocate a new h2span_connect and
         * acknowledge the request, leaving the transaction open.
+        * We then relay priority-selected SPANs.
         */
        if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
                state->func = hammer2_lnk_conn;
         */
        if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
                state->func = hammer2_lnk_conn;
@@ -366,7 +382,6 @@ hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
                state->any.conn = conn;
                TAILQ_INSERT_TAIL(&connq, conn, entry);
 
                state->any.conn = conn;
                TAILQ_INSERT_TAIL(&connq, conn, entry);
 
-               hammer2_lnk_conn_update(conn);
                hammer2_msg_result(state->iocom, msg, 0);
        }
 
                hammer2_msg_result(state->iocom, msg, 0);
        }
 
@@ -378,19 +393,13 @@ hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
                fprintf(stderr, "LNK_CONN: Terminated\n");
                conn = state->any.conn;
                assert(conn);
                fprintf(stderr, "LNK_CONN: Terminated\n");
                conn = state->any.conn;
                assert(conn);
+
+               /*
+                * Clean out all relays.  This requires terminating each
+                * relay transaction.
+                */
                while ((relay = RB_ROOT(&conn->tree)) != NULL) {
                while ((relay = RB_ROOT(&conn->tree)) != NULL) {
-                       RB_REMOVE(h2span_relay_tree, &conn->tree, relay);
-                       TAILQ_REMOVE(&relay->link->relayq, relay, entry);
-
-                       if (relay->state) {
-                               relay->state->any.relay = NULL;
-                               hammer2_state_reply(relay->state, 0);
-                               /* state invalid after reply */
-                               relay->state = NULL;
-                       }
-                       relay->conn = NULL;
-                       relay->link = NULL;
-                       hammer2_free(relay);
+                       hammer2_relay_delete(relay);
                }
 
                /*
                }
 
                /*
@@ -462,15 +471,14 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
                 */
                assert(state->any.link == NULL);
                slink = hammer2_alloc(sizeof(*slink));
                 */
                assert(state->any.link == NULL);
                slink = hammer2_alloc(sizeof(*slink));
+               TAILQ_INIT(&slink->relayq);
                slink->node = node;
                slink->node = node;
-               slink->weight = msg->any.lnk_span.weight;
+               slink->dist = msg->any.lnk_span.dist;
                slink->state = state;
                state->any.link = slink;
                RB_INSERT(h2span_link_tree, &node->tree, slink);
 
                slink->state = state;
                state->any.link = slink;
                RB_INSERT(h2span_link_tree, &node->tree, slink);
 
-               /*
-                * Now filter and relay the span to all other iocoms. XXX
-                */
+               hammer2_relay_scan(node);
        }
 
        /*
        }
 
        /*
@@ -483,21 +491,11 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
                cls = node->cls;
 
                /*
                cls = node->cls;
 
                /*
-                * Clean out all relays
+                * Clean out all relays.  This requires terminating each
+                * relay transaction.
                 */
                while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
                 */
                while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
-                       RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
-                       TAILQ_REMOVE(&slink->relayq, relay, entry);
-
-                       if (relay->state) {
-                               relay->state->any.relay = NULL;
-                               hammer2_state_reply(relay->state, 0);
-                               /* state invalid after reply */
-                               relay->state = NULL;
-                       }
-                       relay->conn = NULL;
-                       relay->link = NULL;
-                       hammer2_free(relay);
+                       hammer2_relay_delete(relay);
                }
 
                /*
                }
 
                /*
@@ -513,20 +511,238 @@ hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
                        }
                        node->cls = NULL;
                        hammer2_free(node);
                        }
                        node->cls = NULL;
                        hammer2_free(node);
+                       node = NULL;
                }
                state->any.link = NULL;
                slink->state = NULL;
                slink->node = NULL;
                hammer2_free(slink);
                }
                state->any.link = NULL;
                slink->state = NULL;
                slink->node = NULL;
                hammer2_free(slink);
+
+               /*
+                * We have to terminate the transaction
+                */
+               hammer2_state_reply(state, 0);
+               /* state invalid after reply */
+
+               /*
+                * If the node still exists issue any required updates.  If
+                * it doesn't then all related relays have already been
+                * removed and there's nothing left to do.
+                */
+               if (node)
+                       hammer2_relay_scan(node);
        }
 
        pthread_mutex_unlock(&cluster_mtx);
 }
 
 /*
        }
 
        pthread_mutex_unlock(&cluster_mtx);
 }
 
 /*
- * Initiate/Update the relayed spans associated with a connection.
+ * Messages received on relay SPANs.  These are open transactions so it is
+ * in fact possible for the other end to close the transaction.
+ *
+ * XXX MPRACE on state structure
+ */
+static void
+hammer2_lnk_relay(hammer2_state_t *state, hammer2_msg_t *msg)
+{
+       h2span_relay_t *relay;
+
+       /*
+        * Only messages carrying the DELETE flag need any handling.
+        */
+       if ((msg->any.head.cmd & HAMMER2_MSGF_DELETE) == 0)
+               return;
+
+       /*
+        * If a relay is still attached to the state, tear it down via
+        * hammer2_relay_delete().  Otherwise just reply on the state
+        * directly.
+        */
+       pthread_mutex_lock(&cluster_mtx);
+       relay = state->any.relay;
+       if (relay == NULL)
+               hammer2_state_reply(state, 0);
+       else
+               hammer2_relay_delete(relay);
+       pthread_mutex_unlock(&cluster_mtx);
+}
+
+/*
+ * Update relay transactions for SPANs.
+ *
+ * Called with cluster_mtx held.
+ */
+static void hammer2_relay_scan_conn(h2span_node_t *node,
+                               h2span_connect_t *conn);
+
+static void
+hammer2_relay_scan(h2span_node_t *node)
+{
+       h2span_connect_t *conn;
+       h2span_cluster_t *cls;
+
+       /*
+        * Specific node: synchronize its links (received SPANs) with the
+        * relays of every connection on connq.
+        */
+       if (node != NULL) {
+               TAILQ_FOREACH(conn, &connq, entry) {
+                       hammer2_relay_scan_conn(node, conn);
+               }
+               return;
+       }
+
+       /*
+        * Full iteration (not currently implemented, hence the assert).
+        * The loops below walk every cluster id, then every node id in
+        * the cluster, then every connection.
+        */
+       assert(0);
+       RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
+               RB_FOREACH(node, h2span_node_tree, &cls->tree) {
+                       TAILQ_FOREACH(conn, &connq, entry) {
+                               hammer2_relay_scan_conn(node, conn);
+                       }
+               }
+       }
+}
+
+/*
+ * Update the relay'd SPANs for this (node, conn).
+ *
+ * Iterate links and adjust relays to match.  We propagate at most the two
+ * best (lowest dist) links per node; see count in hammer2_relay_scan_conn().
+ *
+ * The hammer2_relay_scan_cmp() function locates the first relay element
+ * for any given node.  The relay elements will be sub-sorted by dist.
  */
  */
+struct relay_scan_info {
+       h2span_node_t *node;    /* node the scan is matching relays against */
+       h2span_relay_t *relay;  /* filled in by hammer2_relay_scan_callback() */
+};
+
+static int
+hammer2_relay_scan_cmp(h2span_relay_t *relay, void *arg)
+{
+       struct relay_scan_info *info = arg;
+       intptr_t relay_node = (intptr_t)relay->link->node;
+       intptr_t wanted_node = (intptr_t)info->node;
+
+       /*
+        * Order purely by node address; dist and link are ignored so that
+        * every relay belonging to the wanted node compares equal.
+        */
+       if (relay_node < wanted_node)
+               return(-1);
+       if (relay_node > wanted_node)
+               return(1);
+       return(0);
+}
+
+static int
+hammer2_relay_scan_callback(h2span_relay_t *relay, void *arg)
+{
+       /*
+        * Record the relay handed to us and return -1.
+        * NOTE(review): the negative return presumably stops RB_SCAN at
+        * this first match -- confirm against the RB_SCAN definition.
+        */
+       ((struct relay_scan_info *)arg)->relay = relay;
+       return(-1);
+}
+
 static void
 static void
-hammer2_lnk_conn_update(h2span_connect_t *conn __unused)
+hammer2_relay_scan_conn(h2span_node_t *node, h2span_connect_t *conn)
 {
 {
+       struct relay_scan_info info;
+       h2span_relay_t *relay;
+       h2span_relay_t *next_relay;
+       h2span_link_t *slink;
+       int count = 2;
+
+       info.node = node;
+       info.relay = NULL;
+
+       /*
+        * Locate the first related relay for the connection.  relay will
+        * be NULL if there were none.
+        */
+       RB_SCAN(h2span_relay_tree, &conn->tree,
+               hammer2_relay_scan_cmp, hammer2_relay_scan_callback, &info);
+       relay = info.relay;
+
+       fprintf(stderr, "relay scan for connection %p\n", conn);
+
+       /*
+        * Iterate the node's links (received SPANs) in distance order,
+        * lowest (best) dist first.
+        */
+       RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+               /*
+                * PROPAGATE THE BEST RELAYS BY TRANSMITTING SPANs.
+                *
+                * Check for match against current best relay.
+                *
+                * A match failure means that the current best relay is not
+                * as good as the link, create a new relay for the link.
+                *
+                * (If some prior better link was removed it would have also
+                *  removed the relay, so the relay can only match exactly or
+                *  be worst).
+                */
+               info.relay = relay;
+               if (relay == NULL || relay->link != slink) {
+                       hammer2_msg_t *msg;
+
+                       assert(relay == NULL ||
+                              slink->dist <= relay->link->dist);
+                       relay = hammer2_alloc(sizeof(*relay));
+                       relay->conn = conn;
+                       relay->link = slink;
+
+                       msg = hammer2_msg_alloc(conn->state->iocom, 0,
+                                               HAMMER2_LNK_SPAN |
+                                               HAMMER2_MSGF_CREATE);
+                       msg->any.lnk_span = slink->state->msg->any.lnk_span;
+                       ++msg->any.lnk_span.dist; /* XXX add weighting */
+
+                       hammer2_msg_write(conn->state->iocom, msg,
+                                         hammer2_lnk_relay, relay,
+                                         &relay->state);
+                       fprintf(stderr, "RELAY SPAN ON CLS=%p NODE=%p FD %d state %p\n",
+                               node->cls, node,
+                               conn->state->iocom->sock_fd, relay->state);
+
+                       RB_INSERT(h2span_relay_tree, &conn->tree, relay);
+                       TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
+               }
+
+               /*
+                * Iterate, figure out the next relay.
+                */
+               relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+               if (--count == 0) {
+                       break;
+                       continue;
+               }
+       }
+
+       /*
+        * Any remaining relay's belonging to this connection which match
+        * the node are in excess of the current aggregate spanning state
+        * and should be removed.
+        */
+       while (relay && relay->link->node == node) {
+               next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+               hammer2_relay_delete(relay);
+               relay = next_relay;
+       }
+}
+
+static
+void
+hammer2_relay_delete(h2span_relay_t *relay)
+{
+       /*
+        * Tear down one relay: unlink it from its connection's relay tree
+        * and from its link's relay queue, terminate the relay transaction
+        * if one is still attached, then free the relay.
+        *
+        * relay->state may be NULL (see the check below), so it is only
+        * dereferenced inside that check.
+        */
+       fprintf(stderr, "RELAY DELETE ON CLS=%p NODE=%p FD %d STATE %p\n",
+               relay->link->node->cls, relay->link->node,
+               relay->conn->state->iocom->sock_fd, relay->state);
+
+       RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
+       TAILQ_REMOVE(&relay->link->relayq, relay, entry);
+
+       if (relay->state) {
+               fprintf(stderr, "RELAY TX %08x RX %08x\n",
+                       relay->state->txcmd, relay->state->rxcmd);
+               relay->state->any.relay = NULL;
+               hammer2_state_reply(relay->state, 0);
+               /* state invalid after reply */
+               relay->state = NULL;
+       }
+       relay->conn = NULL;
+       relay->link = NULL;
+       hammer2_free(relay);
 }
 }
index a51ff86..39c7689 100644 (file)
@@ -234,12 +234,15 @@ struct hammer2_iocom {
        void    (*altmsg_callback)(struct hammer2_iocom *);
        int     sock_fd;                        /* comm socket or pipe */
        int     alt_fd;                         /* thread signal, tty, etc */
        void    (*altmsg_callback)(struct hammer2_iocom *);
        int     sock_fd;                        /* comm socket or pipe */
        int     alt_fd;                         /* thread signal, tty, etc */
+       int     wakeupfds[2];                   /* pipe wakes up iocom thread */
        int     flags;
        int     rxmisc;
        int     txmisc;
        char    sess[HAMMER2_AES_KEY_SIZE];     /* aes_256_cbc key */
        struct hammer2_state_tree staterd_tree; /* active messages */
        struct hammer2_state_tree statewr_tree; /* active messages */
        int     flags;
        int     rxmisc;
        int     txmisc;
        char    sess[HAMMER2_AES_KEY_SIZE];     /* aes_256_cbc key */
        struct hammer2_state_tree staterd_tree; /* active messages */
        struct hammer2_state_tree statewr_tree; /* active messages */
+       hammer2_msg_queue_t txmsgq;             /* tx msgq from remote */
+       pthread_mutex_t mtx;                    /* mutex for state*tree/rmsgq */
 };
 
 typedef struct hammer2_iocom hammer2_iocom_t;
 };
 
 typedef struct hammer2_iocom hammer2_iocom_t;
@@ -247,6 +250,9 @@ typedef struct hammer2_iocom hammer2_iocom_t;
 #define HAMMER2_IOCOMF_EOF     0x00000001      /* EOF or ERROR on desc */
 #define HAMMER2_IOCOMF_RREQ    0x00000002      /* request read-data event */
 #define HAMMER2_IOCOMF_WREQ    0x00000004      /* request write-avail event */
 #define HAMMER2_IOCOMF_EOF     0x00000001      /* EOF or ERROR on desc */
 #define HAMMER2_IOCOMF_RREQ    0x00000002      /* request read-data event */
 #define HAMMER2_IOCOMF_WREQ    0x00000004      /* request write-avail event */
-#define HAMMER2_IOCOMF_WIDLE   0x00000008      /* request write-avail event */
-#define HAMMER2_IOCOMF_SIGNAL  0x00000010
-#define HAMMER2_IOCOMF_CRYPTED 0x00000020      /* encrypt enabled */
+#define HAMMER2_IOCOMF_RWORK   0x00000008      /* immediate work pending */
+#define HAMMER2_IOCOMF_WWORK   0x00000010      /* immediate work pending */
+#define HAMMER2_IOCOMF_PWORK   0x00000020      /* immediate work pending */
+#define HAMMER2_IOCOMF_ARWORK  0x00000040      /* immediate work pending */
+#define HAMMER2_IOCOMF_AWWORK  0x00000080      /* immediate work pending */
+#define HAMMER2_IOCOMF_CRYPTED 0x00000100      /* encrypt enabled */