cluster - more libdmsg work
author Matthew Dillon <dillon@apollo.backplane.com>
Tue, 4 Dec 2012 05:58:16 +0000 (21:58 -0800)
committer Matthew Dillon <dillon@apollo.backplane.com>
Tue, 4 Dec 2012 05:58:16 +0000 (21:58 -0800)
* Fix numerous bugs, including a nasty edge case in the encryption code.

* Do a better job locking things (state structures still need help).

lib/libdmsg/crypto.c
lib/libdmsg/dmsg.h
lib/libdmsg/msg.c
lib/libdmsg/msg_lnk.c

diff --git a/lib/libdmsg/crypto.c b/lib/libdmsg/crypto.c
index de03731..42c4a8b 100644
@@ -365,7 +365,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom)
         */
        if (getpeername(iocom->sock_fd, &sa.sa, &salen) < 0) {
                iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOPEER;
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                if (DMsgDebugOpt)
                        fprintf(stderr, "accept: getpeername() failed\n");
                goto done;
@@ -373,7 +373,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom)
        if (getnameinfo(&sa.sa, salen, peername, sizeof(peername),
                        NULL, 0, NI_NUMERICHOST) < 0) {
                iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOPEER;
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                if (DMsgDebugOpt)
                        fprintf(stderr, "accept: cannot decode sockaddr\n");
                goto done;
@@ -401,7 +401,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom)
                         DMSG_PATH_REMOTE, peername);
                if (stat(path, &st) < 0) {
                        iocom->ioq_rx.error = DMSG_IOQ_ERROR_NORKEY;
-                       iocom->flags |= DMSG_IOCOMF_EOF;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                        if (DMsgDebugOpt)
                                fprintf(stderr, "auth failure: unknown host\n");
                        goto done;
@@ -415,7 +415,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom)
                fclose(fp);
                if (keys[0] == NULL) {
                        iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT;
-                       iocom->flags |= DMSG_IOCOMF_EOF;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                        if (DMsgDebugOpt)
                                fprintf(stderr,
                                        "auth failure: bad key format\n");
@@ -430,14 +430,14 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom)
        asprintf(&path, DMSG_DEFAULT_DIR "/rsa.pub");
        if ((fp = fopen(path, "r")) == NULL) {
                iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOLKEY;
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                goto done;
        }
        keys[1] = PEM_read_RSA_PUBKEY(fp, NULL, NULL, NULL);
        fclose(fp);
        if (keys[1] == NULL) {
                iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT;
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                if (DMsgDebugOpt)
                        fprintf(stderr, "auth failure: bad host key format\n");
                goto done;
@@ -447,7 +447,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom)
        asprintf(&path, DMSG_DEFAULT_DIR "/rsa.prv");
        if ((fp = fopen(path, "r")) == NULL) {
                iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOLKEY;
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                if (DMsgDebugOpt)
                        fprintf(stderr, "auth failure: bad host key format\n");
                goto done;
@@ -456,7 +456,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom)
        fclose(fp);
        if (keys[2] == NULL) {
                iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT;
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                if (DMsgDebugOpt)
                        fprintf(stderr, "auth failure: bad host key format\n");
                goto done;
@@ -473,7 +473,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom)
                    blksize != (size_t)RSA_size(keys[2]) ||
                    sizeof(handtx) % blksize != 0) {
                        iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT;
-                       iocom->flags |= DMSG_IOCOMF_EOF;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                        if (DMsgDebugOpt)
                                fprintf(stderr, "auth failure: "
                                                "key size mismatch\n");
@@ -500,7 +500,7 @@ urandfail:
                if (fd >= 0)
                        close(fd);
                iocom->ioq_rx.error = DMSG_IOQ_ERROR_BADURANDOM;
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                if (DMsgDebugOpt)
                        fprintf(stderr, "auth failure: bad rng\n");
                goto done;
@@ -568,7 +568,7 @@ urandfail:
                }
        }
        if (iocom->ioq_rx.error) {
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                if (DMsgDebugOpt)
                        fprintf(stderr, "auth failure: key exchange failure "
                                        "during encryption\n");
@@ -598,7 +598,7 @@ urandfail:
                }
        }
        if (iocom->ioq_rx.error) {
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                if (DMsgDebugOpt)
                        fprintf(stderr, "auth failure: key exchange failure "
                                        "during decryption\n");
@@ -612,7 +612,7 @@ urandfail:
        if (i != sizeof(handrx)) {
 keyxchgfail:
                iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYXCHGFAIL;
-               iocom->flags |= DMSG_IOCOMF_EOF;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                if (DMsgDebugOpt)
                        fprintf(stderr, "auth failure: key exchange failure\n");
                goto done;
@@ -652,7 +652,7 @@ keyxchgfail:
        if (error)
                goto keyxchgfail;
 
-       iocom->flags |= DMSG_IOCOMF_CRYPTED;
+       atomic_set_int(&iocom->flags, DMSG_IOCOMF_CRYPTED);
 
        if (DMsgDebugOpt)
                fprintf(stderr, "auth success: %s\n", handrx.quickmsg);
diff --git a/lib/libdmsg/dmsg.h b/lib/libdmsg/dmsg.h
index 49fbca2..8b9dc9b 100644
@@ -197,7 +197,7 @@ struct dmsg_msg {
        size_t          hdr_size;
        size_t          aux_size;
        char            *aux_data;
-       dmsg_any_t      any;
+       dmsg_any_t      any;                    /* must be last element */
 };
 
 typedef struct dmsg_circuit dmsg_circuit_t;
@@ -408,7 +408,9 @@ void dmsg_iocom_flush2(dmsg_iocom_t *iocom);
 
 void dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
 void dmsg_state_free(dmsg_state_t *state);
+void dmsg_circuit_hold(dmsg_circuit_t *circuit);
 void dmsg_circuit_drop(dmsg_circuit_t *circuit);
+void dmsg_circuit_drop_locked(dmsg_circuit_t *circuit);
 
 int dmsg_circuit_relay(dmsg_msg_t *msg);
 
@@ -419,6 +421,8 @@ void dmsg_msg_lnk_signal(dmsg_iocom_t *iocom);
 void dmsg_msg_lnk(dmsg_msg_t *msg);
 void dmsg_msg_dbg(dmsg_msg_t *msg);
 void dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused);
+int dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep);
+
 
 /*
  * Crypto functions
diff --git a/lib/libdmsg/msg.c b/lib/libdmsg/msg.c
index 4cb2aaf..b579ac8 100644
@@ -39,6 +39,7 @@ int DMsgDebugOpt;
 
 static int dmsg_state_msgrx(dmsg_msg_t *msg);
 static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
+static void dmsg_msg_free_locked(dmsg_msg_t *msg);
 
 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
 RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
@@ -194,20 +195,24 @@ dmsg_iocom_restate(dmsg_iocom_t *iocom,
                   void (*rcvmsg_func)(dmsg_msg_t *msg),
                   void (*altmsg_func)(dmsg_iocom_t *))
 {
+       pthread_mutex_lock(&iocom->mtx);
        iocom->signal_callback = signal_func;
        iocom->rcvmsg_callback = rcvmsg_func;
        iocom->altmsg_callback = altmsg_func;
        if (signal_func)
-               iocom->flags |= DMSG_IOCOMF_SWORK;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
        else
-               iocom->flags &= ~DMSG_IOCOMF_SWORK;
+               atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
+       pthread_mutex_unlock(&iocom->mtx);
 }
 
 void
 dmsg_iocom_signal(dmsg_iocom_t *iocom)
 {
+       pthread_mutex_lock(&iocom->mtx);
        if (iocom->signal_callback)
-               iocom->flags |= DMSG_IOCOMF_SWORK;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
+       pthread_mutex_unlock(&iocom->mtx);
 }
 
 /*
@@ -231,11 +236,11 @@ dmsg_iocom_done(dmsg_iocom_t *iocom)
        }
        dmsg_ioq_done(iocom, &iocom->ioq_rx);
        dmsg_ioq_done(iocom, &iocom->ioq_tx);
-       if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
+       while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
                TAILQ_REMOVE(&iocom->freeq, msg, qentry);
                free(msg);
        }
-       if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
+       while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
                TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
                free(msg->aux_data);
                msg->aux_data = NULL;
@@ -253,17 +258,17 @@ dmsg_iocom_done(dmsg_iocom_t *iocom)
 }
 
 /*
- * Initialize a circuit structure and add it to the iocom's circuit_tree.
- * circuit0 is left out and will not be added to the tree.
+ * Basic initialization of a circuit structure.
+ *
+ * The circuit structure is initialized with one ref.
  */
 void
 dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
 {
+       circuit->refs = 1;
        circuit->iocom = iocom;
        RB_INIT(&circuit->staterd_tree);
        RB_INIT(&circuit->statewr_tree);
-       if (circuit->msgid)
-               RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
 }
 
 /*
@@ -281,6 +286,7 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit,
        size_t aligned_size;
 
        pthread_mutex_lock(&iocom->mtx);
+#if 0
        if (aux_size) {
                aligned_size = DMSG_DOALIGN(aux_size);
                if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
@@ -290,6 +296,9 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit,
                if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
                        TAILQ_REMOVE(&iocom->freeq, msg, qentry);
        }
+#endif
+       aligned_size = DMSG_DOALIGN(aux_size);
+       msg = NULL;
        if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
                /*
                 * Create state when CREATE is set without REPLY.
@@ -320,12 +329,19 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit,
                pthread_mutex_unlock(&iocom->mtx);
                state->flags |= DMSG_STATE_INSERTED;
        }
+       /* XXX SMP race for state */
        pthread_mutex_unlock(&iocom->mtx);
