size_t fifo_cdn; /* end-cdn unprocessed */
size_t fifo_end;
size_t hbytes; /* header size */
- size_t abytes; /* aux_data size */
+ size_t abytes; /* aligned aux_data size */
+ size_t unaligned_aux_size; /* actual aux_data size */
int error;
int seq; /* salt sequencer */
int msgcount;
* dmsg_iocom - governs a messaging stream connection
*/
struct dmsg_iocom {
+ char *label; /* label for error reporting */
dmsg_ioq_t ioq_rx;
dmsg_ioq_t ioq_tx;
- dmsg_msg_queue_t freeq; /* free msgs hdr only */
+ dmsg_msg_queue_t freeq; /* free msgs hdr only */
dmsg_msg_queue_t freeq_aux; /* free msgs w/aux_data */
int sock_fd; /* comm socket or pipe */
int alt_fd; /* thread signal, tty, etc */
struct dmsg_master_service_info {
int fd;
int detachme;
+ char *label;
void *handle;
void (*dbgmsg_callback)(dmsg_msg_t *msg);
void (*exit_callback)(void *handle);
void (*state_func)(dmsg_iocom_t *),
void (*rcvmsg_func)(dmsg_msg_t *),
void (*altmsg_func)(dmsg_iocom_t *));
+void dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...);
void dmsg_iocom_signal(dmsg_iocom_t *iocom);
void dmsg_iocom_done(dmsg_iocom_t *iocom);
void dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit);
bzero(iocom, sizeof(*iocom));
+ asprintf(&iocom->label, "iocom-%p", iocom);
iocom->signal_callback = signal_func;
iocom->rcvmsg_callback = rcvmsg_func;
iocom->altmsg_callback = altmsg_func;
#endif
}
+/*
+ * Replace the iocom's error-reporting label with a printf-style
+ * formatted string.
+ *
+ * iocom - connection whose label is replaced
+ * ctl   - printf-style format string, followed by its arguments
+ *
+ * The new label is heap-allocated by vasprintf() and the previous label
+ * is free()'d.  If formatting/allocation fails the previous label is
+ * left in place so iocom->label never refers to freed or indeterminate
+ * memory.
+ */
+void
+dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
+{
+	va_list va;
+	char *nptr;
+	int r;
+
+	/*
+	 * Format into a temporary.  On failure vasprintf() returns -1 and
+	 * the contents of the output pointer are not portable (NULL on
+	 * some systems, undefined on others), so never let it write
+	 * directly into iocom->label.
+	 */
+	nptr = NULL;
+	va_start(va, ctl);
+	r = vasprintf(&nptr, ctl, va);
+	va_end(va);
+
+	if (r < 0)
+		return;
+
+	/* free(NULL) is a no-op, no guard needed */
+	free(iocom->label);
+	iocom->label = nptr;
+}
+
/*
* May only be called from a callback from iocom_core.
*
dmsg_state_t *state = NULL;
dmsg_msg_t *msg;
int hbytes;
+ size_t aligned_size;
pthread_mutex_lock(&iocom->mtx);
if (aux_size) {
- aux_size = (aux_size + DMSG_ALIGNMASK) &
- ~DMSG_ALIGNMASK;
+ aligned_size = DMSG_DOALIGN(aux_size);
if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
} else {
+ aligned_size = 0;
if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
TAILQ_REMOVE(&iocom->freeq, msg, qentry);
}
msg->aux_data = NULL;
msg->aux_size = 0;
}
+
+ /*
+ * [re]allocate the auxiliary data buffer. The caller knows that
+ * a size-aligned buffer will be allocated but we do not want to
+ * force the caller to zero any tail piece, so we do that ourselves.
+ */
if (msg->aux_size != aux_size) {
if (msg->aux_data) {
free(msg->aux_data);
msg->aux_size = 0;
}
if (aux_size) {
- msg->aux_data = malloc(aux_size);
+ msg->aux_data = malloc(aligned_size);
msg->aux_size = aux_size;
+ if (aux_size != aligned_size) {
+ bzero(msg->aux_data + aux_size,
+ aligned_size - aux_size);
+ }
}
}
hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
ssize_t n;
size_t bytes;
size_t nmax;
+ uint32_t aux_size;
uint32_t xcrc32;
int error;
head = (void *)(ioq->buf + ioq->fifo_beg);
if (head->magic != DMSG_HDR_MAGIC &&
head->magic != DMSG_HDR_MAGIC_REV) {
+ fprintf(stderr, "%s: head->magic is bad %02x\n",
+ iocom->label, head->magic);
+ if (iocom->flags & DMSG_IOCOMF_CRYPTED)
+ fprintf(stderr, "(on encrypted link)\n");
ioq->error = DMSG_IOQ_ERROR_SYNC;
break;
}
if (head->magic == DMSG_HDR_MAGIC_REV) {
ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
DMSG_ALIGN;
- ioq->abytes = bswap32(head->aux_bytes) *
- DMSG_ALIGN;
+ aux_size = bswap32(head->aux_bytes);
} else {
ioq->hbytes = (head->cmd & DMSGF_SIZE) *
DMSG_ALIGN;
- ioq->abytes = head->aux_bytes * DMSG_ALIGN;
+ aux_size = head->aux_bytes;
}
+ ioq->abytes = DMSG_DOALIGN(aux_size);
+ ioq->unaligned_aux_size = aux_size;
if (ioq->hbytes < sizeof(msg->any.head) ||
ioq->hbytes > sizeof(msg->any) ||
ioq->abytes > DMSG_AUX_MAX) {
/*
* Allocate the message, the next state will fill it in.
+ * Note that the actual buffer will be sized to an aligned
+ * value and the aligned remainder zero'd for convenience.
*/
- msg = dmsg_msg_alloc(&iocom->circuit0, ioq->abytes, 0,
+ msg = dmsg_msg_alloc(&iocom->circuit0, aux_size, 0,
NULL, NULL);
ioq->msg = msg;
/*
* Insufficient data accumulated (set msg NULL so caller will
* retry on event).
+ *
+ * Assert the auxiliary data size is correct, then record the
+ * original unaligned size from the message header.
*/
if (msg->aux_size < ioq->abytes) {
msg = NULL;
break;
}
assert(msg->aux_size == ioq->abytes);
+ msg->aux_size = ioq->unaligned_aux_size;
/*
- * Check aux_crc, then we are done.
+ * Check aux_crc, then we are done. Note that the crc
+ * is calculated over the aligned size, not the actual
+ * size.
*/
- xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
+ xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
if (xcrc32 != msg->any.head.aux_crc) {
ioq->error = DMSG_IOQ_ERROR_ACRC;
break;
dmsg_ioq_t *ioq = &iocom->ioq_tx;
dmsg_msg_t *msg;
uint32_t xcrc32;
- int hbytes;
+ size_t hbytes;
+ size_t abytes;
dmsg_msg_queue_t tmpq;
iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
* Calculate aux_crc if 0, then calculate hdr_crc.
*/
if (msg->aux_size && msg->any.head.aux_crc == 0) {
- assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
- xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
+ abytes = DMSG_DOALIGN(msg->aux_size);
+ xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
msg->any.head.aux_crc = xcrc32;
}
- msg->any.head.aux_bytes = msg->aux_size / DMSG_ALIGN;
- assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
+ msg->any.head.aux_bytes = msg->aux_size;
hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
DMSG_ALIGN;
ioq->fifo_cdx = 0;
ioq->fifo_end = 0;
}
+ nact = n;
+ } else {
+ nact = 0;
}
} else {
n = writev(iocom->sock_fd, iov, iovcnt);
assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
if (msg->any.head.cmd & DMSGF_CREATE) {
state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
- state->icmd = state->txcmd & DMSGF_BASECMDMASK;
}
}
fprintf(stderr, "terminate state %p id=%08x\n",
state, (uint32_t)state->msgid);
}
+ fprintf(stderr,
+ "dmsg_state_free state %p any.any %p func %p icmd %08x\n",
+ state, state->any.any, state->func, state->icmd);
+ if (state->any.any != NULL) /* XXX avoid deadlock w/exit & kernel */
+ closefrom(3);
assert(state->any.any == NULL);
msg = state->msg;
state->msg = NULL;
pthread_mutex_lock(&cluster_mtx);
+ fprintf(stderr, "dmsg_lnk_conn: msg %p cmd %08x state %p txcmd %08x rxcmd %08x\n",
+ msg, msg->any.head.cmd, state, state->txcmd, state->rxcmd);
+
switch(msg->any.head.cmd & DMSGF_TRANSMASK) {
case DMSG_LNK_CONN | DMSGF_CREATE:
case DMSG_LNK_CONN | DMSGF_CREATE | DMSGF_DELETE:
dmsg_msg_t *fwd_msg;
dmsg_iocom_t *iocomA;
dmsg_iocom_t *iocomB;
+ int disconnect;
/*pthread_mutex_lock(&cluster_mtx);*/
assert(msg->state == circA->state);
/*
- * If we are closing A and the peer B is closed, disconnect.
+ * We are closing B's send side. If B's receive side is
+ * already closed we disconnect the circuit from B's state.
*/
+ disconnect = 0;
if (circB && (state = circB->state) != NULL) {
if (state->rxcmd & DMSGF_DELETE) {
circB->state = NULL;
dmsg_circuit_drop(circB);
}
dmsg_state_reply(state, msg->any.head.error);
+ disconnect = 1;
+ }
+
+ /*
+ * We received a close on A. If A's send side is already
+ * closed we disconnect the circuit from A's state.
+ */
+ if (circA && (state = circA->state) != NULL) {
+ if (state->txcmd & DMSGF_DELETE) {
+ circA->state = NULL;
+ state->any.circ = NULL;
+ dmsg_circuit_drop(circA);
+ }
+ disconnect = 1;
}
/*
- * If both sides now closed terminate the peer association
- * and the state association. This may drop up to two refs
- * on circA and one on circB.
+ * Disconnect the peer<->peer association
*/
- if (circA->state->txcmd & DMSGF_DELETE) {
+ if (disconnect) {
if (circB) {
circA->peer = NULL;
circB->peer = NULL;
dmsg_circuit_drop(circA);
dmsg_circuit_drop(circB); /* XXX SMP */
}
- circA->state->any.circ = NULL;
- circA->state = NULL;
- dmsg_circuit_drop(circA);
}
break;
case DMSGF_REPLY | DMSGF_CREATE:
* via the virtual circuit before seeing this reply.
*/
circB = msg->state->any.circ;
+ assert(circB);
circA = circB->peer;
assert(msg->state == circB->state);
- if (circA && (msg->any.head.cmd & DMSGF_DELETE) == 0) {
+ assert(circA);
+ if ((msg->any.head.cmd & DMSGF_DELETE) == 0) {
dmsg_state_result(circA->state, msg->any.head.error);
break;
}
assert(msg->state == circB->state);
/*
- * If we are closing A and the peer B is closed, disconnect.
+ * We received a close on (B), propagate to (A). If we have
+ * already received the close from (A) we disconnect the state.
*/
+ disconnect = 0;
if (circA && (state = circA->state) != NULL) {
if (state->rxcmd & DMSGF_DELETE) {
circA->state = NULL;
dmsg_circuit_drop(circA);
}
dmsg_state_reply(state, msg->any.head.error);
+ disconnect = 1;
+ }
+
+ /*
+ * We received a close on (B). If (B)'s send side is already
+ * closed we disconnect the state.
+ */
+ if (circB && (state = circB->state) != NULL) {
+ if (state->txcmd & DMSGF_DELETE) {
+ circB->state = NULL;
+ state->any.circ = NULL;
+ dmsg_circuit_drop(circB);
+ }
+ disconnect = 1;
}
/*
- * If both sides now closed terminate the peer association
- * and the state association. This may drop up to two refs
- * on circA and one on circB.
+ * Disconnect the peer<->peer association
*/
- if (circB->state->txcmd & DMSGF_DELETE) {
+ if (disconnect) {
if (circA) {
circB->peer = NULL;
circA->peer = NULL;
dmsg_circuit_drop(circB);
dmsg_circuit_drop(circA); /* XXX SMP */
}
- circB->state->any.circ = NULL;
- circB->state = NULL;
- dmsg_circuit_drop(circB);
}
break;
}
pthread_mutex_unlock(&cluster_mtx);
- fprintf(stderr, "ROUTE MESSAGE VC %08x to %08x\n",
- (uint32_t)circ->msgid, (uint32_t)peer->msgid); /* brevity */
dmsg_msg_write(msg);
error = DMSG_IOQ_ERROR_ROUTED;
master_auth_rxmsg,
info->dbgmsg_callback,
NULL);
+ if (info->label) {
+ dmsg_iocom_label(&iocom, "%s", info->label);
+ free(info->label);
+ info->label = NULL;
+ }
dmsg_iocom_core(&iocom);
fprintf(stderr,
info->fd = fd;
info->detachme = 1;
info->dbgmsg_callback = hammer2_shell_parse;
+ info->label = strdup("client");
pthread_create(&thread, NULL, dmsg_master_service, info);
}
return (NULL);
info->fd = pipefds[1];
info->detachme = 1;
info->dbgmsg_callback = hammer2_shell_parse;
+ info->label = strdup("hammer2");
pthread_create(&thread, NULL, dmsg_master_service, info);
}
info->dbgmsg_callback = hammer2_shell_parse;
info->exit_callback = disk_disconnect;
info->handle = dc;
+ info->label = strdup(dc->disk);
pthread_create(&thread, NULL, dmsg_master_service, info);
}
info->handle = xdisk;
xdisk->servicing = 1;
xdisk->servicefd = info->fd;
+ info->label = strdup(xdisk->cl_label);
pthread_create(&thread, NULL, dmsg_master_service, info);
/*