dmsg - Stabilization work
author Matthew Dillon <dillon@apollo.backplane.com>
Mon, 9 Mar 2015 03:49:31 +0000 (20:49 -0700)
committer Matthew Dillon <dillon@apollo.backplane.com>
Mon, 9 Mar 2015 03:49:31 +0000 (20:49 -0700)
* Refactor the circuit failure handling code.  When a connection is lost,
  all circuits running through that connection, and all of their
  sub-circuits/states (recursively), are aborted.  The failure propagates
  through the graph, and there are plenty of edge cases where a failure is
  propagating in one direction while a request is traveling in the other
  direction.

  The library is responsible for providing missing transaction closures
  when circuits and states fail.

* Add code to handle circuit failure races against newly created messages.
  The STATE_DYING flag is now inherited from the parent state by the state
  that is created for the new message.

* On receive, the state structure is now updated before the callback is
  made, rather than after, so that the update is atomic with the lock.

* Lots of debugging added and some cleanup.

lib/libdmsg/debug.c
lib/libdmsg/dmsg.h
lib/libdmsg/msg.c
lib/libdmsg/msg_lnk.c
lib/libdmsg/subs.c
sys/kern/kern_dmsg.c
sys/sys/dmsg.h

index add9a39..0d6472c 100644 (file)
@@ -238,12 +238,13 @@ dmsg_msg_str(dmsg_msg_t *msg)
         * Generate the buf
         */
        snprintf(buf, sizeof(buf),
-               "msg=%s%s %s msgid=%08x %s",
+               "msg=%s%s %s %s hcrc=%08x id=%016jx",
                 dmsg_basecmd_str(msg->any.head.cmd),
                 flagbuf,
                 errstr,
-                (uint32_t)(intmax_t)msg->any.head.msgid,   /* for brevity */
-                statestr);
+                statestr,
+                msg->any.head.hdr_crc,
+                msg->any.head.msgid);
 
        return(buf);
 }