+       hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
        if (msg == NULL) {
+               msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
+               bzero(msg, offsetof(struct dmsg_msg, any.head));
+               *(int *)((char *)msg +
+                        offsetof(struct dmsg_msg, any.head) + hbytes) =
+                                0x71B2C3D4;
+#if 0
                msg = malloc(sizeof(*msg));
                bzero(msg, sizeof(*msg));
-               msg->aux_data = NULL;
-               msg->aux_size = 0;
+#endif
        }
 
        /*
@@ -348,7 +364,6 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit,
                        }
                }
        }
-       hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
        if (hbytes)
                bzero(&msg->any.head, hbytes);
        msg->hdr_size = hbytes;
@@ -359,6 +374,7 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit,
        msg->any.head.circuit = 0;
        msg->circuit = circuit;
        msg->iocom = iocom;
+       dmsg_circuit_hold(circuit);
        if (state) {
                msg->state = state;
                state->msg = msg;
@@ -378,13 +394,33 @@ static
 void
 dmsg_msg_free_locked(dmsg_msg_t *msg)
 {
-       dmsg_iocom_t *iocom = msg->iocom;
-
+       /*dmsg_iocom_t *iocom = msg->iocom;*/
+#if 1
+       int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
+       if (*(int *)((char *)msg +
+                    offsetof(struct  dmsg_msg, any.head) + hbytes) !=
+            0x71B2C3D4) {
+               fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
+               assert(0);
+       }
+#endif
+       if (msg->circuit) {
+               dmsg_circuit_drop_locked(msg->circuit);
+               msg->circuit = NULL;
+       }
        msg->state = NULL;
