cluster - circuit tracking, bug fixes
author Matthew Dillon <dillon@apollo.backplane.com>
Mon, 3 Dec 2012 05:37:55 +0000 (21:37 -0800)
committer Matthew Dillon <dillon@apollo.backplane.com>
Mon, 3 Dec 2012 05:37:55 +0000 (21:37 -0800)
* Misc virtual circuit tracking work and bug fixes

lib/libdmsg/dmsg.h
lib/libdmsg/msg.c
lib/libdmsg/msg_lnk.c
lib/libdmsg/service.c
sbin/hammer2/cmd_service.c

index 2a3eb7b..49fbca2 100644 (file)
@@ -228,7 +228,8 @@ struct dmsg_ioq {
        size_t          fifo_cdn;               /* end-cdn unprocessed */
        size_t          fifo_end;
        size_t          hbytes;                 /* header size */
-       size_t          abytes;                 /* aux_data size */
+       size_t          abytes;                 /* aligned aux_data size */
+       size_t          unaligned_aux_size;     /* actual aux_data size */
        int             error;
        int             seq;                    /* salt sequencer */
        int             msgcount;
@@ -272,9 +273,10 @@ typedef struct dmsg_ioq dmsg_ioq_t;
  * dmsg_iocom - governs a messaging stream connection
  */
 struct dmsg_iocom {
+       char            *label;                 /* label for error reporting */
        dmsg_ioq_t      ioq_rx;
        dmsg_ioq_t      ioq_tx;
-       dmsg_msg_queue_t freeq;         /* free msgs hdr only */
+       dmsg_msg_queue_t freeq;                 /* free msgs hdr only */
        dmsg_msg_queue_t freeq_aux;             /* free msgs w/aux_data */
        int     sock_fd;                        /* comm socket or pipe */
        int     alt_fd;                         /* thread signal, tty, etc */
@@ -328,6 +330,7 @@ struct crypto_algo {
 struct dmsg_master_service_info {
        int     fd;
        int     detachme;
+       char    *label;
        void    *handle;
        void    (*dbgmsg_callback)(dmsg_msg_t *msg);
        void    (*exit_callback)(void *handle);
@@ -381,6 +384,7 @@ void dmsg_iocom_restate(dmsg_iocom_t *iocom,
                        void (*state_func)(dmsg_iocom_t *),
                        void (*rcvmsg_func)(dmsg_msg_t *),
                        void (*altmsg_func)(dmsg_iocom_t *));
+void dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...);
 void dmsg_iocom_signal(dmsg_iocom_t *iocom);
 void dmsg_iocom_done(dmsg_iocom_t *iocom);
 void dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit);
index 8c0b3ae..0006b5a 100644 (file)
@@ -120,6 +120,7 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
 
        bzero(iocom, sizeof(*iocom));
 
+       asprintf(&iocom->label, "iocom-%p", iocom);
        iocom->signal_callback = signal_func;
        iocom->rcvmsg_callback = rcvmsg_func;
        iocom->altmsg_callback = altmsg_func;
@@ -167,6 +168,20 @@ dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
 #endif
 }
 
+/* Replace iocom->label (used in error reports) from a printf-style format. */
+void
+dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
+{
+       va_list va;
+       char *nlabel;
+       va_start(va, ctl);
+       if (vasprintf(&nlabel, ctl, va) >= 0) { /* keep old label on failure */
+               free(iocom->label);
+               iocom->label = nlabel;
+       }
+       va_end(va);
+}
+
 /*
  * May only be called from a callback from iocom_core.
  *
@@ -263,14 +278,15 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit,
        dmsg_state_t *state = NULL;
        dmsg_msg_t *msg;
        int hbytes;
+       size_t aligned_size;
 
        pthread_mutex_lock(&iocom->mtx);
        if (aux_size) {
-               aux_size = (aux_size + DMSG_ALIGNMASK) &
-                          ~DMSG_ALIGNMASK;
+               aligned_size = DMSG_DOALIGN(aux_size);
                if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
                        TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
        } else {
+               aligned_size = 0;
                if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
                        TAILQ_REMOVE(&iocom->freeq, msg, qentry);
        }
@@ -311,6 +327,12 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit,
                msg->aux_data = NULL;
                msg->aux_size = 0;
        }
+
+       /*
+        * [re]allocate the auxillary data buffer.  The caller knows that
+        * a size-aligned buffer will be allocated but we do not want to
+        * force the caller to zero any tail piece, so we do that ourself.
+        */
        if (msg->aux_size != aux_size) {
                if (msg->aux_data) {
                        free(msg->aux_data);
@@ -318,8 +340,12 @@ dmsg_msg_alloc(dmsg_circuit_t *circuit,
                        msg->aux_size = 0;
                }
                if (aux_size) {
-                       msg->aux_data = malloc(aux_size);
+                       msg->aux_data = malloc(aligned_size);
                        msg->aux_size = aux_size;
+                       if (aux_size != aligned_size) {
+                               bzero(msg->aux_data + aux_size,
+                                     aligned_size - aux_size);
+                       }
                }
        }
        hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
@@ -569,6 +595,7 @@ dmsg_ioq_read(dmsg_iocom_t *iocom)
        ssize_t n;
        size_t bytes;
        size_t nmax;
+       uint32_t aux_size;
        uint32_t xcrc32;
        int error;
 
@@ -664,6 +691,10 @@ again:
                head = (void *)(ioq->buf + ioq->fifo_beg);
                if (head->magic != DMSG_HDR_MAGIC &&
                    head->magic != DMSG_HDR_MAGIC_REV) {
+                       fprintf(stderr, "%s: head->magic is bad %02x\n",
+                               iocom->label, head->magic);
+                       if (iocom->flags & DMSG_IOCOMF_CRYPTED)
+                               fprintf(stderr, "(on encrypted link)\n");
                        ioq->error = DMSG_IOQ_ERROR_SYNC;
                        break;
                }
@@ -674,13 +705,14 @@ again:
                if (head->magic == DMSG_HDR_MAGIC_REV) {
                        ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
                                      DMSG_ALIGN;
-                       ioq->abytes = bswap32(head->aux_bytes) *
-                                     DMSG_ALIGN;
+                       aux_size = bswap32(head->aux_bytes);
                } else {
                        ioq->hbytes = (head->cmd & DMSGF_SIZE) *
                                      DMSG_ALIGN;
-                       ioq->abytes = head->aux_bytes * DMSG_ALIGN;
+                       aux_size = head->aux_bytes;
                }
+               ioq->abytes = DMSG_DOALIGN(aux_size);
+               ioq->unaligned_aux_size = aux_size;
                if (ioq->hbytes < sizeof(msg->any.head) ||
                    ioq->hbytes > sizeof(msg->any) ||
                    ioq->abytes > DMSG_AUX_MAX) {
@@ -690,8 +722,10 @@ again:
 
                /*
                 * Allocate the message, the next state will fill it in.
+                * Note that the actual buffer will be sized to an aligned
+                * value and the aligned remainder zero'd for convenience.
                 */
-               msg = dmsg_msg_alloc(&iocom->circuit0, ioq->abytes, 0,
+               msg = dmsg_msg_alloc(&iocom->circuit0, aux_size, 0,
                                     NULL, NULL);
                ioq->msg = msg;
 
@@ -888,17 +922,23 @@ again:
                /*
                 * Insufficient data accumulated (set msg NULL so caller will
                 * retry on event).
+                *
+                * Assert the auxillary data size is correct, then record the
+                * original unaligned size from the message header.
                 */
                if (msg->aux_size < ioq->abytes) {
                        msg = NULL;
                        break;
                }
                assert(msg->aux_size == ioq->abytes);
+               msg->aux_size = ioq->unaligned_aux_size;
 
                /*
-                * Check aux_crc, then we are done.
+                * Check aux_crc, then we are done.  Note that the crc
+                * is calculated over the aligned size, not the actual
+                * size.
                 */
-               xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
+               xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
                if (xcrc32 != msg->any.head.aux_crc) {
                        ioq->error = DMSG_IOQ_ERROR_ACRC;
                        break;
@@ -1127,7 +1167,8 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom)
        dmsg_ioq_t *ioq = &iocom->ioq_tx;
        dmsg_msg_t *msg;
        uint32_t xcrc32;
-       int hbytes;
+       size_t hbytes;
+       size_t abytes;
        dmsg_msg_queue_t tmpq;
 
        iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
@@ -1165,12 +1206,11 @@ dmsg_iocom_flush1(dmsg_iocom_t *iocom)
                 * Calculate aux_crc if 0, then calculate hdr_crc.
                 */
                if (msg->aux_size && msg->any.head.aux_crc == 0) {
-                       assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
-                       xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
+                       abytes = DMSG_DOALIGN(msg->aux_size);
+                       xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
                        msg->any.head.aux_crc = xcrc32;
                }
-               msg->any.head.aux_bytes = msg->aux_size / DMSG_ALIGN;
-               assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
+               msg->any.head.aux_bytes = msg->aux_size;
 
                hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
                         DMSG_ALIGN;
@@ -1293,6 +1333,9 @@ dmsg_iocom_flush2(dmsg_iocom_t *iocom)
                                ioq->fifo_cdx = 0;
                                ioq->fifo_end = 0;
                        }
+                       nact = n;
+               } else {
+                       nact = 0;
                }
        } else {
                n = writev(iocom->sock_fd, iov, iovcnt);
@@ -1421,7 +1464,6 @@ dmsg_msg_write(dmsg_msg_t *msg)
                assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
                if (msg->any.head.cmd & DMSGF_CREATE) {
                        state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
-                       state->icmd = state->txcmd & DMSGF_BASECMDMASK;
                }
        }
 
@@ -1994,6 +2036,11 @@ dmsg_state_free(dmsg_state_t *state)
                fprintf(stderr, "terminate state %p id=%08x\n",
                        state, (uint32_t)state->msgid);
        }
