hammer2 - wire up msg transaction state machine
author Matthew Dillon <dillon@apollo.backplane.com>
Wed, 13 Jun 2012 01:43:32 +0000 (18:43 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Wed, 13 Jun 2012 01:43:32 +0000 (18:43 -0700)
* Wire up the msg transaction state machine.  Split hammer2_msg into
  hammer2_state and hammer2_msg.  Track states in two RB trees in the
  hammer2_pfsmount structure.

  Handle CREATE, DELETE, ABORT, and REPLY flags.

* Revamp the API.  The adjustments make it less likely that messages can
  be left dangling.

* Compile only, untested, not yet operational.

sys/vfs/hammer2/Makefile
sys/vfs/hammer2/hammer2.h
sys/vfs/hammer2/hammer2_msg.c [new file with mode: 0644]
sys/vfs/hammer2/hammer2_network.h
sys/vfs/hammer2/hammer2_vfsops.c

index 1dc3fd8..5dfde44 100644 (file)
@@ -7,6 +7,6 @@ CFLAGS+= -DINVARIANTS -DSMP
 KMOD=  hammer2
 SRCS=  hammer2_vfsops.c hammer2_vnops.c hammer2_inode.c hammer2_ccms.c
 SRCS+= hammer2_chain.c hammer2_freemap.c hammer2_subr.c hammer2_icrc.c
 KMOD=  hammer2
 SRCS=  hammer2_vfsops.c hammer2_vnops.c hammer2_inode.c hammer2_ccms.c
 SRCS+= hammer2_chain.c hammer2_freemap.c hammer2_subr.c hammer2_icrc.c
-SRCS+= hammer2_ioctl.c
+SRCS+= hammer2_ioctl.c hammer2_msg.c
 
 .include <bsd.kmod.mk>
 
 .include <bsd.kmod.mk>
index 3a1cc44..8faa6c6 100644 (file)
 #include "hammer2_mount.h"
 #include "hammer2_ioctl.h"
 #include "hammer2_ccms.h"
 #include "hammer2_mount.h"
 #include "hammer2_ioctl.h"
 #include "hammer2_ccms.h"
+#include "hammer2_network.h"
 
 struct hammer2_chain;
 struct hammer2_inode;
 struct hammer2_mount;
 struct hammer2_pfsmount;
 
 struct hammer2_chain;
 struct hammer2_inode;
 struct hammer2_mount;
 struct hammer2_pfsmount;
+struct hammer2_state;
+struct hammer2_msg;
 
 /*
  * The chain structure tracks blockref recursions all the way to
 
 /*
  * The chain structure tracks blockref recursions all the way to
@@ -98,14 +101,16 @@ struct hammer2_pfsmount;
  * not match the blockref at (parent, index).
  */
 RB_HEAD(hammer2_chain_tree, hammer2_chain);
  * not match the blockref at (parent, index).
  */
 RB_HEAD(hammer2_chain_tree, hammer2_chain);
+RB_HEAD(hammer2_state_tree, hammer2_state);
 TAILQ_HEAD(flush_deferral_list, hammer2_chain);
 
 struct hammer2_chain {
        ccms_cst_t      cst;                    /* attr or data cst */
        struct hammer2_blockref bref;
        struct hammer2_blockref bref_flush;     /* synchronized w/MOVED bit */
 TAILQ_HEAD(flush_deferral_list, hammer2_chain);
 
 struct hammer2_chain {
        ccms_cst_t      cst;                    /* attr or data cst */
        struct hammer2_blockref bref;
        struct hammer2_blockref bref_flush;     /* synchronized w/MOVED bit */
-       struct hammer2_chain *parent;           /* return chain to root */
+       struct hammer2_chain    *parent;        /* return chain to root */
        struct hammer2_chain_tree rbhead;
        struct hammer2_chain_tree rbhead;
+       struct hammer2_state    *state;         /* if active cache msg */
        RB_ENTRY(hammer2_chain) rbnode;
        TAILQ_ENTRY(hammer2_chain) flush_node;  /* flush deferral list */
        union {
        RB_ENTRY(hammer2_chain) rbnode;
        TAILQ_ENTRY(hammer2_chain) flush_node;  /* flush deferral list */
        union {
@@ -307,6 +312,7 @@ struct hammer2_pfsmount {
        struct hammer2_mount    *hmp;           /* device global mount */
        hammer2_chain_t         *rchain;        /* PFS root chain */
        hammer2_inode_t         *iroot;         /* PFS root inode */
        struct hammer2_mount    *hmp;           /* device global mount */
        hammer2_chain_t         *rchain;        /* PFS root chain */
        hammer2_inode_t         *iroot;         /* PFS root inode */
+       struct malloc_type      *mmsg;
        ccms_domain_t           ccms_dom;
        struct netexport        export;         /* nfs export */
        int                     ronly;          /* read-only mount */
        ccms_domain_t           ccms_dom;
        struct netexport        export;         /* nfs export */
        int                     ronly;          /* read-only mount */
@@ -314,12 +320,55 @@ struct hammer2_pfsmount {
        thread_t                msgrd_td;       /* cluster thread */
        thread_t                msgwr_td;       /* cluster thread */
        int                     msg_ctl;        /* wakeup flags */
        thread_t                msgrd_td;       /* cluster thread */
        thread_t                msgwr_td;       /* cluster thread */
        int                     msg_ctl;        /* wakeup flags */
+       struct lock             msglk;          /* lockmgr lock */
+       TAILQ_HEAD(, hammer2_msg) msgq;         /* transmit queue */
+       struct hammer2_state    *freerd_state;  /* allocation cache */
+       struct hammer2_state    *freewr_state;  /* allocation cache */
+       struct hammer2_state_tree staterd_tree; /* active messages */
+       struct hammer2_state_tree statewr_tree; /* active messages */
 };
 
 typedef struct hammer2_pfsmount hammer2_pfsmount_t;
 
 #define HAMMER2_CLUSTERCTL_KILL        0x0001
 
 };
 
 typedef struct hammer2_pfsmount hammer2_pfsmount_t;
 
 #define HAMMER2_CLUSTERCTL_KILL        0x0001
 
+/*
+ * In-memory transaction state for hammer2 messages.
+ *
+ * One hammer2_state tracks an open CREATE ... DELETE message transaction
+ * and is indexed by msgid in one of the hammer2_pfsmount's two RB trees:
+ * received commands are tracked on staterd_tree, and replies are looked
+ * up on statewr_tree.  The state is removed from its tree and freed once
+ * both sides have sent DELETE.
+ *
+ * Persistent cache state messages are intended to be associated with a
+ * hammer2_chain (see the chain field).
+ */
+struct hammer2_state {
+       RB_ENTRY(hammer2_state) rbnode;         /* indexed by msgid */
+       struct hammer2_pfsmount *pmp;           /* owning PFS mount */
+       uint32_t        txcmd;                  /* MSGF flags sent, DELETE = tx side closed */
+       uint32_t        rxcmd;                  /* MSGF flags rcvd, DELETE = rx side closed */
+       uint32_t        msgid;                  /* RB tree key */
+       int             flags;                  /* HAMMER2_STATE_* */
+       int             error;
+       struct hammer2_chain *chain;            /* msg associated w/chain */
+       struct hammer2_msg *msg;                /* saved msg, freed with the state */
+       void (*func)(struct hammer2_state *state, struct hammer2_msg *msg);
+};
+
+#define HAMMER2_STATE_INSERTED 0x0001          /* linked into an RB tree */
+#define HAMMER2_STATE_DYNAMIC  0x0002          /* kmalloc'd from pmp->mmsg */
+
+/*
+ * A single message, either received or queued for transmission.
+ */
+struct hammer2_msg {
+       TAILQ_ENTRY(hammer2_msg) qentry;        /* serialized queue */
+       struct hammer2_state *state;            /* transaction, NULL for one-off */
+       size_t          hdr_size;
+       size_t          aux_size;               /* aux_data length */
+       char            *aux_data;              /* optional payload (pmp->mmsg) */
+       hammer2_any_t   any;                    /* message, header in any.head */
+};
+
+typedef struct hammer2_state hammer2_state_t;
+typedef struct hammer2_msg hammer2_msg_t;
+
+/* Orders hammer2_state by msgid for the state RB trees. */
+int hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2);
+RB_PROTOTYPE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
+
+
 #if defined(_KERNEL)
 
 MALLOC_DECLARE(M_HAMMER2);
 #if defined(_KERNEL)
 
 MALLOC_DECLARE(M_HAMMER2);
@@ -474,6 +523,17 @@ void hammer2_chain_commit(hammer2_mount_t *hmp, hammer2_chain_t *chain);
 int hammer2_ioctl(hammer2_inode_t *ip, u_long com, void *data,
                                int fflag, struct ucred *cred);
 
 int hammer2_ioctl(hammer2_inode_t *ip, u_long com, void *data,
                                int fflag, struct ucred *cred);
 
+/*
+ * hammer2_msg.c - message transaction state machine.
+ *
+ * hammer2_state_msgrx()/hammer2_state_msgtx() check a message against its
+ * transaction state before execution/transmission and return 0 (proceed),
+ * EALREADY (discard silently) or EINVAL (protocol violation).
+ * hammer2_state_cleanuprx()/hammer2_state_cleanuptx() dispose of the
+ * message afterwards and free the state once both sides sent DELETE.
+ */
+int hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+int hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+void hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+void hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+int hammer2_msg_execute(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+void hammer2_state_free(hammer2_state_t *state);
+void hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg);
+
 /*
  * hammer2_freemap.c
  */
 /*
  * hammer2_freemap.c
  */
diff --git a/sys/vfs/hammer2/hammer2_msg.c b/sys/vfs/hammer2/hammer2_msg.c
new file mode 100644 (file)
index 0000000..6ae0186
--- /dev/null
@@ -0,0 +1,604 @@
+/*-
+ * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
+ *
+ * This code is derived from software contributed to The DragonFly Project
+ * by Matthew Dillon <dillon@backplane.com>
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in
+ *    the documentation and/or other materials provided with the
+ *    distribution.
+ * 3. Neither the name of The DragonFly Project nor the names of its
+ *    contributors may be used to endorse or promote products derived
+ *    from this software without specific, prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
+ * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
+ * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
+ * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
+ * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
+ * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
+ * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+ * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+ * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ */
+
+#include "hammer2.h"
+
+/*
+ * Process state tracking for a message after reception, prior to
+ * execution.
+ *
+ * Called with the msg dequeued.  The caller must NOT hold msglk; this
+ * routine acquires and releases it itself.
+ *
+ * On return msg->state points to the persistent transaction state the
+ * message belongs to, or is NULL for one-off messages (and for mid-stream
+ * messages with no matching state).  A received command with CREATE set
+ * allocates and inserts new state, with state->msg saving the command.
+ *
+ * Returns 0 if the message should be executed.  Returns EALREADY if the
+ * message should be silently discarded (a raced ABORT), or EINVAL if it
+ * violates the transaction protocol.  On any non-zero return msg->state
+ * is cleared so that cleaning up the rejected message can never modify
+ * or tear down an unrelated transaction which happens to share its msgid.
+ *
+ * --
+ *
+ * These routines handle persistent and command/reply message state via the
+ * CREATE and DELETE flags.  The first message in a command or reply sequence
+ * sets CREATE, the last message in a command or reply sequence sets DELETE.
+ *
+ * There can be any number of intermediate messages belonging to the same
+ * sequence sent in between the CREATE message and the DELETE message,
+ * which set neither flag.  This represents a streaming command or reply.
+ *
+ * Any command message received with CREATE set expects a reply sequence to
+ * be returned.  Reply sequences work the same as command sequences except the
+ * REPLY bit is also sent.  Both the command side and reply side can
+ * degenerate into a single message with both CREATE and DELETE set.  Note
+ * that one side can be streaming and the other side not, or neither, or both.
+ *
+ * The msgid is unique for the initiator.  That is, two sides sending a new
+ * message can use the same msgid without colliding.
+ *
+ * --
+ *
+ * ABORT sequences work by setting the ABORT flag along with normal message
+ * state.  However, ABORTs can also be sent on half-closed messages, that is
+ * even if the command or reply side has already sent a DELETE, as long as
+ * the message has not been fully closed it can still send an ABORT+DELETE
+ * to terminate the half-closed message state.
+ *
+ * Since ABORT+DELETEs can race we silently discard ABORTs for message
+ * state which has already been fully closed.  REPLY+ABORT+DELETEs can
+ * also race, and in this situation the other side might have already
+ * initiated a new unrelated command with the same message id.  Since
+ * the abort has not set the CREATE flag the situation can be detected
+ * and the message will also be discarded.
+ *
+ * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
+ * The ABORT request is essentially integrated into the command instead
+ * of being sent later on.  In this situation the command implementation
+ * detects that CREATE and ABORT are both set (vs ABORT alone) and can
+ * special-case non-blocking operation for the command.
+ *
+ * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
+ *       to be mid-stream aborts for command/reply sequences.  ABORTs on
+ *       one-way messages are not supported.
+ *
+ * NOTE!  If a command sequence does not support aborts the ABORT flag is
+ *       simply ignored.
+ *
+ * --
+ *
+ * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
+ * set.  One-off messages cannot be aborted and typically aren't processed
+ * by these routines.  The REPLY bit can be used to distinguish whether a
+ * one-off message is a command or reply.  For example, one-off replies
+ * will typically just contain status updates.
+ */
+int
+hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+{
+       hammer2_state_t *state;
+       int error;
+
+       /*
+        * Make sure a state structure is ready to go in case we need a new
+        * one.  This is the only routine which uses freerd_state so no
+        * races are possible.
+        */
+       if ((state = pmp->freerd_state) == NULL) {
+               state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
+               state->pmp = pmp;
+               state->flags = HAMMER2_STATE_DYNAMIC;
+               pmp->freerd_state = state;
+       }
+
+       /*
+        * Lock RB tree and locate existing persistent state, if any.
+        * The cached free state doubles as the lookup key.
+        *
+        * If received msg is a command state is on staterd_tree.
+        * If received msg is a reply state is on statewr_tree.
+        */
+       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+
+       state->msgid = msg->any.head.msgid;
+       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
+               state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
+       else
+               state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
+       msg->state = state;
+
+       /*
+        * Short-cut one-off or mid-stream messages (state may be NULL).
+        */
+       if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+                                 HAMMER2_MSGF_ABORT)) == 0) {
+               lockmgr(&pmp->msglk, LK_RELEASE);
+               return(0);
+       }
+
+       /*
+        * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
+        * inside the case statements.
+        */
+       switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+                                   HAMMER2_MSGF_REPLY)) {
+       case HAMMER2_MSGF_CREATE:
+       case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+               /*
+                * New persistent command received.
+                */
+               if (state) {
+                       kprintf("hammer2_state_msgrx: duplicate transaction\n");
+                       error = EINVAL;
+                       break;
+               }
+               state = pmp->freerd_state;
+               pmp->freerd_state = NULL;
+               msg->state = state;
+               state->msg = msg;
+               state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
+               state->flags |= HAMMER2_STATE_INSERTED;
+               error = 0;
+               break;
+       case HAMMER2_MSGF_DELETE:
+               /*
+                * Persistent state is expected but might not exist if an
+                * ABORT+DELETE races the close.
+                */
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = EALREADY;
+                       } else {
+                               kprintf("hammer2_state_msgrx: no state "
+                                       "for DELETE\n");
+                               error = EINVAL;
+                       }
+                       break;
+               }
+
+               /*
+                * Handle another ABORT+DELETE case if the msgid has already
+                * been reused.
+                */
+               if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = EALREADY;
+                       } else {
+                               kprintf("hammer2_state_msgrx: state reused "
+                                       "for DELETE\n");
+                               error = EINVAL;
+                       }
+                       break;
+               }
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+               /*
+                * When receiving a reply with CREATE set the original
+                * persistent state message should already exist.
+                */
+               if (state == NULL) {
+                       kprintf("hammer2_state_msgrx: no state match for "
+                               "REPLY\n");
+                       error = EINVAL;
+                       break;
+               }
+               state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
+               /*
+                * Received REPLY+ABORT+DELETE in case where msgid has
+                * already been fully closed, ignore the message.
+                */
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = EALREADY;
+                       } else {
+                               kprintf("hammer2_state_msgrx: no state match "
+                                       "for REPLY|DELETE\n");
+                               error = EINVAL;
+                       }
+                       break;
+               }
+
+               /*
+                * Received REPLY+ABORT+DELETE in case where msgid has
+                * already been reused for an unrelated message,
+                * ignore the message.
+                */
+               if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = EALREADY;
+                       } else {
+                               kprintf("hammer2_state_msgrx: state reused "
+                                       "for REPLY|DELETE\n");
+                               error = EINVAL;
+                       }
+                       break;
+               }
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY:
+               /*
+                * Check for mid-stream ABORT reply received to sent command.
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                       if (state == NULL ||
+                           (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
+                               error = EALREADY;
+                               break;
+                       }
+               }
+               error = 0;
+               break;
+       default:
+               /*
+                * Only reachable with none of CREATE, DELETE or REPLY set,
+                * i.e. a bare ABORT (the short-cut above handled the no-flag
+                * case).  Check for mid-stream ABORT command received,
+                * otherwise allow.
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                       if (state == NULL ||
+                           (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
+                               error = EALREADY;
+                               break;
+                       }
+               }
+               error = 0;
+               break;
+       }
+
+       /*
+        * A rejected message must not stay associated with whatever state
+        * was found under its msgid (a duplicate or an unrelated reused
+        * transaction), otherwise cleaning up the message could mark that
+        * transaction closed or free it.
+        */
+       if (error)
+               msg->state = NULL;
+       lockmgr(&pmp->msglk, LK_RELEASE);
+       return (error);
+}
+
+/*
+ * Dispose of a received message after it has been executed.
+ *
+ * A message with no state is simply freed.  A DELETE marks the receive
+ * side of the transaction closed; once the transmit side has also sent
+ * DELETE the state is removed from its RB tree and freed together with
+ * the message saved in state->msg.  A message which is the state's saved
+ * message (state->msg) is kept alive for as long as the state exists and
+ * is only freed when the state is freed.
+ */
+void
+hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+{
+       hammer2_state_t *state;
+       int retained;
+
+       if ((state = msg->state) == NULL) {
+               hammer2_msg_free(pmp, msg);
+       } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               state->rxcmd |= HAMMER2_MSGF_DELETE;
+               if (state->txcmd & HAMMER2_MSGF_DELETE) {
+                       /*
+                        * Both sides have sent DELETE, the transaction is
+                        * fully closed.  Detach msg first so that
+                        * hammer2_state_free() does not free it as well.
+                        */
+                       if (state->msg == msg)
+                               state->msg = NULL;
+                       KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
+                       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+                               RB_REMOVE(hammer2_state_tree,
+                                         &pmp->statewr_tree, state);
+                       } else {
+                               RB_REMOVE(hammer2_state_tree,
+                                         &pmp->staterd_tree, state);
+                       }
+                       state->flags &= ~HAMMER2_STATE_INSERTED;
+                       lockmgr(&pmp->msglk, LK_RELEASE);
+                       hammer2_state_free(state);
+                       hammer2_msg_free(pmp, msg);
+               } else {
+                       /*
+                        * Only half-closed.  If msg is the state's saved
+                        * message the state still references it; freeing
+                        * it here would leave state->msg dangling and
+                        * hammer2_state_free() would free it a second time.
+                        */
+                       retained = (state->msg == msg);
+                       lockmgr(&pmp->msglk, LK_RELEASE);
+                       if (retained == 0)
+                               hammer2_msg_free(pmp, msg);
+               }
+       } else if (state->msg != msg) {
+               hammer2_msg_free(pmp, msg);
+       }
+}
+
+/*
+ * Process state tracking for a message prior to transmission.
+ *
+ * Called with the msg dequeued.  The caller must NOT hold msglk; this
+ * routine acquires and releases it itself.
+ *
+ * One-off messages are usually sent without state and msg->state may be
+ * NULL in this situation.  For other messages any persistent state is
+ * expected to already be assigned to msg->state by the caller.
+ *
+ * New transactions (CREATE set without REPLY) insert their state into
+ * statewr_tree, taking it from the private freewr_state cache when the
+ * caller did not supply one.  The state saves msg in state->msg.
+ *
+ * Returns 0 if the message should be sent.  Returns EALREADY if the
+ * message should be silently discarded (a raced ABORT), or EINVAL if it
+ * violates the transaction protocol.
+ */
+int
+hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+{
+       hammer2_state_t *state;
+       int newstate;
+       int error;
+
+       /*
+        * Make sure a state structure is ready to go in case we need a new
+        * one.  This is the only routine which uses freewr_state so no
+        * races are possible.
+        */
+       if ((state = pmp->freewr_state) == NULL) {
+               state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
+               state->pmp = pmp;
+               state->flags = HAMMER2_STATE_DYNAMIC;
+               pmp->freewr_state = state;
+       }
+
+       /*
+        * Lock RB tree.  If persistent state is present it will have already
+        * been assigned to msg.
+        */
+       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+       state = msg->state;
+
+       /*
+        * Short-cut one-off or mid-stream messages (state may be NULL).
+        */
+       if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+                                 HAMMER2_MSGF_ABORT)) == 0) {
+               lockmgr(&pmp->msglk, LK_RELEASE);
+               return(0);
+       }
+
+       /*
+        * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
+        * inside the case statements.
+        */
+       switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
+                                   HAMMER2_MSGF_REPLY)) {
+       case HAMMER2_MSGF_CREATE:
+       case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+               /*
+                * Insert the new persistent message state and mark
+                * half-closed if DELETE is set.  Since this is a new
+                * message it isn't possible to transition into the fully
+                * closed state here.
+                *
+                * Commands we initiate live on statewr_tree: that is where
+                * hammer2_state_msgrx() looks up their replies and where
+                * hammer2_state_cleanuptx() removes them from.  A missing
+                * state comes from our own cache (freewr_state, allocated
+                * above), never from the receive side's freerd_state.
+                */
+               newstate = 0;
+               if (state == NULL) {
+                       state = pmp->freewr_state;
+                       pmp->freewr_state = NULL;
+                       msg->state = state;
+                       state->msg = msg;
+                       newstate = 1;
+               }
+               KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
+               state->msgid = msg->any.head.msgid;     /* RB tree key */
+               if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state)) {
+                       kprintf("hammer2_state_msgtx: duplicate transaction\n");
+                       if (newstate) {
+                               /*
+                                * Not inserted.  Detach it from msg and
+                                * return it to the cache instead of
+                                * leaking it.
+                                */
+                               state->msg = NULL;
+                               msg->state = NULL;
+                               pmp->freewr_state = state;
+                       }
+                       error = EINVAL;
+                       break;
+               }
+               state->flags |= HAMMER2_STATE_INSERTED;
+               state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               error = 0;
+               break;
+       case HAMMER2_MSGF_DELETE:
+               /*
+                * Sent ABORT+DELETE in case where msgid has already
+                * been fully closed, ignore the message.
+                */
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = EALREADY;
+                       } else {
+                               kprintf("hammer2_state_msgtx: no state match "
+                                       "for DELETE\n");
+                               error = EINVAL;
+                       }
+                       break;
+               }
+
+               /*
+                * Sent ABORT+DELETE in case where msgid has
+                * already been reused for an unrelated message,
+                * ignore the message.
+                */
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = EALREADY;
+                       } else {
+                               kprintf("hammer2_state_msgtx: state reused "
+                                       "for DELETE\n");
+                               error = EINVAL;
+                       }
+                       break;
+               }
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
+               /*
+                * When transmitting a reply with CREATE set the original
+                * persistent state message should already exist.
+                */
+               if (state == NULL) {
+                       kprintf("hammer2_state_msgtx: no state match "
+                               "for REPLY | CREATE\n");
+                       error = EINVAL;
+                       break;
+               }
+               state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
+               /*
+                * When transmitting a reply with DELETE set the original
+                * persistent state message should already exist.
+                *
+                * This is very similar to the REPLY|CREATE|* case except
+                * txcmd is already stored, so we just add the DELETE flag.
+                *
+                * Sent REPLY+ABORT+DELETE in case where msgid has
+                * already been fully closed, ignore the message.
+                */
+               if (state == NULL) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = EALREADY;
+                       } else {
+                               kprintf("hammer2_state_msgtx: no state match "
+                                       "for REPLY | DELETE\n");
+                               error = EINVAL;
+                       }
+                       break;
+               }
+
+               /*
+                * Sent REPLY+ABORT+DELETE in case where msgid has already
+                * been reused for an unrelated message, ignore the message.
+                */
+               if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                       if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                               error = EALREADY;
+                       } else {
+                               kprintf("hammer2_state_msgtx: state reused "
+                                       "for REPLY | DELETE\n");
+                               error = EINVAL;
+                       }
+                       break;
+               }
+               error = 0;
+               break;
+       case HAMMER2_MSGF_REPLY:
+               /*
+                * Check for mid-stream ABORT reply sent.
+                *
+                * One-off REPLY messages are allowed for e.g. status updates.
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                       if (state == NULL ||
+                           (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                               error = EALREADY;
+                               break;
+                       }
+               }
+               error = 0;
+               break;
+       default:
+               /*
+                * Only reachable with none of CREATE, DELETE or REPLY set,
+                * i.e. a bare ABORT.  Check for mid-stream ABORT command
+                * sent.
+                */
+               if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
+                       if (state == NULL ||
+                           (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
+                               error = EALREADY;
+                               break;
+                       }
+               }
+               error = 0;
+               break;
+       }
+       lockmgr(&pmp->msglk, LK_RELEASE);
+       return (error);
+}
+
+/*
+ * Dispose of a message after it has been transmitted.
+ *
+ * A message with no state is simply freed.  A DELETE marks the transmit
+ * side of the transaction closed; once the receive side has also sent
+ * DELETE the state is removed from its RB tree and freed together with
+ * the message saved in state->msg.  A message which is the state's saved
+ * message (state->msg) is kept alive for as long as the state exists and
+ * is only freed when the state is freed.
+ */
+void
+hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+{
+       hammer2_state_t *state;
+       int retained;
+
+       if ((state = msg->state) == NULL) {
+               hammer2_msg_free(pmp, msg);
+       } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
+               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               state->txcmd |= HAMMER2_MSGF_DELETE;
+               if (state->rxcmd & HAMMER2_MSGF_DELETE) {
+                       /*
+                        * Both sides have sent DELETE, the transaction is
+                        * fully closed.  Detach msg first so that
+                        * hammer2_state_free() does not free it as well.
+                        */
+                       if (state->msg == msg)
+                               state->msg = NULL;
+                       KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
+                       if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
+                               RB_REMOVE(hammer2_state_tree,
+                                         &pmp->staterd_tree, state);
+                       } else {
+                               RB_REMOVE(hammer2_state_tree,
+                                         &pmp->statewr_tree, state);
+                       }
+                       state->flags &= ~HAMMER2_STATE_INSERTED;
+                       lockmgr(&pmp->msglk, LK_RELEASE);
+                       hammer2_state_free(state);
+                       hammer2_msg_free(pmp, msg);
+               } else {
+                       /*
+                        * Only half-closed.  If msg is the state's saved
+                        * message the state still references it; freeing
+                        * it here would leave state->msg dangling and
+                        * hammer2_state_free() would free it a second time.
+                        */
+                       retained = (state->msg == msg);
+                       lockmgr(&pmp->msglk, LK_RELEASE);
+                       if (retained == 0)
+                               hammer2_msg_free(pmp, msg);
+               }
+       } else if (state->msg != msg) {
+               hammer2_msg_free(pmp, msg);
+       }
+}
+
+/*
+ * Release a transaction state structure back to pmp->mmsg, along with
+ * the message it saved in state->msg (if any).  Callers in this file
+ * unlink the state from its RB tree before calling this.
+ */
+void
+hammer2_state_free(hammer2_state_t *state)
+{
+       hammer2_pfsmount_t *pmp;
+       hammer2_msg_t *saved;
+
+       pmp = state->pmp;
+       saved = state->msg;
+       state->msg = NULL;
+       kfree(state, pmp->mmsg);
+
+       /* The saved message is owned by the state and goes with it. */
+       if (saved != NULL)
+               hammer2_msg_free(pmp, saved);
+}
+
+/*
+ * Release a message, and its auxiliary payload when one is recorded,
+ * back to pmp->mmsg.
+ */
+void
+hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
+{
+       char *aux = msg->aux_data;
+
+       /* Payload is only released when both a buffer and a size exist. */
+       if (aux != NULL && msg->aux_size > 0) {
+               kfree(aux, pmp->mmsg);
+               msg->aux_data = NULL;
+               msg->aux_size = 0;
+       }
+       kfree(msg, pmp->mmsg);
+}
+
+/*
+ * RB tree comparator for transaction state.  Only persistent messages
+ * are indexed, ordered by their unsigned msgid.  Returns -1, 0 or 1.
+ */
+int
+hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
+{
+       uint32_t lhs = state1->msgid;
+       uint32_t rhs = state2->msgid;
+
+       return ((lhs > rhs) - (lhs < rhs));
+}
+
+/*
+ * Execute a received message.
+ *
+ * When msg belongs to a transaction msg->state is non-NULL.  In that case
+ * state->msg holds the original command and may be replaced by ongoing
+ * activity if desired; any msg that does not match state->msg is
+ * destroyed after this returns.
+ *
+ * Single-element transactions set both CREATE and DELETE.  Non-blocking
+ * transactions additionally set ABORT together with CREATE.
+ *
+ * Transaction-specific ABORTs can also arrive and must be handled, even
+ * after the other end has closed (DELETE received), as long as our end
+ * of the transaction has not been closed yet.
+ *
+ * Not implemented yet: every message is accepted as a no-op.
+ */
+int
+hammer2_msg_execute(hammer2_pfsmount_t *pmp __unused, hammer2_msg_t *msg __unused)
+{
+       int result = 0;
+
+       return (result);
+}
index 5045857..c5a597c 100644 (file)
  * at each hop until a DELETE is received from both sides.
  *
  * One-way messages such as those used by spanning tree commands are not
  * at each hop until a DELETE is received from both sides.
  *
  * One-way messages such as those used by spanning tree commands are not