+       if (msg->aux_data) {
+               free(msg->aux_data);
+               msg->aux_data = NULL;
+       }
+       msg->aux_size = 0;
+       free (msg);
+#if 0
        if (msg->aux_data)
                TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
        else
                TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
+#endif
 }
 
 void
@@ -415,6 +451,11 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
        int ai; /* alt bulk path socket */
 
        while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
+               /*
+                * These iocom->flags are only manipulated within the
+                * context of the current thread.  However, modifications
+                * still require atomic ops.
+                */
                if ((iocom->flags & (DMSG_IOCOMF_RWORK |
                                     DMSG_IOCOMF_WWORK |
                                     DMSG_IOCOMF_PWORK |
@@ -471,24 +512,29 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
                        poll(fds, count, timeout);
 
                        if (wi >= 0 && (fds[wi].revents & POLLIN))
-                               iocom->flags |= DMSG_IOCOMF_PWORK;
+                               atomic_set_int(&iocom->flags,
+                                              DMSG_IOCOMF_PWORK);
                        if (si >= 0 && (fds[si].revents & POLLIN))
-                               iocom->flags |= DMSG_IOCOMF_RWORK;
+                               atomic_set_int(&iocom->flags,
+                                              DMSG_IOCOMF_RWORK);
                        if (si >= 0 && (fds[si].revents & POLLOUT))
-                               iocom->flags |= DMSG_IOCOMF_WWORK;
+                               atomic_set_int(&iocom->flags,
+                                              DMSG_IOCOMF_WWORK);
                        if (wi >= 0 && (fds[wi].revents & POLLOUT))
-                               iocom->flags |= DMSG_IOCOMF_WWORK;
+                               atomic_set_int(&iocom->flags,
+                                              DMSG_IOCOMF_WWORK);
                        if (ai >= 0 && (fds[ai].revents & POLLIN))
-                               iocom->flags |= DMSG_IOCOMF_ARWORK;
+                               atomic_set_int(&iocom->flags,
+                                              DMSG_IOCOMF_ARWORK);
                } else {
                        /*
                         * Always check the pipe
                         */
-                       iocom->flags |= DMSG_IOCOMF_PWORK;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
                }
 
                if (iocom->flags & DMSG_IOCOMF_SWORK) {
-                       iocom->flags &= ~DMSG_IOCOMF_SWORK;
+                       atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
                        iocom->signal_callback(iocom);
                }
 
@@ -498,10 +544,10 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
                 * the pipe with a dummy read.
                 */
                if (iocom->flags & DMSG_IOCOMF_PWORK) {
-                       iocom->flags &= ~DMSG_IOCOMF_PWORK;
+                       atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
                        read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
-                       iocom->flags |= DMSG_IOCOMF_RWORK;
-                       iocom->flags |= DMSG_IOCOMF_WWORK;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
                        if (TAILQ_FIRST(&iocom->txmsgq))
                                dmsg_iocom_flush1(iocom);
                }
@@ -530,7 +576,7 @@ dmsg_iocom_core(dmsg_iocom_t *iocom)
                }
 
                if (iocom->flags & DMSG_IOCOMF_ARWORK) {
-                       iocom->flags &= ~DMSG_IOCOMF_ARWORK;
+                       atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
                        iocom->altmsg_callback(iocom);
                }
        }
