From f306de83941839488b397fcc6607a535506d4a52 Mon Sep 17 00:00:00 2001 From: Matthew Dillon Date: Sun, 2 Dec 2012 21:37:55 -0800 Subject: [PATCH] cluster - circuit tracking, bug fixes * Misc virtual circuit tracking work and bug fixes --- lib/libdmsg/dmsg.h | 8 +++- lib/libdmsg/msg.c | 77 ++++++++++++++++++++++++++++++-------- lib/libdmsg/msg_lnk.c | 64 +++++++++++++++++++++---------- lib/libdmsg/service.c | 5 +++ sbin/hammer2/cmd_service.c | 4 ++ 5 files changed, 122 insertions(+), 36 deletions(-) diff --git a/lib/libdmsg/dmsg.h b/lib/libdmsg/dmsg.h index 2a3eb7be40..49fbca2f15 100644 --- a/lib/libdmsg/dmsg.h +++ b/lib/libdmsg/dmsg.h @@ -228,7 +228,8 @@ struct dmsg_ioq { size_t fifo_cdn; /* end-cdn unprocessed */ size_t fifo_end; size_t hbytes; /* header size */ - size_t abytes; /* aux_data size */ + size_t abytes; /* aligned aux_data size */ + size_t unaligned_aux_size; /* actual aux_data size */ int error; int seq; /* salt sequencer */ int msgcount; @@ -272,9 +273,10 @@ typedef struct dmsg_ioq dmsg_ioq_t; * dmsg_iocom - governs a messaging stream connection */ struct dmsg_iocom { + char *label; /* label for error reporting */ dmsg_ioq_t ioq_rx; dmsg_ioq_t ioq_tx; - dmsg_msg_queue_t freeq; /* free msgs hdr only */ + dmsg_msg_queue_t freeq; /* free msgs hdr only */ dmsg_msg_queue_t freeq_aux; /* free msgs w/aux_data */ int sock_fd; /* comm socket or pipe */ int alt_fd; /* thread signal, tty, etc */ @@ -328,6 +330,7 @@ struct crypto_algo { struct dmsg_master_service_info { int fd; int detachme; + char *label; void *handle; void (*dbgmsg_callback)(dmsg_msg_t *msg); void (*exit_callback)(void *handle); @@ -381,6 +384,7 @@ void dmsg_iocom_restate(dmsg_iocom_t *iocom, void (*state_func)(dmsg_iocom_t *), void (*rcvmsg_func)(dmsg_msg_t *), void (*altmsg_func)(dmsg_iocom_t *)); +void dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...); void dmsg_iocom_signal(dmsg_iocom_t *iocom); void dmsg_iocom_done(dmsg_iocom_t *iocom); void dmsg_circuit_init(dmsg_iocom_t 
*iocom, dmsg_circuit_t *circuit); diff --git a/lib/libdmsg/msg.c b/lib/libdmsg/msg.c index 8c0b3aecf8..0006b5a7d1 100644 --- a/lib/libdmsg/msg.c +++ b/lib/libdmsg/msg.c @@ -120,6 +120,7 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd, bzero(iocom, sizeof(*iocom)); + asprintf(&iocom->label, "iocom-%p", iocom); iocom->signal_callback = signal_func; iocom->rcvmsg_callback = rcvmsg_func; iocom->altmsg_callback = altmsg_func; @@ -167,6 +168,20 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd, #endif } +void +dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...) +{ + va_list va; + char *optr; + + va_start(va, ctl); + optr = iocom->label; + vasprintf(&iocom->label, ctl, va); + va_end(va); + if (optr) + free(optr); +} + /* * May only be called from a callback from iocom_core. * @@ -263,14 +278,15 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit, dmsg_state_t *state = NULL; dmsg_msg_t *msg; int hbytes; + size_t aligned_size; pthread_mutex_lock(&iocom->mtx); if (aux_size) { - aux_size = (aux_size + DMSG_ALIGNMASK) & - ~DMSG_ALIGNMASK; + aligned_size = DMSG_DOALIGN(aux_size); if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry); } else { + aligned_size = 0; if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) TAILQ_REMOVE(&iocom->freeq, msg, qentry); } @@ -311,6 +327,12 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit, msg->aux_data = NULL; msg->aux_size = 0; } + + /* + * [re]allocate the auxiliary data buffer. The caller knows that + * a size-aligned buffer will be allocated but we do not want to + * force the caller to zero any tail piece, so we do that ourselves. 
+ */ if (msg->aux_size != aux_size) { if (msg->aux_data) { free(msg->aux_data); @@ -318,8 +340,12 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit, msg->aux_size = 0; } if (aux_size) { - msg->aux_data = malloc(aux_size); + msg->aux_data = malloc(aligned_size); msg->aux_size = aux_size; + if (aux_size != aligned_size) { + bzero(msg->aux_data + aux_size, + aligned_size - aux_size); + } } } hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN; @@ -569,6 +595,7 @@ dmsg_ioq_read(dmsg_iocom_t *iocom) ssize_t n; size_t bytes; size_t nmax; + uint32_t aux_size; uint32_t xcrc32; int error; @@ -664,6 +691,10 @@ again: head = (void *)(ioq->buf + ioq->fifo_beg); if (head->magic != DMSG_HDR_MAGIC && head->magic != DMSG_HDR_MAGIC_REV) { + fprintf(stderr, "%s: head->magic is bad %02x\n", + iocom->label, head->magic); + if (iocom->flags & DMSG_IOCOMF_CRYPTED) + fprintf(stderr, "(on encrypted link)\n"); ioq->error = DMSG_IOQ_ERROR_SYNC; break; } @@ -674,13 +705,14 @@ again: if (head->magic == DMSG_HDR_MAGIC_REV) { ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) * DMSG_ALIGN; - ioq->abytes = bswap32(head->aux_bytes) * - DMSG_ALIGN; + aux_size = bswap32(head->aux_bytes); } else { ioq->hbytes = (head->cmd & DMSGF_SIZE) * DMSG_ALIGN; - ioq->abytes = head->aux_bytes * DMSG_ALIGN; + aux_size = head->aux_bytes; } + ioq->abytes = DMSG_DOALIGN(aux_size); + ioq->unaligned_aux_size = aux_size; if (ioq->hbytes < sizeof(msg->any.head) || ioq->hbytes > sizeof(msg->any) || ioq->abytes > DMSG_AUX_MAX) { @@ -690,8 +722,10 @@ again: /* * Allocate the message, the next state will fill it in. + * Note that the actual buffer will be sized to an aligned + * value and the aligned remainder zero'd for convenience. */ - msg = dmsg_msg_alloc(&iocom->circuit0, ioq->abytes, 0, + msg = dmsg_msg_alloc(&iocom->circuit0, aux_size, 0, NULL, NULL); ioq->msg = msg; @@ -888,17 +922,23 @@ again: /* * Insufficient data accumulated (set msg NULL so caller will * retry on event). 
+ * + * Assert the auxiliary data size is correct, then record the + * original unaligned size from the message header. */ if (msg->aux_size < ioq->abytes) { msg = NULL; break; } assert(msg->aux_size == ioq->abytes); + msg->aux_size = ioq->unaligned_aux_size; /* - * Check aux_crc, then we are done. + * Check aux_crc, then we are done. Note that the crc + * is calculated over the aligned size, not the actual + * size. */ - xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size); + xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes); if (xcrc32 != msg->any.head.aux_crc) { ioq->error = DMSG_IOQ_ERROR_ACRC; break; @@ -1127,7 +1167,8 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom) dmsg_ioq_t *ioq = &iocom->ioq_tx; dmsg_msg_t *msg; uint32_t xcrc32; - int hbytes; + size_t hbytes; + size_t abytes; dmsg_msg_queue_t tmpq; iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK); @@ -1165,12 +1206,11 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom) * Calculate aux_crc if 0, then calculate hdr_crc. */ if (msg->aux_size && msg->any.head.aux_crc == 0) { - assert((msg->aux_size & DMSG_ALIGNMASK) == 0); - xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size); + abytes = DMSG_DOALIGN(msg->aux_size); + xcrc32 = dmsg_icrc32(msg->aux_data, abytes); msg->any.head.aux_crc = xcrc32; } - msg->any.head.aux_bytes = msg->aux_size / DMSG_ALIGN; - assert((msg->aux_size & DMSG_ALIGNMASK) == 0); + msg->any.head.aux_bytes = msg->aux_size; hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN; @@ -1293,6 +1333,9 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom) ioq->fifo_cdx = 0; ioq->fifo_end = 0; } + nact = n; + } else { + nact = 0; } } else { n = writev(iocom->sock_fd, iov, iovcnt); @@ -1421,7 +1464,6 @@ dmsg_msg_write(dmsg_msg_t *msg) assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0); if (msg->any.head.cmd & DMSGF_CREATE) { state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE; - state->icmd = state->txcmd & DMSGF_BASECMDMASK; } } @@ -1994,6 +2036,11 @@ dmsg_state_free(dmsg_state_t *state) fprintf(stderr, "terminate 
state %p id=%08x\n", state, (uint32_t)state->msgid); } + fprintf(stderr, + "dmsg_state_free state %p any.any %p func %p icmd %08x\n", + state, state->any.any, state->func, state->icmd); + if (state->any.any != NULL) /* XXX avoid deadlock w/exit & kernel */ + closefrom(3); assert(state->any.any == NULL); msg = state->msg; state->msg = NULL; diff --git a/lib/libdmsg/msg_lnk.c b/lib/libdmsg/msg_lnk.c index c76da5548a..73f943a813 100644 --- a/lib/libdmsg/msg_lnk.c +++ b/lib/libdmsg/msg_lnk.c @@ -432,6 +432,9 @@ dmsg_lnk_conn(dmsg_msg_t *msg) pthread_mutex_lock(&cluster_mtx); + fprintf(stderr, "dmsg_lnk_conn: msg %p cmd %08x state %p txcmd %08x rxcmd %08x\n", + msg, msg->any.head.cmd, state, state->txcmd, state->rxcmd); + switch(msg->any.head.cmd & DMSGF_TRANSMASK) { case DMSG_LNK_CONN | DMSGF_CREATE: case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE: @@ -785,6 +788,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg) dmsg_msg_t *fwd_msg; dmsg_iocom_t *iocomA; dmsg_iocom_t *iocomB; + int disconnect; /*pthread_mutex_lock(&cluster_mtx);*/ @@ -887,8 +891,10 @@ dmsg_lnk_circ(dmsg_msg_t *msg) assert(msg->state == circA->state); /* - * If we are closing A and the peer B is closed, disconnect. + * We are closing B's send side. If B's receive side is + * already closed we disconnect the circuit from B's state. */ + disconnect = 0; if (circB && (state = circB->state) != NULL) { if (state->rxcmd & DMSGF_DELETE) { circB->state = NULL; @@ -896,23 +902,32 @@ dmsg_lnk_circ(dmsg_msg_t *msg) dmsg_circuit_drop(circB); } dmsg_state_reply(state, msg->any.head.error); + disconnect = 1; + } + + /* + * We received a close on A. If A's send side is already + * closed we disconnect the circuit from A's state. + */ + if (circA && (state = circA->state) != NULL) { + if (state->txcmd & DMSGF_DELETE) { + circA->state = NULL; + state->any.circ = NULL; + dmsg_circuit_drop(circA); + } + disconnect = 1; } /* - * If both sides now closed terminate the peer association - * and the state association. 
This may drop up to two refs - * on circA and one on circB. + * Disconnect the peer<->peer association */ - if (circA->state->txcmd & DMSGF_DELETE) { + if (disconnect) { if (circB) { circA->peer = NULL; circB->peer = NULL; dmsg_circuit_drop(circA); dmsg_circuit_drop(circB); /* XXX SMP */ } - circA->state->any.circ = NULL; - circA->state = NULL; - dmsg_circuit_drop(circA); } break; case DMSGF_REPLY | DMSGF_CREATE: @@ -924,9 +939,11 @@ dmsg_lnk_circ(dmsg_msg_t *msg) * via the virtual circuit before seeing this reply. */ circB = msg->state->any.circ; + assert(circB); circA = circB->peer; assert(msg->state == circB->state); - if (circA && (msg->any.head.cmd & DMSGF_DELETE) == 0) { + assert(circA); + if ((msg->any.head.cmd & DMSGF_DELETE) == 0) { dmsg_state_result(circA->state, msg->any.head.error); break; } @@ -943,8 +960,10 @@ dmsg_lnk_circ(dmsg_msg_t *msg) assert(msg->state == circB->state); /* - * If we are closing A and the peer B is closed, disconnect. + * We received a close on (B), propagate to (A). If we have + * already received the close from (A) we disconnect the state. */ + disconnect = 0; if (circA && (state = circA->state) != NULL) { if (state->rxcmd & DMSGF_DELETE) { circA->state = NULL; @@ -952,23 +971,32 @@ dmsg_lnk_circ(dmsg_msg_t *msg) dmsg_circuit_drop(circA); } dmsg_state_reply(state, msg->any.head.error); + disconnect = 1; + } + + /* + * We received a close on (B). If (B)'s send side is already + * closed we disconnect the state. + */ + if (circB && (state = circB->state) != NULL) { + if (state->txcmd & DMSGF_DELETE) { + circB->state = NULL; + state->any.circ = NULL; + dmsg_circuit_drop(circB); + } + disconnect = 1; } /* - * If both sides now closed terminate the peer association - * and the state association. This may drop up to two refs - * on circA and one on circB. 
+ * Disconnect the peer<->peer association */ - if (circB->state->txcmd & DMSGF_DELETE) { + if (disconnect) { if (circA) { circB->peer = NULL; circA->peer = NULL; dmsg_circuit_drop(circB); dmsg_circuit_drop(circA); /* XXX SMP */ } - circB->state->any.circ = NULL; - circB->state = NULL; - dmsg_circuit_drop(circB); } break; } @@ -1457,8 +1485,6 @@ dmsg_circuit_relay(dmsg_msg_t *msg) pthread_mutex_unlock(&cluster_mtx); - fprintf(stderr, "ROUTE MESSAGE VC %08x to %08x\n", - (uint32_t)circ->msgid, (uint32_t)peer->msgid); /* brevity */ dmsg_msg_write(msg); error = DMSG_IOQ_ERROR_ROUTED; diff --git a/lib/libdmsg/service.c b/lib/libdmsg/service.c index 43fcb71133..a72fd82d20 100644 --- a/lib/libdmsg/service.c +++ b/lib/libdmsg/service.c @@ -59,6 +59,11 @@ dmsg_master_service(void *data) master_auth_rxmsg, info->dbgmsg_callback, NULL); + if (info->label) { + dmsg_iocom_label(&iocom, "%s", info->label); + free(info->label); + info->label = NULL; + } dmsg_iocom_core(&iocom); fprintf(stderr, diff --git a/sbin/hammer2/cmd_service.c b/sbin/hammer2/cmd_service.c index 2c8c74dc7c..ec7d2730a1 100644 --- a/sbin/hammer2/cmd_service.c +++ b/sbin/hammer2/cmd_service.c @@ -200,6 +200,7 @@ service_thread(void *data) info->fd = fd; info->detachme = 1; info->dbgmsg_callback = hammer2_shell_parse; + info->label = strdup("client"); pthread_create(&thread, NULL, dmsg_master_service, info); } return (NULL); @@ -380,6 +381,7 @@ master_reconnect(const char *mntpt) info->fd = pipefds[1]; info->detachme = 1; info->dbgmsg_callback = hammer2_shell_parse; + info->label = strdup("hammer2"); pthread_create(&thread, NULL, dmsg_master_service, info); } @@ -462,6 +464,7 @@ disk_reconnect(const char *disk) info->dbgmsg_callback = hammer2_shell_parse; info->exit_callback = disk_disconnect; info->handle = dc; + info->label = strdup(dc->disk); pthread_create(&thread, NULL, dmsg_master_service, info); } @@ -507,6 +510,7 @@ xdisk_reconnect(struct service_node_opaque *xdisk) info->handle = xdisk; 
xdisk->servicing = 1; xdisk->servicefd = info->fd; + info->label = strdup(xdisk->cl_label); pthread_create(&thread, NULL, dmsg_master_service, info); /* -- 2.41.0