hammer2 - Flesh out span code, API cleanups
author Matthew Dillon <dillon@apollo.backplane.com>
Sat, 4 Aug 2012 22:11:40 +0000 (15:11 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Sat, 4 Aug 2012 22:11:40 +0000 (15:11 -0700)
* Clean up the transactional APIs and add a few functions to help with
  simple (error code only) message replies.

* Better message protocol layering for both the kernel and userland code.

* Kernel now opens a LNK_CONN transaction which will enable the SPAN
  protocol on the link and also serve to install a PFS filter (which is
  not yet implemented).

  Upon success, the kernel then initiates the SPAN.

  Basically for the kernel:

send LNK_CONN
wait for streaming reply (transaction remains open on both sides)
send LNK_SPAN

TODO: Receive/track LNK_SPANs, each representing a virtual circuit.

TODO: Track LNK_SPANs that match our PFS.

TODO: Issue higher-level protocol transaction messages over these
      circuits based on VNOPS, caching, mirroring, etc.
      (transactional failures can occur when the LNK_SPAN state
      changes, forcing a retry, etc.).

* Userland now accepts the LNK_CONN and uses the open transaction to
  install tracking structures for those connections participating in
  the SPAN protocol.

* Userland now installs tracking structures for received SPAN messages.

* Start fleshing out the userland side of the SPAN relay/transmit code.
  This will involve yet more structures to track which SPANs are being
  relayed over each connection, so changes can be propagated (not yet
  implemented).

  For userland, the TODO list is very large, so there is no point
  enumerating it here.

* Kernel now accepts DBG_SHELL replies (basically debug output messages)
  and will kprintf() them.  DBG_SHELL commands are not yet accepted by the
  kernel.

20 files changed:
sbin/hammer2/Makefile
sbin/hammer2/cmd_debug.c
sbin/hammer2/cmd_pfs.c
sbin/hammer2/cmd_service.c
sbin/hammer2/hammer2.h
sbin/hammer2/msg.c
sbin/hammer2/msg_lnk.c [new file with mode: 0644]
sbin/hammer2/network.h
sbin/hammer2/subs.c
sbin/newfs_hammer2/newfs_hammer2.c
sys/vfs/hammer2/TODO
sys/vfs/hammer2/hammer2.h
sys/vfs/hammer2/hammer2_disk.h
sys/vfs/hammer2/hammer2_inode.c
sys/vfs/hammer2/hammer2_ioctl.c
sys/vfs/hammer2/hammer2_ioctl.h
sys/vfs/hammer2/hammer2_msg.c
sys/vfs/hammer2/hammer2_msgops.c
sys/vfs/hammer2/hammer2_network.h
sys/vfs/hammer2/hammer2_vfsops.c

index f6bd7bf..3de282d 100644 (file)
@@ -1,8 +1,9 @@
 PROG=  hammer2
-SRCS=  main.c subs.c icrc.c msg.c crypto.c
+SRCS=  main.c subs.c icrc.c crypto.c
 SRCS+= cmd_remote.c cmd_snapshot.c cmd_pfs.c
 SRCS+= cmd_service.c cmd_leaf.c cmd_debug.c
 SRCS+= cmd_rsa.c cmd_stat.c
+SRCS+= msg.c msg_lnk.c
 #MAN=  hammer2.8
 NOMAN= TRUE
 DEBUG_FLAGS=-g
index 6f93281..e2169e3 100644 (file)
@@ -98,7 +98,7 @@ cmd_shell(const char *hostname)
        printf("debug: connected\n");
 
        msg = hammer2_msg_alloc(&iocom, 0, HAMMER2_DBG_SHELL);
-       hammer2_ioq_write(&iocom, msg);
+       hammer2_msg_write(&iocom, msg, NULL, NULL);
 
        hammer2_iocom_core(&iocom, shell_recv, shell_send, shell_tty);
        fprintf(stderr, "debug: disconnected\n");
@@ -186,7 +186,7 @@ shell_tty(hammer2_iocom_t *iocom)
                ++len;
                msg = hammer2_msg_alloc(iocom, len, HAMMER2_DBG_SHELL);
                bcopy(buf, msg->aux_data, len);
-               hammer2_ioq_write(iocom, msg);
+               hammer2_msg_write(iocom, msg, NULL, NULL);
        } else {
                /*
                 * Set EOF flag without setting any error code for normal
@@ -202,24 +202,32 @@ shell_tty(hammer2_iocom_t *iocom)
  * then finish up by outputting another prompt.
  */
 void
-hammer2_shell_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
-       if (msg->aux_data)
-               msg->aux_data[msg->aux_size - 1] = 0;
-       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+       switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
+       case HAMMER2_DBG_SHELL:
+               /*
+                * This is a command which we must process.
+                * When we are finished we generate a final reply.
+                */
+               if (msg->aux_data)
+                       msg->aux_data[msg->aux_size - 1] = 0;
+               hammer2_shell_parse(iocom, msg);
+               hammer2_msg_reply(iocom, msg, 0);
+               break;
+       case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
                /*
                 * A reply just prints out the string.  No newline is added
                 * (it is expected to be embedded if desired).
                 */
                if (msg->aux_data)
+                       msg->aux_data[msg->aux_size - 1] = 0;
+               if (msg->aux_data)
                        write(2, msg->aux_data, strlen(msg->aux_data));
-       } else {
-               /*
-                * Otherwise this is a command which we must process.
-                * When we are finished we generate a final reply.
-                */
-               hammer2_shell_parse(iocom, msg);
-               hammer2_msg_reply(iocom, msg, 0);
+               break;
+       default:
+               hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
+               break;
        }
 }
 
@@ -266,7 +274,7 @@ iocom_printf(hammer2_iocom_t *iocom, uint32_t cmd, const char *ctl, ...)
                                             HAMMER2_MSGF_REPLY);
        bcopy(buf, rmsg->aux_data, len);
 
-       hammer2_ioq_write(iocom, rmsg);
+       hammer2_msg_write(iocom, rmsg, NULL, NULL);
 }
 
 /************************************************************************
@@ -425,8 +433,8 @@ show_bref(int fd, int tab, int bi, hammer2_blockref_t *bref)
                                  hammer2_pfstype_to_str(media.ipdata.pfs_type));
                        tabprintf(tab, "pfs_inum 0x%016jx\n",
                                  (uintmax_t)media.ipdata.pfs_inum);
-                       tabprintf(tab, "pfs_id   %s\n",
-                                 hammer2_uuid_to_str(&media.ipdata.pfs_id,
+                       tabprintf(tab, "pfs_clid %s\n",
+                                 hammer2_uuid_to_str(&media.ipdata.pfs_clid,
                                                      &str));
                        tabprintf(tab, "pfs_fsid %s\n",
                                  hammer2_uuid_to_str(&media.ipdata.pfs_fsid,
index 46df7cf..60a59c2 100644 (file)
@@ -89,7 +89,7 @@ cmd_pfs_list(const char *sel_path)
                        printf("%02x          ", pfs.pfs_type);
                        break;
                }
-               uuid_to_string(&pfs.pfs_id, &pfs_id_str, &status);
+               uuid_to_string(&pfs.pfs_clid, &pfs_id_str, &status);
                printf("%s ", pfs_id_str);
                free(pfs_id_str);
                pfs_id_str = NULL;
@@ -123,9 +123,9 @@ cmd_pfs_create(const char *sel_path, const char *name,
        snprintf(pfs.name, sizeof(pfs.name), "%s", name);
        pfs.pfs_type = pfs_type;
        if (uuid_str) {
-               uuid_from_string(uuid_str, &pfs.pfs_id, &status);
+               uuid_from_string(uuid_str, &pfs.pfs_clid, &status);
        } else {
-               uuid_create(&pfs.pfs_id, &status);
+               uuid_create(&pfs.pfs_clid, &status);
        }
        if (status == uuid_s_ok)
                uuid_create(&pfs.pfs_fsid, &status);
index ee17c67..9e708a7 100644 (file)
@@ -42,8 +42,6 @@ static void master_auth_tx(hammer2_iocom_t *iocom);
 static void master_link_rx(hammer2_iocom_t *iocom);
 static void master_link_tx(hammer2_iocom_t *iocom);
 
-static void hammer2_lnk_span(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-
 /*
  * Start-up the master listener daemon for the machine.
  *
@@ -215,23 +213,23 @@ void
 master_link_rx(hammer2_iocom_t *iocom)
 {
        hammer2_msg_t *msg;
+       hammer2_state_t *state;
        uint32_t cmd;
 
        while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
               (msg = hammer2_ioq_read(iocom)) != NULL) {
                /*
-                * Switch on the transactional cmd, that is the original
-                * msg->any.head.cmd that opened the transaction.  The actual
-                * msg might be different.  The original msg cannot have
-                * REPLY set by definition (but of course the currenet msg
-                * might), so we don't bother with case statements for REPLY
-                * for command sequences we expet to be transactional.
+                * If the message state has a function established we just
+                * call the function, otherwise we call the appropriate
+                * link-level protocol related to the original command and
+                * let it sort it out.
                 *
                 * Non-transactional one-off messages, on the otherhand,
                 * might have REPLY set.
                 */
-               if (msg->state) {
-                       cmd = msg->state->msg->any.head.cmd;
+               state = msg->state;
+               if (state) {
+                       cmd = state->msg->any.head.cmd;
                        fprintf(stderr,
                                "MSGRX persist=%08x cmd=%08x error %d\n",
                                cmd, msg->any.head.cmd, msg->any.head.error);
@@ -241,30 +239,21 @@ master_link_rx(hammer2_iocom_t *iocom)
                                "MSGRX persist=-------- cmd=%08x error %d\n",
                                cmd, msg->any.head.error);
                }
-
-               switch(cmd & HAMMER2_MSGF_CMDSWMASK) {
-               case HAMMER2_LNK_ERROR:
-                       /*
-                        * A non-transactional error is formulated when
-                        * the socket or pipe disconnects.  Ignore it.
-                        */
-                       break;
-               case HAMMER2_LNK_SPAN:
-                       /*
-                        * Messages related to the LNK_SPAN transaction.
-                        */
-                       hammer2_lnk_span(iocom, msg);
-                       break;
-               case HAMMER2_DBG_SHELL:
-               case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
-                       /*
-                        * Non-transactional DBG messages.
-                        */
-                       hammer2_shell_remote(iocom, msg);
-                       break;
-               default:
-                       hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
-                       break;
+               if (state && state->func) {
+                       state->func(state, msg);
+               } else {
+                       switch(cmd & HAMMER2_MSGF_PROTOS) {
+                       case HAMMER2_MSG_PROTO_LNK:
+                               hammer2_msg_lnk(iocom, msg);
+                               break;
+                       case HAMMER2_MSG_PROTO_DBG:
+                               hammer2_msg_dbg(iocom, msg);
+                               break;
+                       default:
+                               hammer2_msg_reply(iocom, msg,
+                                                 HAMMER2_MSG_ERR_UNKNOWN);
+                               break;
+                       }
                }
                hammer2_state_cleanuprx(iocom, msg);
        }
@@ -285,36 +274,3 @@ master_link_tx(hammer2_iocom_t *iocom)
 {
        hammer2_iocom_flush(iocom);
 }
-
-/*
- * Receive a message which is part of a LNK_SPAN transaction.  Keep in
- * mind that only the original CREATE is utilizing the lnk_span message
- * header.
- *
- * We will get called for CREATE, DELETE, and intermediate states (including
- * errors), and in particular we will get called with an error if the link
- * is lost in the middle of the transaction.
- */
-static
-void
-hammer2_lnk_span(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg)
-{
-       char *alloc = NULL;
-
-       switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) {
-       case HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE:
-               fprintf(stderr,
-                       "LNK_SPAN: %s/%s\n",
-                       hammer2_uuid_to_str(&msg->any.lnk_span.pfs_id, &alloc),
-                       msg->any.lnk_span.label);
-               free(alloc);
-               break;
-       case HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE:
-               fprintf(stderr, "LNK_SPAN: Terminated with error\n");
-               break;
-       default:
-               fprintf(stderr,
-                       "LNK_SPAN: Unknown msg %08x\n", msg->any.head.cmd);
-               break;
-       }
-}
index dd3e0cc..3e21588 100644 (file)
 #define HAMMER2_DEFAULT_DIR    "/etc/hammer2"
 #define HAMMER2_PATH_REMOTE    HAMMER2_DEFAULT_DIR "/remote"
 
+struct hammer2_idmap {
+       struct hammer2_idmap *next;
+       uint32_t ran_beg;       /* inclusive */
+       uint32_t ran_end;       /* inclusive */
+};
+
+typedef struct hammer2_idmap hammer2_idmap_t;
+
 extern int DebugOpt;
 extern int VerboseOpt;
 extern int QuietOpt;
 extern int NormalExit;
 
+/*
+ * Hammer2 command APIs
+ */
 int hammer2_ioctl_handle(const char *sel_path);
 void hammer2_demon(void *(*func)(void *), void *arg);
-void hammer2_bswap_head(hammer2_msg_hdr_t *head);
 
 int cmd_remote_connect(const char *sel_path, const char *url);
 int cmd_remote_disconnect(const char *sel_path, const char *url);
@@ -106,6 +116,10 @@ int cmd_rsainit(const char *dir_path);
 int cmd_rsaenc(const char **keys, int nkeys);
 int cmd_rsadec(const char **keys, int nkeys);
 
+/*
+ * Msg support functions
+ */
+void hammer2_bswap_head(hammer2_msg_hdr_t *head);
 void hammer2_ioq_init(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd);
@@ -113,7 +127,11 @@ void hammer2_iocom_done(hammer2_iocom_t *iocom);
 hammer2_msg_t *hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size,
                        uint32_t cmd);
 void hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
-                       uint16_t error);
+                       uint32_t error);
+void hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
+                       uint32_t error);
+void hammer2_state_reply(hammer2_state_t *state, uint32_t error);
+
 void hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
 void hammer2_iocom_core(hammer2_iocom_t *iocom,
@@ -121,7 +139,9 @@ void hammer2_iocom_core(hammer2_iocom_t *iocom,
                        void (*iocom_sendmsg)(hammer2_iocom_t *),
                        void (*iocom_altmsg)(hammer2_iocom_t *));
 hammer2_msg_t *hammer2_ioq_read(hammer2_iocom_t *iocom);
-void hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
+                       void (*func)(hammer2_state_t *, hammer2_msg_t *),
+                       void *data);
 
 void hammer2_iocom_drain(hammer2_iocom_t *iocom);
 void hammer2_iocom_flush(hammer2_iocom_t *iocom);
@@ -129,6 +149,15 @@ void hammer2_iocom_flush(hammer2_iocom_t *iocom);
 void hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 void hammer2_state_free(hammer2_state_t *state);
 
