From: Matthew Dillon Date: Wed, 13 Jun 2012 01:43:32 +0000 (-0700) Subject: hammer2 - wire up msg transaction state machine X-Git-Tag: v3.4.0rc~1072 X-Git-Url: https://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/26bf1a3684f1458a52caf29eaa96eba1f99c1703 hammer2 - wire up msg transaction state machine * Wire up the msg transaction state machine. Split hammer2_msg into hammer2_state and hammer2_msg. Track states in two RB trees in the hammer2_pfsmount structure. Handle CREATE, DELETE, ABORT, and REPLY flags. * Revamp the API. The adjustments make it less likely that messages can be left dangling. * Compile only, untested, not yet operational. --- diff --git a/sys/vfs/hammer2/Makefile b/sys/vfs/hammer2/Makefile index 1dc3fd81cb..5dfde44b0f 100644 --- a/sys/vfs/hammer2/Makefile +++ b/sys/vfs/hammer2/Makefile @@ -7,6 +7,6 @@ CFLAGS+= -DINVARIANTS -DSMP KMOD= hammer2 SRCS= hammer2_vfsops.c hammer2_vnops.c hammer2_inode.c hammer2_ccms.c SRCS+= hammer2_chain.c hammer2_freemap.c hammer2_subr.c hammer2_icrc.c -SRCS+= hammer2_ioctl.c +SRCS+= hammer2_ioctl.c hammer2_msg.c .include diff --git a/sys/vfs/hammer2/hammer2.h b/sys/vfs/hammer2/hammer2.h index 3a1cc442af..8faa6c659c 100644 --- a/sys/vfs/hammer2/hammer2.h +++ b/sys/vfs/hammer2/hammer2.h @@ -68,11 +68,14 @@ #include "hammer2_mount.h" #include "hammer2_ioctl.h" #include "hammer2_ccms.h" +#include "hammer2_network.h" struct hammer2_chain; struct hammer2_inode; struct hammer2_mount; struct hammer2_pfsmount; +struct hammer2_state; +struct hammer2_msg; /* * The chain structure tracks blockref recursions all the way to @@ -98,14 +101,16 @@ struct hammer2_pfsmount; * not match the blockref at (parent, index). 
*/ RB_HEAD(hammer2_chain_tree, hammer2_chain); +RB_HEAD(hammer2_state_tree, hammer2_state); TAILQ_HEAD(flush_deferral_list, hammer2_chain); struct hammer2_chain { ccms_cst_t cst; /* attr or data cst */ struct hammer2_blockref bref; struct hammer2_blockref bref_flush; /* synchronized w/MOVED bit */ - struct hammer2_chain *parent; /* return chain to root */ + struct hammer2_chain *parent; /* return chain to root */ struct hammer2_chain_tree rbhead; + struct hammer2_state *state; /* if active cache msg */ RB_ENTRY(hammer2_chain) rbnode; TAILQ_ENTRY(hammer2_chain) flush_node; /* flush deferral list */ union { @@ -307,6 +312,7 @@ struct hammer2_pfsmount { struct hammer2_mount *hmp; /* device global mount */ hammer2_chain_t *rchain; /* PFS root chain */ hammer2_inode_t *iroot; /* PFS root inode */ + struct malloc_type *mmsg; ccms_domain_t ccms_dom; struct netexport export; /* nfs export */ int ronly; /* read-only mount */ @@ -314,12 +320,55 @@ struct hammer2_pfsmount { thread_t msgrd_td; /* cluster thread */ thread_t msgwr_td; /* cluster thread */ int msg_ctl; /* wakeup flags */ + struct lock msglk; /* lockmgr lock */ + TAILQ_HEAD(, hammer2_msg) msgq; /* transmit queue */ + struct hammer2_state *freerd_state; /* allocation cache */ + struct hammer2_state *freewr_state; /* allocation cache */ + struct hammer2_state_tree staterd_tree; /* active messages */ + struct hammer2_state_tree statewr_tree; /* active messages */ }; typedef struct hammer2_pfsmount hammer2_pfsmount_t; #define HAMMER2_CLUSTERCTL_KILL 0x0001 +/* + * In-memory message structure for hammer2. + * + * Persistent cache state messages will be associated with a hammer2_chain. 
+ */ +struct hammer2_state { + RB_ENTRY(hammer2_state) rbnode; /* indexed by msgid */ + struct hammer2_pfsmount *pmp; + uint32_t txcmd; /* mostly for CMDF flags */ + uint32_t rxcmd; /* mostly for CMDF flags */ + uint32_t msgid; + int flags; + int error; + struct hammer2_chain *chain; /* msg associated w/chain */ + struct hammer2_msg *msg; + void (*func)(struct hammer2_state *state, struct hammer2_msg *msg); +}; + +#define HAMMER2_STATE_INSERTED 0x0001 +#define HAMMER2_STATE_DYNAMIC 0x0002 + +struct hammer2_msg { + TAILQ_ENTRY(hammer2_msg) qentry; /* serialized queue */ + struct hammer2_state *state; + size_t hdr_size; + size_t aux_size; + char *aux_data; + hammer2_any_t any; +}; + +typedef struct hammer2_state hammer2_state_t; +typedef struct hammer2_msg hammer2_msg_t; + +int hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2); +RB_PROTOTYPE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp); + + #if defined(_KERNEL) MALLOC_DECLARE(M_HAMMER2); @@ -474,6 +523,17 @@ void hammer2_chain_commit(hammer2_mount_t *hmp, hammer2_chain_t *chain); int hammer2_ioctl(hammer2_inode_t *ip, u_long com, void *data, int fflag, struct ucred *cred); +/* + * hammer2_msg.c + */ +int hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg); +int hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg); +void hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg); +void hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg); +int hammer2_msg_execute(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg); +void hammer2_state_free(hammer2_state_t *state); +void hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg); + /* * hammer2_freemap.c */ diff --git a/sys/vfs/hammer2/hammer2_msg.c b/sys/vfs/hammer2/hammer2_msg.c new file mode 100644 index 0000000000..6ae0186424 --- /dev/null +++ b/sys/vfs/hammer2/hammer2_msg.c @@ -0,0 +1,604 @@ +/*- + * Copyright (c) 2012 The DragonFly Project. All rights reserved. 
+ * + * This code is derived from software contributed to The DragonFly Project + * by Matthew Dillon + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions + * are met: + * + * 1. Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in + * the documentation and/or other materials provided with the + * distribution. + * 3. Neither the name of The DragonFly Project nor the names of its + * contributors may be used to endorse or promote products derived + * from this software without specific, prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS + * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE + * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, + * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, + * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; + * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED + * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, + * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT + * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF + * SUCH DAMAGE. + */ + +#include "hammer2.h" + +/* + * Process state tracking for a message after reception, prior to + * execution. + * + * Called with msglk held and the msg dequeued. + * + * All messages are called with dummy state and return actual state. + * (One-off messages often just return the same dummy state). 
+ * + * May request that caller discard the message by returning EALREADY. + * The assigned msg->state is not used in this case and is allowed to be NULL. + * + * -- + * + * These routines handle persistent and command/reply message state via the + * CREATE and DELETE flags. The first message in a command or reply sequence + * sets CREATE, the last message in a command or reply sequence sets DELETE. + * + * There can be any number of intermediate messages belonging to the same + * sequence sent in between the CREATE message and the DELETE message, + * which set neither flag. This represents a streaming command or reply. + * + * Any command message received with CREATE set expects a reply sequence to + * be returned. Reply sequences work the same as command sequences except the + * REPLY bit is also set. Both the command side and reply side can + * degenerate into a single message with both CREATE and DELETE set. Note + * that one side can be streaming and the other side not, or neither, or both. + * + * The msgid is unique for the initiator. That is, two sides sending a new + * message can use the same msgid without colliding. + * + * -- + * + * ABORT sequences work by setting the ABORT flag along with normal message + * state. However, ABORTs can also be sent on half-closed messages, that is + * even if the command or reply side has already sent a DELETE, as long as + * the message has not been fully closed it can still send an ABORT+DELETE + * to terminate the half-closed message state. + * + * Since ABORT+DELETEs can race we silently discard ABORT's for message + * state which has already been fully closed. REPLY+ABORT+DELETEs can + * also race, and in this situation the other side might have already + * initiated a new unrelated command with the same message id. Since + * the abort has not set the CREATE flag the situation can be detected + * and the message will also be discarded. + * + * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE]. 
+ * The ABORT request is essentially integrated into the command instead + * of being sent later on. In this situation the command implementation + * detects that CREATE and ABORT are both set (vs ABORT alone) and can + * special-case non-blocking operation for the command. + * + * NOTE! Messages with ABORT set without CREATE or DELETE are considered + * to be mid-stream aborts for command/reply sequences. ABORTs on + * one-way messages are not supported. + * + * NOTE! If a command sequence does not support aborts the ABORT flag is + * simply ignored. + * + * -- + * + * One-off messages (no reply expected) are sent with neither CREATE or DELETE + * set. One-off messages cannot be aborted and typically aren't processed + * by these routines. The REPLY bit can be used to distinguish whether a + * one-off message is a command or reply. For example, one-off replies + * will typically just contain status updates. + */ +int +hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg) +{ + hammer2_state_t *state; + int error; + + /* + * Make sure a state structure is ready to go in case we need a new + * one. This is the only routine which uses freerd_state so no + * races are possible. + */ + if ((state = pmp->freerd_state) == NULL) { + state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO); + state->pmp = pmp; + state->flags = HAMMER2_STATE_DYNAMIC; + pmp->freerd_state = state; + } + + /* + * Lock RB tree and locate existing persistent state, if any. + * + * If received msg is a command state is on staterd_tree. + * If received msg is a reply state is on statewr_tree. + */ + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + + state->msgid = msg->any.head.msgid; + if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) + state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state); + else + state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state); + msg->state = state; + + /* + * Short-cut one-off or mid-stream messages (state may be NULL). 
+ */ + if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_ABORT)) == 0) { + lockmgr(&pmp->msglk, LK_RELEASE); + return(0); + } + + /* + * Switch on CREATE, DELETE, REPLY, and also handle ABORT from + * inside the case statements. + */ + switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_REPLY)) { + case HAMMER2_MSGF_CREATE: + case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * New persistant command received. + */ + if (state) { + kprintf("hammer2_state_msgrx: duplicate transaction\n"); + error = EINVAL; + break; + } + state = pmp->freerd_state; + pmp->freerd_state = NULL; + msg->state = state; + state->msg = msg; + state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state); + state->flags |= HAMMER2_STATE_INSERTED; + error = 0; + break; + case HAMMER2_MSGF_DELETE: + /* + * Persistent state is expected but might not exist if an + * ABORT+DELETE races the close. + */ + if (state == NULL) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = EALREADY; + } else { + kprintf("hammer2_state_msgrx: no state " + "for DELETE\n"); + error = EINVAL; + } + break; + } + + /* + * Handle another ABORT+DELETE case if the msgid has already + * been reused. + */ + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = EALREADY; + } else { + kprintf("hammer2_state_msgrx: state reused " + "for DELETE\n"); + error = EINVAL; + } + break; + } + error = 0; + break; + default: + /* + * Check for mid-stream ABORT command received, otherwise + * allow. 
+ */ + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + if (state == NULL || + (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + error = EALREADY; + break; + } + } + error = 0; + break; + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE: + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * When receiving a reply with CREATE set the original + * persistent state message should already exist. + */ + if (state == NULL) { + kprintf("hammer2_state_msgrx: no state match for " + "REPLY\n"); + error = EINVAL; + break; + } + state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + error = 0; + break; + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE: + /* + * Received REPLY+ABORT+DELETE in case where msgid has + * already been fully closed, ignore the message. + */ + if (state == NULL) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = EALREADY; + } else { + kprintf("hammer2_state_msgrx: no state match " + "for REPLY|DELETE\n"); + error = EINVAL; + } + break; + } + + /* + * Received REPLY+ABORT+DELETE in case where msgid has + * already been reused for an unrelated message, + * ignore the message. + */ + if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = EALREADY; + } else { + kprintf("hammer2_state_msgrx: state reused " + "for REPLY|DELETE\n"); + error = EINVAL; + } + break; + } + error = 0; + break; + case HAMMER2_MSGF_REPLY: + /* + * Check for mid-stream ABORT reply received to sent command. 
+ */ + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + if (state == NULL || + (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) { + error = EALREADY; + break; + } + } + error = 0; + break; + } + lockmgr(&pmp->msglk, LK_RELEASE); + return (error); +} + +void +hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg) +{ + hammer2_state_t *state; + + if ((state = msg->state) == NULL) { + hammer2_msg_free(pmp, msg); + } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + state->rxcmd |= HAMMER2_MSGF_DELETE; + if (state->txcmd & HAMMER2_MSGF_DELETE) { + if (state->msg == msg) + state->msg = NULL; + KKASSERT(state->flags & HAMMER2_STATE_INSERTED); + if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) { + RB_REMOVE(hammer2_state_tree, + &pmp->statewr_tree, state); + } else { + RB_REMOVE(hammer2_state_tree, + &pmp->staterd_tree, state); + } + state->flags &= ~HAMMER2_STATE_INSERTED; + lockmgr(&pmp->msglk, LK_RELEASE); + hammer2_state_free(state); + } else { + lockmgr(&pmp->msglk, LK_RELEASE); + } + hammer2_msg_free(pmp, msg); + } else if (state->msg != msg) { + hammer2_msg_free(pmp, msg); + } +} + +/* + * Process state tracking for a message prior to transmission. + * + * Called with msglk held and the msg dequeued. + * + * One-off messages are usually with dummy state and msg->state may be NULL + * in this situation. + * + * New transactions (when CREATE is set) will insert the state. + * + * May request that caller discard the message by setting *discardp to 1. + * A NULL state may be returned in this case. + */ +int +hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg) +{ + hammer2_state_t *state; + int error; + + /* + * Make sure a state structure is ready to go in case we need a new + * one. This is the only routine which uses freewr_state so no + * races are possible. 
+ */ + if ((state = pmp->freewr_state) == NULL) { + state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO); + state->pmp = pmp; + state->flags = HAMMER2_STATE_DYNAMIC; + pmp->freewr_state = state; + } + + /* + * Lock RB tree. If persistent state is present it will have already + * been assigned to msg. + */ + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + state = msg->state; + + /* + * Short-cut one-off or mid-stream messages (state may be NULL). + */ + if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_ABORT)) == 0) { + lockmgr(&pmp->msglk, LK_RELEASE); + return(0); + } + + + /* + * Switch on CREATE, DELETE, REPLY, and also handle ABORT from + * inside the case statements. + */ + switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE | + HAMMER2_MSGF_REPLY)) { + case HAMMER2_MSGF_CREATE: + case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * Insert the new persistent message state and mark + * half-closed if DELETE is set. Since this is a new + * message it isn't possible to transition into the fully + * closed state here. + */ + if (state == NULL) { + state = pmp->freerd_state; + pmp->freerd_state = NULL; + msg->state = state; + state->msg = msg; + } + KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0); + if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) { + kprintf("hammer2_state_msgtx: duplicate transaction\n"); + error = EINVAL; + break; + } + state->flags |= HAMMER2_STATE_INSERTED; + state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + error = 0; + break; + case HAMMER2_MSGF_DELETE: + /* + * Sent ABORT+DELETE in case where msgid has already + * been fully closed, ignore the message. 
+ */ + if (state == NULL) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = EALREADY; + } else { + kprintf("hammer2_state_msgtx: no state match " + "for DELETE\n"); + error = EINVAL; + } + break; + } + + /* + * Sent ABORT+DELETE in case where msgid has + * already been reused for an unrelated message, + * ignore the message. + */ + if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = EALREADY; + } else { + kprintf("hammer2_state_msgtx: state reused " + "for DELETE\n"); + error = EINVAL; + } + break; + } + error = 0; + break; + default: + /* + * Check for mid-stream ABORT command sent + */ + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + if (state == NULL || + (state->txcmd & HAMMER2_MSGF_CREATE) == 0) { + error = EALREADY; + break; + } + } + error = 0; + break; + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE: + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE: + /* + * When transmitting a reply with CREATE set the original + * persistent state message should already exist. + */ + if (state == NULL) { + kprintf("hammer2_state_msgtx: no state match " + "for REPLY | CREATE\n"); + error = EINVAL; + break; + } + state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE; + error = 0; + break; + case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE: + /* + * When transmitting a reply with DELETE set the original + * persistent state message should already exist. + * + * This is very similar to the REPLY|CREATE|* case except + * txcmd is already stored, so we just add the DELETE flag. + * + * Sent REPLY+ABORT+DELETE in case where msgid has + * already been fully closed, ignore the message. 
+ */ + if (state == NULL) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = EALREADY; + } else { + kprintf("hammer2_state_msgtx: no state match " + "for REPLY | DELETE\n"); + error = EINVAL; + } + break; + } + + /* + * Sent REPLY+ABORT+DELETE in case where msgid has already + * been reused for an unrelated message, ignore the message. + */ + if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) { + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + error = EALREADY; + } else { + kprintf("hammer2_state_msgtx: state reused " + "for REPLY | DELETE\n"); + error = EINVAL; + } + break; + } + error = 0; + break; + case HAMMER2_MSGF_REPLY: + /* + * Check for mid-stream ABORT reply sent. + * + * One-off REPLY messages are allowed for e.g. status updates. + */ + if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) { + if (state == NULL || + (state->txcmd & HAMMER2_MSGF_CREATE) == 0) { + error = EALREADY; + break; + } + } + error = 0; + break; + } + lockmgr(&pmp->msglk, LK_RELEASE); + return (error); +} + +void +hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg) +{ + hammer2_state_t *state; + + if ((state = msg->state) == NULL) { + hammer2_msg_free(pmp, msg); + } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) { + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + state->txcmd |= HAMMER2_MSGF_DELETE; + if (state->rxcmd & HAMMER2_MSGF_DELETE) { + if (state->msg == msg) + state->msg = NULL; + KKASSERT(state->flags & HAMMER2_STATE_INSERTED); + if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) { + RB_REMOVE(hammer2_state_tree, + &pmp->staterd_tree, state); + } else { + RB_REMOVE(hammer2_state_tree, + &pmp->statewr_tree, state); + } + state->flags &= ~HAMMER2_STATE_INSERTED; + lockmgr(&pmp->msglk, LK_RELEASE); + hammer2_state_free(state); + } else { + lockmgr(&pmp->msglk, LK_RELEASE); + } + hammer2_msg_free(pmp, msg); + } else if (state->msg != msg) { + hammer2_msg_free(pmp, msg); + } +} + +void +hammer2_state_free(hammer2_state_t *state) +{ + hammer2_pfsmount_t *pmp = state->pmp; + 
hammer2_msg_t *msg; + + msg = state->msg; + state->msg = NULL; + kfree(state, pmp->mmsg); + if (msg) + hammer2_msg_free(pmp, msg); +} + +void +hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg) +{ + if (msg->aux_data && msg->aux_size) { + kfree(msg->aux_data, pmp->mmsg); + msg->aux_data = NULL; + msg->aux_size = 0; + } + kfree(msg, pmp->mmsg); +} + +/* + * Indexed messages are stored in a red-black tree indexed by their + * msgid. Only persistent messages are indexed. + */ +int +hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2) +{ + if (state1->msgid < state2->msgid) + return(-1); + if (state1->msgid > state2->msgid) + return(1); + return(0); +} + +/* + * Execute a received message. + * + * If msg is part of a transaction msg->state will be non-NULL. In this + * situation state->msg saves the original command and can be replaced + * by ongoing activity if desired. Any msg which does not match state->msg + * will be destroyed after we return. + * + * Single-element transactions set both CREATE and DELETE. + * + * Non-blocking transactions also set ABORT with CREATE. + * + * Specific transactional ABORTs can also be received and must be handled, + * and can be received even when the other end has closed (after DELETE + * received) if we have not yet closed our end of the transaction. + */ +int +hammer2_msg_execute(hammer2_pfsmount_t *pmp __unused, hammer2_msg_t *msg __unused) +{ + return(0); +} diff --git a/sys/vfs/hammer2/hammer2_network.h b/sys/vfs/hammer2/hammer2_network.h index 5045857660..c5a597c9e1 100644 --- a/sys/vfs/hammer2/hammer2_network.h +++ b/sys/vfs/hammer2/hammer2_network.h @@ -65,89 +65,38 @@ * at each hop until a DELETE is received from both sides. * * One-way messages such as those used by spanning tree commands are not - * recorded. These are sent with no flags set. Aborts and replies are not - * possible. 
- * - * A normal message with no persistent state is sent with CREATE|DELETE and - * the response is returned with REPLY|CREATE|DELETE. A normal command can - * be aborted by sending an ABORT message to the msgid that is in progress. - * An ABORT sent by the originator must still wait for the reply from the - * target and, since we've already sent the DELETE with our CREATE|DELETE, - * may also cross the REPLY|CREATE|DELETE message in the opposite direction. - * In this situation the message state has been destroyed on the target and - * the target ignores the ABORT (because CREATE is not set, and - * differentiated from one-way messages because ABORT is set). - * - * A command which has persistent state must maintain a persistent message. - * For example, a lock or cache state request. A persistent message is - * initiated with just CREATE and the initial response is returned with - * REPLY|CREATE. Successive messages are sent with no flags and responses - * with just REPLY. The DELETE flag acts like a half-close (and degenerately - * works the same as it does for normal messages). This flag can be set - * in the initial command or any successive command, and in the initial reply - * or in any successive reply. The recorded message state is destroyed when - * both sides have sent a DELETE. - * - * Aborts for persistent messages work in the same fashion as they do for - * normal messages, except that the target can also initiate an ABORT - * by using ABORT|REPLY. The target has one restriction, however, it cannot - * send an ABORT with the CREATE flag set (i.e. as the initial reply), - * because if the originator reuses the msgid the originator would not - * then be able to determine that the ABORT is associated with the previous - * session and not the new session. - * - * If a link failure occurs any active or persistent messages will be - * auto-replied to the originator, and auto-aborted to the target. 
- * - * Additional features: - * - * ABORT+CREATE - This may be used to make a non-blocking request. - * The target receives the normal command and is free - * to ignore the ABORT flag, but may use it as an - * indication that a non-blocking request is being - * made. The target must still reply the message of - * course. Works for normal and persistent messages - * but does NOT work for one-way messages (because - * ABORT alone without recorded msgid state has to be - * ignored). - * - * ABORT - ABORT messages are allowed to bypass input queues. - * Normal ABORTs are sent without the DELETE flag, - * even for normal messages which had already set the - * DELETE flag in the initial message. This allows - * the normal DELETE half-close operation to proceed - * so an ABORT is basically advisory and the originator - * must still wait for a reply. Aborts are also - * advisory when sent by targets. - * - * ABORT messages cannot be used with one-way messages - * as this would cause such messages to be ignored. - * - * ABORT+DELETE - This is a special form of ABORT that allows the - * recorded message state on the sender and on all - * hops the message is relayed through to be destroyed - * on the fly, as if a two-way DELETE had occurred. - * It will cause an auto-reply or auto-abort to be - * issued as if the link had been lost, but allows - * the link to remain live. - * - * This form is basically like a socket close(), - * where you aren't just sending an EOF but you are - * completely aborting the request in both directions. - * - * This form cannot be used with CREATE as that could - * generate a false reply if msgid is reused and - * crosses the abort over the wire. - * - * ABORT messages cannot be used with one-way messages - * as this would cause such messages to be ignored. 
- * - * SUBSTREAMS - Persistent messages coupled with the fact that - * all commands and responses run through a single - * chain of relays over reliable streams allows one - * to treat persistent message updates as a data - * stream and use the DELETE flag or an ABORT to - * indicate EOF. + * recorded. These are sent without the CREATE, DELETE, or ABORT flags set. + * ABORT is not supported for one-off messages. The REPLY bit can be used + * to distinguish between command and status if desired. + * + * Persistent-state messages are messages which require a reply to be + * returned. These messages can also consist of multiple message elements + * for the command or reply or both (or neither). The command message + * sequence sets CREATE on the first message and DELETE on the last message. + * A single message command sets both (CREATE|DELETE). The reply message + * sequence works the same way but of course also sets the REPLY bit. + * + * Persistent-state messages can be aborted by sending a message element + * with the ABORT flag set. This flag can be combined with either or both + * the CREATE and DELETE flags. When combined with the CREATE flag the + * command is treated as non-blocking but still executes. When combined + * with the DELETE flag no additional message elements are required. + * + * ABORT SPECIAL CASE - Mid-stream aborts. A mid-stream abort can be sent + * when supported by the sender by sending an ABORT message with neither + * CREATE or DELETE set. This effectively turns the message into a + * non-blocking message (but depending on what is being represented can also + * cut short prior data elements in the stream). + * + * ABORT SPECIAL CASE - Abort-after-DELETE. Persistent messages have to be + * abortable if the stream/pipe/whatever is lost. In this situation any + * forwarding relay needs to unconditionally abort commands and replies that 
This is done by sending an ABORT|DELETE even in + * situations where a DELETE has already been sent in that direction. This + * is done, for example, when links are in a half-closed state. In this + * situation it is possible for the abort request to race a transition to the + * fully closed state. ABORT|DELETE messages which race the fully closed + * state are expected to be discarded by the other end. * * * NEGOTIATION OF {source} AND {target} diff --git a/sys/vfs/hammer2/hammer2_vfsops.c b/sys/vfs/hammer2/hammer2_vfsops.c index cad6538854..140aa681df 100644 --- a/sys/vfs/hammer2/hammer2_vfsops.c +++ b/sys/vfs/hammer2/hammer2_vfsops.c @@ -351,6 +351,11 @@ hammer2_vfs_mount(struct mount *mp, char *path, caddr_t data, pmp = kmalloc(sizeof(*pmp), M_HAMMER2, M_WAITOK | M_ZERO); mp->mnt_data = (qaddr_t)pmp; pmp->mp = mp; + kmalloc_create(&pmp->mmsg, "HAMMER2-pfsmsg"); + lockinit(&pmp->msglk, "h2msg", 0, 0); + TAILQ_INIT(&pmp->msgq); + RB_INIT(&pmp->staterd_tree); + RB_INIT(&pmp->statewr_tree); if (create_hmp) { hmp = kmalloc(sizeof(*hmp), M_HAMMER2, M_WAITOK | M_ZERO); @@ -631,6 +636,8 @@ hammer2_vfs_unmount(struct mount *mp, int mntflags) pmp->hmp = NULL; mp->mnt_data = NULL; + kmalloc_destroy(&pmp->mmsg); + kfree(pmp, M_HAMMER2); if (hmp->pmp_count == 0) { TAILQ_REMOVE(&hammer2_mntlist, hmp, mntentry); @@ -1005,17 +1012,107 @@ void hammer2_cluster_thread_rd(void *arg) { hammer2_pfsmount_t *pmp = arg; - hammer2_any_t any; - int error; + hammer2_msg_hdr_t hdr; + hammer2_msg_t *msg; + hammer2_state_t *state; + size_t hbytes; + int error = 0; while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0) { - error = fp_read(pmp->msg_fp, - any.buf, sizeof(hammer2_msg_hdr_t), + /* + * Retrieve the message from the pipe or socket. 
+ */ + error = fp_read(pmp->msg_fp, &hdr, sizeof(hdr), NULL, 1, UIO_SYSSPACE); - kprintf("fp_read %d\n", error); if (error) break; + if (hdr.magic != HAMMER2_MSGHDR_MAGIC) { + kprintf("hammer2: msgrd: bad magic: %04x\n", + hdr.magic); + error = EINVAL; + break; + } + hbytes = (hdr.cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN; + if (hbytes < sizeof(hdr) || hbytes > HAMMER2_MSGAUX_MAX) { + kprintf("hammer2: msgrd: bad header size %zd\n", + hbytes); + error = EINVAL; + break; + } + msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes, + pmp->mmsg, M_WAITOK | M_ZERO); + msg->any.head = hdr; + msg->hdr_size = hbytes; + if (hbytes > sizeof(hdr)) { + error = fp_read(pmp->msg_fp, &msg->any.head + 1, + hbytes - sizeof(hdr), + NULL, 1, UIO_SYSSPACE); + if (error) { + kprintf("hammer2: short msg received\n"); + error = EINVAL; + break; + } + } + msg->aux_size = hdr.aux_bytes * HAMMER2_MSG_ALIGN; + if (msg->aux_size > HAMMER2_MSGAUX_MAX) { + kprintf("hammer2: illegal msg payload size %zd\n", + msg->aux_size); + error = EINVAL; + break; + } + if (msg->aux_size) { + msg->aux_data = kmalloc(msg->aux_size, pmp->mmsg, + M_WAITOK | M_ZERO); + error = fp_read(pmp->msg_fp, msg->aux_data, + msg->aux_size, + NULL, 1, UIO_SYSSPACE); + if (error) { + kprintf("hammer2: short msg " + "payload received\n"); + break; + } + } + + /* + * State machine tracking, state assignment for msg, + * returns error and discard status. Errors are fatal + * to the connection except for EALREADY which forces + * a discard without execution. 
+ */ + error = hammer2_state_msgrx(pmp, msg); + if (error) { + hammer2_msg_free(pmp, msg); + if (error == EALREADY) + error = 0; + } else { + error = hammer2_msg_execute(pmp, msg); + hammer2_state_cleanuprx(pmp, msg); + } + msg = NULL; } + + if (error) + kprintf("hammer2: msg read failed error %d\n", error); + + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + if (msg) { + if (msg->state && msg->state->msg == msg) + msg->state->msg = NULL; + hammer2_msg_free(pmp, msg); + } + + if ((state = pmp->freerd_state) != NULL) { + pmp->freerd_state = NULL; + hammer2_state_free(state); + } + + while ((state = RB_ROOT(&pmp->staterd_tree)) != NULL) { + RB_REMOVE(hammer2_state_tree, &pmp->staterd_tree, state); + hammer2_state_free(state); + } + lockmgr(&pmp->msglk, LK_RELEASE); + + fp_shutdown(pmp->msg_fp, SHUT_RDWR); pmp->msgrd_td = NULL; /* pmp can be ripped out from under us at this point */ wakeup(pmp); @@ -1027,13 +1124,97 @@ void hammer2_cluster_thread_wr(void *arg) { hammer2_pfsmount_t *pmp = arg; + hammer2_msg_t *msg = NULL; + hammer2_state_t *state; + ssize_t res; + int error = 0; - while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0) { - tsleep(&pmp->msg_ctl, 0, "msgwr", hz); + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + while ((pmp->msg_ctl & HAMMER2_CLUSTERCTL_KILL) == 0 && error == 0) { + lksleep(&pmp->msg_ctl, &pmp->msglk, 0, "msgwr", hz); + while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) { + /* + * Remove msg from the transmit queue and do + * persist and half-closed state handling. + */ + TAILQ_REMOVE(&pmp->msgq, msg, qentry); + lockmgr(&pmp->msglk, LK_RELEASE); + + error = hammer2_state_msgtx(pmp, msg); + if (error == EALREADY) { + error = 0; + hammer2_msg_free(pmp, msg); + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + continue; + } + if (error) + break; + + /* + * Dump the message to the pipe or socket. 
+ */ + error = fp_write(pmp->msg_fp, &msg->any, msg->hdr_size, + &res, UIO_SYSSPACE); + if (error || res != msg->hdr_size) { + if (error == 0) + error = EINVAL; + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + break; + } + if (msg->aux_size) { + error = fp_write(pmp->msg_fp, + msg->aux_data, msg->aux_size, + &res, UIO_SYSSPACE); + if (error || res != msg->aux_size) { + if (error == 0) + error = EINVAL; + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + break; + } + } + hammer2_state_cleanuptx(pmp, msg); + lockmgr(&pmp->msglk, LK_EXCLUSIVE); + } + } + + /* + * Cleanup messages pending transmission and release msgq lock. + */ + if (error) + kprintf("hammer2: msg write failed error %d\n", error); + + if (msg) { + if (msg->state && msg->state->msg == msg) + msg->state->msg = NULL; + hammer2_msg_free(pmp, msg); + } + + while ((msg = TAILQ_FIRST(&pmp->msgq)) != NULL) { + TAILQ_REMOVE(&pmp->msgq, msg, qentry); + if (msg->state && msg->state->msg == msg) + msg->state->msg = NULL; + hammer2_msg_free(pmp, msg); } + + if ((state = pmp->freewr_state) != NULL) { + pmp->freewr_state = NULL; + hammer2_state_free(state); + } + + while ((state = RB_ROOT(&pmp->statewr_tree)) != NULL) { + RB_REMOVE(hammer2_state_tree, &pmp->statewr_tree, state); + hammer2_state_free(state); + } + lockmgr(&pmp->msglk, LK_RELEASE); + + /* + * Cleanup descriptor, be sure the read size is shutdown so the + * (probably blocked) read operations returns an error. + * + * pmp can be ripped out from under us once msgwr_td is set to NULL. + */ fp_shutdown(pmp->msg_fp, SHUT_RDWR); pmp->msgwr_td = NULL; - /* pmp can be ripped out from under us at this point */ wakeup(pmp); lwkt_exit(); }