+       fprintf(stderr,
+               "dmsg_state_free state %p any.any %p func %p icmd %08x\n",
+               state, state->any.any, state->func, state->icmd);
+       if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
+               closefrom(3);
        assert(state->any.any == NULL);
        msg = state->msg;
        state->msg = NULL;
index c76da55..73f943a 100644 (file)
@@ -432,6 +432,9 @@ dmsg_lnk_conn(dmsg_msg_t *msg)
 
        pthread_mutex_lock(&cluster_mtx);
 
+       fprintf(stderr, "dmsg_lnk_conn: msg %p cmd %08x state %p txcmd %08x rxcmd %08x\n",
+               msg, msg->any.head.cmd, state, state->txcmd, state->rxcmd);
+
        switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
        case DMSG_LNK_CONN | DMSGF_CREATE:
        case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
@@ -785,6 +788,7 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
        dmsg_msg_t *fwd_msg;
        dmsg_iocom_t *iocomA;
        dmsg_iocom_t *iocomB;
+       int disconnect;
 
        /*pthread_mutex_lock(&cluster_mtx);*/
 
@@ -887,8 +891,10 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                assert(msg->state == circA->state);
 
                /*
-                * If we are closing A and the peer B is closed, disconnect.
+                * We are closing B's send side.  If B's receive side is
+                * already closed we disconnect the circuit from B's state.
                 */