+/*
+ * Msg protocol functions
+ */
+void hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_dbg(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+
+/*
+ * Crypto functions
+ */
 void hammer2_crypto_negotiate(hammer2_iocom_t *iocom);
 void hammer2_crypto_decrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq);
 void hammer2_crypto_decrypt_aux(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
@@ -138,11 +167,16 @@ int hammer2_crypto_encrypt(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
 void hammer2_crypto_encrypt_wrote(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq,
                        int nact);
 
+/*
+ * Misc functions
+ */
 const char *hammer2_time64_to_str(uint64_t htime64, char **strp);
 const char *hammer2_uuid_to_str(uuid_t *uuid, char **strp);
 const char *hammer2_iptype_to_str(uint8_t type);
 const char *hammer2_pfstype_to_str(uint8_t type);
 const char *sizetostr(hammer2_off_t size);
 
-void hammer2_shell_remote(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
+void hammer2_msg_debug(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 void iocom_printf(hammer2_iocom_t *iocom, uint32_t cmd, const char *ctl, ...);
+void *hammer2_alloc(size_t bytes);
+void hammer2_free(void *ptr);
index 91ef036..53cec34 100644 (file)
@@ -36,7 +36,6 @@
 #include "hammer2.h"
 
 static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
-static int hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
 
 /*
@@ -77,6 +76,7 @@ hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
        RB_INIT(&iocom->statewr_tree);
        TAILQ_INIT(&iocom->freeq);
        TAILQ_INIT(&iocom->freeq_aux);
+       TAILQ_INIT(&iocom->addrq);
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
        iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
@@ -160,8 +160,9 @@ hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
        if (hbytes)
                bzero(&msg->any.head, hbytes);
        msg->hdr_size = hbytes;
-       msg->any.head.aux_icrc = 0;
        msg->any.head.cmd = cmd;
+       msg->any.head.aux_descr = 0;
+       msg->any.head.aux_crc = 0;
 
        return (msg);
 }
@@ -257,7 +258,6 @@ hammer2_ioq_read(hammer2_iocom_t *iocom)
        ssize_t n;
        size_t bytes;
        size_t nmax;
-       uint16_t xcrc16;
        uint32_t xcrc32;
        int error;
 
@@ -322,6 +322,10 @@ again:
                 * Calculate the header, decrypt data received so far.
                 * Data will be decrypted in-place.  Partial blocks are
                 * not immediately decrypted.
+                *
+                * WARNING!  The header might be in the wrong endian, we
+                *           do not fix it up until we get the entire
+                *           extended header.
                 */
                hammer2_crypto_decrypt(iocom, ioq);
                head = (void *)(ioq->buf + ioq->fifo_beg);
@@ -338,23 +342,19 @@ again:
                        break;
                }
 
-               xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
-                                       HAMMER2_MSGHDR_CRCBYTES);
-               if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
-                       hammer2_bswap_head(head);
-               }
-               xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
-               if (xcrc16 != head->icrc1) {
-                       ioq->error = HAMMER2_IOQ_ERROR_HCRC;
-                       break;
-               }
-
                /*
                 * Calculate the full header size and aux data size
                 */
-               ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
-                             HAMMER2_MSG_ALIGN;
-               ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
+               if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
+                       ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
+                                     HAMMER2_MSG_ALIGN;
+                       ioq->abytes = bswap32(head->aux_bytes) *
+                                     HAMMER2_MSG_ALIGN;
+               } else {
+                       ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
+                                     HAMMER2_MSG_ALIGN;
+                       ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
+               }
                if (ioq->hbytes < sizeof(msg->any.head) ||
                    ioq->hbytes > sizeof(msg->any) ||
                    ioq->abytes > HAMMER2_MSGAUX_MAX) {
@@ -435,22 +435,28 @@ again:
 
                /*
                 * Calculate the extended header, decrypt data received
-                * so far.
+                * so far.  Handle endian-conversion for the entire extended
+                * header.
                 */
                hammer2_crypto_decrypt(iocom, ioq);
                head = (void *)(ioq->buf + ioq->fifo_beg);
 
                /*
-                * Check the crc on the extended header
+                * Check the CRC.
                 */
-               if (ioq->hbytes > sizeof(hammer2_msg_hdr_t)) {
-                       xcrc32 = hammer2_icrc32(head + 1,
-                                               ioq->hbytes - sizeof(*head));
-                       xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
-                       if (head->icrc2 != xcrc16) {
-                               ioq->error = HAMMER2_IOQ_ERROR_XCRC;
-                               break;
-                       }
+               if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
+                       xcrc32 = bswap32(head->hdr_crc);
+               else
+                       xcrc32 = head->hdr_crc;
+               head->hdr_crc = 0;
+               if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
+                       ioq->error = HAMMER2_IOQ_ERROR_XCRC;
+                       break;
+               }
+               head->hdr_crc = xcrc32;
+
+               if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
+                       hammer2_bswap_head(head);
                }
 
                /*
@@ -548,10 +554,10 @@ again:
                hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
 
                /*
-                * Check aux_icrc, then we are done.
+                * Check aux_crc, then we are done.
                 */
                xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
-               if (xcrc32 != msg->any.head.aux_icrc) {
+               if (xcrc32 != msg->any.head.aux_crc) {
                        ioq->error = HAMMER2_IOQ_ERROR_ACRC;
                        break;
                }
@@ -644,8 +650,7 @@ again:
                         */
                        state->txcmd |= HAMMER2_MSGF_DELETE;
                        msg->state = state;
-                       msg->any.head.source = state->source;
-                       msg->any.head.target = state->target;
+                       msg->any.head.spanid = state->spanid;
                        msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
                                             HAMMER2_MSGF_DELETE;
                } else {
@@ -694,38 +699,22 @@ again:
 
 /*
  * Calculate the header and data crc's and write a low-level message to
- * the connection.  If aux_icrc is non-zero the aux_data crc is already
+ * the connection.  If aux_crc is non-zero the aux_data crc is already
  * assumed to have been set.
  *
  * A non-NULL msg is added to the queue but not necessarily flushed.
  * Calling this function with msg == NULL will get a flush going.
  */
-void
+static void
 hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
        hammer2_ioq_t *ioq = &iocom->ioq_tx;
-       uint16_t xcrc16;
        uint32_t xcrc32;
        int hbytes;
-       int error;
 
        assert(msg);
 
        /*
-        * Process transactional state.
-        */
-       if (ioq->error == 0) {
-               error = hammer2_state_msgtx(iocom, msg);
-               if (error) {
-                       if (error == HAMMER2_IOQ_ERROR_EALREADY) {
-                               hammer2_msg_free(iocom, msg);
-                       } else {
-                               ioq->error = error;
-                       }
-               }
-       }
-
-       /*
         * Process terminal connection errors.
         */
        if (ioq->error) {
@@ -747,39 +736,22 @@ hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                srandomdev();
 
        /*
-        * Calculate aux_icrc if 0, calculate icrc2, and finally
-        * calculate icrc1.
+        * Calculate aux_crc if 0, then calculate hdr_crc.
         */
-       if (msg->aux_size && msg->any.head.aux_icrc == 0) {
+       if (msg->aux_size && msg->any.head.aux_crc == 0) {
                assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
                xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
-               msg->any.head.aux_icrc = xcrc32;
+               msg->any.head.aux_crc = xcrc32;
        }
        msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
        assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
 
-       if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
-           sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
-               hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
-                       HAMMER2_MSG_ALIGN;
-               hbytes -= sizeof(msg->any.head);
-               xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
-               xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
-               msg->any.head.icrc2 = xcrc16;
-       } else {
-               msg->any.head.icrc2 = 0;
-       }
-       xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
-                               HAMMER2_MSGHDR_CRCBYTES);
-       xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
-       msg->any.head.icrc1 = xcrc16;
-
-       /*
-        * XXX Encrypt the message
-        */
+       hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
+       msg->any.head.hdr_crc = 0;
+       msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
 
        /*
-        * Enqueue the message.
+        * Enqueue the message (the flush codes handles stream encryption).
         */
        TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
        ++ioq->msgcount;
@@ -935,48 +907,308 @@ hammer2_iocom_drain(hammer2_iocom_t *iocom)
 }
 
 /*
- * This is a shortcut to formulate a reply to msg with a simple error code.
- * It can reply to transaction or one-way messages, or terminate one side
- * of a stream.  A HAMMER2_LNK_ERROR command code is utilized to encode
- * the error code (which can be 0).
+ * Write a message to an iocom, with additional state processing.
  *
- * Replies to one-way messages are a bit of an oxymoron but the feature
- * is used by the debug (DBG) protocol.
- *
- * The reply contains no data.
+ * The iocom lock must be held by the caller. XXX
  */
 void
-hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint16_t error)
+hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
+                 void (*func)(hammer2_state_t *, hammer2_msg_t *),
+                 void *data)
 {
-       hammer2_msg_t *nmsg;
-       uint32_t cmd;
+       hammer2_state_t *state;
 
-       cmd = HAMMER2_LNK_ERROR;
-       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+       /*
+        * Handle state processing, create state if necessary.
+        */
+       if ((state = msg->state) != NULL) {
                /*
-                * Reply to received reply, reply direction uses txcmd.
-                * txcmd will be updated by hammer2_ioq_write().
+                * Existing transaction (could be reply).  It is also
+                * possible for this to be the first reply (CREATE is set),
+                * in which case we populate state->txcmd.
                 */
-               if (msg->state) {
-                       if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
-                               cmd |= HAMMER2_MSGF_CREATE;
-                       cmd |= HAMMER2_MSGF_DELETE;
+               msg->any.head.msgid = state->msgid;
+               msg->any.head.spanid = state->spanid;
+               if (func) {
+                       state->func = func;
+                       state->any.any = data;
                }
+               if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
+                       state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               fprintf(stderr, "MSGWRITE IN REPLY msgid %016jx\n",
+                       (intmax_t)msg->any.head.msgid);
+       } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+               fprintf(stderr, "MSGWRITE NEW MSG\n");
+               /*
+                * No existing state and CREATE is set, create new
+                * state for outgoing command.  This can't happen if
+                * REPLY is set as the state would already exist for
+                * a transaction reply.
+                */
+               assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
+
+               state = malloc(sizeof(*state));
+               bzero(state, sizeof(*state));
+               state->iocom = iocom;
+               state->flags = HAMMER2_STATE_DYNAMIC;
+               state->msg = msg;
+               state->msgid = (uint64_t)(uintptr_t)state;
+               state->spanid = msg->any.head.spanid;
+               state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               state->func = func;
+               state->any.any = data;
+               RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
+               state->flags |= HAMMER2_STATE_INSERTED;
+               msg->state = state;
+               msg->any.head.msgid = state->msgid;
+               /* spanid set by caller */
        } else {
+               fprintf(stderr, "MSGWRITE ONE-OFF\n");
+               msg->any.head.msgid = 0;
+               /* spanid set by caller */
+       }
+
+       /*
+        * Queue it for output
+        */
+       hammer2_ioq_write(iocom, msg);
+}
+
+#if 0   /* XXX dead remnant of the removed hammer2_state_msgtx() switch */
+
+       case HAMMER2_MSGF_DELETE:
                /*
-                * Reply to received command, reply direction uses rxcmd.
-                * txcmd will be updated by hammer2_ioq_write().
+                * Sent ABORT+DELETE in case where msgid has already
+                * been fully closed, ignore the message.
                 */
-               cmd |= HAMMER2_MSGF_REPLY;
-               if (msg->state) {
-                       if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
-                               cmd |= HAMMER2_MSGF_CREATE;
-                       cmd |= HAMMER2_MSGF_DELETE;
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgtx: "
+                                            "no state match for DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
                }
