*/
if (getpeername(iocom->sock_fd, &sa.sa, &salen) < 0) {
iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOPEER;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "accept: getpeername() failed\n");
goto done;
if (getnameinfo(&sa.sa, salen, peername, sizeof(peername),
NULL, 0, NI_NUMERICHOST) < 0) {
iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOPEER;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "accept: cannot decode sockaddr\n");
goto done;
DMSG_PATH_REMOTE, peername);
if (stat(path, &st) < 0) {
iocom->ioq_rx.error = DMSG_IOQ_ERROR_NORKEY;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "auth failure: unknown host\n");
goto done;
fclose(fp);
if (keys[0] == NULL) {
iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr,
"auth failure: bad key format\n");
asprintf(&path, DMSG_DEFAULT_DIR "/rsa.pub");
if ((fp = fopen(path, "r")) == NULL) {
iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOLKEY;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
goto done;
}
keys[1] = PEM_read_RSA_PUBKEY(fp, NULL, NULL, NULL);
fclose(fp);
if (keys[1] == NULL) {
iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "auth failure: bad host key format\n");
goto done;
asprintf(&path, DMSG_DEFAULT_DIR "/rsa.prv");
if ((fp = fopen(path, "r")) == NULL) {
iocom->ioq_rx.error = DMSG_IOQ_ERROR_NOLKEY;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "auth failure: bad host key format\n");
goto done;
fclose(fp);
if (keys[2] == NULL) {
iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "auth failure: bad host key format\n");
goto done;
blksize != (size_t)RSA_size(keys[2]) ||
sizeof(handtx) % blksize != 0) {
iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYFMT;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "auth failure: "
"key size mismatch\n");
if (fd >= 0)
close(fd);
iocom->ioq_rx.error = DMSG_IOQ_ERROR_BADURANDOM;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "auth failure: bad rng\n");
goto done;
}
}
if (iocom->ioq_rx.error) {
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "auth failure: key exchange failure "
"during encryption\n");
}
}
if (iocom->ioq_rx.error) {
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "auth failure: key exchange failure "
"during decryption\n");
if (i != sizeof(handrx)) {
keyxchgfail:
iocom->ioq_rx.error = DMSG_IOQ_ERROR_KEYXCHGFAIL;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
if (DMsgDebugOpt)
fprintf(stderr, "auth failure: key exchange failure\n");
goto done;
if (error)
goto keyxchgfail;
- iocom->flags |= DMSG_IOCOMF_CRYPTED;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_CRYPTED);
if (DMsgDebugOpt)
fprintf(stderr, "auth success: %s\n", handrx.quickmsg);
static int dmsg_state_msgrx(dmsg_msg_t *msg);
static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
+static void dmsg_msg_free_locked(dmsg_msg_t *msg);
RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
void (*rcvmsg_func)(dmsg_msg_t *msg),
void (*altmsg_func)(dmsg_iocom_t *))
{
+ pthread_mutex_lock(&iocom->mtx);
iocom->signal_callback = signal_func;
iocom->rcvmsg_callback = rcvmsg_func;
iocom->altmsg_callback = altmsg_func;
if (signal_func)
- iocom->flags |= DMSG_IOCOMF_SWORK;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
else
- iocom->flags &= ~DMSG_IOCOMF_SWORK;
+ atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
+ pthread_mutex_unlock(&iocom->mtx);
}
void
dmsg_iocom_signal(dmsg_iocom_t *iocom)
{
+ pthread_mutex_lock(&iocom->mtx);
if (iocom->signal_callback)
- iocom->flags |= DMSG_IOCOMF_SWORK;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
+ pthread_mutex_unlock(&iocom->mtx);
}
/*
}
dmsg_ioq_done(iocom, &iocom->ioq_rx);
dmsg_ioq_done(iocom, &iocom->ioq_tx);
- if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
+ while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
TAILQ_REMOVE(&iocom->freeq, msg, qentry);
free(msg);
}
- if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
+ while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
free(msg->aux_data);
msg->aux_data = NULL;
}
/*
- * Initialize a circuit structure and add it to the iocom's circuit_tree.
- * circuit0 is left out and will not be added to the tree.
+ * Basic initialization of a circuit structure.
+ *
+ * The circuit structure is initialized with one ref.
*/
void
dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
{
+	/*
+	 * A freshly initialized circuit starts with one ref owned by
+	 * the caller; insertion into the iocom circuit_tree is now the
+	 * caller's responsibility (no longer done here).
+	 */
+	circuit->refs = 1;
	circuit->iocom = iocom;
	RB_INIT(&circuit->staterd_tree);
	RB_INIT(&circuit->statewr_tree);
-	if (circuit->msgid)
-		RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
}
/*
size_t aligned_size;
pthread_mutex_lock(&iocom->mtx);
+#if 0
if (aux_size) {
aligned_size = DMSG_DOALIGN(aux_size);
if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
TAILQ_REMOVE(&iocom->freeq, msg, qentry);
}
+#endif
+ aligned_size = DMSG_DOALIGN(aux_size);
+ msg = NULL;
if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
/*
* Create state when CREATE is set without REPLY.
pthread_mutex_unlock(&iocom->mtx);
state->flags |= DMSG_STATE_INSERTED;
}
+ /* XXX SMP race for state */
pthread_mutex_unlock(&iocom->mtx);
+ hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
if (msg == NULL) {
+ msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
+ bzero(msg, offsetof(struct dmsg_msg, any.head));
+ *(int *)((char *)msg +
+ offsetof(struct dmsg_msg, any.head) + hbytes) =
+ 0x71B2C3D4;
+#if 0
msg = malloc(sizeof(*msg));
bzero(msg, sizeof(*msg));
- msg->aux_data = NULL;
- msg->aux_size = 0;
+#endif
}
/*
}
}
}
- hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
if (hbytes)
bzero(&msg->any.head, hbytes);
msg->hdr_size = hbytes;
msg->any.head.circuit = 0;
msg->circuit = circuit;
msg->iocom = iocom;
+ dmsg_circuit_hold(circuit);
if (state) {
msg->state = state;
state->msg = msg;
void
dmsg_msg_free_locked(dmsg_msg_t *msg)
{
-	dmsg_iocom_t *iocom = msg->iocom;
-
+	/*dmsg_iocom_t *iocom = msg->iocom;*/
+#if 1
+	/*
+	 * Debug canary: dmsg_msg_alloc() stores 0x71B2C3D4 just past the
+	 * variable-sized header; a mismatch here indicates a header
+	 * overrun or a corrupted cmd/SIZE field.
+	 */
+	int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
+	if (*(int *)((char *)msg +
+		     offsetof(struct dmsg_msg, any.head) + hbytes) !=
+	    0x71B2C3D4) {
+		fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
+		assert(0);
+	}
+#endif
+	/* Drop the circuit ref taken at allocation time (iocom locked) */
+	if (msg->circuit) {
+		dmsg_circuit_drop_locked(msg->circuit);
+		msg->circuit = NULL;
+	}
	msg->state = NULL;