index d6c6ec0..c1b4316 100644 (file)
@@ -157,6 +157,7 @@ typedef struct dmsg_media dmsg_media_t;
  */
 struct dmsg_state {
        RB_ENTRY(dmsg_state) rbnode;            /* by state->msgid */
+       struct dmsg_state       *scan;          /* scan check */
        TAILQ_HEAD(, dmsg_state) subq;          /* active stacked states */
        TAILQ_ENTRY(dmsg_state) entry;          /* on parent subq */
        struct dmsg_iocom *iocom;
@@ -181,12 +182,13 @@ struct dmsg_state {
 
 #define DMSG_STATE_SUBINSERTED 0x0001
 #define DMSG_STATE_DYNAMIC     0x0002
-#define DMSG_STATE_NODEID      0x0004          /* manages a node id */
-#define DMSG_STATE_UNUSED_0008 0x0008
+#define DMSG_STATE_UNUSED0004  0x0004
+#define DMSG_STATE_ABORTING    0x0008
 #define DMSG_STATE_OPPOSITE    0x0010          /* initiated by other end */
 #define DMSG_STATE_CIRCUIT     0x0020          /* LNK_SPAN special case */
 #define DMSG_STATE_DYING       0x0040          /* indicates circuit failure */
 #define DMSG_STATE_RBINSERTED  0x0080
+#define DMSG_STATE_NEW         0x0400          /* defer abort processing */
 #define DMSG_STATE_ROOT                0x8000          /* iocom->state0 */
 
 /*
@@ -195,6 +197,8 @@ struct dmsg_state {
  * will point to &iocom->state0 for non-transactional messages.
  *
  * Message headers are embedded while auxillary data is separately allocated.
+ * The 'any' portion of the message is allocated dynamically based on
+ * hdr_size.
  */
 struct dmsg_msg {
        TAILQ_ENTRY(dmsg_msg) qentry;
@@ -279,8 +283,6 @@ struct dmsg_iocom {
        char            *label;                 /* label for error reporting */
        dmsg_ioq_t      ioq_rx;
        dmsg_ioq_t      ioq_tx;
-       dmsg_msg_queue_t freeq;                 /* free msgs hdr only */
-       dmsg_msg_queue_t freeq_aux;             /* free msgs w/aux_data */
        dmsg_state_t    state0;                 /* root state for stacking */
        struct dmsg_state_tree  staterd_tree;   /* active transactions */
        struct dmsg_state_tree  statewr_tree;   /* active transactions */
index e6f6f59..c46d392 100644 (file)
 
 #include "dmsg_local.h"
 
+#define DMSG_BLOCK_DEBUG
+
 int DMsgDebugOpt;
 int dmsg_state_count;
 #ifdef DMSG_BLOCK_DEBUG
 static int biocount;
 #endif
 
-static int dmsg_state_msgrx(dmsg_msg_t *msg);
+static int dmsg_state_msgrx(dmsg_msg_t *msg, int mstate);
 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
 static void dmsg_state_free(dmsg_state_t *state);
-static void dmsg_simulate_failure(dmsg_state_t *state, int error);
+static void dmsg_subq_delete(dmsg_state_t *state);
+static void dmsg_simulate_failure(dmsg_state_t *state, int meto, int error);
+static void dmsg_state_abort(dmsg_state_t *state);
+static void dmsg_state_dying(dmsg_state_t *state);
 
 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
 
@@ -121,8 +126,6 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
        pthread_mutex_init(&iocom->mtx, NULL);
        RB_INIT(&iocom->staterd_tree);
        RB_INIT(&iocom->statewr_tree);
-       TAILQ_INIT(&iocom->freeq);
-       TAILQ_INIT(&iocom->freeq_aux);
        TAILQ_INIT(&iocom->txmsgq);
        iocom->sock_fd = sock_fd;
        iocom->alt_fd = alt_fd;
@@ -218,8 +221,6 @@ dmsg_iocom_signal(dmsg_iocom_t *iocom)
 void
 dmsg_iocom_done(dmsg_iocom_t *iocom)
 {
-       dmsg_msg_t *msg;
-
        if (iocom->sock_fd >= 0) {
                close(iocom->sock_fd);
                iocom->sock_fd = -1;
@@ -230,16 +231,6 @@ dmsg_iocom_done(dmsg_iocom_t *iocom)
        }
        dmsg_ioq_done(iocom, &iocom->ioq_rx);
        dmsg_ioq_done(iocom, &iocom->ioq_tx);
-       while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
-               TAILQ_REMOVE(&iocom->freeq, msg, qentry);
-               free(msg);
-       }
-       while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
-               TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
-               free(msg->aux_data);
-               msg->aux_data = NULL;
-               free(msg);
-       }
        if (iocom->wakeupfds[0] >= 0) {
                close(iocom->wakeupfds[0]);
                iocom->wakeupfds[0] = -1;
@@ -286,35 +277,27 @@ dmsg_msg_alloc_locked(dmsg_state_t *state,
        int hbytes;
        size_t aligned_size;
 
-#if 0
-       if (aux_size) {
-               aligned_size = DMSG_DOALIGN(aux_size);
-               if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
-                       TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
-       } else {
-               aligned_size = 0;
-               if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
-                       TAILQ_REMOVE(&iocom->freeq, msg, qentry);
-       }
-#endif
        aligned_size = DMSG_DOALIGN(aux_size);
-       msg = NULL;
        if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
                /*
                 * When CREATE is set without REPLY the caller is
                 * initiating a new transaction stacked under the specified
                 * circuit.
                 *
+                * It is possible to race a circuit failure, inherit the
+                * parent's STATE_DYING flag to trigger an abort sequence
+                * in the transmit path.  By not inheriting ABORTING the
+                * abort sequence can recurse.
+                *
                 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
                 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
                 */
                pstate = state;
                state = malloc(sizeof(*state));
-               atomic_add_int(&dmsg_state_count, 1);
                bzero(state, sizeof(*state));
+               atomic_add_int(&dmsg_state_count, 1);
+
                TAILQ_INIT(&state->subq);
-               dmsg_state_hold(pstate);
-               state->refs = 1;
                state->parent = pstate;
                state->iocom = iocom;
                state->flags = DMSG_STATE_DYNAMIC;
@@ -325,42 +308,30 @@ dmsg_msg_alloc_locked(dmsg_state_t *state,
                state->func = func;
                state->any.any = data;
 
-               RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
-               TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
                state->flags |= DMSG_STATE_SUBINSERTED |
                                DMSG_STATE_RBINSERTED;
-
-               if (DMsgDebugOpt) {
-                       fprintf(stderr,
-                               "create state %p id=%08x on iocom statewr %p\n",
-                               state, (uint32_t)state->msgid, iocom);
-               }
+               state->flags |= pstate->flags & DMSG_STATE_DYING;
+               if (TAILQ_EMPTY(&pstate->subq))
+                       dmsg_state_hold(pstate);
+               RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
+               TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
+               dmsg_state_hold(state);         /* state on pstate->subq */
+               dmsg_state_hold(state);         /* state on rbtree */
+               dmsg_state_hold(state);         /* msg->state */
        } else {
                /*
                 * Otherwise the message is transmitted over the existing
                 * open transaction.
                 */
                pstate = state->parent;
+               dmsg_state_hold(state);         /* msg->state */
        }
 
        /* XXX SMP race for state */
        hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
-       if (msg == NULL) {
-               msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
-               bzero(msg, offsetof(struct dmsg_msg, any.head));
-               *(int *)((char *)msg +
-                        offsetof(struct dmsg_msg, any.head) + hbytes) =
-                                0x71B2C3D4;
-               if (DMsgDebugOpt) {
-                       fprintf(stderr,
-                               "allo msg %p id=%08x on iocom %p\n",
-                               msg, (int)msg->any.head.msgid, iocom);
-               }
-#if 0
-               msg = malloc(sizeof(*msg));
-               bzero(msg, sizeof(*msg));
-#endif
-       }
+       assert((size_t)hbytes >= sizeof(struct dmsg_hdr));
+       msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes);
+       bzero(msg, offsetof(struct dmsg_msg, any.head));
 
        /*
         * [re]allocate the auxillary data buffer.  The caller knows that
@@ -395,8 +366,7 @@ dmsg_msg_alloc_locked(dmsg_state_t *state,
        /*
         * Finish filling out the header.
         */
-       if (hbytes)
-               bzero(&msg->any.head, hbytes);
+       bzero(&msg->any.head, hbytes);
        msg->hdr_size = hbytes;
        msg->any.head.magic = DMSG_HDR_MAGIC;
        msg->any.head.cmd = cmd;
@@ -418,35 +388,18 @@ static
 void
 dmsg_msg_free_locked(dmsg_msg_t *msg)
 {
-       /*dmsg_iocom_t *iocom = msg->iocom;*/
+       dmsg_state_t *state;
 
-       if (DMsgDebugOpt) {
-               fprintf(stderr,
-                       "free msg %p id=%08x on (aux %p)\n",
-                       msg, (int)msg->any.head.msgid, msg->aux_data);
-       }
-#if 1
-       int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
-       if (*(int *)((char *)msg +
-                    offsetof(struct  dmsg_msg, any.head) + hbytes) !=
-            0x71B2C3D4) {
-               fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
-               assert(0);
+       if ((state = msg->state) != NULL) {
+               dmsg_state_drop(state);
+               msg->state = NULL;      /* safety */
        }
-#endif
-       msg->state = NULL;      /* safety */
        if (msg->aux_data) {
                free(msg->aux_data);
-               msg->aux_data = NULL;
+               msg->aux_data = NULL;   /* safety */
        }
        msg->aux_size = 0;
        free (msg);
-#if 0
-       if (msg->aux_data)
-               TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
-       else
-               TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
-#endif
 }
 
 void
@@ -598,7 +551,9 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
                                                dmsg_msg_str(msg));
                                }
                                iocom->rcvmsg_callback(msg);
+                               pthread_mutex_lock(&iocom->mtx);
                                dmsg_state_cleanuprx(iocom, msg);
+                               pthread_mutex_unlock(&iocom->mtx);
                        }
                }
 
@@ -828,6 +783,7 @@ again:
                 */
                assert(msg != NULL);
                if (bytes < ioq->hbytes) {
+                       assert(nmax > 0);
                        n = read(iocom->sock_fd,
                                 ioq->buf + ioq->fifo_end,
                                 nmax);
@@ -964,6 +920,7 @@ again:
                 * Read and decrypt more of the payload.
                 */
                if (msg->aux_size < ioq->abytes) {
+                       assert(nmax > 0);
                        assert(bytes == 0);
                        n = read(iocom->sock_fd,
                                 ioq->buf + ioq->fifo_end,
@@ -1113,7 +1070,7 @@ skip:
                 */
                pthread_mutex_lock(&iocom->mtx);
                dmsg_iocom_drain(iocom);
-               dmsg_simulate_failure(&iocom->state0, ioq->error);
+               dmsg_simulate_failure(&iocom->state0, 0, ioq->error);
                pthread_mutex_unlock(&iocom->mtx);
                if (TAILQ_FIRST(&ioq->msgq))
                        goto again;
@@ -1174,13 +1131,12 @@ skip:
                 */
                if (DMsgDebugOpt >= 5) {
                        fprintf(stderr,
-                               "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
+                               "rxmsg cmd=%08x circ=%016jx\n",
                                msg->any.head.cmd,
-                               (intmax_t)msg->any.head.msgid,
                                (intmax_t)msg->any.head.circuit);
                }
 
-               error = dmsg_state_msgrx(msg);
+               error = dmsg_state_msgrx(msg, 0);
 
                if (error) {
                        /*
@@ -1236,6 +1192,10 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom)
        }
        pthread_mutex_unlock(&iocom->mtx);
 
+       /*
+        * Flush queue, doing all required encryption and CRC generation,
+        * with the mutex unlocked.
+        */
        while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
                /*
                 * Process terminal connection errors.
@@ -1255,8 +1215,11 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom)
                msg->any.head.magic = DMSG_HDR_MAGIC;
                msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
                ++ioq->seq;
-               if ((ioq->seq & 32767) == 0)
+               if ((ioq->seq & 32767) == 0) {
+                       pthread_mutex_lock(&iocom->mtx);
                        srandomdev();
+                       pthread_mutex_unlock(&iocom->mtx);
+               }
 
                /*
                 * Calculate aux_crc if 0, then calculate hdr_crc.
@@ -1473,9 +1436,8 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
 
 #if 0
                fprintf(stderr,
-                       "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
+                       "txmsg cmd=%08x circ=%016jx\n",
                        msg->any.head.cmd,
-                       (intmax_t)msg->any.head.msgid,
                        (intmax_t)msg->any.head.circuit);
 #endif
 
@@ -1597,6 +1559,16 @@ dmsg_msg_write(dmsg_msg_t *msg)
        pthread_mutex_lock(&iocom->mtx);
        state = msg->state;
 
+       if (DMsgDebugOpt) {
+               fprintf(stderr,
+                       "msgtx: cmd=%08x msgid=%016jx "
+                       "state %p(%08x) error=%d\n",
+                       msg->any.head.cmd, msg->any.head.msgid,
+                       state, (state ? state->icmd : 0),
+                       msg->any.head.error);
+       }
+
+
 #if 0
        /*
         * Make sure the parent transaction is still open in the transmit
@@ -1613,7 +1585,6 @@ dmsg_msg_write(dmsg_msg_t *msg)
                return;
        }
 #endif
-
        /*
         * Process state data into the message as needed, then update the
         * state based on the message.
@@ -1634,6 +1605,7 @@ dmsg_msg_write(dmsg_msg_t *msg)
                    DMSGF_CREATE) {
                        state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                        state->icmd = state->txcmd & DMSGF_BASECMDMASK;
+                       state->flags &= ~DMSG_STATE_NEW;
                }
                msg->any.head.msgid = state->msgid;
 
@@ -1641,24 +1613,95 @@ dmsg_msg_write(dmsg_msg_t *msg)
                        state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                }
        }
-       dmsg_state_cleanuptx(iocom, msg);
 
-#if 0
-       fprintf(stderr,
-               "MSGWRITE %016jx %08x\n",
-               msg->any.head.msgid, msg->any.head.cmd);
-#endif
+       /*
+        * Discard messages sent to transactions which are already dead.
+        */
+       if (state && (state->txcmd & DMSGF_DELETE)) {
+               printf("dmsg_msg_write: drop msg %08x to dead "
+                      "circuit state=%p\n",
+                      msg->any.head.cmd, state);
+               dmsg_msg_free(msg);
+               return;
+       }
 
        /*
-        * Queue it for output, wake up the I/O pthread.  Note that the
-        * I/O thread is responsible for generating the CRCs and encryption.
+        * Normally we queue the msg for output.  However, if the circuit is
+        * dead or dying we must simulate a failure in the return direction
+        * and throw the message away.  The other end is not expecting any
+        * further messages from us on this state.
+        *
+        * Note that the I/O thread is responsible for generating the CRCs
+        * and encryption.
         */
-       TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
-       dummy = 0;
-       write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
+       if (state->flags & DMSG_STATE_DYING) {
+#if 0
+       if ((state->parent->txcmd & DMSGF_DELETE) ||
+           (state->parent->flags & DMSG_STATE_DYING) ||
+           (state->flags & DMSG_STATE_DYING)) {
+#endif
+               /* 
+                * Illegal message, kill state and related sub-state.
+                * Cannot transmit if state is already dying.
+                */
+               printf("dmsg_msg_write: Write to dying circuit "
+                       "ptxcmd=%08x prxcmd=%08x flags=%08x\n",
+                       state->parent->rxcmd,
+                       state->parent->txcmd,
+                       state->parent->flags);
+               dmsg_state_hold(state);
+               dmsg_state_cleanuptx(iocom, msg);
+               if ((state->flags & DMSG_STATE_ABORTING) == 0) {
+                       dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK);
+               }
+               dmsg_state_drop(state);
+               dmsg_msg_free(msg);
+       } else {
+               /*
+                * Queue the message, clean up transmit state prior to queueing
+                * to avoid SMP races.
+                */
+               if (DMsgDebugOpt)
+               printf("dmsg_msg_write: commit msg state=%p to txkmsgq\n", state);
+               dmsg_state_cleanuptx(iocom, msg);
+               TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
+               dummy = 0;
+               write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
+       }
        pthread_mutex_unlock(&iocom->mtx);
 }
 
+/*
+ * Remove state from its parent's subq.  This can wind up recursively
+ * dropping the parent upward.
+ *
+ * NOTE: iocom must be locked.
+ *
+ * NOTE: Once we drop the parent, our pstate pointer may become invalid.
+ */
+static
+void
+dmsg_subq_delete(dmsg_state_t *state)
+{
+       dmsg_state_t *pstate;
+
+       if (state->flags & DMSG_STATE_SUBINSERTED) {
+               pstate = state->parent;
+               assert(pstate);
+               if (pstate->scan == state)
+                       pstate->scan = NULL;
+               TAILQ_REMOVE(&pstate->subq, state, entry);
+               state->flags &= ~DMSG_STATE_SUBINSERTED;
+               state->parent = NULL;
+               if (TAILQ_EMPTY(&pstate->subq))
+                       dmsg_state_drop(pstate);/* pstate->subq */
+               pstate = NULL;                  /* safety */
+               dmsg_state_drop(state);         /* pstate->subq */
+       } else {
+               assert(state->parent == NULL);
+       }
+}
+
 /*
  * Simulate reception of a transaction DELETE message when the link goes
  * bad.  This routine must recurse through state->subq and generate messages
@@ -1668,90 +1711,108 @@ dmsg_msg_write(dmsg_msg_t *msg)
  */
 static
 void
-dmsg_simulate_failure(dmsg_state_t *state, int error)
+dmsg_simulate_failure(dmsg_state_t *state, int meto, int error)
 {
        dmsg_state_t *substate;
+
+       dmsg_state_hold(state);
+       if (meto)
+               dmsg_state_abort(state);
+
+       /*
+        * Recurse through sub-states.
+        */
+again:
+       TAILQ_FOREACH(substate, &state->subq, entry) {
+               if (substate->flags & DMSG_STATE_ABORTING)
+                       continue;
+               state->scan = substate;
+               dmsg_simulate_failure(substate, 1, error);
+               if (state->scan != substate)
+                       goto again;
+       }
+
+       dmsg_state_drop(state);
+}
+
+static
+void
+dmsg_state_abort(dmsg_state_t *state)
+{
        dmsg_iocom_t *iocom;
        dmsg_msg_t *msg;
 
-       while ((substate = TAILQ_FIRST(&state->subq)) != NULL) {
-               dmsg_simulate_failure(substate, error);
+       /*
+        * Set ABORTING and DYING, return if already set.  If the state was
+        * just allocated we defer the abort operation until the related
+        * message is processed.
+        */
+       if (state->flags & DMSG_STATE_ABORTING)
+               return;
+       state->flags |= DMSG_STATE_ABORTING;
+       dmsg_state_dying(state);
+       if (state->flags & DMSG_STATE_NEW) {
+               printf("dmsg_state_abort(0): state %p rxcmd %08x txcmd %08x "
+                      "flags %08x - in NEW state\n",
+                      state, state->rxcmd, state->txcmd, state->flags);
+               return;
        }
 
-       iocom = state->iocom;
-       if (state == &iocom->state0) {
-               /*
-                * No active local or remote transactions remain.
-                * Generate a final LNK_ERROR.  EOF will be flagged
-                * when the message is returned by dmsg_ioq_read().
-                */
-               msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
-                                           DMSG_LNK_ERROR,
+       /*
+        * Simulate parent state failure before child states.  Device
+        * drivers need to understand this and flag the situation but might
+        * have asynchronous operations in progress that they cannot stop.
+        * To make things easier, parent states will not actually disappear
+        * until the children are all gone.
+        */
+       if ((state->rxcmd & DMSGF_DELETE) == 0) {
+               fprintf(stderr, "SIMULATE ERROR\n");
+               msg = dmsg_msg_alloc_locked(state, 0, DMSG_LNK_ERROR,
                                            NULL, NULL);
-               msg->any.head.error = error;
-       } else if (state->flags & DMSG_STATE_OPPOSITE) {
-               /*
-                * Active remote transactions are still present.
-                * Simulate the other end sending us a DELETE.
-                */
-               if (state->flags & DMSG_STATE_SUBINSERTED) {
-                       TAILQ_REMOVE(&state->parent->subq, state, entry);
-                       state->flags &= ~DMSG_STATE_SUBINSERTED;
-               }
-               if ((state->rxcmd & DMSGF_DELETE) == 0) {
-                       fprintf(stderr, "SIMULATE ERROR1\n");
-                       msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
-                                            DMSG_LNK_ERROR,
-                                            NULL, NULL);
-                       /*state->txcmd |= DMSGF_DELETE;*/
-                       msg->state = state;
-                       msg->any.head.error = error;
-                       msg->any.head.msgid = state->msgid;
-                       msg->any.head.circuit = state->parent->msgid;
-                       msg->any.head.cmd |= DMSGF_ABORT |
-                                            DMSGF_DELETE;
-                       if ((state->parent->flags &
-                            DMSG_STATE_OPPOSITE) == 0) {
-                               msg->any.head.cmd |= DMSGF_REVCIRC;
-                       }
-               } else {
-                       msg = NULL;
-               }
-       } else {
-               /*
-                * Active local transactions are still present.
-                * Simulate the other end sending us a DELETE.
-                */
-               if (state->flags & DMSG_STATE_SUBINSERTED) {
-                       TAILQ_REMOVE(&state->parent->subq, state, entry);
-                       state->flags &= ~DMSG_STATE_SUBINSERTED;
-               }
-               if ((state->rxcmd & DMSGF_DELETE) == 0) {
-                       fprintf(stderr, "SIMULATE ERROR1\n");
-                       msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
-                                            DMSG_LNK_ERROR,
-                                            NULL, NULL);
-                       msg->state = state;
-                       msg->any.head.error = error;
-                       msg->any.head.msgid = state->msgid;
-                       msg->any.head.circuit = state->parent->msgid;
-                       msg->any.head.cmd |= DMSGF_ABORT |
-                                            DMSGF_DELETE |
-                                            DMSGF_REVTRANS |
-                                            DMSGF_REPLY;
-                       if ((state->parent->flags &
-                            DMSG_STATE_OPPOSITE) == 0) {
-                               msg->any.head.cmd |= DMSGF_REVCIRC;
-                       }
-                       if ((state->rxcmd & DMSGF_CREATE) == 0)
-                               msg->any.head.cmd |= DMSGF_CREATE;
-               } else {
-                       msg = NULL;
-               }
-       }
-       if (msg) {
+               if ((state->rxcmd & DMSGF_CREATE) == 0)
+                       msg->any.head.cmd |= DMSGF_CREATE;
+               msg->any.head.cmd |= DMSGF_DELETE |
+                                    (state->rxcmd & DMSGF_REPLY);
+               msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC);
+               msg->any.head.error = DMSG_ERR_LOSTLINK;
+               msg->any.head.cmd |= DMSGF_ABORT;
+
+               /*
+                * Issue callback synchronously even though this isn't
+                * the receiver thread.  We need to issue the callback
+                * before removing state from the subq in order to allow
+                * the callback to reply.
+                */
+               iocom = state->iocom;
+               dmsg_state_msgrx(msg, 1);
+               pthread_mutex_unlock(&iocom->mtx);
+               iocom->rcvmsg_callback(msg);
+               pthread_mutex_lock(&iocom->mtx);
+               dmsg_state_cleanuprx(iocom, msg);
+#if 0
                TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
                atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
+#endif
+       }
+}
+
+
+/*
+ * Recursively sets DMSG_STATE_DYING on state and all sub-states, preventing
+ * the transmission of any new messages on these states.  This is done
+ * atomically when parent state is terminating, whereas setting ABORTING is
+ * not atomic and can leak races.
+ */
+static
+void
+dmsg_state_dying(dmsg_state_t *state)
+{
+       dmsg_state_t *scan;
+
+       if ((state->flags & DMSG_STATE_DYING) == 0) {
+               state->flags |= DMSG_STATE_DYING;
+               TAILQ_FOREACH(scan, &state->subq, entry)
+                       dmsg_state_dying(scan);
        }
 }
 
@@ -2012,7 +2073,7 @@ dmsg_state_result(dmsg_state_t *state, uint32_t error)
  * will typically just contain status updates.
  */
 static int
-dmsg_state_msgrx(dmsg_msg_t *msg)
+dmsg_state_msgrx(dmsg_msg_t *msg, int mstate)
 {
        dmsg_iocom_t *iocom = msg->state->iocom;
        dmsg_state_t *state;
@@ -2022,12 +2083,29 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
 
        pthread_mutex_lock(&iocom->mtx);
 
+       if (DMsgDebugOpt) {
+               fprintf(stderr,
+                       "msgrx: cmd=%08x msgid=%016jx "
+                       "circuit=%016jx error=%d\n",
+                       msg->any.head.cmd,
+                       msg->any.head.msgid,
+                       msg->any.head.circuit,
+                       msg->any.head.error);
+       }
+
        /*
         * Lookup the circuit (pstate).  The circuit will be an open
         * transaction.  The REVCIRC bit in the message tells us which side
         * initiated it.
+        *
+        * If mstate is non-zero the state has already been incorporated
+        * into the message as part of a simulated abort.  Note that in this
+        * situation the parent state may have already been removed from
+        * the RBTREE.
         */
-       if (msg->any.head.circuit) {
+       if (mstate) {
+               pstate = msg->state->parent;
+       } else if (msg->any.head.circuit) {
                sdummy.msgid = msg->any.head.circuit;
 
                if (msg->any.head.cmd & DMSGF_REVCIRC) {
@@ -2039,36 +2117,74 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                                         &iocom->staterd_tree,
                                         &sdummy);
                }
+
+               /*
+                * If we cannot find the circuit throw the message away.
+                * The state will have already been taken care of by
+                * the simulated failure code.  This case can occur due
+                * to a failure propagating in one direction crossing a
+                * request on the failed circuit propagating in the other
+                * direction.
+                */
                if (pstate == NULL) {
                        fprintf(stderr,
                                "missing parent in stacked trans %s\n",
                                dmsg_msg_str(msg));
-                       error = DMSG_IOQ_ERROR_TRANS;
                        pthread_mutex_unlock(&iocom->mtx);
-                       assert(0);
+                       error = DMSG_IOQ_ERROR_EALREADY;
+
+                       return error;
                }
        } else {
                pstate = &iocom->state0;
        }
+       /* WARNING: pstate not (yet) refd */
 
        /*
         * Lookup the msgid.
         *
+        * If mstate is non-zero the state has already been incorporated
+        * into the message as part of a simulated abort.  Note that in this
+        * situation the state may have already been removed from the RBTREE.
+        *
         * If received msg is a command state is on staterd_tree.
         * If received msg is a reply state is on statewr_tree.
         * Otherwise there is no state (retain &iocom->state0)
         */
-       sdummy.msgid = msg->any.head.msgid;
-       if (msg->any.head.cmd & DMSGF_REVTRANS)
-               state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
-       else
-               state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);
+       if (mstate) {
+               state = msg->state;
+       } else {
+               sdummy.msgid = msg->any.head.msgid;
+               if (msg->any.head.cmd & DMSGF_REVTRANS) {
+                       state = RB_FIND(dmsg_state_tree,
+                                       &iocom->statewr_tree, &sdummy);
+               } else {
+                       state = RB_FIND(dmsg_state_tree,
+                                       &iocom->staterd_tree, &sdummy);
+               }
+       }
+
+       if (DMsgDebugOpt) {
+               fprintf(stderr,
+                       "msgrx:\tstate %p(%08x)",
+                       state, (state ? state->icmd : 0));
+               if (pstate != &iocom->state0) {
+                       fprintf(stderr,
+                               " pstate %p(%08x)",
+                               pstate, pstate->icmd);
+               }
+               fprintf(stderr, "\n");
+       }
 
-       if (state) {
+       if (mstate) {
+               /* state already assigned to msg */
+       } else if (state) {
                /*
                 * Message over an existing transaction (CREATE should not
                 * be set).
                 */
+               dmsg_state_drop(msg->state);
+               dmsg_state_hold(state);
                msg->state = state;
                assert(pstate == state->parent);
        } else {
@@ -2078,8 +2194,6 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                state = pstate;
        }
 
-       pthread_mutex_unlock(&iocom->mtx);
-
        /*
         * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
         * inside the case statements.
@@ -2111,11 +2225,11 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                 * Allocate the new state.
                 */
                state = malloc(sizeof(*state));
-               atomic_add_int(&dmsg_state_count, 1);
                bzero(state, sizeof(*state));
+               atomic_add_int(&dmsg_state_count, 1);
+
                TAILQ_INIT(&state->subq);
                dmsg_state_hold(pstate);
-               state->refs = 1;
                state->parent = pstate;
                state->iocom = iocom;
                state->flags = DMSG_STATE_DYNAMIC |
@@ -2124,12 +2238,18 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                state->txcmd = DMSGF_REPLY;
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
+               state->flags &= ~DMSG_STATE_NEW;
                msg->state = state;
-               pthread_mutex_lock(&iocom->mtx);
+
                RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
+               if (TAILQ_EMPTY(&pstate->subq))
+                       dmsg_state_hold(pstate);/* pstate->subq */
                TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
                state->flags |= DMSG_STATE_SUBINSERTED |
                                DMSG_STATE_RBINSERTED;
+               dmsg_state_hold(state);         /* pstate->subq */
+               dmsg_state_hold(state);         /* state on rbtree */
+               dmsg_state_hold(state);         /* msg->state */
 
                /*
                 * If the parent is a relay set up the state handler to
@@ -2140,14 +2260,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                 */
                if (pstate->relay)
                        state->func = dmsg_state_relay;
-               pthread_mutex_unlock(&iocom->mtx);
                error = 0;
-
-               if (DMsgDebugOpt) {
-                       fprintf(stderr,
-                               "create state %p id=%08x on iocom staterd %p\n",
-                               state, (uint32_t)state->msgid, iocom);
-               }
                break;
        case DMSGF_DELETE:
                /*
@@ -2278,7 +2391,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
         */
        if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
                if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
-                       msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
+                       msg->tcmd = (state->icmd & DMSGF_BASECMDMASK) |
                                    (msg->any.head.cmd & (DMSGF_CREATE |
                                                          DMSGF_DELETE |
                                                          DMSGF_REPLY));
@@ -2310,6 +2423,48 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
        }
 #endif
 
+       /*
+        * Adjust state, mark receive side as DELETED if appropriate and
+        * adjust RB tree if both sides are DELETED.  cleanuprx handles
+        * the rest after the state callback returns.
+        */
+       assert(msg->state->iocom == iocom);
+       assert(msg->state == state);
+
+       if (state->flags & DMSG_STATE_ROOT) {
+               /*
+                * Nothing to do for non-transactional messages.
+                */
+       } else if (msg->any.head.cmd & DMSGF_DELETE) {
+               /*
+                * Message terminating transaction, remove the state from
+                * the RB tree if the full transaction is now complete.
+                * The related state, subq, and parent link are retained
+                * until after the state callback is complete.
+                */
+               assert((state->rxcmd & DMSGF_DELETE) == 0);
+               state->rxcmd |= DMSGF_DELETE;
+               if (state->txcmd & DMSGF_DELETE) {
+                       assert(state->flags & DMSG_STATE_RBINSERTED);
+                       if (state->rxcmd & DMSGF_REPLY) {
+                               assert(msg->any.head.cmd & DMSGF_REPLY);
+                               RB_REMOVE(dmsg_state_tree,
+                                         &iocom->statewr_tree, state);
+                       } else {
+                               assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
+                               RB_REMOVE(dmsg_state_tree,
+                                         &iocom->staterd_tree, state);
+                       }
+                       state->flags &= ~DMSG_STATE_RBINSERTED;
+                       dmsg_state_drop(state);
+               }
+       }
+
+       pthread_mutex_unlock(&iocom->mtx);
+
+       if (DMsgDebugOpt && error)
+               fprintf(stderr, "msgrx: error %d\n", error);
+
        return (error);
 }
 
@@ -2327,6 +2482,12 @@ dmsg_state_relay(dmsg_msg_t *lmsg)
 
 #ifdef DMSG_BLOCK_DEBUG
        switch (lmsg->tcmd) {
+       case DMSG_BLK_OPEN | DMSGF_CREATE:
+               fprintf(stderr, "relay BIO_OPEN (CREATE)\n");
+               break;
+       case DMSG_BLK_OPEN | DMSGF_DELETE:
+               fprintf(stderr, "relay BIO_OPEN (DELETE)\n");
+               break;
        case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
        case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
                atomic_add_int(&biocount, 1);
@@ -2375,6 +2536,17 @@ dmsg_state_relay(dmsg_msg_t *lmsg)
                rstate = lstate->relay;
                assert(rstate != NULL);
 
+               assert((rstate->txcmd & DMSGF_DELETE) == 0);
+
+#if 0
+               if (lstate->flags & DMSG_STATE_ABORTING) {
+                       fprintf(stderr,
+                               "relay: relay lost link l=%p r=%p\n",
+                               lstate, rstate);
+                       dmsg_simulate_failure(rstate, 0, DMSG_ERR_LOSTLINK);
+               }
+#endif
+
                rmsg = dmsg_msg_alloc(rstate, 0,
                                      lmsg->any.head.cmd,
                                      dmsg_state_relay, NULL);
@@ -2389,6 +2561,7 @@ dmsg_state_relay(dmsg_msg_t *lmsg)
        rmsg->aux_size = lmsg->aux_size;
        rmsg->aux_data = lmsg->aux_data;
        lmsg->aux_data = NULL;
+
        /*
        fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
        */
@@ -2396,13 +2569,16 @@ dmsg_state_relay(dmsg_msg_t *lmsg)
 }
 
 /*
- * Cleanup and retire msg after processing
+ * Cleanup and retire msg after issuing the state callback.  The state
+ * has already been removed from the RB tree.  The subq and msg must be
+ * cleaned up.
+ *
+ * Called with the iocom mutex held (to handle subq disconnection).
  */
 void
 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
 {
        dmsg_state_t *state;
-       dmsg_state_t *pstate;
 
        assert(msg->state->iocom == iocom);
        state = msg->state;
@@ -2412,52 +2588,18 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                 * to worry about.
                 */
                dmsg_msg_free(msg);
-       } else if (msg->any.head.cmd & DMSGF_DELETE) {
+       } else if ((state->flags & DMSG_STATE_SUBINSERTED) &&
+                  (state->rxcmd & DMSGF_DELETE) &&
+                  (state->txcmd & DMSGF_DELETE)) {
                /*
-                * Message terminating transaction, destroy the related
-                * state, the original message, and this message (if it
-                * isn't the original message due to a CREATE|DELETE).
-                *
-                * It's possible for governing state to terminate while
-                * sub-transactions still exist.  This is allowed but
-                * will cause sub-transactions to recursively fail.
-                * Further reception of sub-transaction messages will be
-                * impossible because the circuit will no longer exist.
-                * (XXX need code to make sure that happens properly).
+                * Must disconnect from parent and drop relay.
                 */
-               pthread_mutex_lock(&iocom->mtx);
-               state->rxcmd |= DMSGF_DELETE;
-
-               if (state->txcmd & DMSGF_DELETE) {
-                       assert(state->flags & DMSG_STATE_RBINSERTED);
-                       if (state->rxcmd & DMSGF_REPLY) {
-                               assert(msg->any.head.cmd & DMSGF_REPLY);
-                               RB_REMOVE(dmsg_state_tree,
-                                         &iocom->statewr_tree, state);
-                       } else {
-                               assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
-                               RB_REMOVE(dmsg_state_tree,
-                                         &iocom->staterd_tree, state);
-                       }
-                       state->flags &= ~DMSG_STATE_RBINSERTED;
-                       if (state->flags & DMSG_STATE_SUBINSERTED) {
-                               pstate = state->parent;
-                               TAILQ_REMOVE(&pstate->subq, state, entry);
-                               state->flags &= ~DMSG_STATE_SUBINSERTED;
-                               dmsg_state_drop(pstate);
-                       }
-                       state->parent = NULL;
-
-                       if (state->relay) {
-                               dmsg_state_drop(state->relay);
-                               state->relay = NULL;
-                       }
-                       dmsg_msg_free(msg);
-                       dmsg_state_drop(state);
-               } else {
-                       dmsg_msg_free(msg);
+               dmsg_subq_delete(state);
+               if (state->relay) {
+                       dmsg_state_drop(state->relay);
+                       state->relay = NULL;
                }
-               pthread_mutex_unlock(&iocom->mtx);
+               dmsg_msg_free(msg);
        } else {
                /*
                 * Message not terminating transaction, leave state intact
@@ -2470,15 +2612,19 @@ dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
 /*
  * Clean up the state after pulling out needed fields and queueing the
  * message for transmission.   This occurs in dmsg_msg_write().
+ *
+ * Called with the mutex locked.
  */
 static void
 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
 {
        dmsg_state_t *state;
-       dmsg_state_t *pstate;
 
        assert(iocom == msg->state->iocom);
        state = msg->state;
+
+       dmsg_state_hold(state);
+
        if (state->flags & DMSG_STATE_ROOT) {
                ;
        } else if (msg->any.head.cmd & DMSGF_DELETE) {
@@ -2493,11 +2639,15 @@ dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                 * Further reception of sub-transaction messages will be
                 * impossible because the circuit will no longer exist.
                 * (XXX need code to make sure that happens properly).
-                */
-               pthread_mutex_lock(&iocom->mtx);
-               assert((state->txcmd & DMSGF_DELETE) == 0);
-               state->txcmd |= DMSGF_DELETE;
-               if (state->rxcmd & DMSGF_DELETE) {
+                *
+                * NOTE: It is possible for a failure to terminate the
+                *       state after we have written the message but before
+                *       we are able to call cleanuptx, so txcmd might already
+                *       have DMSGF_DELETE set.
+                */
+               if ((state->txcmd & DMSGF_DELETE) == 0 &&
+                   (state->rxcmd & DMSGF_DELETE)) {
+                       state->txcmd |= DMSGF_DELETE;
                        assert(state->flags & DMSG_STATE_RBINSERTED);
                        if (state->txcmd & DMSGF_REPLY) {
                                assert(msg->any.head.cmd & DMSGF_REPLY);
@@ -2509,22 +2659,31 @@ dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
                                          &iocom->statewr_tree, state);
                        }
                        state->flags &= ~DMSG_STATE_RBINSERTED;
-                       pstate = state->parent;
-                       if (state->flags & DMSG_STATE_SUBINSERTED) {
-                               TAILQ_REMOVE(&pstate->subq, state, entry);
-                               state->flags &= ~DMSG_STATE_SUBINSERTED;
-                       }
-                       state->parent = NULL;
-                       dmsg_state_drop(pstate);
+                       dmsg_subq_delete(state);
 
                        if (state->relay) {
                                dmsg_state_drop(state->relay);
                                state->relay = NULL;
                        }
-                       dmsg_state_drop(state); /* usually the last drop */
+                       dmsg_state_drop(state); /* state->rbtree */
+               } else if ((state->txcmd & DMSGF_DELETE) == 0) {
+                       state->txcmd |= DMSGF_DELETE;
                }
-               pthread_mutex_unlock(&iocom->mtx);
        }
+
+       /*
+        * Deferred abort after transmission.
+        */
+       if ((state->flags & (DMSG_STATE_ABORTING | DMSG_STATE_DYING)) &&
+           (state->rxcmd & DMSGF_DELETE) == 0) {
+               printf("kdmsg_state_cleanuptx: state=%p "
+                       "executing deferred abort\n",
+                       state);
+               state->flags &= ~DMSG_STATE_ABORTING;
+               dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK);
+       }
+
+       dmsg_state_drop(state);
 }
 
 /*
@@ -2539,6 +2698,7 @@ dmsg_state_hold(dmsg_state_t *state)
 void
 dmsg_state_drop(dmsg_state_t *state)
 {
+       assert(state->refs > 0);
        if (atomic_fetchadd_int(&state->refs, -1) == 1)
                dmsg_state_free(state);
 }
@@ -2551,8 +2711,7 @@ dmsg_state_free(dmsg_state_t *state)
 {
        atomic_add_int(&dmsg_state_count, -1);
        if (DMsgDebugOpt) {
-               fprintf(stderr, "terminate state %p id=%08x\n",
-                       state, (uint32_t)state->msgid);
+               fprintf(stderr, "terminate state %p\n", state);
        }
        assert((state->flags & (DMSG_STATE_ROOT |
                                DMSG_STATE_SUBINSERTED |
index c55009f..52ebf81 100644 (file)
@@ -418,6 +418,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
                RB_INIT(&conn->tree);
                state->iocom->conn = conn;      /* XXX only one */
                state->iocom->conn_msgid = state->msgid;
+               dmsg_state_hold(state);
                conn->state = state;
                state->func = dmsg_lnk_conn;
                state->any.conn = conn;
@@ -493,6 +494,7 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
                dmsg_free(conn);
 
                dmsg_msg_reply(msg, 0);
+               dmsg_state_drop(state);
                /* state invalid after reply */
                break;
        default:
@@ -610,6 +612,7 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                 *       allows such a feature to be added in the future.
                 */
                assert(state->any.link == NULL);
+               dmsg_state_hold(state);
                slink = dmsg_alloc(sizeof(*slink));
                TAILQ_INIT(&slink->relayq);
                slink->node = node;
@@ -645,6 +648,7 @@ dmsg_lnk_span(dmsg_msg_t *msg)
         */
        if (msg->any.head.cmd & DMSGF_DELETE) {
                slink = state->any.link;
+               assert(slink->state == state);
                assert(slink != NULL);
                node = slink->node;
                cls = node->cls;
@@ -683,6 +687,7 @@ dmsg_lnk_span(dmsg_msg_t *msg)
                state->any.link = NULL;
                slink->state = NULL;
                slink->node = NULL;
+               dmsg_state_drop(state);
                dmsg_free(slink);
 
                /*
@@ -1026,6 +1031,7 @@ dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink)
        h2span_relay_t *relay;
        dmsg_msg_t *msg;
 
+       dmsg_state_hold(slink->state);
        relay = dmsg_alloc(sizeof(*relay));
        relay->conn = conn;
        relay->source_rt = slink->state;
@@ -1039,6 +1045,7 @@ dmsg_generate_relay(h2span_conn_t *conn, h2span_link_t *slink)
        msg = dmsg_msg_alloc(&conn->state->iocom->state0,
                             0, DMSG_LNK_SPAN | DMSGF_CREATE,
                             dmsg_lnk_relay, relay);
+       dmsg_state_hold(msg->state);
        relay->target_rt = msg->state;
 
        msg->any.lnk_span = slink->lnk_span;
@@ -1110,6 +1117,7 @@ dmsg_relay_delete(h2span_relay_t *relay)
        if (relay->target_rt) {
                relay->target_rt->any.relay = NULL;
                dmsg_state_reply(relay->target_rt, 0);
+               dmsg_state_drop(relay->target_rt);
                /* state invalid after reply */
                relay->target_rt = NULL;
        }
@@ -1119,7 +1127,10 @@ dmsg_relay_delete(h2span_relay_t *relay)
         *       state, not by this relay structure.
         */
        relay->conn = NULL;
-       relay->source_rt = NULL;
+       if (relay->source_rt) {
+               dmsg_state_drop(relay->source_rt);
+               relay->source_rt = NULL;
+       }
        dmsg_free(relay);
 }
 
index 2466044..2e6b509 100644 (file)
@@ -125,8 +125,10 @@ dmsg_connect(const char *hostname)
        }
        if (connect(fd, (struct sockaddr *)&lsin, sizeof(lsin)) < 0) {
                close(fd);
-               fprintf(stderr, "debug: connect failed: %s\n",
-                       strerror(errno));
+               if (DMsgDebugOpt > 2) {
+                       fprintf(stderr, "debug: Connect failed: %s\n",
+                               strerror(errno));
+               }
                return -1;
        }
        return (fd);
index ac72513..c407629 100644 (file)
@@ -62,20 +62,22 @@ static int kdmsg_state_msgrx(kdmsg_msg_t *msg);
 static int kdmsg_state_msgtx(kdmsg_msg_t *msg);
 static void kdmsg_state_cleanuprx(kdmsg_msg_t *msg);
 static void kdmsg_state_cleanuptx(kdmsg_msg_t *msg);
+static void kdmsg_subq_delete(kdmsg_state_t *state);
 static void kdmsg_simulate_failure(kdmsg_state_t *state, int meto, int error);
 static void kdmsg_state_abort(kdmsg_state_t *state);
+static void kdmsg_state_dying(kdmsg_state_t *state);
 static void kdmsg_state_free(kdmsg_state_t *state);
 
 #ifdef KDMSG_DEBUG
 #define KDMSG_DEBUG_ARGS       , const char *file, int line
-#define kdmsg_state_ref(state) _kdmsg_state_ref(state, __FILE__, __LINE__)
+#define kdmsg_state_hold(state)        _kdmsg_state_hold(state, __FILE__, __LINE__)
 #define kdmsg_state_drop(state)        _kdmsg_state_drop(state, __FILE__, __LINE__)
 #else
 #define KDMSG_DEBUG_ARGS
-#define kdmsg_state_ref(state) _kdmsg_state_ref(state)
+#define kdmsg_state_hold(state)        _kdmsg_state_hold(state)
 #define kdmsg_state_drop(state)        _kdmsg_state_drop(state)
 #endif
-static void _kdmsg_state_ref(kdmsg_state_t *state KDMSG_DEBUG_ARGS);
+static void _kdmsg_state_hold(kdmsg_state_t *state KDMSG_DEBUG_ARGS);
 static void _kdmsg_state_drop(kdmsg_state_t *state KDMSG_DEBUG_ARGS);
 
 static void kdmsg_iocom_thread_rd(void *arg);
@@ -170,7 +172,7 @@ kdmsg_iocom_autoinitiate(kdmsg_iocom_t *iocom,
        iocom->auto_lnk_conn.head = msg->any.head;
        msg->any.lnk_conn = iocom->auto_lnk_conn;
        iocom->conn_state = msg->state;
-       kdmsg_state_ref(msg->state);    /* iocom->conn_state */
+       kdmsg_state_hold(msg->state);   /* iocom->conn_state */
        kdmsg_msg_write(msg);
 }
 
@@ -506,7 +508,7 @@ kdmsg_iocom_thread_wr(void *arg)
               RB_ROOT(&iocom->staterd_tree) ||
               RB_ROOT(&iocom->statewr_tree)) {
                /*
-                *
+                * Simulate failure for all sub-states of state0.
                 */
                kdmsg_drain_msgq(iocom);
                kprintf("simulate failure for all substates of state0\n");
@@ -603,23 +605,6 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg)
        kdmsg_iocom_t *iocom = msg->state->iocom;
        int error;
 