+               disconnect = 0;
                if (circB && (state = circB->state) != NULL) {
                        if (state->rxcmd & DMSGF_DELETE) {
                                circB->state = NULL;
@@ -896,23 +902,32 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                                dmsg_circuit_drop(circB);
                        }
                        dmsg_state_reply(state, msg->any.head.error);
+                       disconnect = 1;
+               }
+
+               /*
+                * We received a close on A.  If A's send side is already
+                * closed we disconnect the circuit from A's state.
+                */
+               if (circA && (state = circA->state) != NULL) {
+                       if (state->txcmd & DMSGF_DELETE) {
+                               circA->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circA);
+                       }
+                       disconnect = 1;
                }
 
                /*
-                * If both sides now closed terminate the peer association
-                * and the state association.  This may drop up to two refs
-                * on circA and one on circB.
+                * Disconnect the peer<->peer association
                 */
-               if (circA->state->txcmd & DMSGF_DELETE) {
+               if (disconnect) {
                        if (circB) {
                                circA->peer = NULL;
                                circB->peer = NULL;
                                dmsg_circuit_drop(circA);
                                dmsg_circuit_drop(circB); /* XXX SMP */
                        }
-                       circA->state->any.circ = NULL;
-                       circA->state = NULL;
-                       dmsg_circuit_drop(circA);
                }
                break;
        case DMSGF_REPLY | DMSGF_CREATE:
@@ -924,9 +939,11 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                 * via the virtual circuit before seeing this reply.
                 */
                circB = msg->state->any.circ;
+               assert(circB);
                circA = circB->peer;
                assert(msg->state == circB->state);
-               if (circA && (msg->any.head.cmd & DMSGF_DELETE) == 0) {
+               assert(circA);
+               if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
                        dmsg_state_result(circA->state, msg->any.head.error);
                        break;
                }
@@ -943,8 +960,10 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                assert(msg->state == circB->state);
 
                /*
-                * If we are closing A and the peer B is closed, disconnect.
+                * We received a close on (B), propagate to (A).  If we have
+                * already received the close from (A) we disconnect the state.
                 */
+               disconnect = 0;
                if (circA && (state = circA->state) != NULL) {
                        if (state->rxcmd & DMSGF_DELETE) {
                                circA->state = NULL;
@@ -952,23 +971,32 @@ dmsg_lnk_circ(dmsg_msg_t *msg)
                                dmsg_circuit_drop(circA);
                        }
                        dmsg_state_reply(state, msg->any.head.error);
+                       disconnect = 1;
+               }
+
+               /*
+                * We received a close on (B).  If (B)'s send side is already
+                * closed we disconnect the state.
+                */
+               if (circB && (state = circB->state) != NULL) {
+                       if (state->txcmd & DMSGF_DELETE) {
+                               circB->state = NULL;
+                               state->any.circ = NULL;
+                               dmsg_circuit_drop(circB);
+                       }
+                       disconnect = 1;
                }
 
                /*
-                * If both sides now closed terminate the peer association
-                * and the state association.  This may drop up to two refs
-                * on circA and one on circB.
+                * Disconnect the peer<->peer association
                 */
-               if (circB->state->txcmd & DMSGF_DELETE) {
+               if (disconnect) {
                        if (circA) {
                                circB->peer = NULL;
                                circA->peer = NULL;
                                dmsg_circuit_drop(circB);
                                dmsg_circuit_drop(circA); /* XXX SMP */
                        }
-                       circB->state->any.circ = NULL;
-                       circB->state = NULL;
-                       dmsg_circuit_drop(circB);
                }
                break;
        }
@@ -1457,8 +1485,6 @@ dmsg_circuit_relay(dmsg_msg_t *msg)
 
        pthread_mutex_unlock(&cluster_mtx);
 
-       fprintf(stderr, "ROUTE MESSAGE VC %08x to %08x\n",
-               (uint32_t)circ->msgid, (uint32_t)peer->msgid);  /* brevity */
        dmsg_msg_write(msg);
        error = DMSG_IOQ_ERROR_ROUTED;
 
index 43fcb71..a72fd82 100644 (file)
@@ -59,6 +59,11 @@ dmsg_master_service(void *data)
                           master_auth_rxmsg,
                           info->dbgmsg_callback,
                           NULL);
