From: Matthew Dillon Date: Tue, 4 Dec 2012 05:58:16 +0000 (-0800) Subject: cluster - more libdmsg work X-Git-Tag: v3.4.0rc~744 X-Git-Url: https://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/a2179323cae23105c50b96e4fe13169efe445734 cluster - more libdmsg work * Fix numerous bugs, including a nasty edge case in the encryption code. * Do a better job locking things (state structures still need help). --- diff --git a/lib/libdmsg/crypto.c b/lib/libdmsg/crypto.c index de0373187b..42c4a8bd58 100644 --- a/lib/libdmsg/crypto.c +++ b/lib/libdmsg/crypto.c @@ -365,7 +365,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom) */ if (getpeername(iocom->sock_fd, &sa.sa, &salen) < 0) { iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOPEER; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "accept: getpeername() failed\n"); goto done; @@ -373,7 +373,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom) if (getnameinfo(&sa.sa, salen, peername, sizeof(peername), NULL, 0, NI_NUMERICHOST) < 0) { iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOPEER; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "accept: cannot decode sockaddr\n"); goto done; @@ -401,7 +401,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom) DMSG_PATH_REMOTE, peername); if (stat(path, &st) < 0) { iocom->ioq_rx.error = DMSG_IOQ_ERROR_NORKEY; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: unknown host\n"); goto done; @@ -415,7 +415,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom) fclose(fp); if (keys[0] == NULL) { iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: bad key format\n"); @@ -430,14 +430,14 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom) asprintf(&path, DMSG_DEFAULT_DIR "/rsa.pub"); if ((fp = fopen(path, "r")) == NULL) { iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOLKEY; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); goto done; } keys[1] = PEM_read_RSA_PUBKEY(fp, NULL, NULL, NULL); fclose(fp); if (keys[1] == NULL) { iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: bad host key format\n"); goto done; @@ -447,7 +447,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom) asprintf(&path, DMSG_DEFAULT_DIR "/rsa.prv"); if ((fp = fopen(path, "r")) == NULL) { iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOLKEY; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: bad host key format\n"); goto done; @@ -456,7 +456,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom) fclose(fp); if (keys[2] == NULL) { iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: bad host key format\n"); goto done; @@ -473,7 +473,7 @@ dmsg_crypto_negotiate(dmsg_iocom_t *iocom) blksize != (size_t)RSA_size(keys[2]) || sizeof(handtx) % blksize != 0) { iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: " "key size mismatch\n"); @@ -500,7 +500,7 @@ urandfail: if (fd >= 0) close(fd); iocom->ioq_rx.error = DMSG_IOQ_ERROR_BADURANDOM; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: bad rng\n"); goto done; @@ -568,7 +568,7 @@ urandfail: } } if (iocom->ioq_rx.error) { - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: key exchange failure " "during encryption\n"); @@ -598,7 +598,7 @@ urandfail: } } if (iocom->ioq_rx.error) { - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: key exchange failure " "during decryption\n"); @@ -612,7 +612,7 @@ urandfail: if (i != sizeof(handrx)) { keyxchgfail: iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYXCHGFAIL; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); if (DMsgDebugOpt) fprintf(stderr, "auth failure: key exchange failure\n"); goto done; @@ -652,7 +652,7 @@ keyxchgfail: if (error) goto keyxchgfail; - iocom->flags |= DMSG_IOCOMF_CRYPTED; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_CRYPTED); if (DMsgDebugOpt) fprintf(stderr, "auth success: %s\n", handrx.quickmsg); diff --git a/lib/libdmsg/dmsg.h b/lib/libdmsg/dmsg.h index 49fbca2f15..8b9dc9b9e3 100644 --- a/lib/libdmsg/dmsg.h +++ b/lib/libdmsg/dmsg.h @@ -197,7 +197,7 @@ struct dmsg_msg { size_t hdr_size; size_t aux_size; char *aux_data; - dmsg_any_t any; + dmsg_any_t any; /* must be last element */ }; typedef struct dmsg_circuit dmsg_circuit_t; @@ -408,7 +408,9 @@ void dmsg_iocom_flush2(dmsg_iocom_t *iocom); void dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg); void dmsg_state_free(dmsg_state_t *state); +void dmsg_circuit_hold(dmsg_circuit_t *circuit); void dmsg_circuit_drop(dmsg_circuit_t *circuit); +void dmsg_circuit_drop_locked(dmsg_circuit_t *circuit); int dmsg_circuit_relay(dmsg_msg_t *msg); @@ -419,6 +421,8 @@ void dmsg_msg_lnk_signal(dmsg_iocom_t *iocom); void dmsg_msg_lnk(dmsg_msg_t *msg); void dmsg_msg_dbg(dmsg_msg_t *msg); void dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused); +int dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep); + /* * Crypto functions diff --git a/lib/libdmsg/msg.c b/lib/libdmsg/msg.c index 4cb2aafaa1..b579ac86e8 100644 --- a/lib/libdmsg/msg.c +++ b/lib/libdmsg/msg.c @@ -39,6 +39,7 @@ int DMsgDebugOpt; static int dmsg_state_msgrx(dmsg_msg_t *msg); static void dmsg_state_cleanuptx(dmsg_msg_t *msg); +static void dmsg_msg_free_locked(dmsg_msg_t *msg); RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp); RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp); @@ -194,20 +195,24 @@ dmsg_iocom_restate(dmsg_iocom_t *iocom, void (*rcvmsg_func)(dmsg_msg_t *msg), void (*altmsg_func)(dmsg_iocom_t *)) { + pthread_mutex_lock(&iocom->mtx); iocom->signal_callback = signal_func; iocom->rcvmsg_callback = rcvmsg_func; iocom->altmsg_callback = altmsg_func; if (signal_func) - iocom->flags |= DMSG_IOCOMF_SWORK; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK); else - iocom->flags &= ~DMSG_IOCOMF_SWORK; + atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK); + pthread_mutex_unlock(&iocom->mtx); } void dmsg_iocom_signal(dmsg_iocom_t *iocom) { + pthread_mutex_lock(&iocom->mtx); if (iocom->signal_callback) - iocom->flags |= DMSG_IOCOMF_SWORK; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK); + pthread_mutex_unlock(&iocom->mtx); } /* @@ -231,11 +236,11 @@ dmsg_iocom_done(dmsg_iocom_t *iocom) } dmsg_ioq_done(iocom, &iocom->ioq_rx); dmsg_ioq_done(iocom, &iocom->ioq_tx); - if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) { + while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) { TAILQ_REMOVE(&iocom->freeq, msg, qentry); free(msg); } - if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) { + while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) { TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); free(msg->aux_data); msg->aux_data = NULL; @@ -253,17 +258,17 @@ dmsg_iocom_done(dmsg_iocom_t *iocom) } /* - * Initialize a circuit structure and add it to the iocom's circuit_tree. - * circuit0 is left out and will not be added to the tree. + * Basic initialization of a circuit structure. + * + * The circuit structure is initialized with one ref. */ void dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit) { + circuit->refs = 1; circuit->iocom = iocom; RB_INIT(&circuit->staterd_tree); RB_INIT(&circuit->statewr_tree); - if (circuit->msgid) - RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit); } /* @@ -281,6 +286,7 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit, size_t aligned_size; pthread_mutex_lock(&iocom->mtx); +#if 0 if (aux_size) { aligned_size = DMSG_DOALIGN(aux_size); if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) @@ -290,6 +296,9 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit, if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) TAILQ_REMOVE(&iocom->freeq, msg, qentry); } +#endif + aligned_size = DMSG_DOALIGN(aux_size); + msg = NULL; if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) { /* * Create state when CREATE is set without REPLY. @@ -320,12 +329,19 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit, pthread_mutex_unlock(&iocom->mtx); state->flags |= DMSG_STATE_INSERTED; } + /* XXX SMP race for state */ pthread_mutex_unlock(&iocom->mtx); + hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN; if (msg == NULL) { + msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4); + bzero(msg, offsetof(struct dmsg_msg, any.head)); + *(int *)((char *)msg + + offsetof(struct dmsg_msg, any.head) + hbytes) = + 0x71B2C3D4; +#if 0 msg = malloc(sizeof(*msg)); bzero(msg, sizeof(*msg)); - msg->aux_data = NULL; - msg->aux_size = 0; +#endif } /* @@ -348,7 +364,6 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit, } } } - hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN; if (hbytes) bzero(&msg->any.head, hbytes); msg->hdr_size = hbytes; @@ -359,6 +374,7 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit, msg->any.head.circuit = 0; msg->circuit = circuit; msg->iocom = iocom; + dmsg_circuit_hold(circuit); if (state) { msg->state = state; state->msg = msg; @@ -378,13 +394,33 @@ static void dmsg_msg_free_locked(dmsg_msg_t *msg) { - dmsg_iocom_t *iocom = msg->iocom; - + /*dmsg_iocom_t *iocom = msg->iocom;*/ +#if 1 + int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN; + if (*(int *)((char *)msg + + offsetof(struct dmsg_msg, any.head) + hbytes) != + 0x71B2C3D4) { + fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd); + assert(0); + } +#endif + if (msg->circuit) { + dmsg_circuit_drop_locked(msg->circuit); + msg->circuit = NULL; + } msg->state = NULL; + if (msg->aux_data) { + free(msg->aux_data); + msg->aux_data = NULL; + } + msg->aux_size = 0; + free (msg); +#if 0 if (msg->aux_data) TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry); else TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry); +#endif } void @@ -415,6 +451,11 @@ dmsg_iocom_core(dmsg_iocom_t *iocom) int ai; /* alt bulk path socket */ while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) { + /* + * These iocom->flags are only manipulated within the + * context of the current thread. However, modifications + * still require atomic ops. + */ if ((iocom->flags & (DMSG_IOCOMF_RWORK | DMSG_IOCOMF_WWORK | DMSG_IOCOMF_PWORK | @@ -471,24 +512,29 @@ dmsg_iocom_core(dmsg_iocom_t *iocom) poll(fds, count, timeout); if (wi >= 0 && (fds[wi].revents & POLLIN)) - iocom->flags |= DMSG_IOCOMF_PWORK; + atomic_set_int(&iocom->flags, + DMSG_IOCOMF_PWORK); if (si >= 0 && (fds[si].revents & POLLIN)) - iocom->flags |= DMSG_IOCOMF_RWORK; + atomic_set_int(&iocom->flags, + DMSG_IOCOMF_RWORK); if (si >= 0 && (fds[si].revents & POLLOUT)) - iocom->flags |= DMSG_IOCOMF_WWORK; + atomic_set_int(&iocom->flags, + DMSG_IOCOMF_WWORK); if (wi >= 0 && (fds[wi].revents & POLLOUT)) - iocom->flags |= DMSG_IOCOMF_WWORK; + atomic_set_int(&iocom->flags, + DMSG_IOCOMF_WWORK); if (ai >= 0 && (fds[ai].revents & POLLIN)) - iocom->flags |= DMSG_IOCOMF_ARWORK; + atomic_set_int(&iocom->flags, + DMSG_IOCOMF_ARWORK); } else { /* * Always check the pipe */ - iocom->flags |= DMSG_IOCOMF_PWORK; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK); } if (iocom->flags & DMSG_IOCOMF_SWORK) { - iocom->flags &= ~DMSG_IOCOMF_SWORK; + atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK); iocom->signal_callback(iocom); } @@ -498,10 +544,10 @@ dmsg_iocom_core(dmsg_iocom_t *iocom) * the pipe with a dummy read. */ if (iocom->flags & DMSG_IOCOMF_PWORK) { - iocom->flags &= ~DMSG_IOCOMF_PWORK; + atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK); read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf)); - iocom->flags |= DMSG_IOCOMF_RWORK; - iocom->flags |= DMSG_IOCOMF_WWORK; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK); + atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK); if (TAILQ_FIRST(&iocom->txmsgq)) dmsg_iocom_flush1(iocom); } @@ -530,7 +576,7 @@ dmsg_iocom_core(dmsg_iocom_t *iocom) } if (iocom->flags & DMSG_IOCOMF_ARWORK) { - iocom->flags &= ~DMSG_IOCOMF_ARWORK; + atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK); iocom->altmsg_callback(iocom); } } @@ -600,8 +646,6 @@ dmsg_ioq_read(dmsg_iocom_t *iocom) int error; again: - iocom->flags &= ~(DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK); - /* * If a message is already pending we can just remove and * return it. Message state has already been processed. @@ -611,6 +655,7 @@ again: TAILQ_REMOVE(&ioq->msgq, msg, qentry); return (msg); } + atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK); /* * If the stream is errored out we stop processing it. @@ -722,10 +767,11 @@ again: /* * Allocate the message, the next state will fill it in. - * Note that the actual buffer will be sized to an aligned + * Note that the aux_data buffer will be sized to an aligned * value and the aligned remainder zero'd for convenience. */ - msg = dmsg_msg_alloc(&iocom->circuit0, aux_size, 0, + msg = dmsg_msg_alloc(&iocom->circuit0, aux_size, + ioq->hbytes / DMSG_ALIGN, NULL, NULL); ioq->msg = msg; @@ -837,12 +883,16 @@ again: /* fall through */ case DMSG_MSGQ_STATE_AUXDATA1: /* - * Copy the partial or complete payload from remaining - * bytes in the FIFO in order to optimize the makeroom call - * in the AUXDATA2 state. We have to fall-through either - * way so we can check the crc. + * Copy the partial or complete [decrypted] payload from + * remaining bytes in the FIFO in order to optimize the + * makeroom call in the AUXDATA2 state. We have to + * fall-through either way so we can check the crc. * * msg->aux_size tracks our aux data. + * + * (Lets not complicate matters if the data is encrypted, + * since the data in-stream is not the same size as the + * data decrypted). */ if (bytes >= ioq->abytes) { bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data, @@ -941,6 +991,8 @@ again: xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes); if (xcrc32 != msg->any.head.aux_crc) { ioq->error = DMSG_IOQ_ERROR_ACRC; + fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n", + xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes); break; } break; @@ -981,6 +1033,7 @@ again: */ if (ioq->error) { skip: + fprintf(stderr, "IOQ ERROR %d\n", ioq->error); /* * An unrecoverable error causes all active receive * transactions to be terminated with a LNK_ERROR message. @@ -1026,6 +1079,12 @@ skip: */ if (state->rxcmd & DMSGF_DELETE) { dmsg_msg_free(msg); + fprintf(stderr, + "iocom: ioq error %d sleeping\n", + ioq->error); + sleep(1); /* XXX */ + atomic_set_int(&iocom->flags, + DMSG_IOCOMF_RWORK); msg = NULL; } else { /*state->txcmd |= DMSGF_DELETE;*/ @@ -1042,6 +1101,12 @@ skip: */ if (state->rxcmd & DMSGF_DELETE) { dmsg_msg_free(msg); + fprintf(stderr, + "iocom: ioq error %d sleeping\n", + ioq->error); + sleep(1); /* XXX */ + atomic_set_int(&iocom->flags, + DMSG_IOCOMF_RWORK); msg = NULL; } else { msg->state = state; @@ -1061,7 +1126,7 @@ skip: * Generate a final LNK_ERROR and flag EOF. */ msg->state = NULL; - iocom->flags |= DMSG_IOCOMF_EOF; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF); fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd); } pthread_mutex_unlock(&iocom->mtx); @@ -1077,7 +1142,7 @@ skip: * re-set RWORK. */ if (msg) - iocom->flags |= DMSG_IOCOMF_RWORK; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK); } else if (msg == NULL) { /* * Insufficient data received to finish building the message, @@ -1086,7 +1151,7 @@ skip: * Leave ioq->msg intact. * Leave the FIFO intact. */ - iocom->flags |= DMSG_IOCOMF_RREQ; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ); } else { /* * Continue processing msg. @@ -1100,13 +1165,13 @@ skip: */ if (ioq->fifo_beg == ioq->fifo_cdx && ioq->fifo_cdn == ioq->fifo_end) { - iocom->flags |= DMSG_IOCOMF_RREQ; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ); ioq->fifo_cdx = 0; ioq->fifo_cdn = 0; ioq->fifo_beg = 0; ioq->fifo_end = 0; } else { - iocom->flags |= DMSG_IOCOMF_RWORK; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK); } ioq->state = DMSG_MSGQ_STATE_HEADER1; ioq->msg = NULL; @@ -1118,6 +1183,13 @@ skip: * * State processing only occurs for messages destined for us. */ + if (DMsgDebugOpt >= 5) { + fprintf(stderr, + "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n", + msg->any.head.cmd, + (intmax_t)msg->any.head.msgid, + (intmax_t)msg->any.head.circuit); + } if (msg->any.head.circuit) error = dmsg_circuit_relay(msg); else @@ -1159,7 +1231,7 @@ skip: * A non-NULL msg is added to the queue but not necessarily flushed. * Calling this function with msg == NULL will get a flush going. * - * Caller must hold iocom->mtx. + * (called from iocom_core only) */ void dmsg_iocom_flush1(dmsg_iocom_t *iocom) @@ -1171,7 +1243,7 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom) size_t abytes; dmsg_msg_queue_t tmpq; - iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK); + atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK); TAILQ_INIT(&tmpq); pthread_mutex_lock(&iocom->mtx); while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) { @@ -1229,6 +1301,8 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom) /* * Thread localized, iocom->mtx not held by caller. + * + * (called from iocom_core via iocom_flush1 only) */ void dmsg_iocom_flush2(dmsg_iocom_t *iocom) @@ -1310,9 +1384,14 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom) /* * Make sure the FIFO has a reasonable amount of space * left (if not completely full). + * + * In this situation we are staging the encrypted message + * data in the FIFO. (nact) represents how much plaintext + * has been staged, (n) represents how much encrypted data + * has been flushed. The two are independent of each other. */ if (ioq->fifo_beg > sizeof(ioq->buf) / 2 && - sizeof(ioq->buf) - ioq->fifo_end >= DMSG_ALIGN * 2) { + sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) { bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, ioq->fifo_end - ioq->fifo_beg); ioq->fifo_cdx -= ioq->fifo_beg; @@ -1333,11 +1412,19 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom) ioq->fifo_cdx = 0; ioq->fifo_end = 0; } - /* XXX what if interrupted mid-write? */ - } else { - nact = 0; } + /* + * We don't mess with the nact returned by the crypto_encrypt + * call, which represents the filling of the FIFO. (n) tells + * us how much we were able to write from the FIFO. The two + * are different beasts when encrypting. + */ } else { + /* + * In this situation we are not staging the messages to the + * FIFO but instead writing them directly from the msg + * structure(s), so (nact) is basically (n). + */ n = writev(iocom->sock_fd, iov, iovcnt); if (n > 0) nact = n; @@ -1368,6 +1455,15 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom) break; } nact -= abytes - ioq->abytes; + /* ioq->abytes = abytes; optimized out */ + + if (DMsgDebugOpt >= 5) { + fprintf(stderr, + "txmsg cmd=%08x msgid=%016jx circ=%016jx\n", + msg->any.head.cmd, + (intmax_t)msg->any.head.msgid, + (intmax_t)msg->any.head.circuit); + } TAILQ_REMOVE(&ioq->msgq, msg, qentry); --ioq->msgcount; @@ -1394,10 +1490,10 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom) /* * Wait for socket buffer space */ - iocom->flags |= DMSG_IOCOMF_WREQ; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ); } } else { - iocom->flags |= DMSG_IOCOMF_WREQ; + atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ); } if (ioq->error) { dmsg_iocom_drain(iocom); @@ -1410,7 +1506,7 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom) * the caller to pull off our contrived terminal error msg to detect * the connection failure. * - * Thread localized, iocom->mtx not held by caller. + * Localized to iocom_core thread, iocom->mtx not held by caller. */ void dmsg_iocom_drain(dmsg_iocom_t *iocom) @@ -1418,7 +1514,7 @@ dmsg_iocom_drain(dmsg_iocom_t *iocom) dmsg_ioq_t *ioq = &iocom->ioq_tx; dmsg_msg_t *msg; - iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK); + atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK); ioq->hbytes = 0; ioq->abytes = 0; @@ -1737,6 +1833,7 @@ dmsg_state_msgrx(dmsg_msg_t *msg) { dmsg_iocom_t *iocom = msg->iocom; dmsg_circuit_t *circuit; + dmsg_circuit_t *ocircuit; dmsg_state_t *state; dmsg_state_t sdummy; dmsg_circuit_t cdummy; @@ -1753,11 +1850,18 @@ dmsg_state_msgrx(dmsg_msg_t *msg) cdummy.msgid = msg->any.head.circuit; circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &cdummy); - if (circuit == NULL) + if (circuit == NULL) { + pthread_mutex_unlock(&iocom->mtx); return (DMSG_IOQ_ERROR_BAD_CIRCUIT); + } } + + /* + * Replace circuit0 with actual + */ + dmsg_circuit_hold(circuit); + ocircuit = msg->circuit; msg->circuit = circuit; - ++circuit->refs; /* * If received msg is a command state is on staterd_tree. @@ -1772,7 +1876,10 @@ dmsg_state_msgrx(dmsg_msg_t *msg) &sdummy); } msg->state = state; + pthread_mutex_unlock(&iocom->mtx); + if (ocircuit) + dmsg_circuit_drop(ocircuit); /* * Short-cut one-off or mid-stream messages (state may be NULL). @@ -2046,6 +2153,16 @@ dmsg_state_free(dmsg_state_t *state) free(state); } +/* + * Called with iocom locked + */ +void +dmsg_circuit_hold(dmsg_circuit_t *circuit) +{ + assert(circuit->refs > 0); /* caller must hold ref */ + atomic_add_int(&circuit->refs, 1); /* to safely add more */ +} + /* * Called with iocom locked */ @@ -2061,13 +2178,16 @@ dmsg_circuit_drop(dmsg_circuit_t *circuit) /* * Decrement circuit refs, destroy circuit when refs drops to 0. */ - if (--circuit->refs > 0) + if (atomic_fetchadd_int(&circuit->refs, -1) != 1) return; + assert(circuit != &iocom->circuit0); assert(RB_EMPTY(&circuit->staterd_tree)); assert(RB_EMPTY(&circuit->statewr_tree)); + pthread_mutex_lock(&iocom->mtx); RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit); circuit->iocom = NULL; + pthread_mutex_unlock(&iocom->mtx); dmsg_free(circuit); /* @@ -2087,6 +2207,29 @@ dmsg_circuit_drop(dmsg_circuit_t *circuit) } } +void +dmsg_circuit_drop_locked(dmsg_circuit_t *circuit) +{ + dmsg_iocom_t *iocom; + + iocom = circuit->iocom; + assert(circuit->refs > 0); + assert(iocom); + + if (atomic_fetchadd_int(&circuit->refs, -1) == 1) { + assert(circuit != &iocom->circuit0); + assert(RB_EMPTY(&circuit->staterd_tree)); + assert(RB_EMPTY(&circuit->statewr_tree)); + RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit); + circuit->iocom = NULL; + dmsg_free(circuit); + if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) { + char dummy = 0; + write(iocom->wakeupfds[1], &dummy, 1); + } + } +} + /* * This swaps endian for a hammer2_msg_hdr. Note that the extended * header is not adjusted, just the core header. diff --git a/lib/libdmsg/msg_lnk.c b/lib/libdmsg/msg_lnk.c index 2544d8b727..e2e2b737cf 100644 --- a/lib/libdmsg/msg_lnk.c +++ b/lib/libdmsg/msg_lnk.c @@ -792,6 +792,9 @@ dmsg_lnk_circ(dmsg_msg_t *msg) /*pthread_mutex_lock(&cluster_mtx);*/ + if (DMsgDebugOpt >= 4) + fprintf(stderr, "CIRC receive cmd=%08x\n", msg->any.head.cmd); + switch (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY)) { @@ -825,10 +828,17 @@ dmsg_lnk_circ(dmsg_msg_t *msg) &dummy); pthread_mutex_unlock(&iocomA->mtx); if (tx_state == NULL) { + /* XXX SMP race */ fprintf(stderr, "dmsg_lnk_circ: no circuit\n"); dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC); break; } + if (tx_state->icmd != DMSG_LNK_SPAN) { + /* XXX SMP race */ + fprintf(stderr, "dmsg_lnk_circ: not LNK_SPAN\n"); + dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC); + break; + } /* locate h2span_link */ rx_state = tx_state->any.relay->source_rt; @@ -841,7 +851,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg) * it received from us as . */ circA = dmsg_alloc(sizeof(*circA)); - circA->iocom = iocomA; + dmsg_circuit_init(iocomA, circA); circA->state = msg->state; /* LNK_CIRC state */ circA->msgid = msg->state->msgid; circA->span_state = tx_state; /* H2SPAN_RELAY state */ @@ -852,6 +862,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg) iocomB = rx_state->iocom; circB = dmsg_alloc(sizeof(*circB)); + dmsg_circuit_init(iocomB, circB); /* * Create a LNK_CIRC transaction on B @@ -860,23 +871,40 @@ dmsg_lnk_circ(dmsg_msg_t *msg) 0, DMSG_LNK_CIRC | DMSGF_CREATE, dmsg_lnk_circ, circB); fwd_msg->state->any.circ = circB; - circB->iocom = iocomB; + fwd_msg->any.lnk_circ.target = rx_state->msgid; circB->state = fwd_msg->state; /* LNK_CIRC state */ circB->msgid = fwd_msg->any.head.msgid; circB->span_state = rx_state; /* H2SPAN_LINK state */ circB->is_relay = 0; circB->refs = 2; /* state and peer */ + if (DMsgDebugOpt >= 4) + fprintf(stderr, "CIRC forward %p->%p\n", circA, circB); + /* * Link the two circuits together. */ circA->peer = circB; circB->peer = circA; + if (iocomA < iocomB) { + pthread_mutex_lock(&iocomA->mtx); + pthread_mutex_lock(&iocomB->mtx); + } else { + pthread_mutex_lock(&iocomB->mtx); + pthread_mutex_lock(&iocomA->mtx); + } if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA)) assert(0); if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB)) assert(0); + if (iocomA < iocomB) { + pthread_mutex_unlock(&iocomB->mtx); + pthread_mutex_unlock(&iocomA->mtx); + } else { + pthread_mutex_unlock(&iocomA->mtx); + pthread_mutex_unlock(&iocomB->mtx); + } dmsg_msg_write(fwd_msg); @@ -1270,6 +1298,7 @@ dmsg_relay_scan_specific(h2span_node_t *node, h2span_conn_t *conn) */ while (relay && relay->source_rt->any.link->node == node) { next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay); + fprintf(stderr, "RELAY DELETE FROM EXTRAS\n"); dmsg_relay_delete(relay); relay = next_relay; } @@ -1331,6 +1360,7 @@ dmsg_lnk_relay(dmsg_msg_t *msg) if (msg->any.head.cmd & DMSGF_DELETE) { pthread_mutex_lock(&cluster_mtx); + fprintf(stderr, "RELAY DELETE FROM LNK_RELAY MSG\n"); if ((relay = state->any.relay) != NULL) { dmsg_relay_delete(relay); } else { @@ -1340,7 +1370,9 @@ dmsg_lnk_relay(dmsg_msg_t *msg) } } - +/* + * cluster_mtx held by caller + */ static void dmsg_relay_delete(h2span_relay_t *relay) @@ -1480,17 +1512,26 @@ dmsg_circuit_relay(dmsg_msg_t *msg) /* * Lookup the circuit on the incoming iocom. */ - pthread_mutex_lock(&cluster_mtx); + pthread_mutex_lock(&iocom->mtx); dummy.msgid = msg->any.head.circuit; circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy); assert(circ); peer = circ->peer; + dmsg_circuit_hold(peer); + + if (DMsgDebugOpt >= 4) { + fprintf(stderr, + "CIRC relay %08x %p->%p\n", + msg->any.head.cmd, circ, peer); + } msg->iocom = peer->iocom; msg->any.head.circuit = peer->msgid; + dmsg_circuit_drop_locked(msg->circuit); + msg->circuit = peer; - pthread_mutex_unlock(&cluster_mtx); + pthread_mutex_unlock(&iocom->mtx); dmsg_msg_write(msg); error = DMSG_IOQ_ERROR_ROUTED; @@ -1560,6 +1601,8 @@ dmsg_node_get(h2span_cluster_t *cls, uuid_t *pfs_fsid) /* * Dumps the spanning tree + * + * DEBUG ONLY */ void dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused) @@ -1567,6 +1610,7 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused) h2span_cluster_t *cls; h2span_node_t *node; h2span_link_t *slink; + h2span_relay_t *relay; char *uustr = NULL; pthread_mutex_lock(&cluster_mtx); @@ -1582,9 +1626,18 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused) node->fs_label); RB_FOREACH(slink, h2span_link_tree, &node->tree) { dmsg_circuit_printf(circuit, - "\tLink dist=%d via %d\n", + "\tSLink msgid %016jx " + "dist=%d via %d\n", + (intmax_t)slink->state->msgid, slink->dist, slink->state->iocom->sock_fd); + TAILQ_FOREACH(relay, &slink->relayq, entry) { + dmsg_circuit_printf(circuit, + "\t Relay-out msgid %016jx " + "via %d\n", + (intmax_t)relay->target_rt->msgid, + relay->target_rt->iocom->sock_fd); + } } } } @@ -1597,6 +1650,39 @@ dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused) #endif } +/* + * DEBUG ONLY + * + * Locate the state representing an incoming LNK_SPAN given its msgid. + */ +int +dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep) +{ + h2span_cluster_t *cls; + h2span_node_t *node; + h2span_link_t *slink; + h2span_relay_t *relay; + + pthread_mutex_lock(&cluster_mtx); + relay = NULL; + RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) { + RB_FOREACH(node, h2span_node_tree, &cls->tree) { + RB_FOREACH(slink, h2span_link_tree, &node->tree) { + if (slink->state->msgid == msgid) { + *statep = slink->state; + goto found; + } + } + } + } + pthread_mutex_unlock(&cluster_mtx); + *statep = NULL; + return(ENOENT); +found: + pthread_mutex_unlock(&cluster_mtx); + return(0); +} + /* * Random number sub-sort value to add to SPAN rnss fields on relay. * This allows us to differentiate spans with the same field