+
+               /*
+                * Sent ABORT+DELETE in case where msgid has
+                * already been reused for an unrelated message,
+                * ignore the message.
+                */
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgtx: "
+                                            "state reused for DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+               error = 0;
+
+
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
+               /*
+                * When transmitting a reply with DELETE set the original
+                * persistent state message should already exist.
+                *
+                * This is very similar to the REPLY|CREATE|* case except
+                * txcmd is already stored, so we just add the DELETE flag.
+                *
+                * Sent REPLY+ABORT+DELETE in case where msgid has
+                * already been fully closed, ignore the message.
+                */
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgtx: "
+                                            "no state match for "
+                                            "REPLY | DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+
+               /*
+                * Sent REPLY+ABORT+DELETE in case where msgid has already
+                * been reused for an unrelated message, ignore the message.
+                */
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                       } else {
+                               iocom_printf(iocom, msg->any.head.cmd,
+                                            "hammer2_state_msgtx: "
+                                            "state reused for "
+                                            "REPLY | DELETE\n");
+                               error = HAMMER2_IOQ_ERROR_TRANS;
+                       }
+                       break;
+               }
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY:
+               /*
+                * Check for mid-stream ABORT reply sent.
+                *
+                * One-off REPLY messages are allowed for e.g. status updates.
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                       if (state == NULL ||
+                           (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                               error = HAMMER2_IOQ_ERROR_EALREADY;
+                               break;
+                       }
+               }
+               error = 0;
+               break;
+       }
+       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
+       return (error);
+#endif  /* 0: not compilable standalone (no enclosing function/switch) */
+
+
+/*
+ * This is a shortcut to formulate a reply to msg with a simple error code.
+ * It can reply to and terminate a transaction, or it can reply to a one-way
+ * message.  A HAMMER2_LNK_ERROR command code is utilized to encode
+ * the error code (which can be 0).  Not all transactions are terminated
+ * with HAMMER2_LNK_ERROR status (the low level only cares about the
+ * MSGF_DELETE flag), but most are.
+ *
+ * Replies to one-way messages are a bit of an oxymoron but the feature
+ * is used by the debug (DBG) protocol.
+ *
+ * The reply contains no extended data.
+ */
+void
+hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
+{
+       hammer2_state_t *state = msg->state;
+       hammer2_msg_t *nmsg;
+       uint32_t cmd;
+
+
+       /*
+        * Reply with a simple error code and terminate the transaction.
+        */
+       cmd = HAMMER2_LNK_ERROR;
+
+       /*
+        * If we have not transmitted anything yet the remote end initiated
+        * the command, so set CREATE and REPLY.  Otherwise copy REPLY from
+        * our txcmd (rxcmd cannot be used, it may still be 0 if we initiated
+        * the transaction and no reply has been received yet).
+        *
+        * If our direction has already been closed we just return without
+        * doing anything.
+        */
+       if (state) {
+               if (state->txcmd & HAMMER2_MSGF_DELETE)
+                       return;
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+                       cmd |= HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY;
+               else
+                       cmd |= state->txcmd & HAMMER2_MSGF_REPLY;
+               cmd |= HAMMER2_MSGF_DELETE;
+       } else {
+               if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
+                       cmd |= HAMMER2_MSGF_REPLY;
        }
+
        nmsg = hammer2_msg_alloc(iocom, 0, cmd);
        nmsg->any.head.error = error;
-       hammer2_ioq_write(iocom, nmsg);
+       nmsg->state = state;
+       hammer2_msg_write(iocom, nmsg, NULL, NULL);
+}
+
+/*
+ * Similar to hammer2_msg_reply() but leave the transaction open.  That is,
+ * we are generating a streaming reply or an intermediate acknowledgement
+ * of some sort as part of the higher level protocol, with more to come
+ * later.
+ */
+void
+hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
+{
+       hammer2_state_t *state = msg->state;
+       hammer2_msg_t *nmsg;
+       uint32_t cmd;
+
+
+       /*
+        * Reply with a simple error code but leave the transaction open.
+        */
+       cmd = HAMMER2_LNK_ERROR;
+
+       /*
+        * If we have not transmitted anything yet the remote end initiated
+        * the command, so set CREATE and REPLY.  Otherwise copy REPLY from
+        * our txcmd (rxcmd cannot be used, it may still be 0 if we initiated
+        * the transaction and no reply has been received yet).
+        *
+        * If our direction has already been closed we just return without
+        * doing anything.
+        */
+       if (state) {
+               if (state->txcmd & HAMMER2_MSGF_DELETE)
+                       return;
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+                       cmd |= HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY;
+               else
+                       cmd |= state->txcmd & HAMMER2_MSGF_REPLY;
+               /* continuing transaction, do not set MSGF_DELETE */
+       } else {
+               if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
+                       cmd |= HAMMER2_MSGF_REPLY;
+       }
+
+       nmsg = hammer2_msg_alloc(iocom, 0, cmd);
+       nmsg->any.head.error = error;
+       nmsg->state = state;
+       hammer2_msg_write(iocom, nmsg, NULL, NULL);
+}
+
+/*
+ * Terminate a transaction given a state structure by issuing a DELETE.
+ */
+void
+hammer2_state_reply(hammer2_state_t *state, uint32_t error)
+{
+       hammer2_msg_t *nmsg;
+       uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
+
+       /*
+        * Nothing to do if we already transmitted a delete
+        */
+       if (state->txcmd & HAMMER2_MSGF_DELETE)
+               return;
+
+       /*
+        * If we have not transmitted anything yet the other end must have
+        * initiated the command and this is our first response, so set
+        * both CREATE and REPLY.
+        *
+        * Otherwise inherit the direction from our own txcmd.  rxcmd cannot
+        * be used because it may still be 0 if we initiated the command and
+        * no reply has been received yet.
+        */
+       if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+               cmd |= HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY;
+       else
+               cmd |= state->txcmd & HAMMER2_MSGF_REPLY;
+
+       nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
+       nmsg->any.head.error = error;
+       nmsg->state = state;
+       hammer2_msg_write(state->iocom, nmsg, NULL, NULL);
 }
 
 /************************************************************************
@@ -1070,12 +1302,14 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
        /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
 
        dummy.msgid = msg->any.head.msgid;
-       dummy.source = msg->any.head.source;
-       dummy.target = msg->any.head.target;
+       dummy.spanid = msg->any.head.spanid;
+#if 0
        iocom_printf(iocom, msg->any.head.cmd,
-                    "received msg %08x msgid %u source=%u target=%u\n",
-                     msg->any.head.cmd, msg->any.head.msgid,
-                     msg->any.head.source, msg->any.head.target);
+                    "received msg %08x msgid %jx spanid=%jx\n",
+                     msg->any.head.cmd,
+                     (intmax_t)msg->any.head.msgid,
+                     (intmax_t)msg->any.head.spanid);
+#endif
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
                state = RB_FIND(hammer2_state_tree,
                                &iocom->statewr_tree, &dummy);
@@ -1120,6 +1354,8 @@ hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
                state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
                RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
                state->flags |= HAMMER2_STATE_INSERTED;
+               state->msgid = msg->any.head.msgid;
+               state->spanid = msg->any.head.spanid;
                msg->state = state;
                error = 0;
                break;
@@ -1288,210 +1524,6 @@ hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
        }
 }
 
-/*
- * Process state tracking for a message prior to transmission.
- *
- * Called with msglk held and the msg dequeued.
- *
- * One-off messages are usually with dummy state and msg->state may be NULL
- * in this situation.
- *
- * New transactions (when CREATE is set) will insert the state.
- *
- * May request that caller discard the message by setting *discardp to 1.
- * A NULL state may be returned in this case.
- */
-static int
-hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
-{
-       hammer2_state_t *state;
-       int error;
-
-       /*
-        * Lock RB tree.  If persistent state is present it will have already
-        * been assigned to msg.
-        */
-       /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
-       state = msg->state;
-
-       /*
-        * Short-cut one-off or mid-stream messages (state may be NULL).
-        */
-       if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
-                                 HAMMER2_MSGF_ABORT)) == 0) {
-               /*lockmgr(&pmp->msglk, LK_RELEASE);*/
-               return(0);
-       }
-
-
-       /*
-        * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
-        * inside the case statements.
-        */
-       switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
-                                   HAMMER2_MSGF_REPLY)) {
-       case HAMMER2_MSGF_CREATE:
-       case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
-               /*
-                * Insert the new persistent message state and mark
-                * half-closed if DELETE is set.  Since this is a new
-                * message it isn't possible to transition into the fully
-                * closed state here.
-                *
-                * XXX state must be assigned and inserted by
-                *     hammer2_msg_write().  txcmd is assigned by us
-                *     on-transmit.
-                */
-               assert(state != NULL);
-#if 0
-               if (state == NULL) {
-                       state = pmp->freerd_state;
-                       pmp->freerd_state = NULL;
-                       msg->state = state;
-                       state->msg = msg;
-                       state->msgid = msg->any.head.msgid;
-                       state->source = msg->any.head.source;
-                       state->target = msg->any.head.target;
-               }
-               assert((state->flags & HAMMER2_STATE_INSERTED) == 0);
-               if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
-                       iocom_printf(iocom, msg->any.head.cmd,
-                                   "hammer2_state_msgtx: "
-                                   "duplicate transaction\n");
-                       error = HAMMER2_IOQ_ERROR_TRANS;
-                       break;
-               }
-               state->flags |= HAMMER2_STATE_INSERTED;
-#endif
-               state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
-               error = 0;
-               break;
-       case HAMMER2_MSGF_DELETE:
-               /*
-                * Sent ABORT+DELETE in case where msgid has already
-                * been fully closed, ignore the message.
-                */
-               if (state == NULL) {
-                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                       } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgtx: "
-                                            "no state match for DELETE\n");
-                               error = HAMMER2_IOQ_ERROR_TRANS;
-                       }
-                       break;
-               }
-
-               /*
-                * Sent ABORT+DELETE in case where msgid has
-                * already been reused for an unrelated message,
-                * ignore the message.
-                */
-               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
-                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                       } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgtx: "
-                                            "state reused for DELETE\n");
-                               error = HAMMER2_IOQ_ERROR_TRANS;
-                       }
-                       break;
-               }
-               error = 0;
-               break;
-       default:
-               /*
-                * Check for mid-stream ABORT command sent
-                */
-               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                       if (state == NULL ||
-                           (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                               break;
-                       }
-               }
-               error = 0;
-               break;
-       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
-       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
-               /*
-                * When transmitting a reply with CREATE set the original
-                * persistent state message should already exist.
-                */
-               if (state == NULL) {
-                       iocom_printf(iocom, msg->any.head.cmd,
-                                    "hammer2_state_msgtx: no state match "
-                                    "for REPLY | CREATE\n");
-                       error = HAMMER2_IOQ_ERROR_TRANS;
-                       break;
-               }
-               state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
-               error = 0;
-               break;
-       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
-               /*
-                * When transmitting a reply with DELETE set the original
-                * persistent state message should already exist.
-                *
-                * This is very similar to the REPLY|CREATE|* case except
-                * txcmd is already stored, so we just add the DELETE flag.
-                *
-                * Sent REPLY+ABORT+DELETE in case where msgid has
-                * already been fully closed, ignore the message.
-                */
-               if (state == NULL) {
-                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                       } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgtx: "
-                                            "no state match for "
-                                            "REPLY | DELETE\n");
-                               error = HAMMER2_IOQ_ERROR_TRANS;
-                       }
-                       break;
-               }
-
-               /*
-                * Sent REPLY+ABORT+DELETE in case where msgid has already
-                * been reused for an unrelated message, ignore the message.
-                */
-               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
-                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                       } else {
-                               iocom_printf(iocom, msg->any.head.cmd,
-                                            "hammer2_state_msgtx: "
-                                            "state reused for "
-                                            "REPLY | DELETE\n");
-                               error = HAMMER2_IOQ_ERROR_TRANS;
-                       }
-                       break;
-               }
-               error = 0;
-               break;
-       case HAMMER2_MSGF_REPLY:
-               /*
-                * Check for mid-stream ABORT reply sent.
-                *
-                * One-off REPLY messages are allowed for e.g. status updates.
-                */
-               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
-                       if (state == NULL ||
-                           (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
-                               error = HAMMER2_IOQ_ERROR_EALREADY;
-                               break;
-                       }
-               }
-               error = 0;
-               break;
-       }
-       /*lockmgr(&pmp->msglk, LK_RELEASE);*/
-       return (error);
-}
-
 static void
 hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
 {
@@ -1545,13 +1577,9 @@ hammer2_state_free(hammer2_state_t *state)
 int
 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
 {
-       if (state1->source < state2->source)
-               return(-1);
-       if (state1->source > state2->source)
-               return(1);
-       if (state1->target < state2->target)
+       if (state1->spanid < state2->spanid)
                return(-1);
-       if (state1->target > state2->target)
+       if (state1->spanid > state2->spanid)
                return(1);
        if (state1->msgid < state2->msgid)
                return(-1);
diff --git a/sbin/hammer2/msg_lnk.c b/sbin/hammer2/msg_lnk.c
new file mode 100644 (file)
index 0000000..b689880
--- /dev/null
@@ -0,0 +1,532 @@
+/*
+ * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
+ *
+ * This code is derived from software contributed to The DragonFly Project
+ * by Matthew Dillon <dillon@dragonflybsd.org>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in
+ *    the documentation and/or other materials provided with the
+ *    distribution.
+ * 3. Neither the name of The DragonFly Project nor the names of its
+ *    contributors may be used to endorse or promote products derived
+ *    from this software without specific, prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
+ * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+/*
+ * LNK_SPAN PROTOCOL SUPPORT FUNCTIONS
+ *
+ * This code supports the LNK_SPAN protocol.  Essentially all PFS
+ * clients and services rendezvous with the userland hammer2 service and
+ * open LNK_SPAN transactions using a message header spanid of 0,
+ * registering with us any PFSs they have connectivity to.
+ *
+ * --
+ *
+ * Each registration maintains its own open LNK_SPAN message transaction.
+ * The SPANs are collected, aggregated, and retransmitted over available
+ * connections through the maintenance of additional LNK_SPAN message
+ * transactions on each link.
+ *
+ * The msgid for each active LNK_SPAN transaction we receive allows us to
+ * send a message to the target PFS (which might be one of many belonging
+ * to the same cluster), by specifying that msgid as the spanid in any
+ * message we send to the target PFS.
+ *
+ * Similarly, the msgid we allocate for any LNK_SPAN transaction we
+ * transmit can be used by the other end to route messages through us to
+ * another node, ultimately winding up at the identified hammer2 PFS.
+ * Remember that we maintain multiple open LNK_SPAN transactions on each
+ * connection representing the topology span, so every node sees every
+ * other node as a separate open transaction.
+ *
+ * We have to adjust the spanid in the message header at each hop to be
+ * representative of the outgoing LNK_SPAN the message is forwarded over.
+ *
+ * --
+ *
+ * If we were to retransmit every LNK_SPAN transaction we receive it would
+ * create a huge mess, so we have to aggregate all received LNK_SPAN
+ * transactions, sort them by the fsid (the cluster) and sub-sort them by
+ * the pfs_fsid (individual nodes in the cluster), and only retransmit
+ * (create outgoing transactions) for a subset of the nearest weighted-hops
+ * for each individual node.
+ *
+ * The higher level protocols can then issue transactions to the nodes making
+ * up a cluster to perform all actions required.
+ *
+ * --
+ *
+ * Since this is a large topology and a spanning tree protocol, links can
+ * go up and down all the time.  Any time a link goes down its transaction
+ * is closed.  The transaction has to be closed on both ends before we can
+ * delete (and potentially reuse) the related spanid.  The LNK_SPAN being
+ * closed may have been propagated out to other connections and those related
+ * LNK_SPANs are also closed.  Eventually all routes via the lost LNK_SPAN
+ * go away, the closure propagating to all sources and all targets.
+ *
+ * Any messages in-transit using a route that goes away will be thrown away.
+ * Open transactions are only tracked at the two end-points.  When a link
+ * failure propagates to an end-point the related open transactions lose
+ * their spanid and are automatically aborted.
+ *
+ * It is important to note that internal route nodes cannot just associate
+ * a lost LNK_SPAN transaction with another route to the same destination.
+ * Message transactions MUST be serialized and MUST be ordered.  All messages
+ * for a transaction must run over the same route.  So if the route used by
+ * an active transaction is lost, the related messages will be fully aborted
+ * and the higher protocol levels will retry as appropriate.
+ *
+ * It is also important to note that several paths to the same PFS can be
+ * propagated along the same link, which allows concurrency and even
+ * redundancy over several network interfaces or via different routes through
+ * the topology.  Any given transaction will use only a single route but busy
+ * servers will often have hundreds of transactions active simultaneously,
+ * so having multiple active paths through the network topology for A<->B
+ * will improve performance.
+ *
+ * --
+ *
+ * Most protocols consolidate operations rather than simply relaying them.
+ * This is particularly true of LEAF protocols (such as strict HAMMER2
+ * clients), of which there can be millions connecting into the cluster at
+ * various points.  The SPAN protocol is not used for these LEAF elements.
+ *
+ * Instead the primary service they connect to implements a proxy for the
+ * client protocols so the core topology only has to propagate a couple of
+ * LNK_SPANs and not millions.  LNK_SPANs are meant to be used only for
+ * core master nodes and satellite slaves and cache nodes.
+ */
+
+#include "hammer2.h"
+
+/*
+ * RED-BLACK TREE DEFINITIONS
+ *
+ * We need to track
+ *
+ * (1) shared fsid's (a cluster).
+ * (2) unique fsid's (a node in a cluster) <--- LNK_SPAN transactions.
+ *
+ * We need to aggregate all active LNK_SPANs and create our own
+ * outgoing LNK_SPAN transactions on each of our connections representing
+ * the aggregated state.
+ *
+ * h2span_connect      - list of iocom connections that wish to receive SPAN
+ *                       propagation from other connections.  Might contain
+ *                       a filter string.  Only iocoms with an open
+ *                       LNK_CONN transaction are applicable for SPAN
+ *                       propagation.
+ *
+ * h2span_relay                - List of links relayed (via SPAN).  Essentially
+ *                       each relay structure represents a LNK_SPAN
+ *                       transaction that we initiated, versus h2span_link
+ *                       which is a LNK_SPAN transaction that we received.
+ *
+ * --
+ *
+ * h2span_cluster      - Organizes the shared fsid's.  One structure for
+ *                       each cluster.
+ *
+ * h2span_node         - Organizes the nodes in a cluster.  One structure
+ *                       for each unique {cluster,node}, aka {fsid, pfs_fsid}.
+ *
+ * h2span_link         - Organizes all incoming and outgoing LNK_SPAN message
+ *                       transactions related to a node.
+ *
+ *                       One h2span_link structure for each incoming LNK_SPAN
+ *                       transaction.  Links selected for propagation back
+ *                       out are also where the outgoing LNK_SPAN messages
+ *                       are indexed into (so we can propagate changes).
+ *
+ *                       The h2span_link's use a red-black tree to sort the
+ *                       weighted hop metric for the incoming LNK_SPAN.  We
+ *                       then select the top N for outgoing.  When the
+ *                       topology changes the top N may also change and cause
+ *                       new outgoing LNK_SPAN transactions to be opened
+ *                       and less desirable ones to be closed, causing
+ *                       transactional aborts within the message flow in
+ *                       the process.
+ *
+ * Also note           - All outgoing LNK_SPAN message transactions are also
+ *                       entered into a red-black tree for use by the routing
+ *                       function.  This is handled by msg.c in the state
+ *                       code, not here.
+ */
+
+/*
+ * Forward declarations and container heads for the span tracking
+ * structures defined below.
+ */
+struct h2span_link;
+struct h2span_relay;
+TAILQ_HEAD(h2span_connect_queue, h2span_connect);
+TAILQ_HEAD(h2span_relay_queue, h2span_relay);
+
+RB_HEAD(h2span_cluster_tree, h2span_cluster);
+RB_HEAD(h2span_node_tree, h2span_node);
+RB_HEAD(h2span_link_tree, h2span_link);
+RB_HEAD(h2span_relay_tree, h2span_relay);
+
+/*
+ * One per received (open) LNK_CONN transaction; its presence enables SPAN
+ * propagation over that connection.  The LNK_CONN may carry a filter, but
+ * no filter is stored here yet.
+ */
+struct h2span_connect {
+       TAILQ_ENTRY(h2span_connect) entry;      /* on global connq */
+       struct h2span_relay_tree tree;  /* relays initiated over this conn */
+       hammer2_state_t *state;         /* the open LNK_CONN transaction */
+};
+
+/*
+ * All received LNK_SPANs are organized by cluster (pfs_clid),
+ * node (pfs_fsid), and link (received LNK_SPAN transaction).
+ */
+struct h2span_cluster {
+       RB_ENTRY(h2span_cluster) rbnode;        /* on global cluster_tree */
+       struct h2span_node_tree tree;   /* nodes, keyed by pfs_fsid */
+       uuid_t  pfs_clid;               /* shared fsid */
+};
+
+struct h2span_node  {
+       RB_ENTRY(h2span_node) rbnode;   /* on cls->tree */
+       struct h2span_link_tree tree;   /* links, keyed by weight */
+       struct h2span_cluster *cls;     /* owning cluster */
+       uuid_t  pfs_fsid;               /* unique fsid */
+};
+
+struct h2span_link {
+       RB_ENTRY(h2span_link) rbnode;   /* on node->tree */
+       hammer2_state_t *state;         /* state<->link */
+       struct h2span_node *node;       /* related node */
+       int32_t weight;                 /* from LNK_SPAN; primary sort key */
+       struct h2span_relay_queue relayq; /* relay out */
+};
+
+/*
+ * Any LNK_SPAN transactions we receive which are relayed out other
+ * connections utilize this structure to track the LNK_SPAN transaction
+ * we initiate on the other connections, if selected for relay.
+ *
+ * In many respects this is the core of the protocol... actually figuring
+ * out what LNK_SPANs to relay.  The spanid used for relaying is the
+ * address of the 'state' structure, which is why h2span_relay has to
+ * be entered into an RB-tree rooted at h2span_connect (so we can look
+ * up the spanid to validate it).
+ */
+struct h2span_relay {
+       RB_ENTRY(h2span_relay) rbnode;  /* from h2span_connect */
+       TAILQ_ENTRY(h2span_relay) entry; /* from link */
+       struct h2span_connect *conn;
+       hammer2_state_t *state;         /* transmitted LNK_SPAN */
+       struct h2span_link *link;       /* received LNK_SPAN */
+};
+
+
+/* Short-hand names for the span tracking structures. */
+typedef struct h2span_connect h2span_connect_t;
+typedef struct h2span_cluster h2span_cluster_t;
+typedef struct h2span_node h2span_node_t;
+typedef struct h2span_link h2span_link_t;
+typedef struct h2span_relay h2span_relay_t;
+
+/*
+ * RB-tree comparator for h2span_cluster: clusters are keyed by their
+ * shared cluster uuid (pfs_clid).
+ */
+static int
+h2span_cluster_cmp(h2span_cluster_t *cls1, h2span_cluster_t *cls2)
+{
+       uuid_t *key1 = &cls1->pfs_clid;
+       uuid_t *key2 = &cls2->pfs_clid;
+
+       return (uuid_compare(key1, key2, NULL));
+}
+
+/*
+ * RB-tree comparator for h2span_node: nodes within a cluster are keyed
+ * by their unique pfs_fsid.
+ */
+static int
+h2span_node_cmp(h2span_node_t *node1, h2span_node_t *node2)
+{
+       uuid_t *key1 = &node1->pfs_fsid;
+       uuid_t *key2 = &node2->pfs_fsid;
+
+       return (uuid_compare(key1, key2, NULL));
+}
+
+/*
+ * RB-tree comparator for h2span_link: links under a node are ordered by
+ * ascending weight.  Ties are broken by structure address so that two
+ * distinct links never compare equal and can coexist in the tree.
+ */
+static int
+h2span_link_cmp(h2span_link_t *link1, h2span_link_t *link2)
+{
+       intptr_t addr1 = (intptr_t)link1;
+       intptr_t addr2 = (intptr_t)link2;
+
+       if (link1->weight != link2->weight)
+               return ((link1->weight < link2->weight) ? -1 : 1);
+       if (addr1 != addr2)
+               return ((addr1 < addr2) ? -1 : 1);
+       return (0);
+}
+
+/*
+ * RB-tree comparator for h2span_relay: relays are keyed by the address
+ * of their transmitted LNK_SPAN state, which is also the relayed spanid.
+ */
+static int
+h2span_relay_cmp(h2span_relay_t *relay1, h2span_relay_t *relay2)
+{
+       intptr_t key1 = (intptr_t)relay1->state;
+       intptr_t key2 = (intptr_t)relay2->state;
+
+       if (key1 == key2)
+               return (0);
+       return ((key1 < key2) ? -1 : 1);
+}
+
+/*
+ * Instantiate the static red-black tree code, one tree at a time
+ * (prototypes immediately followed by the generated implementation).
+ */
+RB_PROTOTYPE_STATIC(h2span_cluster_tree, h2span_cluster,
+            rbnode, h2span_cluster_cmp);
+RB_GENERATE_STATIC(h2span_cluster_tree, h2span_cluster,
+            rbnode, h2span_cluster_cmp);
+
+RB_PROTOTYPE_STATIC(h2span_node_tree, h2span_node,
+            rbnode, h2span_node_cmp);
+RB_GENERATE_STATIC(h2span_node_tree, h2span_node,
+            rbnode, h2span_node_cmp);
+
+RB_PROTOTYPE_STATIC(h2span_link_tree, h2span_link,
+            rbnode, h2span_link_cmp);
+RB_GENERATE_STATIC(h2span_link_tree, h2span_link,
+            rbnode, h2span_link_cmp);
+
+RB_PROTOTYPE_STATIC(h2span_relay_tree, h2span_relay,
+            rbnode, h2span_relay_cmp);
+RB_GENERATE_STATIC(h2span_relay_tree, h2span_relay,
+            rbnode, h2span_relay_cmp);
+
+/*
+ * Global mutex protecting cluster_tree, connq, and every h2span_*
+ * structure reachable from them.  hammer2_lnk_conn() and
+ * hammer2_lnk_span() hold it for their entire duration.
+ *
+ * The mutex is statically initialized: POSIX does not guarantee that a
+ * zero-filled pthread_mutex_t is a valid, unlocked mutex.
+ */
+static pthread_mutex_t cluster_mtx = PTHREAD_MUTEX_INITIALIZER;
+static struct h2span_cluster_tree cluster_tree = RB_INITIALIZER(cluster_tree);
+static struct h2span_connect_queue connq = TAILQ_HEAD_INITIALIZER(connq);
+
+static void hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg);
+static void hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg);
+static void hammer2_lnk_conn_update(h2span_connect_t *conn);
+
+/*
+ * Dispatch an incoming HAMMER2_MSG_PROTO_LNK message.
+ *
+ * This is only called for one-way messages and for messages opening a
+ * transaction; once a transaction is open, state->func is assigned and
+ * receives all further messages directly.  Unrecognized commands are
+ * answered with HAMMER2_MSG_ERR_UNKNOWN.
+ */
+void
+hammer2_msg_lnk(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
+{
+       uint32_t basecmd;
+
+       basecmd = msg->any.head.cmd & HAMMER2_MSGF_BASECMDMASK;
+
+       if (basecmd == HAMMER2_LNK_CONN) {
+               hammer2_lnk_conn(msg->state, msg);
+       } else if (basecmd == HAMMER2_LNK_SPAN) {
+               hammer2_lnk_span(msg->state, msg);
+       } else {
+               fprintf(stderr,
+                       "MSG_PROTO_LNK: Unknown msg %08x\n", msg->any.head.cmd);
+               hammer2_msg_reply(iocom, msg, HAMMER2_MSG_ERR_UNKNOWN);
+               /* msg->state must not be used after the reply */
+       }
+}
+
+/*
+ * Handle a LNK_CONN transaction, which enables SPAN propagation over the
+ * connection it arrives on.
+ *
+ * CREATE:  allocate an h2span_connect, queue it on connq, and acknowledge
+ *          the request while leaving the transaction open.
+ * DELETE:  tear down every relay initiated over this connection, free the
+ *          h2span_connect, and reply to close the transaction.
+ *
+ * The whole operation runs with cluster_mtx held.
+ */
+static void
+hammer2_lnk_conn(hammer2_state_t *state, hammer2_msg_t *msg)
+{
+       h2span_connect_t *conn;
+       h2span_relay_t *relay;
+       char *alloc = NULL;
+
+       pthread_mutex_lock(&cluster_mtx);
+
+       /*
+        * On transaction start we allocate a new h2span_connect and
+        * acknowledge the request, leaving the transaction open.
+        */
+       if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+               /* further messages on this transaction are routed here */
+               state->func = hammer2_lnk_conn;
+
+               /*
+                * msgid is unsigned; %jx requires a uintmax_t argument.
+                * NOTE(review): label comes off the wire; confirm it is
+                * guaranteed NUL-terminated before printing with %s.
+                */
+               fprintf(stderr, "LNK_CONN(%016jx): %s/%s\n",
+                       (uintmax_t)msg->any.head.msgid,
+                       hammer2_uuid_to_str(&msg->any.lnk_conn.pfs_clid,
+                                           &alloc),
+                       msg->any.lnk_conn.label);
+               free(alloc);
+
+               conn = hammer2_alloc(sizeof(*conn));
+
+               RB_INIT(&conn->tree);
+               conn->state = state;
+               state->any.conn = conn;
+               TAILQ_INSERT_TAIL(&connq, conn, entry);
+
+               hammer2_lnk_conn_update(conn);
+               hammer2_msg_result(state->iocom, msg, 0);
+       }
+
+       /*
+        * On transaction terminate we clean out our h2span_connect
+        * and acknowledge the request, closing the transaction.
+        */
+       if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+               fprintf(stderr, "LNK_CONN: Terminated\n");
+               conn = state->any.conn;
+               assert(conn);
+
+               /*
+                * Dismantle every relay we initiated over this connection,
+                * replying on each relayed LNK_SPAN state.
+                */
+               while ((relay = RB_ROOT(&conn->tree)) != NULL) {
+                       RB_REMOVE(h2span_relay_tree, &conn->tree, relay);
+                       TAILQ_REMOVE(&relay->link->relayq, relay, entry);
+
+                       if (relay->state) {
+                               relay->state->any.relay = NULL;
+                               hammer2_state_reply(relay->state, 0);
+                               /* state invalid after reply */
+                               relay->state = NULL;
+                       }
+                       relay->conn = NULL;
+                       relay->link = NULL;
+                       hammer2_free(relay);
+               }
+
+               /*
+                * Clean out conn.  Clear the same state field we read conn
+                * from above.
+                */
+               conn->state = NULL;
+               state->any.conn = NULL;
+               TAILQ_REMOVE(&connq, conn, entry);
+               hammer2_free(conn);
+
+               hammer2_msg_reply(state->iocom, msg, 0);
+               /* state invalid after reply */
+       }
+       pthread_mutex_unlock(&cluster_mtx);
+}
+
+/*
+ * Handle a LNK_SPAN transaction received from a peer.
+ *
+ * CREATE:  register this handler for the transaction and enter a new
+ *          h2span_link into the {cluster, node} topology, creating the
+ *          cluster and node records on first reference.
+ * DELETE:  tear down every relay hanging off the link, unlink and free
+ *          the link (and the node/cluster once they become empty), then
+ *          reply to close our side of the transaction.
+ *
+ * The whole operation runs with cluster_mtx held.
+ */
+static void
+hammer2_lnk_span(hammer2_state_t *state, hammer2_msg_t *msg)
+{
+       h2span_cluster_t dummy_cls;
+       h2span_node_t dummy_node;
+       h2span_cluster_t *cls;
+       h2span_node_t *node;
+       h2span_link_t *slink;
+       h2span_relay_t *relay;
+       char *alloc = NULL;
+
+       pthread_mutex_lock(&cluster_mtx);
+
+       /*
+        * On transaction start we initialize the tracking infrastructure
+        */
+       if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+               /* further messages on this transaction are routed here */
+               state->func = hammer2_lnk_span;
+
+               /*
+                * NOTE(review): label comes off the wire; confirm it is
+                * guaranteed NUL-terminated before printing with %s.
+                */
+               fprintf(stderr, "LNK_SPAN: %s/%s\n",
+                       hammer2_uuid_to_str(&msg->any.lnk_span.pfs_clid,
+                                           &alloc),
+                       msg->any.lnk_span.label);
+               free(alloc);
+
+               /*
+                * Find the cluster, creating it on first reference.
+                * Only pfs_clid is consulted by the comparator.
+                */
+               dummy_cls.pfs_clid = msg->any.lnk_span.pfs_clid;
+               cls = RB_FIND(h2span_cluster_tree, &cluster_tree, &dummy_cls);
+               if (cls == NULL) {
+                       cls = hammer2_alloc(sizeof(*cls));
+                       cls->pfs_clid = msg->any.lnk_span.pfs_clid;
+                       RB_INIT(&cls->tree);
+                       RB_INSERT(h2span_cluster_tree, &cluster_tree, cls);
+               }
+
+               /*
+                * Find the node, creating it on first reference.
+                * Only pfs_fsid is consulted by the comparator.
+                */
+               dummy_node.pfs_fsid = msg->any.lnk_span.pfs_fsid;
+               node = RB_FIND(h2span_node_tree, &cls->tree, &dummy_node);
+               if (node == NULL) {
+                       node = hammer2_alloc(sizeof(*node));
+                       node->pfs_fsid = msg->any.lnk_span.pfs_fsid;
+                       node->cls = cls;
+                       RB_INIT(&node->tree);
+                       RB_INSERT(h2span_node_tree, &cls->tree, node);
+               }
+
+               /*
+                * Create the link.  relayq must be explicitly initialized:
+                * a zero-filled TAILQ_HEAD has a NULL tqh_last, which
+                * TAILQ_INSERT_TAIL() would dereference.
+                */
+               assert(state->any.link == NULL);
+               slink = hammer2_alloc(sizeof(*slink));
+               TAILQ_INIT(&slink->relayq);
+               slink->node = node;
+               slink->weight = msg->any.lnk_span.weight;
+               slink->state = state;
+               state->any.link = slink;
+               RB_INSERT(h2span_link_tree, &node->tree, slink);
+
+               /*
+                * Now filter and relay the span to all other iocoms. XXX
+                */
+       }
+
+       /*
+        * On transaction terminate we remove the tracking infrastructure.
+        */
+       if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+               slink = state->any.link;
+               assert(slink != NULL);
+               node = slink->node;
+               cls = node->cls;
+
+               /*
+                * Clean out all relays
+                */
+               while ((relay = TAILQ_FIRST(&slink->relayq)) != NULL) {
+                       RB_REMOVE(h2span_relay_tree, &relay->conn->tree, relay);
+                       TAILQ_REMOVE(&slink->relayq, relay, entry);
+
+                       if (relay->state) {
+                               relay->state->any.relay = NULL;
+                               hammer2_state_reply(relay->state, 0);
+                               /* state invalid after reply */
+                               relay->state = NULL;
+                       }
+                       relay->conn = NULL;
+                       relay->link = NULL;
+                       hammer2_free(relay);
+               }
+
+               /*
+                * Clean out the topology.  A cluster can only become empty
+                * when one of its nodes is removed, hence the nesting.
+                */
+               RB_REMOVE(h2span_link_tree, &node->tree, slink);
+               if (RB_EMPTY(&node->tree)) {
+                       RB_REMOVE(h2span_node_tree, &cls->tree, node);
+                       if (RB_EMPTY(&cls->tree)) {
+                               RB_REMOVE(h2span_cluster_tree,
+                                         &cluster_tree, cls);
+                               hammer2_free(cls);
+                       }
+                       node->cls = NULL;
+                       hammer2_free(node);
+               }
+               state->any.link = NULL;
+               slink->state = NULL;
+               slink->node = NULL;
+               hammer2_free(slink);
+
+               /*
+                * Acknowledge the termination so our side of the
+                * transaction is closed as well, as hammer2_lnk_conn()
+                * does for LNK_CONN.
+                */
+               hammer2_msg_reply(state->iocom, msg, 0);
+               /* state invalid after reply */
+       }
+
+       pthread_mutex_unlock(&cluster_mtx);
+}
+
+/*
+ * Initiate/Update the relayed spans associated with a connection.
+ *
+ * Currently an empty stub.  hammer2_lnk_conn() calls it, with cluster_mtx
+ * held, right after a newly created h2span_connect is queued on connq.
+ */
+static void
+hammer2_lnk_conn_update(h2span_connect_t *conn __unused)
+{
+       /* XXX relay selection/propagation is not implemented yet */
+}
index adb2939..a51ff86 100644 (file)
@@ -108,28 +108,52 @@ typedef struct hammer2_handshake hammer2_handshake_t;
  */
 struct hammer2_iocom;
 struct hammer2_persist;