+       if (info->label) {
+               dmsg_iocom_label(&iocom, "%s", info->label);
+               free(info->label);
+               info->label = NULL;
+       }
        dmsg_iocom_core(&iocom);
 
        fprintf(stderr,
index 2c8c74d..ec7d273 100644 (file)
@@ -200,6 +200,7 @@ service_thread(void *data)
                info->fd = fd;
                info->detachme = 1;
                info->dbgmsg_callback = hammer2_shell_parse;
+               info->label = strdup("client");
                pthread_create(&thread, NULL, dmsg_master_service, info);
        }
        return (NULL);
@@ -380,6 +381,7 @@ master_reconnect(const char *mntpt)
        info->fd = pipefds[1];
        info->detachme = 1;
        info->dbgmsg_callback = hammer2_shell_parse;
+       info->label = strdup("hammer2");
        pthread_create(&thread, NULL, dmsg_master_service, info);
 }
 
@@ -462,6 +464,7 @@ disk_reconnect(const char *disk)
        info->dbgmsg_callback = hammer2_shell_parse;
        info->exit_callback = disk_disconnect;
        info->handle = dc;
+       info->label = strdup(dc->disk);
        pthread_create(&thread, NULL, dmsg_master_service, info);
 }
 
@@ -507,6 +510,7 @@ xdisk_reconnect(struct service_node_opaque *xdisk)
        info->handle = xdisk;
        xdisk->servicing = 1;
        xdisk->servicefd = info->fd;
+       info->label = strdup(xdisk->cl_label);
        pthread_create(&thread, NULL, dmsg_master_service, info);
 
        /*