2 * Copyright (c) 2011-2015 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
36 #include "dmsg_local.h"
/*
 * File-local (static) helper prototypes, followed by generation of the
 * red-black tree code for dmsg_state_tree, which links dmsg_state nodes
 * through their rbnode field and orders them with dmsg_state_cmp.
 *
 * NOTE(review): this copy appears to be a lossy extraction. Every line
 * carries its original line number and the numbering skips (e.g. 40 to 44),
 * so the body and closing #endif of the DMSG_BLOCK_DEBUG conditional, among
 * other lines, are missing. Restore from upstream before building.
 */
40 #ifdef DMSG_BLOCK_DEBUG
44 static int dmsg_state_msgrx(dmsg_msg_t *msg);
45 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
46 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
47 static void dmsg_state_free(dmsg_state_t *state);
48 static void dmsg_simulate_failure(dmsg_state_t *state, int error);
50 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
/*
 * dmsg_state_cmp - comparator for dmsg_state_tree.
 *
 * Orders two transaction states by msgid. The first test handles
 * state1 < state2 and the second handles state1 > state2.
 *
 * NOTE(review): the return statements and the equal case are not present
 * in this copy (numbering jumps 59, 61, 67). Presumably they return
 * negative, positive and zero respectively. Confirm against upstream.
 */
53 * STATE TREE - Represents open transactions which are indexed by their
54 * { msgid } relative to the governing iocom.
57 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
59 if (state1->msgid < state2->msgid)
61 if (state1->msgid > state2->msgid)
/*
 * dmsg_ioq_init - reset an I/O queue to its initial state.
 *
 * Zeroes the whole ioq, starts the reception state machine at
 * DMSG_MSGQ_STATE_HEADER1 and initializes the (empty) message queue.
 * The iocom argument is unused.
 */
67 * Initialize a low-level ioq
70 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
72 bzero(ioq, sizeof(*ioq));
73 ioq->state = DMSG_MSGQ_STATE_HEADER1;
74 TAILQ_INIT(&ioq->msgq);
/*
 * dmsg_ioq_done - tear down an ioq.
 *
 * Any message still linked on ioq->msgq is unexpected. It trips assert(0)
 * when assertions are enabled and is then unlinked. A partially assembled
 * message in ioq->msg, if present, is handled afterwards.
 *
 * NOTE(review): the lines that dispose of each message and clear ioq->msg
 * are not present in this copy. Also, dmsg_iocom_done() calls this without
 * taking iocom->mtx, although the line below says the caller holds it.
 * Confirm which is intended.
 */
80 * caller holds iocom->mtx.
83 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
87 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
88 assert(0); /* shouldn't happen */
89 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
92 if ((msg = ioq->msg) != NULL) {
/*
 * dmsg_iocom_init - set up an iocom around sock_fd and, optionally, alt_fd.
 *
 *	sock_fd     - primary message stream. Crypto is negotiated only when
 *	              fstat() reports a socket. It is set non-blocking.
 *	alt_fd      - alternate input descriptor. dmsg_iocom_core polls it
 *	              for POLLIN when it is >= 0. It is also set non-blocking
 *	              (any guarding condition is not visible in this copy).
 *	signal_func - stored as iocom->signal_callback.
 *	rcvmsg_func - stored as iocom->rcvmsg_callback.
 *	usrmsg_func - stored as iocom->usrmsg_callback.
 *	altmsg_func - stored as iocom->altmsg_callback.
 *
 * The label defaults to "iocom-<pointer>". The state trees, free caches
 * and transmit queue start empty. The root transaction state0 is its own
 * parent and holds a permanent reference.
 *
 * NOTE(review): the return values of asprintf() and fcntl() are not
 * checked. The failure handling after pipe() and fstat(), and the
 * condition guarding the SWORK assignment, are not present in this copy.
 */
99 * Initialize a low-level communications channel.
101 * NOTE: The signal_func() is called at least once from the loop and can be
102 * re-armed via dmsg_iocom_restate().
105 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
106 void (*signal_func)(dmsg_iocom_t *iocom),
107 void (*rcvmsg_func)(dmsg_msg_t *msg),
108 void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
109 void (*altmsg_func)(dmsg_iocom_t *iocom))
113 bzero(iocom, sizeof(*iocom));
115 asprintf(&iocom->label, "iocom-%p", iocom);
116 iocom->signal_callback = signal_func;
117 iocom->rcvmsg_callback = rcvmsg_func;
118 iocom->altmsg_callback = altmsg_func;
119 iocom->usrmsg_callback = usrmsg_func;
121 pthread_mutex_init(&iocom->mtx, NULL);
122 RB_INIT(&iocom->staterd_tree);
123 RB_INIT(&iocom->statewr_tree);
124 TAILQ_INIT(&iocom->freeq);
125 TAILQ_INIT(&iocom->freeq_aux);
126 TAILQ_INIT(&iocom->txmsgq);
127 iocom->sock_fd = sock_fd;
128 iocom->alt_fd = alt_fd;
/* Request socket reads initially. CLOSEALT makes dmsg_iocom_done() close alt_fd. */
129 iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
131 iocom->flags |= DMSG_IOCOMF_SWORK;
132 dmsg_ioq_init(iocom, &iocom->ioq_rx);
133 dmsg_ioq_init(iocom, &iocom->ioq_tx);
/* state0 is the root transaction. It is its own parent. */
134 iocom->state0.refs = 1; /* should never trigger a free */
135 iocom->state0.iocom = iocom;
136 iocom->state0.parent = &iocom->state0;
137 iocom->state0.flags = DMSG_STATE_ROOT;
138 TAILQ_INIT(&iocom->state0.subq);
/* Inter-thread wakeup pipe. dmsg_iocom_core polls the read end. */
140 if (pipe(iocom->wakeupfds) < 0)
142 fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
143 fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
146 * Negotiate session crypto synchronously. This will mark the
147 * connection as error'd if it fails. If this is a pipe it's
148 * a linkage that we set up ourselves to the filesystem and there
151 if (fstat(sock_fd, &st) < 0)
153 if (S_ISSOCK(st.st_mode))
154 dmsg_crypto_negotiate(iocom);
157 * Make sure our fds are set to non-blocking for the iocom core.
160 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
162 /* if line buffered our single fgets() should be fine */
164 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
/*
 * dmsg_iocom_label - set iocom->label from a printf-style format.
 *
 * vasprintf() allocates the formatted result.
 *
 * NOTE(review): va_start()/va_end() and any release of the previous label
 * (allocated by asprintf() in dmsg_iocom_init) are not present in this
 * copy. Confirm that the old label is freed before it is overwritten.
 */
169 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
176 vasprintf(&iocom->label, ctl, va);
/*
 * dmsg_iocom_restate - install new signal and receive callbacks.
 *
 * Under iocom->mtx, stores the new callbacks and either sets or clears
 * DMSG_IOCOMF_SWORK. When SWORK is set, dmsg_iocom_core runs
 * signal_callback on its next pass.
 *
 * NOTE(review): the condition that chooses between set and clear is not
 * present in this copy. Presumably it tests whether signal_func is
 * non-NULL. Confirm.
 */
183 * May only be called from a callback from iocom_core.
185 * Adjust state machine functions, set flags to guarantee that both
186 * the recevmsg_func and the sendmsg_func is called at least once.
189 dmsg_iocom_restate(dmsg_iocom_t *iocom,
190 void (*signal_func)(dmsg_iocom_t *),
191 void (*rcvmsg_func)(dmsg_msg_t *msg))
193 pthread_mutex_lock(&iocom->mtx);
194 iocom->signal_callback = signal_func;
195 iocom->rcvmsg_callback = rcvmsg_func;
197 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
199 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
200 pthread_mutex_unlock(&iocom->mtx);
/*
 * dmsg_iocom_signal - request that the signal callback be run.
 *
 * Sets DMSG_IOCOMF_SWORK under iocom->mtx, but only when a signal callback
 * is installed. dmsg_iocom_core clears the flag and invokes the callback.
 *
 * NOTE(review): nothing here writes to the wakeup pipe. A core thread
 * blocked in poll() may therefore not see the flag until poll() returns
 * for some other reason. Confirm that this is intended.
 */
204 dmsg_iocom_signal(dmsg_iocom_t *iocom)
206 pthread_mutex_lock(&iocom->mtx);
207 if (iocom->signal_callback)
208 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
209 pthread_mutex_unlock(&iocom->mtx);
/*
 * dmsg_iocom_done - release the resources owned by an iocom.
 *
 * It does the following:
 *  - closes sock_fd;
 *  - closes alt_fd only when DMSG_IOCOMF_CLOSEALT is set;
 *  - tears down both ioqs;
 *  - empties the freeq and freeq_aux caches;
 *  - closes both ends of the wakeup pipe and marks them -1;
 *  - destroys iocom->mtx.
 *
 * NOTE(review): the free() calls for cached messages and aux buffers,
 * and any release of iocom->label, are not present in this copy. Confirm
 * against upstream.
 */