-struct hammre2_state;
-struct hammre2_msg;
+struct hammer2_state;
+struct hammer2_address;
+struct hammer2_msg;
 
+TAILQ_HEAD(hammer2_state_queue, hammer2_state);
 TAILQ_HEAD(hammer2_msg_queue, hammer2_msg);
+TAILQ_HEAD(hammer2_address_queue, hammer2_address);
 RB_HEAD(hammer2_state_tree, hammer2_state);
 
+struct h2span_link;
+struct h2span_connect;
+struct h2span_relay;
+
+/*
+ * Message transaction state, indexed by {spanid, msgid}.
+ */
 struct hammer2_state {
        RB_ENTRY(hammer2_state) rbnode;         /* indexed by msgid */
+       TAILQ_ENTRY(hammer2_state) source_entry;/* if routed */
+       TAILQ_ENTRY(hammer2_state) target_entry;/* if routed */
        struct hammer2_iocom *iocom;
+       struct hammer2_address *source_addr;    /* if routed */
+       struct hammer2_address *target_addr;    /* if routed */
        uint32_t        txcmd;                  /* mostly for CMDF flags */
        uint32_t        rxcmd;                  /* mostly for CMDF flags */
-       uint16_t        source;                 /* command originator */
-       uint16_t        target;                 /* reply originator */
-       uint32_t        msgid;                  /* {source,target,msgid} uniq */
+       uint64_t        spanid;                 /* routing id */
+       uint64_t        msgid;                  /* {spanid,msgid} uniq */
        int             flags;
        int             error;
        struct hammer2_msg *msg;
-       int (*func)(struct hammer2_iocom *, struct hammer2_msg *);
+       void (*func)(struct hammer2_state *, struct hammer2_msg *);
+       union {                                 /* per-protocol private data */
+               void *any;
+               struct h2span_link *link;       /* received LNK_SPAN */
+               struct h2span_connect *conn;    /* received LNK_CONN */
+               struct h2span_relay *relay;     /* transmitted LNK_SPAN */
+       } any;
+};
+
+/*
+ * A source/target routing address belonging to an iocom (linked on
+ * iocom->addrq).  Routed states are queued on sourceq/targetq,
+ * presumably via hammer2_state's source_entry/target_entry -- confirm
+ * against msg.c.
+ */
+struct hammer2_address {
+       TAILQ_ENTRY(hammer2_address) entry;     /* on-iocom */
+       struct hammer2_iocom *iocom;            /* related iocom */
+       struct hammer2_state_queue sourceq;     /* states on source queue */
+       struct hammer2_state_queue targetq;     /* states on target queue */
+       uint16_t        id;
 };
 
 #define HAMMER2_STATE_INSERTED 0x0001
 #define HAMMER2_STATE_DYNAMIC  0x0002