@@ -600,8 +646,6 @@ dmsg_ioq_read(dmsg_iocom_t *iocom)
        int error;
 
 again:
-       iocom->flags &= ~(DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
-
        /*
         * If a message is already pending we can just remove and
         * return it.  Message state has already been processed.
@@ -611,6 +655,7 @@ again:
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                return (msg);
        }
+       atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
 
        /*
         * If the stream is errored out we stop processing it.
@@ -722,10 +767,11 @@ again:
 
                /*
                 * Allocate the message, the next state will fill it in.
-                * Note that the actual buffer will be sized to an aligned
+                * Note that the aux_data buffer will be sized to an aligned
                 * value and the aligned remainder zero'd for convenience.
                 */
-               msg = dmsg_msg_alloc(&iocom->circuit0, aux_size, 0,
+               msg = dmsg_msg_alloc(&iocom->circuit0, aux_size,
+                                    ioq->hbytes / DMSG_ALIGN,
                                     NULL, NULL);
                ioq->msg = msg;
 
@@ -837,12 +883,16 @@ again:
                /* fall through */
        case DMSG_MSGQ_STATE_AUXDATA1:
                /*
-                * Copy the partial or complete payload from remaining
-                * bytes in the FIFO in order to optimize the makeroom call
-                * in the AUXDATA2 state.  We have to fall-through either
-                * way so we can check the crc.
+                * Copy the partial or complete [decrypted] payload from
+                * remaining bytes in the FIFO in order to optimize the
+                * makeroom call in the AUXDATA2 state.  We have to
+                * fall-through either way so we can check the crc.
                 *
                 * msg->aux_size tracks our aux data.
+                *
+                * (Lets not complicate matters if the data is encrypted,
+                *  since the data in-stream is not the same size as the
+                *  data decrypted).
                 */
                if (bytes >= ioq->abytes) {
                        bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
@@ -941,6 +991,8 @@ again:
                xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
                if (xcrc32 != msg->any.head.aux_crc) {
                        ioq->error = DMSG_IOQ_ERROR_ACRC;
+                       fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
+                               xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
                        break;
                }
                break;
@@ -981,6 +1033,7 @@ again:
         */
        if (ioq->error) {
 skip:
+               fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
                /*
                 * An unrecoverable error causes all active receive
                 * transactions to be terminated with a LNK_ERROR message.
@@ -1026,6 +1079,12 @@ skip:
                         */
                        if (state->rxcmd & DMSGF_DELETE) {
                                dmsg_msg_free(msg);
+                               fprintf(stderr,
+                                       "iocom: ioq error %d sleeping\n",
+                                       ioq->error);
+                               sleep(1);       /* XXX */
+                               atomic_set_int(&iocom->flags,
+                                              DMSG_IOCOMF_RWORK);
                                msg = NULL;
                        } else {
                                /*state->txcmd |= DMSGF_DELETE;*/
@@ -1042,6 +1101,12 @@ skip:
                         */
                        if (state->rxcmd & DMSGF_DELETE) {
                                dmsg_msg_free(msg);
+                               fprintf(stderr,
+                                       "iocom: ioq error %d sleeping\n",
+                                       ioq->error);
+                               sleep(1);       /* XXX */
+                               atomic_set_int(&iocom->flags,
+                                              DMSG_IOCOMF_RWORK);
                                msg = NULL;
                        } else {
                                msg->state = state;
@@ -1061,7 +1126,7 @@ skip:
                         * Generate a final LNK_ERROR and flag EOF.
                         */
                        msg->state = NULL;
-                       iocom->flags |= DMSG_IOCOMF_EOF;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
                        fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
                }
                pthread_mutex_unlock(&iocom->mtx);
@@ -1077,7 +1142,7 @@ skip:
                 * re-set RWORK.
                 */
                if (msg)
-                       iocom->flags |= DMSG_IOCOMF_RWORK;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
        } else if (msg == NULL) {
                /*
                 * Insufficient data received to finish building the message,
@@ -1086,7 +1151,7 @@ skip:
                 * Leave ioq->msg intact.
                 * Leave the FIFO intact.
                 */
-               iocom->flags |= DMSG_IOCOMF_RREQ;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
        } else {
                /*
                 * Continue processing msg.
@@ -1100,13 +1165,13 @@ skip:
                 */
                if (ioq->fifo_beg == ioq->fifo_cdx &&
                    ioq->fifo_cdn == ioq->fifo_end) {
-                       iocom->flags |= DMSG_IOCOMF_RREQ;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
                        ioq->fifo_cdx = 0;
                        ioq->fifo_cdn = 0;
                        ioq->fifo_beg = 0;
                        ioq->fifo_end = 0;
                } else {
-                       iocom->flags |= DMSG_IOCOMF_RWORK;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
                }
                ioq->state = DMSG_MSGQ_STATE_HEADER1;
                ioq->msg = NULL;
@@ -1118,6 +1183,13 @@ skip:
                 *
                 * State processing only occurs for messages destined for us.
                 */
+               if (DMsgDebugOpt >= 5) {
+                       fprintf(stderr,
+                               "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
+                               msg->any.head.cmd,
+                               (intmax_t)msg->any.head.msgid,
+                               (intmax_t)msg->any.head.circuit);
+               }
                if (msg->any.head.circuit)
                        error = dmsg_circuit_relay(msg);
                else
@@ -1159,7 +1231,7 @@ skip:
  * A non-NULL msg is added to the queue but not necessarily flushed.
  * Calling this function with msg == NULL will get a flush going.
  *
- * Caller must hold iocom->mtx.
+ * (called from iocom_core only)
  */
 void
 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
@@ -1171,7 +1243,7 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom)
        size_t abytes;
        dmsg_msg_queue_t tmpq;
 
-       iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
+       atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
        TAILQ_INIT(&tmpq);
        pthread_mutex_lock(&iocom->mtx);
        while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
@@ -1229,6 +1301,8 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom)
 
 /*
  * Thread localized, iocom->mtx not held by caller.
+ *
+ * (called from iocom_core via iocom_flush1 only)
  */
 void
 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
@@ -1310,9 +1384,14 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
                /*
                 * Make sure the FIFO has a reasonable amount of space
                 * left (if not completely full).
+                *
+                * In this situation we are staging the encrypted message
+                * data in the FIFO.  (nact) represents how much plaintext
+                * has been staged, (n) represents how much encrypted data
+                * has been flushed.  The two are independent of each other.
                 */
                if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
-                   sizeof(ioq->buf) - ioq->fifo_end >= DMSG_ALIGN * 2) {
+                   sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
                        bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
                              ioq->fifo_end - ioq->fifo_beg);
                        ioq->fifo_cdx -= ioq->fifo_beg;
@@ -1333,11 +1412,19 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
                                ioq->fifo_cdx = 0;
                                ioq->fifo_end = 0;
                        }
-                       /* XXX what if interrupted mid-write? */
-               } else {
-                       nact = 0;
                }