+	/* Messages are no longer cached on freeq/freeq_aux; free outright */
+	if (msg->aux_data) {
+		free(msg->aux_data);
+		msg->aux_data = NULL;
+	}
+	msg->aux_size = 0;
+	free (msg);
+#if 0
	if (msg->aux_data)
		TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
	else
		TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
+#endif
}
void
int ai; /* alt bulk path socket */
while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
+ /*
+ * These iocom->flags are only manipulated within the
+ * context of the current thread. However, modifications
+ * still require atomic ops.
+ */
if ((iocom->flags & (DMSG_IOCOMF_RWORK |
DMSG_IOCOMF_WWORK |
DMSG_IOCOMF_PWORK |
poll(fds, count, timeout);
if (wi >= 0 && (fds[wi].revents & POLLIN))
- iocom->flags |= DMSG_IOCOMF_PWORK;
+ atomic_set_int(&iocom->flags,
+ DMSG_IOCOMF_PWORK);
if (si >= 0 && (fds[si].revents & POLLIN))
- iocom->flags |= DMSG_IOCOMF_RWORK;
+ atomic_set_int(&iocom->flags,
+ DMSG_IOCOMF_RWORK);
if (si >= 0 && (fds[si].revents & POLLOUT))
- iocom->flags |= DMSG_IOCOMF_WWORK;
+ atomic_set_int(&iocom->flags,
+ DMSG_IOCOMF_WWORK);
if (wi >= 0 && (fds[wi].revents & POLLOUT))
- iocom->flags |= DMSG_IOCOMF_WWORK;
+ atomic_set_int(&iocom->flags,
+ DMSG_IOCOMF_WWORK);
if (ai >= 0 && (fds[ai].revents & POLLIN))
- iocom->flags |= DMSG_IOCOMF_ARWORK;
+ atomic_set_int(&iocom->flags,
+ DMSG_IOCOMF_ARWORK);
} else {
/*
* Always check the pipe
*/
- iocom->flags |= DMSG_IOCOMF_PWORK;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
}
if (iocom->flags & DMSG_IOCOMF_SWORK) {
- iocom->flags &= ~DMSG_IOCOMF_SWORK;
+ atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
iocom->signal_callback(iocom);
}
* the pipe with a dummy read.
*/
if (iocom->flags & DMSG_IOCOMF_PWORK) {
- iocom->flags &= ~DMSG_IOCOMF_PWORK;
+ atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
- iocom->flags |= DMSG_IOCOMF_RWORK;
- iocom->flags |= DMSG_IOCOMF_WWORK;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
if (TAILQ_FIRST(&iocom->txmsgq))
dmsg_iocom_flush1(iocom);
}
}
if (iocom->flags & DMSG_IOCOMF_ARWORK) {
- iocom->flags &= ~DMSG_IOCOMF_ARWORK;
+ atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
iocom->altmsg_callback(iocom);
}
}
int error;
again:
- iocom->flags &= ~(DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
-
/*
* If a message is already pending we can just remove and
* return it. Message state has already been processed.
TAILQ_REMOVE(&ioq->msgq, msg, qentry);
return (msg);
}
+ atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
/*
* If the stream is errored out we stop processing it.
/*
* Allocate the message, the next state will fill it in.
- * Note that the actual buffer will be sized to an aligned
+ * Note that the aux_data buffer will be sized to an aligned
* value and the aligned remainder zero'd for convenience.
*/
- msg = dmsg_msg_alloc(&iocom->circuit0, aux_size, 0,
+ msg = dmsg_msg_alloc(&iocom->circuit0, aux_size,
+ ioq->hbytes / DMSG_ALIGN,
NULL, NULL);
ioq->msg = msg;
/* fall through */
case DMSG_MSGQ_STATE_AUXDATA1:
/*
- * Copy the partial or complete payload from remaining
- * bytes in the FIFO in order to optimize the makeroom call
- * in the AUXDATA2 state. We have to fall-through either
- * way so we can check the crc.
+ * Copy the partial or complete [decrypted] payload from
+ * remaining bytes in the FIFO in order to optimize the
+ * makeroom call in the AUXDATA2 state. We have to
+ * fall-through either way so we can check the crc.
*
* msg->aux_size tracks our aux data.
+ *
+	 * (Let's not complicate matters if the data is encrypted,
+ * since the data in-stream is not the same size as the
+ * data decrypted).
*/
if (bytes >= ioq->abytes) {
bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
if (xcrc32 != msg->any.head.aux_crc) {
ioq->error = DMSG_IOQ_ERROR_ACRC;
+ fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
+ xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
break;
}
break;
*/
if (ioq->error) {
skip:
+ fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
/*
* An unrecoverable error causes all active receive
* transactions to be terminated with a LNK_ERROR message.
*/
if (state->rxcmd & DMSGF_DELETE) {
dmsg_msg_free(msg);
+ fprintf(stderr,
+ "iocom: ioq error %d sleeping\n",
+ ioq->error);
+ sleep(1); /* XXX */
+ atomic_set_int(&iocom->flags,
+ DMSG_IOCOMF_RWORK);
msg = NULL;
} else {
/*state->txcmd |= DMSGF_DELETE;*/
*/
if (state->rxcmd & DMSGF_DELETE) {
dmsg_msg_free(msg);
+ fprintf(stderr,
+ "iocom: ioq error %d sleeping\n",
+ ioq->error);
+ sleep(1); /* XXX */
+ atomic_set_int(&iocom->flags,
+ DMSG_IOCOMF_RWORK);
msg = NULL;
} else {
msg->state = state;
* Generate a final LNK_ERROR and flag EOF.
*/
msg->state = NULL;
- iocom->flags |= DMSG_IOCOMF_EOF;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
}
pthread_mutex_unlock(&iocom->mtx);
* re-set RWORK.
*/
if (msg)
- iocom->flags |= DMSG_IOCOMF_RWORK;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
} else if (msg == NULL) {
/*
* Insufficient data received to finish building the message,
* Leave ioq->msg intact.
* Leave the FIFO intact.
*/
- iocom->flags |= DMSG_IOCOMF_RREQ;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
} else {
/*
* Continue processing msg.
*/
if (ioq->fifo_beg == ioq->fifo_cdx &&
ioq->fifo_cdn == ioq->fifo_end) {
- iocom->flags |= DMSG_IOCOMF_RREQ;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
ioq->fifo_cdx = 0;
ioq->fifo_cdn = 0;
ioq->fifo_beg = 0;
ioq->fifo_end = 0;
} else {
- iocom->flags |= DMSG_IOCOMF_RWORK;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
}
ioq->state = DMSG_MSGQ_STATE_HEADER1;
ioq->msg = NULL;
*
* State processing only occurs for messages destined for us.
*/
+ if (DMsgDebugOpt >= 5) {
+ fprintf(stderr,
+ "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
+ msg->any.head.cmd,
+ (intmax_t)msg->any.head.msgid,
+ (intmax_t)msg->any.head.circuit);
+ }
if (msg->any.head.circuit)
error = dmsg_circuit_relay(msg);
else
* A non-NULL msg is added to the queue but not necessarily flushed.
* Calling this function with msg == NULL will get a flush going.
*
- * Caller must hold iocom->mtx.
+ * (called from iocom_core only)
*/
void
dmsg_iocom_flush1(dmsg_iocom_t *iocom)
size_t abytes;
dmsg_msg_queue_t tmpq;
- iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
+ atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
TAILQ_INIT(&tmpq);
pthread_mutex_lock(&iocom->mtx);
while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
/*
* Thread localized, iocom->mtx not held by caller.
+ *
+ * (called from iocom_core via iocom_flush1 only)
*/
void
dmsg_iocom_flush2(dmsg_iocom_t *iocom)
/*
* Make sure the FIFO has a reasonable amount of space
* left (if not completely full).
+ *
+ * In this situation we are staging the encrypted message
+ * data in the FIFO. (nact) represents how much plaintext
+ * has been staged, (n) represents how much encrypted data
+ * has been flushed. The two are independent of each other.
*/
if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
- sizeof(ioq->buf) - ioq->fifo_end >= DMSG_ALIGN * 2) {
+ sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
ioq->fifo_end - ioq->fifo_beg);
ioq->fifo_cdx -= ioq->fifo_beg;
ioq->fifo_cdx = 0;
ioq->fifo_end = 0;
}
- /* XXX what if interrupted mid-write? */
- } else {
- nact = 0;
}
+ /*
+ * We don't mess with the nact returned by the crypto_encrypt
+ * call, which represents the filling of the FIFO. (n) tells
+ * us how much we were able to write from the FIFO. The two
+ * are different beasts when encrypting.
+ */
} else {
+ /*
+ * In this situation we are not staging the messages to the
+ * FIFO but instead writing them directly from the msg
+ * structure(s), so (nact) is basically (n).
+ */
n = writev(iocom->sock_fd, iov, iovcnt);
if (n > 0)
nact = n;
break;
}
nact -= abytes - ioq->abytes;
+ /* ioq->abytes = abytes; optimized out */
+
+ if (DMsgDebugOpt >= 5) {
+ fprintf(stderr,
+ "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
+ msg->any.head.cmd,
+ (intmax_t)msg->any.head.msgid,
+ (intmax_t)msg->any.head.circuit);
+ }
TAILQ_REMOVE(&ioq->msgq, msg, qentry);
--ioq->msgcount;
/*
* Wait for socket buffer space
*/
- iocom->flags |= DMSG_IOCOMF_WREQ;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
}
} else {
- iocom->flags |= DMSG_IOCOMF_WREQ;
+ atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
}
if (ioq->error) {
dmsg_iocom_drain(iocom);
* the caller to pull off our contrived terminal error msg to detect
* the connection failure.
*
- * Thread localized, iocom->mtx not held by caller.
+ * Localized to iocom_core thread, iocom->mtx not held by caller.
*/
void
dmsg_iocom_drain(dmsg_iocom_t *iocom)
dmsg_ioq_t *ioq = &iocom->ioq_tx;
dmsg_msg_t *msg;
- iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
+ atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
ioq->hbytes = 0;
ioq->abytes = 0;
{
dmsg_iocom_t *iocom = msg->iocom;
dmsg_circuit_t *circuit;
+ dmsg_circuit_t *ocircuit;
dmsg_state_t *state;
dmsg_state_t sdummy;
dmsg_circuit_t cdummy;
cdummy.msgid = msg->any.head.circuit;
circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
&cdummy);
- if (circuit == NULL)
+ if (circuit == NULL) {
+ pthread_mutex_unlock(&iocom->mtx);
return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
+ }
}
+
+ /*
+ * Replace circuit0 with actual
+ */
+ dmsg_circuit_hold(circuit);
+ ocircuit = msg->circuit;
msg->circuit = circuit;
- ++circuit->refs;
/*
* If received msg is a command state is on staterd_tree.
&sdummy);
}
msg->state = state;
+
pthread_mutex_unlock(&iocom->mtx);
+ if (ocircuit)
+ dmsg_circuit_drop(ocircuit);
/*
* Short-cut one-off or mid-stream messages (state may be NULL).
free(state);
}
+/*
+ * Called with iocom locked
+ */
+void
+dmsg_circuit_hold(dmsg_circuit_t *circuit)
+{
+	/*
+	 * The caller must already own at least one ref; only then is it
+	 * safe to add another with a bare atomic increment (no lock
+	 * needed to prevent a 0->1 race).
+	 */
+	assert(circuit->refs > 0); /* caller must hold ref */
+	atomic_add_int(&circuit->refs, 1); /* to safely add more */
+}
+
/*
* Called with iocom locked
*/
/*
* Decrement circuit refs, destroy circuit when refs drops to 0.
*/
- if (--circuit->refs > 0)
+ if (atomic_fetchadd_int(&circuit->refs, -1) != 1)
return;
+ assert(circuit != &iocom->circuit0);
assert(RB_EMPTY(&circuit->staterd_tree));
assert(RB_EMPTY(&circuit->statewr_tree));
+ pthread_mutex_lock(&iocom->mtx);
RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
circuit->iocom = NULL;
+ pthread_mutex_unlock(&iocom->mtx);
dmsg_free(circuit);
/*
}
}
+/*
+ * Drop a circuit ref while the iocom mutex is already held by the
+ * caller.  On the last ref the circuit is removed from the iocom's
+ * circuit_tree and freed; if the receive side is errored and no
+ * circuits remain, the iocom core thread is woken via the wakeup pipe.
+ */
+void
+dmsg_circuit_drop_locked(dmsg_circuit_t *circuit)
+{
+	dmsg_iocom_t *iocom;
+
+	iocom = circuit->iocom;
+	assert(circuit->refs > 0);
+	assert(iocom);
+
+	/* atomic_fetchadd_int returns the prior value; 1 means last ref */
+	if (atomic_fetchadd_int(&circuit->refs, -1) == 1) {
+		assert(circuit != &iocom->circuit0);
+		assert(RB_EMPTY(&circuit->staterd_tree));
+		assert(RB_EMPTY(&circuit->statewr_tree));
+		RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
+		circuit->iocom = NULL;
+		dmsg_free(circuit);
+		if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
+			char dummy = 0;
+			write(iocom->wakeupfds[1], &dummy, 1);
+		}
+	}
+}
+
/*
* This swaps endian for a hammer2_msg_hdr. Note that the extended
* header is not adjusted, just the core header.
/*pthread_mutex_lock(&cluster_mtx);*/
+ if (DMsgDebugOpt >= 4)
+ fprintf(stderr, "CIRC receive cmd=%08x\n", msg->any.head.cmd);
+
switch (msg->any.head.cmd & (DMSGF_CREATE |
DMSGF_DELETE |
DMSGF_REPLY)) {
&dummy);
pthread_mutex_unlock(&iocomA->mtx);
if (tx_state == NULL) {
+ /* XXX SMP race */
fprintf(stderr, "dmsg_lnk_circ: no circuit\n");
dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
break;
}
+ if (tx_state->icmd != DMSG_LNK_SPAN) {
+ /* XXX SMP race */
+ fprintf(stderr, "dmsg_lnk_circ: not LNK_SPAN\n");
+ dmsg_msg_reply(msg, DMSG_ERR_CANTCIRC);
+ break;
+ }
/* locate h2span_link */
rx_state = tx_state->any.relay->source_rt;
* it received from us as <target>.
*/
circA = dmsg_alloc(sizeof(*circA));
- circA->iocom = iocomA;
+ dmsg_circuit_init(iocomA, circA);
circA->state = msg->state; /* LNK_CIRC state */
circA->msgid = msg->state->msgid;
circA->span_state = tx_state; /* H2SPAN_RELAY state */
iocomB = rx_state->iocom;
circB = dmsg_alloc(sizeof(*circB));
+ dmsg_circuit_init(iocomB, circB);
/*
* Create a LNK_CIRC transaction on B
0, DMSG_LNK_CIRC | DMSGF_CREATE,
dmsg_lnk_circ, circB);
fwd_msg->state->any.circ = circB;
- circB->iocom = iocomB;
+ fwd_msg->any.lnk_circ.target = rx_state->msgid;
circB->state = fwd_msg->state; /* LNK_CIRC state */
circB->msgid = fwd_msg->any.head.msgid;
circB->span_state = rx_state; /* H2SPAN_LINK state */
circB->is_relay = 0;
circB->refs = 2; /* state and peer */
+ if (DMsgDebugOpt >= 4)
+ fprintf(stderr, "CIRC forward %p->%p\n", circA, circB);
+
/*
* Link the two circuits together.
*/
circA->peer = circB;
circB->peer = circA;
+ if (iocomA < iocomB) {
+ pthread_mutex_lock(&iocomA->mtx);
+ pthread_mutex_lock(&iocomB->mtx);
+ } else {
+ pthread_mutex_lock(&iocomB->mtx);
+ pthread_mutex_lock(&iocomA->mtx);
+ }
if (RB_INSERT(dmsg_circuit_tree, &iocomA->circuit_tree, circA))
assert(0);
if (RB_INSERT(dmsg_circuit_tree, &iocomB->circuit_tree, circB))
assert(0);
+ if (iocomA < iocomB) {
+ pthread_mutex_unlock(&iocomB->mtx);
+ pthread_mutex_unlock(&iocomA->mtx);
+ } else {
+ pthread_mutex_unlock(&iocomA->mtx);
+ pthread_mutex_unlock(&iocomB->mtx);
+ }
dmsg_msg_write(fwd_msg);
*/
while (relay && relay->source_rt->any.link->node == node) {
next_relay = RB_NEXT(h2span_relay_tree, &conn->tree, relay);
+ fprintf(stderr, "RELAY DELETE FROM EXTRAS\n");
dmsg_relay_delete(relay);
relay = next_relay;
}
if (msg->any.head.cmd & DMSGF_DELETE) {
pthread_mutex_lock(&cluster_mtx);
+ fprintf(stderr, "RELAY DELETE FROM LNK_RELAY MSG\n");
if ((relay = state->any.relay) != NULL) {
dmsg_relay_delete(relay);
} else {
}
}
-
+/*
+ * cluster_mtx held by caller
+ */
static
void
dmsg_relay_delete(h2span_relay_t *relay)
/*
* Lookup the circuit on the incoming iocom.
*/
- pthread_mutex_lock(&cluster_mtx);
+ pthread_mutex_lock(&iocom->mtx);
dummy.msgid = msg->any.head.circuit;
circ = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree, &dummy);
assert(circ);
peer = circ->peer;
+ dmsg_circuit_hold(peer);
+
+ if (DMsgDebugOpt >= 4) {
+ fprintf(stderr,
+ "CIRC relay %08x %p->%p\n",
+ msg->any.head.cmd, circ, peer);
+ }
msg->iocom = peer->iocom;
msg->any.head.circuit = peer->msgid;
+ dmsg_circuit_drop_locked(msg->circuit);
+ msg->circuit = peer;
- pthread_mutex_unlock(&cluster_mtx);
+ pthread_mutex_unlock(&iocom->mtx);
dmsg_msg_write(msg);
error = DMSG_IOQ_ERROR_ROUTED;
/*
* Dumps the spanning tree
+ *
+ * DEBUG ONLY
*/
void
dmsg_shell_tree(dmsg_circuit_t *circuit, char *cmdbuf __unused)
h2span_cluster_t *cls;
h2span_node_t *node;
h2span_link_t *slink;
+ h2span_relay_t *relay;
char *uustr = NULL;
pthread_mutex_lock(&cluster_mtx);
node->fs_label);
RB_FOREACH(slink, h2span_link_tree, &node->tree) {
dmsg_circuit_printf(circuit,
- "\tLink dist=%d via %d\n",
+ "\tSLink msgid %016jx "
+ "dist=%d via %d\n",
+ (intmax_t)slink->state->msgid,
slink->dist,
slink->state->iocom->sock_fd);
+ TAILQ_FOREACH(relay, &slink->relayq, entry) {
+ dmsg_circuit_printf(circuit,
+ "\t Relay-out msgid %016jx "
+ "via %d\n",
+ (intmax_t)relay->target_rt->msgid,
+ relay->target_rt->iocom->sock_fd);
+ }
}
}
}
#endif
}
+/*
+ * DEBUG ONLY
+ *
+ * Locate the state representing an incoming LNK_SPAN given its msgid.
+ */
+int
+dmsg_debug_findspan(uint64_t msgid, dmsg_state_t **statep)
+{
+	h2span_cluster_t *cls;
+	h2span_node_t *node;
+	h2span_link_t *slink;
+
+	/*
+	 * Walk every cluster/node/link under cluster_mtx looking for the
+	 * incoming LNK_SPAN state whose msgid matches.  Returns 0 with
+	 * *statep set on success, ENOENT with *statep = NULL otherwise.
+	 *
+	 * (The unused h2span_relay_t local has been removed; it was
+	 * declared and NULLed but never read.)
+	 */
+	pthread_mutex_lock(&cluster_mtx);
+	RB_FOREACH(cls, h2span_cluster_tree, &cluster_tree) {
+		RB_FOREACH(node, h2span_node_tree, &cls->tree) {
+			RB_FOREACH(slink, h2span_link_tree, &node->tree) {
+				if (slink->state->msgid == msgid) {
+					*statep = slink->state;
+					goto found;
+				}
+			}
+		}
+	}
+	pthread_mutex_unlock(&cluster_mtx);
+	*statep = NULL;
+	return(ENOENT);
+found:
+	pthread_mutex_unlock(&cluster_mtx);
+	return(0);
+}
+
/*
* Random number sub-sort value to add to SPAN rnss fields on relay.
* This allows us to differentiate spans with the same <dist> field