+#define HAMMER2_STATE_NODEID   0x0004          /* manages a node id */
 
 struct hammer2_msg {
        TAILQ_ENTRY(hammer2_msg) qentry;
@@ -141,6 +165,7 @@ struct hammer2_msg {
 };
 
 typedef struct hammer2_state hammer2_state_t;
+typedef struct hammer2_address hammer2_address_t;
 typedef struct hammer2_msg hammer2_msg_t;
 typedef struct hammer2_msg_queue hammer2_msg_queue_t;
 
@@ -203,6 +228,7 @@ struct hammer2_iocom {
        hammer2_ioq_t   ioq_tx;
        hammer2_msg_queue_t freeq;              /* free msgs hdr only */
        hammer2_msg_queue_t freeq_aux;          /* free msgs w/aux_data */
+       struct hammer2_address_queue  addrq;    /* source/target addrs */
        void    (*recvmsg_callback)(struct hammer2_iocom *);
        void    (*sendmsg_callback)(struct hammer2_iocom *);
        void    (*altmsg_callback)(struct hammer2_iocom *);
index 30deeda..5ead4f8 100644 (file)
@@ -147,17 +147,20 @@ void
 hammer2_bswap_head(hammer2_msg_hdr_t *head)
 {
        head->magic     = bswap16(head->magic);
-       head->icrc1     = bswap16(head->icrc1);
+       head->reserved02 = bswap16(head->reserved02);
        head->salt      = bswap32(head->salt);
-       head->source    = bswap16(head->source);
-       head->target    = bswap16(head->target);
-       head->msgid     = bswap32(head->msgid);
+
+       head->msgid     = bswap64(head->msgid);
+       head->spanid    = bswap64(head->spanid);
+
        head->cmd       = bswap32(head->cmd);
-       head->error     = bswap16(head->error);
-       head->resv05    = bswap16(head->resv05);
-       head->icrc2     = bswap16(head->icrc2);
-       head->aux_bytes = bswap16(head->aux_bytes);
-       head->aux_icrc  = bswap32(head->aux_icrc);
+       head->aux_crc   = bswap32(head->aux_crc);
+       head->aux_bytes = bswap32(head->aux_bytes);
+       head->error     = bswap32(head->error);
+       head->aux_descr = bswap64(head->aux_descr);
+       head->reserved30= bswap64(head->reserved30);
+       head->reserved38= bswap32(head->reserved38);
+       head->hdr_crc   = bswap32(head->hdr_crc);
 }
 
 const char *
@@ -265,3 +268,23 @@ sizetostr(hammer2_off_t size)
        }
        return(buf);
 }
+
+/*
+ * Allocation wrappers give us shims for possible future use.
+ *
+ * hammer2_alloc() returns zero-filled memory and never returns NULL;
+ * allocation failure aborts the program.  The failure check is explicit
+ * rather than an assert() so it is not compiled out under NDEBUG, which
+ * previously left a NULL pointer to be zeroed.
+ */
+void *
+hammer2_alloc(size_t bytes)
+{
+       void *ptr;
+
+       ptr = calloc(1, bytes);
+       if (ptr == NULL) {
+               fprintf(stderr, "hammer2_alloc: out of memory (%zu bytes)\n",
+                       bytes);
+               abort();
+       }
+       return (ptr);
+}
+
+/*
+ * Release memory obtained from hammer2_alloc().  NULL is accepted.
+ */
+void
+hammer2_free(void *ptr)
+{
+       free(ptr);
+}
index 629dff2..66c757f 100644 (file)
@@ -516,7 +516,7 @@ format_hammer2(int fd, hammer2_off_t total_space, hammer2_off_t free_space)
         */
        rawip->comp_algo = HAMMER2_COMP_AUTOZERO;
 
-       rawip->pfs_id = Hammer2_RPFSId;
+       rawip->pfs_clid = Hammer2_RPFSId;
        rawip->pfs_type = HAMMER2_PFSTYPE_MASTER;
        rawip->op_flags |= HAMMER2_OPFLAG_PFSROOT;
 
@@ -570,7 +570,7 @@ format_hammer2(int fd, hammer2_off_t total_space, hammer2_off_t free_space)
         * snapshots and all if desired.  PFS ids are used to match up
         * mirror sources and targets and cluster copy sources and targets.
         */
-       rawip->pfs_id = Hammer2_SPFSId;
+       rawip->pfs_clid = Hammer2_SPFSId;
        rawip->pfs_type = HAMMER2_PFSTYPE_MASTER;
        rawip->op_flags |= HAMMER2_OPFLAG_PFSROOT;
 
index df6ae9d..7d90531 100644 (file)
@@ -1,3 +1,9 @@
+
+* Kernel-side needs to clean up transaction queues and make appropriate
+  callbacks.
+
+* Userland side needs to do the same for any initiated transactions.
+
 * Nesting problems in the flusher.
 
 * Inefficient vfsync due to thousands of file buffers, one per-vnode.