+               /*
+                * We don't mess with the nact returned by the crypto_encrypt
+                * call, which represents the filling of the FIFO.  (n) tells
+                * us how much we were able to write from the FIFO.  The two
+                * are different beasts when encrypting.
+                */
        } else {
+               /*
+                * In this situation we are not staging the messages to the
+                * FIFO but instead writing them directly from the msg
+                * structure(s), so (nact) is basically (n).
+                */
                n = writev(iocom->sock_fd, iov, iovcnt);
                if (n > 0)
                        nact = n;
@@ -1368,6 +1455,15 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
                        break;
                }
                nact -= abytes - ioq->abytes;
+               /* ioq->abytes = abytes; optimized out */
+
+               if (DMsgDebugOpt >= 5) {
+                       fprintf(stderr,
+                               "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
+                               msg->any.head.cmd,
+                               (intmax_t)msg->any.head.msgid,
+                               (intmax_t)msg->any.head.circuit);
+               }
 
                TAILQ_REMOVE(&ioq->msgq, msg, qentry);
                --ioq->msgcount;
@@ -1394,10 +1490,10 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
                        /*
                         * Wait for socket buffer space
                         */
-                       iocom->flags |= DMSG_IOCOMF_WREQ;
+                       atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
                }
        } else {
-               iocom->flags |= DMSG_IOCOMF_WREQ;
+               atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
        }
        if (ioq->error) {
                dmsg_iocom_drain(iocom);
@@ -1410,7 +1506,7 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
  * the caller to pull off our contrived terminal error msg to detect
  * the connection failure.
  *
- * Thread localized, iocom->mtx not held by caller.
+ * Localized to iocom_core thread, iocom->mtx not held by caller.
  */
 void
 dmsg_iocom_drain(dmsg_iocom_t *iocom)
@@ -1418,7 +1514,7 @@ dmsg_iocom_drain(dmsg_iocom_t *iocom)
        dmsg_ioq_t *ioq = &iocom->ioq_tx;
        dmsg_msg_t *msg;
 
-       iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
+       atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
        ioq->hbytes = 0;
        ioq->abytes = 0;
 
@@ -1737,6 +1833,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
 {
        dmsg_iocom_t *iocom = msg->iocom;
        dmsg_circuit_t *circuit;
+       dmsg_circuit_t *ocircuit;
        dmsg_state_t *state;
        dmsg_state_t sdummy;
        dmsg_circuit_t cdummy;
@@ -1753,11 +1850,18 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                cdummy.msgid = msg->any.head.circuit;
                circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
                                  &cdummy);
-               if (circuit == NULL)
+               if (circuit == NULL) {
+                       pthread_mutex_unlock(&iocom->mtx);
                        return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
+               }
        }