- * recorded.  These are sent with no flags set.  Aborts and replies are not
- * possible.
- *
- * A normal message with no persistent state is sent with CREATE|DELETE and
- * the response is returned with REPLY|CREATE|DELETE.  A normal command can
- * be aborted by sending an ABORT message to the msgid that is in progress.
- * An ABORT sent by the originator must still wait for the reply from the
- * target and, since we've already sent the DELETE with our CREATE|DELETE,
- * may also cross the REPLY|CREATE|DELETE message in the opposite direction.
- * In this situation the message state has been destroyed on the target and
- * the target ignores the ABORT (because CREATE is not set, and
- * differentiated from one-way messages because ABORT is set).
- *
- * A command which has persistent state must maintain a persistent message.
- * For example, a lock or cache state request.  A persistent message is
- * initiated with just CREATE and the initial response is returned with
- * REPLY|CREATE.  Successive messages are sent with no flags and responses
- * with just REPLY.  The DELETE flag acts like a half-close (and degenerately
- * works the same as it does for normal messages).  This flag can be set
- * in the initial command or any successive command, and in the initial reply
- * or in any successive reply.  The recorded message state is destroyed when
- * both sides have sent a DELETE.
- *
- * Aborts for persistent messages work in the same fashion as they do for
- * normal messages, except that the target can also initiate an ABORT
- * by using ABORT|REPLY.  The target has one restriction, however, it cannot
- * send an ABORT with the CREATE flag set (i.e. as the initial reply),
- * because if the originator reuses the msgid the originator would not
- * then be able to determine that the ABORT is associated with the previous
- * session and not the new session.
- *
- * If a link failure occurs any active or persistent messages will be
- * auto-replied to the originator, and auto-aborted to the target.
- *
- * Additional features:
- *
- *     ABORT+CREATE    - This may be used to make a non-blocking request.
- *                       The target receives the normal command and is free
- *                       to ignore the ABORT flag, but may use it as an
- *                       indication that a non-blocking request is being
- *                       made.  The target must still reply the message of
- *                       course.  Works for normal and persistent messages
- *                       but does NOT work for one-way messages (because
- *                       ABORT alone without recorded msgid state has to be
- *                       ignored).
- *
- *     ABORT           - ABORT messages are allowed to bypass input queues.
- *                       Normal ABORTs are sent without the DELETE flag,
- *                       even for normal messages which had already set the
- *                       DELETE flag in the initial message.  This allows
- *                       the normal DELETE half-close operation to proceed
- *                       so an ABORT is basically advisory and the originator
- *                       must still wait for a reply.  Aborts are also
- *                       advisory when sent by targets.
- *
- *                       ABORT messages cannot be used with one-way messages
- *                       as this would cause such messages to be ignored.
- *
- *     ABORT+DELETE    - This is a special form of ABORT that allows the
- *                       recorded message state on the sender and on all
- *                       hops the message is relayed through to be destroyed
- *                       on the fly, as if a two-way DELETE had occurred.
- *                       It will cause an auto-reply or auto-abort to be
- *                       issued as if the link had been lost, but allows
- *                       the link to remain live.
- *
- *                       This form is basically like a socket close(),
- *                       where you aren't just sending an EOF but you are
- *                       completely aborting the request in both directions.
- *
- *                       This form cannot be used with CREATE as that could
- *                       generate a false reply if msgid is reused and
- *                       crosses the abort over the wire.
- *
- *                       ABORT messages cannot be used with one-way messages
- *                       as this would cause such messages to be ignored.
- *
- *     SUBSTREAMS      - Persistent messages coupled with the fact that
- *                       all commands and responses run through a single
- *                       chain of relays over reliable streams allows one
- *                       to treat persistent message updates as a data
- *                       stream and use the DELETE flag or an ABORT to
- *                       indicate EOF.
+ * recorded.  These are sent without the CREATE, DELETE, or ABORT flags set.
+ * ABORT is not supported for one-way messages.  The REPLY bit can be used
+ * to distinguish between command and status if desired.
+ *
+ * Persistent-state messages are messages which require a reply to be
+ * returned.  These messages can also consist of multiple message elements
+ * for the command or reply or both (or neither).  The command message
+ * sequence sets CREATE on the first message and DELETE on the last message.
+ * A single message command sets both (CREATE|DELETE).  The reply message
+ * sequence works the same way but of course also sets the REPLY bit.
+ *
+ * Persistent-state messages can be aborted by sending a message element
+ * with the ABORT flag set.  This flag can be combined with either or both
+ * of the CREATE and DELETE flags.  When combined with the CREATE flag the
+ * command is treated as non-blocking but still executes.  When combined
+ * with the DELETE flag no additional message elements are required.
+ *
+ * ABORT SPECIAL CASE - Mid-stream aborts.  A mid-stream abort can be sent
+ * when supported by the sender by sending an ABORT message with neither
+ * CREATE nor DELETE set.  This effectively turns the message into a
+ * non-blocking message (but depending on what is being represented can also
+ * cut short prior data elements in the stream).
+ *
+ * ABORT SPECIAL CASE - Abort-after-DELETE.  Persistent messages have to be
+ * abortable if the stream/pipe/whatever is lost.  In this situation any
+ * forwarding relay needs to unconditionally abort commands and replies that
+ * are still active.  This is done by sending an ABORT|DELETE even in
+ * situations where a DELETE has already been sent in that direction.  This
+ * is done, for example, when links are in a half-closed state.  In this
+ * situation it is possible for the abort request to race a transition to the
+ * fully closed state.  ABORT|DELETE messages which race the fully closed
+ * state are expected to be discarded by the other end.
  *
  *
  *                     NEGOTIATION OF {source} AND {target}
  *
  *
  *                     NEGOTIATION OF {source} AND {target}