index 330558e..d3e6a3a 100644 (file)
@@ -320,6 +320,7 @@ struct hammer2_pfsmount {
        thread_t                msgrd_td;       /* cluster thread */
        thread_t                msgwr_td;       /* cluster thread */
        int                     msg_ctl;        /* wakeup flags */
+       int                     msg_seq;        /* cluster msg sequence id */
        uint32_t                msgid_iterator;
        struct lock             msglk;          /* lockmgr lock */
        TAILQ_HEAD(, hammer2_msg) msgq;         /* transmit queue */
@@ -349,14 +350,17 @@ struct hammer2_state {
        struct hammer2_pfsmount *pmp;
        uint32_t        txcmd;                  /* mostly for CMDF flags */
        uint32_t        rxcmd;                  /* mostly for CMDF flags */
-       uint16_t        source;                 /* command originator */
-       uint16_t        target;                 /* reply originator */
-       uint32_t        msgid;                  /* {source,target,msgid} uniq */
+       uint64_t        spanid;                 /* spanning tree routing */
+       uint64_t        msgid;                  /* {spanid,msgid} uniq */
        int             flags;
        int             error;
        struct hammer2_chain *chain;            /* msg associated w/chain */
        struct hammer2_msg *msg;
-       int (*func)(struct hammer2_pfsmount *, struct hammer2_msg *);
+       int (*func)(struct hammer2_state *, struct hammer2_msg *);
+       union {
+               void *any;
+               hammer2_pfsmount_t *pmp;
+       } any;
 };
 
 #define HAMMER2_STATE_INSERTED 0x0001
@@ -542,17 +546,21 @@ void hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
 int hammer2_msg_execute(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
 void hammer2_state_free(hammer2_state_t *state);
 void hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
-hammer2_msg_t *hammer2_msg_alloc(hammer2_pfsmount_t *pmp,
-                               uint16_t source, uint16_t target,
+hammer2_msg_t *hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid,
                                uint32_t cmd);
-hammer2_state_t *hammer2_msg_write(hammer2_pfsmount_t *pmp,
+void hammer2_msg_write(hammer2_pfsmount_t *pmp,
                                hammer2_msg_t *msg,
-                               int (*func)(hammer2_pfsmount_t *,
-                                           hammer2_msg_t *));
+                               int (*func)(hammer2_state_t *, hammer2_msg_t *),
+                               void *data);
+void hammer2_msg_reply(hammer2_pfsmount_t *pmp,
+                               hammer2_msg_t *msg, uint32_t error);
+void hammer2_msg_result(hammer2_pfsmount_t *pmp,
+                               hammer2_msg_t *msg, uint32_t error);
 
 /*
  * hammer2_msgops.c
  */
+int hammer2_msg_dbg_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
 int hammer2_msg_adhoc_input(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
 
 /*
index 3309799..37f7c80 100644 (file)
@@ -445,10 +445,10 @@ struct hammer2_inode_data {
        /*
         * These fields are currently only applicable to PFSROOTs.
         *
-        * NOTE: We can't use {volume_data->fsid, pfs_id} to uniquely
+        * NOTE: We can't use {volume_data->fsid, pfs_clid} to uniquely
         *       identify an instance of a PFS in the cluster because
         *       a mount may contain more than one copy of the PFS as
-        *       a separate node.  {pfs_fsid, pfs_id} must be used for
+        *       a separate node.  {pfs_clid, pfs_fsid} must be used for
         *       registration in the cluster.
         */
        uint8_t         target_type;    /* 0084 hardlink target type */
@@ -456,8 +456,8 @@ struct hammer2_inode_data {
        uint8_t         reserved86;     /* 0086 */
        uint8_t         pfs_type;       /* 0087 (if PFSROOT) node type */
        uint64_t        pfs_inum;       /* 0088 (if PFSROOT) inum allocator */
-       uuid_t          pfs_id;         /* 0090 (if PFSROOT) pfs uuid */
-       uuid_t          pfs_fsid;       /* 00A0 (if PFSROOT) unique pfs uuid */
+       uuid_t          pfs_clid;       /* 0090 (if PFSROOT) cluster uuid */
+       uuid_t          pfs_fsid;       /* 00A0 (if PFSROOT) unique uuid */
 
        /*
         * Quotas and cumulative sub-tree counters.
@@ -516,6 +516,7 @@ typedef struct hammer2_inode_data hammer2_inode_data_t;
 #define HAMMER2_PFSTYPE_SOFT_SLAVE     6
 #define HAMMER2_PFSTYPE_SOFT_MASTER    7
 #define HAMMER2_PFSTYPE_MASTER         8
+#define HAMMER2_PFSTYPE_MAX            9       /* 0-8 */
 
 /*
  * The allocref structure represents the allocation table.  One 64K block
@@ -623,7 +624,7 @@ struct hammer2_copy_data {
        uint8_t priority;       /* 07    priority and round-robin flag */
        uint8_t remote_pfs_type;/* 08    probed direct remote PFS type */
        uint8_t reserved08[23]; /* 09-1F */
-       uuid_t  pfs_id;         /* 20-2F copy target must match this uuid */
+       uuid_t  pfs_clid;       /* 20-2F copy target must match this uuid */
        uint8_t label[16];      /* 30-3F import/export label */
        uint8_t path[64];       /* 40-7F target specification string or key */
 };
index 1b28298..e02d432 100644 (file)
@@ -898,8 +898,8 @@ hammer2_hardlink_consolidate(hammer2_inode_t **ipp, hammer2_inode_t *tdip)
                        oip->ip_data.iparent = 0;       /* XXX */
                        oip->ip_data.pfs_type = 0;
                        oip->ip_data.pfs_inum = 0;
-                       bzero(&oip->ip_data.pfs_id,
-                             sizeof(oip->ip_data.pfs_id));
+                       bzero(&oip->ip_data.pfs_clid,
+                             sizeof(oip->ip_data.pfs_clid));
                        bzero(&oip->ip_data.pfs_fsid,
                              sizeof(oip->ip_data.pfs_fsid));
                        oip->ip_data.data_quota = 0;
index ffe1b0a..6ed6ca5 100644 (file)
@@ -323,7 +323,7 @@ hammer2_ioctl_pfs_get(hammer2_inode_t *ip, void *data)
                xip = chain->u.ip;
                pfs->name_key = xip->ip_data.name_key;
                pfs->pfs_type = xip->ip_data.pfs_type;
-               pfs->pfs_id = xip->ip_data.pfs_id;
+               pfs->pfs_clid = xip->ip_data.pfs_clid;
                pfs->pfs_fsid = xip->ip_data.pfs_fsid;
                KKASSERT(xip->ip_data.name_len < sizeof(pfs->name));
                bcopy(xip->ip_data.filename, pfs->name,
@@ -370,7 +370,7 @@ hammer2_ioctl_pfs_create(hammer2_inode_t *ip, void *data)
        if (error == 0) {
                hammer2_chain_modify(hmp, &nip->chain, 0);
                nip->ip_data.pfs_type = pfs->pfs_type;
-               nip->ip_data.pfs_id = pfs->pfs_id;
+               nip->ip_data.pfs_clid = pfs->pfs_clid;
                nip->ip_data.pfs_fsid = pfs->pfs_fsid;
                hammer2_chain_unlock(hmp, &nip->chain);
        }
index 8c0f3e6..0dc8f2b 100644 (file)
@@ -74,9 +74,9 @@ typedef struct hammer2_ioc_remote hammer2_ioc_remote_t;
 /*
  * Ioctls to manage PFSs
  *
- * PFSs can be clustered by matching their pfs_id, and the PFSs making up
+ * PFSs can be clustered by matching their pfs_clid, and the PFSs making up
- * a cluster can be uniquely identified by combining the vol_id with
- * the pfs_id.
+ * a cluster can be uniquely identified by combining the pfs_clid with
+ * the pfs_fsid.
  */
 struct hammer2_ioc_pfs {
        hammer2_key_t           name_key;       /* super-root directory scan */
@@ -88,7 +88,7 @@ struct hammer2_ioc_pfs {
        uint32_t                reserved0014;
        uint64_t                reserved0018;
        uuid_t                  pfs_fsid;       /* identifies PFS instance */
-       uuid_t                  pfs_id;         /* identifies PFS cluster */
+       uuid_t                  pfs_clid;       /* identifies PFS cluster */
        char                    name[NAME_MAX+1]; /* device@name mtpt */
 };
 
index 75750ce..60373fa 100644 (file)
@@ -130,11 +130,10 @@ hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
        lockmgr(&pmp->msglk, LK_EXCLUSIVE);
 
        state->msgid = msg->any.head.msgid;
-       state->source = msg->any.head.source;
-       state->target = msg->any.head.target;
-       kprintf("received msg %08x msgid %u source=%u target=%u\n",
-               msg->any.head.cmd, msg->any.head.msgid, msg->any.head.source,
-               msg->any.head.target);
+       state->spanid = msg->any.head.spanid;
+       kprintf("received msg %08x msgid %jx spanid=%jx\n",
+               msg->any.head.cmd,
+               (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.spanid);
        if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
                state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
        else
@@ -394,8 +393,7 @@ hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
                        msg->state = state;
                        state->msg = msg;
                        state->msgid = msg->any.head.msgid;
-                       state->source = msg->any.head.source;
-                       state->target = msg->any.head.target;
+                       state->spanid = msg->any.head.spanid;
                }
                KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
                if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
@@ -574,8 +572,7 @@ hammer2_state_free(hammer2_state_t *state)
 }
 
 hammer2_msg_t *
-hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint16_t source, uint16_t target,
-                 uint32_t cmd)
+hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid, uint32_t cmd)
 {
        hammer2_msg_t *msg;
        size_t hbytes;
@@ -585,8 +582,7 @@ hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint16_t source, uint16_t target,
                      pmp->mmsg, M_WAITOK | M_ZERO);
        msg->hdr_size = hbytes;
        msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
-       msg->any.head.source = source;
-       msg->any.head.target = target;
+       msg->any.head.spanid = spanid;
        msg->any.head.cmd = cmd;
 
        return (msg);
@@ -610,13 +606,9 @@ hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
 int
 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
 {
-       if (state1->source < state2->source)
+       if (state1->spanid < state2->spanid)
                return(-1);
-       if (state1->source > state2->source)
-               return(1);
-       if (state1->target < state2->target)
-               return(-1);
-       if (state1->target > state2->target)
+       if (state1->spanid > state2->spanid)
                return(1);
        if (state1->msgid < state2->msgid)
                return(-1);
@@ -626,78 +618,168 @@ hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
 }
 
 /*
- * Write a message.  {source, target, cmd} have been set.  This function
- * merely queues the message to the management thread, it does not write
- * to the message socket/pipe.
+ * Write a message.  All requisite command flags have been set.
+ *
+ * If msg->state is non-NULL the message is written to the existing
+ * transaction.  Both msgid and spanid will be set accordingly.  If func
+ * is non-NULL it (and data) replace the state's receive callback.
+ *
+ * If msg->state is NULL and CREATE is set new state is allocated and
+ * (func, data) is installed.
  *
- * If CREATE is set we allocate the state and msgid and do the insertion.
- * If CREATE is not set the state and msgid must already be assigned.
+ * If msg->state is NULL and CREATE is not set the message is assumed
+ * to be a one-way message.  The spanid must be set by the caller; the
+ * msgid is always forced to 0 for this case.
+ *
+ * This function merely queues the message (on pmp->msgq, under
+ * pmp->msglk) to the management thread, it does not write to the
+ * message socket/pipe.
  */
-hammer2_state_t *
+void
 hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
-                 int (*func)(hammer2_pfsmount_t *, hammer2_msg_t *))
+                 int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
 {
        hammer2_state_t *state;
-       uint16_t xcrc16;
-       uint32_t xcrc32;
 
-       /*
-        * Setup transaction (if applicable).  One-off messages always
-        * use a msgid of 0.
-        */
-       if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+       if (msg->state) {
+               /*
+                * Continuance or termination of existing transaction.
+                * The transaction could have been initiated by either end.
+                *
+                * (Function callback and aux data for the receive side can
+                * be replaced or left alone).
+                */
+               msg->any.head.msgid = msg->state->msgid;
+               msg->any.head.spanid = msg->state->spanid;
+               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               if (func) {
+                       msg->state->func = func;
+                       msg->state->any.any = data;
+               }
+       } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
                /*
                 * New transaction, requires tracking state and a unique
-                * msgid.
+                * msgid to be allocated.
                 */
                KKASSERT(msg->state == NULL);
                state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
                state->pmp = pmp;
                state->flags = HAMMER2_STATE_DYNAMIC;
                state->func = func;
+               state->any.any = data;
                state->msg = msg;
-               state->source = msg->any.head.source;
-               state->target = msg->any.head.target;
+               /* the state's own address serves as its msgid */
+               state->msgid = (uint64_t)(uintptr_t)state;
+               state->spanid = msg->any.head.spanid;
                msg->state = state;
 
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
-               if ((state->msgid = pmp->msgid_iterator++) == 0)
-                       state->msgid = pmp->msgid_iterator++;
-               while (RB_INSERT(hammer2_state_tree,
-                                &pmp->statewr_tree, state)) {
-                       if ((state->msgid = pmp->msgid_iterator++) == 0)
-                               state->msgid = pmp->msgid_iterator++;
-               }
+               if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
+                       panic("duplicate msgid allocated");
                msg->any.head.msgid = state->msgid;
-       } else if (msg->state) {
-               /*
-                * Continuance or termination
-                */
-               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
        } else {
                /*
-                * One-off message (always uses msgid 0)
+                * One-off message (always uses msgid 0 to distinguish
+                * between a possibly lost in-transaction message due to
+                * competing aborts and a real one-off message?)
                 */
                msg->any.head.msgid = 0;
                lockmgr(&pmp->msglk, LK_EXCLUSIVE);
        }
 
        /*
-        * Set icrc2 and icrc1
+        * Finish up the msg fields
         */
-       if (msg->hdr_size > sizeof(msg->any.head)) {
-               xcrc32 = hammer2_icrc32(&msg->any.head + 1,
-                                       msg->hdr_size - sizeof(msg->any.head));
-               xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
-               msg->any.head.icrc2 = xcrc16;
-       }
-       xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
-                               HAMMER2_MSGHDR_CRCBYTES);
-       xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
-       msg->any.head.icrc1 = xcrc16;
+       msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
+       ++pmp->msg_seq;
+
+       /* hdr_crc covers the whole header with the hdr_crc field zeroed */
+       msg->any.head.hdr_crc = 0;
+       msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
 
        TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
        lockmgr(&pmp->msglk, LK_RELEASE);
+}
+
+/*
+ * Reply to a message and terminate our side of the transaction.
+ *
+ * If msg->state is NULL we are replying to a one-way message.
+ *
+ * The reply is a new LNK_ERROR message carrying only 'error'.  It is
+ * attached to the original message's transaction state (if any) so that
+ * hammer2_msg_write() continues that transaction, reusing its msgid and
+ * spanid, instead of opening a brand new transaction when CREATE is set.
+ * A reply to a one-way message is routed back over the same spanid.
+ */
+void
+hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
+{
+       hammer2_state_t *state = msg->state;
+       hammer2_msg_t *nmsg;
+       uint32_t cmd;
+
+       /*
+        * Reply with a simple error code and terminate the transaction.
+        */
+       cmd = HAMMER2_LNK_ERROR;
+
+       /*
+        * Check if our direction has even been initiated yet, set CREATE.
+        *
+        * Check what direction this is (command or reply direction).  Note
+        * that txcmd might not have been initiated yet.
+        *
+        * If our direction has already been closed we just return without
+        * doing anything.
+        */
+       if (state) {
+               if (state->txcmd & HAMMER2_MSGF_DELETE)
+                       return;
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+                       cmd |= HAMMER2_MSGF_CREATE;
+               if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+                       cmd |= HAMMER2_MSGF_REPLY;
+               cmd |= HAMMER2_MSGF_DELETE;
+       } else {
+               if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
+                       cmd |= HAMMER2_MSGF_REPLY;
+       }
+
+       /*
+        * Build the reply as a separate message ('msg' is not modified).
+        * Without nmsg->state, a CREATE reply would make hammer2_msg_write()
+        * allocate fresh state and a fresh msgid the other end cannot match.
+        */
+       nmsg = hammer2_msg_alloc(pmp, msg->any.head.spanid, cmd);
+       nmsg->state = state;
+       nmsg->any.head.error = error;
+       hammer2_msg_write(pmp, nmsg, NULL, NULL);
+}
+
+/*
+ * Reply to a message and continue our side of the transaction.
+ *
+ * If msg->state is NULL we are replying to a one-way message and this
+ * function degenerates into the same as hammer2_msg_reply().
+ *
+ * The result is a new LNK_ERROR message carrying only 'error'.  It is
+ * attached to the original message's transaction state (if any) so that
+ * hammer2_msg_write() continues that transaction, reusing its msgid and
+ * spanid, instead of opening a brand new transaction when CREATE is set.
+ * A result for a one-way message is routed back over the same spanid.
+ */
+void
+hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
+{
+       hammer2_state_t *state = msg->state;
+       hammer2_msg_t *nmsg;
+       uint32_t cmd;
+
+       /*
+        * Return a simple result code, do NOT terminate the transaction.
+        */
+       cmd = HAMMER2_LNK_ERROR;
+
+       /*
+        * Check if our direction has even been initiated yet, set CREATE.
+        *
+        * Check what direction this is (command or reply direction).  Note
+        * that txcmd might not have been initiated yet.
+        *
+        * If our direction has already been closed we just return without
+        * doing anything.
+        */
+       if (state) {
+               if (state->txcmd & HAMMER2_MSGF_DELETE)
+                       return;
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
+                       cmd |= HAMMER2_MSGF_CREATE;
+               if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
+                       cmd |= HAMMER2_MSGF_REPLY;
+               /* continuing transaction, do not set MSGF_DELETE */
+       } else {
+               if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
+                       cmd |= HAMMER2_MSGF_REPLY;
+       }
 
-       return (msg->state);
+       /*
+        * Build the result as a separate message ('msg' is not modified).
+        * Without nmsg->state, a CREATE result would make hammer2_msg_write()
+        * allocate fresh state and a fresh msgid the other end cannot match.
+        */
+       nmsg = hammer2_msg_alloc(pmp, msg->any.head.spanid, cmd);
+       nmsg->state = state;
+       nmsg->any.head.error = error;
+       hammer2_msg_write(pmp, nmsg, NULL, NULL);
 }