+
+       /*
+        * Replace circuit0 with actual
+        */
+       dmsg_circuit_hold(circuit);
+       ocircuit = msg->circuit;
        msg->circuit = circuit;
-       ++circuit->refs;
 
        /*
         * If received msg is a command state is on staterd_tree.
@@ -1772,7 +1876,10 @@ dmsg_state_msgrx(dmsg_msg_t *msg)
                                &sdummy);
        }
        msg->state = state;
+
        pthread_mutex_unlock(&iocom->mtx);
+       if (ocircuit)
+               dmsg_circuit_drop(ocircuit);
 
        /*
         * Short-cut one-off or mid-stream messages (state may be NULL).
@@ -2046,6 +2153,16 @@ dmsg_state_free(dmsg_state_t *state)
        free(state);
 }
 
+/*
+ * Called with iocom locked
+ */
+void
+dmsg_circuit_hold(dmsg_circuit_t *circuit)
+{
+       assert(circuit->refs > 0);              /* caller must hold ref */
+       atomic_add_int(&circuit->refs, 1);      /* to safely add more */
+}
+
 /*
  * Called with iocom locked
  */
@@ -2061,13 +2178,16 @@ dmsg_circuit_drop(dmsg_circuit_t *circuit)
        /*
         * Decrement circuit refs, destroy circuit when refs drops to 0.
         */
-       if (--circuit->refs > 0)
+       if (atomic_fetchadd_int(&circuit->refs, -1) != 1)
                return;
+       assert(circuit != &iocom->circuit0);
 
        assert(RB_EMPTY(&circuit->staterd_tree));
        assert(RB_EMPTY(&circuit->statewr_tree));
+       pthread_mutex_lock(&iocom->mtx);
        RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
        circuit->iocom = NULL;
