From: Matthew Dillon Date: Mon, 9 Mar 2015 03:49:31 +0000 (-0700) Subject: dmsg - Stabilization work X-Git-Tag: v4.2.0rc~636 X-Git-Url: https://gitweb.dragonflybsd.org/~tuxillo/dragonfly.git/commitdiff_plain/0a9eefca514ba2a75efead673e50071f516ed2d1 dmsg - Stabilization work * Refactor the circuit failure handling code. When a connection is lost, the circuits running through that connection and all of their sub-circuits/states are aborted recursively. The failure propagates through the graph, and there are plenty of edge cases where a failure propagating in one direction crosses a request propagating in the other direction. The library is responsible for providing the missing transaction closures when circuits and states fail. * Add code to handle races between circuit failure and newly created messages. A newly created transaction state now inherits the STATE_DYING flag from its parent state. * On receive, the state structure is now updated before the callback is made (instead of after), so that the update can be atomic with the lock. * Add lots of debugging and some cleanup. 
--- diff --git a/lib/libdmsg/debug.c b/lib/libdmsg/debug.c index add9a39b83..0d6472ccc7 100644 --- a/lib/libdmsg/debug.c +++ b/lib/libdmsg/debug.c @@ -238,12 +238,13 @@ dmsg_msg_str(dmsg_msg_t *msg) * Generate the buf */ snprintf(buf, sizeof(buf), - "msg=%s%s %s msgid=%08x %s", + "msg=%s%s %s %s hcrc=%08x id=%016jx", dmsg_basecmd_str(msg->any.head.cmd), flagbuf, errstr, - (uint32_t)(intmax_t)msg->any.head.msgid, /* for brevity */ - statestr); + statestr, + msg->any.head.hdr_crc, + msg->any.head.msgid); return(buf); } diff --git a/lib/libdmsg/dmsg.h b/lib/libdmsg/dmsg.h index d6c6ec0032..c1b431670a 100644 --- a/lib/libdmsg/dmsg.h +++ b/lib/libdmsg/dmsg.h @@ -157,6 +157,7 @@ typedef struct dmsg_media dmsg_media_t; */ struct dmsg_state { RB_ENTRY(dmsg_state) rbnode; /* by state->msgid */ + struct dmsg_state *scan; /* scan check */ TAILQ_HEAD(, dmsg_state) subq; /* active stacked states */ TAILQ_ENTRY(dmsg_state) entry; /* on parent subq */ struct dmsg_iocom *iocom; @@ -181,12 +182,13 @@ struct dmsg_state { #define DMSG_STATE_SUBINSERTED 0x0001 #define DMSG_STATE_DYNAMIC 0x0002 -#define DMSG_STATE_NODEID 0x0004 /* manages a node id */ -#define DMSG_STATE_UNUSED_0008 0x0008 +#define DMSG_STATE_UNUSED0004 0x0004 +#define DMSG_STATE_ABORTING 0x0008 #define DMSG_STATE_OPPOSITE 0x0010 /* initiated by other end */ #define DMSG_STATE_CIRCUIT 0x0020 /* LNK_SPAN special case */ #define DMSG_STATE_DYING 0x0040 /* indicates circuit failure */ #define DMSG_STATE_RBINSERTED 0x0080 +#define DMSG_STATE_NEW 0x0400 /* defer abort processing */ #define DMSG_STATE_ROOT 0x8000 /* iocom->state0 */ /* @@ -195,6 +197,8 @@ struct dmsg_state { * will point to &iocom->state0 for non-transactional messages. * * Message headers are embedded while auxillary data is separately allocated. + * The 'any' portion of the message is allocated dynamically based on + * hdr_size. 
*/ struct dmsg_msg { TAILQ_ENTRY(dmsg_msg) qentry; @@ -279,8 +283,6 @@ struct dmsg_iocom { char *label; /* label for error reporting */ dmsg_ioq_t ioq_rx; dmsg_ioq_t ioq_tx; - dmsg_msg_queue_t freeq; /* free msgs hdr only */ - dmsg_msg_queue_t freeq_aux; /* free msgs w/aux_data */ dmsg_state_t state0; /* root state for stacking */ struct dmsg_state_tree staterd_tree; /* active transactions */ struct dmsg_state_tree statewr_tree; /* active transactions */ diff --git a/lib/libdmsg/msg.c b/lib/libdmsg/msg.c index e6f6f59c87..c46d392cbc 100644 --- a/lib/libdmsg/msg.c +++ b/lib/libdmsg/msg.c @@ -35,17 +35,22 @@ #include "dmsg_local.h" +#define DMSG_BLOCK_DEBUG + int DMsgDebugOpt; int dmsg_state_count; #ifdef DMSG_BLOCK_DEBUG static int biocount; #endif -static int dmsg_state_msgrx(dmsg_msg_t *msg); +static int dmsg_state_msgrx(dmsg_msg_t *msg, int mstate); static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg); static void dmsg_msg_free_locked(dmsg_msg_t *msg); static void dmsg_state_free(dmsg_state_t *state); -static void dmsg_simulate_failure(dmsg_state_t *state, int error); +static void dmsg_subq_delete(dmsg_state_t *state); +static void dmsg_simulate_failure(dmsg_state_t *state, int meto, int error); +static void dmsg_state_abort(dmsg_state_t *state); +static void dmsg_state_dying(dmsg_state_t *state); RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp); @@ -121,8 +126,6 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd, pthread_mutex_init(&iocom->mtx, NULL); RB_INIT(&iocom->staterd_tree); RB_INIT(&iocom->statewr_tree); - TAILQ_INIT(&iocom->freeq); - TAILQ_INIT(&iocom->freeq_aux); TAILQ_INIT(&iocom->txmsgq); iocom->sock_fd = sock_fd; iocom->alt_fd = alt_fd; @@ -218,8 +221,6 @@ dmsg_iocom_signal(dmsg_iocom_t *iocom) void dmsg_iocom_done(dmsg_iocom_t *iocom) { - dmsg_msg_t *msg; - if (iocom->sock_fd >= 0) { close(iocom->sock_fd); iocom->sock_fd = -1; @@ -230,16 +231,6 @@ dmsg_iocom_done(dmsg_iocom_t *iocom) } 
dmsg_ioq_done(iocom, &iocom->ioq_rx); dmsg_ioq_done(iocom, &iocom->ioq_tx); - while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) { - TAILQ_REMOVE(&iocom->freeq, msg, qentry); - free(msg); - } - while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) { - TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); - free(msg->aux_data); - msg->aux_data = NULL; - free(msg); - } if (iocom->wakeupfds[0] >= 0) { close(iocom->wakeupfds[0]); iocom->wakeupfds[0] = -1; @@ -286,35 +277,27 @@ dmsg_msg_alloc_locked(dmsg_state_t *state, int hbytes; size_t aligned_size; -#if 0 - if (aux_size) { - aligned_size = DMSG_DOALIGN(aux_size); - if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) - TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); - } else { - aligned_size = 0; - if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) - TAILQ_REMOVE(&iocom->freeq, msg, qentry); - } -#endif aligned_size = DMSG_DOALIGN(aux_size); - msg = NULL; if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) { /* * When CREATE is set without REPLY the caller is * initiating a new transaction stacked under the specified * circuit. * + * It is possible to race a circuit failure, inherit the + * parent's STATE_DYING flag to trigger an abort sequence + * in the transmit path. By not inheriting ABORTING the + * abort sequence can recurse. 
+ * * NOTE: CREATE in txcmd handled by dmsg_msg_write() * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx() */ pstate = state; state = malloc(sizeof(*state)); - atomic_add_int(&dmsg_state_count, 1); bzero(state, sizeof(*state)); + atomic_add_int(&dmsg_state_count, 1); + TAILQ_INIT(&state->subq); - dmsg_state_hold(pstate); - state->refs = 1; state->parent = pstate; state->iocom = iocom; state->flags = DMSG_STATE_DYNAMIC; @@ -325,42 +308,30 @@ dmsg_msg_alloc_locked(dmsg_state_t *state, state->func = func; state->any.any = data; - RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state); - TAILQ_INSERT_TAIL(&pstate->subq, state, entry); state->flags |= DMSG_STATE_SUBINSERTED | DMSG_STATE_RBINSERTED; - - if (DMsgDebugOpt) { - fprintf(stderr, - "create state %p id=%08x on iocom statewr %p\n", - state, (uint32_t)state->msgid, iocom); - } + state->flags |= pstate->flags & DMSG_STATE_DYING; + if (TAILQ_EMPTY(&pstate->subq)) + dmsg_state_hold(pstate); + RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state); + TAILQ_INSERT_TAIL(&pstate->subq, state, entry); + dmsg_state_hold(state); /* state on pstate->subq */ + dmsg_state_hold(state); /* state on rbtree */ + dmsg_state_hold(state); /* msg->state */ } else { /* * Otherwise the message is transmitted over the existing * open transaction. 
*/ pstate = state->parent; + dmsg_state_hold(state); /* msg->state */ } /* XXX SMP race for state */ hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN; - if (msg == NULL) { - msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4); - bzero(msg, offsetof(struct dmsg_msg, any.head)); - *(int *)((char *)msg + - offsetof(struct dmsg_msg, any.head) + hbytes) = - 0x71B2C3D4; - if (DMsgDebugOpt) { - fprintf(stderr, - "allo msg %p id=%08x on iocom %p\n", - msg, (int)msg->any.head.msgid, iocom); - } -#if 0 - msg = malloc(sizeof(*msg)); - bzero(msg, sizeof(*msg)); -#endif - } + assert((size_t)hbytes >= sizeof(struct dmsg_hdr)); + msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes); + bzero(msg, offsetof(struct dmsg_msg, any.head)); /* * [re]allocate the auxillary data buffer. The caller knows that @@ -395,8 +366,7 @@ dmsg_msg_alloc_locked(dmsg_state_t *state, /* * Finish filling out the header. */ - if (hbytes) - bzero(&msg->any.head, hbytes); + bzero(&msg->any.head, hbytes); msg->hdr_size = hbytes; msg->any.head.magic = DMSG_HDR_MAGIC; msg->any.head.cmd = cmd; @@ -418,35 +388,18 @@ static void dmsg_msg_free_locked(dmsg_msg_t *msg) { - /*dmsg_iocom_t *iocom = msg->iocom;*/ + dmsg_state_t *state; - if (DMsgDebugOpt) { - fprintf(stderr, - "free msg %p id=%08x on (aux %p)\n", - msg, (int)msg->any.head.msgid, msg->aux_data); - } -#if 1 - int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN; - if (*(int *)((char *)msg + - offsetof(struct dmsg_msg, any.head) + hbytes) != - 0x71B2C3D4) { - fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd); - assert(0); + if ((state = msg->state) != NULL) { + dmsg_state_drop(state); + msg->state = NULL; /* safety */ } -#endif - msg->state = NULL; /* safety */ if (msg->aux_data) { free(msg->aux_data); - msg->aux_data = NULL; + msg->aux_data = NULL; /* safety */ } msg->aux_size = 0; free (msg); -#if 0 - if (msg->aux_data) - TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry); - else - TAILQ_INSERT_TAIL(&iocom->freeq, msg, 
qentry); -#endif } void @@ -598,7 +551,9 @@ dmsg_iocom_core(dmsg_iocom_t *iocom) dmsg_msg_str(msg)); } iocom->rcvmsg_callback(msg); + pthread_mutex_lock(&iocom->mtx); dmsg_state_cleanuprx(iocom, msg); + pthread_mutex_unlock(&iocom->mtx); } } @@ -828,6 +783,7 @@ again: */ assert(msg != NULL); if (bytes < ioq->hbytes) { + assert(nmax > 0); n = read(iocom->sock_fd, ioq->buf + ioq->fifo_end, nmax); @@ -964,6 +920,7 @@ again: * Read and decrypt more of the payload. */ if (msg->aux_size < ioq->abytes) { + assert(nmax > 0); assert(bytes == 0); n = read(iocom->sock_fd, ioq->buf + ioq->fifo_end, @@ -1113,7 +1070,7 @@ skip: */ pthread_mutex_lock(&iocom->mtx); dmsg_iocom_drain(iocom); - dmsg_simulate_failure(&iocom->state0, ioq->error); + dmsg_simulate_failure(&iocom->state0, 0, ioq->error); pthread_mutex_unlock(&iocom->mtx); if (TAILQ_FIRST(&ioq->msgq)) goto again; @@ -1174,13 +1131,12 @@ skip: */ if (DMsgDebugOpt >= 5) { fprintf(stderr, - "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n", + "rxmsg cmd=%08x circ=%016jx\n", msg->any.head.cmd, - (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.circuit); } - error = dmsg_state_msgrx(msg); + error = dmsg_state_msgrx(msg, 0); if (error) { /* @@ -1236,6 +1192,10 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom) } pthread_mutex_unlock(&iocom->mtx); + /* + * Flush queue, doing all required encryption and CRC generation, + * with the mutex unlocked. + */ while ((msg = TAILQ_FIRST(&tmpq)) != NULL) { /* * Process terminal connection errors. @@ -1255,8 +1215,11 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom) msg->any.head.magic = DMSG_HDR_MAGIC; msg->any.head.salt = (random() << 8) | (ioq->seq & 255); ++ioq->seq; - if ((ioq->seq & 32767) == 0) + if ((ioq->seq & 32767) == 0) { + pthread_mutex_lock(&iocom->mtx); srandomdev(); + pthread_mutex_unlock(&iocom->mtx); + } /* * Calculate aux_crc if 0, then calculate hdr_crc. 
@@ -1473,9 +1436,8 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom) #if 0 fprintf(stderr, - "txmsg cmd=%08x msgid=%016jx circ=%016jx\n", + "txmsg cmd=%08x circ=%016jx\n", msg->any.head.cmd, - (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.circuit); #endif @@ -1597,6 +1559,16 @@ dmsg_msg_write(dmsg_msg_t *msg) pthread_mutex_lock(&iocom->mtx); state = msg->state; + if (DMsgDebugOpt) { + fprintf(stderr, + "msgtx: cmd=%08x msgid=%016jx " + "state %p(%08x) error=%d\n", + msg->any.head.cmd, msg->any.head.msgid, + state, (state ? state->icmd : 0), + msg->any.head.error); + } + + #if 0 /* * Make sure the parent transaction is still open in the transmit @@ -1613,7 +1585,6 @@ dmsg_msg_write(dmsg_msg_t *msg) return; } #endif - /* * Process state data into the message as needed, then update the * state based on the message. @@ -1634,6 +1605,7 @@ dmsg_msg_write(dmsg_msg_t *msg) DMSGF_CREATE) { state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; state->icmd = state->txcmd & DMSGF_BASECMDMASK; + state->flags &= ~DMSG_STATE_NEW; } msg->any.head.msgid = state->msgid; @@ -1641,24 +1613,95 @@ dmsg_msg_write(dmsg_msg_t *msg) state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; } } - dmsg_state_cleanuptx(iocom, msg); -#if 0 - fprintf(stderr, - "MSGWRITE %016jx %08x\n", - msg->any.head.msgid, msg->any.head.cmd); -#endif + /* + * Discard messages sent to transactions which are already dead. + */ + if (state && (state->txcmd & DMSGF_DELETE)) { + printf("dmsg_msg_write: drop msg %08x to dead " + "circuit state=%p\n", + msg->any.head.cmd, state); + dmsg_msg_free(msg); + return; + } /* - * Queue it for output, wake up the I/O pthread. Note that the - * I/O thread is responsible for generating the CRCs and encryption. + * Normally we queue the msg for output. However, if the circuit is + * dead or dying we must simulate a failure in the return direction + * and throw the message away. The other end is not expecting any + * further messages from us on this state. 
+ * + * Note that the I/O thread is responsible for generating the CRCs + * and encryption. */ - TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry); - dummy = 0; - write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */ + if (state->flags & DMSG_STATE_DYING) { +#if 0 + if ((state->parent->txcmd & DMSGF_DELETE) || + (state->parent->flags & DMSG_STATE_DYING) || + (state->flags & DMSG_STATE_DYING)) { +#endif + /* + * Illegal message, kill state and related sub-state. + * Cannot transmit if state is already dying. + */ + printf("dmsg_msg_write: Write to dying circuit " + "ptxcmd=%08x prxcmd=%08x flags=%08x\n", + state->parent->rxcmd, + state->parent->txcmd, + state->parent->flags); + dmsg_state_hold(state); + dmsg_state_cleanuptx(iocom, msg); + if ((state->flags & DMSG_STATE_ABORTING) == 0) { + dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK); + } + dmsg_state_drop(state); + dmsg_msg_free(msg); + } else { + /* + * Queue the message, clean up transmit state prior to queueing + * to avoid SMP races. + */ + if (DMsgDebugOpt) + printf("dmsg_msg_write: commit msg state=%p to txkmsgq\n", state); + dmsg_state_cleanuptx(iocom, msg); + TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry); + dummy = 0; + write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */ + } pthread_mutex_unlock(&iocom->mtx); } +/* + * Remove state from its parent's subq. This can wind up recursively + * dropping the parent upward. + * + * NOTE: iocom must be locked. + * + * NOTE: Once we drop the parent, our pstate pointer may become invalid. 
+ */ +static +void +dmsg_subq_delete(dmsg_state_t *state) +{ + dmsg_state_t *pstate; + + if (state->flags & DMSG_STATE_SUBINSERTED) { + pstate = state->parent; + assert(pstate); + if (pstate->scan == state) + pstate->scan = NULL; + TAILQ_REMOVE(&pstate->subq, state, entry); + state->flags &= ~DMSG_STATE_SUBINSERTED; + state->parent = NULL; + if (TAILQ_EMPTY(&pstate->subq)) + dmsg_state_drop(pstate);/* pstate->subq */ + pstate = NULL; /* safety */ + dmsg_state_drop(state); /* pstate->subq */ + } else { + assert(state->parent == NULL); + } +} + /* * Simulate reception of a transaction DELETE message when the link goes * bad. This routine must recurse through state->subq and generate messages @@ -1668,90 +1711,108 @@ dmsg_msg_write(dmsg_msg_t *msg) */ static void -dmsg_simulate_failure(dmsg_state_t *state, int error) +dmsg_simulate_failure(dmsg_state_t *state, int meto, int error) { dmsg_state_t *substate; + + dmsg_state_hold(state); + if (meto) + dmsg_state_abort(state); + + /* + * Recurse through sub-states. + */ +again: + TAILQ_FOREACH(substate, &state->subq, entry) { + if (substate->flags & DMSG_STATE_ABORTING) + continue; + state->scan = substate; + dmsg_simulate_failure(substate, 1, error); + if (state->scan != substate) + goto again; + } + + dmsg_state_drop(state); +} + +static +void +dmsg_state_abort(dmsg_state_t *state) +{ dmsg_iocom_t *iocom; dmsg_msg_t *msg; - while ((substate = TAILQ_FIRST(&state->subq)) != NULL) { - dmsg_simulate_failure(substate, error); + /* + * Set ABORTING and DYING, return if already set. If the state was + * just allocated we defer the abort operation until the related + * message is processed. 
+ */ + if (state->flags & DMSG_STATE_ABORTING) + return; + state->flags |= DMSG_STATE_ABORTING; + dmsg_state_dying(state); + if (state->flags & DMSG_STATE_NEW) { + printf("dmsg_state_abort(0): state %p rxcmd %08x txcmd %08x " + "flags %08x - in NEW state\n", + state, state->rxcmd, state->txcmd, state->flags); + return; } - iocom = state->iocom; - if (state == &iocom->state0) { - /* - * No active local or remote transactions remain. - * Generate a final LNK_ERROR. EOF will be flagged - * when the message is returned by dmsg_ioq_read(). - */ - msg = dmsg_msg_alloc_locked(&iocom->state0, 0, - DMSG_LNK_ERROR, + /* + * Simulate parent state failure before child states. Device + * drivers need to understand this and flag the situation but might + * have asynchronous operations in progress that they cannot stop. + * To make things easier, parent states will not actually disappear + * until the children are all gone. + */ + if ((state->rxcmd & DMSGF_DELETE) == 0) { + fprintf(stderr, "SIMULATE ERROR\n"); + msg = dmsg_msg_alloc_locked(state, 0, DMSG_LNK_ERROR, NULL, NULL); - msg->any.head.error = error; - } else if (state->flags & DMSG_STATE_OPPOSITE) { - /* - * Active remote transactions are still present. - * Simulate the other end sending us a DELETE. 
- */ - if (state->flags & DMSG_STATE_SUBINSERTED) { - TAILQ_REMOVE(&state->parent->subq, state, entry); - state->flags &= ~DMSG_STATE_SUBINSERTED; - } - if ((state->rxcmd & DMSGF_DELETE) == 0) { - fprintf(stderr, "SIMULATE ERROR1\n"); - msg = dmsg_msg_alloc_locked(&iocom->state0, 0, - DMSG_LNK_ERROR, - NULL, NULL); - /*state->txcmd |= DMSGF_DELETE;*/ - msg->state = state; - msg->any.head.error = error; - msg->any.head.msgid = state->msgid; - msg->any.head.circuit = state->parent->msgid; - msg->any.head.cmd |= DMSGF_ABORT | - DMSGF_DELETE; - if ((state->parent->flags & - DMSG_STATE_OPPOSITE) == 0) { - msg->any.head.cmd |= DMSGF_REVCIRC; - } - } else { - msg = NULL; - } - } else { - /* - * Active local transactions are still present. - * Simulate the other end sending us a DELETE. - */ - if (state->flags & DMSG_STATE_SUBINSERTED) { - TAILQ_REMOVE(&state->parent->subq, state, entry); - state->flags &= ~DMSG_STATE_SUBINSERTED; - } - if ((state->rxcmd & DMSGF_DELETE) == 0) { - fprintf(stderr, "SIMULATE ERROR1\n"); - msg = dmsg_msg_alloc_locked(&iocom->state0, 0, - DMSG_LNK_ERROR, - NULL, NULL); - msg->state = state; - msg->any.head.error = error; - msg->any.head.msgid = state->msgid; - msg->any.head.circuit = state->parent->msgid; - msg->any.head.cmd |= DMSGF_ABORT | - DMSGF_DELETE | - DMSGF_REVTRANS | - DMSGF_REPLY; - if ((state->parent->flags & - DMSG_STATE_OPPOSITE) == 0) { - msg->any.head.cmd |= DMSGF_REVCIRC; - } - if ((state->rxcmd & DMSGF_CREATE) == 0) - msg->any.head.cmd |= DMSGF_CREATE; - } else { - msg = NULL; - } - } - if (msg) { + if ((state->rxcmd & DMSGF_CREATE) == 0) + msg->any.head.cmd |= DMSGF_CREATE; + msg->any.head.cmd |= DMSGF_DELETE | + (state->rxcmd & DMSGF_REPLY); + msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC); + msg->any.head.error = DMSG_ERR_LOSTLINK; + msg->any.head.cmd |= DMSGF_ABORT; + + /* + * Issue callback synchronously even though this isn't + * the receiver thread. 
We need to issue the callback + * before removing state from the subq in order to allow + * the callback to reply. + */ + iocom = state->iocom; + dmsg_state_msgrx(msg, 1); + pthread_mutex_unlock(&iocom->mtx); + iocom->rcvmsg_callback(msg); + pthread_mutex_lock(&iocom->mtx); + dmsg_state_cleanuprx(iocom, msg); +#if 0 TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry); atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK); +#endif + } +} + + +/* + * Recursively sets DMSG_STATE_DYING on state and all sub-states, preventing + * the transmission of any new messages on these states. This is done + * atomically when parent state is terminating, whereas setting ABORTING is + * not atomic and can leak races. + */ +static +void +dmsg_state_dying(dmsg_state_t *state) +{ + dmsg_state_t *scan; + + if ((state->flags & DMSG_STATE_DYING) == 0) { + state->flags |= DMSG_STATE_DYING; + TAILQ_FOREACH(scan, &state->subq, entry) + dmsg_state_dying(scan); } } @@ -2012,7 +2073,7 @@ dmsg_state_result(dmsg_state_t *state, uint32_t error) * will typically just contain status updates. */ static int -dmsg_state_msgrx(dmsg_msg_t *msg) +dmsg_state_msgrx(dmsg_msg_t *msg, int mstate) { dmsg_iocom_t *iocom = msg->state->iocom; dmsg_state_t *state; @@ -2022,12 +2083,29 @@ dmsg_state_msgrx(dmsg_msg_t *msg) pthread_mutex_lock(&iocom->mtx); + if (DMsgDebugOpt) { + fprintf(stderr, + "msgrx: cmd=%08x msgid=%016jx " + "circuit=%016jx error=%d\n", + msg->any.head.cmd, + msg->any.head.msgid, + msg->any.head.circuit, + msg->any.head.error); + } + /* * Lookup the circuit (pstate). The circuit will be an open * transaction. The REVCIRC bit in the message tells us which side * initiated it. + * + * If mstate is non-zero the state has already been incorporated + * into the message as part of a simulated abort. Note that in this + * situation the parent state may have already been removed from + * the RBTREE. 
*/ - if (msg->any.head.circuit) { + if (mstate) { + pstate = msg->state->parent; + } else if (msg->any.head.circuit) { sdummy.msgid = msg->any.head.circuit; if (msg->any.head.cmd & DMSGF_REVCIRC) { @@ -2039,36 +2117,74 @@ dmsg_state_msgrx(dmsg_msg_t *msg) &iocom->staterd_tree, &sdummy); } + + /* + * If we cannot find the circuit throw the message away. + * The state will have already been taken care of by + * the simulated failure code. This case can occur due + * to a failure propagating in one direction crossing a + * request on the failed circuit propagating in the other + * direction. + */ if (pstate == NULL) { fprintf(stderr, "missing parent in stacked trans %s\n", dmsg_msg_str(msg)); - error = DMSG_IOQ_ERROR_TRANS; pthread_mutex_unlock(&iocom->mtx); - assert(0); + error = DMSG_IOQ_ERROR_EALREADY; + + return error; } } else { pstate = &iocom->state0; } + /* WARNING: pstate not (yet) refd */ /* * Lookup the msgid. * + * If mstate is non-zero the state has already been incorporated + * into the message as part of a simulated abort. Note that in this + * situation the state may have already been removed from the RBTREE. + * * If received msg is a command state is on staterd_tree. * If received msg is a reply state is on statewr_tree. * Otherwise there is no state (retain &iocom->state0) */ - sdummy.msgid = msg->any.head.msgid; - if (msg->any.head.cmd & DMSGF_REVTRANS) - state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy); - else - state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy); + if (mstate) { + state = msg->state; + } else { + sdummy.msgid = msg->any.head.msgid; + if (msg->any.head.cmd & DMSGF_REVTRANS) { + state = RB_FIND(dmsg_state_tree, + &iocom->statewr_tree, &sdummy); + } else { + state = RB_FIND(dmsg_state_tree, + &iocom->staterd_tree, &sdummy); + } + } + + if (DMsgDebugOpt) { + fprintf(stderr, + "msgrx:\tstate %p(%08x)", + state, (state ? 
state->icmd : 0)); + if (pstate != &iocom->state0) { + fprintf(stderr, + " pstate %p(%08x)", + pstate, pstate->icmd); + } + fprintf(stderr, "\n"); + } - if (state) { + if (mstate) { + /* state already assigned to msg */ + } else if (state) { /* * Message over an existing transaction (CREATE should not * be set). */ + dmsg_state_drop(msg->state); + dmsg_state_hold(state); msg->state = state; assert(pstate == state->parent); } else { @@ -2078,8 +2194,6 @@ dmsg_state_msgrx(dmsg_msg_t *msg) state = pstate; } - pthread_mutex_unlock(&iocom->mtx); - /* * Switch on CREATE, DELETE, REPLY, and also handle ABORT from * inside the case statements. @@ -2111,11 +2225,11 @@ dmsg_state_msgrx(dmsg_msg_t *msg) * Allocate the new state. */ state = malloc(sizeof(*state)); - atomic_add_int(&dmsg_state_count, 1); bzero(state, sizeof(*state)); + atomic_add_int(&dmsg_state_count, 1); + TAILQ_INIT(&state->subq); dmsg_state_hold(pstate); - state->refs = 1; state->parent = pstate; state->iocom = iocom; state->flags = DMSG_STATE_DYNAMIC | @@ -2124,12 +2238,18 @@ dmsg_state_msgrx(dmsg_msg_t *msg) state->txcmd = DMSGF_REPLY; state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE; state->icmd = state->rxcmd & DMSGF_BASECMDMASK; + state->flags &= ~DMSG_STATE_NEW; msg->state = state; - pthread_mutex_lock(&iocom->mtx); + RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state); + if (TAILQ_EMPTY(&pstate->subq)) + dmsg_state_hold(pstate);/* pstate->subq */ TAILQ_INSERT_TAIL(&pstate->subq, state, entry); state->flags |= DMSG_STATE_SUBINSERTED | DMSG_STATE_RBINSERTED; + dmsg_state_hold(state); /* pstate->subq */ + dmsg_state_hold(state); /* state on rbtree */ + dmsg_state_hold(state); /* msg->state */ /* * If the parent is a relay set up the state handler to @@ -2140,14 +2260,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg) */ if (pstate->relay) state->func = dmsg_state_relay; - pthread_mutex_unlock(&iocom->mtx); error = 0; - - if (DMsgDebugOpt) { - fprintf(stderr, - "create state %p id=%08x on iocom staterd %p\n", - 
state, (uint32_t)state->msgid, iocom); - } break; case DMSGF_DELETE: /* @@ -2278,7 +2391,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg) */ if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) { if ((msg->state->flags & DMSG_STATE_ROOT) == 0) { - msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) | + msg->tcmd = (state->icmd & DMSGF_BASECMDMASK) | (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY)); @@ -2310,6 +2423,48 @@ dmsg_state_msgrx(dmsg_msg_t *msg) } #endif + /* + * Adjust state, mark receive side as DELETED if appropriate and + * adjust RB tree if both sides are DELETED. cleanuprx handles + * the rest after the state callback returns. + */ + assert(msg->state->iocom == iocom); + assert(msg->state == state); + + if (state->flags & DMSG_STATE_ROOT) { + /* + * Nothing to do for non-transactional messages. + */ + } else if (msg->any.head.cmd & DMSGF_DELETE) { + /* + * Message terminating transaction, remove the state from + * the RB tree if the full transaction is now complete. + * The related state, subq, and parent link is retained + * until after the state callback is complete. 
+ */ + assert((state->rxcmd & DMSGF_DELETE) == 0); + state->rxcmd |= DMSGF_DELETE; + if (state->txcmd & DMSGF_DELETE) { + assert(state->flags & DMSG_STATE_RBINSERTED); + if (state->rxcmd & DMSGF_REPLY) { + assert(msg->any.head.cmd & DMSGF_REPLY); + RB_REMOVE(dmsg_state_tree, + &iocom->statewr_tree, state); + } else { + assert((msg->any.head.cmd & DMSGF_REPLY) == 0); + RB_REMOVE(dmsg_state_tree, + &iocom->staterd_tree, state); + } + state->flags &= ~DMSG_STATE_RBINSERTED; + dmsg_state_drop(state); + } + } + + pthread_mutex_unlock(&iocom->mtx); + + if (DMsgDebugOpt && error) + fprintf(stderr, "msgrx: error %d\n", error); + return (error); } @@ -2327,6 +2482,12 @@ dmsg_state_relay(dmsg_msg_t *lmsg) #ifdef DMSG_BLOCK_DEBUG switch (lmsg->tcmd) { + case DMSG_BLK_OPEN | DMSGF_CREATE: + fprintf(stderr, "relay BIO_OPEN (CREATE)\n"); + break; + case DMSG_BLK_OPEN | DMSGF_DELETE: + fprintf(stderr, "relay BIO_OPEN (DELETE)\n"); + break; case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE: case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE: atomic_add_int(&biocount, 1); @@ -2375,6 +2536,17 @@ dmsg_state_relay(dmsg_msg_t *lmsg) rstate = lstate->relay; assert(rstate != NULL); + assert((rstate->txcmd & DMSGF_DELETE) == 0); + +#if 0 + if (lstate->flags & DMSG_STATE_ABORTING) { + fprintf(stderr, + "relay: relay lost link l=%p r=%p\n", + lstate, rstate); + dmsg_simulate_failure(rstate, 0, DMSG_ERR_LOSTLINK); + } +#endif + rmsg = dmsg_msg_alloc(rstate, 0, lmsg->any.head.cmd, dmsg_state_relay, NULL); @@ -2389,6 +2561,7 @@ dmsg_state_relay(dmsg_msg_t *lmsg) rmsg->aux_size = lmsg->aux_size; rmsg->aux_data = lmsg->aux_data; lmsg->aux_data = NULL; + /* fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd); */ @@ -2396,13 +2569,16 @@ dmsg_state_relay(dmsg_msg_t *lmsg) } /* - * Cleanup and retire msg after processing + * Cleanup and retire msg after issuing the state callback. The state + * has already been removed from the RB tree. The subq and msg must be + * cleaned up. 
+ * + * Called with the iocom mutex held (to handle subq disconnection). */ void dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg) { dmsg_state_t *state; - dmsg_state_t *pstate; assert(msg->state->iocom == iocom); state = msg->state; @@ -2412,52 +2588,18 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg) * to worry about. */ dmsg_msg_free(msg); - } else if (msg->any.head.cmd & DMSGF_DELETE) { + } else if ((state->flags & DMSG_STATE_SUBINSERTED) && + (state->rxcmd & DMSGF_DELETE) && + (state->txcmd & DMSGF_DELETE)) { /* - * Message terminating transaction, destroy the related - * state, the original message, and this message (if it - * isn't the original message due to a CREATE|DELETE). - * - * It's possible for governing state to terminate while - * sub-transactions still exist. This is allowed but - * will cause sub-transactions to recursively fail. - * Further reception of sub-transaction messages will be - * impossible because the circuit will no longer exist. - * (XXX need code to make sure that happens properly). + * Must disconnect from parent and drop relay. 
*/ - pthread_mutex_lock(&iocom->mtx); - state->rxcmd |= DMSGF_DELETE; - - if (state->txcmd & DMSGF_DELETE) { - assert(state->flags & DMSG_STATE_RBINSERTED); - if (state->rxcmd & DMSGF_REPLY) { - assert(msg->any.head.cmd & DMSGF_REPLY); - RB_REMOVE(dmsg_state_tree, - &iocom->statewr_tree, state); - } else { - assert((msg->any.head.cmd & DMSGF_REPLY) == 0); - RB_REMOVE(dmsg_state_tree, - &iocom->staterd_tree, state); - } - state->flags &= ~DMSG_STATE_RBINSERTED; - if (state->flags & DMSG_STATE_SUBINSERTED) { - pstate = state->parent; - TAILQ_REMOVE(&pstate->subq, state, entry); - state->flags &= ~DMSG_STATE_SUBINSERTED; - dmsg_state_drop(pstate); - } - state->parent = NULL; - - if (state->relay) { - dmsg_state_drop(state->relay); - state->relay = NULL; - } - dmsg_msg_free(msg); - dmsg_state_drop(state); - } else { - dmsg_msg_free(msg); + dmsg_subq_delete(state); + if (state->relay) { + dmsg_state_drop(state->relay); + state->relay = NULL; } - pthread_mutex_unlock(&iocom->mtx); + dmsg_msg_free(msg); } else { /* * Message not terminating transaction, leave state intact @@ -2470,15 +2612,19 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg) /* * Clean up the state after pulling out needed fields and queueing the * message for transmission. This occurs in dmsg_msg_write(). + * + * Called with the mutex locked. */ static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg) { dmsg_state_t *state; - dmsg_state_t *pstate; assert(iocom == msg->state->iocom); state = msg->state; + + dmsg_state_hold(state); + if (state->flags & DMSG_STATE_ROOT) { ; } else if (msg->any.head.cmd & DMSGF_DELETE) { @@ -2493,11 +2639,15 @@ dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg) * Further reception of sub-transaction messages will be * impossible because the circuit will no longer exist. * (XXX need code to make sure that happens properly). 
*/ - pthread_mutex_lock(&iocom->mtx); - assert((state->txcmd & DMSGF_DELETE) == 0); - state->txcmd |= DMSGF_DELETE; - if (state->rxcmd & DMSGF_DELETE) { + * + * NOTE: It is possible for a failure to terminate the + * state after we have written the message but before + * we are able to call cleanuptx, so txcmd might already + * have DMSGF_DELETE set. + */ + if ((state->txcmd & DMSGF_DELETE) == 0 && + (state->rxcmd & DMSGF_DELETE)) { + state->txcmd |= DMSGF_DELETE; assert(state->flags & DMSG_STATE_RBINSERTED); if (state->txcmd & DMSGF_REPLY) { assert(msg->any.head.cmd & DMSGF_REPLY); @@ -2509,22 +2659,31 @@ dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg) &iocom->statewr_tree, state); } state->flags &= ~DMSG_STATE_RBINSERTED; - pstate = state->parent; - if (state->flags & DMSG_STATE_SUBINSERTED) { - TAILQ_REMOVE(&pstate->subq, state, entry); - state->flags &= ~DMSG_STATE_SUBINSERTED; - } - state->parent = NULL; - dmsg_state_drop(pstate); + dmsg_subq_delete(state); if (state->relay) { dmsg_state_drop(state->relay); state->relay = NULL; } - dmsg_state_drop(state); /* usually the last drop */ + dmsg_state_drop(state); /* state->rbtree */ + } else if ((state->txcmd & DMSGF_DELETE) == 0) { + state->txcmd |= DMSGF_DELETE; } - pthread_mutex_unlock(&iocom->mtx); } + + /* + * Deferred abort after transmission.
+ */ + if ((state->flags & (DMSG_STATE_ABORTING | DMSG_STATE_DYING)) && + (state->rxcmd & DMSGF_DELETE) == 0) { + printf("kdmsg_state_cleanuptx: state=%p " + "executing deferred abort\n", + state); + state->flags &= ~DMSG_STATE_ABORTING; + dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK); + } + + dmsg_state_drop(state); } /* @@ -2539,6 +2698,7 @@ dmsg_state_hold(dmsg_state_t *state) void dmsg_state_drop(dmsg_state_t *state) { + assert(state->refs > 0); if (atomic_fetchadd_int(&state->refs, -1) == 1) dmsg_state_free(state); } @@ -2551,8 +2711,7 @@ dmsg_state_free(dmsg_state_t *state) { atomic_add_int(&dmsg_state_count, -1); if (DMsgDebugOpt) { - fprintf(stderr, "terminate state %p id=%08x\n", - state, (uint32_t)state->msgid); + fprintf(stderr, "terminate state %p\n", state); } assert((state->flags & (DMSG_STATE_ROOT | DMSG_STATE_SUBINSERTED | diff --git a/lib/libdmsg/msg_lnk.c b/lib/libdmsg/msg_lnk.c index c55009fa5e..52ebf81c23 100644 --- a/lib/libdmsg/msg_lnk.c +++ b/lib/libdmsg/msg_lnk.c @@ -418,6 +418,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg) RB_INIT(&conn->tree); state->iocom->conn = conn; /* XXX only one */ state->iocom->conn_msgid = state->msgid; + dmsg_state_hold(state); conn->state = state; state->func = dmsg_lnk_conn; state->any.conn = conn; @@ -493,6 +494,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg) dmsg_free(conn); dmsg_msg_reply(msg, 0); + dmsg_state_drop(state); /* state invalid after reply */ break; default: @@ -610,6 +612,7 @@ dmsg_lnk_span(dmsg_msg_t *msg) * allows such a feature to be added in the future. 
*/ assert(state->any.link == NULL); + dmsg_state_hold(state); slink = dmsg_alloc(sizeof(*slink)); TAILQ_INIT(&slink->relayq); slink->node = node; @@ -645,6 +648,7 @@ dmsg_lnk_span(dmsg_msg_t *msg) */ if (msg->any.head.cmd & DMSGF_DELETE) { slink = state->any.link; + assert(slink->state == state); assert(slink != NULL); node = slink->node; cls = node->cls; @@ -683,6 +687,7 @@ dmsg_lnk_span(dmsg_msg_t *msg) state->any.link = NULL; slink->state = NULL; slink->node = NULL; + dmsg_state_drop(state); dmsg_free(slink); /* @@ -1026,6 +1031,7 @@ dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink) h2span_relay_t *relay; dmsg_msg_t *msg; + dmsg_state_hold(slink->state); relay = dmsg_alloc(sizeof(*relay)); relay->conn = conn; relay->source_rt = slink->state; @@ -1039,6 +1045,7 @@ dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink) msg = dmsg_msg_alloc(&conn->state->iocom->state0, 0, DMSG_LNK_SPAN | DMSGF_CREATE, dmsg_lnk_relay, relay); + dmsg_state_hold(msg->state); relay->target_rt = msg->state; msg->any.lnk_span = slink->lnk_span; @@ -1110,6 +1117,7 @@ dmsg_relay_delete(h2span_relay_t *relay) if (relay->target_rt) { relay->target_rt->any.relay = NULL; dmsg_state_reply(relay->target_rt, 0); + dmsg_state_drop(relay->target_rt); /* state invalid after reply */ relay->target_rt = NULL; } @@ -1119,7 +1127,10 @@ dmsg_relay_delete(h2span_relay_t *relay) * state, not by this relay structure. 
*/ relay->conn = NULL; - relay->source_rt = NULL; + if (relay->source_rt) { + dmsg_state_drop(relay->source_rt); + relay->source_rt = NULL; + } dmsg_free(relay); } diff --git a/lib/libdmsg/subs.c b/lib/libdmsg/subs.c index 2466044d85..2e6b50994b 100644 --- a/lib/libdmsg/subs.c +++ b/lib/libdmsg/subs.c @@ -125,8 +125,10 @@ dmsg_connect(const char *hostname) } if (connect(fd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) { close(fd); - fprintf(stderr, "debug: connect failed: %s\n", - strerror(errno)); + if (DMsgDebugOpt > 2) { + fprintf(stderr, "debug: Connect failed: %s\n", + strerror(errno)); + } return -1; } return (fd); diff --git a/sys/kern/kern_dmsg.c b/sys/kern/kern_dmsg.c index ac725138da..c407629d56 100644 --- a/sys/kern/kern_dmsg.c +++ b/sys/kern/kern_dmsg.c @@ -62,20 +62,22 @@ static int kdmsg_state_msgrx(kdmsg_msg_t *msg); static int kdmsg_state_msgtx(kdmsg_msg_t *msg); static void kdmsg_state_cleanuprx(kdmsg_msg_t *msg); static void kdmsg_state_cleanuptx(kdmsg_msg_t *msg); +static void kdmsg_subq_delete(kdmsg_state_t *state); static void kdmsg_simulate_failure(kdmsg_state_t *state, int meto, int error); static void kdmsg_state_abort(kdmsg_state_t *state); +static void kdmsg_state_dying(kdmsg_state_t *state); static void kdmsg_state_free(kdmsg_state_t *state); #ifdef KDMSG_DEBUG #define KDMSG_DEBUG_ARGS , const char *file, int line -#define kdmsg_state_ref(state) _kdmsg_state_ref(state, __FILE__, __LINE__) +#define kdmsg_state_hold(state) _kdmsg_state_hold(state, __FILE__, __LINE__) #define kdmsg_state_drop(state) _kdmsg_state_drop(state, __FILE__, __LINE__) #else #define KDMSG_DEBUG_ARGS -#define kdmsg_state_ref(state) _kdmsg_state_ref(state) +#define kdmsg_state_hold(state) _kdmsg_state_hold(state) #define kdmsg_state_drop(state) _kdmsg_state_drop(state) #endif -static void _kdmsg_state_ref(kdmsg_state_t *state KDMSG_DEBUG_ARGS); +static void _kdmsg_state_hold(kdmsg_state_t *state KDMSG_DEBUG_ARGS); static void _kdmsg_state_drop(kdmsg_state_t *state 
KDMSG_DEBUG_ARGS); static void kdmsg_iocom_thread_rd(void *arg); @@ -170,7 +172,7 @@ kdmsg_iocom_autoinitiate(kdmsg_iocom_t *iocom, iocom->auto_lnk_conn.head = msg->any.head; msg->any.lnk_conn = iocom->auto_lnk_conn; iocom->conn_state = msg->state; - kdmsg_state_ref(msg->state); /* iocom->conn_state */ + kdmsg_state_hold(msg->state); /* iocom->conn_state */ kdmsg_msg_write(msg); } @@ -506,7 +508,7 @@ kdmsg_iocom_thread_wr(void *arg) RB_ROOT(&iocom->staterd_tree) || RB_ROOT(&iocom->statewr_tree)) { /* - * + * Simulate failure for all sub-states of state0. */ kdmsg_drain_msgq(iocom); kprintf("simulate failure for all substates of state0\n"); @@ -603,23 +605,6 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg) kdmsg_iocom_t *iocom = msg->state->iocom; int error; -#if 0 - /* - * If sub-states exist and we are deleting (typically due to a - * disconnect), we may not receive deletes for any of the substates - * and must simulate associated failures. - */ - if (msg->state && - (msg->any.head.cmd & DMSGF_DELETE) && - TAILQ_FIRST(&msg->state->subq)) { - lockmgr(&iocom->msglk, LK_EXCLUSIVE); - kprintf("simulate failure for substates of cmd %08x/%08x\n", - msg->state->rxcmd, msg->state->txcmd); - kdmsg_simulate_failure(msg->state, 0, DMSG_ERR_LOSTLINK); - lockmgr(&iocom->msglk, LK_RELEASE); - } -#endif - /* * State machine tracking, state assignment for msg, * returns error and discard status. Errors are fatal @@ -627,10 +612,16 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg) * a discard without execution. 
*/ error = kdmsg_state_msgrx(msg); + if (msg->state->flags & KDMSG_STATE_ABORTING) { + kprintf("kdmsg_state_abort(b): state %p rxcmd=%08x txcmd=%08x msgrx error %d\n", + msg->state, msg->state->rxcmd, msg->state->txcmd, error); + } if (error) { /* * Raw protocol or connection error */ + if (msg->state->flags & KDMSG_STATE_ABORTING) + kprintf("X1 state %p error %d\n", msg->state, error); kdmsg_msg_free(msg); if (error == EALREADY) error = 0; @@ -639,12 +630,18 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg) * Message related to state which already has a * handling function installed for it. */ + if (msg->state->flags & KDMSG_STATE_ABORTING) + kprintf("X2 state %p func %p\n", msg->state, msg->state->func); error = msg->state->func(msg->state, msg); kdmsg_state_cleanuprx(msg); } else if (iocom->flags & KDMSG_IOCOMF_AUTOANY) { + if (msg->state->flags & KDMSG_STATE_ABORTING) + kprintf("X3 state %p\n", msg->state); error = kdmsg_autorxmsg(msg); kdmsg_state_cleanuprx(msg); } else { + if (msg->state->flags & KDMSG_STATE_ABORTING) + kprintf("X4 state %p\n", msg->state); error = iocom->rcvmsg(msg); kdmsg_state_cleanuprx(msg); } @@ -652,35 +649,21 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg) } /* - * Process state tracking for a message after reception, prior to - * execution. + * Process state tracking for a message after reception and dequeueing, + * prior to execution of the state callback. The state is updated and + * will be removed from the RBTREE if completely closed, but the state->parent + * and subq linkage is not cleaned up until after the callback (see + * cleanuprx()). * - * Called with msglk held and the msg dequeued. + * msglk is not held. * - * All messages are called with dummy state and return actual state. - * (One-off messages often just return the same dummy state). + * NOTE: A message transaction can consist of several messages in either + * direction. * - * May request that caller discard the message by setting *discardp to 1. 
- * The returned state is not used in this case and is allowed to be NULL. - * - * -- - * - * These routines handle persistent and command/reply message state via the - * CREATE and DELETE flags. The first message in a command or reply sequence - * sets CREATE, the last message in a command or reply sequence sets DELETE. - * - * There can be any number of intermediate messages belonging to the same - * sequence sent inbetween the CREATE message and the DELETE message, - * which set neither flag. This represents a streaming command or reply. - * - * Any command message received with CREATE set expects a reply sequence to - * be returned. Reply sequences work the same as command sequences except the - * REPLY bit is also sent. Both the command side and reply side can - * degenerate into a single message with both CREATE and DELETE set. Note - * that one side can be streaming and the other side not, or neither, or both. - * - * The msgid is unique for the initiator. That is, two sides sending a new - * message can use the same msgid without colliding. + * NOTE: The msgid is unique to the initiator, not necessarily unique for + * us or for any relay or for the return direction for that matter. + * That is, two sides sending a new message can use the same msgid + * without colliding. * * -- * @@ -695,7 +678,7 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg) * also race, and in this situation the other side might have already * initiated a new unrelated command with the same message id. Since * the abort has not set the CREATE flag the situation can be detected - * and the message will also be discarded. + * and the message will also be discarded. * * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE]. 
* The ABORT request is essentially integrated into the command instead @@ -775,7 +758,7 @@ again: lksleep(state, &iocom->msglk, 0, "dmrace", hz); goto again; } - kdmsg_state_ref(state); + kdmsg_state_hold(state); kdmsg_state_drop(msg->state); /* iocom->state0 */ msg->state = state; } else { @@ -850,13 +833,15 @@ again: state->flags |= KDMSG_STATE_RBINSERTED | KDMSG_STATE_SUBINSERTED | KDMSG_STATE_OPPOSITE; - kdmsg_state_ref(pstate); /* states on pstate->subq */ - kdmsg_state_ref(state); /* state on pstate->subq */ - kdmsg_state_ref(state); /* state on rbtree */ + if (TAILQ_EMPTY(&pstate->subq)) + kdmsg_state_hold(pstate);/* states on pstate->subq */ + kdmsg_state_hold(state); /* state on pstate->subq */ + kdmsg_state_hold(state); /* state on rbtree */ state->icmd = msg->any.head.cmd & DMSGF_BASECMDMASK; state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE; state->txcmd = DMSGF_REPLY; state->msgid = msg->any.head.msgid; + state->flags &= ~KDMSG_STATE_NEW; RB_INSERT(kdmsg_state_tree, &iocom->staterd_tree, state); TAILQ_INSERT_TAIL(&pstate->subq, state, entry); error = 0; @@ -868,6 +853,8 @@ again: */ if (state == &iocom->state0) { if (msg->any.head.cmd & DMSGF_ABORT) { + kprintf("kdmsg_state_msgrx: " + "state already A\n"); error = EALREADY; } else { kprintf("kdmsg_state_msgrx: " @@ -883,6 +870,8 @@ again: */ if ((state->rxcmd & DMSGF_CREATE) == 0) { if (msg->any.head.cmd & DMSGF_ABORT) { + kprintf("kdmsg_state_msgrx: " + "state already B\n"); error = EALREADY; } else { kprintf("kdmsg_state_msgrx: " @@ -983,8 +972,6 @@ again: * always have a DMSGF_CREATE and/or DMSGF_DELETE flag. 
*/ done: - lockmgr(&iocom->msglk, LK_RELEASE); - if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) { if (state != &iocom->state0) { msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) | @@ -997,6 +984,40 @@ done: } else { msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK; } + + /* + * Adjust the state for DELETE handling now, before making the + * callback so we are atomic with other state updates. + * + * Subq/parent linkages are cleaned up after the callback. + * If an error occurred the message is ignored and state is not + * updated. + */ + if ((state = msg->state) == NULL || error != 0) { + kprintf("kdmsg_state_msgrx: state=%p error %d\n", state, error); + ; + } else if (msg->any.head.cmd & DMSGF_DELETE) { + KKASSERT((state->rxcmd & DMSGF_DELETE) == 0); + state->rxcmd |= DMSGF_DELETE; + if (state->txcmd & DMSGF_DELETE) { + KKASSERT(state->flags & KDMSG_STATE_RBINSERTED); + if (state->rxcmd & DMSGF_REPLY) { + KKASSERT(msg->any.head.cmd & + DMSGF_REPLY); + RB_REMOVE(kdmsg_state_tree, + &iocom->statewr_tree, state); + } else { + KKASSERT((msg->any.head.cmd & + DMSGF_REPLY) == 0); + RB_REMOVE(kdmsg_state_tree, + &iocom->staterd_tree, state); + } + state->flags &= ~KDMSG_STATE_RBINSERTED; + kdmsg_state_drop(state); /* state on rbtree */ + } + } + lockmgr(&iocom->msglk, LK_RELEASE); + return (error); } @@ -1117,55 +1138,80 @@ kdmsg_autorxmsg(kdmsg_msg_t *msg) /* * Post-receive-handling message and state cleanup. This routine is called * after the state function handling/callback to properly dispose of the - * message and update or dispose of the state. + * message and unlink the state's parent/subq linkage if the state is + * completely closed. + * + * msglk is not held. 
*/ static void kdmsg_state_cleanuprx(kdmsg_msg_t *msg) { - kdmsg_iocom_t *iocom = msg->state->iocom; - kdmsg_state_t *state; + kdmsg_state_t *state = msg->state; + kdmsg_iocom_t *iocom = state->iocom; + + lockmgr(&iocom->msglk, LK_EXCLUSIVE); + if (state != &iocom->state0) { + /* + * When terminating a transaction (in either direction), all + * sub-states are aborted. + */ + if ((msg->any.head.cmd & DMSGF_DELETE) && + TAILQ_FIRST(&msg->state->subq)) { + kprintf("simulate failure for substates of state %p " + "cmd %08x/%08x\n", + msg->state, + msg->state->rxcmd, + msg->state->txcmd); + kdmsg_simulate_failure(msg->state, + 0, DMSG_ERR_LOSTLINK); + } + + /* + * Once the state is fully closed we can (try to) remove it + * from the subq topology. + */ + if ((state->flags & KDMSG_STATE_SUBINSERTED) && + (state->rxcmd & DMSGF_DELETE) && + (state->txcmd & DMSGF_DELETE)) { + /* + * Remove parent linkage if state is completely closed. + */ + kdmsg_subq_delete(state); + } + } + kdmsg_msg_free(msg); + + lockmgr(&iocom->msglk, LK_RELEASE); +} + +/* + * Remove state from its parent's subq. This can wind up recursively + * dropping the parent upward. + * + * NOTE: Once we drop the parent, our pstate pointer may become invalid. 
+ */ +static +void +kdmsg_subq_delete(kdmsg_state_t *state) +{ kdmsg_state_t *pstate; - if ((state = msg->state) == NULL) { - kdmsg_msg_free(msg); - } else if (msg->any.head.cmd & DMSGF_DELETE) { - lockmgr(&iocom->msglk, LK_EXCLUSIVE); - KKASSERT((state->rxcmd & DMSGF_DELETE) == 0); - state->rxcmd |= DMSGF_DELETE; - if (state->txcmd & DMSGF_DELETE) { - KKASSERT(state->flags & KDMSG_STATE_RBINSERTED); - if (state->rxcmd & DMSGF_REPLY) { - KKASSERT(msg->any.head.cmd & - DMSGF_REPLY); - RB_REMOVE(kdmsg_state_tree, - &iocom->statewr_tree, state); - } else { - KKASSERT((msg->any.head.cmd & - DMSGF_REPLY) == 0); - RB_REMOVE(kdmsg_state_tree, - &iocom->staterd_tree, state); - } - state->flags &= ~KDMSG_STATE_RBINSERTED; - pstate = state->parent; - if (state->flags & KDMSG_STATE_SUBINSERTED) { - TAILQ_REMOVE(&pstate->subq, state, entry); - state->flags &= ~KDMSG_STATE_SUBINSERTED; - kdmsg_state_drop(pstate); /* pstate->subq */ - kdmsg_state_drop(state); /* pstate->subq */ - state->parent = NULL; - } else { - KKASSERT(state->parent == NULL); - } - kdmsg_msg_free(msg); - kdmsg_state_drop(state); /* state on rbtree */ - lockmgr(&iocom->msglk, LK_RELEASE); - } else { - kdmsg_msg_free(msg); - lockmgr(&iocom->msglk, LK_RELEASE); + if (state->flags & KDMSG_STATE_SUBINSERTED) { + pstate = state->parent; + KKASSERT(pstate); + if (pstate->scan == state) + pstate->scan = NULL; + TAILQ_REMOVE(&pstate->subq, state, entry); + state->flags &= ~KDMSG_STATE_SUBINSERTED; + state->parent = NULL; + if (TAILQ_EMPTY(&pstate->subq)) { + kdmsg_state_drop(pstate);/* pstate->subq */ } + pstate = NULL; /* safety */ + kdmsg_state_drop(state); /* pstate->subq */ } else { - kdmsg_msg_free(msg); + KKASSERT(state->parent == NULL); } } @@ -1187,12 +1233,31 @@ kdmsg_simulate_failure(kdmsg_state_t *state, int meto, int error) { kdmsg_state_t *substate; - kdmsg_state_ref(state); /* aborting */ - while ((substate = TAILQ_FIRST(&state->subq)) != NULL) { - kdmsg_simulate_failure(substate, 1, error); - } + 
kdmsg_state_hold(state); /* aborting */ + + /* + * Abort parent state first. Parent will not actually disappear + * until children are gone. Device drivers must handle the situation. + * The advantage of this is that device drivers can flag the situation + * as an interlock against new operations on dying states. And since + * device operations are often asynchronous anyway, this sequence of + * events works out better. + */ if (meto) kdmsg_state_abort(state); + + /* + * Recurse through any children. + */ +again: + TAILQ_FOREACH(substate, &state->subq, entry) { + if (substate->flags & KDMSG_STATE_ABORTING) + continue; + state->scan = substate; + kdmsg_simulate_failure(substate, 1, error); + if (state->scan != substate) + goto again; + } kdmsg_state_drop(state); /* aborting */ } @@ -1203,23 +1268,33 @@ kdmsg_state_abort(kdmsg_state_t *state) kdmsg_msg_t *msg; /* - * Prevent recursive aborts which could otherwise occur if the - * simulated message reception runs state->func which then turns - * around and tries to reply to a broken circuit when then calls - * the state abort code again. + * Set ABORTING and DYING, return if already set. If the state was + * just allocated we defer the abort operation until the related + * message is processed. */ KKASSERT((state->flags & KDMSG_STATE_ABORTING) == 0); if (state->flags & KDMSG_STATE_ABORTING) return; state->flags |= KDMSG_STATE_ABORTING; + kdmsg_state_dying(state); + if (state->flags & KDMSG_STATE_NEW) { + kprintf("kdmsg_state_abort(0): state %p rxcmd %08x txcmd %08x flags %08x - in NEW state\n", + state, state->rxcmd, state->txcmd, state->flags); + return; + } /* + * NOTE: The DELETE flag might already be set due to an early + * termination. + * * NOTE: Args to kdmsg_msg_alloc() to avoid dynamic state allocation. * * NOTE: We are simulating a received message using our state * (vs a message generated by the other side using its state), * so we must invert DMSGF_REVTRANS and DMSGF_REVCIRC. 
*/ + kprintf("kdmsg_state_abort(1): state %p rxcmd %08x txcmd %08x\n", + state, state->rxcmd, state->txcmd); if ((state->rxcmd & DMSGF_DELETE) == 0) { msg = kdmsg_msg_alloc(state, DMSG_LNK_ERROR, NULL, NULL); if ((state->rxcmd & DMSGF_CREATE) == 0) @@ -1228,24 +1303,34 @@ kdmsg_state_abort(kdmsg_state_t *state) (state->rxcmd & DMSGF_REPLY); msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC); msg->any.head.error = DMSG_ERR_LOSTLINK; + kprintf("kdmsg_state_abort(a): state %p msgcmd %08x\n", + state, msg->any.head.cmd); + /* circuit not initialized */ lockmgr(&state->iocom->msglk, LK_RELEASE); kdmsg_msg_receive_handling(msg); lockmgr(&state->iocom->msglk, LK_EXCLUSIVE); msg = NULL; } + kprintf("kdmsg_state_abort(2): state %p rxcmd %08x txcmd %08x\n", + state, state->rxcmd, state->txcmd); +} - /* - * If the state still has a parent association we must remove it - * now even if it is not fully closed or the simulation loop will - * livelock. - */ - if (state->flags & KDMSG_STATE_SUBINSERTED) { - KKASSERT(state->flags & KDMSG_STATE_RBINSERTED); - TAILQ_REMOVE(&state->parent->subq, state, entry); - state->flags &= ~KDMSG_STATE_SUBINSERTED; - kdmsg_state_drop(state->parent); /* pstate->subq */ - kdmsg_state_drop(state); /* pstate->subq */ - state->parent = NULL; +/* + * Recursively sets KDMSG_STATE_DYING on state and all sub-states, preventing + * the transmission of any new messages on these states. This is done + * atomically when parent state is terminating, whereas setting ABORTING is + * not atomic and can leak races. 
+ */ +static +void +kdmsg_state_dying(kdmsg_state_t *state) +{ + kdmsg_state_t *scan; + + if ((state->flags & KDMSG_STATE_DYING) == 0) { + state->flags |= KDMSG_STATE_DYING; + TAILQ_FOREACH(scan, &state->subq, entry) + kdmsg_state_dying(scan); } } @@ -1322,6 +1407,7 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg) state->icmd = msg->any.head.cmd & DMSGF_BASECMDMASK; state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; state->rxcmd = DMSGF_REPLY; + state->flags &= ~KDMSG_STATE_NEW; error = 0; break; case DMSGF_DELETE: @@ -1462,7 +1548,6 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg) { kdmsg_iocom_t *iocom = msg->state->iocom; kdmsg_state_t *state; - kdmsg_state_t *pstate; if ((state = msg->state) == NULL) { kdmsg_msg_free(msg); @@ -1480,6 +1565,7 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg) wakeup(state); } state->flags &= ~(KDMSG_STATE_INTERLOCK | KDMSG_STATE_SIGNAL); + kdmsg_state_hold(state); if (msg->any.head.cmd & DMSGF_DELETE) { KKASSERT((state->txcmd & DMSGF_DELETE) == 0); @@ -1498,16 +1584,25 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg) &iocom->statewr_tree, state); } state->flags &= ~KDMSG_STATE_RBINSERTED; - pstate = state->parent; - if (state->flags & KDMSG_STATE_SUBINSERTED) { - TAILQ_REMOVE(&pstate->subq, state, entry); - state->flags &= ~KDMSG_STATE_SUBINSERTED; - kdmsg_state_drop(pstate); /* pstate->subq */ - kdmsg_state_drop(state); /* pstate->subq */ - state->parent = NULL; - } else { - KKASSERT(state->parent == NULL); - } + + /* + * The subq recursion is used for parent linking and + * scanning the topology for aborts, we can only + * remove leafs. The circuit is effectively dead now, + * but topology won't be torn down until all of its + * children have finished/aborted. + * + * This is particularly important for end-point + * devices which might need to access private data + * in parent states. 
Out of order disconnects can + * occur if an end-point device is processing a + * message transaction asynchronously because abort + * requests are basically synchronous and it probably + * isn't convenient (or possible) for the end-point + * to abort an asynchronous operation. + */ + if (TAILQ_EMPTY(&state->subq)) + kdmsg_subq_delete(state); kdmsg_msg_free(msg); kdmsg_state_drop(state); /* state on rbtree */ } else { @@ -1516,11 +1611,24 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg) } else { kdmsg_msg_free(msg); } + + /* + * Deferred abort after transmission. + */ + if ((state->flags & (KDMSG_STATE_ABORTING | KDMSG_STATE_DYING)) && + (state->rxcmd & DMSGF_DELETE) == 0) { + kprintf("kdmsg_state_cleanuptx: state=%p " + "executing deferred abort\n", + state); + state->flags &= ~KDMSG_STATE_ABORTING; + kdmsg_state_abort(state); + } + kdmsg_state_drop(state); } static void -_kdmsg_state_ref(kdmsg_state_t *state KDMSG_DEBUG_ARGS) +_kdmsg_state_hold(kdmsg_state_t *state KDMSG_DEBUG_ARGS) { atomic_add_int(&state->refs, 1); #if KDMSG_DEBUG @@ -1573,13 +1681,23 @@ kdmsg_msg_alloc(kdmsg_state_t *state, uint32_t cmd, /* * New transaction, requires tracking state and a unique * msgid to be allocated. + * + * It is possible to race a circuit failure; inherit the + * parent's STATE_DYING flag to trigger an abort sequence + * in the transmit path. By not inheriting ABORTING the + * abort sequence can recurse. + * + * NOTE: The transaction has not yet been initiated so we + * cannot set DMSGF_CREATE/DELETE bits in txcmd or rxcmd. + * We have to properly setup DMSGF_REPLY, however.
*/ pstate = state; state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO); TAILQ_INIT(&state->subq); state->iocom = iocom; state->parent = pstate; - state->flags = KDMSG_STATE_DYNAMIC; + state->flags = KDMSG_STATE_DYNAMIC | + KDMSG_STATE_NEW; state->func = func; state->any.any = data; state->msgid = (uint64_t)(uintptr_t)state; @@ -1588,16 +1706,20 @@ kdmsg_msg_alloc(kdmsg_state_t *state, uint32_t cmd, lockmgr(&iocom->msglk, LK_EXCLUSIVE); if (RB_INSERT(kdmsg_state_tree, &iocom->statewr_tree, state)) panic("duplicate msgid allocated"); + if (TAILQ_EMPTY(&pstate->subq)) + kdmsg_state_hold(pstate);/* pstate->subq */ TAILQ_INSERT_TAIL(&pstate->subq, state, entry); state->flags |= KDMSG_STATE_RBINSERTED | KDMSG_STATE_SUBINSERTED; - kdmsg_state_ref(pstate); /* pstate->subq */ - kdmsg_state_ref(state); /* pstate->subq */ - kdmsg_state_ref(state); /* state on rbtree */ + state->flags |= pstate->flags & KDMSG_STATE_DYING; + kdmsg_state_hold(state); /* pstate->subq */ + kdmsg_state_hold(state); /* state on rbtree */ + kdmsg_state_hold(state); /* msg->state */ lockmgr(&iocom->msglk, LK_RELEASE); } else { pstate = state->parent; KKASSERT(pstate != NULL); + kdmsg_state_hold(state); /* msg->state */ } if (state->flags & KDMSG_STATE_OPPOSITE) @@ -1610,7 +1732,6 @@ kdmsg_msg_alloc(kdmsg_state_t *state, uint32_t cmd, msg->any.head.msgid = state->msgid; msg->any.head.circuit = pstate->msgid; msg->state = state; - kdmsg_state_ref(state); /* msg->state */ return (msg); } @@ -1698,6 +1819,8 @@ kdmsg_msg_write(kdmsg_msg_t *msg) kdmsg_iocom_t *iocom = msg->state->iocom; kdmsg_state_t *state; + lockmgr(&iocom->msglk, LK_EXCLUSIVE); + if (msg->state) { /* * Continuance or termination of existing transaction. @@ -1718,9 +1841,11 @@ kdmsg_msg_write(kdmsg_msg_t *msg) msg->any.head.msgid = 0; } - lockmgr(&iocom->msglk, LK_EXCLUSIVE); - +#if 0 /* + * XXX removed - don't make this a panic, allow the state checks + * below to catch the situation. 
+ * * This flag is not set until after the tx thread has drained * the tx msgq and simulated responses. After that point the * txthread is dead and can no longer simulate responses. @@ -1734,47 +1859,43 @@ kdmsg_msg_write(kdmsg_msg_t *msg) panic("kdmsg_msg_write: Attempt to write message to " "terminated iocom\n"); } +#endif /* - * For stateful messages, if the circuit is dead we have to abort - * the state and discard the message. + * For stateful messages, if the circuit is dead or dying we have + * to abort the potentially newly-created state and discard the + * message. * * - We must discard the message because the other end will not - * be expecting any more messages over the dead circuit and might - * not be able to receive them. + * be expecting any more messages over the dead or dying circuit + * and might not be able to receive them. * * - We abort the state by simulating a failure to generate a fake * incoming DELETE. This will trigger the state callback and allow * the device to clean things up and reply, closing the outgoing - * direction and terminating the state. + * direction and allowing the state to be freed. * - * - Because there are numerous races, it is possible that an abort - * has already been initiated on this state. - * - * - For now, don't bother checking to see if this is a CREATE - * message, though we could probably add that as a restriction. - * Any pre-existing state will probably have already had an abort - * initiated on it. - * - * This race occurs quite often, particularly as SPANs stabilize. + * This situation occurs quite often, particularly as SPANs stabilize. * End-points must do the right thing. 
*/ if (state) { KKASSERT((state->txcmd & DMSGF_DELETE) == 0); - if ((state->parent->txcmd & DMSGF_DELETE) || - (state->parent->flags & KDMSG_STATE_ABORTING)) { + if (state->flags & KDMSG_STATE_DYING) { +#if 0 + if ((state->flags & KDMSG_STATE_DYING) || + (state->parent->txcmd & DMSGF_DELETE) || + (state->parent->flags & KDMSG_STATE_DYING)) { +#endif kprintf("kdmsg_msg_write: Write to dying circuit " + "state=%p " "ptxcmd=%08x prxcmd=%08x flags=%08x\n", + state, state->parent->rxcmd, state->parent->txcmd, state->parent->flags); - kdmsg_state_ref(state); + kdmsg_state_hold(state); kdmsg_state_msgtx(msg); kdmsg_state_cleanuptx(msg); - if ((state->flags & KDMSG_STATE_ABORTING) == 0) { - kdmsg_simulate_failure(state, 1, - DMSG_ERR_LOSTLINK); - } kdmsg_state_drop(state); lockmgr(&iocom->msglk, LK_RELEASE); return; diff --git a/sys/sys/dmsg.h b/sys/sys/dmsg.h index 75e53f07b3..a0858c1c15 100644 --- a/sys/sys/dmsg.h +++ b/sys/sys/dmsg.h @@ -717,11 +717,22 @@ struct kdmsg_data; * Transactional state structure, representing an open transaction. The * transaction might represent a cache state (and thus have a chain * association), or a VOP op, LNK_SPAN, or other things. + * + * NOTE: A non-empty subq represents one ref. + * If we are inserted on a parent's subq, that's one ref (SUBINSERTED). + * If we are inserted on a RB tree, that's one ref (RBINSERTED). + * msg->state represents a ref. + * Other code references may hold refs. + * + * NOTE: The parent association stays intact as long as a state has a + * non-empty subq. Otherwise simulated failures might not be able + * to reach the children. 
*/ TAILQ_HEAD(kdmsg_state_list, kdmsg_state); struct kdmsg_state { RB_ENTRY(kdmsg_state) rbnode; /* indexed by msgid */ + struct kdmsg_state *scan; /* scan check */ struct kdmsg_state_list subq; /* active stacked states */ TAILQ_ENTRY(kdmsg_state) entry; /* on parent subq */ TAILQ_ENTRY(kdmsg_state) user_entry; /* available to devices */ @@ -745,13 +756,14 @@ struct kdmsg_state { #define KDMSG_STATE_SUBINSERTED 0x0001 #define KDMSG_STATE_DYNAMIC 0x0002 -#define KDMSG_STATE_DELPEND 0x0004 /* transmit delete pending */ +#define KDMSG_STATE_UNUSED0004 0x0004 #define KDMSG_STATE_ABORTING 0x0008 /* avoids recursive abort */ #define KDMSG_STATE_OPPOSITE 0x0010 /* opposite direction */ -#define KDMSG_STATE_DYING 0x0020 /* indicates circuit failure */ +#define KDMSG_STATE_DYING 0x0020 /* atomic recursive circ fail */ #define KDMSG_STATE_INTERLOCK 0x0040 #define KDMSG_STATE_RBINSERTED 0x0080 #define KDMSG_STATE_SIGNAL 0x0400 +#define KDMSG_STATE_NEW 0x0800 /* defer abort processing */ struct kdmsg_msg { TAILQ_ENTRY(kdmsg_msg) qentry; /* serialized queue */