index 89f8b12..d3b8421 100644 (file)
@@ -52,3 +52,26 @@ hammer2_msg_adhoc_input(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
        kprintf("ADHOC INPUT MSG %08x\n", msg->any.head.cmd);
        return(0);
 }
+
+/*
+ * Receive handler for the DBG protocol set.
+ *
+ * DBG_SHELL commands are not supported by the kernel and are answered
+ * with HAMMER2_MSG_ERR_UNKNOWN.  DBG_SHELL replies carry debug output in
+ * their auxiliary data, which is NUL-terminated in place and printed.
+ * Any other DBG message is also answered with HAMMER2_MSG_ERR_UNKNOWN.
+ *
+ * Always returns 0.
+ */
+int
+hammer2_msg_dbg_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+{
+       switch(msg->any.head.cmd & HAMMER2_MSGF_CMDSWMASK) {
+       case HAMMER2_DBG_SHELL:
+               /*
+                * Execute shell command (not supported atm)
+                */
+               hammer2_msg_reply(pmp, msg, HAMMER2_MSG_ERR_UNKNOWN);
+               break;
+       case HAMMER2_DBG_SHELL | HAMMER2_MSGF_REPLY:
+               /*
+                * Debug output from the other end.  Force NUL termination by
+                * overwriting the last aux byte.  An empty payload has no
+                * last byte (aux_size - 1 would index outside the buffer),
+                * so it is skipped.
+                */
+               if (msg->aux_data && msg->aux_size) {
+                       msg->aux_data[msg->aux_size - 1] = 0;
+                       kprintf("DEBUGMSG: %s\n", msg->aux_data);
+               }
+               break;
+       default:
+               hammer2_msg_reply(pmp, msg, HAMMER2_MSG_ERR_UNKNOWN);
+               break;
+       }
+       return(0);
+}
index 8f1b254..95305b8 100644 (file)
  * Mesh network protocol structures.
  *
  * The mesh is constructed from point-to-point streaming links with varying
- * levels of interconnectedness, forming a graph.  When a link is established
- * link id #0 is reserved for link-level communications.  This link is used
- * for authentication, registration, ping, further link id negotiations,
- * spanning tree, and so on.
- *
- * The spanning tree forms a weighted shortest-path-first graph amongst
- * those nodes with sufficient administrative rights to relay between
- * registrations.  Each link maintains a full reachability set, aggregates
- * it, and retransmits via the shortest path.  However, leaf nodes (even leaf
- * nodes with multiple connections) can opt not to be part of the spanning
- * tree and typically (due to administrative rights) their registrations
- * are not reported to other leafs.
+ * levels of interconnectedness, forming a graph.  The spanning tree protocol
+ * running on each node transmits a LNK_SPAN transactional message to the
+ * other end.  The protocol collects LNK_SPAN messages from all sources,
+ * aggregates them using a shortest-weighted-path algorithm, and transmits
+ * them over each link as well, creating a multiplication within the topology.
+ *
+ * Any node in the graph may transmit a message to any other node by using
+ * the msgid of the LNK_SPAN open transaction as the message's 'spanid'.
+ * This identifies both sides so there is no 'source' and 'target' per se.
+ *
+ * Open transactions are recorded by the source and the target, but not by
+ * intermediate nodes in the route.  Streaming protocols are used.  If a
+ * span element is lost its transaction will be aborted automatically (even
+ * if other routes to the same target are available), and any related
+ * messages will be aborted.  If the span element was chosen for aggregation
+ * this will propagate through the entire topology and thus ultimately reach
+ * the target which used the aggregated span element, but does not
+ * necessarily affect all paths in the topology.
+ *
+ * When a link failure occurs all SPANs related to that link are
+ * transactionally closed.  The SPANs are not deleted until closed in
+ * both directions, thus the spanid serves as a placeholder allowing all
+ * in-transit messages being routed over that spanid to be properly thrown
+ * out.  Once completely closed the spanid can be reused.
+ *
+ * NOTE: Multiple spans for the same physical {fsid,pfs_fsid} can be
+ *      forwarded, allowing concurrency within the topology.
+ *
+ * NOTE: It is important that messages in a lost route be aborted because
+ *      the messaging protocol expects serialization over any given route.
+ *      Only propagated spans are forwarded as spans to other nodes, so any
+ *      given open span transaction will represent a specific path.
+ *
+ *      If a portion of the path in the middle of the topology is lost it
+ *      will propagate in both directions all the way to the ends that used
+ *      it.  Intermediate route nodes DO NOT silently re-route messages to
+ *      another span.  Messages in-flight will meet the updating SPAN and
+ *      simply be discarded by intermediate nodes.  Ultimately the updating
+ *      SPAN reaches all end-points and auto-aborts the open transaction.
+ *
+ *      If another path is available the transaction can be instantly
+ *      retried.
+ *
+ * NOTE: It is possible to route messages virtually using the msgid of any
+ *      open transaction instead of the msgid of a SPAN transaction, but
+ *      not recommended and not currently coded.
+ *
+ * NOTE: Both the msgid and the spanid are 64-bit fields and may be populated
+ *      with actual memory pointers (which simplifies the end-points).
+ *      However, all such identifiers must be indexed as appropriate by the
+ *      nodes and verified as being valid before any memory dereference
+ *      occurs, for obvious reasons.
  *
  * All message responses follow the SAME PATH that the original message
  * followed, but in reverse.  This is an absolute requirement since messages
- * expecting replies record persistent state at each hop.
+ * expecting replies record persistent state at each hop.  Sequencing must
+ * be preserved.
+ *
+ *                     MESSAGE TRANSACTIONAL STATES
  *
  * Message state is handled by the CREATE, DELETE, REPLY, and ABORT
  * flags.  Message state is typically recorded at the end points and
  * fully closed state.  ABORT|DELETE messages which race the fully closed
  * state are expected to be discarded by the other end.
  *
- *
- *                     NEGOTIATION OF {source} AND {target}
- *
- * In this discussion 'originator' describes the original sender of a message
- * and not the relays inbetween, while 'sender' describes the last relay.
- * The two mean the same thing only when the originator IS the last relay.
- *
- * The {source} field is sender-localized.  The sender assigns this field
- * based on which connection the message originally came from.  The initial
- * message as sent by the originator sets source=0.  This also means that a
- * leaf connection will always send messages with source=0.
- *
- * The {source} field must be re-localized at each hop, since messages
- * coming from multiple connections to a node will use conflicting
- * {source} values.  This can lead to linkid exhaustion which is discussed
- * a few paragraphs down.
- *
- * The {target} field is sender-allocated.  Messages sent to {target} are
- * preceeded by a FORGE message to {target} which associates a registration
- * with {target}, or UNFORGE to delete the associtation.
- *
- * The msgid field is 32 bits (remember some messages have long-lived
- * persistent state so this is important!).  One-way messages always use
- * msgid=0.
- *
- *                             LINKID EXHAUSTION
- *
- * Because {source} must be re-localized at each hop it is possible to run
- * out of link identifiers.  At the same time we want to allow millions of
- * client/leaf connections, and 'millions' is a lot bigger than 65535.
- *
- * We also have a problem with the persistent message state... If a single
- * client's vnode cache has a million vnodes that can represent a million
- * persistent cache states.  Multiply by a million clients and ... oops!
- *
- * To solve these problems leafs connect into protocol-aggregators rather
- * than directly to the cluster.  The linkid and core message protocols only
- * occur within the cluster and not by the leafs.  A leaf can still connect
- * to multiple aggregators for redundancy if it desires but may have to
- * pick and choose which inodes go where since acquiring a cache state lock
- * over one connection will cause conflicts to be invalidated on the other.
- * In otherwords, there are limitations to this approach.
- *
- * A protocol aggregator takes any number of connections and aggregates
- * the operations down to a single linkid.  For example, this means that
- * the protocol aggregator is responsible for maintaining all the cache
- * state and performing crunches to reduce the overall amount of state
- * down to something the cluster core can handle.
- *
  * --
  *
- * All message headers are 32-byte aligned and sized (all command and
- * response structures must be 32-byte aligned), and all transports must
- * support message headers up to HAMMER2_MSGHDR_MAX.  The msg structure
- * can handle up to 8160 bytes but to keep things fairly clean we limit
- * message headers to 2048 bytes.
+ * All base and extended message headers are 64-byte aligned, and all
+ * transports must support extended message headers up to HAMMER2_MSGHDR_MAX.
+ * Currently we allow extended message headers up to 2048 bytes.  Note
+ * that the extended header size is encoded in the 'cmd' field of the header.
  *
- * Any in-band data is padded to a 32-byte alignment and placed directly
+ * Any in-band data is padded to a 64-byte alignment and placed directly
  * after the extended header (after the higher-level cmd/rep structure).
  * The actual unaligned size of the in-band data is encoded in the aux_bytes
  * field in this case.  Maximum data sizes are negotiated during registration.
  *
- * Use of out-of-band data must be negotiated.  In this case bit 31 of
- * aux_bytes will be set and the remaining bits will contain information
- * specific to the out-of-band transfer (such as DMA channel, slot, etc).
- *
- * (must be 32 bytes exactly to match the alignment requirement and to
- *  support pad records in shared-memory FIFO schemes)
+ * Auxiliary data can be in-band or out-of-band.  In-band data sets aux_descr
+ * equal to 0.  Any out-of-band data must be negotiated by the SPAN protocol.
+ *
+ * Auxiliary data, whether in-band or out-of-band, must be at least 64-byte
+ * aligned.  The aux_bytes field contains the actual byte-granular length
+ * and not the aligned length.
+ *
+ * hdr_crc is calculated over the entire, ALIGNED extended header.  For
+ * the purposes of calculating the crc, the hdr_crc field is 0.  That is,
+ * if calculating the crc in HW a 32-bit '0' must be inserted in place of
+ * the hdr_crc field when reading the entire header and compared at the
+ * end (but the actual hdr_crc must be left intact in memory).  A simple
+ * counter to replace the field going into the CRC generator does the job
+ * in HW.  The CRC endian is based on the magic number field and may have
+ * to be byte-swapped, too (which is also easy to do in HW).
+ *
+ * aux_crc is calculated over the entire, ALIGNED auxiliary data.
+ *
+ *                     SHARED MEMORY IMPLEMENTATIONS
+ *
+ * Shared-memory implementations typically use a pipe to transmit the extended
+ * message header and shared memory to store any auxiliary data.  Auxiliary
+ * data in one-way (non-transactional) messages is typically required to be
+ * inline.  CRCs are still recommended and required at the beginning, but
+ * may be negotiated away later.
+ *
+ *                      MULTI-PATH MESSAGE DUPLICATION
+ *
+ * Redundancy can be negotiated but is not required in the current spec.
+ * Basically you send the same message, with the same msgid, via several
+ * paths to the target.  The msgid is the rendezvous.  The first copy that
+ * makes it to the target is used, the second is ignored.  Similarly for
+ * replies.  This can improve performance during span flapping.  Only
+ * transactional messages will be serialized.  The target might receive
+ * multiple copies of one-way messages in higher protocol layers (potentially
+ * out of order, too).
  */
 struct hammer2_msg_hdr {
-       uint16_t        magic;          /* sanity, synchronization, endian */
-       uint16_t        icrc1;          /* base header crc &salt on */
-       uint32_t        salt;           /* random salt helps crypto/replay */
-
-       uint16_t        source;         /* command originator linkid */
-       uint16_t        target;         /* reply originator linkid */
-       uint32_t        msgid;          /* {source,target,msgid} unique */
-
-       uint32_t        cmd;            /* flags | cmd | hdr_size / 32 */
-       uint16_t        error;          /* error field */
-       uint16_t        resv05;
-
-       uint16_t        icrc2;          /* extended header crc (after base) */
-       uint16_t        aux_bytes;      /* aux data descriptor or size / 32 */
-       uint32_t        aux_icrc;       /* aux data iscsi crc */
+       /* base header: exactly 64 bytes, offsets 0x00-0x3F */
+       uint16_t        magic;          /* 00 sanity, synchro, endian */
+       uint16_t        reserved02;     /* 02 reserved (hdr size is in cmd) */
+       uint32_t        salt;           /* 04 random salt helps w/crypto */
+
+       uint64_t        msgid;          /* 08 message transaction id */
+       uint64_t        spanid;         /* 10 message routing id or 0 */
+
+       uint32_t        cmd;            /* 18 flags | cmd | hdr_size / ALIGN */
+       uint32_t        aux_crc;        /* 1C auxiliary data crc */
+       uint32_t        aux_bytes;      /* 20 auxiliary data length (bytes) */
+       uint32_t        error;          /* 24 error code or 0 */
+       uint64_t        aux_descr;      /* 28 OOB data descr, 0 = in-band */
+       uint64_t        reserved30;     /* 30 */
+       uint32_t        reserved38;     /* 38 */
+       uint32_t        hdr_crc;        /* 3C ext hdr crc (field taken as 0) */
 };
 
 typedef struct hammer2_msg_hdr hammer2_msg_hdr_t;
@@ -196,8 +219,8 @@ typedef struct hammer2_msg_hdr hammer2_msg_hdr_t;
 /*
  * Administrative protocol limits.
  */
-#define HAMMER2_MSGHDR_MAX             2048    /* msg struct max is 8192-32 */
-#define HAMMER2_MSGAUX_MAX             65536   /* msg struct max is 2MB-32 */
+#define HAMMER2_MSGHDR_MAX             2048    /* <= 65535 */
+#define HAMMER2_MSGAUX_MAX             65536   /* <= 1MB */
 #define HAMMER2_MSGBUF_SIZE            (HAMMER2_MSGHDR_MAX * 4)
 #define HAMMER2_MSGBUF_MASK            (HAMMER2_MSGBUF_SIZE - 1)
 
@@ -248,7 +271,7 @@ typedef struct hammer2_msg_hdr hammer2_msg_hdr_t;
 /*
  * Message command constructors, sans flags
  */
-#define HAMMER2_MSG_ALIGN              32
+#define HAMMER2_MSG_ALIGN              64
 #define HAMMER2_MSG_ALIGNMASK          (HAMMER2_MSG_ALIGN - 1)
 #define HAMMER2_MSG_DOALIGN(bytes)     (((bytes) + HAMMER2_MSG_ALIGNMASK) & \
                                         ~HAMMER2_MSG_ALIGNMASK)