+       pthread_mutex_unlock(&iocom->mtx);
        dmsg_free(circuit);
 
        /*
@@ -2087,6 +2207,29 @@ dmsg_circuit_drop(dmsg_circuit_t *circuit)
        }
 }
 
+void
+dmsg_circuit_drop_locked(dmsg_circuit_t *circuit)
+{
+       dmsg_iocom_t *iocom;
+
+       iocom = circuit->iocom;
+       assert(circuit->refs > 0);
+       assert(iocom);
+
+       if (atomic_fetchadd_int(&circuit->refs, -1) == 1) {
+               assert(circuit != &iocom->circuit0);
+               assert(RB_EMPTY(&circuit->staterd_tree));
+               assert(RB_EMPTY(&circuit->statewr_tree));
+               RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
+               circuit->iocom = NULL;
+               dmsg_free(circuit);
+               if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
+                       char dummy = 0;
+                       write(iocom->wakeupfds[1], &dummy, 1);
+               }
+       }
+}
+
 /*
  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
  * header is not adjusted, just the core header.
diff --git a/lib/libdmsg/msg_lnk.c b/lib/libdmsg/msg_lnk.c
index 2544d8b..e2e2b73 100644
@@ -792,6 +792,9 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
 
        /*pthread_mutex_lock(&cluster_mtx);*/
 
+       if (DMsgDebugOpt >= 4)
+               fprintf(stderr, "CIRC receive cmd=%08x\n", msg->any.head.cmd);
+
        switch (msg->any.head.cmd & (DMSGF_CREATE |
                                     DMSGF_DELETE |
                                     DMSGF_REPLY)) {
@@ -825,10 +828,17 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                                   &dummy);
                pthread_mutex_unlock(&iocomA->mtx);
                if (tx_state == NULL) {
+                       /* XXX SMP race */
                        fprintf(stderr, "dmsg_lnk_circ: no circuit\n");
                        dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
                        break;
                }
+               if (tx_state->icmd != DMSG_LNK_SPAN) {
+                       /* XXX SMP race */
+                       fprintf(stderr, "dmsg_lnk_circ: not LNK_SPAN\n");
+                       dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+                       break;
+               }
 
                /* locate h2span_link */
                rx_state = tx_state->any.relay->source_rt;
@@ -841,7 +851,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                 * it received from us as <target>.
                 */
                circA = dmsg_alloc(sizeof(*circA));
-               circA->iocom = iocomA;
+               dmsg_circuit_init(iocomA, circA);
                circA->state = msg->state;      /* LNK_CIRC state */
                circA->msgid = msg->state->msgid;
                circA->span_state = tx_state;   /* H2SPAN_RELAY state */
@@ -852,6 +862,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                iocomB = rx_state->iocom;
 
                circB = dmsg_alloc(sizeof(*circB));
+               dmsg_circuit_init(iocomB, circB);
 
                /*
                 * Create a LNK_CIRC transaction on B
@@ -860,23 +871,40 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                                         0, DMSG_LNK_CIRC | DMSGF_CREATE,
                                         dmsg_lnk_circ, circB);
                fwd_msg->state->any.circ = circB;
-               circB->iocom = iocomB;
+               fwd_msg->any.lnk_circ.target = rx_state->msgid;
                circB->state = fwd_msg->state;  /* LNK_CIRC state */
                circB->msgid = fwd_msg->any.head.msgid;
                circB->span_state = rx_state;   /* H2SPAN_LINK state */
                circB->is_relay = 0;
                circB->refs = 2;                /* state and peer */
 
+               if (DMsgDebugOpt >= 4)
+                       fprintf(stderr, "CIRC forward %p->%p\n", circA, circB);
+
                /*
                 * Link the two circuits together.
                 */
                circA->peer = circB;
                circB->peer = circA;
 
+               if (iocomA < iocomB) {
+                       pthread_mutex_lock(&iocomA->mtx);
+                       pthread_mutex_lock(&iocomB->mtx);
+               } else {
+                       pthread_mutex_lock(&iocomB->mtx);
+                       pthread_mutex_lock(&iocomA->mtx);
+               }
                if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA))
                        assert(0);
                if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB))
                        assert(0);
+               if (iocomA < iocomB) {
+                       pthread_mutex_unlock(&iocomB->mtx);
+                       pthread_mutex_unlock(&iocomA->mtx);
+               } else {
+                       pthread_mutex_unlock(&iocomA->mtx);
+                       pthread_mutex_unlock(&iocomB->mtx);
+               }
 
                dmsg_msg_write(fwd_msg);
 
@@ -1270,6 +1298,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn)
         */
        while (relay && relay->source_rt->any.link->node == node) {
                next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+               fprintf(stderr, "RELAY DELETE FROM EXTRAS\n");
                dmsg_relay_delete(relay);
                relay = next_relay;
        }