213 * Cleanup a terminating iocom.
215 * Caller should not hold iocom->mtx. The iocom has already been disconnected
216 * from all possible references to it.
219 dmsg_iocom_done(dmsg_iocom_t *iocom)
223 if (iocom->sock_fd >= 0) {
224 close(iocom->sock_fd);
227 if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
228 close(iocom->alt_fd);
231 dmsg_ioq_done(iocom, &iocom->ioq_rx);
232 dmsg_ioq_done(iocom, &iocom->ioq_tx);
233 while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
234 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
237 while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
238 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
240 msg->aux_data = NULL;
243 if (iocom->wakeupfds[0] >= 0) {
244 close(iocom->wakeupfds[0]);
245 iocom->wakeupfds[0] = -1;
247 if (iocom->wakeupfds[1] >= 0) {
248 close(iocom->wakeupfds[1]);
249 iocom->wakeupfds[1] = -1;
251 pthread_mutex_destroy(&iocom->mtx);
/*
 * dmsg_msg_alloc - locking wrapper around dmsg_msg_alloc_locked().
 *
 * Holds state->iocom->mtx for the duration of the allocation and passes
 * every argument through unchanged.
 *	aux_size - payload size in bytes.
 *	cmd      - message command. Its DMSGF_SIZE bits give the header
 *	           size in DMSG_ALIGN units.
 *	data     - stored in the newly created state when CREATE applies.
 *	func     - passed through. Its use is not visible in this copy.
 */
255 * Allocate a new message using the specified transaction state.
257 * If CREATE is set a new transaction is allocated relative to the passed-in
258 * transaction (the 'state' argument becomes pstate).
260 * If CREATE is not set the message is associated with the passed-in
264 dmsg_msg_alloc(dmsg_state_t *state,
265 size_t aux_size, uint32_t cmd,
266 void (*func)(dmsg_msg_t *), void *data)
268 dmsg_iocom_t *iocom = state->iocom;
271 pthread_mutex_lock(&iocom->mtx);
272 msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
273 pthread_mutex_unlock(&iocom->mtx);
/*
 * dmsg_msg_alloc_locked - allocate a message. The caller holds iocom->mtx.
 *
 * When cmd has CREATE set without REPLY, a new dynamic transaction state
 * is allocated. Its msgid is derived from its own address. It is stacked
 * under the passed-in state and inserted into iocom->statewr_tree and
 * into the subq of the parent. Otherwise the message travels on the
 * existing transaction, with that transaction's parent as its circuit.
 *
 * The header (hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN) is zeroed and
 * then filled in:
 *  - magic;
 *  - cmd, plus REVTRANS and/or REVCIRC when the transaction or the
 *    circuit has DMSG_STATE_OPPOSITE set;
 *  - msgid;
 *  - circuit.
 * The aux buffer is sized to DMSG_DOALIGN(aux_size), and the bytes beyond
 * aux_size are zeroed.
 *
 * NOTE(review): this copy is missing lines. They include:
 *  - the conditionals that select between the two aligned_size
 *    computations, the two free-cache lookups and the two malloc()
 *    variants (the larger one appends a trailing int guard value after
 *    the header);
 *  - presumably the assignment of pstate from state on the CREATE path.
 * The malloc() results are not checked.
 */
279 dmsg_msg_alloc_locked(dmsg_state_t *state,
280 size_t aux_size, uint32_t cmd,
281 void (*func)(dmsg_msg_t *), void *data)
283 dmsg_iocom_t *iocom = state->iocom;
284 dmsg_state_t *pstate;
291 aligned_size = DMSG_DOALIGN(aux_size);
292 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
293 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
296 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
297 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
300 aligned_size = DMSG_DOALIGN(aux_size);
302 if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
304 * When CREATE is set without REPLY the caller is
305 * initiating a new transaction stacked under the specified
308 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
309 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
/* New dynamic state. Its msgid is the address of the state itself. */
312 state = malloc(sizeof(*state));
313 atomic_add_int(&dmsg_state_count, 1);
314 bzero(state, sizeof(*state));
315 TAILQ_INIT(&state->subq);
316 dmsg_state_hold(pstate);
318 state->parent = pstate;
319 state->iocom = iocom;
320 state->flags = DMSG_STATE_DYNAMIC;
321 state->msgid = (uint64_t)(uintptr_t)state;
322 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
323 state->rxcmd = DMSGF_REPLY;
324 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
326 state->any.any = data;
/* Index the new state for transmit lookups and link it under its parent. */
328 RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
329 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
330 state->flags |= DMSG_STATE_SUBINSERTED |
331 DMSG_STATE_RBINSERTED;
335 "create state %p id=%08x on iocom statewr %p\n",
336 state, (uint32_t)state->msgid, iocom);
340 * Otherwise the message is transmitted over the existing
343 pstate = state->parent;
346 /* XXX SMP race for state */
347 hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
/* Variant that writes a guard int just past the header. dmsg_msg_free_locked checks it. */
349 msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
350 bzero(msg, offsetof(struct dmsg_msg, any.head));
351 *(int *)((char *)msg +
352 offsetof(struct dmsg_msg, any.head) + hbytes) =
356 "allo msg %p id=%08x on iocom %p\n",
357 msg, (int)msg->any.head.msgid, iocom);
360 msg = malloc(sizeof(*msg));
361 bzero(msg, sizeof(*msg));
366 * [re]allocate the auxillary data buffer. The caller knows that
367 * a size-aligned buffer will be allocated but we do not want to
368 * force the caller to zero any tail piece, so we do that ourself.
370 if (msg->aux_size != aux_size) {
373 msg->aux_data = NULL;
377 msg->aux_data = malloc(aligned_size);
378 msg->aux_size = aux_size;
379 if (aux_size != aligned_size) {
380 bzero(msg->aux_data + aux_size,
381 aligned_size - aux_size);
387 * Set REVTRANS if the transaction was remotely initiated
388 * Set REVCIRC if the circuit was remotely initiated
390 if (state->flags & DMSG_STATE_OPPOSITE)
391 cmd |= DMSGF_REVTRANS;
392 if (pstate->flags & DMSG_STATE_OPPOSITE)
393 cmd |= DMSGF_REVCIRC;
396 * Finish filling out the header.
399 bzero(&msg->any.head, hbytes);
400 msg->hdr_size = hbytes;
401 msg->any.head.magic = DMSG_HDR_MAGIC;
402 msg->any.head.cmd = cmd;
403 msg->any.head.aux_descr = 0;
404 msg->any.head.aux_crc = 0;
405 msg->any.head.msgid = state->msgid;
406 msg->any.head.circuit = pstate->msgid;
/*
 * dmsg_msg_free_locked - return a message to the iocom free caches.
 *
 * When the guard check is compiled in (its conditional is not visible
 * here), it compares the trailing int that dmsg_msg_alloc_locked() placed
 * after the header. On a mismatch it prints "MSGFREE FAILED". The
 * function then clears msg->state and msg->aux_data and queues the
 * message on freeq_aux or freeq.
 *
 * NOTE(review): the TAILQ_INSERT_TAIL lines use `iocom`, but its
 * declaration below is commented out. Those lines are therefore
 * presumably in a disabled (#if 0) region whose conditionals are not
 * present in this copy.
 */
413 * Free a message so it can be reused afresh.
415 * NOTE: aux_size can be 0 with a non-NULL aux_data.
419 dmsg_msg_free_locked(dmsg_msg_t *msg)
421 /*dmsg_iocom_t *iocom = msg->iocom;*/
425 "free msg %p id=%08x on (aux %p)\n",
426 msg, (int)msg->any.head.msgid, msg->aux_data);
429 int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
430 if (*(int *)((char *)msg +
431 offsetof(struct dmsg_msg, any.head) + hbytes) !=
433 fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
437 msg->state = NULL; /* safety */
440 msg->aux_data = NULL;
446 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
448 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
/*
 * dmsg_msg_free - locking wrapper around dmsg_msg_free_locked().
 *
 * The iocom is reached through msg->state, so msg->state must still be
 * valid on entry. It is read before dmsg_msg_free_locked() clears it.
 */
453 dmsg_msg_free(dmsg_msg_t *msg)
455 dmsg_iocom_t *iocom = msg->state->iocom;
457 pthread_mutex_lock(&iocom->mtx);
458 dmsg_msg_free_locked(msg);
459 pthread_mutex_unlock(&iocom->mtx);
/*
 * dmsg_iocom_core - per-iocom event loop. It runs until DMSG_IOCOMF_EOF is set.
 *
 * Each pass begins with a poll step. If no work flag is pending, the loop
 * calls poll() on:
 *  - the wakeup pipe;
 *  - the socket (POLLIN when RREQ is set, POLLOUT when WREQ is set);
 *  - alt_fd.
 * It then converts the returned events into work flags.
 *
 * Then, in order:
 *  - runs signal_callback (SWORK);
 *  - drains the wakeup pipe (PWORK, which also schedules RWORK and WWORK);
 *  - flushes pending output (WWORK);
 *  - reads complete messages and hands each one to rcvmsg_callback and
 *    then dmsg_state_cleanuprx() (RWORK);
 *  - runs altmsg_callback (ARWORK).
 *
 * NOTE(review): this copy is missing the fd slot setup (wi, si, ai,
 * count, timeout), the flag arguments of several atomic_set_int() calls
 * and the closing braces.
 */
463 * I/O core loop for an iocom.
465 * Thread localized, iocom->mtx not held.
468 dmsg_iocom_core(dmsg_iocom_t *iocom)
470 struct pollfd fds[3];
475 int wi; /* wakeup pipe */
477 int ai; /* alt bulk path socket */
479 while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
481 * These iocom->flags are only manipulated within the
482 * context of the current thread. However, modifications
483 * still require atomic ops.
486 fprintf(stderr, "iocom %p %08x\n", iocom, iocom->flags);
488 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
493 DMSG_IOCOMF_AWWORK)) == 0) {
495 * Only poll if no immediate work is pending.
496 * Otherwise we are just wasting our time calling
507 * Always check the inter-thread pipe, e.g.
508 * for iocom->txmsgq work.
511 fds[wi].fd = iocom->wakeupfds[0];
512 fds[wi].events = POLLIN;
516 * Check the socket input/output direction as
519 if (iocom->flags & (DMSG_IOCOMF_RREQ |
522 fds[si].fd = iocom->sock_fd;
526 if (iocom->flags & DMSG_IOCOMF_RREQ)
527 fds[si].events |= POLLIN;
528 if (iocom->flags & DMSG_IOCOMF_WREQ)
529 fds[si].events |= POLLOUT;
533 * Check the alternative fd for work.
535 if (iocom->alt_fd >= 0) {
537 fds[ai].fd = iocom->alt_fd;
538 fds[ai].events = POLLIN;
/* The poll() return value is not examined. Only revents is consulted below. */
541 poll(fds, count, timeout);
543 if (wi >= 0 && (fds[wi].revents & POLLIN))
544 atomic_set_int(&iocom->flags,
546 if (si >= 0 && (fds[si].revents & POLLIN))
547 atomic_set_int(&iocom->flags,
549 if (si >= 0 && (fds[si].revents & POLLOUT))
550 atomic_set_int(&iocom->flags,
/*
 * NOTE(review): fds[wi] is registered for POLLIN only, and poll()
 * reports POLLOUT only when it was requested, so this test cannot fire.
 * Confirm whether a different slot was intended.
 */
552 if (wi >= 0 && (fds[wi].revents & POLLOUT))
553 atomic_set_int(&iocom->flags,
555 if (ai >= 0 && (fds[ai].revents & POLLIN))
556 atomic_set_int(&iocom->flags,
560 * Always check the pipe
562 atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
565 if (iocom->flags & DMSG_IOCOMF_SWORK) {
566 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
567 iocom->signal_callback(iocom);
571 * Pending message queues from other threads wake us up
572 * with a write to the wakeupfds[] pipe. We have to clear
573 * the pipe with a dummy read.
575 if (iocom->flags & DMSG_IOCOMF_PWORK) {
576 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
/* One non-blocking drain read. Its result is intentionally ignored. */
577 read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
578 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
579 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
583 * Message write sequencing
585 if (iocom->flags & DMSG_IOCOMF_WWORK)
586 dmsg_iocom_flush1(iocom);
589 * Message read sequencing. Run this after the write
590 * sequencing in case the write sequencing allowed another
591 * auto-DELETE to occur on the read side.
593 if (iocom->flags & DMSG_IOCOMF_RWORK) {
594 while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
595 (msg = dmsg_ioq_read(iocom)) != NULL) {
597 fprintf(stderr, "receive %s\n",
600 iocom->rcvmsg_callback(msg);
601 dmsg_state_cleanuprx(iocom, msg);
605 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
606 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
607 iocom->altmsg_callback(iocom);
/*
 * dmsg_ioq_makeroom - compact the FIFO when it may not fit `needed` bytes.
 *
 * Receive FIFO regions:
 *  - fifo_beg to fifo_cdx: already decrypted.
 *  - fifo_cdx to fifo_cdn: a gap that in-place decryption can leave.
 *  - fifo_cdn to fifo_end: presumably data not yet decrypted.
 *
 * Compaction happens when the decrypted byte count plus half of the free
 * tail space is below `needed`. The decrypted region is then moved down
 * (the bcopy destination is not visible in this copy). The undecrypted
 * tail is slid down to sit directly after it, and fifo_end shrinks by
 * the size of the discarded gap. nmax is then recomputed as the free
 * space after fifo_end.
 *
 * NOTE(review): the remaining bcopy() arguments, the fifo_beg reset and
 * the return statement are not present in this copy. Callers use the
 * result as their new nmax.
 */
613 * Make sure there's enough room in the FIFO to hold the
616 * Assume worst case encrypted form is 2x the size of the
617 * plaintext equivalent.
621 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
626 bytes = ioq->fifo_cdx - ioq->fifo_beg;
627 nmax = sizeof(ioq->buf) - ioq->fifo_end;
628 if (bytes + nmax / 2 < needed) {
630 bcopy(ioq->buf + ioq->fifo_beg,
634 ioq->fifo_cdx -= ioq->fifo_beg;
636 if (ioq->fifo_cdn < ioq->fifo_end) {
637 bcopy(ioq->buf + ioq->fifo_cdn,
638 ioq->buf + ioq->fifo_cdx,
639 ioq->fifo_end - ioq->fifo_cdn);
641 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
642 ioq->fifo_cdn = ioq->fifo_cdx;
643 nmax = sizeof(ioq->buf) - ioq->fifo_end;
/*
 * dmsg_ioq_read - receive state machine for iocom->ioq_rx.
 *
 * States:
 *  HEADER1  - reads the fixed header and validates the magic (native or
 *             byte-reversed), the header size and aux size <= DMSG_AUX_MAX,
 *             then allocates the msg against state0.
 *  HEADER2  - reads the extended header, verifies hdr_crc, byte-swaps a
 *             reversed-endian header and copies it into the msg.
 *  AUXDATA1 - collect the aligned payload.
 *  AUXDATA2 - continue collecting it and verify aux_crc.
 *  ERROR    - hands out simulated LNK_ERROR messages.
 *
 * After a full message is assembled, the low 8 bits of head.salt are
 * compared with the low 8 bits of ioq->seq.
 *
 * The function returns a message, which may be a simulated error. It
 * returns NULL when more input is needed, and in that case it sets
 * DMSG_IOCOMF_RREQ.
 *
 * NOTE(review): many lines are not present in this copy, including the
 * switch header, break and return statements, the read() length arguments
 * and parts of the error handling.
 */
649 * Read the next ready message from the ioq, issuing I/O if needed.
650 * Caller should retry on a read-event when NULL is returned.
652 * If an error occurs during reception a DMSG_LNK_ERROR msg will
653 * be returned for each open transaction, then the ioq and iocom
654 * will be errored out and a non-transactional DMSG_LNK_ERROR
655 * msg will be returned as the final message. The caller should not call
656 * us again after the final message is returned.
658 * Thread localized, iocom->mtx not held.
661 dmsg_ioq_read(dmsg_iocom_t *iocom)
663 dmsg_ioq_t *ioq = &iocom->ioq_rx;
675 * If a message is already pending we can just remove and
676 * return it. Message state has already been processed.
677 * (currently not implemented)
679 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
680 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
682 if (msg->state == &iocom->state0) {
683 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
684 fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
688 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
691 * If the stream is errored out we stop processing it.
697 * Message read in-progress (msg is NULL at the moment). We don't
698 * allocate a msg until we have its core header.
700 nmax = sizeof(ioq->buf) - ioq->fifo_end;
701 bytes = ioq->fifo_cdx - ioq->fifo_beg; /* already decrypted */
705 case DMSG_MSGQ_STATE_HEADER1:
707 * Load the primary header, fail on any non-trivial read
708 * error or on EOF. Since the primary header is the same
709 * size is the message alignment it will never straddle
710 * the end of the buffer.
712 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
713 if (bytes < sizeof(msg->any.head)) {
714 n = read(iocom->sock_fd,
715 ioq->buf + ioq->fifo_end,
719 ioq->error = DMSG_IOQ_ERROR_EOF;
722 if (errno != EINTR &&
723 errno != EINPROGRESS &&
725 ioq->error = DMSG_IOQ_ERROR_SOCK;
731 ioq->fifo_end += (size_t)n;
736 * Decrypt data received so far. Data will be decrypted
737 * in-place but might create gaps in the FIFO. Partial
738 * blocks are not immediately decrypted.
740 * WARNING! The header might be in the wrong endian, we
741 * do not fix it up until we get the entire
744 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
745 dmsg_crypto_decrypt(iocom, ioq);
747 ioq->fifo_cdx = ioq->fifo_end;
748 ioq->fifo_cdn = ioq->fifo_end;
750 bytes = ioq->fifo_cdx - ioq->fifo_beg;
753 * Insufficient data accumulated (msg is NULL, caller will
757 if (bytes < sizeof(msg->any.head))
761 * Check and fixup the core header. Note that the icrc
762 * has to be calculated before any fixups, but the crc
763 * fields in the msg may have to be swapped like everything
766 head = (void *)(ioq->buf + ioq->fifo_beg);
767 if (head->magic != DMSG_HDR_MAGIC &&
768 head->magic != DMSG_HDR_MAGIC_REV) {
769 fprintf(stderr, "%s: head->magic is bad %02x\n",
770 iocom->label, head->magic);
771 if (iocom->flags & DMSG_IOCOMF_CRYPTED)
772 fprintf(stderr, "(on encrypted link)\n");
773 ioq->error = DMSG_IOQ_ERROR_SYNC;
778 * Calculate the full header size and aux data size
780 if (head->magic == DMSG_HDR_MAGIC_REV) {
781 ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
783 aux_size = bswap32(head->aux_bytes);
785 ioq->hbytes = (head->cmd & DMSGF_SIZE) *
787 aux_size = head->aux_bytes;
789 ioq->abytes = DMSG_DOALIGN(aux_size);
790 ioq->unaligned_aux_size = aux_size;
791 if (ioq->hbytes < sizeof(msg->any.head) ||
792 ioq->hbytes > sizeof(msg->any) ||
793 ioq->abytes > DMSG_AUX_MAX) {
794 ioq->error = DMSG_IOQ_ERROR_FIELD;
799 * Allocate the message, the next state will fill it in.
801 * NOTE: The aux_data buffer will be sized to an aligned
802 * value and the aligned remainder zero'd for
805 * NOTE: Supply dummy state and a degenerate cmd without
806 * CREATE set. The message will temporarily be
807 * associated with state0 until later post-processing.
809 msg = dmsg_msg_alloc(&iocom->state0, aux_size,
810 ioq->hbytes / DMSG_ALIGN,
815 * Fall through to the next state. Make sure that the
816 * extended header does not straddle the end of the buffer.
817 * We still want to issue larger reads into our buffer,
818 * book-keeping is easier if we don't bcopy() yet.
820 * Make sure there is enough room for bloated encrypt data.
822 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
823 ioq->state = DMSG_MSGQ_STATE_HEADER2;
825 case DMSG_MSGQ_STATE_HEADER2:
827 * Fill out the extended header.
830 if (bytes < ioq->hbytes) {
831 n = read(iocom->sock_fd,
832 ioq->buf + ioq->fifo_end,
836 ioq->error = DMSG_IOQ_ERROR_EOF;
839 if (errno != EINTR &&
840 errno != EINPROGRESS &&
842 ioq->error = DMSG_IOQ_ERROR_SOCK;
848 ioq->fifo_end += (size_t)n;
852 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
853 dmsg_crypto_decrypt(iocom, ioq);
855 ioq->fifo_cdx = ioq->fifo_end;
856 ioq->fifo_cdn = ioq->fifo_end;
858 bytes = ioq->fifo_cdx - ioq->fifo_beg;
861 * Insufficient data accumulated (set msg NULL so caller will
864 if (bytes < ioq->hbytes) {
870 * Calculate the extended header, decrypt data received
871 * so far. Handle endian-conversion for the entire extended
874 head = (void *)(ioq->buf + ioq->fifo_beg);
879 if (head->magic == DMSG_HDR_MAGIC_REV)
880 xcrc32 = bswap32(head->hdr_crc);
882 xcrc32 = head->hdr_crc;
/*
 * NOTE(review): the sender (dmsg_iocom_flush1) computes hdr_crc with
 * that field zeroed. The line that zeroes head->hdr_crc before this check
 * is presumably among the missing lines. The field is restored below.
 */
884 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
885 ioq->error = DMSG_IOQ_ERROR_XCRC;
886 fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
887 xcrc32, dmsg_icrc32(head, ioq->hbytes),
892 head->hdr_crc = xcrc32;
894 if (head->magic == DMSG_HDR_MAGIC_REV) {
895 dmsg_bswap_head(head);
899 * Copy the extended header into the msg and adjust the
902 bcopy(head, &msg->any, ioq->hbytes);
905 * We are either done or we fall-through.
907 if (ioq->abytes == 0) {
908 ioq->fifo_beg += ioq->hbytes;
913 * Must adjust bytes (and the state) when falling through.
914 * nmax doesn't change.
916 ioq->fifo_beg += ioq->hbytes;
917 bytes -= ioq->hbytes;
918 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
920 case DMSG_MSGQ_STATE_AUXDATA1:
922 * Copy the partial or complete [decrypted] payload from
923 * remaining bytes in the FIFO in order to optimize the
924 * makeroom call in the AUXDATA2 state. We have to
925 * fall-through either way so we can check the crc.
927 * msg->aux_size tracks our aux data.
929 * (Lets not complicate matters if the data is encrypted,
930 * since the data in-stream is not the same size as the
933 if (bytes >= ioq->abytes) {
934 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
936 msg->aux_size = ioq->abytes;
937 ioq->fifo_beg += ioq->abytes;
938 assert(ioq->fifo_beg <= ioq->fifo_cdx);
939 assert(ioq->fifo_cdx <= ioq->fifo_cdn);
940 bytes -= ioq->abytes;
942 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
944 msg->aux_size = bytes;
945 ioq->fifo_beg += bytes;
946 if (ioq->fifo_cdx < ioq->fifo_beg)
947 ioq->fifo_cdx = ioq->fifo_beg;
948 assert(ioq->fifo_beg <= ioq->fifo_cdx);
949 assert(ioq->fifo_cdx <= ioq->fifo_cdn);
954 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
956 case DMSG_MSGQ_STATE_AUXDATA2:
958 * Make sure there is enough room for more data.
961 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
964 * Read and decrypt more of the payload.
966 if (msg->aux_size < ioq->abytes) {
968 n = read(iocom->sock_fd,
969 ioq->buf + ioq->fifo_end,
973 ioq->error = DMSG_IOQ_ERROR_EOF;
976 if (errno != EINTR &&
977 errno != EINPROGRESS &&
979 ioq->error = DMSG_IOQ_ERROR_SOCK;
985 ioq->fifo_end += (size_t)n;
989 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
990 dmsg_crypto_decrypt(iocom, ioq);
992 ioq->fifo_cdx = ioq->fifo_end;
993 ioq->fifo_cdn = ioq->fifo_end;
995 bytes = ioq->fifo_cdx - ioq->fifo_beg;
997 if (bytes > ioq->abytes - msg->aux_size)
998 bytes = ioq->abytes - msg->aux_size;
1001 bcopy(ioq->buf + ioq->fifo_beg,
1002 msg->aux_data + msg->aux_size,
1004 msg->aux_size += bytes;
1005 ioq->fifo_beg += bytes;
1009 * Insufficient data accumulated (set msg NULL so caller will
1012 * Assert the auxillary data size is correct, then record the
1013 * original unaligned size from the message header.
1015 if (msg->aux_size < ioq->abytes) {
1019 assert(msg->aux_size == ioq->abytes);
1020 msg->aux_size = ioq->unaligned_aux_size;
1023 * Check aux_crc, then we are done. Note that the crc
1024 * is calculated over the aligned size, not the actual
1027 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
1028 if (xcrc32 != msg->any.head.aux_crc) {
1029 ioq->error = DMSG_IOQ_ERROR_ACRC;
1031 "iocom: ACRC error %08x vs %08x "
1032 "msgid %016jx msgcmd %08x auxsize %d\n",
1034 msg->any.head.aux_crc,
1035 (intmax_t)msg->any.head.msgid,
1037 msg->any.head.aux_bytes);
1041 case DMSG_MSGQ_STATE_ERROR:
1043 * Continued calls to drain recorded transactions (returning
1044 * a LNK_ERROR for each one), before we return the final
1047 assert(msg == NULL);
1051 * We don't double-return errors, the caller should not
1052 * have called us again after getting an error msg.
1059 * Check the message sequence. The iv[] should prevent any
1060 * possibility of a replay but we add this check anyway.
1062 if (msg && ioq->error == 0) {
/* Only the low 8 bits of the sequence number travel in salt (see dmsg_iocom_flush1). */
1063 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1064 ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1071 * Handle error, RREQ, or completion
1073 * NOTE: nmax and bytes are invalid at this point, we don't bother
1074 * to update them when breaking out.
1079 * An unrecoverable error causes all active receive
1080 * transactions to be terminated with a LNK_ERROR message.
1082 * Once all active transactions are exhausted we set the
1083 * iocom ERROR flag and return a non-transactional LNK_ERROR
1084 * message, which should cause master processing loops to
1087 fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1088 assert(ioq->msg == msg);
1096 * No more I/O read processing
1098 ioq->state = DMSG_MSGQ_STATE_ERROR;
1101 * Simulate a remote LNK_ERROR DELETE msg for any open
1102 * transactions, ending with a final non-transactional
1103 * LNK_ERROR (that the session can detect) when no
1104 * transactions remain.
1106 * NOTE: Temporarily supply state0 and a degenerate cmd
1107 * without CREATE set. The real state will be
1108 * assigned in the loop.
1110 * NOTE: We are simulating a received message using our
1111 * side of the state, so the DMSGF_REV* bits have
1114 pthread_mutex_lock(&iocom->mtx);
1115 dmsg_iocom_drain(iocom);
1116 dmsg_simulate_failure(&iocom->state0, ioq->error);
1117 pthread_mutex_unlock(&iocom->mtx);
1118 if (TAILQ_FIRST(&ioq->msgq))
1123 * For the iocom error case we want to set RWORK to indicate
1124 * that more messages might be pending.
1126 * It is possible to return NULL when there is more work to
1127 * do because each message has to be DELETEd in both
1128 * directions before we continue on with the next (though
1129 * this could be optimized). The transmit direction will
1133 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1135 } else if (msg == NULL) {
1137 * Insufficient data received to finish building the message,
1138 * set RREQ and return NULL.
1140 * Leave ioq->msg intact.
1141 * Leave the FIFO intact.
1143 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1146 * Continue processing msg.
1148 * The fifo has already been advanced past the message.
1149 * Trivially reset the FIFO indices if possible.
1151 * clear the FIFO if it is now empty and set RREQ to wait
1152 * for more from the socket. If the FIFO is not empty set
1153 * TWORK to bypass the poll so we loop immediately.
1155 if (ioq->fifo_beg == ioq->fifo_cdx &&
1156 ioq->fifo_cdn == ioq->fifo_end) {
1157 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1163 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1165 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1169 * Handle message routing. Validates non-zero sources
1170 * and routes message. Error will be 0 if the message is
1173 * State processing only occurs for messages destined for us.
1175 if (DMsgDebugOpt >= 5) {
1177 "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1179 (intmax_t)msg->any.head.msgid,
1180 (intmax_t)msg->any.head.circuit);
1183 error = dmsg_state_msgrx(msg);
1187 * Abort-after-closure, throw message away and
1188 * start reading another.
1190 if (error == DMSG_IOQ_ERROR_EALREADY) {
1196 * Process real error and throw away message.
1203 * No error and not routed
1205 /* no error, not routed. Fall through and return msg */
/*
 * dmsg_iocom_flush1 - move queued outbound messages into the transmit ioq.
 *
 * First clears WREQ and WWORK. Under iocom->mtx it then moves everything
 * on iocom->txmsgq to a private queue.
 *
 * For each message it:
 *  - sets magic;
 *  - sets a salt whose low byte is the low byte of the tx sequence number;
 *  - computes aux_crc over the aligned aux data if aux_crc is still 0;
 *  - records aux_bytes;
 *  - computes hdr_crc with that field zeroed;
 *  - appends the message to ioq->msgq.
 *
 * Finally it calls dmsg_iocom_flush2() to push the data to the socket.
 *
 * NOTE(review): the header text below still describes a `msg` argument,
 * but this function takes none. TAILQ_INIT(&tmpq), the terminal error
 * branch, the sequence increment and the PRNG re-seed body are not
 * present in this copy.
 *
 * NOTE(review): `random() << 8` left-shifts a signed long. That is
 * undefined when the result overflows, for example where long is 32
 * bits. Converting to an unsigned type before shifting would avoid this.
 */
1211 * Calculate the header and data crc's and write a low-level message to
1212 * the connection. If aux_crc is non-zero the aux_data crc is already
1213 * assumed to have been set.
1215 * A non-NULL msg is added to the queue but not necessarily flushed.
1216 * Calling this function with msg == NULL will get a flush going.
1218 * (called from iocom_core only)
1221 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1223 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1228 dmsg_msg_queue_t tmpq;
1230 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1232 pthread_mutex_lock(&iocom->mtx);
1233 while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1234 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1235 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1237 pthread_mutex_unlock(&iocom->mtx);
1239 while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1241 * Process terminal connection errors.
1243 TAILQ_REMOVE(&tmpq, msg, qentry);
1245 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1251 * Finish populating the msg fields. The salt ensures that
1252 * the iv[] array is ridiculously randomized and we also
1253 * re-seed our PRNG every 32768 messages just to be sure.
1255 msg->any.head.magic = DMSG_HDR_MAGIC;
1256 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1258 if ((ioq->seq & 32767) == 0)
1262 * Calculate aux_crc if 0, then calculate hdr_crc.
1264 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1265 abytes = DMSG_DOALIGN(msg->aux_size);
1266 xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1267 msg->any.head.aux_crc = xcrc32;
1269 msg->any.head.aux_bytes = msg->aux_size;
1271 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1273 msg->any.head.hdr_crc = 0;
1274 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1277 * Enqueue the message (the flush codes handles stream
1280 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1283 dmsg_iocom_flush2(iocom);
1287 * Thread localized, iocom->mtx not held by caller.
1289 * (called from iocom_core via iocom_flush1 only)
1292 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1294 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1297 struct iovec iov[DMSG_IOQ_MAXIOVEC];
1307 dmsg_iocom_drain(iocom);
1312 * Pump messages out the connection by building an iovec.
1314 * ioq->hbytes/ioq->abytes tracks how much of the first message
1315 * in the queue has been successfully written out, so we can
1323 TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1324 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1326 abytes = DMSG_DOALIGN(msg->aux_size);
1327 assert(hoff <= hbytes && aoff <= abytes);
1329 if (hoff < hbytes) {
1330 size_t maxlen = hbytes - hoff;
1331 if (maxlen > sizeof(ioq->buf) / 2)
1332 maxlen = sizeof(ioq->buf) / 2;
1333 iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1334 iov[iovcnt].iov_len = maxlen;
1337 if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1338 maxlen != hbytes - hoff) {
1342 if (aoff < abytes) {
1343 size_t maxlen = abytes - aoff;
1344 if (maxlen > sizeof(ioq->buf) / 2)
1345 maxlen = sizeof(ioq->buf) / 2;
1347 assert(msg->aux_data != NULL);
1348 iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1349 iov[iovcnt].iov_len = maxlen;
1352 if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1353 maxlen != abytes - aoff) {
1362 * Shortcut if no work to do. Be sure to check for old work still
1363 * pending in the FIFO.
1365 if (iovcnt == 0 && ioq->fifo_beg == ioq->fifo_cdx)
1369 * Encrypt and write the data. The crypto code will move the
1370 * data into the fifo and adjust the iov as necessary. If
1371 * encryption is disabled the iov is left alone.
1373 * May return a smaller iov (thus a smaller n), with aggregated
1374 * chunks. May reduce nmax to what fits in the FIFO.
1376 * This function sets nact to the number of original bytes now
1377 * encrypted, adding to the FIFO some number of bytes that might
1378 * be greater depending on the crypto mechanic. iov[] is adjusted
1379 * to point at the FIFO if necessary.
1381 * NOTE: nact is the number of bytes eaten from the message. For
1382 * encrypted data this is the number of bytes processed for
1383 * encryption and not necessarily the number of bytes writable.
1384 * The return value from the writev() is the post-encrypted
1385 * byte count which might be larger.
1387 * NOTE: For direct writes, nact is the return value from the writev().
1389 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1391 * Make sure the FIFO has a reasonable amount of space
1392 * left (if not completely full).
1394 * In this situation we are staging the encrypted message
1395 * data in the FIFO. (nact) represents how much plaintext
1396 * has been staged, (n) represents how much encrypted data
1397 * has been flushed. The two are independent of each other.
1399 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1400 sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1401 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1402 ioq->fifo_end - ioq->fifo_beg);
1403 ioq->fifo_cdx -= ioq->fifo_beg;
1404 ioq->fifo_cdn -= ioq->fifo_beg;
1405 ioq->fifo_end -= ioq->fifo_beg;
1410 * beg .... cdx ............ cdn ............. end
1411 * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
1413 * Advance fifo_beg on a successful write.
1415 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1416 n = writev(iocom->sock_fd, iov, iovcnt);
1420 if (ioq->fifo_beg == ioq->fifo_end) {
1429 * We don't mess with the nact returned by the crypto_encrypt
1430 * call, which represents the filling of the FIFO. (n) tells
1431 * us how much we were able to write from the FIFO. The two
1432 * are different beasts when encrypting.
1436 * In this situation we are not staging the messages to the
1437 * FIFO but instead writing them directly from the msg
1438 * structure(s) unencrypted, so (nact) is basically (n).
1440 n = writev(iocom->sock_fd, iov, iovcnt);
1449 * Clean out the transmit queue based on what we successfully
1450 * encrypted (nact is the plaintext count) and is now in the FIFO.
1451 * ioq->hbytes/abytes represents the portion of the first message
1454 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1455 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1457 abytes = DMSG_DOALIGN(msg->aux_size);
1459 if ((size_t)nact < hbytes - ioq->hbytes) {
1460 ioq->hbytes += nact;
1464 nact -= hbytes - ioq->hbytes;
1465 ioq->hbytes = hbytes;
1466 if ((size_t)nact < abytes - ioq->abytes) {
1467 ioq->abytes += nact;
1471 nact -= abytes - ioq->abytes;
1472 /* ioq->abytes = abytes; optimized out */
1476 "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1478 (intmax_t)msg->any.head.msgid,
1479 (intmax_t)msg->any.head.circuit);
1482 #ifdef DMSG_BLOCK_DEBUG
1485 if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
1486 if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
1487 tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
1488 (msg->any.head.cmd & (DMSGF_CREATE |
1495 tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
1499 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
1500 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
1501 fprintf(stderr, "write BIO %-3d %016jx %d@%016jx\n",
1502 biocount, msg->any.head.msgid,
1503 msg->any.blk_read.bytes,
1504 msg->any.blk_read.offset);
1506 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1507 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1508 fprintf(stderr, "wretr BIO %-3d %016jx %d@%016jx\n",
1509 biocount, msg->any.head.msgid,
1510 msg->any.blk_read.bytes,
1511 msg->any.blk_read.offset);
1518 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1527 * Process the return value from the write w/regards to blocking.
1530 if (save_errno != EINTR &&
1531 save_errno != EINPROGRESS &&
1532 save_errno != EAGAIN) {
1536 ioq->error = DMSG_IOQ_ERROR_SOCK;
1537 dmsg_iocom_drain(iocom);
1540 * Wait for socket buffer space, do not try to
1541 * process more packets for transmit until space
1544 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1546 } else if (TAILQ_FIRST(&ioq->msgq) ||
1547 TAILQ_FIRST(&iocom->txmsgq) ||
1548 ioq->fifo_beg != ioq->fifo_cdx) {
1550 * If the write succeeded and more messages are pending
1551 * in either msgq or the FIFO, WWORK must remain set.
1553 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
1555 /* else no transmit-side work remains */
1558 dmsg_iocom_drain(iocom);
1563 * Kill pending msgs on ioq_tx and adjust the flags such that no more
1564 * write events will occur. We don't kill read msgs because we want
1565 * the caller to pull off our contrived terminal error msg to detect
1566 * the connection failure.
1568 * Localized to iocom_core thread, iocom->mtx not held by caller.
1571 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1573 dmsg_ioq_t *ioq = &iocom->ioq_tx;
/* Stop scheduling write-side work: clear both WREQ and WWORK. */
1576 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
/* Unlink every message still queued on the transmit ioq. */
1580 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1581 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1588 * Write a message to an iocom, with additional state processing.
1591 dmsg_msg_write(dmsg_msg_t *msg)
1593 dmsg_iocom_t *iocom = msg->state->iocom;
1594 dmsg_state_t *state;
1597 pthread_mutex_lock(&iocom->mtx);
/*
 * iocom->mtx stays held until one of the unlocks below.
 *
 * NOTE(review): dmsg_state_cleanuptx() is called below with this lock
 * held, and it locks iocom->mtx again when msg carries DELETE.  That is
 * only safe if iocom->mtx is a recursive mutex -- confirm.
 */
1602 * Make sure the parent transaction is still open in the transmit
1603 * direction. If it isn't the message is dead and we have to
1604 * potentially simulate a rxmsg terminating the transaction.
1606 if ((state->parent->txcmd & DMSGF_DELETE) ||
1607 (state->parent->rxcmd & DMSGF_DELETE)) {
1608 fprintf(stderr, "dmsg_msg_write: EARLY TERMINATION\n");
1609 dmsg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1610 dmsg_state_cleanuptx(iocom, msg);
1612 pthread_mutex_unlock(&iocom->mtx);
1618 * Process state data into the message as needed, then update the
1619 * state based on the message.
1621 if ((state->flags & DMSG_STATE_ROOT) == 0) {
1623 * Existing transaction (could be reply). It is also
1624 * possible for this to be the first reply (CREATE is set),
1625 * in which case we populate state->txcmd.
1627 * state->txcmd is adjusted to hold the final message cmd,
1628 * and we also make sure to set the CREATE bit here. We did
1629 * not set it in dmsg_msg_alloc() because that would have
1630 * not been serialized (state could have gotten ripped out
1631 * from under the message prior to it being transmitted).
1633 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1635 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1636 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
/* Messages sent on a transaction carry that transaction's msgid. */
1638 msg->any.head.msgid = state->msgid;
1640 if (msg->any.head.cmd & DMSGF_CREATE) {
1641 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
/*
 * If msg carries DELETE, dmsg_state_cleanuptx() closes our transmit
 * direction, and it retires the state once both directions are closed.
 */
1644 dmsg_state_cleanuptx(iocom, msg);
1648 "MSGWRITE %016jx %08x\n",
1649 msg->any.head.msgid, msg->any.head.cmd);
1653 * Queue it for output, wake up the I/O pthread. Note that the
1654 * I/O thread is responsible for generating the CRCs and encryption.
1656 TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
/* NOTE(review): the return value of write() on the wakeup pipe is ignored. */
1658 write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */
1659 pthread_mutex_unlock(&iocom->mtx);
1663 * Simulate reception of a transaction DELETE message when the link goes
1664 * bad. This routine must recurse through state->subq and generate messages
1665 * and callbacks bottom-up.
1667 * iocom->mtx must be held by caller.
1671 dmsg_simulate_failure(dmsg_state_t *state, int error)
1673 dmsg_state_t *substate;
1674 dmsg_iocom_t *iocom;
/*
 * Children first (bottom-up).  Each recursive call is expected to unlink
 * substate from state->subq (see the SUBINSERTED handling below);
 * otherwise this loop would not terminate.
 */
1677 while ((substate = TAILQ_FIRST(&state->subq)) != NULL) {
1678 dmsg_simulate_failure(substate, error);
1681 iocom = state->iocom;
1682 if (state == &iocom->state0) {
1684 * No active local or remote transactions remain.
1685 * Generate a final LNK_ERROR. EOF will be flagged
1686 * when the message is returned by dmsg_ioq_read().
1688 msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1691 msg->any.head.error = error;
1692 } else if (state->flags & DMSG_STATE_OPPOSITE) {
1694 * Active remote transactions are still present.
1695 * Simulate the other end sending us a DELETE.
1697 if (state->flags & DMSG_STATE_SUBINSERTED) {
1698 TAILQ_REMOVE(&state->parent->subq, state, entry);
1699 state->flags &= ~DMSG_STATE_SUBINSERTED;
/* Only fabricate a DELETE if none has been received on this transaction. */
1701 if ((state->rxcmd & DMSGF_DELETE) == 0) {
/*
 * NOTE(review): the local-transaction branch below logs the identical
 * SIMULATE ERROR1 text, so the two cases cannot be told apart in the log.
 */
1702 fprintf(stderr, "SIMULATE ERROR1\n");
1703 msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1706 /*state->txcmd |= DMSGF_DELETE;*/
/*
 * Address the simulated ABORT/DELETE to this transaction on its parent
 * circuit.  REVCIRC is set when the parent lacks DMSG_STATE_OPPOSITE, so
 * that dmsg_state_msgrx() looks the circuit up in statewr_tree.
 */
1708 msg->any.head.error = error;
1709 msg->any.head.msgid = state->msgid;
1710 msg->any.head.circuit = state->parent->msgid;
1711 msg->any.head.cmd |= DMSGF_ABORT |
1713 if ((state->parent->flags &
1714 DMSG_STATE_OPPOSITE) == 0) {
1715 msg->any.head.cmd |= DMSGF_REVCIRC;
1722 * Active local transactions are still present.
1723 * Simulate the other end sending us a DELETE.
1725 if (state->flags & DMSG_STATE_SUBINSERTED) {
1726 TAILQ_REMOVE(&state->parent->subq, state, entry);
1727 state->flags &= ~DMSG_STATE_SUBINSERTED;
/* Only fabricate a DELETE if none has been received on this transaction. */
1729 if ((state->rxcmd & DMSGF_DELETE) == 0) {
1730 fprintf(stderr, "SIMULATE ERROR1\n");
1731 msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1735 msg->any.head.error = error;
1736 msg->any.head.msgid = state->msgid;
1737 msg->any.head.circuit = state->parent->msgid;
1738 msg->any.head.cmd |= DMSGF_ABORT |
1742 if ((state->parent->flags &
1743 DMSG_STATE_OPPOSITE) == 0) {
1744 msg->any.head.cmd |= DMSGF_REVCIRC;
/*
 * If nothing has been received on this locally-initiated transaction
 * yet, the simulated reply must also open it (CREATE).
 */
1746 if ((state->rxcmd & DMSGF_CREATE) == 0)
1747 msg->any.head.cmd |= DMSGF_CREATE;
/*
 * Hand the simulated message to the receive queue and flag read-side work.
 * NOTE(review): msg is assigned only when a simulated message was built
 * above; confirm this insert is guarded accordingly.
 */
1753 TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1754 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1759 * This is a shortcut to formulate a reply to msg with a simple error code.
1760 * It can reply to and terminate a transaction, or it can reply to a one-way
1761 * message. A DMSG_LNK_ERROR command code is utilized to encode
1762 * the error code (which can be 0). Not all transactions are terminated
1763 * with DMSG_LNK_ERROR status (the low level only cares about the
1764 * DMSGF_DELETE flag), but most are.
1766 * Replies to one-way messages are a bit of an oxymoron but the feature
1767 * is used by the debug (DBG) protocol.
1769 * The reply contains no extended data.
1772 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1774 dmsg_state_t *state = msg->state;
1779 * Reply with a simple error code and terminate the transaction.
1781 cmd = DMSG_LNK_ERROR;
1784 * Check if our direction has even been initiated yet, set CREATE.
1786 * Check what direction this is (command or reply direction). Note
1787 * that txcmd might not have been initiated yet.
1789 * If our direction has already been closed we just return without
1792 if ((state->flags & DMSG_STATE_ROOT) == 0) {
1793 if (state->txcmd & DMSGF_DELETE)
1795 if (state->txcmd & DMSGF_REPLY)
1797 cmd |= DMSGF_DELETE;
1799 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1804 * Allocate the message and associate it with the existing state.
1805 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1806 * allocate new state. We have our state already.
1808 nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
/* On a non-root transaction, the first message we transmit must also carry CREATE. */
1809 if ((state->flags & DMSG_STATE_ROOT) == 0) {
1810 if ((state->txcmd & DMSGF_CREATE) == 0)
1811 nmsg->any.head.cmd |= DMSGF_CREATE;
1813 nmsg->any.head.error = error;
/* dmsg_msg_write() performs transmit-side state processing and queues nmsg. */
1815 dmsg_msg_write(nmsg);
1819 * Similar to dmsg_msg_reply() but leave the transaction open. That is,
1820 * we are generating a streaming reply or an intermediate acknowledgement
1821 * of some sort as part of the higher level protocol, with more to come
1825 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1827 dmsg_state_t *state = msg->state;
1833 * Reply with a simple error code but leave the transaction open.
1835 cmd = DMSG_LNK_ERROR;
1838 * Check if our direction has even been initiated yet, set CREATE.
1840 * Check what direction this is (command or reply direction). Note
1841 * that txcmd might not have been initiated yet.
1843 * If our direction has already been closed we just return without
1846 if ((state->flags & DMSG_STATE_ROOT) == 0) {
1847 if (state->txcmd & DMSGF_DELETE)
1849 if (state->txcmd & DMSGF_REPLY)
1851 /* continuing transaction, do not set DMSGF_DELETE */
1853 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1856 nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
/* On a non-root transaction, the first message we transmit must also carry CREATE. */
1857 if ((state->flags & DMSG_STATE_ROOT) == 0) {
1858 if ((state->txcmd & DMSGF_CREATE) == 0)
1859 nmsg->any.head.cmd |= DMSGF_CREATE;
1861 nmsg->any.head.error = error;
/* dmsg_msg_write() performs transmit-side state processing and queues nmsg. */
1863 dmsg_msg_write(nmsg);
1867 * Terminate a transaction given a state structure by issuing a DELETE.
1868 * (the state structure must not be &iocom->state0)
1871 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1874 uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1877 * Nothing to do if we already transmitted a delete
1879 if (state->txcmd & DMSGF_DELETE)
1883 * Set REPLY if the other end initiated the command. Otherwise
1884 * we are the command direction.
1886 if (state->txcmd & DMSGF_REPLY)
1889 nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
/* If we have not transmitted CREATE yet, this terminating message must also carry it. */
1890 if ((state->flags & DMSG_STATE_ROOT) == 0) {
1891 if ((state->txcmd & DMSGF_CREATE) == 0)
1892 nmsg->any.head.cmd |= DMSGF_CREATE;
1894 nmsg->any.head.error = error;
1895 dmsg_msg_write(nmsg);
1899 * Send a non-terminating result (no DELETE) given a state structure.
1900 * (the state structure must not be &iocom->state0)
1903 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1906 uint32_t cmd = DMSG_LNK_ERROR;
/* Unlike dmsg_state_reply(), cmd does not include DMSGF_DELETE: the transaction stays open. */
1909 * Nothing to do if we already transmitted a delete
1911 if (state->txcmd & DMSGF_DELETE)
1915 * Set REPLY if the other end initiated the command. Otherwise
1916 * we are the command direction.
1918 if (state->txcmd & DMSGF_REPLY)
1921 nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
/* If we have not transmitted CREATE yet, this message must also carry it. */
1922 if ((state->flags & DMSG_STATE_ROOT) == 0) {
1923 if ((state->txcmd & DMSGF_CREATE) == 0)
1924 nmsg->any.head.cmd |= DMSGF_CREATE;
1926 nmsg->any.head.error = error;
1927 dmsg_msg_write(nmsg);
1930 /************************************************************************
1931 * TRANSACTION STATE HANDLING *
1932 ************************************************************************
1937 * Process state tracking for a message after reception, prior to execution.
1938 * Possibly route the message (consuming it).
1940 * Called with msglk held and the msg dequeued.
1942 * All messages are called with dummy state and return actual state.
1943 * (One-off messages often just return the same dummy state).
1945 * May request that caller discard the message by setting *discardp to 1.
1946 * The returned state is not used in this case and is allowed to be NULL.
1950 * These routines handle persistent and command/reply message state via the
1951 * CREATE and DELETE flags. The first message in a command or reply sequence
1952 * sets CREATE, the last message in a command or reply sequence sets DELETE.
1954 * There can be any number of intermediate messages belonging to the same
1955 * sequence sent in between the CREATE message and the DELETE message,
1956 * which set neither flag. This represents a streaming command or reply.
1958 * Any command message received with CREATE set expects a reply sequence to
1959 * be returned. Reply sequences work the same as command sequences except the
1960 * REPLY bit is also sent. Both the command side and reply side can
1961 * degenerate into a single message with both CREATE and DELETE set. Note
1962 * that one side can be streaming and the other side not, or neither, or both.
1964 * The msgid is unique for the initiator. That is, two sides sending a new
1965 * message can use the same msgid without colliding.
1969 * The message may be running over a circuit. If the circuit is half-deleted,
1970 * the message is typically racing against a link failure and must be thrown
1971 * out. As the circuit deletion propagates the library will automatically
1972 * generate terminations for sub states.
1976 * ABORT sequences work by setting the ABORT flag along with normal message
1977 * state. However, ABORTs can also be sent on half-closed messages, that is
1978 * even if the command or reply side has already sent a DELETE, as long as
1979 * the message has not been fully closed it can still send an ABORT+DELETE
1980 * to terminate the half-closed message state.
1982 * Since ABORT+DELETEs can race, we silently discard ABORTs for message
1983 * state which has already been fully closed. REPLY+ABORT+DELETEs can
1984 * also race, and in this situation the other side might have already
1985 * initiated a new unrelated command with the same message id. Since
1986 * the abort has not set the CREATE flag the situation can be detected
1987 * and the message will also be discarded.
1989 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1990 * The ABORT request is essentially integrated into the command instead
1991 * of being sent later on. In this situation the command implementation
1992 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1993 * special-case non-blocking operation for the command.
1995 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
1996 * to be mid-stream aborts for command/reply sequences. ABORTs on
1997 * one-way messages are not supported.
1999 * NOTE! If a command sequence does not support aborts the ABORT flag is
2004 * One-off messages (no reply expected) are sent without an established
2005 * transaction. CREATE and DELETE are left clear and the msgid is usually 0.
2006 * For one-off messages sent over circuits msgid generally MUST be 0.
2008 * One-off messages cannot be aborted and typically aren't processed
2009 * by these routines. Order is still guaranteed for messages sent over
2010 * the same circuit. The REPLY bit can be used to distinguish whether
2011 * a one-off message is a command or reply. For example, one-off replies
2012 * will typically just contain status updates.
2015 dmsg_state_msgrx(dmsg_msg_t *msg)
2017 dmsg_iocom_t *iocom = msg->state->iocom;
2018 dmsg_state_t *state;
2019 dmsg_state_t *pstate;
2020 dmsg_state_t sdummy;
/* iocom->mtx is held across the circuit and transaction RB-tree lookups below. */
2023 pthread_mutex_lock(&iocom->mtx);
2026 * Lookup the circuit (pstate). The circuit will be an open
2027 * transaction. The REVCIRC bit in the message tells us which side
2030 if (msg->any.head.circuit) {
2031 sdummy.msgid = msg->any.head.circuit;
2033 if (msg->any.head.cmd & DMSGF_REVCIRC) {
2034 pstate = RB_FIND(dmsg_state_tree,
2035 &iocom->statewr_tree,
2038 pstate = RB_FIND(dmsg_state_tree,
2039 &iocom->staterd_tree,
2042 if (pstate == NULL) {
2044 "missing parent in stacked trans %s\n",
2046 error = DMSG_IOQ_ERROR_TRANS;
2047 pthread_mutex_unlock(&iocom->mtx);
2051 pstate = &iocom->state0;
2057 * If received msg is a command state is on staterd_tree.
2058 * If received msg is a reply state is on statewr_tree.
2059 * Otherwise there is no state (retain &iocom->state0)
2061 sdummy.msgid = msg->any.head.msgid;
2062 if (msg->any.head.cmd & DMSGF_REVTRANS)
2063 state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
2065 state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);
2069 * Message over an existing transaction (CREATE should not
2073 assert(pstate == state->parent);
2076 * Either a new transaction (if CREATE set) or a one-off.
2081 pthread_mutex_unlock(&iocom->mtx);
2084 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
2085 * inside the case statements.
2087 * Construct new state as necessary.
2089 switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
2092 case DMSGF_CREATE | DMSGF_DELETE:
2094 * Create new sub-transaction under pstate.
2095 * (any DELETE is handled in post-processing of msg).
2097 * (During routing the msgid was made unique for this
2098 * direction over the comlink, so our RB trees can be
2099 * iocom-based instead of state-based).
2101 if (state != pstate) {
2103 "duplicate transaction %s\n",
2105 error = DMSG_IOQ_ERROR_TRANS;
2111 * Allocate the new state.
2113 state = malloc(sizeof(*state));
/* NOTE(review): the malloc() result is not checked before it is used below. */
2114 atomic_add_int(&dmsg_state_count, 1);
2115 bzero(state, sizeof(*state));
2116 TAILQ_INIT(&state->subq);
2117 dmsg_state_hold(pstate);
2119 state->parent = pstate;
2120 state->iocom = iocom;
2121 state->flags = DMSG_STATE_DYNAMIC |
2122 DMSG_STATE_OPPOSITE;
2123 state->msgid = msg->any.head.msgid;
/* The peer opened this transaction, so everything we send on it is a reply. */
2124 state->txcmd = DMSGF_REPLY;
2125 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2126 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
/*
 * Re-take iocom->mtx to publish the new state: index it by msgid in
 * staterd_tree and link it under pstate.  NOTE(review): the lock was
 * released after the lookup above; presumably only one thread processes
 * received messages, so no duplicate can be inserted meanwhile -- confirm.
 */
2128 pthread_mutex_lock(&iocom->mtx);
2129 RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
2130 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
2131 state->flags |= DMSG_STATE_SUBINSERTED |
2132 DMSG_STATE_RBINSERTED;
2135 * If the parent is a relay set up the state handler to
2136 * automatically route the message. Local processing will
2139 * (state relays are seeded by SPAN processing)
2142 state->func = dmsg_state_relay;
2143 pthread_mutex_unlock(&iocom->mtx);
2148 "create state %p id=%08x on iocom staterd %p\n",
2149 state, (uint32_t)state->msgid, iocom);
2154 * Persistent state is expected but might not exist if an
2155 * ABORT+DELETE races the close.
2157 * (any DELETE is handled in post-processing of msg).
2159 if (state == pstate) {
2160 if (msg->any.head.cmd & DMSGF_ABORT) {
2161 error = DMSG_IOQ_ERROR_EALREADY;
2163 fprintf(stderr, "missing-state %s\n",
2165 error = DMSG_IOQ_ERROR_TRANS;
2172 * Handle another ABORT+DELETE case if the msgid has already
2175 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2176 if (msg->any.head.cmd & DMSGF_ABORT) {
2177 error = DMSG_IOQ_ERROR_EALREADY;
2179 fprintf(stderr, "reused-state %s\n",
2181 error = DMSG_IOQ_ERROR_TRANS;
2190 * Check for mid-stream ABORT command received, otherwise
2193 if (msg->any.head.cmd & DMSGF_ABORT) {
2194 if ((state == pstate) ||
2195 (state->rxcmd & DMSGF_CREATE) == 0) {
2196 error = DMSG_IOQ_ERROR_EALREADY;
2202 case DMSGF_REPLY | DMSGF_CREATE:
2203 case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
2205 * When receiving a reply with CREATE set the original
2206 * persistent state message should already exist.
2208 if (state == pstate) {
2209 fprintf(stderr, "no-state(r) %s\n",
2211 error = DMSG_IOQ_ERROR_TRANS;
2215 assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
/* Record the reply's command without DELETE; dmsg_state_cleanuprx() sets DELETE later. */
2216 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2219 case DMSGF_REPLY | DMSGF_DELETE:
2221 * Received REPLY+ABORT+DELETE in case where msgid has
2222 * already been fully closed, ignore the message.
2224 if (state == pstate) {
2225 if (msg->any.head.cmd & DMSGF_ABORT) {
2226 error = DMSG_IOQ_ERROR_EALREADY;
2228 fprintf(stderr, "no-state(r,d) %s\n",
2230 error = DMSG_IOQ_ERROR_TRANS;
2237 * Received REPLY+ABORT+DELETE in case where msgid has
2238 * already been reused for an unrelated message,
2239 * ignore the message.
2241 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2242 if (msg->any.head.cmd & DMSGF_ABORT) {
2243 error = DMSG_IOQ_ERROR_EALREADY;
2245 fprintf(stderr, "reused-state(r,d) %s\n",
2247 error = DMSG_IOQ_ERROR_TRANS;
2256 * Check for mid-stream ABORT reply received to sent command.
2258 if (msg->any.head.cmd & DMSGF_ABORT) {
2259 if (state == pstate ||
2260 (state->rxcmd & DMSGF_CREATE) == 0) {
2261 error = DMSG_IOQ_ERROR_EALREADY;
2270 * Calculate the easy-switch() transactional command. Represents
2271 * the outer-transaction command for any transaction-create or
2272 * transaction-delete, and the inner message command for any
2273 * non-transaction or inside-transaction command. tcmd will be
2274 * set to 0 for any messaging error condition.
2276 * The two can be told apart because outer-transaction commands
2277 * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2279 if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2280 if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
2281 msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
2282 (msg->any.head.cmd & (DMSGF_CREATE |
2289 msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
2292 #ifdef DMSG_BLOCK_DEBUG
/*
 * NOTE(review): msgid is passed to %016jx without an (intmax_t) cast;
 * confirm the argument types match the format specifiers.
 */
2293 switch (msg->tcmd) {
2294 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2295 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2296 fprintf(stderr, "read BIO %-3d %016jx %d@%016jx\n",
2297 biocount, msg->any.head.msgid,
2298 msg->any.blk_read.bytes,
2299 msg->any.blk_read.offset);
2301 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2302 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2303 fprintf(stderr, "rread BIO %-3d %016jx %d@%016jx\n",
2304 biocount, msg->any.head.msgid,
2305 msg->any.blk_read.bytes,
2306 msg->any.blk_read.offset);
2317 * Route the message and handle pair-state processing.
2320 dmsg_state_relay(dmsg_msg_t *lmsg)
2322 dmsg_state_t *lpstate;
2323 dmsg_state_t *rpstate;
2324 dmsg_state_t *lstate;
2325 dmsg_state_t *rstate;
2328 #ifdef DMSG_BLOCK_DEBUG
2329 switch (lmsg->tcmd) {
2330 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2331 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2332 atomic_add_int(&biocount, 1);
2333 fprintf(stderr, "relay BIO %-3d %016jx %d@%016jx\n",
2334 biocount, lmsg->any.head.msgid,
2335 lmsg->any.blk_read.bytes,
2336 lmsg->any.blk_read.offset);
2338 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2339 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2340 fprintf(stderr, "retrn BIO %-3d %016jx %d@%016jx\n",
2341 biocount, lmsg->any.head.msgid,
2342 lmsg->any.blk_read.bytes,
2343 lmsg->any.blk_read.offset);
2344 atomic_add_int(&biocount, -1);
2351 if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2354 * New sub-transaction, establish new state and relay.
2356 lstate = lmsg->state;
2357 lpstate = lstate->parent;
2358 rpstate = lpstate->relay;
2359 assert(lstate->relay == NULL);
2360 assert(rpstate != NULL);
/*
 * Open the mirrored transaction under the relay partner of our parent
 * (rpstate); rmsg->state is the newly established remote-side state.
 */
2362 rmsg = dmsg_msg_alloc(rpstate, 0,
2364 dmsg_state_relay, NULL);
2365 rstate = rmsg->state;
/* Cross-link the two states and take a hold on each for the link. */
2366 rstate->relay = lstate;
2367 lstate->relay = rstate;
2368 dmsg_state_hold(lstate);
2369 dmsg_state_hold(rstate);
2372 * State & relay already established
2374 lstate = lmsg->state;
2375 rstate = lstate->relay;
2376 assert(rstate != NULL);
2378 rmsg = dmsg_msg_alloc(rstate, 0,
2380 dmsg_state_relay, NULL);
/* Copy any extended header bytes beyond any.head, plus selected core header fields. */
2382 if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2383 bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2384 lmsg->hdr_size - sizeof(lmsg->any.head));
2386 rmsg->any.head.error = lmsg->any.head.error;
2387 rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2388 rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
/* Transfer the aux buffer to rmsg; lmsg no longer references it. */
2389 rmsg->aux_size = lmsg->aux_size;
2390 rmsg->aux_data = lmsg->aux_data;
2391 lmsg->aux_data = NULL;
2393 fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
2395 dmsg_msg_write(rmsg);
2399 * Cleanup and retire msg after processing
2402 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2404 dmsg_state_t *state;
2405 dmsg_state_t *pstate;
2407 assert(msg->state->iocom == iocom);
2409 if (state->flags & DMSG_STATE_ROOT) {
2411 * Free a non-transactional message, there is no state
2415 } else if (msg->any.head.cmd & DMSGF_DELETE) {
2417 * Message terminating transaction, destroy the related
2418 * state, the original message, and this message (if it
2419 * isn't the original message due to a CREATE|DELETE).
2421 * It's possible for governing state to terminate while
2422 * sub-transactions still exist. This is allowed but
2423 * will cause sub-transactions to recursively fail.
2424 * Further reception of sub-transaction messages will be
2425 * impossible because the circuit will no longer exist.
2426 * (XXX need code to make sure that happens properly).
2428 pthread_mutex_lock(&iocom->mtx);
/* Our receive direction is now closed. */
2429 state->rxcmd |= DMSGF_DELETE;
/* Once both directions are closed, unindex and unlink the state, then release its holds. */
2431 if (state->txcmd & DMSGF_DELETE) {
2432 assert(state->flags & DMSG_STATE_RBINSERTED);
2433 if (state->rxcmd & DMSGF_REPLY) {
2434 assert(msg->any.head.cmd & DMSGF_REPLY);
2435 RB_REMOVE(dmsg_state_tree,
2436 &iocom->statewr_tree, state);
2438 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2439 RB_REMOVE(dmsg_state_tree,
2440 &iocom->staterd_tree, state);
2442 state->flags &= ~DMSG_STATE_RBINSERTED;
/*
 * NOTE(review): here the parent hold is dropped only while SUBINSERTED
 * is still set.  dmsg_simulate_failure() clears SUBINSERTED without a
 * visible drop, and dmsg_state_cleanuptx() appears to drop the parent
 * regardless.  Confirm that the hold taken on pstate in dmsg_state_msgrx()
 * cannot leak on this path.
 */
2443 if (state->flags & DMSG_STATE_SUBINSERTED) {
2444 pstate = state->parent;
2445 TAILQ_REMOVE(&pstate->subq, state, entry);
2446 state->flags &= ~DMSG_STATE_SUBINSERTED;
2447 dmsg_state_drop(pstate);
2449 state->parent = NULL;
2452 dmsg_state_drop(state->relay);
2453 state->relay = NULL;
/* Release the hold on the state itself; dmsg_state_drop() frees it on the last reference. */
2456 dmsg_state_drop(state);
2460 pthread_mutex_unlock(&iocom->mtx);
2463 * Message not terminating transaction, leave state intact
2464 * and free message if it isn't the CREATE message.
2471 * Clean up the state after pulling out needed fields and queueing the
2472 * message for transmission. This occurs in dmsg_msg_write().
2475 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2477 dmsg_state_t *state;
2478 dmsg_state_t *pstate;
2480 assert(iocom == msg->state->iocom);
/*
 * NOTE(review): dmsg_msg_write() calls this function while already
 * holding iocom->mtx, yet the DELETE path below locks iocom->mtx again.
 * Relocking a default (non-recursive) pthread mutex deadlocks or is
 * undefined, so confirm that iocom->mtx is created recursive.
 */
2482 if (state->flags & DMSG_STATE_ROOT) {
2484 } else if (msg->any.head.cmd & DMSGF_DELETE) {
2486 * Message terminating transaction, destroy the related
2487 * state, the original message, and this message (if it
2488 * isn't the original message due to a CREATE|DELETE).
2490 * It's possible for governing state to terminate while
2491 * sub-transactions still exist. This is allowed but
2492 * will cause sub-transactions to recursively fail.
2493 * Further reception of sub-transaction messages will be
2494 * impossible because the circuit will no longer exist.
2495 * (XXX need code to make sure that happens properly).
2497 pthread_mutex_lock(&iocom->mtx);
2498 assert((state->txcmd & DMSGF_DELETE) == 0);
/* Our transmit direction is now closed. */
2499 state->txcmd |= DMSGF_DELETE;
/* If the receive direction is already closed too, retire the state. */
2500 if (state->rxcmd & DMSGF_DELETE) {
2501 assert(state->flags & DMSG_STATE_RBINSERTED);
2502 if (state->txcmd & DMSGF_REPLY) {
2503 assert(msg->any.head.cmd & DMSGF_REPLY);
2504 RB_REMOVE(dmsg_state_tree,
2505 &iocom->staterd_tree, state);
2507 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2508 RB_REMOVE(dmsg_state_tree,
2509 &iocom->statewr_tree, state);
2511 state->flags &= ~DMSG_STATE_RBINSERTED;
2512 pstate = state->parent;
2513 if (state->flags & DMSG_STATE_SUBINSERTED) {
2514 TAILQ_REMOVE(&pstate->subq, state, entry);
2515 state->flags &= ~DMSG_STATE_SUBINSERTED;
2517 state->parent = NULL;
2518 dmsg_state_drop(pstate);
2521 dmsg_state_drop(state->relay);
2522 state->relay = NULL;
2524 dmsg_state_drop(state); /* usually the last drop */
2526 pthread_mutex_unlock(&iocom->mtx);
2531 * Called with or without locks
2534 dmsg_state_hold(dmsg_state_t *state)
/* Take an additional reference on state. */
2536 atomic_add_int(&state->refs, 1);
/*
 * Release a reference.  atomic_fetchadd_int() returns the previous count,
 * so the caller that drops the last reference frees the state.
 * NOTE(review): dmsg_state_free() is documented as "Called with iocom
 * locked", while this function may be called without locks -- confirm
 * that the final drop always happens with iocom->mtx held.
 */
2540 dmsg_state_drop(dmsg_state_t *state)
2542 if (atomic_fetchadd_int(&state->refs, -1) == 1)
2543 dmsg_state_free(state);
2547 * Called with iocom locked
2550 dmsg_state_free(dmsg_state_t *state)
/* Balance the dmsg_state_count increment made when a state is allocated (see dmsg_state_msgrx()). */
2552 atomic_add_int(&dmsg_state_count, -1);
2554 fprintf(stderr, "terminate state %p id=%08x\n",
2555 state, (uint32_t)state->msgid);
/* The state must be unindexed, unlinked from its parent, childless and unreferenced. */
2557 assert((state->flags & (DMSG_STATE_ROOT |
2558 DMSG_STATE_SUBINSERTED |
2559 DMSG_STATE_RBINSERTED)) == 0);
2560 assert(TAILQ_EMPTY(&state->subq));
2561 assert(state->refs == 0);
2562 if (state->any.any != NULL) /* XXX avoid deadlock w/exit & kernel */
2564 assert(state->any.any == NULL);
2569 * This swaps endian for a dmsg_hdr_t. Note that the extended
2570 * header is not adjusted, just the core header.
2573 dmsg_bswap_head(dmsg_hdr_t *head)
2575 head->magic = bswap16(head->magic);
2576 head->reserved02 = bswap16(head->reserved02);
2577 head->salt = bswap32(head->salt);
2579 head->msgid = bswap64(head->msgid);
2580 head->circuit = bswap64(head->circuit);
2581 head->reserved18= bswap64(head->reserved18);
2583 head->cmd = bswap32(head->cmd);
2584 head->aux_crc = bswap32(head->aux_crc);
2585 head->aux_bytes = bswap32(head->aux_bytes);
2586 head->error = bswap32(head->error);
2587 head->aux_descr = bswap64(head->aux_descr);
2588 head->reserved38= bswap32(head->reserved38);
2589 head->hdr_crc = bswap32(head->hdr_crc);