@@ -292,32 +315,24 @@ typedef struct hammer2_msg_hdr hammer2_msg_hdr_t;
  *               pad message buffers on shared-memory transports.  Not
  *               typically used with TCP.
  *
+ * PING                - One-way message on link-0, keep-alive, run by both sides
+ *               typically 1/sec on idle link, link is lost after 10 seconds
+ *               of inactivity.
+ *
  * AUTH                - Authenticate the connection, negotiate administrative
  *               rights & encryption, protocol class, etc.  Only PAD and
  *               AUTH messages (not even PING) are accepted until
  *               authentication is complete.  This message also identifies
  *               the host.
  *
- * PING                - One-way message on link-0, keep-alive, run by both sides
- *               typically 1/sec on idle link, link is lost after 10 seconds
- *               of inactivity.
- *
- * STATUS      - One-way message on link-0, host-spanning tree message.
- *               Connection and authentication status is propagated using
- *               these messages on a per-connection basis.  Works like SPAN
- *               but is only used for general status.  See the hammer2
- *               'rinfo' command.
+ * CONN                - Enable the SPAN protocol on link-0, possibly also installing
+ *               a PFS filter (by cluster id, unique id, and/or wildcarded
+ *               name).
  *
- * SPAN                - One-way message on link-0, spanning tree message adds,
- *               drops, or updates a remote registration.  Sent by both
- *               sides, delta changes only.  Visbility into remote
- *               registrations may be limited and received registrations
- *               may be filtered depending on administrative controls.
- *
- *               A multiply-connected node maintains SPAN information on
- *               each link independently and then retransmits an aggregation
- *               of the shortest-weighted path for each registration to
- *               all links when a received change adjusts the path.
+ * SPAN                - A SPAN transaction on link-0 enables messages to be relayed
+ *               to/from a particular cluster node.  SPANs are received,
+ *               sorted, aggregated, and retransmitted back out across all
+ *               applicable connections.
  *
  *               The leaf protocol also uses this to make a PFS available
  *               to the cluster (e.g. on-mount).
@@ -325,11 +340,42 @@ typedef struct hammer2_msg_hdr hammer2_msg_hdr_t;
 #define HAMMER2_LNK_PAD                HAMMER2_MSG_LNK(0x000, hammer2_msg_hdr)
 #define HAMMER2_LNK_PING       HAMMER2_MSG_LNK(0x001, hammer2_msg_hdr)
 #define HAMMER2_LNK_AUTH       HAMMER2_MSG_LNK(0x010, hammer2_lnk_auth)
-#define HAMMER2_LNK_SPAN       HAMMER2_MSG_LNK(0x011, hammer2_lnk_span)
+#define HAMMER2_LNK_CONN       HAMMER2_MSG_LNK(0x011, hammer2_lnk_conn)
+#define HAMMER2_LNK_SPAN       HAMMER2_MSG_LNK(0x012, hammer2_lnk_span)
 #define HAMMER2_LNK_ERROR      HAMMER2_MSG_LNK(0xFFF, hammer2_msg_hdr)
 
 /*
- * SPAN - Registration (transaction, left open)
+ * LNK_CONN - Register connection for SPAN (transaction, left open)
+ *
+ * One LNK_CONN transaction may be opened on a stream connection, registering
+ * the connection with the SPAN subsystem and allowing the subsystem to
+ * accept and relay SPANs to this connection.
+ *
+ * The LNK_CONN message may contain a filter, limiting the desirable SPANs.
+ *
+ * This message contains a lot of the same info that a SPAN message contains,
+ * but is not a SPAN.  That is, without this message the SPAN subprotocol will
+ * not be executed on the connection, nor is this message a promise that the
+ * sending end is a client or node of a cluster.
+ */
+struct hammer2_lnk_conn {
+       /* same field layout as struct hammer2_lnk_span */
+       hammer2_msg_hdr_t head;
+       uuid_t          pfs_clid;       /* rendezvous pfs uuid */
+       uuid_t          pfs_fsid;       /* unique pfs uuid */
+       uint8_t         pfs_type;       /* peer type */
+       uint8_t         reserved01;
+       uint16_t        proto_version;  /* high level protocol support */
+       uint32_t        status;         /* status flags */
+       uint8_t         reserved02[8];
+       int32_t         weight;         /* span weight */
+       uint32_t        reserved03[15];
+       char            label[256];     /* PFS label (can be wildcard) */
+       /*
+        * NOTE(review): assuming a 16-byte uuid_t and no padding this
+        * struct totals 432 bytes, which is not a multiple of
+        * HAMMER2_MSG_ALIGN (64) as extended headers require; confirm.
+        */
+};
+
+typedef struct hammer2_lnk_conn hammer2_lnk_conn_t;
+
+/*
+ * LNK_SPAN - Relay a SPAN (transaction, left open)
  *
  * This message registers a PFS/PFS_TYPE with the other end of the connection,
  * telling the other end who we are and what we can provide or what we want
@@ -372,14 +418,15 @@ typedef struct hammer2_msg_hdr hammer2_msg_hdr_t;
  */
 struct hammer2_lnk_span {
        hammer2_msg_hdr_t head;
-       uuid_t          pfs_id;         /* rendezvous pfs uuid */
+       uuid_t          pfs_clid;       /* rendezvous pfs uuid */
        uuid_t          pfs_fsid;       /* unique pfs uuid */
        uint8_t         pfs_type;       /* peer type */
        uint8_t         reserved01;
        uint16_t        proto_version;  /* high level protocol support */
        uint32_t        status;         /* status flags */
        uint8_t         reserved02[8];
-       uint32_t        reserved03[16];
+       int32_t         weight;         /* span weight */
+       uint32_t        reserved03[15];
+       /*
+        * NOTE(review): assuming a 16-byte uuid_t and no padding, with the
+        * 64-byte head this struct totals 432 bytes, not a multiple of
+        * HAMMER2_MSG_ALIGN (64) as extended headers require; confirm.
+        */
        char            label[256];     /* PFS label (can be wildcard) */
 };
 
@@ -470,6 +517,8 @@ typedef struct hammer2_dbg_shell hammer2_dbg_shell_t;
 #define HAMMER2_QRM_COMMIT     HAMMER2_MSG_QRM(0x001, hammer2_qrm_commit)
 
 /*
+ * NOTE!!!! ALL EXTENDED HEADER STRUCTURES MUST BE 64-BYTE ALIGNED!!!
+ * (i.e. their sizeof() must be a multiple of HAMMER2_MSG_ALIGN)
+ *
  * General message errors
  *
  *     0x00 - 0x1F     Local iocomm errors
@@ -481,6 +530,7 @@ union hammer2_msg_any {
        char                    buf[HAMMER2_MSGHDR_MAX];
        hammer2_msg_hdr_t       head;
        hammer2_lnk_span_t      lnk_span;
+       hammer2_lnk_conn_t      lnk_conn;
 };
 
 typedef union hammer2_msg_any hammer2_msg_any_t;
index 2095202..7c6b3d4 100644 (file)
@@ -139,7 +139,9 @@ static int hammer2_sync_scan2(struct mount *mp, struct vnode *vp, void *data);
 
 static void hammer2_cluster_thread_rd(void *arg);
 static void hammer2_cluster_thread_wr(void *arg);
-static int hammer2_msg_span_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+/*
+ * The *_reply handlers match the hammer2_msg_write() callback signature
+ * (they receive the transaction state).  hammer2_msg_lnk_rcvmsg() handles
+ * LNK messages for which no state callback is installed.
+ */
+static int hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg);
+static int hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg);
+static int hammer2_msg_lnk_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
 
 /*
  * HAMMER2 vfs operations.
@@ -1082,13 +1084,37 @@ hammer2_cluster_thread_rd(void *arg)
                 */
                error = hammer2_state_msgrx(pmp, msg);
                if (error) {
+                       /*
+                        * Raw protocol or connection error
+                        */
                        hammer2_msg_free(pmp, msg);
                        if (error == EALREADY)
                                error = 0;
-               } else if (msg->state) {
-                       error = msg->state->func(pmp, msg);
+               } else if (msg->state && msg->state->func) {
+                       /*
+                        * Message related to state which already has a
+                        * handling function installed for it.
+                        */
+                       error = msg->state->func(msg->state, msg);
+                       hammer2_state_cleanuprx(pmp, msg);
+               } else if ((msg->any.head.cmd & HAMMER2_MSGF_PROTOS) ==
+                          HAMMER2_MSG_PROTO_LNK) {
+                       /*
+                        * Message related to the LNK protocol set
+                        */
+                       error = hammer2_msg_lnk_rcvmsg(pmp, msg);
+                       hammer2_state_cleanuprx(pmp, msg);
+               } else if ((msg->any.head.cmd & HAMMER2_MSGF_PROTOS) ==
+                          HAMMER2_MSG_PROTO_DBG) {
+                       /*
+                        * Message related to the DBG protocol set
+                        */
+                       error = hammer2_msg_dbg_rcvmsg(pmp, msg);
                        hammer2_state_cleanuprx(pmp, msg);
                } else {
+                       /*
+                        * Other higher-level messages (e.g. vnops)
+                        */
                        error = hammer2_msg_adhoc_input(pmp, msg);
                        hammer2_state_cleanuprx(pmp, msg);
                }
@@ -1135,25 +1161,27 @@ hammer2_cluster_thread_wr(void *arg)
        int error = 0;
 
        /*
-        * Initiate a SPAN transaction registering our PFS with the other
-        * end using {source}=1.  The transaction is left open.
+        * Open a LNK_CONN transaction indicating that we want to take part
+        * in the spanning tree algorithm.  Filter explicitly on the PFS
+        * info in the iroot.
+        *
+        * We do not transmit our (only) LNK_SPAN until the other end has
+        * acknowledged our link connection request.
         *
-        * The hammer2_msg_write() function will queue the message, and we
-        * pick it off and write it in our transmit loop.
+        * The transaction remains fully open for the duration of the
+        * connection.
         */
-       msg = hammer2_msg_alloc(pmp, 1, 0,
-                               HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE);
-       msg->any.lnk_span.pfs_id   = pmp->iroot->ip_data.pfs_id;
-       msg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
-       msg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
-       msg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1;
+       msg = hammer2_msg_alloc(pmp, 0, HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE);
+       msg->any.lnk_conn.pfs_clid = pmp->iroot->ip_data.pfs_clid;
+       msg->any.lnk_conn.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
+       msg->any.lnk_conn.pfs_type = pmp->iroot->ip_data.pfs_type;
+       msg->any.lnk_conn.proto_version = HAMMER2_SPAN_PROTO_1;
        name_len = pmp->iroot->ip_data.name_len;
-       if (name_len >= sizeof(msg->any.lnk_span.label))
-               name_len = sizeof(msg->any.lnk_span.label) - 1;
-       bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_span.label, name_len);
-       msg->any.lnk_span.label[name_len] = 0;
-
-       hammer2_msg_write(pmp, msg, hammer2_msg_span_reply);
+       if (name_len >= sizeof(msg->any.lnk_conn.label))
+               name_len = sizeof(msg->any.lnk_conn.label) - 1;
+       bcopy(pmp->iroot->ip_data.filename, msg->any.lnk_conn.label, name_len);
+       msg->any.lnk_conn.label[name_len] = 0;
+       hammer2_msg_write(pmp, msg, hammer2_msg_conn_reply, pmp);
 
        /*
         * Transmit loop
@@ -1251,8 +1279,67 @@ hammer2_cluster_thread_wr(void *arg)
 }
 
 static int
-hammer2_msg_span_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+hammer2_msg_lnk_rcvmsg(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
 {
-       kprintf("SPAN REPLY\n");
+       switch(msg->any.head.cmd & HAMMER2_MSGF_TRANSMASK) { /* LNK dispatch */
+       case HAMMER2_LNK_CONN | HAMMER2_MSGF_CREATE:    /* remote opens CONN */
+               kprintf("CONN RECEIVE - (just ignore it)\n");
+               hammer2_msg_result(pmp, msg, 0); /* presumably keeps txn open */
+               break;
+       case HAMMER2_LNK_SPAN | HAMMER2_MSGF_CREATE:    /* remote adds SPAN */
+               kprintf("SPAN RECEIVE - ADDED FROM CLUSTER\n");
+               break;                  /* SPAN not tracked yet */
+       case HAMMER2_LNK_SPAN | HAMMER2_MSGF_DELETE:    /* remote drops SPAN */
+               kprintf("SPAN RECEIVE - DELETED FROM CLUSTER\n");
+               break;                  /* NOTE(review): no reply sent */
+       default:                        /* other LNK messages: ignored */
+               break;
+       }
+       return(0);                      /* never reports an error */
+}
+
+/*
+ * Called when the other end replies to our LNK_CONN request.  On the
+ * initial (CREATE) reply we transmit our (single) SPAN, leaving that
+ * transaction open too.  On DELETE the remote has terminated LNK_CONN
+ * and we answer with hammer2_msg_reply().  The SPAN is built in its own
+ * variable so the DELETE test below still examines the received reply.
+ */
+static int
+hammer2_msg_conn_reply(hammer2_state_t *state, hammer2_msg_t *msg)
+{
+       hammer2_pfsmount_t *pmp = state->any.pmp;
+       hammer2_msg_t *smsg;
+       size_t name_len;
+
+       if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
+               kprintf("LNK_CONN transaction replied to, initiate SPAN\n");
+               smsg = hammer2_msg_alloc(pmp, 0, HAMMER2_LNK_SPAN |
+                                                HAMMER2_MSGF_CREATE);
+               smsg->any.lnk_span.pfs_clid = pmp->iroot->ip_data.pfs_clid;
+               smsg->any.lnk_span.pfs_fsid = pmp->iroot->ip_data.pfs_fsid;
+               smsg->any.lnk_span.pfs_type = pmp->iroot->ip_data.pfs_type;
+               smsg->any.lnk_span.proto_version = HAMMER2_SPAN_PROTO_1;
+               name_len = pmp->iroot->ip_data.name_len;
+               if (name_len >= sizeof(smsg->any.lnk_span.label))
+                       name_len = sizeof(smsg->any.lnk_span.label) - 1;
+               bcopy(pmp->iroot->ip_data.filename, smsg->any.lnk_span.label,
+                     name_len);
+               smsg->any.lnk_span.label[name_len] = 0;
+               hammer2_msg_write(pmp, smsg, hammer2_msg_span_reply, pmp);
+       }
+       if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+               kprintf("LNK_CONN transaction terminated by remote\n");
+               hammer2_msg_reply(pmp, msg, 0);
+       }
+       return(0);
+}
+
+static int
+hammer2_msg_span_reply(hammer2_state_t *state, hammer2_msg_t *msg)
+{
+       hammer2_pfsmount_t *pmp = state->any.pmp; /* presumably from msg_write */
+       /* Reply handler for our LNK_SPAN.  NOTE(review): DELETE unanswered */
+       kprintf("SPAN REPLY - Our span was terminated? %p\n", pmp);
        return(0);
 }