@@ -1331,6 +1360,7 @@ dmsg_lnk_relay(dmsg_msg_t *msg)
 
        if (msg->any.head.cmd & DMSGF_DELETE) {
                pthread_mutex_lock(&cluster_mtx);
+               fprintf(stderr, "RELAY DELETE FROM LNK_RELAY MSG\n");
                if ((relay = state->any.relay) != NULL) {
                        dmsg_relay_delete(relay);
                } else {
@@ -1340,7 +1370,9 @@ dmsg_lnk_relay(dmsg_msg_t *msg)
        }
 }
 
-
+/*
+ * cluster_mtx held by caller
+ */
 static
 void
 dmsg_relay_delete(h2span_relay_t *relay)
@@ -1480,17 +1512,26 @@ dmsg_circuit_relay(dmsg_msg_t *msg)
        /*
         * Lookup the circuit on the incoming iocom.
         */
-       pthread_mutex_lock(&cluster_mtx);
+       pthread_mutex_lock(&iocom->mtx);
 
        dummy.msgid = msg->any.head.circuit;
        circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy);
        assert(circ);
        peer = circ->peer;
+       dmsg_circuit_hold(peer);
+
+       if (DMsgDebugOpt >= 4) {
+               fprintf(stderr,
+                       "CIRC relay %08x %p->%p\n",
+                       msg->any.head.cmd, circ, peer);
+       }
 
        msg->iocom = peer->iocom;
        msg->any.head.circuit = peer->msgid;
+       dmsg_circuit_drop_locked(msg->circuit);
+       msg->circuit = peer;
 
-       pthread_mutex_unlock(&cluster_mtx);
+       pthread_mutex_unlock(&iocom->mtx);
 
        dmsg_msg_write(msg);
        error = DMSG_IOQ_ERROR_ROUTED;
@@ -1560,6 +1601,8 @@ dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid)
 
 /*
  * Dumps the spanning tree
+ *
+ * DEBUG ONLY
  */
 void
 dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
@@ -1567,6 +1610,7 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
        h2span_cluster_t *cls;
        h2span_node_t *node;
        h2span_link_t *slink;
+       h2span_relay_t *relay;
        char *uustr = NULL;
 
        pthread_mutex_lock(&cluster_mtx);
@@ -1582,9 +1626,18 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
                                node->fs_label);
                        RB_FOREACH(slink, h2span_link_tree, &node->tree) {
                                dmsg_circuit_printf(circuit,
-                                           "\tLink dist=%d via %d\n",
+                                           "\tSLink msgid %016jx "
+                                           "dist=%d via %d\n",
+                                           (intmax_t)slink->state->msgid,
                                            slink->dist,
                                            slink->state->iocom->sock_fd);
+                               TAILQ_FOREACH(relay, &slink->relayq, entry) {
+                                       dmsg_circuit_printf(circuit,
+                                           "\t    Relay-out msgid %016jx "
+                                           "via %d\n",
+                                           (intmax_t)relay->target_rt->msgid,
+                                           relay->target_rt->iocom->sock_fd);
+                               }
                        }
                }
        }
@@ -1597,6 +1650,39 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
 #endif
 }
 
+/*
+ * DEBUG ONLY
+ *
+ * Locate the state representing an incoming LNK_SPAN given its msgid.
+ * On a match store it in *statep and return 0, else store NULL and return
+ * ENOENT.  The scan holds cluster_mtx; no ref is taken on the result.
+ */
+int
+dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep)
+{
+       h2span_cluster_t *cls;
+       h2span_node_t *node;
+       h2span_link_t *slink;
+       dmsg_state_t *found = NULL;
+
+       pthread_mutex_lock(&cluster_mtx);
+       /* Walk every cluster -> node -> link looking for the msgid. */
+       RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
+               RB_FOREACH(node, h2span_node_tree, &cls->tree) {
+                       RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+                               if (slink->state->msgid == msgid) {
+                                       found = slink->state;
+                                       goto done;
+                               }
+                       }
+               }
+       }
+done:
+       pthread_mutex_unlock(&cluster_mtx);
+       *statep = found;
+       return(found ? 0 : ENOENT);
+}
+
 /*
  * Random number sub-sort value to add to SPAN rnss fields on relay.
  * This allows us to differentiate spans with the same <dist> field