-#if 0
-       /*
-        * If sub-states exist and we are deleting (typically due to a
-        * disconnect), we may not receive deletes for any of the substates
-        * and must simulate associated failures.
-        */
-       if (msg->state &&
-           (msg->any.head.cmd & DMSGF_DELETE) &&
-           TAILQ_FIRST(&msg->state->subq)) {
-               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-               kprintf("simulate failure for substates of cmd %08x/%08x\n",
-                       msg->state->rxcmd, msg->state->txcmd);
-               kdmsg_simulate_failure(msg->state, 0, DMSG_ERR_LOSTLINK);
-               lockmgr(&iocom->msglk, LK_RELEASE);
-       }
-#endif
-
        /*
         * State machine tracking, state assignment for msg,
         * returns error and discard status.  Errors are fatal
@@ -627,10 +612,16 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg)
         * a discard without execution.
         */
        error = kdmsg_state_msgrx(msg);
+       if (msg->state->flags & KDMSG_STATE_ABORTING) {
+               kprintf("kdmsg_state_abort(b): state %p rxcmd=%08x txcmd=%08x msgrx error %d\n",
+                       msg->state, msg->state->rxcmd, msg->state->txcmd, error);
+       }
        if (error) {
                /*
                 * Raw protocol or connection error
                 */
+               if (msg->state->flags & KDMSG_STATE_ABORTING)
+                       kprintf("X1 state %p error %d\n", msg->state, error);
                kdmsg_msg_free(msg);
                if (error == EALREADY)
                        error = 0;
@@ -639,12 +630,18 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg)
                 * Message related to state which already has a
                 * handling function installed for it.
                 */