index cad6538..140aa68 100644 (file)
@@ -351,6 +351,11 @@ hammer2_vfs_mount(struct mount *mp, char *path, caddr_t data,
        pmp = kmalloc(sizeof(*pmp), M_HAMMER2, M_WAITOK | M_ZERO);
        mp->mnt_data = (qaddr_t)pmp;
        pmp->mp = mp;
        pmp = kmalloc(sizeof(*pmp), M_HAMMER2, M_WAITOK | M_ZERO);
        mp->mnt_data = (qaddr_t)pmp;
        pmp->mp = mp;
+       kmalloc_create(&pmp->mmsg, "HAMMER2-pfsmsg");
+       lockinit(&pmp->msglk, "h2msg", 0, 0);
+       TAILQ_INIT(&pmp->msgq);
+       RB_INIT(&pmp->staterd_tree);
+       RB_INIT(&pmp->statewr_tree);
 
        if (create_hmp) {
                hmp = kmalloc(sizeof(*hmp), M_HAMMER2, M_WAITOK | M_ZERO);
 
        if (create_hmp) {
                hmp = kmalloc(sizeof(*hmp), M_HAMMER2, M_WAITOK | M_ZERO);
@@ -631,6 +636,8 @@ hammer2_vfs_unmount(struct mount *mp, int mntflags)
        pmp->hmp = NULL;
        mp->mnt_data = NULL;
 
        pmp->hmp = NULL;
        mp->mnt_data = NULL;
 
+       kmalloc_destroy(&pmp->mmsg);
+
        kfree(pmp, M_HAMMER2);
        if (hmp->pmp_count == 0) {
                TAILQ_REMOVE(&hammer2_mntlist, hmp, mntentry);
        kfree(pmp, M_HAMMER2);
        if (hmp->pmp_count == 0) {
                TAILQ_REMOVE(&hammer2_mntlist, hmp, mntentry);
@@ -1005,17 +1012,107 @@ void
 hammer2_cluster_thread_rd(void *arg)
 {
        hammer2_pfsmount_t *pmp = arg;
 hammer2_cluster_thread_rd(void *arg)
 {
        hammer2_pfsmount_t *pmp = arg;
-       hammer2_any_t any;
-       int error;
+       hammer2_msg_hdr_t hdr;
+       hammer2_msg_t *msg;
+       hammer2_state_t *state;
+       size_t hbytes;
+       int error = 0;
 
        while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0) {
 
        while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0) {
-               error = fp_read(pmp->msg_fp,
-                               any.buf, sizeof(hammer2_msg_hdr_t),
+               /*
+                * Retrieve the message from the pipe or socket.
+                */
+               error = fp_read(pmp->msg_fp, &hdr, sizeof(hdr),
                                NULL, 1, UIO_SYSSPACE);
                                NULL, 1, UIO_SYSSPACE);
-               kprintf("fp_read %d\n", error);
                if (error)
                        break;
                if (error)
                        break;
+               if (hdr.magic != HAMMER2_MSGHDR_MAGIC) {
+                       kprintf("hammer2: msgrd: bad magic: %04x\n",
+                               hdr.magic);
+                       error = EINVAL;
+                       break;
+               }
+               hbytes = (hdr.cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
+               if (hbytes < sizeof(hdr) || hbytes > HAMMER2_MSGAUX_MAX) {
+                       kprintf("hammer2: msgrd: bad header size %zd\n",
+                               hbytes);
+                       error = EINVAL;
+                       break;
+               }
+               msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
+                             pmp->mmsg, M_WAITOK | M_ZERO);
+               msg->any.head = hdr;
+               msg->hdr_size = hbytes;
+               if (hbytes > sizeof(hdr)) {
+                       error = fp_read(pmp->msg_fp, &msg->any.head + 1,
+                                       hbytes - sizeof(hdr),
+                                       NULL, 1, UIO_SYSSPACE);
+                       if (error) {
+                               kprintf("hammer2: short msg received\n");
+                               error = EINVAL;
+                               break;
+                       }
+               }
+               msg->aux_size = hdr.aux_bytes * HAMMER2_MSG_ALIGN;
+               if (msg->aux_size > HAMMER2_MSGAUX_MAX) {
+                       kprintf("hammer2: illegal msg payload size %zd\n",
+                               msg->aux_size);
+                       error = EINVAL;
+                       break;
+               }
+               if (msg->aux_size) {
+                       msg->aux_data = kmalloc(msg->aux_size, pmp->mmsg,
+                                               M_WAITOK | M_ZERO);
+                       error = fp_read(pmp->msg_fp, msg->aux_data,
+                                       msg->aux_size,
+                                       NULL, 1, UIO_SYSSPACE);
+                       if (error) {
+                               kprintf("hammer2: short msg "
+                                       "payload received\n");
+                               break;
+                       }
+               }
+
+               /*
+                * State machine tracking, state assignment for msg,
+                * returns error and discard status.  Errors are fatal
+                * to the connection except for EALREADY which forces
+                * a discard without execution.
+                */
+               error = hammer2_state_msgrx(pmp, msg);
+               if (error) {
+                       hammer2_msg_free(pmp, msg);
+                       if (error == EALREADY)
+                               error = 0;
+               } else {
+                       error = hammer2_msg_execute(pmp, msg);
+                       hammer2_state_cleanuprx(pmp, msg);
+               }
+               msg = NULL;
        }
        }
+
+       if (error)
+               kprintf("hammer2: msg read failed error %d\n", error);
+
+       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+       if (msg) {
+               if (msg->state && msg->state->msg == msg)
+                       msg->state->msg = NULL;
+               hammer2_msg_free(pmp, msg);
+       }
+
+       if ((state = pmp->freerd_state) != NULL) {
+               pmp->freerd_state = NULL;
+               hammer2_state_free(state);
+       }
+
+       while ((state = RB_ROOT(&pmp->staterd_tree)) != NULL) {
+               RB_REMOVE(hammer2_state_tree, &pmp->staterd_tree, state);
+               hammer2_state_free(state);
+       }
+       lockmgr(&pmp->msglk, LK_RELEASE);
+
+       fp_shutdown(pmp->msg_fp, SHUT_RDWR);
        pmp->msgrd_td = NULL;
        /* pmp can be ripped out from under us at this point */
        wakeup(pmp);
        pmp->msgrd_td = NULL;
        /* pmp can be ripped out from under us at this point */
        wakeup(pmp);
@@ -1027,13 +1124,97 @@ void
 hammer2_cluster_thread_wr(void *arg)
 {
        hammer2_pfsmount_t *pmp = arg;
 hammer2_cluster_thread_wr(void *arg)
 {
        hammer2_pfsmount_t *pmp = arg;
+       hammer2_msg_t *msg = NULL;
+       hammer2_state_t *state;
+       ssize_t res;
+       int error = 0;
 
 
-       while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0) {
-               tsleep(&pmp->msg_ctl, 0, "msgwr", hz);
+       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+       while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0 && error == 0) {
+               lksleep(&pmp->msg_ctl, &pmp->msglk, 0, "msgwr", hz);
+               while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
+                       /*
+                        * Remove msg from the transmit queue and do
+                        * persist and half-closed state handling.
+                        */
+                       TAILQ_REMOVE(&pmp->msgq, msg, qentry);
+                       lockmgr(&pmp->msglk, LK_RELEASE);
+
+                       error = hammer2_state_msgtx(pmp, msg);
+                       if (error == EALREADY) {
+                               error = 0;
+                               hammer2_msg_free(pmp, msg);
+                               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+                               continue;
+                       }
+                       if (error)
+                               break;
+
+                       /*
+                        * Dump the message to the pipe or socket.
+                        */
+                       error = fp_write(pmp->msg_fp, &msg->any, msg->hdr_size,
+                                        &res, UIO_SYSSPACE);
+                       if (error || res != msg->hdr_size) {
+                               if (error == 0)
+                                       error = EINVAL;
+                               lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+                               break;
+                       }
+                       if (msg->aux_size) {
+                               error = fp_write(pmp->msg_fp,
+                                                msg->aux_data, msg->aux_size,
+                                                &res, UIO_SYSSPACE);
+                               if (error || res != msg->aux_size) {
+                                       if (error == 0)
+                                               error = EINVAL;
+                                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+                                       break;
+                               }
+                       }
+                       hammer2_state_cleanuptx(pmp, msg);
+                       lockmgr(&pmp->msglk, LK_EXCLUSIVE);
+               }
+       }
+
+       /*
+        * Cleanup messages pending transmission and release msgq lock.
+        */
+       if (error)
+               kprintf("hammer2: msg write failed error %d\n", error);
+
+       if (msg) {
+               if (msg->state && msg->state->msg == msg)
+                       msg->state->msg = NULL;
+               hammer2_msg_free(pmp, msg);
+       }
+
+       while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) {
+               TAILQ_REMOVE(&pmp->msgq, msg, qentry);
+               if (msg->state && msg->state->msg == msg)
+                       msg->state->msg = NULL;
+               hammer2_msg_free(pmp, msg);
        }
        }
+
+       if ((state = pmp->freewr_state) != NULL) {
+               pmp->freewr_state = NULL;
+               hammer2_state_free(state);
+       }
+
+       while ((state = RB_ROOT(&pmp->statewr_tree)) != NULL) {
+               RB_REMOVE(hammer2_state_tree, &pmp->statewr_tree, state);
+               hammer2_state_free(state);
+       }
+       lockmgr(&pmp->msglk, LK_RELEASE);
+
+       /*
+        * Cleanup descriptor, be sure the read size is shutdown so the
+        * (probably blocked) read operations returns an error.
+        *
+        * pmp can be ripped out from under us once msgwr_td is set to NULL.
+        */
        fp_shutdown(pmp->msg_fp, SHUT_RDWR);
        pmp->msgwr_td = NULL;
        fp_shutdown(pmp->msg_fp, SHUT_RDWR);
        pmp->msgwr_td = NULL;
-       /* pmp can be ripped out from under us at this point */
        wakeup(pmp);
        lwkt_exit();
 }
        wakeup(pmp);
        lwkt_exit();
 }