hammer2 - cluster / libdmsg circuit work
[dragonfly.git] / lib / libdmsg / msg.c
... / ...
CommitLineData
1/*
2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
17 * distribution.
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 * SUCH DAMAGE.
34 */
35
36#include "dmsg_local.h"
37
38int DMsgDebugOpt;
39
40static int dmsg_state_msgrx(dmsg_msg_t *msg);
41static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
42
43RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
44RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
45
46/*
47 * STATE TREE - Represents open transactions which are indexed by their
48 * { msgid } relative to the governing iocom.
49 */
50int
51dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
52{
53 if (state1->msgid < state2->msgid)
54 return(-1);
55 if (state1->msgid > state2->msgid)
56 return(1);
57 return(0);
58}
59
60/*
61 * CIRCUIT TREE - Represents open circuits which are indexed by their
62 * { msgid } relative to the governing iocom.
63 */
64int
65dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
66{
67 if (circuit1->msgid < circuit2->msgid)
68 return(-1);
69 if (circuit1->msgid > circuit2->msgid)
70 return(1);
71 return(0);
72}
73
74/*
75 * Initialize a low-level ioq
76 */
77void
78dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
79{
80 bzero(ioq, sizeof(*ioq));
81 ioq->state = DMSG_MSGQ_STATE_HEADER1;
82 TAILQ_INIT(&ioq->msgq);
83}
84
85/*
86 * Cleanup queue.
87 *
88 * caller holds iocom->mtx.
89 */
90void
91dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
92{
93 dmsg_msg_t *msg;
94
95 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
96 assert(0); /* shouldn't happen */
97 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
98 dmsg_msg_free(msg);
99 }
100 if ((msg = ioq->msg) != NULL) {
101 ioq->msg = NULL;
102 dmsg_msg_free(msg);
103 }
104}
105
106/*
107 * Initialize a low-level communications channel.
108 *
109 * NOTE: The signal_func() is called at least once from the loop and can be
110 * re-armed via dmsg_iocom_restate().
111 */
112void
113dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
114 void (*signal_func)(dmsg_iocom_t *),
115 void (*rcvmsg_func)(dmsg_msg_t *),
116 void (*dbgmsg_func)(dmsg_msg_t *),
117 void (*altmsg_func)(dmsg_iocom_t *))
118{
119 struct stat st;
120
121 bzero(iocom, sizeof(*iocom));
122
123 iocom->signal_callback = signal_func;
124 iocom->rcvmsg_callback = rcvmsg_func;
125 iocom->altmsg_callback = altmsg_func;
126 iocom->dbgmsg_callback = dbgmsg_func;
127
128 pthread_mutex_init(&iocom->mtx, NULL);
129 RB_INIT(&iocom->circuit_tree);
130 TAILQ_INIT(&iocom->freeq);
131 TAILQ_INIT(&iocom->freeq_aux);
132 TAILQ_INIT(&iocom->txmsgq);
133 iocom->sock_fd = sock_fd;
134 iocom->alt_fd = alt_fd;
135 iocom->flags = DMSG_IOCOMF_RREQ;
136 if (signal_func)
137 iocom->flags |= DMSG_IOCOMF_SWORK;
138 dmsg_ioq_init(iocom, &iocom->ioq_rx);
139 dmsg_ioq_init(iocom, &iocom->ioq_tx);
140 if (pipe(iocom->wakeupfds) < 0)
141 assert(0);
142 fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
143 fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
144
145 dmsg_circuit_init(iocom, &iocom->circuit0);
146
147 /*
148 * Negotiate session crypto synchronously. This will mark the
149 * connection as error'd if it fails. If this is a pipe it's
150 * a linkage that we set up ourselves to the filesystem and there
151 * is no crypto.
152 */
153 if (fstat(sock_fd, &st) < 0)
154 assert(0);
155 if (S_ISSOCK(st.st_mode))
156 dmsg_crypto_negotiate(iocom);
157
158 /*
159 * Make sure our fds are set to non-blocking for the iocom core.
160 */
161 if (sock_fd >= 0)
162 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
163#if 0
164 /* if line buffered our single fgets() should be fine */
165 if (alt_fd >= 0)
166 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
167#endif
168}
169
170/*
171 * May only be called from a callback from iocom_core.
172 *
173 * Adjust state machine functions, set flags to guarantee that both
174 * the recevmsg_func and the sendmsg_func is called at least once.
175 */
176void
177dmsg_iocom_restate(dmsg_iocom_t *iocom,
178 void (*signal_func)(dmsg_iocom_t *),
179 void (*rcvmsg_func)(dmsg_msg_t *msg),
180 void (*altmsg_func)(dmsg_iocom_t *))
181{
182 iocom->signal_callback = signal_func;
183 iocom->rcvmsg_callback = rcvmsg_func;
184 iocom->altmsg_callback = altmsg_func;
185 if (signal_func)
186 iocom->flags |= DMSG_IOCOMF_SWORK;
187 else
188 iocom->flags &= ~DMSG_IOCOMF_SWORK;
189}
190
191void
192dmsg_iocom_signal(dmsg_iocom_t *iocom)
193{
194 if (iocom->signal_callback)
195 iocom->flags |= DMSG_IOCOMF_SWORK;
196}
197
198/*
199 * Cleanup a terminating iocom.
200 *
201 * Caller should not hold iocom->mtx. The iocom has already been disconnected
202 * from all possible references to it.
203 */
204void
205dmsg_iocom_done(dmsg_iocom_t *iocom)
206{
207 dmsg_msg_t *msg;
208
209 if (iocom->sock_fd >= 0) {
210 close(iocom->sock_fd);
211 iocom->sock_fd = -1;
212 }
213 if (iocom->alt_fd >= 0) {
214 close(iocom->alt_fd);
215 iocom->alt_fd = -1;
216 }
217 dmsg_ioq_done(iocom, &iocom->ioq_rx);
218 dmsg_ioq_done(iocom, &iocom->ioq_tx);
219 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
220 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
221 free(msg);
222 }
223 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
224 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
225 free(msg->aux_data);
226 msg->aux_data = NULL;
227 free(msg);
228 }
229 if (iocom->wakeupfds[0] >= 0) {
230 close(iocom->wakeupfds[0]);
231 iocom->wakeupfds[0] = -1;
232 }
233 if (iocom->wakeupfds[1] >= 0) {
234 close(iocom->wakeupfds[1]);
235 iocom->wakeupfds[1] = -1;
236 }
237 pthread_mutex_destroy(&iocom->mtx);
238}
239
240/*
241 * Initialize a circuit structure and add it to the iocom's circuit_tree.
242 * circuit0 is left out and will not be added to the tree.
243 */
244void
245dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
246{
247 circuit->iocom = iocom;
248 RB_INIT(&circuit->staterd_tree);
249 RB_INIT(&circuit->statewr_tree);
250 if (circuit->msgid)
251 RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
252}
253
254/*
255 * Allocate a new one-way message.
256 */
257dmsg_msg_t *
258dmsg_msg_alloc(dmsg_circuit_t *circuit,
259 size_t aux_size, uint32_t cmd,
260 void (*func)(dmsg_msg_t *), void *data)
261{
262 dmsg_iocom_t *iocom = circuit->iocom;
263 dmsg_state_t *state = NULL;
264 dmsg_msg_t *msg;
265 int hbytes;
266
267 pthread_mutex_lock(&iocom->mtx);
268 if (aux_size) {
269 aux_size = (aux_size + DMSG_ALIGNMASK) &
270 ~DMSG_ALIGNMASK;
271 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
272 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
273 } else {
274 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
275 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
276 }
277 if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
278 /*
279 * Create state when CREATE is set without REPLY.
280 * Assign a unique msgid, in this case simply using
281 * the pointer value for 'state'.
282 *
283 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
284 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
285 *
286 * NOTE: state initiated by us and state initiated by
287 * a remote create are placed in different RB trees.
288 * The msgid for SPAN state is used in source/target
289 * for message routing as appropriate.
290 */
291 state = malloc(sizeof(*state));
292 bzero(state, sizeof(*state));
293 state->iocom = iocom;
294 state->circuit = circuit;
295 state->flags = DMSG_STATE_DYNAMIC;
296 state->msgid = (uint64_t)(uintptr_t)state;
297 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
298 state->rxcmd = DMSGF_REPLY;
299 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
300 state->func = func;
301 state->any.any = data;
302 pthread_mutex_lock(&iocom->mtx);
303 RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
304 pthread_mutex_unlock(&iocom->mtx);
305 state->flags |= DMSG_STATE_INSERTED;
306 }
307 pthread_mutex_unlock(&iocom->mtx);
308 if (msg == NULL) {
309 msg = malloc(sizeof(*msg));
310 bzero(msg, sizeof(*msg));
311 msg->aux_data = NULL;
312 msg->aux_size = 0;
313 }
314 if (msg->aux_size != aux_size) {
315 if (msg->aux_data) {
316 free(msg->aux_data);
317 msg->aux_data = NULL;
318 msg->aux_size = 0;
319 }
320 if (aux_size) {
321 msg->aux_data = malloc(aux_size);
322 msg->aux_size = aux_size;
323 }
324 }
325 hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
326 if (hbytes)
327 bzero(&msg->any.head, hbytes);
328 msg->hdr_size = hbytes;
329 msg->any.head.magic = DMSG_HDR_MAGIC;
330 msg->any.head.cmd = cmd;
331 msg->any.head.aux_descr = 0;
332 msg->any.head.aux_crc = 0;
333 msg->any.head.circuit = 0;
334 msg->circuit = circuit;
335 msg->iocom = iocom;
336 if (state) {
337 msg->state = state;
338 state->msg = msg;
339 msg->any.head.msgid = state->msgid;
340 } else {
341 msg->any.head.msgid = 0;
342 }
343 return (msg);
344}
345
346/*
347 * Free a message so it can be reused afresh.
348 *
349 * NOTE: aux_size can be 0 with a non-NULL aux_data.
350 */
351static
352void
353dmsg_msg_free_locked(dmsg_msg_t *msg)
354{
355 dmsg_iocom_t *iocom = msg->iocom;
356
357 msg->state = NULL;
358 if (msg->aux_data)
359 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
360 else
361 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
362}
363
364void
365dmsg_msg_free(dmsg_msg_t *msg)
366{
367 dmsg_iocom_t *iocom = msg->iocom;
368
369 pthread_mutex_lock(&iocom->mtx);
370 dmsg_msg_free_locked(msg);
371 pthread_mutex_unlock(&iocom->mtx);
372}
373
374/*
375 * I/O core loop for an iocom.
376 *
377 * Thread localized, iocom->mtx not held.
378 */
379void
380dmsg_iocom_core(dmsg_iocom_t *iocom)
381{
382 struct pollfd fds[3];
383 char dummybuf[256];
384 dmsg_msg_t *msg;
385 int timeout;
386 int count;
387 int wi; /* wakeup pipe */
388 int si; /* socket */
389 int ai; /* alt bulk path socket */
390
391 while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
392 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
393 DMSG_IOCOMF_WWORK |
394 DMSG_IOCOMF_PWORK |
395 DMSG_IOCOMF_SWORK |
396 DMSG_IOCOMF_ARWORK |
397 DMSG_IOCOMF_AWWORK)) == 0) {
398 /*
399 * Only poll if no immediate work is pending.
400 * Otherwise we are just wasting our time calling
401 * poll.
402 */
403 timeout = 5000;
404
405 count = 0;
406 wi = -1;
407 si = -1;
408 ai = -1;
409
410 /*
411 * Always check the inter-thread pipe, e.g.
412 * for iocom->txmsgq work.
413 */
414 wi = count++;
415 fds[wi].fd = iocom->wakeupfds[0];
416 fds[wi].events = POLLIN;
417 fds[wi].revents = 0;
418
419 /*
420 * Check the socket input/output direction as
421 * requested
422 */
423 if (iocom->flags & (DMSG_IOCOMF_RREQ |
424 DMSG_IOCOMF_WREQ)) {
425 si = count++;
426 fds[si].fd = iocom->sock_fd;
427 fds[si].events = 0;
428 fds[si].revents = 0;
429
430 if (iocom->flags & DMSG_IOCOMF_RREQ)
431 fds[si].events |= POLLIN;
432 if (iocom->flags & DMSG_IOCOMF_WREQ)
433 fds[si].events |= POLLOUT;
434 }
435
436 /*
437 * Check the alternative fd for work.
438 */
439 if (iocom->alt_fd >= 0) {
440 ai = count++;
441 fds[ai].fd = iocom->alt_fd;
442 fds[ai].events = POLLIN;
443 fds[ai].revents = 0;
444 }
445 poll(fds, count, timeout);
446
447 if (wi >= 0 && (fds[wi].revents & POLLIN))
448 iocom->flags |= DMSG_IOCOMF_PWORK;
449 if (si >= 0 && (fds[si].revents & POLLIN))
450 iocom->flags |= DMSG_IOCOMF_RWORK;
451 if (si >= 0 && (fds[si].revents & POLLOUT))
452 iocom->flags |= DMSG_IOCOMF_WWORK;
453 if (wi >= 0 && (fds[wi].revents & POLLOUT))
454 iocom->flags |= DMSG_IOCOMF_WWORK;
455 if (ai >= 0 && (fds[ai].revents & POLLIN))
456 iocom->flags |= DMSG_IOCOMF_ARWORK;
457 } else {
458 /*
459 * Always check the pipe
460 */
461 iocom->flags |= DMSG_IOCOMF_PWORK;
462 }
463
464 if (iocom->flags & DMSG_IOCOMF_SWORK) {
465 iocom->flags &= ~DMSG_IOCOMF_SWORK;
466 iocom->signal_callback(iocom);
467 }
468
469 /*
470 * Pending message queues from other threads wake us up
471 * with a write to the wakeupfds[] pipe. We have to clear
472 * the pipe with a dummy read.
473 */
474 if (iocom->flags & DMSG_IOCOMF_PWORK) {
475 iocom->flags &= ~DMSG_IOCOMF_PWORK;
476 read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
477 iocom->flags |= DMSG_IOCOMF_RWORK;
478 iocom->flags |= DMSG_IOCOMF_WWORK;
479 if (TAILQ_FIRST(&iocom->txmsgq))
480 dmsg_iocom_flush1(iocom);
481 }
482
483 /*
484 * Message write sequencing
485 */
486 if (iocom->flags & DMSG_IOCOMF_WWORK)
487 dmsg_iocom_flush1(iocom);
488
489 /*
490 * Message read sequencing. Run this after the write
491 * sequencing in case the write sequencing allowed another
492 * auto-DELETE to occur on the read side.
493 */
494 if (iocom->flags & DMSG_IOCOMF_RWORK) {
495 while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
496 (msg = dmsg_ioq_read(iocom)) != NULL) {
497 if (DMsgDebugOpt) {
498 fprintf(stderr, "receive %s\n",
499 dmsg_msg_str(msg));
500 }
501 iocom->rcvmsg_callback(msg);
502 dmsg_state_cleanuprx(iocom, msg);
503 }
504 }
505
506 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
507 iocom->flags &= ~DMSG_IOCOMF_ARWORK;
508 iocom->altmsg_callback(iocom);
509 }
510 }
511}
512
513/*
514 * Make sure there's enough room in the FIFO to hold the
515 * needed data.
516 *
517 * Assume worst case encrypted form is 2x the size of the
518 * plaintext equivalent.
519 */
520static
521size_t
522dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
523{
524 size_t bytes;
525 size_t nmax;
526
527 bytes = ioq->fifo_cdx - ioq->fifo_beg;
528 nmax = sizeof(ioq->buf) - ioq->fifo_end;
529 if (bytes + nmax / 2 < needed) {
530 if (bytes) {
531 bcopy(ioq->buf + ioq->fifo_beg,
532 ioq->buf,
533 bytes);
534 }
535 ioq->fifo_cdx -= ioq->fifo_beg;
536 ioq->fifo_beg = 0;
537 if (ioq->fifo_cdn < ioq->fifo_end) {
538 bcopy(ioq->buf + ioq->fifo_cdn,
539 ioq->buf + ioq->fifo_cdx,
540 ioq->fifo_end - ioq->fifo_cdn);
541 }
542 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
543 ioq->fifo_cdn = ioq->fifo_cdx;
544 nmax = sizeof(ioq->buf) - ioq->fifo_end;
545 }
546 return(nmax);
547}
548
549/*
550 * Read the next ready message from the ioq, issuing I/O if needed.
551 * Caller should retry on a read-event when NULL is returned.
552 *
553 * If an error occurs during reception a DMSG_LNK_ERROR msg will
554 * be returned for each open transaction, then the ioq and iocom
555 * will be errored out and a non-transactional DMSG_LNK_ERROR
556 * msg will be returned as the final message. The caller should not call
557 * us again after the final message is returned.
558 *
559 * Thread localized, iocom->mtx not held.
560 */
561dmsg_msg_t *
562dmsg_ioq_read(dmsg_iocom_t *iocom)
563{
564 dmsg_ioq_t *ioq = &iocom->ioq_rx;
565 dmsg_msg_t *msg;
566 dmsg_state_t *state;
567 dmsg_circuit_t *circuit0;
568 dmsg_hdr_t *head;
569 ssize_t n;
570 size_t bytes;
571 size_t nmax;
572 uint32_t xcrc32;
573 int error;
574
575again:
576 iocom->flags &= ~(DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
577
578 /*
579 * If a message is already pending we can just remove and
580 * return it. Message state has already been processed.
581 * (currently not implemented)
582 */
583 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
584 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
585 return (msg);
586 }
587
588 /*
589 * If the stream is errored out we stop processing it.
590 */
591 if (ioq->error)
592 goto skip;
593
594 /*
595 * Message read in-progress (msg is NULL at the moment). We don't
596 * allocate a msg until we have its core header.
597 */
598 nmax = sizeof(ioq->buf) - ioq->fifo_end;
599 bytes = ioq->fifo_cdx - ioq->fifo_beg; /* already decrypted */
600 msg = ioq->msg;
601
602 switch(ioq->state) {
603 case DMSG_MSGQ_STATE_HEADER1:
604 /*
605 * Load the primary header, fail on any non-trivial read
606 * error or on EOF. Since the primary header is the same
607 * size is the message alignment it will never straddle
608 * the end of the buffer.
609 */
610 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
611 if (bytes < sizeof(msg->any.head)) {
612 n = read(iocom->sock_fd,
613 ioq->buf + ioq->fifo_end,
614 nmax);
615 if (n <= 0) {
616 if (n == 0) {
617 ioq->error = DMSG_IOQ_ERROR_EOF;
618 break;
619 }
620 if (errno != EINTR &&
621 errno != EINPROGRESS &&
622 errno != EAGAIN) {
623 ioq->error = DMSG_IOQ_ERROR_SOCK;
624 break;
625 }
626 n = 0;
627 /* fall through */
628 }
629 ioq->fifo_end += (size_t)n;
630 nmax -= (size_t)n;
631 }
632
633 /*
634 * Decrypt data received so far. Data will be decrypted
635 * in-place but might create gaps in the FIFO. Partial
636 * blocks are not immediately decrypted.
637 *
638 * WARNING! The header might be in the wrong endian, we
639 * do not fix it up until we get the entire
640 * extended header.
641 */
642 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
643 dmsg_crypto_decrypt(iocom, ioq);
644 } else {
645 ioq->fifo_cdx = ioq->fifo_end;
646 ioq->fifo_cdn = ioq->fifo_end;
647 }
648 bytes = ioq->fifo_cdx - ioq->fifo_beg;
649
650 /*
651 * Insufficient data accumulated (msg is NULL, caller will
652 * retry on event).
653 */
654 assert(msg == NULL);
655 if (bytes < sizeof(msg->any.head))
656 break;
657
658 /*
659 * Check and fixup the core header. Note that the icrc
660 * has to be calculated before any fixups, but the crc
661 * fields in the msg may have to be swapped like everything
662 * else.
663 */
664 head = (void *)(ioq->buf + ioq->fifo_beg);
665 if (head->magic != DMSG_HDR_MAGIC &&
666 head->magic != DMSG_HDR_MAGIC_REV) {
667 ioq->error = DMSG_IOQ_ERROR_SYNC;
668 break;
669 }
670
671 /*
672 * Calculate the full header size and aux data size
673 */
674 if (head->magic == DMSG_HDR_MAGIC_REV) {
675 ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
676 DMSG_ALIGN;
677 ioq->abytes = bswap32(head->aux_bytes) *
678 DMSG_ALIGN;
679 } else {
680 ioq->hbytes = (head->cmd & DMSGF_SIZE) *
681 DMSG_ALIGN;
682 ioq->abytes = head->aux_bytes * DMSG_ALIGN;
683 }
684 if (ioq->hbytes < sizeof(msg->any.head) ||
685 ioq->hbytes > sizeof(msg->any) ||
686 ioq->abytes > DMSG_AUX_MAX) {
687 ioq->error = DMSG_IOQ_ERROR_FIELD;
688 break;
689 }
690
691 /*
692 * Allocate the message, the next state will fill it in.
693 */
694 msg = dmsg_msg_alloc(&iocom->circuit0, ioq->abytes, 0,
695 NULL, NULL);
696 ioq->msg = msg;
697
698 /*
699 * Fall through to the next state. Make sure that the
700 * extended header does not straddle the end of the buffer.
701 * We still want to issue larger reads into our buffer,
702 * book-keeping is easier if we don't bcopy() yet.
703 *
704 * Make sure there is enough room for bloated encrypt data.
705 */
706 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
707 ioq->state = DMSG_MSGQ_STATE_HEADER2;
708 /* fall through */
709 case DMSG_MSGQ_STATE_HEADER2:
710 /*
711 * Fill out the extended header.
712 */
713 assert(msg != NULL);
714 if (bytes < ioq->hbytes) {
715 n = read(iocom->sock_fd,
716 ioq->buf + ioq->fifo_end,
717 nmax);
718 if (n <= 0) {
719 if (n == 0) {
720 ioq->error = DMSG_IOQ_ERROR_EOF;
721 break;
722 }
723 if (errno != EINTR &&
724 errno != EINPROGRESS &&
725 errno != EAGAIN) {
726 ioq->error = DMSG_IOQ_ERROR_SOCK;
727 break;
728 }
729 n = 0;
730 /* fall through */
731 }
732 ioq->fifo_end += (size_t)n;
733 nmax -= (size_t)n;
734 }
735
736 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
737 dmsg_crypto_decrypt(iocom, ioq);
738 } else {
739 ioq->fifo_cdx = ioq->fifo_end;
740 ioq->fifo_cdn = ioq->fifo_end;
741 }
742 bytes = ioq->fifo_cdx - ioq->fifo_beg;
743
744 /*
745 * Insufficient data accumulated (set msg NULL so caller will
746 * retry on event).
747 */
748 if (bytes < ioq->hbytes) {
749 msg = NULL;
750 break;
751 }
752
753 /*
754 * Calculate the extended header, decrypt data received
755 * so far. Handle endian-conversion for the entire extended
756 * header.
757 */
758 head = (void *)(ioq->buf + ioq->fifo_beg);
759
760 /*
761 * Check the CRC.
762 */
763 if (head->magic == DMSG_HDR_MAGIC_REV)
764 xcrc32 = bswap32(head->hdr_crc);
765 else
766 xcrc32 = head->hdr_crc;
767 head->hdr_crc = 0;
768 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
769 ioq->error = DMSG_IOQ_ERROR_XCRC;
770 fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
771 xcrc32, dmsg_icrc32(head, ioq->hbytes),
772 dmsg_msg_str(msg));
773 assert(0);
774 break;
775 }
776 head->hdr_crc = xcrc32;
777
778 if (head->magic == DMSG_HDR_MAGIC_REV) {
779 dmsg_bswap_head(head);
780 }
781
782 /*
783 * Copy the extended header into the msg and adjust the
784 * FIFO.
785 */
786 bcopy(head, &msg->any, ioq->hbytes);
787
788 /*
789 * We are either done or we fall-through.
790 */
791 if (ioq->abytes == 0) {
792 ioq->fifo_beg += ioq->hbytes;
793 break;
794 }
795
796 /*
797 * Must adjust bytes (and the state) when falling through.
798 * nmax doesn't change.
799 */
800 ioq->fifo_beg += ioq->hbytes;
801 bytes -= ioq->hbytes;
802 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
803 /* fall through */
804 case DMSG_MSGQ_STATE_AUXDATA1:
805 /*
806 * Copy the partial or complete payload from remaining
807 * bytes in the FIFO in order to optimize the makeroom call
808 * in the AUXDATA2 state. We have to fall-through either
809 * way so we can check the crc.
810 *
811 * msg->aux_size tracks our aux data.
812 */
813 if (bytes >= ioq->abytes) {
814 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
815 ioq->abytes);
816 msg->aux_size = ioq->abytes;
817 ioq->fifo_beg += ioq->abytes;
818 assert(ioq->fifo_beg <= ioq->fifo_cdx);
819 assert(ioq->fifo_cdx <= ioq->fifo_cdn);
820 bytes -= ioq->abytes;
821 } else if (bytes) {
822 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
823 bytes);
824 msg->aux_size = bytes;
825 ioq->fifo_beg += bytes;
826 if (ioq->fifo_cdx < ioq->fifo_beg)
827 ioq->fifo_cdx = ioq->fifo_beg;
828 assert(ioq->fifo_beg <= ioq->fifo_cdx);
829 assert(ioq->fifo_cdx <= ioq->fifo_cdn);
830 bytes = 0;
831 } else {
832 msg->aux_size = 0;
833 }
834 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
835 /* fall through */
836 case DMSG_MSGQ_STATE_AUXDATA2:
837 /*
838 * Make sure there is enough room for more data.
839 */
840 assert(msg);
841 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
842
843 /*
844 * Read and decrypt more of the payload.
845 */
846 if (msg->aux_size < ioq->abytes) {
847 assert(bytes == 0);
848 n = read(iocom->sock_fd,
849 ioq->buf + ioq->fifo_end,
850 nmax);
851 if (n <= 0) {
852 if (n == 0) {
853 ioq->error = DMSG_IOQ_ERROR_EOF;
854 break;
855 }
856 if (errno != EINTR &&
857 errno != EINPROGRESS &&
858 errno != EAGAIN) {
859 ioq->error = DMSG_IOQ_ERROR_SOCK;
860 break;
861 }
862 n = 0;
863 /* fall through */
864 }
865 ioq->fifo_end += (size_t)n;
866 nmax -= (size_t)n;
867 }
868
869 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
870 dmsg_crypto_decrypt(iocom, ioq);
871 } else {
872 ioq->fifo_cdx = ioq->fifo_end;
873 ioq->fifo_cdn = ioq->fifo_end;
874 }
875 bytes = ioq->fifo_cdx - ioq->fifo_beg;
876
877 if (bytes > ioq->abytes - msg->aux_size)
878 bytes = ioq->abytes - msg->aux_size;
879
880 if (bytes) {
881 bcopy(ioq->buf + ioq->fifo_beg,
882 msg->aux_data + msg->aux_size,
883 bytes);
884 msg->aux_size += bytes;
885 ioq->fifo_beg += bytes;
886 }
887
888 /*
889 * Insufficient data accumulated (set msg NULL so caller will
890 * retry on event).
891 */
892 if (msg->aux_size < ioq->abytes) {
893 msg = NULL;
894 break;
895 }
896 assert(msg->aux_size == ioq->abytes);
897
898 /*
899 * Check aux_crc, then we are done.
900 */
901 xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
902 if (xcrc32 != msg->any.head.aux_crc) {
903 ioq->error = DMSG_IOQ_ERROR_ACRC;
904 break;
905 }
906 break;
907 case DMSG_MSGQ_STATE_ERROR:
908 /*
909 * Continued calls to drain recorded transactions (returning
910 * a LNK_ERROR for each one), before we return the final
911 * LNK_ERROR.
912 */
913 assert(msg == NULL);
914 break;
915 default:
916 /*
917 * We don't double-return errors, the caller should not
918 * have called us again after getting an error msg.
919 */
920 assert(0);
921 break;
922 }
923
924 /*
925 * Check the message sequence. The iv[] should prevent any
926 * possibility of a replay but we add this check anyway.
927 */
928 if (msg && ioq->error == 0) {
929 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
930 ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
931 } else {
932 ++ioq->seq;
933 }
934 }
935
936 /*
937 * Handle error, RREQ, or completion
938 *
939 * NOTE: nmax and bytes are invalid at this point, we don't bother
940 * to update them when breaking out.
941 */
942 if (ioq->error) {
943skip:
944 /*
945 * An unrecoverable error causes all active receive
946 * transactions to be terminated with a LNK_ERROR message.
947 *
948 * Once all active transactions are exhausted we set the
949 * iocom ERROR flag and return a non-transactional LNK_ERROR
950 * message, which should cause master processing loops to
951 * terminate.
952 */
953 assert(ioq->msg == msg);
954 if (msg) {
955 dmsg_msg_free(msg);
956 ioq->msg = NULL;
957 }
958
959 /*
960 * No more I/O read processing
961 */
962 ioq->state = DMSG_MSGQ_STATE_ERROR;
963
964 /*
965 * Simulate a remote LNK_ERROR DELETE msg for any open
966 * transactions, ending with a final non-transactional
967 * LNK_ERROR (that the session can detect) when no
968 * transactions remain.
969 *
970 * We only need to scan transactions on circuit0 as these
971 * will contain all circuit forges, and terminating circuit
972 * forges will automatically terminate the transactions on
973 * any other circuits as well as those circuits.
974 */
975 circuit0 = &iocom->circuit0;
976 msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
977 msg->any.head.error = ioq->error;
978
979 pthread_mutex_lock(&iocom->mtx);
980 dmsg_iocom_drain(iocom);
981
982 if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
983 /*
984 * Active remote transactions are still present.
985 * Simulate the other end sending us a DELETE.
986 */
987 if (state->rxcmd & DMSGF_DELETE) {
988 dmsg_msg_free(msg);
989 msg = NULL;
990 } else {
991 /*state->txcmd |= DMSGF_DELETE;*/
992 msg->state = state;
993 msg->iocom = iocom;
994 msg->any.head.msgid = state->msgid;
995 msg->any.head.cmd |= DMSGF_ABORT |
996 DMSGF_DELETE;
997 }
998 } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
999 /*
1000 * Active local transactions are still present.
1001 * Simulate the other end sending us a DELETE.
1002 */
1003 if (state->rxcmd & DMSGF_DELETE) {
1004 dmsg_msg_free(msg);
1005 msg = NULL;
1006 } else {
1007 msg->state = state;
1008 msg->iocom = iocom;
1009 msg->any.head.msgid = state->msgid;
1010 msg->any.head.cmd |= DMSGF_ABORT |
1011 DMSGF_DELETE |
1012 DMSGF_REPLY;
1013 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1014 msg->any.head.cmd |=
1015 DMSGF_CREATE;
1016 }
1017 }
1018 } else {
1019 /*
1020 * No active local or remote transactions remain.
1021 * Generate a final LNK_ERROR and flag EOF.
1022 */
1023 msg->state = NULL;
1024 iocom->flags |= DMSG_IOCOMF_EOF;
1025 fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1026 }
1027 pthread_mutex_unlock(&iocom->mtx);
1028
1029 /*
1030 * For the iocom error case we want to set RWORK to indicate
1031 * that more messages might be pending.
1032 *
1033 * It is possible to return NULL when there is more work to
1034 * do because each message has to be DELETEd in both
1035 * directions before we continue on with the next (though
1036 * this could be optimized). The transmit direction will
1037 * re-set RWORK.
1038 */
1039 if (msg)
1040 iocom->flags |= DMSG_IOCOMF_RWORK;
1041 } else if (msg == NULL) {
1042 /*
1043 * Insufficient data received to finish building the message,
1044 * set RREQ and return NULL.
1045 *
1046 * Leave ioq->msg intact.
1047 * Leave the FIFO intact.
1048 */
1049 iocom->flags |= DMSG_IOCOMF_RREQ;
1050 } else {
1051 /*
1052 * Continue processing msg.
1053 *
1054 * The fifo has already been advanced past the message.
1055 * Trivially reset the FIFO indices if possible.
1056 *
1057 * clear the FIFO if it is now empty and set RREQ to wait
1058 * for more from the socket. If the FIFO is not empty set
1059 * TWORK to bypass the poll so we loop immediately.
1060 */
1061 if (ioq->fifo_beg == ioq->fifo_cdx &&
1062 ioq->fifo_cdn == ioq->fifo_end) {
1063 iocom->flags |= DMSG_IOCOMF_RREQ;
1064 ioq->fifo_cdx = 0;
1065 ioq->fifo_cdn = 0;
1066 ioq->fifo_beg = 0;
1067 ioq->fifo_end = 0;
1068 } else {
1069 iocom->flags |= DMSG_IOCOMF_RWORK;
1070 }
1071 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1072 ioq->msg = NULL;
1073
1074 /*
1075 * Handle message routing. Validates non-zero sources
1076 * and routes message. Error will be 0 if the message is
1077 * destined for us.
1078 *
1079 * State processing only occurs for messages destined for us.
1080 */
1081 if (msg->any.head.circuit)
1082 error = dmsg_circuit_relay(msg);
1083 else
1084 error = dmsg_state_msgrx(msg);
1085
1086 if (error) {
1087 /*
1088 * Abort-after-closure, throw message away and
1089 * start reading another.
1090 */
1091 if (error == DMSG_IOQ_ERROR_EALREADY) {
1092 dmsg_msg_free(msg);
1093 goto again;
1094 }
1095
1096 /*
1097 * msg routed, msg pointer no longer owned by us.
1098 * Go to the top and start reading another.
1099 */
1100 if (error == DMSG_IOQ_ERROR_ROUTED)
1101 goto again;
1102
1103 /*
1104 * Process real error and throw away message.
1105 */
1106 ioq->error = error;
1107 goto skip;
1108 }
1109 /* no error, not routed. Fall through and return msg */
1110 }
1111 return (msg);
1112}
1113
1114/*
1115 * Calculate the header and data crc's and write a low-level message to
1116 * the connection. If aux_crc is non-zero the aux_data crc is already
1117 * assumed to have been set.
1118 *
1119 * A non-NULL msg is added to the queue but not necessarily flushed.
1120 * Calling this function with msg == NULL will get a flush going.
1121 *
1122 * Caller must hold iocom->mtx.
1123 */
1124void
1125dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1126{
1127 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1128 dmsg_msg_t *msg;
1129 uint32_t xcrc32;
1130 int hbytes;
1131 dmsg_msg_queue_t tmpq;
1132
1133 iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1134 TAILQ_INIT(&tmpq);
1135 pthread_mutex_lock(&iocom->mtx);
1136 while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1137 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1138 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1139 }
1140 pthread_mutex_unlock(&iocom->mtx);
1141
1142 while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1143 /*
1144 * Process terminal connection errors.
1145 */
1146 TAILQ_REMOVE(&tmpq, msg, qentry);
1147 if (ioq->error) {
1148 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1149 ++ioq->msgcount;
1150 continue;
1151 }
1152
1153 /*
1154 * Finish populating the msg fields. The salt ensures that
1155 * the iv[] array is ridiculously randomized and we also
1156 * re-seed our PRNG every 32768 messages just to be sure.
1157 */
1158 msg->any.head.magic = DMSG_HDR_MAGIC;
1159 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1160 ++ioq->seq;
1161 if ((ioq->seq & 32767) == 0)
1162 srandomdev();
1163
1164 /*
1165 * Calculate aux_crc if 0, then calculate hdr_crc.
1166 */
1167 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1168 assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
1169 xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
1170 msg->any.head.aux_crc = xcrc32;
1171 }
1172 msg->any.head.aux_bytes = msg->aux_size / DMSG_ALIGN;
1173 assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
1174
1175 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1176 DMSG_ALIGN;
1177 msg->any.head.hdr_crc = 0;
1178 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1179
1180 /*
1181 * Enqueue the message (the flush codes handles stream
1182 * encryption).
1183 */
1184 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1185 ++ioq->msgcount;
1186 }
1187 dmsg_iocom_flush2(iocom);
1188}
1189
1190/*
1191 * Thread localized, iocom->mtx not held by caller.
1192 */
1193void
1194dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1195{
1196 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1197 dmsg_msg_t *msg;
1198 ssize_t n;
1199 struct iovec iov[DMSG_IOQ_MAXIOVEC];
1200 size_t nact;
1201 size_t hbytes;
1202 size_t abytes;
1203 size_t hoff;
1204 size_t aoff;
1205 int iovcnt;
1206
1207 if (ioq->error) {
1208 dmsg_iocom_drain(iocom);
1209 return;
1210 }
1211
1212 /*
1213 * Pump messages out the connection by building an iovec.
1214 *
1215 * ioq->hbytes/ioq->abytes tracks how much of the first message
1216 * in the queue has been successfully written out, so we can
1217 * resume writing.
1218 */
1219 iovcnt = 0;
1220 nact = 0;
1221 hoff = ioq->hbytes;
1222 aoff = ioq->abytes;
1223
1224 TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1225 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1226 DMSG_ALIGN;
1227 abytes = msg->aux_size;
1228 assert(hoff <= hbytes && aoff <= abytes);
1229
1230 if (hoff < hbytes) {
1231 iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1232 iov[iovcnt].iov_len = hbytes - hoff;
1233 nact += hbytes - hoff;
1234 ++iovcnt;
1235 if (iovcnt == DMSG_IOQ_MAXIOVEC)
1236 break;
1237 }
1238 if (aoff < abytes) {
1239 assert(msg->aux_data != NULL);
1240 iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1241 iov[iovcnt].iov_len = abytes - aoff;
1242 nact += abytes - aoff;
1243 ++iovcnt;
1244 if (iovcnt == DMSG_IOQ_MAXIOVEC)
1245 break;
1246 }
1247 hoff = 0;
1248 aoff = 0;
1249 }
1250 if (iovcnt == 0)
1251 return;
1252
1253 /*
1254 * Encrypt and write the data. The crypto code will move the
1255 * data into the fifo and adjust the iov as necessary. If
1256 * encryption is disabled the iov is left alone.
1257 *
1258 * May return a smaller iov (thus a smaller n), with aggregated
1259 * chunks. May reduce nmax to what fits in the FIFO.
1260 *
1261 * This function sets nact to the number of original bytes now
1262 * encrypted, adding to the FIFO some number of bytes that might
1263 * be greater depending on the crypto mechanic. iov[] is adjusted
1264 * to point at the FIFO if necessary.
1265 *
1266 * NOTE: The return value from the writev() is the post-encrypted
1267 * byte count, not the plaintext count.
1268 */
1269 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1270 /*
1271 * Make sure the FIFO has a reasonable amount of space
1272 * left (if not completely full).
1273 */
1274 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1275 sizeof(ioq->buf) - ioq->fifo_end >= DMSG_ALIGN * 2) {
1276 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1277 ioq->fifo_end - ioq->fifo_beg);
1278 ioq->fifo_cdx -= ioq->fifo_beg;
1279 ioq->fifo_cdn -= ioq->fifo_beg;
1280 ioq->fifo_end -= ioq->fifo_beg;
1281 ioq->fifo_beg = 0;
1282 }
1283
1284 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1285 n = writev(iocom->sock_fd, iov, iovcnt);
1286 if (n > 0) {
1287 ioq->fifo_beg += n;
1288 ioq->fifo_cdn += n;
1289 ioq->fifo_cdx += n;
1290 if (ioq->fifo_beg == ioq->fifo_end) {
1291 ioq->fifo_beg = 0;
1292 ioq->fifo_cdn = 0;
1293 ioq->fifo_cdx = 0;
1294 ioq->fifo_end = 0;
1295 }
1296 }
1297 } else {
1298 n = writev(iocom->sock_fd, iov, iovcnt);
1299 if (n > 0)
1300 nact = n;
1301 else
1302 nact = 0;
1303 }
1304
1305 /*
1306 * Clean out the transmit queue based on what we successfully
1307 * sent (nact is the plaintext count). ioq->hbytes/abytes
1308 * represents the portion of the first message previously sent.
1309 */
1310 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1311 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1312 DMSG_ALIGN;
1313 abytes = msg->aux_size;
1314
1315 if ((size_t)nact < hbytes - ioq->hbytes) {
1316 ioq->hbytes += nact;
1317 nact = 0;
1318 break;
1319 }
1320 nact -= hbytes - ioq->hbytes;
1321 ioq->hbytes = hbytes;
1322 if ((size_t)nact < abytes - ioq->abytes) {
1323 ioq->abytes += nact;
1324 nact = 0;
1325 break;
1326 }
1327 nact -= abytes - ioq->abytes;
1328
1329 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1330 --ioq->msgcount;
1331 ioq->hbytes = 0;
1332 ioq->abytes = 0;
1333
1334 dmsg_state_cleanuptx(msg);
1335 }
1336 assert(nact == 0);
1337
1338 /*
1339 * Process the return value from the write w/regards to blocking.
1340 */
1341 if (n < 0) {
1342 if (errno != EINTR &&
1343 errno != EINPROGRESS &&
1344 errno != EAGAIN) {
1345 /*
1346 * Fatal write error
1347 */
1348 ioq->error = DMSG_IOQ_ERROR_SOCK;
1349 dmsg_iocom_drain(iocom);
1350 } else {
1351 /*
1352 * Wait for socket buffer space
1353 */
1354 iocom->flags |= DMSG_IOCOMF_WREQ;
1355 }
1356 } else {
1357 iocom->flags |= DMSG_IOCOMF_WREQ;
1358 }
1359 if (ioq->error) {
1360 dmsg_iocom_drain(iocom);
1361 }
1362}
1363
1364/*
1365 * Kill pending msgs on ioq_tx and adjust the flags such that no more
1366 * write events will occur. We don't kill read msgs because we want
1367 * the caller to pull off our contrived terminal error msg to detect
1368 * the connection failure.
1369 *
1370 * Thread localized, iocom->mtx not held by caller.
1371 */
1372void
1373dmsg_iocom_drain(dmsg_iocom_t *iocom)
1374{
1375 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1376 dmsg_msg_t *msg;
1377
1378 iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1379 ioq->hbytes = 0;
1380 ioq->abytes = 0;
1381
1382 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1383 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1384 --ioq->msgcount;
1385 dmsg_state_cleanuptx(msg);
1386 }
1387}
1388
1389/*
1390 * Write a message to an iocom, with additional state processing.
1391 */
1392void
1393dmsg_msg_write(dmsg_msg_t *msg)
1394{
1395 dmsg_iocom_t *iocom = msg->iocom;
1396 dmsg_state_t *state;
1397 char dummy;
1398
1399 /*
1400 * Handle state processing, create state if necessary.
1401 */
1402 pthread_mutex_lock(&iocom->mtx);
1403 if ((state = msg->state) != NULL) {
1404 /*
1405 * Existing transaction (could be reply). It is also
1406 * possible for this to be the first reply (CREATE is set),
1407 * in which case we populate state->txcmd.
1408 *
1409 * state->txcmd is adjusted to hold the final message cmd,
1410 * and we also be sure to set the CREATE bit here. We did
1411 * not set it in dmsg_msg_alloc() because that would have
1412 * not been serialized (state could have gotten ripped out
1413 * from under the message prior to it being transmitted).
1414 */
1415 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1416 DMSGF_CREATE) {
1417 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1418 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1419 }
1420 msg->any.head.msgid = state->msgid;
1421 assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
1422 if (msg->any.head.cmd & DMSGF_CREATE) {
1423 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1424 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1425 }
1426 }
1427
1428 /*
1429 * Queue it for output, wake up the I/O pthread. Note that the
1430 * I/O thread is responsible for generating the CRCs and encryption.
1431 */
1432 TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1433 dummy = 0;
1434 write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */
1435 pthread_mutex_unlock(&iocom->mtx);
1436}
1437
1438/*
1439 * This is a shortcut to formulate a reply to msg with a simple error code,
1440 * It can reply to and terminate a transaction, or it can reply to a one-way
1441 * messages. A DMSG_LNK_ERROR command code is utilized to encode
1442 * the error code (which can be 0). Not all transactions are terminated
1443 * with DMSG_LNK_ERROR status (the low level only cares about the
1444 * MSGF_DELETE flag), but most are.
1445 *
1446 * Replies to one-way messages are a bit of an oxymoron but the feature
1447 * is used by the debug (DBG) protocol.
1448 *
1449 * The reply contains no extended data.
1450 */
1451void
1452dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1453{
1454 dmsg_state_t *state = msg->state;
1455 dmsg_msg_t *nmsg;
1456 uint32_t cmd;
1457
1458
1459 /*
1460 * Reply with a simple error code and terminate the transaction.
1461 */
1462 cmd = DMSG_LNK_ERROR;
1463
1464 /*
1465 * Check if our direction has even been initiated yet, set CREATE.
1466 *
1467 * Check what direction this is (command or reply direction). Note
1468 * that txcmd might not have been initiated yet.
1469 *
1470 * If our direction has already been closed we just return without
1471 * doing anything.
1472 */
1473 if (state) {
1474 if (state->txcmd & DMSGF_DELETE)
1475 return;
1476 if (state->txcmd & DMSGF_REPLY)
1477 cmd |= DMSGF_REPLY;
1478 cmd |= DMSGF_DELETE;
1479 } else {
1480 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1481 cmd |= DMSGF_REPLY;
1482 }
1483
1484 /*
1485 * Allocate the message and associate it with the existing state.
1486 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1487 * allocate new state. We have our state already.
1488 */
1489 nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1490 if (state) {
1491 if ((state->txcmd & DMSGF_CREATE) == 0)
1492 nmsg->any.head.cmd |= DMSGF_CREATE;
1493 }
1494 nmsg->any.head.error = error;
1495 nmsg->any.head.msgid = msg->any.head.msgid;
1496 nmsg->any.head.circuit = msg->any.head.circuit;
1497 nmsg->state = state;
1498 dmsg_msg_write(nmsg);
1499}
1500
1501/*
1502 * Similar to dmsg_msg_reply() but leave the transaction open. That is,
1503 * we are generating a streaming reply or an intermediate acknowledgement
1504 * of some sort as part of the higher level protocol, with more to come
1505 * later.
1506 */
1507void
1508dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1509{
1510 dmsg_state_t *state = msg->state;
1511 dmsg_msg_t *nmsg;
1512 uint32_t cmd;
1513
1514
1515 /*
1516 * Reply with a simple error code and terminate the transaction.
1517 */
1518 cmd = DMSG_LNK_ERROR;
1519
1520 /*
1521 * Check if our direction has even been initiated yet, set CREATE.
1522 *
1523 * Check what direction this is (command or reply direction). Note
1524 * that txcmd might not have been initiated yet.
1525 *
1526 * If our direction has already been closed we just return without
1527 * doing anything.
1528 */
1529 if (state) {
1530 if (state->txcmd & DMSGF_DELETE)
1531 return;
1532 if (state->txcmd & DMSGF_REPLY)
1533 cmd |= DMSGF_REPLY;
1534 /* continuing transaction, do not set MSGF_DELETE */
1535 } else {
1536 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1537 cmd |= DMSGF_REPLY;
1538 }
1539
1540 nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1541 if (state) {
1542 if ((state->txcmd & DMSGF_CREATE) == 0)
1543 nmsg->any.head.cmd |= DMSGF_CREATE;
1544 }
1545 nmsg->any.head.error = error;
1546 nmsg->any.head.msgid = msg->any.head.msgid;
1547 nmsg->any.head.circuit = msg->any.head.circuit;
1548 nmsg->state = state;
1549 dmsg_msg_write(nmsg);
1550}
1551
1552/*
1553 * Terminate a transaction given a state structure by issuing a DELETE.
1554 */
1555void
1556dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1557{
1558 dmsg_msg_t *nmsg;
1559 uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1560
1561 /*
1562 * Nothing to do if we already transmitted a delete
1563 */
1564 if (state->txcmd & DMSGF_DELETE)
1565 return;
1566
1567 /*
1568 * Set REPLY if the other end initiated the command. Otherwise
1569 * we are the command direction.
1570 */
1571 if (state->txcmd & DMSGF_REPLY)
1572 cmd |= DMSGF_REPLY;
1573
1574 nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1575 if (state) {
1576 if ((state->txcmd & DMSGF_CREATE) == 0)
1577 nmsg->any.head.cmd |= DMSGF_CREATE;
1578 }
1579 nmsg->any.head.error = error;
1580 nmsg->any.head.msgid = state->msgid;
1581 nmsg->any.head.circuit = state->msg->any.head.circuit;
1582 nmsg->state = state;
1583 dmsg_msg_write(nmsg);
1584}
1585
1586/*
1587 * Terminate a transaction given a state structure by issuing a DELETE.
1588 */
1589void
1590dmsg_state_result(dmsg_state_t *state, uint32_t error)
1591{
1592 dmsg_msg_t *nmsg;
1593 uint32_t cmd = DMSG_LNK_ERROR;
1594
1595 /*
1596 * Nothing to do if we already transmitted a delete
1597 */
1598 if (state->txcmd & DMSGF_DELETE)
1599 return;
1600
1601 /*
1602 * Set REPLY if the other end initiated the command. Otherwise
1603 * we are the command direction.
1604 */
1605 if (state->txcmd & DMSGF_REPLY)
1606 cmd |= DMSGF_REPLY;
1607
1608 nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1609 if (state) {
1610 if ((state->txcmd & DMSGF_CREATE) == 0)
1611 nmsg->any.head.cmd |= DMSGF_CREATE;
1612 }
1613 nmsg->any.head.error = error;
1614 nmsg->any.head.msgid = state->msgid;
1615 nmsg->any.head.circuit = state->msg->any.head.circuit;
1616 nmsg->state = state;
1617 dmsg_msg_write(nmsg);
1618}
1619
1620/************************************************************************
1621 * TRANSACTION STATE HANDLING *
1622 ************************************************************************
1623 *
1624 */
1625
1626/*
1627 * Process circuit and state tracking for a message after reception, prior
1628 * to execution.
1629 *
1630 * Called with msglk held and the msg dequeued.
1631 *
1632 * All messages are called with dummy state and return actual state.
1633 * (One-off messages often just return the same dummy state).
1634 *
 * May request that the caller discard the message by returning
 * DMSG_IOQ_ERROR_EALREADY.  msg->state is allowed to be NULL in this case.
1637 *
1638 * --
1639 *
1640 * These routines handle persistent and command/reply message state via the
1641 * CREATE and DELETE flags. The first message in a command or reply sequence
1642 * sets CREATE, the last message in a command or reply sequence sets DELETE.
1643 *
1644 * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message,
1646 * which set neither flag. This represents a streaming command or reply.
1647 *
1648 * Any command message received with CREATE set expects a reply sequence to
1649 * be returned. Reply sequences work the same as command sequences except the
1650 * REPLY bit is also sent. Both the command side and reply side can
1651 * degenerate into a single message with both CREATE and DELETE set. Note
1652 * that one side can be streaming and the other side not, or neither, or both.
1653 *
1654 * The msgid is unique for the initiator. That is, two sides sending a new
1655 * message can use the same msgid without colliding.
1656 *
1657 * --
1658 *
1659 * ABORT sequences work by setting the ABORT flag along with normal message
1660 * state. However, ABORTs can also be sent on half-closed messages, that is
1661 * even if the command or reply side has already sent a DELETE, as long as
1662 * the message has not been fully closed it can still send an ABORT+DELETE
1663 * to terminate the half-closed message state.
1664 *
1665 * Since ABORT+DELETEs can race we silently discard ABORT's for message
1666 * state which has already been fully closed. REPLY+ABORT+DELETEs can
1667 * also race, and in this situation the other side might have already
1668 * initiated a new unrelated command with the same message id. Since
1669 * the abort has not set the CREATE flag the situation can be detected
1670 * and the message will also be discarded.
1671 *
1672 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1673 * The ABORT request is essentially integrated into the command instead
1674 * of being sent later on. In this situation the command implementation
1675 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1676 * special-case non-blocking operation for the command.
1677 *
1678 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
1679 * to be mid-stream aborts for command/reply sequences. ABORTs on
1680 * one-way messages are not supported.
1681 *
1682 * NOTE! If a command sequence does not support aborts the ABORT flag is
1683 * simply ignored.
1684 *
1685 * --
1686 *
1687 * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1688 * set. One-off messages cannot be aborted and typically aren't processed
1689 * by these routines. The REPLY bit can be used to distinguish whether a
1690 * one-off message is a command or reply. For example, one-off replies
1691 * will typically just contain status updates.
1692 */
1693static int
1694dmsg_state_msgrx(dmsg_msg_t *msg)
1695{
1696 dmsg_iocom_t *iocom = msg->iocom;
1697 dmsg_circuit_t *circuit;
1698 dmsg_state_t *state;
1699 dmsg_state_t sdummy;
1700 dmsg_circuit_t cdummy;
1701 int error;
1702
1703 pthread_mutex_lock(&iocom->mtx);
1704
1705 /*
1706 * Locate existing persistent circuit and state, if any.
1707 */
1708 if (msg->any.head.circuit == 0) {
1709 circuit = &iocom->circuit0;
1710 } else {
1711 cdummy.msgid = msg->any.head.circuit;
1712 circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
1713 &cdummy);
1714 if (circuit == NULL)
1715 return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
1716 }
1717 msg->circuit = circuit;
1718 ++circuit->refs;
1719
1720 /*
1721 * If received msg is a command state is on staterd_tree.
1722 * If received msg is a reply state is on statewr_tree.
1723 */
1724 sdummy.msgid = msg->any.head.msgid;
1725 if (msg->any.head.cmd & DMSGF_REPLY) {
1726 state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
1727 &sdummy);
1728 } else {
1729 state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
1730 &sdummy);
1731 }
1732 msg->state = state;
1733 pthread_mutex_unlock(&iocom->mtx);
1734
1735 /*
1736 * Short-cut one-off or mid-stream messages (state may be NULL).
1737 */
1738 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1739 DMSGF_ABORT)) == 0) {
1740 return(0);
1741 }
1742
1743 /*
1744 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1745 * inside the case statements.
1746 */
1747 switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1748 DMSGF_REPLY)) {
1749 case DMSGF_CREATE:
1750 case DMSGF_CREATE | DMSGF_DELETE:
1751 /*
1752 * New persistant command received.
1753 */
1754 if (state) {
1755 fprintf(stderr, "duplicate-trans %s\n",
1756 dmsg_msg_str(msg));
1757 error = DMSG_IOQ_ERROR_TRANS;
1758 assert(0);
1759 break;
1760 }
1761 state = malloc(sizeof(*state));
1762 bzero(state, sizeof(*state));
1763 state->iocom = iocom;
1764 state->circuit = circuit;
1765 state->flags = DMSG_STATE_DYNAMIC;
1766 state->msg = msg;
1767 state->txcmd = DMSGF_REPLY;
1768 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1769 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
1770 state->flags |= DMSG_STATE_INSERTED;
1771 state->msgid = msg->any.head.msgid;
1772 msg->state = state;
1773 pthread_mutex_lock(&iocom->mtx);
1774 RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
1775 pthread_mutex_unlock(&iocom->mtx);
1776 error = 0;
1777 if (DMsgDebugOpt) {
1778 fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1779 state, (uint32_t)state->msgid, iocom);
1780 }
1781 break;
1782 case DMSGF_DELETE:
1783 /*
1784 * Persistent state is expected but might not exist if an
1785 * ABORT+DELETE races the close.
1786 */
1787 if (state == NULL) {
1788 if (msg->any.head.cmd & DMSGF_ABORT) {
1789 error = DMSG_IOQ_ERROR_EALREADY;
1790 } else {
1791 fprintf(stderr, "missing-state %s\n",
1792 dmsg_msg_str(msg));
1793 error = DMSG_IOQ_ERROR_TRANS;
1794 assert(0);
1795 }
1796 break;
1797 }
1798
1799 /*
1800 * Handle another ABORT+DELETE case if the msgid has already
1801 * been reused.
1802 */
1803 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1804 if (msg->any.head.cmd & DMSGF_ABORT) {
1805 error = DMSG_IOQ_ERROR_EALREADY;
1806 } else {
1807 fprintf(stderr, "reused-state %s\n",
1808 dmsg_msg_str(msg));
1809 error = DMSG_IOQ_ERROR_TRANS;
1810 assert(0);
1811 }
1812 break;
1813 }
1814 error = 0;
1815 break;
1816 default:
1817 /*
1818 * Check for mid-stream ABORT command received, otherwise
1819 * allow.
1820 */
1821 if (msg->any.head.cmd & DMSGF_ABORT) {
1822 if (state == NULL ||
1823 (state->rxcmd & DMSGF_CREATE) == 0) {
1824 error = DMSG_IOQ_ERROR_EALREADY;
1825 break;
1826 }
1827 }
1828 error = 0;
1829 break;
1830 case DMSGF_REPLY | DMSGF_CREATE:
1831 case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
1832 /*
1833 * When receiving a reply with CREATE set the original
1834 * persistent state message should already exist.
1835 */
1836 if (state == NULL) {
1837 fprintf(stderr, "no-state(r) %s\n",
1838 dmsg_msg_str(msg));
1839 error = DMSG_IOQ_ERROR_TRANS;
1840 assert(0);
1841 break;
1842 }
1843 assert(((state->rxcmd ^ msg->any.head.cmd) &
1844 DMSGF_REPLY) == 0);
1845 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1846 error = 0;
1847 break;
1848 case DMSGF_REPLY | DMSGF_DELETE:
1849 /*
1850 * Received REPLY+ABORT+DELETE in case where msgid has
1851 * already been fully closed, ignore the message.
1852 */
1853 if (state == NULL) {
1854 if (msg->any.head.cmd & DMSGF_ABORT) {
1855 error = DMSG_IOQ_ERROR_EALREADY;
1856 } else {
1857 fprintf(stderr, "no-state(r,d) %s\n",
1858 dmsg_msg_str(msg));
1859 error = DMSG_IOQ_ERROR_TRANS;
1860 assert(0);
1861 }
1862 break;
1863 }
1864
1865 /*
1866 * Received REPLY+ABORT+DELETE in case where msgid has
1867 * already been reused for an unrelated message,
1868 * ignore the message.
1869 */
1870 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1871 if (msg->any.head.cmd & DMSGF_ABORT) {
1872 error = DMSG_IOQ_ERROR_EALREADY;
1873 } else {
1874 fprintf(stderr, "reused-state(r,d) %s\n",
1875 dmsg_msg_str(msg));
1876 error = DMSG_IOQ_ERROR_TRANS;
1877 assert(0);
1878 }
1879 break;
1880 }
1881 error = 0;
1882 break;
1883 case DMSGF_REPLY:
1884 /*
1885 * Check for mid-stream ABORT reply received to sent command.
1886 */
1887 if (msg->any.head.cmd & DMSGF_ABORT) {
1888 if (state == NULL ||
1889 (state->rxcmd & DMSGF_CREATE) == 0) {
1890 error = DMSG_IOQ_ERROR_EALREADY;
1891 break;
1892 }
1893 }
1894 error = 0;
1895 break;
1896 }
1897 return (error);
1898}
1899
1900void
1901dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
1902{
1903 dmsg_state_t *state;
1904
1905 if ((state = msg->state) == NULL) {
1906 /*
1907 * Free a non-transactional message, there is no state
1908 * to worry about.
1909 */
1910 dmsg_msg_free(msg);
1911 } else if (msg->any.head.cmd & DMSGF_DELETE) {
1912 /*
1913 * Message terminating transaction, destroy the related
1914 * state, the original message, and this message (if it
1915 * isn't the original message due to a CREATE|DELETE).
1916 */
1917 pthread_mutex_lock(&iocom->mtx);
1918 state->rxcmd |= DMSGF_DELETE;
1919 if (state->txcmd & DMSGF_DELETE) {
1920 if (state->msg == msg)
1921 state->msg = NULL;
1922 assert(state->flags & DMSG_STATE_INSERTED);
1923 if (state->rxcmd & DMSGF_REPLY) {
1924 assert(msg->any.head.cmd & DMSGF_REPLY);
1925 RB_REMOVE(dmsg_state_tree,
1926 &msg->circuit->statewr_tree, state);
1927 } else {
1928 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
1929 RB_REMOVE(dmsg_state_tree,
1930 &msg->circuit->staterd_tree, state);
1931 }
1932 state->flags &= ~DMSG_STATE_INSERTED;
1933 dmsg_state_free(state);
1934 } else {
1935 ;
1936 }
1937 pthread_mutex_unlock(&iocom->mtx);
1938 dmsg_msg_free(msg);
1939 } else if (state->msg != msg) {
1940 /*
1941 * Message not terminating transaction, leave state intact
1942 * and free message if it isn't the CREATE message.
1943 */
1944 dmsg_msg_free(msg);
1945 }
1946}
1947
1948static void
1949dmsg_state_cleanuptx(dmsg_msg_t *msg)
1950{
1951 dmsg_iocom_t *iocom = msg->iocom;
1952 dmsg_state_t *state;
1953
1954 if ((state = msg->state) == NULL) {
1955 dmsg_msg_free(msg);
1956 } else if (msg->any.head.cmd & DMSGF_DELETE) {
1957 pthread_mutex_lock(&iocom->mtx);
1958 assert((state->txcmd & DMSGF_DELETE) == 0);
1959 state->txcmd |= DMSGF_DELETE;
1960 if (state->rxcmd & DMSGF_DELETE) {
1961 if (state->msg == msg)
1962 state->msg = NULL;
1963 assert(state->flags & DMSG_STATE_INSERTED);
1964 if (state->txcmd & DMSGF_REPLY) {
1965 assert(msg->any.head.cmd & DMSGF_REPLY);
1966 RB_REMOVE(dmsg_state_tree,
1967 &msg->circuit->staterd_tree, state);
1968 } else {
1969 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
1970 RB_REMOVE(dmsg_state_tree,
1971 &msg->circuit->statewr_tree, state);
1972 }
1973 state->flags &= ~DMSG_STATE_INSERTED;
1974 dmsg_state_free(state);
1975 } else {
1976 ;
1977 }
1978 pthread_mutex_unlock(&iocom->mtx);
1979 dmsg_msg_free(msg);
1980 } else if (state->msg != msg) {
1981 dmsg_msg_free(msg);
1982 }
1983}
1984
1985/*
1986 * Called with iocom locked
1987 */
1988void
1989dmsg_state_free(dmsg_state_t *state)
1990{
1991 dmsg_msg_t *msg;
1992
1993 if (DMsgDebugOpt) {
1994 fprintf(stderr, "terminate state %p id=%08x\n",
1995 state, (uint32_t)state->msgid);
1996 }
1997 assert(state->any.any == NULL);
1998 msg = state->msg;
1999 state->msg = NULL;
2000 if (msg)
2001 dmsg_msg_free_locked(msg);
2002 free(state);
2003}
2004
2005/*
2006 * Called with iocom locked
2007 */
2008void
2009dmsg_circuit_drop(dmsg_circuit_t *circuit)
2010{
2011 dmsg_iocom_t *iocom = circuit->iocom;
2012 char dummy;
2013
2014 assert(circuit->refs > 0);
2015 assert(iocom);
2016
2017 /*
2018 * Decrement circuit refs, destroy circuit when refs drops to 0.
2019 */
2020 if (--circuit->refs > 0)
2021 return;
2022
2023 assert(RB_EMPTY(&circuit->staterd_tree));
2024 assert(RB_EMPTY(&circuit->statewr_tree));
2025 RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2026 circuit->iocom = NULL;
2027 dmsg_free(circuit);
2028
2029 /*
2030 * When an iocom error is present the rx code will terminate the
2031 * receive side for all transactions and (indirectly) all circuits
2032 * by simulating DELETE messages. The state and related circuits
2033 * don't disappear until the related states are closed in both
2034 * directions
2035 *
2036 * Detect the case where the last circuit is now gone (and thus all
2037 * states for all circuits are gone), and wakeup the rx thread to
2038 * complete the termination.
2039 */
2040 if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2041 dummy = 0;
2042 write(iocom->wakeupfds[1], &dummy, 1);
2043 }
2044}
2045
2046/*
2047 * This swaps endian for a hammer2_msg_hdr. Note that the extended
2048 * header is not adjusted, just the core header.
2049 */
2050void
2051dmsg_bswap_head(dmsg_hdr_t *head)
2052{
2053 head->magic = bswap16(head->magic);
2054 head->reserved02 = bswap16(head->reserved02);
2055 head->salt = bswap32(head->salt);
2056
2057 head->msgid = bswap64(head->msgid);
2058 head->circuit = bswap64(head->circuit);
2059 head->reserved18= bswap64(head->reserved18);
2060
2061 head->cmd = bswap32(head->cmd);
2062 head->aux_crc = bswap32(head->aux_crc);
2063 head->aux_bytes = bswap32(head->aux_bytes);
2064 head->error = bswap32(head->error);
2065 head->aux_descr = bswap64(head->aux_descr);
2066 head->reserved38= bswap32(head->reserved38);
2067 head->hdr_crc = bswap32(head->hdr_crc);
2068}