+               if (msg->state->flags & KDMSG_STATE_ABORTING)
+                       kprintf("X2 state %p func %p\n", msg->state, msg->state->func);
                error = msg->state->func(msg->state, msg);
                kdmsg_state_cleanuprx(msg);
        } else if (iocom->flags & KDMSG_IOCOMF_AUTOANY) {
+               if (msg->state->flags & KDMSG_STATE_ABORTING)
+                       kprintf("X3 state %p\n", msg->state);
                error = kdmsg_autorxmsg(msg);
                kdmsg_state_cleanuprx(msg);
        } else {
+               if (msg->state->flags & KDMSG_STATE_ABORTING)
+                       kprintf("X4 state %p\n", msg->state);
                error = iocom->rcvmsg(msg);
                kdmsg_state_cleanuprx(msg);
        }
@@ -652,35 +649,21 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg)
 }
 
 /*
- * Process state tracking for a message after reception, prior to
- * execution.
+ * Process state tracking for a message after reception and dequeueing,
+ * prior to execution of the state callback.  The state is updated and
+ * will be removed from the RBTREE if completely closed, but the state->parent
+ * and subq linkage is not cleaned up until after the callback (see
+ * cleanuprx()).
  *
- * Called with msglk held and the msg dequeued.
+ * msglk is not held.
  *
- * All messages are called with dummy state and return actual state.
- * (One-off messages often just return the same dummy state).
+ * NOTE: A message transaction can consist of several messages in either
+ *      direction.
  *
- * May request that caller discard the message by setting *discardp to 1.
- * The returned state is not used in this case and is allowed to be NULL.
- *
- * --
- *
- * These routines handle persistent and command/reply message state via the
- * CREATE and DELETE flags.  The first message in a command or reply sequence
- * sets CREATE, the last message in a command or reply sequence sets DELETE.
- *
- * There can be any number of intermediate messages belonging to the same
- * sequence sent inbetween the CREATE message and the DELETE message,
- * which set neither flag.  This represents a streaming command or reply.
- *
- * Any command message received with CREATE set expects a reply sequence to
- * be returned.  Reply sequences work the same as command sequences except the
- * REPLY bit is also sent.  Both the command side and reply side can
- * degenerate into a single message with both CREATE and DELETE set.  Note
- * that one side can be streaming and the other side not, or neither, or both.
- *
- * The msgid is unique for the initiator.  That is, two sides sending a new
- * message can use the same msgid without colliding.
+ * NOTE: The msgid is unique to the initiator, not necessarily unique for
+ *      us or for any relay or for the return direction for that matter.
+ *      That is, two sides sending a new message can use the same msgid
+ *      without colliding.
  *
  * --
  *
@@ -695,7 +678,7 @@ kdmsg_msg_receive_handling(kdmsg_msg_t *msg)
  * also race, and in this situation the other side might have already
  * initiated a new unrelated command with the same message id.  Since
  * the abort has not set the CREATE flag the situation can be detected
- * and the message will also be discarded.
+ * and the message will also be discarded.
  *
  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
  * The ABORT request is essentially integrated into the command instead
@@ -775,7 +758,7 @@ again:
                        lksleep(state, &iocom->msglk, 0, "dmrace", hz);
                        goto again;
                }
-               kdmsg_state_ref(state);
+               kdmsg_state_hold(state);
                kdmsg_state_drop(msg->state);   /* iocom->state0 */
                msg->state = state;
        } else {
@@ -850,13 +833,15 @@ again:
                state->flags |= KDMSG_STATE_RBINSERTED |
                                KDMSG_STATE_SUBINSERTED |
                                KDMSG_STATE_OPPOSITE;
-               kdmsg_state_ref(pstate);        /* states on pstate->subq */
-               kdmsg_state_ref(state);         /* state on pstate->subq */
-               kdmsg_state_ref(state);         /* state on rbtree */
+               if (TAILQ_EMPTY(&pstate->subq))
+                       kdmsg_state_hold(pstate);/* states on pstate->subq */
+               kdmsg_state_hold(state);        /* state on pstate->subq */
+               kdmsg_state_hold(state);        /* state on rbtree */
                state->icmd = msg->any.head.cmd & DMSGF_BASECMDMASK;
                state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                state->txcmd = DMSGF_REPLY;
                state->msgid = msg->any.head.msgid;
+               state->flags &= ~KDMSG_STATE_NEW;
                RB_INSERT(kdmsg_state_tree, &iocom->staterd_tree, state);
                TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
                error = 0;
@@ -868,6 +853,8 @@ again:
                 */
                if (state == &iocom->state0) {
                        if (msg->any.head.cmd & DMSGF_ABORT) {
+                               kprintf("kdmsg_state_msgrx: "
+                                       "state already A\n");
                                error = EALREADY;
                        } else {
                                kprintf("kdmsg_state_msgrx: "
@@ -883,6 +870,8 @@ again:
                 */
                if ((state->rxcmd & DMSGF_CREATE) == 0) {
                        if (msg->any.head.cmd & DMSGF_ABORT) {
+                               kprintf("kdmsg_state_msgrx: "
+                                       "state already B\n");
                                error = EALREADY;
                        } else {
                                kprintf("kdmsg_state_msgrx: "
@@ -983,8 +972,6 @@ again:
         * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
         */
 done:
-       lockmgr(&iocom->msglk, LK_RELEASE);
-
        if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
                if (state != &iocom->state0) {
                        msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
@@ -997,6 +984,40 @@ done:
        } else {
                msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
        }
+
+       /*
+        * Adjust the state for DELETE handling now, before making the
+        * callback so we are atomic with other state updates.
+        *
+        * Subq/parent linkages are cleaned up after the callback.
+        * If an error occurred the message is ignored and state is not
+        * updated.
+        */
+       if ((state = msg->state) == NULL || error != 0) {
+               kprintf("kdmsg_state_msgrx: state=%p error %d\n", state, error);
+               ;
+       } else if (msg->any.head.cmd & DMSGF_DELETE) {
+               KKASSERT((state->rxcmd & DMSGF_DELETE) == 0);
+               state->rxcmd |= DMSGF_DELETE;
+               if (state->txcmd & DMSGF_DELETE) {
+                       KKASSERT(state->flags & KDMSG_STATE_RBINSERTED);
+                       if (state->rxcmd & DMSGF_REPLY) {
+                               KKASSERT(msg->any.head.cmd &
+                                        DMSGF_REPLY);
+                               RB_REMOVE(kdmsg_state_tree,
+                                         &iocom->statewr_tree, state);
+                       } else {
+                               KKASSERT((msg->any.head.cmd &
+                                         DMSGF_REPLY) == 0);
+                               RB_REMOVE(kdmsg_state_tree,
+                                         &iocom->staterd_tree, state);
+                       }
+                       state->flags &= ~KDMSG_STATE_RBINSERTED;
+                       kdmsg_state_drop(state);        /* state on rbtree */
+               }
+       }
+       lockmgr(&iocom->msglk, LK_RELEASE);
+
        return (error);
 }
 
@@ -1117,55 +1138,80 @@ kdmsg_autorxmsg(kdmsg_msg_t *msg)
 /*
  * Post-receive-handling message and state cleanup.  This routine is called
  * after the state function handling/callback to properly dispose of the
- * message and update or dispose of the state.
+ * message and unlink the state's parent/subq linkage if the state is
+ * completely closed.
+ *
+ * msglk is not held on entry; this routine acquires it.
  */
 static
 void
 kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
 {
-       kdmsg_iocom_t *iocom = msg->state->iocom;
-       kdmsg_state_t *state;
+       kdmsg_state_t *state = msg->state;
+       kdmsg_iocom_t *iocom = state->iocom;
+
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+       if (state != &iocom->state0) {
+               /*
+                * When terminating a transaction (in either direction), all
+                * sub-states are aborted.
+                */
+               if ((msg->any.head.cmd & DMSGF_DELETE) &&
+                   TAILQ_FIRST(&msg->state->subq)) {
+                       kprintf("simulate failure for substates of state %p "
+                               "cmd %08x/%08x\n",
+                               msg->state,
+                               msg->state->rxcmd,
+                               msg->state->txcmd);
+                       kdmsg_simulate_failure(msg->state,
+                                              0, DMSG_ERR_LOSTLINK);
+               }
+
+               /*
+                * Once the state is fully closed we can (try to) remove it
+                * from the subq topology.
+                */
+               if ((state->flags & KDMSG_STATE_SUBINSERTED) &&
+                   (state->rxcmd & DMSGF_DELETE) &&
+                   (state->txcmd & DMSGF_DELETE)) {
+                       /* 
+                        * Remove parent linkage if state is completely closed.
+                        */
+                       kdmsg_subq_delete(state);
+               }
+       }
+       kdmsg_msg_free(msg);
+
+       lockmgr(&iocom->msglk, LK_RELEASE);
+}
+
+/*
+ * Remove state from its parent's subq.  This can wind up recursively
+ * dropping the parent upward.
+ *
+ * NOTE: Once we drop the parent, our pstate pointer may become invalid.
+ */
+static
+void
+kdmsg_subq_delete(kdmsg_state_t *state)
+{
        kdmsg_state_t *pstate;
 
-       if ((state = msg->state) == NULL) {
-               kdmsg_msg_free(msg);
-       } else if (msg->any.head.cmd & DMSGF_DELETE) {
-               lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-               KKASSERT((state->rxcmd & DMSGF_DELETE) == 0);
-               state->rxcmd |= DMSGF_DELETE;
-               if (state->txcmd & DMSGF_DELETE) {
-                       KKASSERT(state->flags & KDMSG_STATE_RBINSERTED);
-                       if (state->rxcmd & DMSGF_REPLY) {
-                               KKASSERT(msg->any.head.cmd &
-                                        DMSGF_REPLY);
-                               RB_REMOVE(kdmsg_state_tree,
-                                         &iocom->statewr_tree, state);
-                       } else {
-                               KKASSERT((msg->any.head.cmd &
-                                         DMSGF_REPLY) == 0);
-                               RB_REMOVE(kdmsg_state_tree,
-                                         &iocom->staterd_tree, state);
-                       }
-                       state->flags &= ~KDMSG_STATE_RBINSERTED;
-                       pstate = state->parent;
-                       if (state->flags & KDMSG_STATE_SUBINSERTED) {
-                               TAILQ_REMOVE(&pstate->subq, state, entry);
-                               state->flags &= ~KDMSG_STATE_SUBINSERTED;
-                               kdmsg_state_drop(pstate);  /* pstate->subq */
-                               kdmsg_state_drop(state);   /* pstate->subq */
-                               state->parent = NULL;
-                       } else {
-                               KKASSERT(state->parent == NULL);
-                       }
-                       kdmsg_msg_free(msg);
-                       kdmsg_state_drop(state);        /* state on rbtree */
-                       lockmgr(&iocom->msglk, LK_RELEASE);
-               } else {
-                       kdmsg_msg_free(msg);
-                       lockmgr(&iocom->msglk, LK_RELEASE);
+       if (state->flags & KDMSG_STATE_SUBINSERTED) {
+               pstate = state->parent;
+               KKASSERT(pstate);
+               if (pstate->scan == state)
+                       pstate->scan = NULL;
+               TAILQ_REMOVE(&pstate->subq, state, entry);
+               state->flags &= ~KDMSG_STATE_SUBINSERTED;
+               state->parent = NULL;
+               if (TAILQ_EMPTY(&pstate->subq)) {
+                       kdmsg_state_drop(pstate);/* pstate->subq */
                }
+               pstate = NULL;                   /* safety */
+               kdmsg_state_drop(state);         /* pstate->subq */
        } else {
-               kdmsg_msg_free(msg);
+               KKASSERT(state->parent == NULL);
        }
 }
 
@@ -1187,12 +1233,31 @@ kdmsg_simulate_failure(kdmsg_state_t *state, int meto, int error)
 {
        kdmsg_state_t *substate;
 
-       kdmsg_state_ref(state);                 /* aborting */
-       while ((substate = TAILQ_FIRST(&state->subq)) != NULL) {
-               kdmsg_simulate_failure(substate, 1, error);
-       }
+       kdmsg_state_hold(state);                /* aborting */
+
+       /*
+        * Abort parent state first. Parent will not actually disappear
+        * until children are gone.  Device drivers must handle the situation.
+        * The advantage of this is that device drivers can flag the situation
+        * as an interlock against new operations on dying states.  And since
+        * device operations are often asynchronous anyway, this sequence of
+        * events works out better.
+        */
        if (meto)
                kdmsg_state_abort(state);
+
+       /*
+        * Recurse through any children.
+        */
+again:
+       TAILQ_FOREACH(substate, &state->subq, entry) {
+               if (substate->flags & KDMSG_STATE_ABORTING)
+                       continue;
+               state->scan = substate;
+               kdmsg_simulate_failure(substate, 1, error);
+               if (state->scan != substate)
+                       goto again;
+       }
        kdmsg_state_drop(state);                /* aborting */
 }
 
@@ -1203,23 +1268,33 @@ kdmsg_state_abort(kdmsg_state_t *state)
        kdmsg_msg_t *msg;
 
        /*
-        * Prevent recursive aborts which could otherwise occur if the
-        * simulated message reception runs state->func which then turns
-        * around and tries to reply to a broken circuit when then calls
-        * the state abort code again.
+        * Set ABORTING and DYING, return if already set.  If the state was
+        * just allocated we defer the abort operation until the related
+        * message is processed.
         */
        KKASSERT((state->flags & KDMSG_STATE_ABORTING) == 0);
        if (state->flags & KDMSG_STATE_ABORTING)
                return;
        state->flags |= KDMSG_STATE_ABORTING;
+       kdmsg_state_dying(state);
+       if (state->flags & KDMSG_STATE_NEW) {
+               kprintf("kdmsg_state_abort(0): state %p rxcmd %08x txcmd %08x flags %08x - in NEW state\n",
+                       state, state->rxcmd, state->txcmd, state->flags);
+               return;
+       }
 
        /*
+        * NOTE: The DELETE flag might already be set due to an early
+        *       termination.
+        *
         * NOTE: Args to kdmsg_msg_alloc() to avoid dynamic state allocation.
         *
         * NOTE: We are simulating a received message using our state
         *       (vs a message generated by the other side using its state),
         *       so we must invert DMSGF_REVTRANS and DMSGF_REVCIRC.
         */
+       kprintf("kdmsg_state_abort(1): state %p rxcmd %08x txcmd %08x\n",
+               state, state->rxcmd, state->txcmd);
        if ((state->rxcmd & DMSGF_DELETE) == 0) {
                msg = kdmsg_msg_alloc(state, DMSG_LNK_ERROR, NULL, NULL);
                if ((state->rxcmd & DMSGF_CREATE) == 0)
@@ -1228,24 +1303,34 @@ kdmsg_state_abort(kdmsg_state_t *state)
                                     (state->rxcmd & DMSGF_REPLY);
                msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC);
                msg->any.head.error = DMSG_ERR_LOSTLINK;
+               kprintf("kdmsg_state_abort(a): state %p msgcmd %08x\n",
+                       state, msg->any.head.cmd);
+               /* circuit not initialized */
                lockmgr(&state->iocom->msglk, LK_RELEASE);
                kdmsg_msg_receive_handling(msg);
                lockmgr(&state->iocom->msglk, LK_EXCLUSIVE);
                msg = NULL;
        }
+       kprintf("kdmsg_state_abort(2): state %p rxcmd %08x txcmd %08x\n",
+               state, state->rxcmd, state->txcmd);
+}
 
-       /*
-        * If the state still has a parent association we must remove it
-        * now even if it is not fully closed or the simulation loop will
-        * livelock.
-        */
-       if (state->flags & KDMSG_STATE_SUBINSERTED) {
-               KKASSERT(state->flags & KDMSG_STATE_RBINSERTED);
-               TAILQ_REMOVE(&state->parent->subq, state, entry);
-               state->flags &= ~KDMSG_STATE_SUBINSERTED;
-               kdmsg_state_drop(state->parent);  /* pstate->subq */
-               kdmsg_state_drop(state);          /* pstate->subq */
-               state->parent = NULL;
+/*
+ * Recursively sets KDMSG_STATE_DYING on state and all sub-states, preventing
+ * the transmission of any new messages on these states.  This is done
+ * atomically when the parent state is terminating, whereas setting ABORTING
+ * is not atomic and can leave race windows open.
+ */
+static
+void
+kdmsg_state_dying(kdmsg_state_t *state)
+{
+       kdmsg_state_t *scan;
+
+       if ((state->flags & KDMSG_STATE_DYING) == 0) {
+               state->flags |= KDMSG_STATE_DYING;
+               TAILQ_FOREACH(scan, &state->subq, entry)
+                       kdmsg_state_dying(scan);
        }
 }
 
@@ -1322,6 +1407,7 @@ kdmsg_state_msgtx(kdmsg_msg_t *msg)
                state->icmd = msg->any.head.cmd & DMSGF_BASECMDMASK;
                state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
                state->rxcmd = DMSGF_REPLY;
+               state->flags &= ~KDMSG_STATE_NEW;
                error = 0;
                break;
        case DMSGF_DELETE:
@@ -1462,7 +1548,6 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
 {
        kdmsg_iocom_t *iocom = msg->state->iocom;
        kdmsg_state_t *state;
-       kdmsg_state_t *pstate;
 
        if ((state = msg->state) == NULL) {
                kdmsg_msg_free(msg);
@@ -1480,6 +1565,7 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
                wakeup(state);
        }
        state->flags &= ~(KDMSG_STATE_INTERLOCK | KDMSG_STATE_SIGNAL);
+       kdmsg_state_hold(state);
 
        if (msg->any.head.cmd & DMSGF_DELETE) {
                KKASSERT((state->txcmd & DMSGF_DELETE) == 0);
@@ -1498,16 +1584,25 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
                                          &iocom->statewr_tree, state);
                        }
                        state->flags &= ~KDMSG_STATE_RBINSERTED;
-                       pstate = state->parent;
-                       if (state->flags & KDMSG_STATE_SUBINSERTED) {
-                               TAILQ_REMOVE(&pstate->subq, state, entry);
-                               state->flags &= ~KDMSG_STATE_SUBINSERTED;
-                               kdmsg_state_drop(pstate);  /* pstate->subq */
-                               kdmsg_state_drop(state);   /* pstate->subq */
-                               state->parent = NULL;
-                       } else {
-                               KKASSERT(state->parent == NULL);
-                       }
+
+                       /*
+                        * The subq recursion is used for parent linking and
+                        * scanning the topology for aborts, so we can only
+                        * remove leaves.  The circuit is effectively dead now,
+                        * but topology won't be torn down until all of its
+                        * children have finished/aborted.
+                        *
+                        * This is particularly important for end-point
+                        * devices which might need to access private data
+                        * in parent states.  Out of order disconnects can
+                        * occur if an end-point device is processing a
+                        * message transaction asynchronously because abort
+                        * requests are basically synchronous and it probably
+                        * isn't convenient (or possible) for the end-point
+                        * to abort an asynchronous operation.
+                        */
+                       if (TAILQ_EMPTY(&state->subq))
+                               kdmsg_subq_delete(state);
                        kdmsg_msg_free(msg);
                        kdmsg_state_drop(state);   /* state on rbtree */
                } else {
@@ -1516,11 +1611,24 @@ kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
        } else {
                kdmsg_msg_free(msg);
        }
+
+       /*
+        * Deferred abort after transmission.
+        */
+       if ((state->flags & (KDMSG_STATE_ABORTING | KDMSG_STATE_DYING)) &&
+           (state->rxcmd & DMSGF_DELETE) == 0) {
+               kprintf("kdmsg_state_cleanuptx: state=%p "
+                       "executing deferred abort\n",
+                       state);
+               state->flags &= ~KDMSG_STATE_ABORTING;
+               kdmsg_state_abort(state);
+       }
+       kdmsg_state_drop(state);
 }
 
 static
 void
-_kdmsg_state_ref(kdmsg_state_t *state KDMSG_DEBUG_ARGS)
+_kdmsg_state_hold(kdmsg_state_t *state KDMSG_DEBUG_ARGS)
 {
        atomic_add_int(&state->refs, 1);
 #if KDMSG_DEBUG
@@ -1573,13 +1681,23 @@ kdmsg_msg_alloc(kdmsg_state_t *state, uint32_t cmd,
                /*
                 * New transaction, requires tracking state and a unique
                 * msgid to be allocated.
+                *
+                * It is possible to race a circuit failure, so inherit the
+                * parent's STATE_DYING flag to trigger an abort sequence
+                * in the transmit path.  By not inheriting ABORTING the
+                * abort sequence can recurse.
+                *
+                * NOTE: The transaction has not yet been initiated so we
+                *       cannot set DMSGF_CREATE/DELETE bits in txcmd or rxcmd.
+                *       We have to properly setup DMSGF_REPLY, however.
                 */
                pstate = state;
                state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
                TAILQ_INIT(&state->subq);
                state->iocom = iocom;
                state->parent = pstate;
-               state->flags = KDMSG_STATE_DYNAMIC;
+               state->flags = KDMSG_STATE_DYNAMIC |
+                              KDMSG_STATE_NEW;
                state->func = func;
                state->any.any = data;
                state->msgid = (uint64_t)(uintptr_t)state;
@@ -1588,16 +1706,20 @@ kdmsg_msg_alloc(kdmsg_state_t *state, uint32_t cmd,
                lockmgr(&iocom->msglk, LK_EXCLUSIVE);
                if (RB_INSERT(kdmsg_state_tree, &iocom->statewr_tree, state))
                        panic("duplicate msgid allocated");
+               if (TAILQ_EMPTY(&pstate->subq))
+                       kdmsg_state_hold(pstate);/* pstate->subq */
                TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
                state->flags |= KDMSG_STATE_RBINSERTED |
                                KDMSG_STATE_SUBINSERTED;
-               kdmsg_state_ref(pstate);        /* pstate->subq */
-               kdmsg_state_ref(state);         /* pstate->subq */
-               kdmsg_state_ref(state);         /* state on rbtree */
+               state->flags |= pstate->flags & KDMSG_STATE_DYING;
+               kdmsg_state_hold(state);        /* pstate->subq */
+               kdmsg_state_hold(state);        /* state on rbtree */
+               kdmsg_state_hold(state);        /* msg->state */
                lockmgr(&iocom->msglk, LK_RELEASE);
        } else {
                pstate = state->parent;
                KKASSERT(pstate != NULL);
+               kdmsg_state_hold(state);        /* msg->state */
        }
 
        if (state->flags & KDMSG_STATE_OPPOSITE)
@@ -1610,7 +1732,6 @@ kdmsg_msg_alloc(kdmsg_state_t *state, uint32_t cmd,
        msg->any.head.msgid = state->msgid;
        msg->any.head.circuit = pstate->msgid;
        msg->state = state;
-       kdmsg_state_ref(state);                 /* msg->state */
 
        return (msg);
 }
@@ -1698,6 +1819,8 @@ kdmsg_msg_write(kdmsg_msg_t *msg)
        kdmsg_iocom_t *iocom = msg->state->iocom;
        kdmsg_state_t *state;
 
+       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
+
        if (msg->state) {
                /*
                 * Continuance or termination of existing transaction.
@@ -1718,9 +1841,11 @@ kdmsg_msg_write(kdmsg_msg_t *msg)
                msg->any.head.msgid = 0;
        }
 
-       lockmgr(&iocom->msglk, LK_EXCLUSIVE);
-
+#if 0
        /*
+        * XXX removed - don't make this a panic, allow the state checks
+        *     below to catch the situation.
+        *
         * This flag is not set until after the tx thread has drained
         * the tx msgq and simulated responses.  After that point the
         * txthread is dead and can no longer simulate responses.
@@ -1734,47 +1859,43 @@ kdmsg_msg_write(kdmsg_msg_t *msg)
                panic("kdmsg_msg_write: Attempt to write message to "
                      "terminated iocom\n");
        }
+#endif
 
        /*
-        * For stateful messages, if the circuit is dead we have to abort
-        * the state and discard the message.
+        * For stateful messages, if the circuit is dead or dying we have
+        * to abort the potentially newly-created state and discard the
+        * message.
         *
         * - We must discard the message because the other end will not
-        *   be expecting any more messages over the dead circuit and might
-        *   not be able to receive them.
+        *   be expecting any more messages over the dead or dying circuit
+        *   and might not be able to receive them.
         *
         * - We abort the state by simulating a failure to generate a fake
         *   incoming DELETE.  This will trigger the state callback and allow
         *   the device to clean things up and reply, closing the outgoing
-        *   direction and terminating the state.
+        *   direction and allowing the state to be freed.
         *
-        * - Because there are numerous races, it is possible that an abort
-        *   has already been initiated on this state.
-        *
-        * - For now, don't bother checking to see if this is a CREATE
-        *   message, though we could probably add that as a restriction.
-        *   Any pre-existing state will probably have already had an abort
-        *   initiated on it.
-        *
-        * This race occurs quite often, particularly as SPANs stabilize.
+        * This situation occurs quite often, particularly as SPANs stabilize.
         * End-points must do the right thing.
         */
        if (state) {
                KKASSERT((state->txcmd & DMSGF_DELETE) == 0);
-               if ((state->parent->txcmd & DMSGF_DELETE) ||
-                   (state->parent->flags & KDMSG_STATE_ABORTING)) {
+               if (state->flags & KDMSG_STATE_DYING) {
+#if 0
+               if ((state->flags & KDMSG_STATE_DYING) ||
+                   (state->parent->txcmd & DMSGF_DELETE) ||
+                   (state->parent->flags & KDMSG_STATE_DYING)) {
+#endif
                        kprintf("kdmsg_msg_write: Write to dying circuit "
+                               "state=%p "
                                "ptxcmd=%08x prxcmd=%08x flags=%08x\n",
+                               state,
                                state->parent->rxcmd,
                                state->parent->txcmd,
                                state->parent->flags);
-                       kdmsg_state_ref(state);
+                       kdmsg_state_hold(state);
                        kdmsg_state_msgtx(msg);
                        kdmsg_state_cleanuptx(msg);
-                       if ((state->flags & KDMSG_STATE_ABORTING) == 0) {
-                               kdmsg_simulate_failure(state, 1,
-                                                      DMSG_ERR_LOSTLINK);
-                       }
                        kdmsg_state_drop(state);
                        lockmgr(&iocom->msglk, LK_RELEASE);
                        return;
index 75e53f0..a0858c1 100644 (file)
@@ -717,11 +717,22 @@ struct kdmsg_data;
  * Transactional state structure, representing an open transaction.  The
  * transaction might represent a cache state (and thus have a chain
  * association), or a VOP op, LNK_SPAN, or other things.
+ *
+ * NOTE: A non-empty subq represents one ref.
+ *      If we are inserted on a parent's subq, that's one ref (SUBINSERTED).
+ *      If we are inserted on a RB tree, that's one ref (RBINSERTED).
+ *      msg->state represents a ref.
+ *      Other code references may hold refs.
+ *
+ * NOTE: The parent association stays intact as long as a state has a
+ *      non-empty subq.  Otherwise simulated failures might not be able
+ *      to reach the children.
  */
 TAILQ_HEAD(kdmsg_state_list, kdmsg_state);
 
 struct kdmsg_state {
        RB_ENTRY(kdmsg_state) rbnode;           /* indexed by msgid */
+       struct kdmsg_state      *scan;          /* scan check */
        struct kdmsg_state_list subq;           /* active stacked states */
        TAILQ_ENTRY(kdmsg_state) entry;         /* on parent subq */
        TAILQ_ENTRY(kdmsg_state) user_entry;    /* available to devices */
@@ -745,13 +756,14 @@ struct kdmsg_state {
 
 #define KDMSG_STATE_SUBINSERTED        0x0001
 #define KDMSG_STATE_DYNAMIC    0x0002
-#define KDMSG_STATE_DELPEND    0x0004          /* transmit delete pending */
+#define KDMSG_STATE_UNUSED0004 0x0004
 #define KDMSG_STATE_ABORTING   0x0008          /* avoids recursive abort */
 #define KDMSG_STATE_OPPOSITE   0x0010          /* opposite direction */
-#define KDMSG_STATE_DYING      0x0020          /* indicates circuit failure */
+#define KDMSG_STATE_DYING      0x0020          /* atomic recursive circ fail */
 #define KDMSG_STATE_INTERLOCK  0x0040
 #define KDMSG_STATE_RBINSERTED 0x0080
 #define KDMSG_STATE_SIGNAL     0x0400
+#define KDMSG_STATE_NEW                0x0800          /* defer abort processing */
 
 struct kdmsg_msg {
        TAILQ_ENTRY(kdmsg_msg) qentry;          /* serialized queue */