Merge branches 'hammer2' and 'master' of ssh://crater.dragonflybsd.org/repository...
authorMatthew Dillon <dillon@apollo.backplane.com>
Sat, 11 Aug 2012 21:56:01 +0000 (14:56 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Sat, 11 Aug 2012 21:56:01 +0000 (14:56 -0700)
1  2 
sbin/dhclient/dhclient.c
sbin/hammer2/cmd_service.c
sbin/hammer2/hammer2.h
sbin/hammer2/msg.c
sbin/hammer2/msg_lnk.c

@@@ -170,15 -170,15 +170,26 @@@ routehandler(void
        struct iaddr a;
        ssize_t n;
  
++      /*
++       * Read one message non-blocking, ignore signals.
++       */
        do {
                n = read(routefd, &msg, sizeof(msg));
        } while (n == -1 && errno == EINTR);
  
++      /*
++       * No operation if no messages ready or the message is
++       * not sized properly.
++       */
        rtm = (struct rt_msghdr *)msg;
        if (n < sizeof(rtm->rtm_msglen) || n < rtm->rtm_msglen ||
--          rtm->rtm_version != RTM_VERSION)
++          rtm->rtm_version != RTM_VERSION) {
                return;
++      }
  
++      /*
++       * Process the message.
++       */
        switch (rtm->rtm_type) {
        case RTM_NEWADDR:
                ifam = (struct ifa_msghdr *)rtm;
                if (findproto((char *)(ifam + 1), ifam->ifam_addrs) != AF_INET)
                        break;
                sa = get_ifa((char *)(ifam + 1), ifam->ifam_addrs);
--              if (sa == NULL)
++              if (sa == NULL) {
++                      warning("RTM_NEWADDR: No IFA");
                        goto die;
++              }
  
                if ((a.len = sizeof(struct in_addr)) > sizeof(a.iabuf))
                        error("king bula sez: len mismatch");
                        /* new addr is the one we set */
                        break;
  
++              warning("RTM_NEWADDR: Our IFA not found");
                goto die;
        case RTM_DELADDR:
                ifam = (struct ifa_msghdr *)rtm;
                        break;
                if (scripttime == 0 || t < scripttime + 10)
                        break;
++              warning("RTM_DELADDR: Third-party %d", (int)(t - scripttime));
                goto die;
        case RTM_IFINFO:
                ifm = (struct if_msghdr *)rtm;
                if (ifm->ifm_index != ifi->index)
                        break;
--              if ((rtm->rtm_flags & RTF_UP) == 0)
++              if ((rtm->rtm_flags & RTF_UP) == 0) {
++                      warning("RTM_IFINFO: Interface not up");
                        goto die;
++              }
  
                linkstat =
                    LINK_STATE_IS_UP(ifm->ifm_data.ifi_link_state) ? 1 : 0;
        case RTM_IFANNOUNCE:
                ifan = (struct if_announcemsghdr *)rtm;
                if (ifan->ifan_what == IFAN_DEPARTURE &&
--                  ifan->ifan_index == ifi->index)
++                  ifan->ifan_index == ifi->index) {
++                      warning("RTM_IFANNOUNCE: Interface departure");
                        goto die;
++              }
                break;
        default:
                break;
index 82ca26b,0000000..7564773
mode 100644,000000..100644
--- /dev/null
@@@ -1,280 -1,0 +1,349 @@@
-               pthread_create(&thread, NULL,
-                              master_service, (void *)(intptr_t)fd);
 +/*
 + * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
 + *
 + * This code is derived from software contributed to The DragonFly Project
 + * by Matthew Dillon <dillon@dragonflybsd.org>
 + * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
 + *
 + * Redistribution and use in source and binary forms, with or without
 + * modification, are permitted provided that the following conditions
 + * are met:
 + *
 + * 1. Redistributions of source code must retain the above copyright
 + *    notice, this list of conditions and the following disclaimer.
 + * 2. Redistributions in binary form must reproduce the above copyright
 + *    notice, this list of conditions and the following disclaimer in
 + *    the documentation and/or other materials provided with the
 + *    distribution.
 + * 3. Neither the name of The DragonFly Project nor the names of its
 + *    contributors may be used to endorse or promote products derived
 + *    from this software without specific, prior written permission.
 + *
 + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 + * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 + * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 + * SUCH DAMAGE.
 + */
 +
 +#include "hammer2.h"
 +
 +static void *master_accept(void *data);
 +static void master_auth_signal(hammer2_router_t *router);
 +static void master_auth_rxmsg(hammer2_msg_t *msg);
 +static void master_link_signal(hammer2_router_t *router);
 +static void master_link_rxmsg(hammer2_msg_t *msg);
++static void master_reconnect(const char *mntpt);
 +
 +/*
 + * Start-up the master listener daemon for the machine.
 + *
 + * The master listener serves as a rendezvous point in the cluster, accepting
 + * connections, performing registrations and authentications, maintaining
 + * the spanning tree, and keeping track of message state so disconnects can
 + * be handled properly.
 + *
 + * Once authenticated only low-level messaging protocols (which includes
 + * tracking persistent messages) are handled by this daemon.  This daemon
 + * does not run the higher level quorum or locking protocols.
 + *
 + * This daemon can also be told to maintain connections to other nodes,
 + * forming a messaging backbone, which in turn allows PFS's (if desired) to
 + * simply connect to the master daemon via localhost if desired.
 + * Backbones are specified via /etc/hammer2.conf.
 + */
 +int
 +cmd_service(void)
 +{
 +      struct sockaddr_in lsin;
 +      int on;
 +      int lfd;
 +
 +      /*
 +       * Acquire socket and set options
 +       */
 +      if ((lfd = socket(AF_INET, SOCK_STREAM, 0)) < 0) {
 +              fprintf(stderr, "master_listen: socket(): %s\n",
 +                      strerror(errno));
 +              return 1;
 +      }
 +      on = 1;
 +      setsockopt(lfd, SOL_SOCKET, SO_REUSEADDR, &on, sizeof(on));
 +
 +      /*
 +       * Setup listen port and try to bind.  If the bind fails we assume
 +       * that a master listener process is already running and silently
 +       * fail.
 +       */
 +      bzero(&lsin, sizeof(lsin));
 +      lsin.sin_family = AF_INET;
 +      lsin.sin_addr.s_addr = INADDR_ANY;
 +      lsin.sin_port = htons(HAMMER2_LISTEN_PORT);
 +      if (bind(lfd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
 +              close(lfd);
 +              if (QuietOpt == 0) {
 +                      fprintf(stderr,
 +                              "master listen: daemon already running\n");
 +              }
 +              return 0;
 +      }
 +      if (QuietOpt == 0)
 +              fprintf(stderr, "master listen: startup\n");
 +      listen(lfd, 50);
 +
 +      /*
 +       * Fork and disconnect the controlling terminal and parent process,
 +       * executing the specified function as a pthread.
 +       *
 +       * Returns to the original process which can then continue running.
 +       * In debug mode this call will create the pthread without forking
 +       * and set NormalExit to 0, instead of fork.
 +       */
 +      hammer2_demon(master_accept, (void *)(intptr_t)lfd);
 +      if (NormalExit)
 +              close(lfd);
 +      return 0;
 +}
 +
 +/*
 + * Master listen/accept thread.  Accept connections on the master socket,
 + * starting a pthread for each one.
 + */
 +static
 +void *
 +master_accept(void *data)
 +{
 +      struct sockaddr_in asin;
 +      socklen_t alen;
 +      pthread_t thread;
++      hammer2_master_service_info_t *info;
 +      int lfd = (int)(intptr_t)data;
 +      int fd;
++      int i;
++      int count;
++      struct statfs *mntbuf = NULL;
++      struct statvfs *mntvbuf = NULL;
 +
 +      /*
 +       * Nobody waits for us
 +       */
 +      setproctitle("hammer2 master listen");
 +      pthread_detach(pthread_self());
 +
 +      /*
++       * Scan existing hammer2 mounts and reconnect to them using
++       * HAMMER2IOC_RECLUSTER.
++       */
++      count = getmntvinfo(&mntbuf, &mntvbuf, MNT_NOWAIT);
++      for (i = 0; i < count; ++i) {
++              if (strcmp(mntbuf[i].f_fstypename, "hammer2") == 0)
++                      master_reconnect(mntbuf[i].f_mntonname);
++      }
++
++      /*
 +       * Accept connections and create pthreads to handle them after
 +       * validating the IP.
 +       */
 +      for (;;) {
 +              alen = sizeof(asin);
 +              fd = accept(lfd, (struct sockaddr *)&asin, &alen);
 +              if (fd < 0) {
 +                      if (errno == EINTR)
 +                              continue;
 +                      break;
 +              }
 +              thread = NULL;
 +              fprintf(stderr, "master_accept: accept fd %d\n", fd);
-       int fd;
++              info = malloc(sizeof(*info));
++              bzero(info, sizeof(*info));
++              info->fd = fd;
++              info->detachme = 1;
++              pthread_create(&thread, NULL, master_service, info);
 +      }
 +      return (NULL);
 +}
 +
 +/*
++ * Normally the mount program supplies a cluster communications
++ * descriptor to the hammer2 vfs on mount, but if you kill the service
++ * daemon and restart it that link will be lost.
++ *
++ * This procedure attempts to [re]connect to existing mounts when
++ * the service daemon is started up before going into its accept
++ * loop.
++ */
++static
++void
++master_reconnect(const char *mntpt)
++{
++      struct hammer2_ioc_recluster recls;
++      hammer2_master_service_info_t *info;
++      pthread_t thread;
++      int fd;
++      int pipefds[2];
++
++      fd = open(mntpt, O_RDONLY);
++      if (fd < 0) {
++              fprintf(stderr, "reconnect %s: no access to mount\n", mntpt);
++              return;
++      }
++      if (pipe(pipefds) < 0) {
++              fprintf(stderr, "reconnect %s: pipe() failed\n", mntpt);
++              return;
++      }
++      bzero(&recls, sizeof(recls));
++      recls.fd = pipefds[0];
++      if (ioctl(fd, HAMMER2IOC_RECLUSTER, &recls) < 0) {
++              fprintf(stderr, "reconnect %s: ioctl failed\n", mntpt);
++              close(pipefds[0]);
++              close(pipefds[1]);
++              close(fd);
++              return;
++      }
++      close(pipefds[0]);
++
++      info = malloc(sizeof(*info));
++      bzero(info, sizeof(*info));
++      info->fd = pipefds[1];
++      info->detachme = 1;
++      pthread_create(&thread, NULL, master_service, info);
++}
++
++/*
 + * Service an accepted connection (runs as a pthread)
 + *
 + * (also called from a couple of other places)
 + */
 +void *
 +master_service(void *data)
 +{
++      hammer2_master_service_info_t *info = data;
 +      hammer2_iocom_t iocom;
-       fd = (int)(intptr_t)data;
-       hammer2_iocom_init(&iocom, fd, -1,
 +
-               fd, iocom.ioq_rx.error, iocom.ioq_tx.error);
-       close(fd);
++      if (info->detachme)
++              pthread_detach(pthread_self());
++
++      hammer2_iocom_init(&iocom, info->fd, -1,
 +                         master_auth_signal,
 +                         master_auth_rxmsg,
 +                         NULL);
 +      hammer2_iocom_core(&iocom);
 +
 +      fprintf(stderr,
 +              "iocom on fd %d terminated error rx=%d, tx=%d\n",
++              info->fd, iocom.ioq_rx.error, iocom.ioq_tx.error);
++      close(info->fd);
++      info->fd = -1;  /* safety */
++      free(info);
 +
 +      return (NULL);
 +}
 +
 +/************************************************************************
 + *                        AUTHENTICATION                              *
 + ************************************************************************
 + *
 + * Callback via hammer2_iocom_core().
 + *
 + * Additional messaging-based authentication must occur before normal
 + * message operation.  The connection has already been encrypted at
 + * this point.
 + */
 +static void master_auth_conn_rx(hammer2_msg_t *msg);
 +
 +static
 +void
 +master_auth_signal(hammer2_router_t *router)
 +{
 +      hammer2_msg_t *msg;
 +
 +      /*
 +       * Transmit LNK_CONN, enabling the SPAN protocol if both sides
 +       * agree.
 +       *
 +       * XXX put additional authentication states here
 +       */
 +      msg = hammer2_msg_alloc(router, 0, HAMMER2_LNK_CONN |
 +                                                 HAMMER2_MSGF_CREATE,
 +                              master_auth_conn_rx, NULL);
 +      snprintf(msg->any.lnk_conn.label, sizeof(msg->any.lnk_conn.label), "*");
 +      hammer2_msg_write(msg);
 +
 +      hammer2_router_restate(router,
 +                            master_link_signal,
 +                            master_link_rxmsg,
 +                            NULL);
 +}
 +
 +static
 +void
 +master_auth_conn_rx(hammer2_msg_t *msg)
 +{
 +      if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
 +              hammer2_msg_reply(msg, 0);
 +}
 +
 +static
 +void
 +master_auth_rxmsg(hammer2_msg_t *msg __unused)
 +{
 +}
 +
 +/************************************************************************
 + *                    POST-AUTHENTICATION SERVICE MSGS                *
 + ************************************************************************
 + *
 + * Callback via hammer2_iocom_core().
 + */
 +static
 +void
 +master_link_signal(hammer2_router_t *router)
 +{
 +      hammer2_msg_lnk_signal(router);
 +}
 +
 +static
 +void
 +master_link_rxmsg(hammer2_msg_t *msg)
 +{
 +      hammer2_state_t *state;
 +      uint32_t cmd;
 +
 +      /*
 +       * If the message state has a function established we just
 +       * call the function, otherwise we call the appropriate
 +       * link-level protocol related to the original command and
 +       * let it sort it out.
 +       *
 +       * Non-transactional one-off messages, on the otherhand,
 +       * might have REPLY set.
 +       */
 +      state = msg->state;
 +      cmd = state ? state->msg->any.head.cmd : msg->any.head.cmd;
 +
 +      fprintf(stderr, "service-receive: %s\n", hammer2_msg_str(msg));
 +
 +      if (state && state->func) {
 +              assert(state->func != NULL);
 +              state->func(msg);
 +      } else {
 +              switch(cmd & HAMMER2_MSGF_PROTOS) {
 +              case HAMMER2_MSG_PROTO_LNK:
 +                      hammer2_msg_lnk(msg);
 +                      break;
 +              case HAMMER2_MSG_PROTO_DBG:
 +                      hammer2_msg_dbg(msg);
 +                      break;
 +              default:
 +                      hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
 +                      break;
 +              }
 +      }
 +}
index 5c91865,0000000..ddcbc08
mode 100644,000000..100644
--- /dev/null
@@@ -1,196 -1,0 +1,203 @@@
 +/*
 + * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
 + *
 + * This code is derived from software contributed to The DragonFly Project
 + * by Matthew Dillon <dillon@dragonflybsd.org>
 + * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
 + *
 + * Redistribution and use in source and binary forms, with or without
 + * modification, are permitted provided that the following conditions
 + * are met:
 + *
 + * 1. Redistributions of source code must retain the above copyright
 + *    notice, this list of conditions and the following disclaimer.
 + * 2. Redistributions in binary form must reproduce the above copyright
 + *    notice, this list of conditions and the following disclaimer in
 + *    the documentation and/or other materials provided with the
 + *    distribution.
 + * 3. Neither the name of The DragonFly Project nor the names of its
 + *    contributors may be used to endorse or promote products derived
 + *    from this software without specific, prior written permission.
 + *
 + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 + * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 + * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 + * SUCH DAMAGE.
 + */
 +
 +/*
 + * Rollup headers for hammer2 utility
 + */
 +#include <sys/types.h>
 +#include <sys/uio.h>
 +#include <sys/mount.h>
 +#include <sys/file.h>
 +#include <sys/socket.h>
 +#include <sys/time.h>
 +#include <sys/wait.h>
 +#include <sys/tty.h>
 +#include <sys/endian.h>
 +
 +#include <netinet/in.h>
 +#include <netinet/tcp.h>
 +#include <arpa/inet.h>
 +#include <netdb.h>
 +
 +#include <vfs/hammer2/hammer2_disk.h>
 +#include <vfs/hammer2/hammer2_mount.h>
 +#include <vfs/hammer2/hammer2_ioctl.h>
 +#include <vfs/hammer2/hammer2_network.h>
 +
 +#include <stdio.h>
 +#include <stdlib.h>
 +#include <stdarg.h>
 +#include <stddef.h>
 +
 +#include <errno.h>
 +#include <fcntl.h>
 +#include <signal.h>
 +#include <string.h>
 +#include <unistd.h>
 +#include <ctype.h>
 +#include <uuid.h>
 +#include <assert.h>
 +#include <pthread.h>
 +#include <poll.h>
 +
 +#include <libutil.h>
 +
 +#include "network.h"
 +
 +#define HAMMER2_DEFAULT_DIR   "/etc/hammer2"
 +#define HAMMER2_PATH_REMOTE   HAMMER2_DEFAULT_DIR "/remote"
 +
 +struct hammer2_idmap {
 +      struct hammer2_idmap *next;
 +      uint32_t ran_beg;       /* inclusive */
 +      uint32_t ran_end;       /* inclusive */
 +};
 +
 +typedef struct hammer2_idmap hammer2_idmap_t;
 +
++struct hammer2_master_service_info {
++      int     fd;
++      int     detachme;
++};
++
++typedef struct hammer2_master_service_info hammer2_master_service_info_t;
++
 +extern int DebugOpt;
 +extern int VerboseOpt;
 +extern int QuietOpt;
 +extern int NormalExit;
 +
 +/*
 + * Hammer2 command APIs
 + */
 +int hammer2_ioctl_handle(const char *sel_path);
 +void hammer2_demon(void *(*func)(void *), void *arg);
 +
 +int cmd_remote_connect(const char *sel_path, const char *url);
 +int cmd_remote_disconnect(const char *sel_path, const char *url);
 +int cmd_remote_status(const char *sel_path, int all_opt);
 +
 +int cmd_pfs_getid(const char *sel_path, const char *name, int privateid);
 +int cmd_pfs_list(const char *sel_path);
 +int cmd_pfs_create(const char *sel_path, const char *name,
 +                      uint8_t pfs_type, const char *uuid_str);
 +int cmd_pfs_delete(const char *sel_path, const char *name);
 +
 +int cmd_service(void);
 +int cmd_stat(int ac, const char **av);
 +int cmd_leaf(const char *sel_path);
 +int cmd_shell(const char *hostname);
 +int cmd_debugspan(const char *hostname);
 +int cmd_show(const char *devpath);
 +int cmd_rsainit(const char *dir_path);
 +int cmd_rsaenc(const char **keys, int nkeys);
 +int cmd_rsadec(const char **keys, int nkeys);
 +
 +/*
 + * Msg support functions
 + */
 +void hammer2_bswap_head(hammer2_msg_hdr_t *head);
 +void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 +void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 +void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
 +                      void (*state_func)(hammer2_router_t *),
 +                      void (*rcvmsg_func)(hammer2_msg_t *),
 +                      void (*altmsg_func)(hammer2_iocom_t *));
 +void hammer2_router_restate(hammer2_router_t *router,
 +                      void (*state_func)(hammer2_router_t *),
 +                      void (*rcvmsg_func)(hammer2_msg_t *),
 +                      void (*altmsg_func)(hammer2_iocom_t *));
 +void hammer2_router_signal(hammer2_router_t *router);
 +void hammer2_iocom_done(hammer2_iocom_t *iocom);
 +hammer2_msg_t *hammer2_msg_alloc(hammer2_router_t *router,
 +                      size_t aux_size, uint32_t cmd,
 +                      void (*func)(hammer2_msg_t *), void *data);
 +void hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error);
 +void hammer2_msg_result(hammer2_msg_t *msg, uint32_t error);
 +void hammer2_state_reply(hammer2_state_t *state, uint32_t error);
 +
 +void hammer2_msg_free(hammer2_msg_t *msg);
 +
 +void hammer2_iocom_core(hammer2_iocom_t *iocom);
 +hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
 +void hammer2_msg_write(hammer2_msg_t *msg);
 +
 +void hammer2_iocom_drain(hammer2_iocom_t *iocom);
 +void hammer2_iocom_flush1(hammer2_iocom_t *iocom);
 +void hammer2_iocom_flush2(hammer2_iocom_t *iocom);
 +
 +void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 +void hammer2_state_free(hammer2_state_t *state);
 +
 +hammer2_router_t *hammer2_router_alloc(void);
 +void hammer2_router_connect(hammer2_router_t *router);
 +void hammer2_router_disconnect(hammer2_router_t **routerp);
 +
 +/*
 + * Msg protocol functions
 + */
 +void hammer2_msg_lnk_signal(hammer2_router_t *router);
 +void hammer2_msg_lnk(hammer2_msg_t *msg);
 +void hammer2_msg_dbg(hammer2_msg_t *msg);
 +
 +/*
 + * Crypto functions
 + */
 +void hammer2_crypto_setup(void);
 +void hammer2_crypto_negotiate(hammer2_iocom_t *iocom);
 +void hammer2_crypto_decrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 +int hammer2_crypto_encrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
 +                      struct iovec *iov, int n, size_t *nactp);
 +
 +/*
 + * Misc functions
 + */
 +const char *hammer2_time64_to_str(uint64_t htime64, char **strp);
 +const char *hammer2_uuid_to_str(uuid_t *uuid, char **strp);
 +const char *hammer2_iptype_to_str(uint8_t type);
 +const char *hammer2_pfstype_to_str(uint8_t type);
 +const char *sizetostr(hammer2_off_t size);
 +const char * hammer2_basecmd_str(uint32_t cmd);
 +const char *hammer2_msg_str(hammer2_msg_t *msg);
 +int hammer2_connect(const char *hostname);
 +
 +void shell_tree(hammer2_router_t *router, char *cmdbuf);
 +
 +void *master_service(void *data);
 +
 +void hammer2_msg_debug(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 +void router_printf(hammer2_router_t *router, const char *ctl, ...);
 +void *hammer2_alloc(size_t bytes);
 +void hammer2_free(void *ptr);
index ff38357,0000000..f4cb65c
mode 100644,000000..100644
--- /dev/null
@@@ -1,2232 -1,0 +1,2239 @@@
-        * connection as error'd if it fails.
 +/*
 + * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
 + *
 + * This code is derived from software contributed to The DragonFly Project
 + * by Matthew Dillon <dillon@dragonflybsd.org>
 + * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
 + *
 + * Redistribution and use in source and binary forms, with or without
 + * modification, are permitted provided that the following conditions
 + * are met:
 + *
 + * 1. Redistributions of source code must retain the above copyright
 + *    notice, this list of conditions and the following disclaimer.
 + * 2. Redistributions in binary form must reproduce the above copyright
 + *    notice, this list of conditions and the following disclaimer in
 + *    the documentation and/or other materials provided with the
 + *    distribution.
 + * 3. Neither the name of The DragonFly Project nor the names of its
 + *    contributors may be used to endorse or promote products derived
 + *    from this software without specific, prior written permission.
 + *
 + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 + * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 + * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 + * SUCH DAMAGE.
 + */
 +
 +#include "hammer2.h"
 +
 +static int hammer2_state_msgrx(hammer2_msg_t *msg);
 +static void hammer2_state_cleanuptx(hammer2_msg_t *msg);
 +
 +/*
 + * ROUTER TREE - Represents available routes for message routing, indexed
 + *             by their spanid.  The router structure is embedded in
 + *             either an iocom, h2span_link, or h2span_relay (see msg_lnk.c).
 + */
 +int
 +hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2)
 +{
 +      if (router1->target < router2->target)
 +              return(-1);
 +      if (router1->target > router2->target)
 +              return(1);
 +      return(0);
 +}
 +
 +RB_GENERATE(hammer2_router_tree, hammer2_router, rbnode, hammer2_router_cmp);
 +
 +static pthread_mutex_t router_mtx;
 +static struct hammer2_router_tree router_ltree = RB_INITIALIZER(router_ltree);
 +static struct hammer2_router_tree router_rtree = RB_INITIALIZER(router_rtree);
 +
 +/*
 + * STATE TREE - Represents open transactions which are indexed by their
 + *            {router,msgid} relative to the governing iocom.
 + *
 + *            router is usually iocom->router since state isn't stored
 + *            for relayed messages.
 + */
 +int
 +hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
 +{
 +#if 0
 +      if (state1->router < state2->router)
 +              return(-1);
 +      if (state1->router > state2->router)
 +              return(1);
 +#endif
 +      if (state1->msgid < state2->msgid)
 +              return(-1);
 +      if (state1->msgid > state2->msgid)
 +              return(1);
 +      return(0);
 +}
 +
 +RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
 +
 +/*
 + * Initialize a low-level ioq
 + */
 +void
 +hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
 +{
 +      bzero(ioq, sizeof(*ioq));
 +      ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
 +      TAILQ_INIT(&ioq->msgq);
 +}
 +
 +/*
 + * Cleanup queue.
 + *
 + * caller holds iocom->mtx.
 + */
 +void
 +hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
 +{
 +      hammer2_msg_t *msg;
 +
 +      while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
 +              assert(0);      /* shouldn't happen */
 +              TAILQ_REMOVE(&ioq->msgq, msg, qentry);
 +              hammer2_msg_free(msg);
 +      }
 +      if ((msg = ioq->msg) != NULL) {
 +              ioq->msg = NULL;
 +              hammer2_msg_free(msg);
 +      }
 +}
 +
 +/*
 + * Initialize a low-level communications channel.
 + *
 + * NOTE: The signal_func() is called at least once from the loop and can be
 + *     re-armed via hammer2_iocom_restate().
 + */
 +void
 +hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
 +                 void (*signal_func)(hammer2_router_t *),
 +                 void (*rcvmsg_func)(hammer2_msg_t *),
 +                 void (*altmsg_func)(hammer2_iocom_t *))
 +{
++      struct stat st;
++
 +      bzero(iocom, sizeof(*iocom));
 +
 +      iocom->router = hammer2_router_alloc();
 +      iocom->router->signal_callback = signal_func;
 +      iocom->router->rcvmsg_callback = rcvmsg_func;
 +      iocom->router->altmsg_callback = altmsg_func;
 +      /* we do not call hammer2_router_connect() for iocom routers */
 +
 +      pthread_mutex_init(&iocom->mtx, NULL);
 +      RB_INIT(&iocom->router->staterd_tree);
 +      RB_INIT(&iocom->router->statewr_tree);
 +      TAILQ_INIT(&iocom->freeq);
 +      TAILQ_INIT(&iocom->freeq_aux);
 +      TAILQ_INIT(&iocom->router->txmsgq);
 +      iocom->router->iocom = iocom;
 +      iocom->sock_fd = sock_fd;
 +      iocom->alt_fd = alt_fd;
 +      iocom->flags = HAMMER2_IOCOMF_RREQ;
 +      if (signal_func)
 +              iocom->flags |= HAMMER2_IOCOMF_SWORK;
 +      hammer2_ioq_init(iocom, &iocom->ioq_rx);
 +      hammer2_ioq_init(iocom, &iocom->ioq_tx);
 +      if (pipe(iocom->wakeupfds) < 0)
 +              assert(0);
 +      fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
 +      fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
 +
 +      /*
 +       * Negotiate session crypto synchronously.  This will mark the
-       hammer2_crypto_negotiate(iocom);
++       * connection as error'd if it fails.  If this is a pipe it's
++       * a linkage that we set up ourselves to the filesystem and there
++       * is no crypto.
 +       */
++      if (fstat(sock_fd, &st) < 0)
++              assert(0);
++      if (S_ISSOCK(st.st_mode))
++              hammer2_crypto_negotiate(iocom);
 +
 +      /*
 +       * Make sure our fds are set to non-blocking for the iocom core.
 +       */
 +      if (sock_fd >= 0)
 +              fcntl(sock_fd, F_SETFL, O_NONBLOCK);
 +#if 0
 +      /* if line buffered our single fgets() should be fine */
 +      if (alt_fd >= 0)
 +              fcntl(alt_fd, F_SETFL, O_NONBLOCK);
 +#endif
 +}
 +
 +/*
 + * May only be called from a callback from iocom_core.
 + *
 + * Adjust state machine functions, set flags to guarantee that both
 + * the recevmsg_func and the sendmsg_func is called at least once.
 + */
 +void
 +hammer2_router_restate(hammer2_router_t *router,
 +                 void (*signal_func)(hammer2_router_t *),
 +                 void (*rcvmsg_func)(hammer2_msg_t *msg),
 +                 void (*altmsg_func)(hammer2_iocom_t *))
 +{
 +      router->signal_callback = signal_func;
 +      router->rcvmsg_callback = rcvmsg_func;
 +      router->altmsg_callback = altmsg_func;
 +      if (signal_func)
 +              router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
 +      else
 +              router->iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
 +}
 +
 +void
 +hammer2_router_signal(hammer2_router_t *router)
 +{
 +      if (router->signal_callback)
 +              router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
 +}
 +
 +/*
 + * Cleanup a terminating iocom.
 + *
 + * Caller should not hold iocom->mtx.  The iocom has already been disconnected
 + * from all possible references to it.
 + */
 +void
 +hammer2_iocom_done(hammer2_iocom_t *iocom)
 +{
 +      hammer2_msg_t *msg;
 +
 +      if (iocom->sock_fd >= 0) {
 +              close(iocom->sock_fd);
 +              iocom->sock_fd = -1;
 +      }
 +      if (iocom->alt_fd >= 0) {
 +              close(iocom->alt_fd);
 +              iocom->alt_fd = -1;
 +      }
 +      hammer2_ioq_done(iocom, &iocom->ioq_rx);
 +      hammer2_ioq_done(iocom, &iocom->ioq_tx);
 +      if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
 +              TAILQ_REMOVE(&iocom->freeq, msg, qentry);
 +              free(msg);
 +      }
 +      if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
 +              TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
 +              free(msg->aux_data);
 +              msg->aux_data = NULL;
 +              free(msg);
 +      }
 +      if (iocom->wakeupfds[0] >= 0) {
 +              close(iocom->wakeupfds[0]);
 +              iocom->wakeupfds[0] = -1;
 +      }
 +      if (iocom->wakeupfds[1] >= 0) {
 +              close(iocom->wakeupfds[1]);
 +              iocom->wakeupfds[1] = -1;
 +      }
 +      pthread_mutex_destroy(&iocom->mtx);
 +}
 +
 +/*
 + * Allocate a new one-way message.
 + */
 +hammer2_msg_t *
 +hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd,
 +                void (*func)(hammer2_msg_t *), void *data)
 +{
 +      hammer2_state_t *state = NULL;
 +      hammer2_iocom_t *iocom = router->iocom;
 +      hammer2_msg_t *msg;
 +      int hbytes;
 +
 +      pthread_mutex_lock(&iocom->mtx);
 +      if (aux_size) {
 +              aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
 +                         ~HAMMER2_MSG_ALIGNMASK;
 +              if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
 +                      TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
 +      } else {
 +              if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
 +                      TAILQ_REMOVE(&iocom->freeq, msg, qentry);
 +      }
 +      if ((cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY)) ==
 +          HAMMER2_MSGF_CREATE) {
 +              /*
 +               * Create state when CREATE is set without REPLY.
 +               *
 +               * NOTE: CREATE in txcmd handled by hammer2_msg_write()
 +               * NOTE: DELETE in txcmd handled by hammer2_state_cleanuptx()
 +               */
 +              state = malloc(sizeof(*state));
 +              bzero(state, sizeof(*state));
 +              state->iocom = iocom;
 +              state->flags = HAMMER2_STATE_DYNAMIC;
 +              state->msgid = (uint64_t)(uintptr_t)state;
 +              state->router = router;
 +              state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
 +                                     HAMMER2_MSGF_DELETE);
 +              state->rxcmd = HAMMER2_MSGF_REPLY;
 +              state->func = func;
 +              state->any.any = data;
 +              pthread_mutex_lock(&iocom->mtx);
 +              RB_INSERT(hammer2_state_tree,
 +                        &iocom->router->statewr_tree,
 +                        state);
 +              pthread_mutex_unlock(&iocom->mtx);
 +              state->flags |= HAMMER2_STATE_INSERTED;
 +      }
 +      pthread_mutex_unlock(&iocom->mtx);
 +      if (msg == NULL) {
 +              msg = malloc(sizeof(*msg));
 +              bzero(msg, sizeof(*msg));
 +              msg->aux_data = NULL;
 +              msg->aux_size = 0;
 +      }
 +      if (msg->aux_size != aux_size) {
 +              if (msg->aux_data) {
 +                      free(msg->aux_data);
 +                      msg->aux_data = NULL;
 +                      msg->aux_size = 0;
 +              }
 +              if (aux_size) {
 +                      msg->aux_data = malloc(aux_size);
 +                      msg->aux_size = aux_size;
 +              }
 +      }
 +      hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
 +      if (hbytes)
 +              bzero(&msg->any.head, hbytes);
 +      msg->hdr_size = hbytes;
 +      msg->any.head.cmd = cmd;
 +      msg->any.head.aux_descr = 0;
 +      msg->any.head.aux_crc = 0;
 +      msg->router = router;
 +      if (state) {
 +              msg->state = state;
 +              state->msg = msg;
 +              msg->any.head.msgid = state->msgid;
 +      }
 +      return (msg);
 +}
 +
 +/*
 + * Free a message so it can be reused afresh.
 + *
 + * NOTE: aux_size can be 0 with a non-NULL aux_data.
 + */
 +static
 +void
 +hammer2_msg_free_locked(hammer2_msg_t *msg)
 +{
 +      hammer2_iocom_t *iocom = msg->router->iocom;
 +
 +      msg->state = NULL;
 +      if (msg->aux_data)
 +              TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
 +      else
 +              TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
 +}
 +
 +void
 +hammer2_msg_free(hammer2_msg_t *msg)
 +{
 +      hammer2_iocom_t *iocom = msg->router->iocom;
 +
 +      pthread_mutex_lock(&iocom->mtx);
 +      hammer2_msg_free_locked(msg);
 +      pthread_mutex_unlock(&iocom->mtx);
 +}
 +
 +/*
 + * I/O core loop for an iocom.
 + *
 + * Thread localized, iocom->mtx not held.
 + */
 +void
 +hammer2_iocom_core(hammer2_iocom_t *iocom)
 +{
 +      struct pollfd fds[3];
 +      char dummybuf[256];
 +      hammer2_msg_t *msg;
 +      int timeout;
 +      int count;
 +      int wi; /* wakeup pipe */
 +      int si; /* socket */
 +      int ai; /* alt bulk path socket */
 +
 +      while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
 +              if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
 +                                   HAMMER2_IOCOMF_WWORK |
 +                                   HAMMER2_IOCOMF_PWORK |
 +                                   HAMMER2_IOCOMF_SWORK |
 +                                   HAMMER2_IOCOMF_ARWORK |
 +                                   HAMMER2_IOCOMF_AWWORK)) == 0) {
 +                      /*
 +                       * Only poll if no immediate work is pending.
 +                       * Otherwise we are just wasting our time calling
 +                       * poll.
 +                       */
 +                      timeout = 5000;
 +
 +                      count = 0;
 +                      wi = -1;
 +                      si = -1;
 +                      ai = -1;
 +
 +                      /*
 +                       * Always check the inter-thread pipe, e.g.
 +                       * for iocom->txmsgq work.
 +                       */
 +                      wi = count++;
 +                      fds[wi].fd = iocom->wakeupfds[0];
 +                      fds[wi].events = POLLIN;
 +                      fds[wi].revents = 0;
 +
 +                      /*
 +                       * Check the socket input/output direction as
 +                       * requested
 +                       */
 +                      if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
 +                                          HAMMER2_IOCOMF_WREQ)) {
 +                              si = count++;
 +                              fds[si].fd = iocom->sock_fd;
 +                              fds[si].events = 0;
 +                              fds[si].revents = 0;
 +
 +                              if (iocom->flags & HAMMER2_IOCOMF_RREQ)
 +                                      fds[si].events |= POLLIN;
 +                              if (iocom->flags & HAMMER2_IOCOMF_WREQ)
 +                                      fds[si].events |= POLLOUT;
 +                      }
 +
 +                      /*
 +                       * Check the alternative fd for work.
 +                       */
 +                      if (iocom->alt_fd >= 0) {
 +                              ai = count++;
 +                              fds[ai].fd = iocom->alt_fd;
 +                              fds[ai].events = POLLIN;
 +                              fds[ai].revents = 0;
 +                      }
 +                      poll(fds, count, timeout);
 +
 +                      if (wi >= 0 && (fds[wi].revents & POLLIN))
 +                              iocom->flags |= HAMMER2_IOCOMF_PWORK;
 +                      if (si >= 0 && (fds[si].revents & POLLIN))
 +                              iocom->flags |= HAMMER2_IOCOMF_RWORK;
 +                      if (si >= 0 && (fds[si].revents & POLLOUT))
 +                              iocom->flags |= HAMMER2_IOCOMF_WWORK;
 +                      if (wi >= 0 && (fds[wi].revents & POLLOUT))
 +                              iocom->flags |= HAMMER2_IOCOMF_WWORK;
 +                      if (ai >= 0 && (fds[ai].revents & POLLIN))
 +                              iocom->flags |= HAMMER2_IOCOMF_ARWORK;
 +              } else {
 +                      /*
 +                       * Always check the pipe
 +                       */
 +                      iocom->flags |= HAMMER2_IOCOMF_PWORK;
 +              }
 +
 +              if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
 +                      iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
 +                      iocom->router->signal_callback(iocom->router);
 +              }
 +
 +              /*
 +               * Pending message queues from other threads wake us up
 +               * with a write to the wakeupfds[] pipe.  We have to clear
 +               * the pipe with a dummy read.
 +               */
 +              if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
 +                      iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
 +                      read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
 +                      iocom->flags |= HAMMER2_IOCOMF_RWORK;
 +                      iocom->flags |= HAMMER2_IOCOMF_WWORK;
 +                      if (TAILQ_FIRST(&iocom->router->txmsgq))
 +                              hammer2_iocom_flush1(iocom);
 +              }
 +
 +              /*
 +               * Message write sequencing
 +               */
 +              if (iocom->flags & HAMMER2_IOCOMF_WWORK)
 +                      hammer2_iocom_flush1(iocom);
 +
 +              /*
 +               * Message read sequencing.  Run this after the write
 +               * sequencing in case the write sequencing allowed another
 +               * auto-DELETE to occur on the read side.
 +               */
 +              if (iocom->flags & HAMMER2_IOCOMF_RWORK) {
 +                      while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
 +                             (msg = hammer2_ioq_read(iocom)) != NULL) {
 +                              if (DebugOpt) {
 +                                      fprintf(stderr, "receive %s\n",
 +                                              hammer2_msg_str(msg));
 +                              }
 +                              iocom->router->rcvmsg_callback(msg);
 +                              hammer2_state_cleanuprx(iocom, msg);
 +                      }
 +              }
 +
 +              if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
 +                      iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
 +                      iocom->router->altmsg_callback(iocom);
 +              }
 +      }
 +}
 +
 +/*
 + * Make sure there's enough room in the FIFO to hold the
 + * needed data.
 + *
 + * Assume worst case encrypted form is 2x the size of the
 + * plaintext equivalent.
 + */
 +static
 +size_t
 +hammer2_ioq_makeroom(hammer2_ioq_t *ioq, size_t needed)
 +{
 +      size_t bytes;
 +      size_t nmax;
 +
 +      bytes = ioq->fifo_cdx - ioq->fifo_beg;
 +      nmax = sizeof(ioq->buf) - ioq->fifo_end;
 +      if (bytes + nmax / 2 < needed) {
 +              if (bytes) {
 +                      bcopy(ioq->buf + ioq->fifo_beg,
 +                            ioq->buf,
 +                            bytes);
 +              }
 +              ioq->fifo_cdx -= ioq->fifo_beg;
 +              ioq->fifo_beg = 0;
 +              if (ioq->fifo_cdn < ioq->fifo_end) {
 +                      bcopy(ioq->buf + ioq->fifo_cdn,
 +                            ioq->buf + ioq->fifo_cdx,
 +                            ioq->fifo_end - ioq->fifo_cdn);
 +              }
 +              ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
 +              ioq->fifo_cdn = ioq->fifo_cdx;
 +              nmax = sizeof(ioq->buf) - ioq->fifo_end;
 +      }
 +      return(nmax);
 +}
 +
 +/*
 + * Read the next ready message from the ioq, issuing I/O if needed.
 + * Caller should retry on a read-event when NULL is returned.
 + *
 + * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
 + * be returned for each open transaction, then the ioq and iocom
 + * will be errored out and a non-transactional HAMMER2_LNK_ERROR
 + * msg will be returned as the final message.  The caller should not call
 + * us again after the final message is returned.
 + *
 + * Thread localized, iocom->mtx not held.
 + */
 +hammer2_msg_t *
 +hammer2_ioq_read(hammer2_iocom_t *iocom)
 +{
 +      hammer2_ioq_t *ioq = &iocom->ioq_rx;
 +      hammer2_msg_t *msg;
 +      hammer2_msg_hdr_t *head;
 +      hammer2_state_t *state;
 +      ssize_t n;
 +      size_t bytes;
 +      size_t nmax;
 +      uint32_t xcrc32;
 +      int error;
 +
 +again:
 +      iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
 +
 +      /*
 +       * If a message is already pending we can just remove and
 +       * return it.  Message state has already been processed.
 +       * (currently not implemented)
 +       */
 +      if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
 +              TAILQ_REMOVE(&ioq->msgq, msg, qentry);
 +              return (msg);
 +      }
 +
 +      /*
 +       * If the stream is errored out we stop processing it.
 +       */
 +      if (ioq->error)
 +              goto skip;
 +
 +      /*
 +       * Message read in-progress (msg is NULL at the moment).  We don't
 +       * allocate a msg until we have its core header.
 +       */
 +      nmax = sizeof(ioq->buf) - ioq->fifo_end;
 +      bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
 +      msg = ioq->msg;
 +
 +      switch(ioq->state) {
 +      case HAMMER2_MSGQ_STATE_HEADER1:
 +              /*
 +               * Load the primary header, fail on any non-trivial read
 +               * error or on EOF.  Since the primary header is the same
 +               * size is the message alignment it will never straddle
 +               * the end of the buffer.
 +               */
 +              nmax = hammer2_ioq_makeroom(ioq, sizeof(msg->any.head));
 +              if (bytes < sizeof(msg->any.head)) {
 +                      n = read(iocom->sock_fd,
 +                               ioq->buf + ioq->fifo_end,
 +                               nmax);
 +                      if (n <= 0) {
 +                              if (n == 0) {
 +                                      ioq->error = HAMMER2_IOQ_ERROR_EOF;
 +                                      break;
 +                              }
 +                              if (errno != EINTR &&
 +                                  errno != EINPROGRESS &&
 +                                  errno != EAGAIN) {
 +                                      ioq->error = HAMMER2_IOQ_ERROR_SOCK;
 +                                      break;
 +                              }
 +                              n = 0;
 +                              /* fall through */
 +                      }
 +                      ioq->fifo_end += (size_t)n;
 +                      nmax -= (size_t)n;
 +              }
 +
 +              /*
 +               * Decrypt data received so far.  Data will be decrypted
 +               * in-place but might create gaps in the FIFO.  Partial
 +               * blocks are not immediately decrypted.
 +               *
 +               * WARNING!  The header might be in the wrong endian, we
 +               *           do not fix it up until we get the entire
 +               *           extended header.
 +               */
 +              if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
 +                      hammer2_crypto_decrypt(iocom, ioq);
 +              } else {
 +                      ioq->fifo_cdx = ioq->fifo_end;
 +                      ioq->fifo_cdn = ioq->fifo_end;
 +              }
 +              bytes = ioq->fifo_cdx - ioq->fifo_beg;
 +
 +              /*
 +               * Insufficient data accumulated (msg is NULL, caller will
 +               * retry on event).
 +               */
 +              assert(msg == NULL);
 +              if (bytes < sizeof(msg->any.head))
 +                      break;
 +
 +              /*
 +               * Check and fixup the core header.  Note that the icrc
 +               * has to be calculated before any fixups, but the crc
 +               * fields in the msg may have to be swapped like everything
 +               * else.
 +               */
 +              head = (void *)(ioq->buf + ioq->fifo_beg);
 +              if (head->magic != HAMMER2_MSGHDR_MAGIC &&
 +                  head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
 +                      ioq->error = HAMMER2_IOQ_ERROR_SYNC;
 +                      break;
 +              }
 +
 +              /*
 +               * Calculate the full header size and aux data size
 +               */
 +              if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
 +                      ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
 +                                    HAMMER2_MSG_ALIGN;
 +                      ioq->abytes = bswap32(head->aux_bytes) *
 +                                    HAMMER2_MSG_ALIGN;
 +              } else {
 +                      ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
 +                                    HAMMER2_MSG_ALIGN;
 +                      ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
 +              }
 +              if (ioq->hbytes < sizeof(msg->any.head) ||
 +                  ioq->hbytes > sizeof(msg->any) ||
 +                  ioq->abytes > HAMMER2_MSGAUX_MAX) {
 +                      ioq->error = HAMMER2_IOQ_ERROR_FIELD;
 +                      break;
 +              }
 +
 +              /*
 +               * Allocate the message, the next state will fill it in.
 +               */
 +              msg = hammer2_msg_alloc(iocom->router, ioq->abytes, 0,
 +                                      NULL, NULL);
 +              ioq->msg = msg;
 +
 +              /*
 +               * Fall through to the next state.  Make sure that the
 +               * extended header does not straddle the end of the buffer.
 +               * We still want to issue larger reads into our buffer,
 +               * book-keeping is easier if we don't bcopy() yet.
 +               *
 +               * Make sure there is enough room for bloated encrypt data.
 +               */
 +              nmax = hammer2_ioq_makeroom(ioq, ioq->hbytes);
 +              ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
 +              /* fall through */
 +      case HAMMER2_MSGQ_STATE_HEADER2:
 +              /*
 +               * Fill out the extended header.
 +               */
 +              assert(msg != NULL);
 +              if (bytes < ioq->hbytes) {
 +                      n = read(iocom->sock_fd,
 +                               ioq->buf + ioq->fifo_end,
 +                               nmax);
 +                      if (n <= 0) {
 +                              if (n == 0) {
 +                                      ioq->error = HAMMER2_IOQ_ERROR_EOF;
 +                                      break;
 +                              }
 +                              if (errno != EINTR &&
 +                                  errno != EINPROGRESS &&
 +                                  errno != EAGAIN) {
 +                                      ioq->error = HAMMER2_IOQ_ERROR_SOCK;
 +                                      break;
 +                              }
 +                              n = 0;
 +                              /* fall through */
 +                      }
 +                      ioq->fifo_end += (size_t)n;
 +                      nmax -= (size_t)n;
 +              }
 +
 +              if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
 +                      hammer2_crypto_decrypt(iocom, ioq);
 +              } else {
 +                      ioq->fifo_cdx = ioq->fifo_end;
 +                      ioq->fifo_cdn = ioq->fifo_end;
 +              }
 +              bytes = ioq->fifo_cdx - ioq->fifo_beg;
 +
 +              /*
 +               * Insufficient data accumulated (set msg NULL so caller will
 +               * retry on event).
 +               */
 +              if (bytes < ioq->hbytes) {
 +                      msg = NULL;
 +                      break;
 +              }
 +
 +              /*
 +               * Calculate the extended header, decrypt data received
 +               * so far.  Handle endian-conversion for the entire extended
 +               * header.
 +               */
 +              head = (void *)(ioq->buf + ioq->fifo_beg);
 +
 +              /*
 +               * Check the CRC.
 +               */
 +              if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
 +                      xcrc32 = bswap32(head->hdr_crc);
 +              else
 +                      xcrc32 = head->hdr_crc;
 +              head->hdr_crc = 0;
 +              if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
 +                      ioq->error = HAMMER2_IOQ_ERROR_XCRC;
 +                      fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
 +                              xcrc32, hammer2_icrc32(head, ioq->hbytes),
 +                              hammer2_msg_str(msg));
 +                      assert(0);
 +                      break;
 +              }
 +              head->hdr_crc = xcrc32;
 +
 +              if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
 +                      hammer2_bswap_head(head);
 +              }
 +
 +              /*
 +               * Copy the extended header into the msg and adjust the
 +               * FIFO.
 +               */
 +              bcopy(head, &msg->any, ioq->hbytes);
 +
 +              /*
 +               * We are either done or we fall-through.
 +               */
 +              if (ioq->abytes == 0) {
 +                      ioq->fifo_beg += ioq->hbytes;
 +                      break;
 +              }
 +
 +              /*
 +               * Must adjust bytes (and the state) when falling through.
 +               * nmax doesn't change.
 +               */
 +              ioq->fifo_beg += ioq->hbytes;
 +              bytes -= ioq->hbytes;
 +              ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
 +              /* fall through */
 +      case HAMMER2_MSGQ_STATE_AUXDATA1:
 +              /*
 +               * Copy the partial or complete payload from remaining
 +               * bytes in the FIFO in order to optimize the makeroom call
 +               * in the AUXDATA2 state.  We have to fall-through either
 +               * way so we can check the crc.
 +               *
 +               * msg->aux_size tracks our aux data.
 +               */
 +              if (bytes >= ioq->abytes) {
 +                      bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
 +                            ioq->abytes);
 +                      msg->aux_size = ioq->abytes;
 +                      ioq->fifo_beg += ioq->abytes;
 +                      assert(ioq->fifo_beg <= ioq->fifo_cdx);
 +                      assert(ioq->fifo_cdx <= ioq->fifo_cdn);
 +                      bytes -= ioq->abytes;
 +              } else if (bytes) {
 +                      bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
 +                            bytes);
 +                      msg->aux_size = bytes;
 +                      ioq->fifo_beg += bytes;
 +                      if (ioq->fifo_cdx < ioq->fifo_beg)
 +                              ioq->fifo_cdx = ioq->fifo_beg;
 +                      assert(ioq->fifo_beg <= ioq->fifo_cdx);
 +                      assert(ioq->fifo_cdx <= ioq->fifo_cdn);
 +                      bytes = 0;
 +              } else {
 +                      msg->aux_size = 0;
 +              }
 +              ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
 +              /* fall through */
 +      case HAMMER2_MSGQ_STATE_AUXDATA2:
 +              /*
 +               * Make sure there is enough room for more data.
 +               */
 +              assert(msg);
 +              nmax = hammer2_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
 +
 +              /*
 +               * Read and decrypt more of the payload.
 +               */
 +              if (msg->aux_size < ioq->abytes) {
 +                      assert(bytes == 0);
 +                      n = read(iocom->sock_fd,
 +                               ioq->buf + ioq->fifo_end,
 +                               nmax);
 +                      if (n <= 0) {
 +                              if (n == 0) {
 +                                      ioq->error = HAMMER2_IOQ_ERROR_EOF;
 +                                      break;
 +                              }
 +                              if (errno != EINTR &&
 +                                  errno != EINPROGRESS &&
 +                                  errno != EAGAIN) {
 +                                      ioq->error = HAMMER2_IOQ_ERROR_SOCK;
 +                                      break;
 +                              }
 +                              n = 0;
 +                              /* fall through */
 +                      }
 +                      ioq->fifo_end += (size_t)n;
 +                      nmax -= (size_t)n;
 +              }
 +
 +              if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
 +                      hammer2_crypto_decrypt(iocom, ioq);
 +              } else {
 +                      ioq->fifo_cdx = ioq->fifo_end;
 +                      ioq->fifo_cdn = ioq->fifo_end;
 +              }
 +              bytes = ioq->fifo_cdx - ioq->fifo_beg;
 +
 +              if (bytes > ioq->abytes - msg->aux_size)
 +                      bytes = ioq->abytes - msg->aux_size;
 +
 +              if (bytes) {
 +                      bcopy(ioq->buf + ioq->fifo_beg,
 +                            msg->aux_data + msg->aux_size,
 +                            bytes);
 +                      msg->aux_size += bytes;
 +                      ioq->fifo_beg += bytes;
 +              }
 +
 +              /*
 +               * Insufficient data accumulated (set msg NULL so caller will
 +               * retry on event).
 +               */
 +              if (msg->aux_size < ioq->abytes) {
 +                      msg = NULL;
 +                      break;
 +              }
 +              assert(msg->aux_size == ioq->abytes);
 +
 +              /*
 +               * Check aux_crc, then we are done.
 +               */
 +              xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
 +              if (xcrc32 != msg->any.head.aux_crc) {
 +                      ioq->error = HAMMER2_IOQ_ERROR_ACRC;
 +                      break;
 +              }
 +              break;
 +      case HAMMER2_MSGQ_STATE_ERROR:
 +              /*
 +               * Continued calls to drain recorded transactions (returning
 +               * a LNK_ERROR for each one), before we return the final
 +               * LNK_ERROR.
 +               */
 +              assert(msg == NULL);
 +              break;
 +      default:
 +              /*
 +               * We don't double-return errors, the caller should not
 +               * have called us again after getting an error msg.
 +               */
 +              assert(0);
 +              break;
 +      }
 +
 +      /*
 +       * Check the message sequence.  The iv[] should prevent any
 +       * possibility of a replay but we add this check anyway.
 +       */
 +      if (msg && ioq->error == 0) {
 +              if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
 +                      ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
 +              } else {
 +                      ++ioq->seq;
 +              }
 +      }
 +
 +      /*
 +       * Process transactional state for the message.
 +       */
 +      if (msg && ioq->error == 0) {
 +              error = hammer2_state_msgrx(msg);
 +              if (error) {
 +                      if (error == HAMMER2_IOQ_ERROR_EALREADY) {
 +                              hammer2_msg_free(msg);
 +                              goto again;
 +                      }
 +                      ioq->error = error;
 +              }
 +      }
 +
 +      /*
 +       * Handle error, RREQ, or completion
 +       *
 +       * NOTE: nmax and bytes are invalid at this point, we don't bother
 +       *       to update them when breaking out.
 +       */
 +      if (ioq->error) {
 +skip:
 +              /*
 +               * An unrecoverable error causes all active receive
 +               * transactions to be terminated with a LNK_ERROR message.
 +               *
 +               * Once all active transactions are exhausted we set the
 +               * iocom ERROR flag and return a non-transactional LNK_ERROR
 +               * message, which should cause master processing loops to
 +               * terminate.
 +               */
 +              assert(ioq->msg == msg);
 +              if (msg) {
 +                      hammer2_msg_free(msg);
 +                      ioq->msg = NULL;
 +              }
 +
 +              /*
 +               * No more I/O read processing
 +               */
 +              ioq->state = HAMMER2_MSGQ_STATE_ERROR;
 +
 +              /*
 +               * Simulate a remote LNK_ERROR DELETE msg for any open
 +               * transactions, ending with a final non-transactional
 +               * LNK_ERROR (that the session can detect) when no
 +               * transactions remain.
 +               */
 +              msg = hammer2_msg_alloc(iocom->router, 0, 0, NULL, NULL);
 +              bzero(&msg->any.head, sizeof(msg->any.head));
 +              msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
 +              msg->any.head.cmd = HAMMER2_LNK_ERROR;
 +              msg->any.head.error = ioq->error;
 +
 +              pthread_mutex_lock(&iocom->mtx);
 +              hammer2_iocom_drain(iocom);
 +              if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) {
 +                      /*
 +                       * Active remote transactions are still present.
 +                       * Simulate the other end sending us a DELETE.
 +                       */
 +                      if (state->rxcmd & HAMMER2_MSGF_DELETE) {
 +                              hammer2_msg_free(msg);
 +                              msg = NULL;
 +                      } else {
 +                              /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
 +                              msg->state = state;
 +                              msg->router = state->router;
 +                              msg->any.head.msgid = state->msgid;
 +                              msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
 +                                                   HAMMER2_MSGF_DELETE;
 +                      }
 +              } else if ((state = RB_ROOT(&iocom->router->statewr_tree)) !=
 +                         NULL) {
 +                      /*
 +                       * Active local transactions are still present.
 +                       * Simulate the other end sending us a DELETE.
 +                       */
 +                      if (state->rxcmd & HAMMER2_MSGF_DELETE) {
 +                              hammer2_msg_free(msg);
 +                              msg = NULL;
 +                      } else {
 +                              msg->state = state;
 +                              msg->router = state->router;
 +                              msg->any.head.msgid = state->msgid;
 +                              msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
 +                                                   HAMMER2_MSGF_DELETE |
 +                                                   HAMMER2_MSGF_REPLY;
 +                              if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
 +                                      msg->any.head.cmd |=
 +                                                   HAMMER2_MSGF_CREATE;
 +                              }
 +                      }
 +              } else {
 +                      /*
 +                       * No active local or remote transactions remain.
 +                       * Generate a final LNK_ERROR and flag EOF.
 +                       */
 +                      msg->state = NULL;
 +                      iocom->flags |= HAMMER2_IOCOMF_EOF;
 +                      fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
 +              }
 +              pthread_mutex_unlock(&iocom->mtx);
 +
 +              /*
 +               * For the iocom error case we want to set RWORK to indicate
 +               * that more messages might be pending.
 +               *
 +               * It is possible to return NULL when there is more work to
 +               * do because each message has to be DELETEd in both
 +               * directions before we continue on with the next (though
 +               * this could be optimized).  The transmit direction will
 +               * re-set RWORK.
 +               */
 +              if (msg)
 +                      iocom->flags |= HAMMER2_IOCOMF_RWORK;
 +      } else if (msg == NULL) {
 +              /*
 +               * Insufficient data received to finish building the message,
 +               * set RREQ and return NULL.
 +               *
 +               * Leave ioq->msg intact.
 +               * Leave the FIFO intact.
 +               */
 +              iocom->flags |= HAMMER2_IOCOMF_RREQ;
 +      } else {
 +              /*
 +               * Return msg.
 +               *
 +               * The fifo has already been advanced past the message.
 +               * Trivially reset the FIFO indices if possible.
 +               *
 +               * clear the FIFO if it is now empty and set RREQ to wait
 +               * for more from the socket.  If the FIFO is not empty set
 +               * TWORK to bypass the poll so we loop immediately.
 +               */
 +              if (ioq->fifo_beg == ioq->fifo_cdx &&
 +                  ioq->fifo_cdn == ioq->fifo_end) {
 +                      iocom->flags |= HAMMER2_IOCOMF_RREQ;
 +                      ioq->fifo_cdx = 0;
 +                      ioq->fifo_cdn = 0;
 +                      ioq->fifo_beg = 0;
 +                      ioq->fifo_end = 0;
 +              } else {
 +                      iocom->flags |= HAMMER2_IOCOMF_RWORK;
 +              }
 +              ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
 +              ioq->msg = NULL;
 +      }
 +      return (msg);
 +}
 +
 +/*
 + * Calculate the header and data crc's and write a low-level message to
 + * the connection.  If aux_crc is non-zero the aux_data crc is already
 + * assumed to have been set.
 + *
 + * A non-NULL msg is added to the queue but not necessarily flushed.
 + * Calling this function with msg == NULL will get a flush going.
 + *
 + * Caller must hold iocom->mtx.
 + */
 +void
 +hammer2_iocom_flush1(hammer2_iocom_t *iocom)
 +{
 +      hammer2_ioq_t *ioq = &iocom->ioq_tx;
 +      hammer2_msg_t *msg;
 +      uint32_t xcrc32;
 +      int hbytes;
 +      hammer2_msg_queue_t tmpq;
 +
 +      iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
 +      TAILQ_INIT(&tmpq);
 +      pthread_mutex_lock(&iocom->mtx);
 +      while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) {
 +              TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry);
 +              TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
 +      }
 +      pthread_mutex_unlock(&iocom->mtx);
 +
 +      while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
 +              /*
 +               * Process terminal connection errors.
 +               */
 +              TAILQ_REMOVE(&tmpq, msg, qentry);
 +              if (ioq->error) {
 +                      TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
 +                      ++ioq->msgcount;
 +                      continue;
 +              }
 +
 +              /*
 +               * Finish populating the msg fields.  The salt ensures that
 +               * the iv[] array is ridiculously randomized and we also
 +               * re-seed our PRNG every 32768 messages just to be sure.
 +               */
 +              msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
 +              msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
 +              ++ioq->seq;
 +              if ((ioq->seq & 32767) == 0)
 +                      srandomdev();
 +
 +              /*
 +               * Calculate aux_crc if 0, then calculate hdr_crc.
 +               */
 +              if (msg->aux_size && msg->any.head.aux_crc == 0) {
 +                      assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
 +                      xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
 +                      msg->any.head.aux_crc = xcrc32;
 +              }
 +              msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
 +              assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
 +
 +              hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
 +                       HAMMER2_MSG_ALIGN;
 +              msg->any.head.hdr_crc = 0;
 +              msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
 +
 +              /*
 +               * Enqueue the message (the flush codes handles stream
 +               * encryption).
 +               */
 +              TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
 +              ++ioq->msgcount;
 +      }
 +      hammer2_iocom_flush2(iocom);
 +}
 +
 +/*
 + * Thread localized, iocom->mtx not held by caller.
 + */
 +void
 +hammer2_iocom_flush2(hammer2_iocom_t *iocom)
 +{
 +      hammer2_ioq_t *ioq = &iocom->ioq_tx;
 +      hammer2_msg_t *msg;
 +      ssize_t n;
 +      struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
 +      size_t nact;
 +      size_t hbytes;
 +      size_t abytes;
 +      size_t hoff;
 +      size_t aoff;
 +      int iovcnt;
 +
 +      if (ioq->error) {
 +              hammer2_iocom_drain(iocom);
 +              return;
 +      }
 +
 +      /*
 +       * Pump messages out the connection by building an iovec.
 +       *
 +       * ioq->hbytes/ioq->abytes tracks how much of the first message
 +       * in the queue has been successfully written out, so we can
 +       * resume writing.
 +       */
 +      iovcnt = 0;
 +      nact = 0;
 +      hoff = ioq->hbytes;
 +      aoff = ioq->abytes;
 +
 +      TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
 +              hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
 +                       HAMMER2_MSG_ALIGN;
 +              abytes = msg->aux_size;
 +              assert(hoff <= hbytes && aoff <= abytes);
 +
 +              if (hoff < hbytes) {
 +                      iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
 +                      iov[iovcnt].iov_len = hbytes - hoff;
 +                      nact += hbytes - hoff;
 +                      ++iovcnt;
 +                      if (iovcnt == HAMMER2_IOQ_MAXIOVEC)
 +                              break;
 +              }
 +              if (aoff < abytes) {
 +                      assert(msg->aux_data != NULL);
 +                      iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
 +                      iov[iovcnt].iov_len = abytes - aoff;
 +                      nact += abytes - aoff;
 +                      ++iovcnt;
 +                      if (iovcnt == HAMMER2_IOQ_MAXIOVEC)
 +                              break;
 +              }
 +              hoff = 0;
 +              aoff = 0;
 +      }
 +      if (iovcnt == 0)
 +              return;
 +
 +      /*
 +       * Encrypt and write the data.  The crypto code will move the
 +       * data into the fifo and adjust the iov as necessary.  If
 +       * encryption is disabled the iov is left alone.
 +       *
 +       * May return a smaller iov (thus a smaller n), with aggregated
 +       * chunks.  May reduce nmax to what fits in the FIFO.
 +       *
 +       * This function sets nact to the number of original bytes now
 +       * encrypted, adding to the FIFO some number of bytes that might
 +       * be greater depending on the crypto mechanic.  iov[] is adjusted
 +       * to point at the FIFO if necessary.
 +       *
 +       * NOTE: The return value from the writev() is the post-encrypted
 +       *       byte count, not the plaintext count.
 +       */
 +      if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
 +              /*
 +               * Make sure the FIFO has a reasonable amount of space
 +               * left (if not completely full).
 +               */
 +              if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
 +                  sizeof(ioq->buf) - ioq->fifo_end >= HAMMER2_MSG_ALIGN * 2) {
 +                      bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
 +                            ioq->fifo_end - ioq->fifo_beg);
 +                      ioq->fifo_cdx -= ioq->fifo_beg;
 +                      ioq->fifo_cdn -= ioq->fifo_beg;
 +                      ioq->fifo_end -= ioq->fifo_beg;
 +                      ioq->fifo_beg = 0;
 +              }
 +
 +              iovcnt = hammer2_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
 +              n = writev(iocom->sock_fd, iov, iovcnt);
 +              if (n > 0) {
 +                      ioq->fifo_beg += n;
 +                      ioq->fifo_cdn += n;
 +                      ioq->fifo_cdx += n;
 +                      if (ioq->fifo_beg == ioq->fifo_end) {
 +                              ioq->fifo_beg = 0;
 +                              ioq->fifo_cdn = 0;
 +                              ioq->fifo_cdx = 0;
 +                              ioq->fifo_end = 0;
 +                      }
 +              }
 +      } else {
 +              n = writev(iocom->sock_fd, iov, iovcnt);
 +              if (n > 0)
 +                      nact = n;
 +              else
 +                      nact = 0;
 +      }
 +
 +      /*
 +       * Clean out the transmit queue based on what we successfully
 +       * sent (nact is the plaintext count).  ioq->hbytes/abytes
 +       * represents the portion of the first message previously sent.
 +       */
 +      while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
 +              hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
 +                       HAMMER2_MSG_ALIGN;
 +              abytes = msg->aux_size;
 +
 +              if ((size_t)nact < hbytes - ioq->hbytes) {
 +                      ioq->hbytes += nact;
 +                      nact = 0;
 +                      break;
 +              }
 +              nact -= hbytes - ioq->hbytes;
 +              ioq->hbytes = hbytes;
 +              if ((size_t)nact < abytes - ioq->abytes) {
 +                      ioq->abytes += nact;
 +                      nact = 0;
 +                      break;
 +              }
 +              nact -= abytes - ioq->abytes;
 +
 +              TAILQ_REMOVE(&ioq->msgq, msg, qentry);
 +              --ioq->msgcount;
 +              ioq->hbytes = 0;
 +              ioq->abytes = 0;
 +
 +              hammer2_state_cleanuptx(msg);
 +      }
 +      assert(nact == 0);
 +
 +      /*
 +       * Process the return value from the write w/regards to blocking.
 +       */
 +      if (n < 0) {
 +              if (errno != EINTR &&
 +                  errno != EINPROGRESS &&
 +                  errno != EAGAIN) {
 +                      /*
 +                       * Fatal write error
 +                       */
 +                      ioq->error = HAMMER2_IOQ_ERROR_SOCK;
 +                      hammer2_iocom_drain(iocom);
 +              } else {
 +                      /*
 +                       * Wait for socket buffer space
 +                       */
 +                      iocom->flags |= HAMMER2_IOCOMF_WREQ;
 +              }
 +      } else {
 +              iocom->flags |= HAMMER2_IOCOMF_WREQ;
 +      }
 +      if (ioq->error) {
 +              hammer2_iocom_drain(iocom);
 +      }
 +}
 +
 +/*
 + * Kill pending msgs on ioq_tx and adjust the flags such that no more
 + * write events will occur.  We don't kill read msgs because we want
 + * the caller to pull off our contrived terminal error msg to detect
 + * the connection failure.
 + *
 + * Thread localized, iocom->mtx not held by caller.
 + */
 +void
 +hammer2_iocom_drain(hammer2_iocom_t *iocom)
 +{
 +      hammer2_ioq_t *ioq = &iocom->ioq_tx;
 +      hammer2_msg_t *msg;
 +
 +      iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
 +      ioq->hbytes = 0;
 +      ioq->abytes = 0;
 +
 +      while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
 +              TAILQ_REMOVE(&ioq->msgq, msg, qentry);
 +              --ioq->msgcount;
 +              hammer2_state_cleanuptx(msg);
 +      }
 +}
 +
 +/*
 + * Write a message to an iocom, with additional state processing.
 + */
 +void
 +hammer2_msg_write(hammer2_msg_t *msg)
 +{
 +      hammer2_iocom_t *iocom = msg->router->iocom;
 +      hammer2_state_t *state;
 +      char dummy;
 +
 +      /*
 +       * Handle state processing, create state if necessary.
 +       */
 +      pthread_mutex_lock(&iocom->mtx);
 +      if ((state = msg->state) != NULL) {
 +              /*
 +               * Existing transaction (could be reply).  It is also
 +               * possible for this to be the first reply (CREATE is set),
 +               * in which case we populate state->txcmd.
 +               *
 +               * state->txcmd is adjusted to hold the final message cmd,
 +               * and we also be sure to set the CREATE bit here.  We did
 +               * not set it in hammer2_msg_alloc() because that would have
 +               * not been serialized (state could have gotten ripped out
 +               * from under the message prior to it being transmitted).
 +               */
 +              if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE |
 +                                        HAMMER2_MSGF_REPLY)) ==
 +                  HAMMER2_MSGF_CREATE) {
 +                      state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
 +              }
 +              msg->any.head.msgid = state->msgid;
 +              assert(((state->txcmd ^ msg->any.head.cmd) &
 +                      HAMMER2_MSGF_REPLY) == 0);
 +              if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
 +                      state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
 +      } else {
 +              msg->any.head.msgid = 0;
 +              /* XXX set spanid by router */
 +      }
 +      msg->any.head.source = 0;
 +      msg->any.head.target = msg->router->target;
 +
 +      /*
 +       * Queue it for output, wake up the I/O pthread.  Note that the
 +       * I/O thread is responsible for generating the CRCs and encryption.
 +       */
 +      TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry);
 +      dummy = 0;
 +      write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
 +      pthread_mutex_unlock(&iocom->mtx);
 +}
 +
 +/*
 + * This is a shortcut to formulate a reply to msg with a simple error code,
 + * It can reply to and terminate a transaction, or it can reply to a one-way
 + * messages.  A HAMMER2_LNK_ERROR command code is utilized to encode
 + * the error code (which can be 0).  Not all transactions are terminated
 + * with HAMMER2_LNK_ERROR status (the low level only cares about the
 + * MSGF_DELETE flag), but most are.
 + *
 + * Replies to one-way messages are a bit of an oxymoron but the feature
 + * is used by the debug (DBG) protocol.
 + *
 + * The reply contains no extended data.
 + */
 +void
 +hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
 +{
 +      hammer2_iocom_t *iocom = msg->router->iocom;
 +      hammer2_state_t *state = msg->state;
 +      hammer2_msg_t *nmsg;
 +      uint32_t cmd;
 +
 +
 +      /*
 +       * Reply with a simple error code and terminate the transaction.
 +       */
 +      cmd = HAMMER2_LNK_ERROR;
 +
 +      /*
 +       * Check if our direction has even been initiated yet, set CREATE.
 +       *
 +       * Check what direction this is (command or reply direction).  Note
 +       * that txcmd might not have been initiated yet.
 +       *
 +       * If our direction has already been closed we just return without
 +       * doing anything.
 +       */
 +      if (state) {
 +              if (state->txcmd & HAMMER2_MSGF_DELETE)
 +                      return;
 +              if (state->txcmd & HAMMER2_MSGF_REPLY)
 +                      cmd |= HAMMER2_MSGF_REPLY;
 +              cmd |= HAMMER2_MSGF_DELETE;
 +      } else {
 +              if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
 +                      cmd |= HAMMER2_MSGF_REPLY;
 +      }
 +
 +      /*
 +       * Allocate the message and associate it with the existing state.
 +       * We cannot pass MSGF_CREATE to msg_alloc() because that may
 +       * allocate new state.  We have our state already.
 +       */
 +      nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
 +      if (state) {
 +              if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
 +                      nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
 +      }
 +      nmsg->any.head.error = error;
 +      nmsg->state = state;
 +      hammer2_msg_write(nmsg);
 +}
 +
 +/*
 + * Similar to hammer2_msg_reply() but leave the transaction open.  That is,
 + * we are generating a streaming reply or an intermediate acknowledgement
 + * of some sort as part of the higher level protocol, with more to come
 + * later.
 + */
 +void
 +hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
 +{
 +      hammer2_iocom_t *iocom = msg->router->iocom;
 +      hammer2_state_t *state = msg->state;
 +      hammer2_msg_t *nmsg;
 +      uint32_t cmd;
 +
 +
 +      /*
 +       * Reply with a simple error code and terminate the transaction.
 +       */
 +      cmd = HAMMER2_LNK_ERROR;
 +
 +      /*
 +       * Check if our direction has even been initiated yet, set CREATE.
 +       *
 +       * Check what direction this is (command or reply direction).  Note
 +       * that txcmd might not have been initiated yet.
 +       *
 +       * If our direction has already been closed we just return without
 +       * doing anything.
 +       */
 +      if (state) {
 +              if (state->txcmd & HAMMER2_MSGF_DELETE)
 +                      return;
 +              if (state->txcmd & HAMMER2_MSGF_REPLY)
 +                      cmd |= HAMMER2_MSGF_REPLY;
 +              /* continuing transaction, do not set MSGF_DELETE */
 +      } else {
 +              if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
 +                      cmd |= HAMMER2_MSGF_REPLY;
 +      }
 +
 +      nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
 +      if (state) {
 +              if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
 +                      nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
 +      }
 +      nmsg->any.head.error = error;
 +      nmsg->state = state;
 +      hammer2_msg_write(nmsg);
 +}
 +
 +/*
 + * Terminate a transaction given a state structure by issuing a DELETE.
 + */
 +void
 +hammer2_state_reply(hammer2_state_t *state, uint32_t error)
 +{
 +      hammer2_msg_t *nmsg;
 +      uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
 +
 +      /*
 +       * Nothing to do if we already transmitted a delete
 +       */
 +      if (state->txcmd & HAMMER2_MSGF_DELETE)
 +              return;
 +
 +      /*
 +       * Set REPLY if the other end initiated the command.  Otherwise
 +       * we are the command direction.
 +       */
 +      if (state->txcmd & HAMMER2_MSGF_REPLY)
 +              cmd |= HAMMER2_MSGF_REPLY;
 +
 +      nmsg = hammer2_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL);
 +      if (state) {
 +              if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
 +                      nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
 +      }
 +      nmsg->any.head.error = error;
 +      nmsg->state = state;
 +      hammer2_msg_write(nmsg);
 +}
 +
 +/************************************************************************
 + *                    TRANSACTION STATE HANDLING                      *
 + ************************************************************************
 + *
 + */
 +
 +/*
 + * Process state tracking for a message after reception, prior to
 + * execution.
 + *
 + * Called with msglk held and the msg dequeued.
 + *
 + * All messages are called with dummy state and return actual state.
 + * (One-off messages often just return the same dummy state).
 + *
 + * May request that caller discard the message by setting *discardp to 1.
 + * The returned state is not used in this case and is allowed to be NULL.
 + *
 + * --
 + *
 + * These routines handle persistent and command/reply message state via the
 + * CREATE and DELETE flags.  The first message in a command or reply sequence
 + * sets CREATE, the last message in a command or reply sequence sets DELETE.
 + *
 + * There can be any number of intermediate messages belonging to the same
 + * sequence sent inbetween the CREATE message and the DELETE message,
 + * which set neither flag.  This represents a streaming command or reply.
 + *
 + * Any command message received with CREATE set expects a reply sequence to
 + * be returned.  Reply sequences work the same as command sequences except the
 + * REPLY bit is also sent.  Both the command side and reply side can
 + * degenerate into a single message with both CREATE and DELETE set.  Note
 + * that one side can be streaming and the other side not, or neither, or both.
 + *
 + * The msgid is unique for the initiator.  That is, two sides sending a new
 + * message can use the same msgid without colliding.
 + *
 + * --
 + *
 + * ABORT sequences work by setting the ABORT flag along with normal message
 + * state.  However, ABORTs can also be sent on half-closed messages, that is
 + * even if the command or reply side has already sent a DELETE, as long as
 + * the message has not been fully closed it can still send an ABORT+DELETE
 + * to terminate the half-closed message state.
 + *
 + * Since ABORT+DELETEs can race we silently discard ABORT's for message
 + * state which has already been fully closed.  REPLY+ABORT+DELETEs can
 + * also race, and in this situation the other side might have already
 + * initiated a new unrelated command with the same message id.  Since
 + * the abort has not set the CREATE flag the situation can be detected
 + * and the message will also be discarded.
 + *
 + * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
 + * The ABORT request is essentially integrated into the command instead
 + * of being sent later on.  In this situation the command implementation
 + * detects that CREATE and ABORT are both set (vs ABORT alone) and can
 + * special-case non-blocking operation for the command.
 + *
 + * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
 + *      to be mid-stream aborts for command/reply sequences.  ABORTs on
 + *      one-way messages are not supported.
 + *
 + * NOTE!  If a command sequence does not support aborts the ABORT flag is
 + *      simply ignored.
 + *
 + * --
 + *
 + * One-off messages (no reply expected) are sent with neither CREATE or DELETE
 + * set.  One-off messages cannot be aborted and typically aren't processed
 + * by these routines.  The REPLY bit can be used to distinguish whether a
 + * one-off message is a command or reply.  For example, one-off replies
 + * will typically just contain status updates.
 + */
 +static int
 +hammer2_state_msgrx(hammer2_msg_t *msg)
 +{
 +      hammer2_iocom_t *iocom = msg->router->iocom;
 +      hammer2_state_t *state;
 +      hammer2_state_t dummy;
 +      int error;
 +
 +      /*
 +       * Lock RB tree and locate existing persistent state, if any.
 +       *
 +       * If received msg is a command state is on staterd_tree.
 +       * If received msg is a reply state is on statewr_tree.
 +       */
 +      dummy.msgid = msg->any.head.msgid;
 +      pthread_mutex_lock(&iocom->mtx);
 +      if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
 +              state = RB_FIND(hammer2_state_tree,
 +                              &iocom->router->statewr_tree, &dummy);
 +      } else {
 +              state = RB_FIND(hammer2_state_tree,
 +                              &iocom->router->staterd_tree, &dummy);
 +      }
 +      msg->state = state;
 +      pthread_mutex_unlock(&iocom->mtx);
 +
 +      /*
 +       * Short-cut one-off or mid-stream messages (state may be NULL).
 +       */
 +      if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
 +                                HAMMER2_MSGF_ABORT)) == 0) {
 +              return(0);
 +      }
 +
 +      /*
 +       * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
 +       * inside the case statements.
 +       */
 +      switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
 +                                  HAMMER2_MSGF_REPLY)) {
 +      case HAMMER2_MSGF_CREATE:
 +      case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
 +              /*
 +               * New persistant command received.
 +               */
 +              if (state) {
 +                      fprintf(stderr, "duplicate-trans %s\n",
 +                              hammer2_msg_str(msg));
 +                      error = HAMMER2_IOQ_ERROR_TRANS;
 +                      assert(0);
 +                      break;
 +              }
 +              state = malloc(sizeof(*state));
 +              bzero(state, sizeof(*state));
 +              state->iocom = iocom;
 +              state->flags = HAMMER2_STATE_DYNAMIC;
 +              state->msg = msg;
 +              state->txcmd = HAMMER2_MSGF_REPLY;
 +              state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
 +              state->flags |= HAMMER2_STATE_INSERTED;
 +              state->msgid = msg->any.head.msgid;
 +              state->router = msg->router;
 +              msg->state = state;
 +              pthread_mutex_lock(&iocom->mtx);
 +              RB_INSERT(hammer2_state_tree,
 +                        &iocom->router->staterd_tree, state);
 +              pthread_mutex_unlock(&iocom->mtx);
 +              error = 0;
 +              if (DebugOpt) {
 +                      fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
 +                              state, (uint32_t)state->msgid, iocom);
 +              }
 +              break;
 +      case HAMMER2_MSGF_DELETE:
 +              /*
 +               * Persistent state is expected but might not exist if an
 +               * ABORT+DELETE races the close.
 +               */
 +              if (state == NULL) {
 +                      if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
 +                              error = HAMMER2_IOQ_ERROR_EALREADY;
 +                      } else {
 +                              fprintf(stderr, "missing-state %s\n",
 +                                      hammer2_msg_str(msg));
 +                              error = HAMMER2_IOQ_ERROR_TRANS;
 +                      assert(0);
 +                      }
 +                      break;
 +              }
 +
 +              /*
 +               * Handle another ABORT+DELETE case if the msgid has already
 +               * been reused.
 +               */
 +              if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
 +                      if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
 +                              error = HAMMER2_IOQ_ERROR_EALREADY;
 +                      } else {
 +                              fprintf(stderr, "reused-state %s\n",
 +                                      hammer2_msg_str(msg));
 +                              error = HAMMER2_IOQ_ERROR_TRANS;
 +                      assert(0);
 +                      }
 +                      break;
 +              }
 +              error = 0;
 +              break;
 +      default:
 +              /*
 +               * Check for mid-stream ABORT command received, otherwise
 +               * allow.
 +               */
 +              if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
 +                      if (state == NULL ||
 +                          (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
 +                              error = HAMMER2_IOQ_ERROR_EALREADY;
 +                              break;
 +                      }
 +              }
 +              error = 0;
 +              break;
 +      case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
 +      case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
 +              /*
 +               * When receiving a reply with CREATE set the original
 +               * persistent state message should already exist.
 +               */
 +              if (state == NULL) {
 +                      fprintf(stderr, "no-state(r) %s\n",
 +                              hammer2_msg_str(msg));
 +                      error = HAMMER2_IOQ_ERROR_TRANS;
 +                      assert(0);
 +                      break;
 +              }
 +              assert(((state->rxcmd ^ msg->any.head.cmd) &
 +                      HAMMER2_MSGF_REPLY) == 0);
 +              state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
 +              error = 0;
 +              break;
 +      case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
 +              /*
 +               * Received REPLY+ABORT+DELETE in case where msgid has
 +               * already been fully closed, ignore the message.
 +               */
 +              if (state == NULL) {
 +                      if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
 +                              error = HAMMER2_IOQ_ERROR_EALREADY;
 +                      } else {
 +                              fprintf(stderr, "no-state(r,d) %s\n",
 +                                      hammer2_msg_str(msg));
 +                              error = HAMMER2_IOQ_ERROR_TRANS;
 +                      assert(0);
 +                      }
 +                      break;
 +              }
 +
 +              /*
 +               * Received REPLY+ABORT+DELETE in case where msgid has
 +               * already been reused for an unrelated message,
 +               * ignore the message.
 +               */
 +              if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
 +                      if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
 +                              error = HAMMER2_IOQ_ERROR_EALREADY;
 +                      } else {
 +                              fprintf(stderr, "reused-state(r,d) %s\n",
 +                                      hammer2_msg_str(msg));
 +                              error = HAMMER2_IOQ_ERROR_TRANS;
 +                      assert(0);
 +                      }
 +                      break;
 +              }
 +              error = 0;
 +              break;
 +      case HAMMER2_MSGF_REPLY:
 +              /*
 +               * Check for mid-stream ABORT reply received to sent command.
 +               */
 +              if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
 +                      if (state == NULL ||
 +                          (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
 +                              error = HAMMER2_IOQ_ERROR_EALREADY;
 +                              break;
 +                      }
 +              }
 +              error = 0;
 +              break;
 +      }
 +      return (error);
 +}
 +
 +void
 +hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 +{
 +      hammer2_state_t *state;
 +
 +      if ((state = msg->state) == NULL) {
 +              /*
 +               * Free a non-transactional message, there is no state
 +               * to worry about.
 +               */
 +              hammer2_msg_free(msg);
 +      } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
 +              /*
 +               * Message terminating transaction, destroy the related
 +               * state, the original message, and this message (if it
 +               * isn't the original message due to a CREATE|DELETE).
 +               */
 +              pthread_mutex_lock(&iocom->mtx);
 +              state->rxcmd |= HAMMER2_MSGF_DELETE;
 +              if (state->txcmd & HAMMER2_MSGF_DELETE) {
 +                      if (state->msg == msg)
 +                              state->msg = NULL;
 +                      assert(state->flags & HAMMER2_STATE_INSERTED);
 +                      if (state->rxcmd & HAMMER2_MSGF_REPLY) {
 +                              assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
 +                              RB_REMOVE(hammer2_state_tree,
 +                                        &iocom->router->statewr_tree, state);
 +                      } else {
 +                              assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
 +                              RB_REMOVE(hammer2_state_tree,
 +                                        &iocom->router->staterd_tree, state);
 +                      }
 +                      state->flags &= ~HAMMER2_STATE_INSERTED;
 +                      hammer2_state_free(state);
 +              } else {
 +                      ;
 +              }
 +              pthread_mutex_unlock(&iocom->mtx);
 +              hammer2_msg_free(msg);
 +      } else if (state->msg != msg) {
 +              /*
 +               * Message not terminating transaction, leave state intact
 +               * and free message if it isn't the CREATE message.
 +               */
 +              hammer2_msg_free(msg);
 +      }
 +}
 +
 +static void
 +hammer2_state_cleanuptx(hammer2_msg_t *msg)
 +{
 +      hammer2_iocom_t *iocom = msg->router->iocom;
 +      hammer2_state_t *state;
 +
 +      if ((state = msg->state) == NULL) {
 +              hammer2_msg_free(msg);
 +      } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
 +              pthread_mutex_lock(&iocom->mtx);
 +              state->txcmd |= HAMMER2_MSGF_DELETE;
 +              if (state->rxcmd & HAMMER2_MSGF_DELETE) {
 +                      if (state->msg == msg)
 +                              state->msg = NULL;
 +                      assert(state->flags & HAMMER2_STATE_INSERTED);
 +                      if (state->txcmd & HAMMER2_MSGF_REPLY) {
 +                              assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
 +                              RB_REMOVE(hammer2_state_tree,
 +                                        &iocom->router->staterd_tree, state);
 +                      } else {
 +                              assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
 +                              RB_REMOVE(hammer2_state_tree,
 +                                        &iocom->router->statewr_tree, state);
 +                      }
 +                      state->flags &= ~HAMMER2_STATE_INSERTED;
 +                      hammer2_state_free(state);
 +              } else {
 +                      ;
 +              }
 +              pthread_mutex_unlock(&iocom->mtx);
 +              hammer2_msg_free(msg);
 +      } else if (state->msg != msg) {
 +              hammer2_msg_free(msg);
 +      }
 +}
 +
 +/*
 + * Called with iocom locked
 + */
 +void
 +hammer2_state_free(hammer2_state_t *state)
 +{
 +      hammer2_iocom_t *iocom = state->iocom;
 +      hammer2_msg_t *msg;
 +      char dummy;
 +
 +      if (DebugOpt) {
 +              fprintf(stderr, "terminate state %p id=%08x\n",
 +                      state, (uint32_t)state->msgid);
 +      }
 +      assert(state->any.any == NULL);
 +      msg = state->msg;
 +      state->msg = NULL;
 +      if (msg)
 +              hammer2_msg_free_locked(msg);
 +      free(state);
 +
 +      /*
 +       * When an iocom error is present we are trying to close down the
 +       * iocom, but we have to wait for all states to terminate before
 +       * we can do so.  The iocom rx code will terminate the receive side
 +       * for all transactions by simulating incoming DELETE messages,
 +       * but the state doesn't go away until both sides are terminated.
 +       *
 +       * We may have to wake up the rx code.
 +       */
 +      if (iocom->ioq_rx.error &&
 +          RB_EMPTY(&iocom->router->staterd_tree) &&
 +          RB_EMPTY(&iocom->router->statewr_tree)) {
 +              dummy = 0;
 +              write(iocom->wakeupfds[1], &dummy, 1);
 +      }
 +}
 +
 +/************************************************************************
 + *                            ROUTING                                 *
 + ************************************************************************
 + *
 + * Incoming messages are routed by their spanid, matched up against
 + * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c).
 + * Any replies run through the same router.
 + *
 + * Originated messages are routed by their spanid, matched up against
 + * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c).
 + * Replies come back through the same route.
 + *
 + * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular
 + * transaction runs through the same route.  Commands and replies both.
 + *
 + * An originated message will use a different routing spanid to
 + * reach a target node than a message which originates from that node.
 + * They might use the same physical pipes (each pipe can have multiple
 + * SPANs and RELAYs), but the routes are distinct from the perspective
 + * of the router.
 + */
 +hammer2_router_t *
 +hammer2_router_alloc(void)
 +{
 +      hammer2_router_t *router;
 +
 +      router = hammer2_alloc(sizeof(*router));
 +      TAILQ_INIT(&router->txmsgq);
 +      return (router);
 +}
 +
 +void
 +hammer2_router_connect(hammer2_router_t *router)
 +{
 +      hammer2_router_t *tmp;
 +
 +      assert(router->link || router->relay);
 +      assert((router->flags & HAMMER2_ROUTER_CONNECTED) == 0);
 +
 +      pthread_mutex_lock(&router_mtx);
 +      if (router->link)
 +              tmp = RB_INSERT(hammer2_router_tree, &router_ltree, router);
 +      else
 +              tmp = RB_INSERT(hammer2_router_tree, &router_rtree, router);
 +      assert(tmp == NULL);
 +      router->flags |= HAMMER2_ROUTER_CONNECTED;
 +      pthread_mutex_unlock(&router_mtx);
 +}
 +
 +void
 +hammer2_router_disconnect(hammer2_router_t **routerp)
 +{
 +      hammer2_router_t *router;
 +
 +      router = *routerp;
 +      assert(router->link || router->relay);
 +      assert(router->flags & HAMMER2_ROUTER_CONNECTED);
 +
 +      pthread_mutex_lock(&router_mtx);
 +      if (router->link)
 +              RB_REMOVE(hammer2_router_tree, &router_ltree, router);
 +      else
 +              RB_REMOVE(hammer2_router_tree, &router_rtree, router);
 +      router->flags &= ~HAMMER2_ROUTER_CONNECTED;
 +      *routerp = NULL;
 +      pthread_mutex_unlock(&router_mtx);
 +}
 +
 +#if 0
 +/*
 + * XXX
 + */
 +hammer2_router_t *
 +hammer2_route_msg(hammer2_msg_t *msg)
 +{
 +}
 +#endif
 +
 +/************************************************************************
 + *                            DEBUGGING                               *
 + ************************************************************************/
 +
 +const char *
 +hammer2_basecmd_str(uint32_t cmd)
 +{
 +      static char buf[64];
 +      char protobuf[32];
 +      char cmdbuf[32];
 +      const char *protostr;
 +      const char *cmdstr;
 +
 +      switch(cmd & HAMMER2_MSGF_PROTOS) {
 +      case HAMMER2_MSG_PROTO_LNK:
 +              protostr = "LNK_";
 +              break;
 +      case HAMMER2_MSG_PROTO_DBG:
 +              protostr = "DBG_";
 +              break;
 +      case HAMMER2_MSG_PROTO_DOM:
 +              protostr = "DOM_";
 +              break;
 +      case HAMMER2_MSG_PROTO_CAC:
 +              protostr = "CAC_";
 +              break;
 +      case HAMMER2_MSG_PROTO_QRM:
 +              protostr = "QRM_";
 +              break;
 +      case HAMMER2_MSG_PROTO_BLK:
 +              protostr = "BLK_";
 +              break;
 +      case HAMMER2_MSG_PROTO_VOP:
 +              protostr = "VOP_";
 +              break;
 +      default:
 +              snprintf(protobuf, sizeof(protobuf), "%x_",
 +                      (cmd & HAMMER2_MSGF_PROTOS) >> 20);
 +              protostr = protobuf;
 +              break;
 +      }
 +
 +      switch(cmd & (HAMMER2_MSGF_PROTOS |
 +                    HAMMER2_MSGF_CMDS |
 +                    HAMMER2_MSGF_SIZE)) {
 +      case HAMMER2_LNK_PAD:
 +              cmdstr = "PAD";
 +              break;
 +      case HAMMER2_LNK_PING:
 +              cmdstr = "PING";
 +              break;
 +      case HAMMER2_LNK_AUTH:
 +              cmdstr = "AUTH";
 +              break;
 +      case HAMMER2_LNK_CONN:
 +              cmdstr = "CONN";
 +              break;
 +      case HAMMER2_LNK_SPAN:
 +              cmdstr = "SPAN";
 +              break;
 +      case HAMMER2_LNK_VOLCONF:
 +              cmdstr = "VOLCONF";
 +              break;
 +      case HAMMER2_LNK_ERROR:
 +              if (cmd & HAMMER2_MSGF_DELETE)
 +                      cmdstr = "RETURN";
 +              else
 +                      cmdstr = "RESULT";
 +              break;
 +      case HAMMER2_DBG_SHELL:
 +              cmdstr = "SHELL";
 +              break;
 +      default:
 +              snprintf(cmdbuf, sizeof(cmdbuf),
 +                       "%06x", (cmd & (HAMMER2_MSGF_PROTOS |
 +                                       HAMMER2_MSGF_CMDS |
 +                                       HAMMER2_MSGF_SIZE)));
 +              cmdstr = cmdbuf;
 +              break;
 +      }
 +      snprintf(buf, sizeof(buf), "%s%s", protostr, cmdstr);
 +      return (buf);
 +}
 +
 +const char *
 +hammer2_msg_str(hammer2_msg_t *msg)
 +{
 +      hammer2_state_t *state;
 +      static char buf[256];
 +      char errbuf[16];
 +      char statebuf[64];
 +      char flagbuf[64];
 +      const char *statestr;
 +      const char *errstr;
 +      uint32_t basecmd;
 +      int i;
 +
 +      /*
 +       * Parse the state
 +       */
 +      if ((state = msg->state) != NULL) {
 +              basecmd = (state->rxcmd & HAMMER2_MSGF_REPLY) ?
 +                        state->txcmd : state->rxcmd;
 +              snprintf(statebuf, sizeof(statebuf),
 +                       " %s=%s,L=%s%s,R=%s%s",
 +                       ((state->txcmd & HAMMER2_MSGF_REPLY) ?
 +                              "rcvcmd" : "sndcmd"),
 +                       hammer2_basecmd_str(basecmd),
 +                       ((state->txcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
 +                       ((state->txcmd & HAMMER2_MSGF_DELETE) ? "D" : ""),
 +                       ((state->rxcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
 +                       ((state->rxcmd & HAMMER2_MSGF_DELETE) ? "D" : "")
 +              );
 +              statestr = statebuf;
 +      } else {
 +              statestr = "";
 +      }
 +
 +      /*
 +       * Parse the error
 +       */
 +      switch(msg->any.head.error) {
 +      case 0:
 +              errstr = "";
 +              break;
 +      case HAMMER2_IOQ_ERROR_SYNC:
 +              errstr = "err=IOQ:NOSYNC";
 +              break;
 +      case HAMMER2_IOQ_ERROR_EOF:
 +              errstr = "err=IOQ:STREAMEOF";
 +              break;
 +      case HAMMER2_IOQ_ERROR_SOCK:
 +              errstr = "err=IOQ:SOCKERR";
 +              break;
 +      case HAMMER2_IOQ_ERROR_FIELD:
 +              errstr = "err=IOQ:BADFIELD";
 +              break;
 +      case HAMMER2_IOQ_ERROR_HCRC:
 +              errstr = "err=IOQ:BADHCRC";
 +              break;
 +      case HAMMER2_IOQ_ERROR_XCRC:
 +              errstr = "err=IOQ:BADXCRC";
 +              break;
 +      case HAMMER2_IOQ_ERROR_ACRC:
 +              errstr = "err=IOQ:BADACRC";
 +              break;
 +      case HAMMER2_IOQ_ERROR_STATE:
 +              errstr = "err=IOQ:BADSTATE";
 +              break;
 +      case HAMMER2_IOQ_ERROR_NOPEER:
 +              errstr = "err=IOQ:PEERCONFIG";
 +              break;
 +      case HAMMER2_IOQ_ERROR_NORKEY:
 +              errstr = "err=IOQ:BADRKEY";
 +              break;
 +      case HAMMER2_IOQ_ERROR_NOLKEY:
 +              errstr = "err=IOQ:BADLKEY";
 +              break;
 +      case HAMMER2_IOQ_ERROR_KEYXCHGFAIL:
 +              errstr = "err=IOQ:BADKEYXCHG";
 +              break;
 +      case HAMMER2_IOQ_ERROR_KEYFMT:
 +              errstr = "err=IOQ:BADFMT";
 +              break;
 +      case HAMMER2_IOQ_ERROR_BADURANDOM:
 +              errstr = "err=IOQ:BADRANDOM";
 +              break;
 +      case HAMMER2_IOQ_ERROR_MSGSEQ:
 +              errstr = "err=IOQ:BADSEQ";
 +              break;
 +      case HAMMER2_IOQ_ERROR_EALREADY:
 +              errstr = "err=IOQ:DUPMSG";
 +              break;
 +      case HAMMER2_IOQ_ERROR_TRANS:
 +              errstr = "err=IOQ:BADTRANS";
 +              break;
 +      case HAMMER2_IOQ_ERROR_IVWRAP:
 +              errstr = "err=IOQ:IVWRAP";
 +              break;
 +      case HAMMER2_IOQ_ERROR_MACFAIL:
 +              errstr = "err=IOQ:MACFAIL";
 +              break;
 +      case HAMMER2_IOQ_ERROR_ALGO:
 +              errstr = "err=IOQ:ALGOFAIL";
 +              break;
 +      case HAMMER2_MSG_ERR_NOSUPP:
 +              errstr = "err=NOSUPPORT";
 +              break;
 +      default:
 +              snprintf(errbuf, sizeof(errbuf),
 +                       " err=%d", msg->any.head.error);
 +              errstr = errbuf;
 +              break;
 +      }
 +
 +      /*
 +       * Message flags
 +       */
 +      i = 0;
 +      if (msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
 +                               HAMMER2_MSGF_ABORT | HAMMER2_MSGF_REPLY)) {
 +              flagbuf[i++] = '|';
 +              if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
 +                      flagbuf[i++] = 'C';
 +              if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
 +                      flagbuf[i++] = 'D';
 +              if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
 +                      flagbuf[i++] = 'R';
 +              if (msg->any.head.cmd & HAMMER2_MSGF_ABORT)
 +                      flagbuf[i++] = 'A';
 +      }
 +      flagbuf[i] = 0;
 +
 +      /*
 +       * Generate the buf
 +       */
 +      snprintf(buf, sizeof(buf),
 +              "msg=%s%s %s id=%08x src=%08x tgt=%08x %s",
 +               hammer2_basecmd_str(msg->any.head.cmd),
 +               flagbuf,
 +               errstr,
 +               (uint32_t)(intmax_t)msg->any.head.msgid,   /* for brevity */
 +               (uint32_t)(intmax_t)msg->any.head.source,  /* for brevity */
 +               (uint32_t)(intmax_t)msg->any.head.target,  /* for brevity */
 +               statestr);
 +
 +      return(buf);
 +}
index 91b7dfe,0000000..cb5afb7
mode 100644,000000..100644
--- /dev/null
@@@ -1,1252 -1,0 +1,1257 @@@
-                       pthread_create(&conf->iocom_thread, NULL,
-                                      master_service,
-                                      (void *)(intptr_t)conf->fd);
 +/*
 + * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
 + *
 + * This code is derived from software contributed to The DragonFly Project
 + * by Matthew Dillon <dillon@dragonflybsd.org>
 + *
 + * Redistribution and use in source and binary forms, with or without
 + * modification, are permitted provided that the following conditions
 + * are met:
 + *
 + * 1. Redistributions of source code must retain the above copyright
 + *    notice, this list of conditions and the following disclaimer.
 + * 2. Redistributions in binary form must reproduce the above copyright
 + *    notice, this list of conditions and the following disclaimer in
 + *    the documentation and/or other materials provided with the
 + *    distribution.
 + * 3. Neither the name of The DragonFly Project nor the names of its
 + *    contributors may be used to endorse or promote products derived
 + *    from this software without specific, prior written permission.
 + *
 + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 + * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 + * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 + * SUCH DAMAGE.
 + */
 +/*
 + * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
 + *
 + * This code supports the LNK_SPAN protocol.  Essentially all PFS's
 + * clients and services rendezvous with the userland hammer2 service and
 + * open LNK_SPAN transactions using a message header linkid of 0,
 + * registering any PFS's they have connectivity to with us.
 + *
 + * --
 + *
 + * Each registration maintains its own open LNK_SPAN message transaction.
 + * The SPANs are collected, aggregated, and retransmitted over available
 + * connections through the maintainance of additional LNK_SPAN message
 + * transactions on each link.
 + *
 + * The msgid for each active LNK_SPAN transaction we receive allows us to
 + * send a message to the target PFS (which might be one of many belonging
 + * to the same cluster), by specifying that msgid as the linkid in any
 + * message we send to the target PFS.
 + *
 + * Similarly the msgid we allocate for any LNK_SPAN transaction we transmit
 + * (and remember we will maintain multiple open LNK_SPAN transactions on
 + * each connection representing the topology span, so every node sees every
 + * other node as a separate open transaction).  So, similarly the msgid for
 + * these active transactions which we initiated can be used by the other
 + * end to route messages through us to another node, ultimately winding up
 + * at the identified hammer2 PFS.  We have to adjust the spanid in the message
 + * header at each hop to be representative of the outgoing LNK_SPAN we
 + * are forwarding the message through.
 + *
 + * --
 + *
 + * If we were to retransmit every LNK_SPAN transaction we receive it would
 + * create a huge mess, so we have to aggregate all received LNK_SPAN
 + * transactions, sort them by the fsid (the cluster) and sub-sort them by
 + * the pfs_fsid (individual nodes in the cluster), and only retransmit
 + * (create outgoing transactions) for a subset of the nearest distance-hops
 + * for each individual node.
 + *
 + * The higher level protocols can then issue transactions to the nodes making
 + * up a cluster to perform all actions required.
 + *
 + * --
 + *
 + * Since this is a large topology and a spanning tree protocol, links can
 + * go up and down all the time.  Any time a link goes down its transaction
 + * is closed.  The transaction has to be closed on both ends before we can
 + * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
 + * closed may have been propagated out to other connections and those related
 + * LNK_SPANs are also closed.  Ultimately all routes via the lost LNK_SPAN
 + * go away, ultimately reaching all sources and all targets.
 + *
 + * Any messages in-transit using a route that goes away will be thrown away.
 + * Open transactions are only tracked at the two end-points.  When a link
 + * failure propagates to an end-point the related open transactions lose
 + * their spanid and are automatically aborted.
 + *
 + * It is important to note that internal route nodes cannot just associate
 + * a lost LNK_SPAN transaction with another route to the same destination.
 + * Message transactions MUST be serialized and MUST be ordered.  All messages
 + * for a transaction must run over the same route.  So if the route used by
 + * an active transaction is lost, the related messages will be fully aborted
 + * and the higher protocol levels will retry as appropriate.
 + *
 + * FULLY ABORTING A ROUTED MESSAGE is handled via link-failure propagation
 + * back to the originator.  Only the originator keeps tracks of a message.
 + * Routers just pass it through.  If a route is lost during transit the
 + * message is simply thrown away.
 + *
 + * It is also important to note that several paths to the same PFS can be
 + * propagated along the same link, which allows concurrency and even
 + * redundancy over several network interfaces or via different routes through
 + * the topology.  Any given transaction will use only a single route but busy
 + * servers will often have hundreds of transactions active simultaniously,
 + * so having multiple active paths through the network topology for A<->B
 + * will improve performance.
 + *
 + * --
 + *
 + * Most protocols consolidate operations rather than simply relaying them.
 + * This is particularly true of LEAF protocols (such as strict HAMMER2
 + * clients), of which there can be millions connecting into the cluster at
 + * various points.  The SPAN protocol is not used for these LEAF elements.
 + *
 + * Instead the primary service they connect to implements a proxy for the
 + * client protocols so the core topology only has to propagate a couple of
 + * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
 + * core master nodes and satellite slaves and cache nodes.
 + */
 +
 +#include "hammer2.h"
 +
 +/*
 + * Maximum spanning tree distance.  This has the practical effect of
 + * stopping tail-chasing closed loops when a feeder span is lost.
 + */
 +#define HAMMER2_SPAN_MAXDIST  16
 +
 +/*
 + * RED-BLACK TREE DEFINITIONS
 + *
 + * We need to track:
 + *
 + * (1) shared fsid's (a cluster).
 + * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
 + *
 + * We need to aggegate all active LNK_SPANs, aggregate, and create our own
 + * outgoing LNK_SPAN transactions on each of our connections representing
 + * the aggregated state.
 + *
 + * h2span_connect     - list of iocom connections who wish to receive SPAN
 + *                      propagation from other connections.  Might contain
 + *                      a filter string.  Only iocom's with an open
 + *                      LNK_CONN transactions are applicable for SPAN
 + *                      propagation.
 + *
 + * h2span_relay               - List of links relayed (via SPAN).  Essentially
 + *                      each relay structure represents a LNK_SPAN
 + *                      transaction that we initiated, verses h2span_link
 + *                      which is a LNK_SPAN transaction that we received.
 + *
 + * --
 + *
 + * h2span_cluster     - Organizes the shared fsid's.  One structure for
 + *                      each cluster.
 + *
 + * h2span_node                - Organizes the nodes in a cluster.  One structure
 + *                      for each unique {cluster,node}, aka {fsid, pfs_fsid}.
 + *
 + * h2span_link                - Organizes all incoming and outgoing LNK_SPAN message
 + *                      transactions related to a node.
 + *
 + *                      One h2span_link structure for each incoming LNK_SPAN
 + *                      transaction.  Links selected for propagation back
 + *                      out are also where the outgoing LNK_SPAN messages
 + *                      are indexed into (so we can propagate changes).
 + *
 + *                      The h2span_link's use a red-black tree to sort the
 + *                      distance hop metric for the incoming LNK_SPAN.  We
 + *                      then select the top N for outgoing.  When the
 + *                      topology changes the top N may also change and cause
 + *                      new outgoing LNK_SPAN transactions to be opened
 + *                      and less desireable ones to be closed, causing
 + *                      transactional aborts within the message flow in
 + *                      the process.
 + *
 + * Also note          - All outgoing LNK_SPAN message transactions are also
 + *                      entered into a red-black tree for use by the routing
 + *                      function.  This is handled by msg.c in the state
 + *                      code, not here.
 + */
 +
 +struct h2span_link;
 +struct h2span_relay;
 +TAILQ_HEAD(h2span_media_queue, h2span_media);
 +TAILQ_HEAD(h2span_connect_queue, h2span_connect);
 +TAILQ_HEAD(h2span_relay_queue, h2span_relay);
 +
 +RB_HEAD(h2span_cluster_tree, h2span_cluster);
 +RB_HEAD(h2span_node_tree, h2span_node);
 +RB_HEAD(h2span_link_tree, h2span_link);
 +RB_HEAD(h2span_relay_tree, h2span_relay);
 +
 +/*
 + * This represents a media
 + */
 +struct h2span_media {
 +      TAILQ_ENTRY(h2span_media) entry;
 +      uuid_t  mediaid;
 +      int     refs;
 +      struct h2span_media_config {
 +              hammer2_copy_data_t     copy_run;
 +              hammer2_copy_data_t     copy_pend;
 +              pthread_t               thread;
 +              pthread_cond_t          cond;
 +              int                     ctl;
 +              int                     fd;
 +              hammer2_iocom_t         iocom;
 +              pthread_t               iocom_thread;
 +              enum { H2MC_STOPPED, H2MC_CONNECT, H2MC_RUNNING } state;
 +      } config[HAMMER2_COPYID_COUNT];
 +};
 +
 +typedef struct h2span_media_config h2span_media_config_t;
 +
 +#define H2CONFCTL_STOP                0x00000001
 +#define H2CONFCTL_UPDATE      0x00000002
 +
 +/*
 + * Received LNK_CONN transaction enables SPAN protocol over connection.
 + * (may contain filter).  Typically one for each mount and several may
 + * share the same media.
 + */
 +struct h2span_connect {
 +      TAILQ_ENTRY(h2span_connect) entry;
 +      struct h2span_relay_tree tree;
 +      struct h2span_media *media;
 +      hammer2_state_t *state;
 +};
 +
 +/*
 + * All received LNK_SPANs are organized by cluster (pfs_clid),
 + * node (pfs_fsid), and link (received LNK_SPAN transaction).
 + */
 +struct h2span_cluster {
 +      RB_ENTRY(h2span_cluster) rbnode;
 +      struct h2span_node_tree tree;
 +      uuid_t  pfs_clid;               /* shared fsid */
 +      int     refs;                   /* prevents destruction */
 +};
 +
 +struct h2span_node {
 +      RB_ENTRY(h2span_node) rbnode;
 +      struct h2span_link_tree tree;
 +      struct h2span_cluster *cls;
 +      uuid_t  pfs_fsid;               /* unique fsid */
 +      char label[64];
 +};
 +
 +struct h2span_link {
 +      RB_ENTRY(h2span_link) rbnode;
 +      hammer2_state_t *state;         /* state<->link */
 +      struct h2span_node *node;       /* related node */
 +      int32_t dist;
 +      struct h2span_relay_queue relayq; /* relay out */
 +      struct hammer2_router *router;  /* route out this link */
 +};
 +
 +/*
 + * Any LNK_SPAN transactions we receive which are relayed out other
 + * connections utilize this structure to track the LNK_SPAN transaction
 + * we initiate on the other connections, if selected for relay.
 + *
 + * In many respects this is the core of the protocol... actually figuring
 + * out what LNK_SPANs to relay.  The spanid used for relaying is the
 + * address of the 'state' structure, which is why h2span_relay has to
 + * be entered into a RB-TREE based at h2span_connect (so we can look
 + * up the spanid to validate it).
 + *
 + * NOTE: Messages can be received via the LNK_SPAN transaction the
 + *     relay maintains, and can be replied via relay->router, but
 + *     messages are NOT initiated via a relay.  Messages are initiated
 + *     via incoming links (h2span_link's).
 + *
 + *     relay->link represents the link being relayed, NOT the LNK_SPAN
 + *     transaction the relay is holding open.
 + */
 +struct h2span_relay {
 +      RB_ENTRY(h2span_relay) rbnode;  /* from h2span_connect */
 +      TAILQ_ENTRY(h2span_relay) entry; /* from link */
 +      struct h2span_connect *conn;
 +      hammer2_state_t *state;         /* transmitted LNK_SPAN */
 +      struct h2span_link *link;       /* LNK_SPAN being relayed */
 +      struct hammer2_router   *router;/* route out this relay */
 +};
 +
 +
 +typedef struct h2span_media h2span_media_t;
 +typedef struct h2span_connect h2span_connect_t;
 +typedef struct h2span_cluster h2span_cluster_t;
 +typedef struct h2span_node h2span_node_t;
 +typedef struct h2span_link h2span_link_t;
 +typedef struct h2span_relay h2span_relay_t;
 +
 +static
 +int
 +h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
 +{
 +      return(uuid_compare(&cls1->pfs_clid, &cls2->pfs_clid, NULL));
 +}
 +
 +static
 +int
 +h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
 +{
 +      return(uuid_compare(&node1->pfs_fsid, &node2->pfs_fsid, NULL));
 +}
 +
 +/*
 + * Sort/subsort must match h2span_relay_cmp() under any given node
 + * to make the aggregation algorithm easier, so the best links are
 + * in the same sorted order as the best relays.
 + *
 + * NOTE: We cannot use link*->state->msgid because this msgid is created
 + *     by each remote host and thus might wind up being the same.
 + */
 +static
 +int
 +h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
 +{
 +      if (link1->dist < link2->dist)
 +              return(-1);
 +      if (link1->dist > link2->dist)
 +              return(1);
 +#if 1
 +      if ((uintptr_t)link1->state < (uintptr_t)link2->state)
 +              return(-1);
 +      if ((uintptr_t)link1->state > (uintptr_t)link2->state)
 +              return(1);
 +#else
 +      if (link1->state->msgid < link2->state->msgid)
 +              return(-1);
 +      if (link1->state->msgid > link2->state->msgid)
 +              return(1);
 +#endif
 +      return(0);
 +}
 +
 +/*
 + * Relay entries are sorted by node, subsorted by distance and link
 + * address (so we can match up the conn->tree relay topology with
 + * a node's link topology).
 + */
 +static
 +int
 +h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
 +{
 +      h2span_link_t *link1 = relay1->link;
 +      h2span_link_t *link2 = relay2->link;
 +
 +      if ((intptr_t)link1->node < (intptr_t)link2->node)
 +              return(-1);
 +      if ((intptr_t)link1->node > (intptr_t)link2->node)
 +              return(1);
 +      if (link1->dist < link2->dist)
 +              return(-1);
 +      if (link1->dist > link2->dist)
 +              return(1);
 +#if 1
 +      if ((uintptr_t)link1->state < (uintptr_t)link2->state)
 +              return(-1);
 +      if ((uintptr_t)link1->state > (uintptr_t)link2->state)
 +              return(1);
 +#else
 +      if (link1->state->msgid < link2->state->msgid)
 +              return(-1);
 +      if (link1->state->msgid > link2->state->msgid)
 +              return(1);
 +#endif
 +      return(0);
 +}
 +
 +RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster,
 +           rbnode, h2span_cluster_cmp);
 +RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node,
 +           rbnode, h2span_node_cmp);
 +RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link,
 +           rbnode, h2span_link_cmp);
 +RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay,
 +           rbnode, h2span_relay_cmp);
 +
 +RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster,
 +           rbnode, h2span_cluster_cmp);
 +RB_GENERATE_STATIC(h2span_node_tree, h2span_node,
 +           rbnode, h2span_node_cmp);
 +RB_GENERATE_STATIC(h2span_link_tree, h2span_link,
 +           rbnode, h2span_link_cmp);
 +RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay,
 +           rbnode, h2span_relay_cmp);
 +
 +/*
 + * Global mutex protects cluster_tree lookups, connq, mediaq.
 + */
 +static pthread_mutex_t cluster_mtx;
 +static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
 +static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
 +static struct h2span_media_queue mediaq = TAILQ_HEAD_INITIALIZER(mediaq);
 +
 +static void hammer2_lnk_span(hammer2_msg_t *msg);
 +static void hammer2_lnk_conn(hammer2_msg_t *msg);
 +static void hammer2_lnk_relay(hammer2_msg_t *msg);
 +static void hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node);
 +static void hammer2_relay_delete(h2span_relay_t *relay);
 +
 +static void *hammer2_volconf_thread(void *info);
 +static void hammer2_volconf_stop(h2span_media_config_t *conf);
 +static void hammer2_volconf_start(h2span_media_config_t *conf,
 +                              const char *hostname);
 +
 +void
 +hammer2_msg_lnk_signal(hammer2_router_t *router __unused)
 +{
 +      pthread_mutex_lock(&cluster_mtx);
 +      hammer2_relay_scan(NULL, NULL);
 +      pthread_mutex_unlock(&cluster_mtx);
 +}
 +
 +/*
 + * Receive a HAMMER2_MSG_PROTO_LNK message.  This only called for
 + * one-way and opening-transactions since state->func will be assigned
 + * in all other cases.
 + */
 +void
 +hammer2_msg_lnk(hammer2_msg_t *msg)
 +{
 +      switch(msg->any.head.cmd & HAMMER2_MSGF_BASECMDMASK) {
 +      case HAMMER2_LNK_CONN:
 +              hammer2_lnk_conn(msg);
 +              break;
 +      case HAMMER2_LNK_SPAN:
 +              hammer2_lnk_span(msg);
 +              break;
 +      default:
 +              fprintf(stderr,
 +                      "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
 +              hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
 +              /* state invalid after reply */
 +              break;
 +      }
 +}
 +
 +void
 +hammer2_lnk_conn(hammer2_msg_t *msg)
 +{
 +      hammer2_state_t *state = msg->state;
 +      h2span_media_t *media;
 +      h2span_media_config_t *conf;
 +      h2span_connect_t *conn;
 +      h2span_relay_t *relay;
 +      char *alloc = NULL;
 +      int i;
 +
 +      pthread_mutex_lock(&cluster_mtx);
 +
 +      switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
 +      case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:
 +      case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
 +              /*
 +               * On transaction start we allocate a new h2span_connect and
 +               * acknowledge the request, leaving the transaction open.
 +               * We then relay priority-selected SPANs.
 +               */
 +              fprintf(stderr, "LNK_CONN(%08x): %s/%s\n",
 +                      (uint32_t)msg->any.head.msgid,
 +                      hammer2_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
 +                                          &alloc),
 +                      msg->any.lnk_conn.label);
 +              free(alloc);
 +
 +              conn = hammer2_alloc(sizeof(*conn));
 +
 +              RB_INIT(&conn->tree);
 +              conn->state = state;
 +              state->func = hammer2_lnk_conn;
 +              state->any.conn = conn;
 +              TAILQ_INSERT_TAIL(&connq, conn, entry);
 +
 +              /*
 +               * Set up media
 +               */
 +              TAILQ_FOREACH(media, &mediaq, entry) {
 +                      if (uuid_compare(&msg->any.lnk_conn.mediaid,
 +                                       &media->mediaid, NULL) == 0) {
 +                              break;
 +                      }
 +              }
 +              if (media == NULL) {
 +                      media = hammer2_alloc(sizeof(*media));
 +                      media->mediaid = msg->any.lnk_conn.mediaid;
 +                      TAILQ_INSERT_TAIL(&mediaq, media, entry);
 +              }
 +              conn->media = media;
 +              ++media->refs;
 +
 +              if ((msg->any.head.cmd & HAMMER2_MSGF_DELETE) == 0) {
 +                      hammer2_msg_result(msg, 0);
 +                      hammer2_router_signal(msg->router);
 +                      break;
 +              }
 +              /* FALL THROUGH */
 +      case HAMMER2_LNK_CONN | HAMMER2_MSGF_DELETE:
 +      case HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE:
 +deleteconn:
 +              /*
 +               * On transaction terminate we clean out our h2span_connect
 +               * and acknowledge the request, closing the transaction.
 +               */
 +              fprintf(stderr, "LNK_CONN: Terminated\n");
 +              conn = state->any.conn;
 +              assert(conn);
 +
 +              /*
 +               * Clean out the media structure. If refs drops to zero we
 +               * also clean out the media config threads.  These threads
 +               * maintain span connections to other hammer2 service daemons.
 +               */
 +              media = conn->media;
 +              if (--media->refs == 0) {
 +                      fprintf(stderr, "Shutting down media spans\n");
 +                      for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
 +                              conf = &media->config[i];
 +
 +                              if (conf->thread == NULL)
 +                                      continue;
 +                              conf->ctl = H2CONFCTL_STOP;
 +                              pthread_cond_signal(&conf->cond);
 +                      }
 +                      for (i = 0; i < HAMMER2_COPYID_COUNT; ++i) {
 +                              conf = &media->config[i];
 +
 +                              if (conf->thread == NULL)
 +                                      continue;
 +                              pthread_mutex_unlock(&cluster_mtx);
 +                              pthread_join(conf->thread, NULL);
 +                              pthread_mutex_lock(&cluster_mtx);
 +                              conf->thread = NULL;
 +                              pthread_cond_destroy(&conf->cond);
 +                      }
 +                      fprintf(stderr, "Media shutdown complete\n");
 +                      TAILQ_REMOVE(&mediaq, media, entry);
 +                      hammer2_free(media);
 +              }
 +
 +              /*
 +               * Clean out all relays.  This requires terminating each
 +               * relay transaction.
 +               */
 +              while ((relay = RB_ROOT(&conn->tree)) != NULL) {
 +                      hammer2_relay_delete(relay);
 +              }
 +
 +              /*
 +               * Clean out conn
 +               */
 +              conn->media = NULL;
 +              conn->state = NULL;
 +              msg->state->any.conn = NULL;
 +              TAILQ_REMOVE(&connq, conn, entry);
 +              hammer2_free(conn);
 +
 +              hammer2_msg_reply(msg, 0);
 +              /* state invalid after reply */
 +              break;
 +      case HAMMER2_LNK_VOLCONF:
 +              /*
 +               * One-way volume-configuration message is transmitted
 +               * over the open LNK_CONN transaction.
 +               */
 +              fprintf(stderr, "RECEIVED VOLCONF\n");
 +              if (msg->any.lnk_volconf.index < 0 ||
 +                  msg->any.lnk_volconf.index >= HAMMER2_COPYID_COUNT) {
 +                      fprintf(stderr, "VOLCONF: ILLEGAL INDEX %d\n",
 +                              msg->any.lnk_volconf.index);
 +                      break;
 +              }
 +              if (msg->any.lnk_volconf.copy.path[sizeof(msg->any.lnk_volconf.copy.path) - 1] != 0 ||
 +                  msg->any.lnk_volconf.copy.path[0] == 0) {
 +                      fprintf(stderr, "VOLCONF: ILLEGAL PATH %d\n",
 +                              msg->any.lnk_volconf.index);
 +                      break;
 +              }
 +              conn = msg->state->any.conn;
 +              if (conn == NULL) {
 +                      fprintf(stderr, "VOLCONF: LNK_CONN is missing\n");
 +                      break;
 +              }
 +              conf = &conn->media->config[msg->any.lnk_volconf.index];
 +              conf->copy_pend = msg->any.lnk_volconf.copy;
 +              conf->ctl |= H2CONFCTL_UPDATE;
 +              if (conf->thread == NULL) {
 +                      fprintf(stderr, "VOLCONF THREAD STARTED\n");
 +                      pthread_cond_init(&conf->cond, NULL);
 +                      pthread_create(&conf->thread, NULL,
 +                                     hammer2_volconf_thread, (void *)conf);
 +              }
 +              pthread_cond_signal(&conf->cond);
 +              break;
 +      default:
 +              /*
 +               * Failsafe
 +               */
 +              if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
 +                      goto deleteconn;
 +              hammer2_msg_reply(msg, HAMMER2_MSG_ERR_NOSUPP);
 +              break;
 +      }
 +      pthread_mutex_unlock(&cluster_mtx);
 +}
 +
 +void
 +hammer2_lnk_span(hammer2_msg_t *msg)
 +{
 +      hammer2_state_t *state = msg->state;
 +      h2span_cluster_t dummy_cls;
 +      h2span_node_t dummy_node;
 +      h2span_cluster_t *cls;
 +      h2span_node_t *node;
 +      h2span_link_t *slink;
 +      h2span_relay_t *relay;
 +      char *alloc = NULL;
 +
 +      assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
 +
 +      pthread_mutex_lock(&cluster_mtx);
 +
 +      /*
 +       * On transaction start we initialize the tracking infrastructure
 +       */
 +      if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
 +              assert(state->func == NULL);
 +              state->func = hammer2_lnk_span;
 +
 +              msg->any.lnk_span.label[sizeof(msg->any.lnk_span.label)-1] = 0;
 +
 +              /*
 +               * Find the cluster
 +               */
 +              dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
 +              cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
 +              if (cls == NULL) {
 +                      cls = hammer2_alloc(sizeof(*cls));
 +                      cls->pfs_clid = msg->any.lnk_span.pfs_clid;
 +                      RB_INIT(&cls->tree);
 +                      RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
 +              }
 +
 +              /*
 +               * Find the node
 +               */
 +              dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
 +              node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
 +              if (node == NULL) {
 +                      node = hammer2_alloc(sizeof(*node));
 +                      node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
 +                      node->cls = cls;
 +                      RB_INIT(&node->tree);
 +                      RB_INSERT(h2span_node_tree, &cls->tree, node);
 +                      snprintf(node->label, sizeof(node->label),
 +                               "%s", msg->any.lnk_span.label);
 +              }
 +
 +              /*
 +               * Create the link
 +               */
 +              assert(state->any.link == NULL);
 +              slink = hammer2_alloc(sizeof(*slink));
 +              TAILQ_INIT(&slink->relayq);
 +              slink->node = node;
 +              slink->dist = msg->any.lnk_span.dist;
 +              slink->state = state;
 +              state->any.link = slink;
 +
 +              /*
 +               * Embedded router structure in link for message forwarding.
 +               *
 +               * The spanning id for the router is the message id of
 +               * the SPAN link it is embedded in, allowing messages to
 +               * be routed via &slink->router.
 +               */
 +              slink->router = hammer2_router_alloc();
 +              slink->router->iocom = state->iocom;
 +              slink->router->link = slink;
 +              slink->router->target = state->msgid;
 +              hammer2_router_connect(slink->router);
 +
 +              RB_INSERT(h2span_link_tree, &node->tree, slink);
 +
 +              fprintf(stderr, "LNK_SPAN(thr %p): %p %s/%s dist=%d\n",
 +                      msg->router->iocom,
 +                      slink,
 +                      hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid,
 +                                          &alloc),
 +                      msg->any.lnk_span.label,
 +                      msg->any.lnk_span.dist);
 +              free(alloc);
 +#if 0
 +              hammer2_relay_scan(NULL, node);
 +#endif
 +              hammer2_router_signal(msg->router);
 +      }
 +
 +      /*
 +       * On transaction terminate we remove the tracking infrastructure.
 +       */
 +      if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
 +              slink = state->any.link;
 +              assert(slink != NULL);
 +              node = slink->node;
 +              cls = node->cls;
 +
 +              fprintf(stderr, "LNK_DELE(thr %p): %p %s/%s dist=%d\n",
 +                      msg->router->iocom,
 +                      slink,
 +                      hammer2_uuid_to_str(&cls->pfs_clid, &alloc),
 +                      state->msg->any.lnk_span.label,
 +                      state->msg->any.lnk_span.dist);
 +              free(alloc);
 +
 +              /*
 +               * Remove the router from consideration
 +               */
 +              hammer2_router_disconnect(&slink->router);
 +
 +              /*
 +               * Clean out all relays.  This requires terminating each
 +               * relay transaction.
 +               */
 +              while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
 +                      hammer2_relay_delete(relay);
 +              }
 +
 +              /*
 +               * Clean out the topology
 +               */
 +              RB_REMOVE(h2span_link_tree, &node->tree, slink);
 +              if (RB_EMPTY(&node->tree)) {
 +                      RB_REMOVE(h2span_node_tree, &cls->tree, node);
 +                      if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
 +                              RB_REMOVE(h2span_cluster_tree,
 +                                        &cluster_tree, cls);
 +                              hammer2_free(cls);
 +                      }
 +                      node->cls = NULL;
 +                      hammer2_free(node);
 +                      node = NULL;
 +              }
 +              state->any.link = NULL;
 +              slink->state = NULL;
 +              slink->node = NULL;
 +              hammer2_free(slink);
 +
 +              /*
 +               * We have to terminate the transaction
 +               */
 +              hammer2_state_reply(state, 0);
 +              /* state invalid after reply */
 +
 +              /*
 +               * If the node still exists issue any required updates.  If
 +               * it doesn't then all related relays have already been
 +               * removed and there's nothing left to do.
 +               */
 +#if 0
 +              if (node)
 +                      hammer2_relay_scan(NULL, node);
 +#endif
 +              if (node)
 +                      hammer2_router_signal(msg->router);
 +      }
 +
 +      pthread_mutex_unlock(&cluster_mtx);
 +}
 +
 +/*
 + * Messages received on relay SPANs.  These are open transactions so it is
 + * in fact possible for the other end to close the transaction.
 + *
 + * XXX MPRACE on state structure
 + */
 +static void
 +hammer2_lnk_relay(hammer2_msg_t *msg)
 +{
 +      hammer2_state_t *state = msg->state;
 +      h2span_relay_t *relay;
 +
 +      assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
 +
 +      if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
 +              pthread_mutex_lock(&cluster_mtx);
 +              if ((relay = state->any.relay) != NULL) {
 +                      hammer2_relay_delete(relay);
 +              } else {
 +                      hammer2_state_reply(state, 0);
 +              }
 +              pthread_mutex_unlock(&cluster_mtx);
 +      }
 +}
 +
 +/*
 + * Update relay transactions for SPANs.
 + *
 + * Called with cluster_mtx held.
 + */
 +static void hammer2_relay_scan_specific(h2span_node_t *node,
 +                                      h2span_connect_t *conn);
 +
 +static void
 +hammer2_relay_scan(h2span_connect_t *conn, h2span_node_t *node)
 +{
 +      h2span_cluster_t *cls;
 +
 +      if (node) {
 +              /*
 +               * Iterate specific node
 +               */
 +              TAILQ_FOREACH(conn, &connq, entry)
 +                      hammer2_relay_scan_specific(node, conn);
 +      } else {
 +              /*
 +               * Full iteration.
 +               *
 +               * Iterate cluster ids, nodes, and either a specific connection
 +               * or all connections.
 +               */
 +              RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
 +                      /*
 +                       * Iterate node ids
 +                       */
 +                      RB_FOREACH(node, h2span_node_tree, &cls->tree) {
 +                              /*
 +                               * Synchronize the node's link (received SPANs)
 +                               * with each connection's relays.
 +                               */
 +                              if (conn) {
 +                                      hammer2_relay_scan_specific(node, conn);
 +                              } else {
 +                                      TAILQ_FOREACH(conn, &connq, entry) {
 +                                          hammer2_relay_scan_specific(node,
 +                                                                      conn);
 +                                      }
 +                                      assert(conn == NULL);
 +                              }
 +                      }
 +              }
 +      }
 +}
 +
 +/*
 + * Update the relay'd SPANs for this (node, conn).
 + *
 + * Iterate links and adjust relays to match.  We only propagate the top link
 + * for now (XXX we want to propagate the top two).
 + *
 + * The hammer2_relay_scan_cmp() function locates the first relay element
 + * for any given node.  The relay elements will be sub-sorted by dist.
 + */
 +struct relay_scan_info {
 +      h2span_node_t *node;
 +      h2span_relay_t *relay;
 +};
 +
 +static int
 +hammer2_relay_scan_cmp(h2span_relay_t *relay, void *arg)
 +{
 +      struct relay_scan_info *info = arg;
 +
 +      if ((intptr_t)relay->link->node < (intptr_t)info->node)
 +              return(-1);
 +      if ((intptr_t)relay->link->node > (intptr_t)info->node)
 +              return(1);
 +      return(0);
 +}
 +
 +static int
 +hammer2_relay_scan_callback(h2span_relay_t *relay, void *arg)
 +{
 +      struct relay_scan_info *info = arg;
 +
 +      info->relay = relay;
 +      return(-1);
 +}
 +
 +static void
 +hammer2_relay_scan_specific(h2span_node_t *node, h2span_connect_t *conn)
 +{
 +      struct relay_scan_info info;
 +      h2span_relay_t *relay;
 +      h2span_relay_t *next_relay;
 +      h2span_link_t *slink;
 +      int count = 2;
 +
 +      info.node = node;
 +      info.relay = NULL;
 +
 +      /*
 +       * Locate the first related relay for the node on this connection.
 +       * relay will be NULL if there were none.
 +       */
 +      RB_SCAN(h2span_relay_tree, &conn->tree,
 +              hammer2_relay_scan_cmp, hammer2_relay_scan_callback, &info);
 +      relay = info.relay;
 +      info.relay = NULL;
 +      if (relay)
 +              assert(relay->link->node == node);
 +
 +      if (DebugOpt > 8)
 +              fprintf(stderr, "relay scan for connection %p\n", conn);
 +
 +      /*
 +       * Iterate the node's links (received SPANs) in distance order,
 +       * lowest (best) dist first.
 +       */
 +      /* fprintf(stderr, "LOOP\n"); */
 +      RB_FOREACH(slink, h2span_link_tree, &node->tree) {
 +              /*
 +              fprintf(stderr, "SLINK %p RELAY %p(%p)\n",
 +                      slink, relay, relay ? relay->link : NULL);
 +              */
 +              /*
 +               * PROPAGATE THE BEST LINKS OVER THE SPECIFIED CONNECTION.
 +               *
 +               * Track relays while iterating the best links and construct
 +               * missing relays when necessary.
 +               *
 +               * (If some prior better link was removed it would have also
 +               *  removed the relay, so the relay can only match exactly or
 +               *  be worse).
 +               */
 +              if (relay && relay->link == slink) {
 +                      /*
 +                       * Match, relay already in-place, get the next
 +                       * relay to match against the next slink.
 +                       */
 +                      relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
 +                      if (--count == 0)
 +                              break;
 +              } else if (slink->dist > HAMMER2_SPAN_MAXDIST) {
 +                      /*
 +                       * No match but span distance is too great,
 +                       * do not relay.  This prevents endless closed
 +                       * loops with ever-incrementing distances when
 +                       * the seed span is lost in the graph.
 +                       *
 +                       * All later spans will also be too far away so
 +                       * we can break out of the loop.
 +                       */
 +                      break;
 +              } else if (slink->state->iocom == conn->state->iocom) {
 +                      /*
 +                       * No match but we would transmit a LNK_SPAN
 +                       * out the same connection it came in on, which
 +                       * can be trivially optimized out.
 +                       */
 +                      break;
 +              } else {
 +                      /*
 +                       * No match, distance is ok, construct a new relay.
 +                       * (slink is better than relay).
 +                       */
 +                      hammer2_msg_t *msg;
 +
 +                      assert(relay == NULL ||
 +                             relay->link->node != slink->node ||
 +                             relay->link->dist >= slink->dist);
 +                      relay = hammer2_alloc(sizeof(*relay));
 +                      relay->conn = conn;
 +                      relay->link = slink;
 +
 +                      msg = hammer2_msg_alloc(conn->state->iocom->router, 0,
 +                                              HAMMER2_LNK_SPAN |
 +                                              HAMMER2_MSGF_CREATE,
 +                                              hammer2_lnk_relay, relay);
 +                      relay->state = msg->state;
 +                      relay->router = hammer2_router_alloc();
 +                      relay->router->iocom = relay->state->iocom;
 +                      relay->router->relay = relay;
 +                      relay->router->target = relay->state->msgid;
 +
 +                      msg->any.lnk_span = slink->state->msg->any.lnk_span;
 +                      msg->any.lnk_span.dist = slink->dist + 1;
 +
 +                      hammer2_router_connect(relay->router);
 +
 +                      RB_INSERT(h2span_relay_tree, &conn->tree, relay);
 +                      TAILQ_INSERT_TAIL(&slink->relayq, relay, entry);
 +
 +                      hammer2_msg_write(msg);
 +
 +                      fprintf(stderr,
 +                              "RELAY SPAN %p RELAY %p ON CLS=%p NODE=%p DIST=%d "
 +                              "FD %d state %p\n",
 +                              slink,
 +                              relay,
 +                              node->cls, node, slink->dist,
 +                              conn->state->iocom->sock_fd, relay->state);
 +
 +                      /*
 +                       * Match (created new relay), get the next relay to
 +                       * match against the next slink.
 +                       */
 +                      relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
 +                      if (--count == 0)
 +                              break;
 +              }
 +      }
 +
 +      /*
 +       * Any remaining relay's belonging to this connection which match
 +       * the node are in excess of the current aggregate spanning state
 +       * and should be removed.
 +       */
 +      while (relay && relay->link->node == node) {
 +              next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
 +              hammer2_relay_delete(relay);
 +              relay = next_relay;
 +      }
 +}
 +
 +static
 +void
 +hammer2_relay_delete(h2span_relay_t *relay)
 +{
 +      fprintf(stderr,
 +              "RELAY DELETE %p RELAY %p ON CLS=%p NODE=%p DIST=%d FD %d STATE %p\n",
 +              relay->link,
 +              relay,
 +              relay->link->node->cls, relay->link->node,
 +              relay->link->dist,
 +              relay->conn->state->iocom->sock_fd, relay->state);
 +
 +      hammer2_router_disconnect(&relay->router);
 +
 +      RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
 +      TAILQ_REMOVE(&relay->link->relayq, relay, entry);
 +
 +      if (relay->state) {
 +              relay->state->any.relay = NULL;
 +              hammer2_state_reply(relay->state, 0);
 +              /* state invalid after reply */
 +              relay->state = NULL;
 +      }
 +      relay->conn = NULL;
 +      relay->link = NULL;
 +      hammer2_free(relay);
 +}
 +
 +static void *
 +hammer2_volconf_thread(void *info)
 +{
 +      h2span_media_config_t *conf = info;
 +
 +      pthread_mutex_lock(&cluster_mtx);
 +      while ((conf->ctl & H2CONFCTL_STOP) == 0) {
 +              if (conf->ctl & H2CONFCTL_UPDATE) {
 +                      fprintf(stderr, "VOLCONF UPDATE\n");
 +                      conf->ctl &= ~H2CONFCTL_UPDATE;
 +                      if (bcmp(&conf->copy_run, &conf->copy_pend,
 +                               sizeof(conf->copy_run)) == 0) {
 +                              fprintf(stderr, "VOLCONF: no changes\n");
 +                              continue;
 +                      }
 +                      /*
 +                       * XXX TODO - auto reconnect on lookup failure or
 +                       *              connect failure or stream failure.
 +                       */
 +
 +                      pthread_mutex_unlock(&cluster_mtx);
 +                      hammer2_volconf_stop(conf);
 +                      conf->copy_run = conf->copy_pend;
 +                      if (conf->copy_run.copyid != 0 &&
 +                          strncmp(conf->copy_run.path, "span:", 5) == 0) {
 +                              hammer2_volconf_start(conf,
 +                                                    conf->copy_run.path + 5);
 +                      }
 +                      pthread_mutex_lock(&cluster_mtx);
 +                      fprintf(stderr, "VOLCONF UPDATE DONE state %d\n", conf->state);
 +              }
 +              if (conf->state == H2MC_CONNECT) {
 +                      hammer2_volconf_start(conf, conf->copy_run.path + 5);
 +                      pthread_mutex_unlock(&cluster_mtx);
 +                      sleep(5);
 +                      pthread_mutex_lock(&cluster_mtx);
 +              } else {
 +                      pthread_cond_wait(&conf->cond, &cluster_mtx);
 +              }
 +      }
 +      pthread_mutex_unlock(&cluster_mtx);
 +      hammer2_volconf_stop(conf);
 +      return(NULL);
 +}
 +
 +static
 +void
 +hammer2_volconf_stop(h2span_media_config_t *conf)
 +{
 +      switch(conf->state) {
 +      case H2MC_STOPPED:
 +              break;
 +      case H2MC_CONNECT:
 +              conf->state = H2MC_STOPPED;
 +              break;
 +      case H2MC_RUNNING:
 +              shutdown(conf->fd, SHUT_WR);
 +              pthread_join(conf->iocom_thread, NULL);
 +              conf->iocom_thread = NULL;
 +              break;
 +      }
 +}
 +
 +static
 +void
 +hammer2_volconf_start(h2span_media_config_t *conf, const char *hostname)
 +{
++      hammer2_master_service_info_t *info;
++
 +      switch(conf->state) {
 +      case H2MC_STOPPED:
 +      case H2MC_CONNECT:
 +              conf->fd = hammer2_connect(hostname);
 +              if (conf->fd < 0) {
 +                      fprintf(stderr, "Unable to connect to %s\n", hostname);
 +                      conf->state = H2MC_CONNECT;
 +              } else {
++                      info = malloc(sizeof(*info));
++                      bzero(info, sizeof(*info));
++                      info->fd = conf->fd;
++                      info->detachme = 0;
 +                      conf->state = H2MC_RUNNING;
++                      pthread_create(&conf->iocom_thread, NULL,
++                                     master_service, info);
 +              }
 +              break;
 +      case H2MC_RUNNING:
 +              break;
 +      }
 +}
 +
 +/************************************************************************
 + *                    ROUTER AND MESSAGING HANDLES                    *
 + ************************************************************************
 + *
 + * Basically the idea here is to provide a stable data structure which
 + * can be localized to the caller for higher level protocols to work with.
 + * Depends on the context, these hammer2_handle's can be pooled by use-case
 + * and remain persistent through a client (or mount point's) life.
 + */
 +
 +#if 0
 +/*
 + * Obtain a stable handle on a cluster given its uuid.  This ties directly
 + * into the global cluster topology, creating the structure if necessary
 + * (even if the uuid does not exist or does not exist yet), and preventing
 + * the structure from getting ripped out from under us while we hold a
 + * pointer to it.
 + */
 +h2span_cluster_t *
 +hammer2_cluster_get(uuid_t *pfs_clid)
 +{
 +      h2span_cluster_t dummy_cls;
 +      h2span_cluster_t *cls;
 +
 +      dummy_cls.pfs_clid = *pfs_clid;
 +      pthread_mutex_lock(&cluster_mtx);
 +      cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
 +      if (cls)
 +              ++cls->refs;
 +      pthread_mutex_unlock(&cluster_mtx);
 +      return (cls);
 +}
 +
 +void
 +hammer2_cluster_put(h2span_cluster_t *cls)
 +{
 +      pthread_mutex_lock(&cluster_mtx);
 +      assert(cls->refs > 0);
 +      --cls->refs;
 +      if (RB_EMPTY(&cls->tree) && cls->refs == 0) {
 +              RB_REMOVE(h2span_cluster_tree,
 +                        &cluster_tree, cls);
 +              hammer2_free(cls);
 +      }
 +      pthread_mutex_unlock(&cluster_mtx);
 +}
 +
 +/*
 + * Obtain a stable handle to a specific cluster node given its uuid.
 + * This handle does NOT lock in the route to the node and is typically
 + * used as part of the hammer2_handle_*() API to obtain a set of
 + * stable nodes.
 + */
 +h2span_node_t *
 +hammer2_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
 +{
 +}
 +
 +#endif
 +
 +#if 0
 +/*
 + * Acquire a persistent router structure given the cluster and node ids.
 + * Messages can be transacted via this structure while held.  If the route
 + * is lost messages will return failure.
 + */
 +hammer2_router_t *
 +hammer2_router_get(uuid_t *pfs_clid, uuid_t *pfs_fsid)
 +{
 +}
 +
 +/*
 + * Release previously acquired router.
 + */
 +void
 +hammer2_router_put(hammer2_router_t *router)
 +{
 +}
 +#endif
 +
 +/************************************************************************
 + *                            DEBUGGER                                *
 + ************************************************************************/
 +/*
 + * Dumps the spanning tree
 + */
 +void
 +shell_tree(hammer2_router_t *router, char *cmdbuf __unused)
 +{
 +      h2span_cluster_t *cls;
 +      h2span_node_t *node;
 +      h2span_link_t *slink;
 +      char *uustr = NULL;
 +
 +      pthread_mutex_lock(&cluster_mtx);
 +      RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
 +              router_printf(router, "Cluster %s\n",
 +                           hammer2_uuid_to_str(&cls->pfs_clid, &uustr));
 +              RB_FOREACH(node, h2span_node_tree, &cls->tree) {
 +                      router_printf(router, "    Node %s (%s)\n",
 +                               hammer2_uuid_to_str(&node->pfs_fsid, &uustr),
 +                               node->label);
 +                      RB_FOREACH(slink, h2span_link_tree, &node->tree) {
 +                              router_printf(router, "\tLink dist=%d via %d\n",
 +                                           slink->dist,
 +                                           slink->state->iocom->sock_fd);
 +                      }
 +              }
 +      }
 +      pthread_mutex_unlock(&cluster_mtx);
 +      if (uustr)
 +              free(uustr);
 +#if 0
 +      TAILQ_FOREACH(conn, &connq, entry) {
 +      }
 +#endif
 +}