hammer2 - cluster / libdmsg circuit work
[dragonfly.git] / lib / libdmsg / msg.c
CommitLineData
9ab15106
MD
1/*
2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
17 * distribution.
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 * SUCH DAMAGE.
34 */
35
0c3a8cd0 36#include "dmsg_local.h"
9ab15106 37
0c3a8cd0
MD
/*
 * Debug switch.  When non-zero, dmsg_iocom_core() prints every received
 * message to stderr before handing it to the receive callback.
 */
38int DMsgDebugOpt;
39
/*
 * Forward declarations of file-local (static) helpers.
 */
40static int dmsg_state_msgrx(dmsg_msg_t *msg);
41static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
78476205 42
0d20ec8a
MD
/*
 * Generate the red-black tree support code for the transaction-state and
 * circuit trees, ordered by dmsg_state_cmp() and dmsg_circuit_cmp().
 */
43RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
44RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
45
cf715800 46/*
0d20ec8a
MD
47 * STATE TREE - Represents open transactions which are indexed by their
48 * { msgid } relative to the governing iocom.
90e8cd1d
MD
49 */
50int
0d20ec8a 51dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
90e8cd1d 52{
0d20ec8a 53 if (state1->msgid < state2->msgid)
90e8cd1d 54 return(-1);
0d20ec8a 55 if (state1->msgid > state2->msgid)
90e8cd1d
MD
56 return(1);
57 return(0);
58}
59
90e8cd1d 60/*
0d20ec8a
MD
61 * CIRCUIT TREE - Represents open circuits which are indexed by their
62 * { msgid } relative to the governing iocom.
cf715800
MD
63 */
64int
0d20ec8a 65dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
cf715800 66{
0d20ec8a 67 if (circuit1->msgid < circuit2->msgid)
cf715800 68 return(-1);
0d20ec8a 69 if (circuit1->msgid > circuit2->msgid)
cf715800
MD
70 return(1);
71 return(0);
72}
73
9ab15106
MD
74/*
75 * Initialize a low-level ioq
76 */
77void
0c3a8cd0 78dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
9ab15106
MD
79{
80 bzero(ioq, sizeof(*ioq));
0c3a8cd0 81 ioq->state = DMSG_MSGQ_STATE_HEADER1;
9ab15106
MD
82 TAILQ_INIT(&ioq->msgq);
83}
84
7dc0f844
MD
85/*
86 * Cleanup queue.
87 *
88 * caller holds iocom->mtx.
89 */
9ab15106 90void
0c3a8cd0 91dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
9ab15106 92{
0c3a8cd0 93 dmsg_msg_t *msg;
9ab15106
MD
94
95 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
7dc0f844 96 assert(0); /* shouldn't happen */
78476205 97 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
0c3a8cd0 98 dmsg_msg_free(msg);
9ab15106
MD
99 }
100 if ((msg = ioq->msg) != NULL) {
101 ioq->msg = NULL;
0c3a8cd0 102 dmsg_msg_free(msg);
9ab15106
MD
103 }
104}
105
106/*
5903497c
MD
107 * Initialize a low-level communications channel.
108 *
29ead430 109 * NOTE: The signal_func() is called at least once from the loop and can be
0c3a8cd0 110 * re-armed via dmsg_iocom_restate().
9ab15106
MD
111 */
112void
0c3a8cd0 113dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
0d20ec8a 114 void (*signal_func)(dmsg_iocom_t *),
0c3a8cd0 115 void (*rcvmsg_func)(dmsg_msg_t *),
11f7caf4 116 void (*dbgmsg_func)(dmsg_msg_t *),
0c3a8cd0 117 void (*altmsg_func)(dmsg_iocom_t *))
9ab15106 118{
e1648a68
MD
119 struct stat st;
120
9ab15106
MD
121 bzero(iocom, sizeof(*iocom));
122
0d20ec8a
MD
123 iocom->signal_callback = signal_func;
124 iocom->rcvmsg_callback = rcvmsg_func;
125 iocom->altmsg_callback = altmsg_func;
126 iocom->dbgmsg_callback = dbgmsg_func;
5903497c 127
7dc0f844 128 pthread_mutex_init(&iocom->mtx, NULL);
0d20ec8a 129 RB_INIT(&iocom->circuit_tree);
9ab15106
MD
130 TAILQ_INIT(&iocom->freeq);
131 TAILQ_INIT(&iocom->freeq_aux);
0d20ec8a 132 TAILQ_INIT(&iocom->txmsgq);
9ab15106
MD
133 iocom->sock_fd = sock_fd;
134 iocom->alt_fd = alt_fd;
0c3a8cd0 135 iocom->flags = DMSG_IOCOMF_RREQ;
29ead430 136 if (signal_func)
0c3a8cd0
MD
137 iocom->flags |= DMSG_IOCOMF_SWORK;
138 dmsg_ioq_init(iocom, &iocom->ioq_rx);
139 dmsg_ioq_init(iocom, &iocom->ioq_tx);
7dc0f844
MD
140 if (pipe(iocom->wakeupfds) < 0)
141 assert(0);
142 fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
143 fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
9ab15106 144
0d20ec8a
MD
145 dmsg_circuit_init(iocom, &iocom->circuit0);
146
62efe6ec
MD
147 /*
148 * Negotiate session crypto synchronously. This will mark the
e1648a68
MD
149 * connection as error'd if it fails. If this is a pipe it's
150 * a linkage that we set up ourselves to the filesystem and there
151 * is no crypto.
62efe6ec 152 */
e1648a68
MD
153 if (fstat(sock_fd, &st) < 0)
154 assert(0);
155 if (S_ISSOCK(st.st_mode))
0c3a8cd0 156 dmsg_crypto_negotiate(iocom);
62efe6ec
MD
157
158 /*
159 * Make sure our fds are set to non-blocking for the iocom core.
160 */
9ab15106
MD
161 if (sock_fd >= 0)
162 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
163#if 0
164 /* if line buffered our single fgets() should be fine */
165 if (alt_fd >= 0)
166 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
167#endif
168}
169
5903497c
MD
170/*
171 * May only be called from a callback from iocom_core.
172 *
173 * Adjust state machine functions, set flags to guarantee that both
174 * the recevmsg_func and the sendmsg_func is called at least once.
175 */
176void
0d20ec8a
MD
177dmsg_iocom_restate(dmsg_iocom_t *iocom,
178 void (*signal_func)(dmsg_iocom_t *),
0c3a8cd0
MD
179 void (*rcvmsg_func)(dmsg_msg_t *msg),
180 void (*altmsg_func)(dmsg_iocom_t *))
5903497c 181{
0d20ec8a
MD
182 iocom->signal_callback = signal_func;
183 iocom->rcvmsg_callback = rcvmsg_func;
184 iocom->altmsg_callback = altmsg_func;
29ead430 185 if (signal_func)
0d20ec8a 186 iocom->flags |= DMSG_IOCOMF_SWORK;
5903497c 187 else
0d20ec8a 188 iocom->flags &= ~DMSG_IOCOMF_SWORK;
29ead430
MD
189}
190
191void
0d20ec8a 192dmsg_iocom_signal(dmsg_iocom_t *iocom)
29ead430 193{
0d20ec8a
MD
194 if (iocom->signal_callback)
195 iocom->flags |= DMSG_IOCOMF_SWORK;
5903497c
MD
196}
197
7dc0f844
MD
198/*
199 * Cleanup a terminating iocom.
200 *
201 * Caller should not hold iocom->mtx. The iocom has already been disconnected
202 * from all possible references to it.
203 */
9ab15106 204void
0c3a8cd0 205dmsg_iocom_done(dmsg_iocom_t *iocom)
9ab15106 206{
0c3a8cd0 207 dmsg_msg_t *msg;
9ab15106 208
7dc0f844
MD
209 if (iocom->sock_fd >= 0) {
210 close(iocom->sock_fd);
211 iocom->sock_fd = -1;
212 }
213 if (iocom->alt_fd >= 0) {
214 close(iocom->alt_fd);
215 iocom->alt_fd = -1;
216 }
0c3a8cd0
MD
217 dmsg_ioq_done(iocom, &iocom->ioq_rx);
218 dmsg_ioq_done(iocom, &iocom->ioq_tx);
9ab15106 219 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
78476205 220 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
9ab15106
MD
221 free(msg);
222 }
223 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
78476205 224 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
9ab15106
MD
225 free(msg->aux_data);
226 msg->aux_data = NULL;
227 free(msg);
228 }
7dc0f844
MD
229 if (iocom->wakeupfds[0] >= 0) {
230 close(iocom->wakeupfds[0]);
231 iocom->wakeupfds[0] = -1;
232 }
233 if (iocom->wakeupfds[1] >= 0) {
234 close(iocom->wakeupfds[1]);
235 iocom->wakeupfds[1] = -1;
236 }
237 pthread_mutex_destroy(&iocom->mtx);
9ab15106
MD
238}
239
0d20ec8a
MD
240/*
241 * Initialize a circuit structure and add it to the iocom's circuit_tree.
242 * circuit0 is left out and will not be added to the tree.
243 */
244void
245dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
246{
247 circuit->iocom = iocom;
248 RB_INIT(&circuit->staterd_tree);
249 RB_INIT(&circuit->statewr_tree);
250 if (circuit->msgid)
251 RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
252}
253
4a2e0eae
MD
254/*
255 * Allocate a new one-way message.
256 */
0c3a8cd0 257dmsg_msg_t *
0d20ec8a
MD
258dmsg_msg_alloc(dmsg_circuit_t *circuit,
259 size_t aux_size, uint32_t cmd,
260 void (*func)(dmsg_msg_t *), void *data)
9ab15106 261{
0d20ec8a 262 dmsg_iocom_t *iocom = circuit->iocom;
0c3a8cd0 263 dmsg_state_t *state = NULL;
0c3a8cd0 264 dmsg_msg_t *msg;
9ab15106
MD
265 int hbytes;
266
7dc0f844 267 pthread_mutex_lock(&iocom->mtx);
9ab15106 268 if (aux_size) {
5bc5bca2
MD
269 aux_size = (aux_size + DMSG_ALIGNMASK) &
270 ~DMSG_ALIGNMASK;
9ab15106 271 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
78476205 272 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
9ab15106
MD
273 } else {
274 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
78476205 275 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
9ab15106 276 }
5bc5bca2 277 if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
29ead430
MD
278 /*
279 * Create state when CREATE is set without REPLY.
0d20ec8a
MD
280 * Assign a unique msgid, in this case simply using
281 * the pointer value for 'state'.
29ead430 282 *
0c3a8cd0
MD
283 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
284 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
0d20ec8a
MD
285 *
286 * NOTE: state initiated by us and state initiated by
287 * a remote create are placed in different RB trees.
288 * The msgid for SPAN state is used in source/target
289 * for message routing as appropriate.
29ead430
MD
290 */
291 state = malloc(sizeof(*state));
292 bzero(state, sizeof(*state));
293 state->iocom = iocom;
0d20ec8a 294 state->circuit = circuit;
0c3a8cd0 295 state->flags = DMSG_STATE_DYNAMIC;
29ead430 296 state->msgid = (uint64_t)(uintptr_t)state;
5bc5bca2
MD
297 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
298 state->rxcmd = DMSGF_REPLY;
0d20ec8a 299 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
29ead430
MD
300 state->func = func;
301 state->any.any = data;
302 pthread_mutex_lock(&iocom->mtx);
0d20ec8a 303 RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
29ead430 304 pthread_mutex_unlock(&iocom->mtx);
0c3a8cd0 305 state->flags |= DMSG_STATE_INSERTED;
29ead430 306 }
7dc0f844 307 pthread_mutex_unlock(&iocom->mtx);
9ab15106
MD
308 if (msg == NULL) {
309 msg = malloc(sizeof(*msg));
78476205 310 bzero(msg, sizeof(*msg));
9ab15106
MD
311 msg->aux_data = NULL;
312 msg->aux_size = 0;
313 }
314 if (msg->aux_size != aux_size) {
315 if (msg->aux_data) {
316 free(msg->aux_data);
317 msg->aux_data = NULL;
318 msg->aux_size = 0;
319 }
320 if (aux_size) {
321 msg->aux_data = malloc(aux_size);
322 msg->aux_size = aux_size;
323 }
324 }
5bc5bca2 325 hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
4a2e0eae
MD
326 if (hbytes)
327 bzero(&msg->any.head, hbytes);
78476205 328 msg->hdr_size = hbytes;
0d20ec8a 329 msg->any.head.magic = DMSG_HDR_MAGIC;
9ab15106 330 msg->any.head.cmd = cmd;
8c280d5d
MD
331 msg->any.head.aux_descr = 0;
332 msg->any.head.aux_crc = 0;
0d20ec8a
MD
333 msg->any.head.circuit = 0;
334 msg->circuit = circuit;
335 msg->iocom = iocom;
29ead430
MD
336 if (state) {
337 msg->state = state;
338 state->msg = msg;
339 msg->any.head.msgid = state->msgid;
0d20ec8a
MD
340 } else {
341 msg->any.head.msgid = 0;
29ead430 342 }
9ab15106
MD
343 return (msg);
344}
345
4a2e0eae
MD
346/*
347 * Free a message so it can be reused afresh.
348 *
349 * NOTE: aux_size can be 0 with a non-NULL aux_data.
350 */
7dc0f844 351static
9ab15106 352void
0c3a8cd0 353dmsg_msg_free_locked(dmsg_msg_t *msg)
9ab15106 354{
0d20ec8a 355 dmsg_iocom_t *iocom = msg->iocom;
29ead430 356
7dc0f844 357 msg->state = NULL;
9ab15106 358 if (msg->aux_data)
78476205 359 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
9ab15106 360 else
78476205 361 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
9ab15106
MD
362}
363
7dc0f844 364void
0c3a8cd0 365dmsg_msg_free(dmsg_msg_t *msg)
7dc0f844 366{
0d20ec8a 367 dmsg_iocom_t *iocom = msg->iocom;
29ead430 368
7dc0f844 369 pthread_mutex_lock(&iocom->mtx);
0c3a8cd0 370 dmsg_msg_free_locked(msg);
7dc0f844
MD
371 pthread_mutex_unlock(&iocom->mtx);
372}
373
9ab15106
MD
374/*
375 * I/O core loop for an iocom.
7dc0f844
MD
376 *
377 * Thread localized, iocom->mtx not held.
9ab15106
MD
378 */
379void
0c3a8cd0 380dmsg_iocom_core(dmsg_iocom_t *iocom)
9ab15106 381{
7dc0f844
MD
382 struct pollfd fds[3];
383 char dummybuf[256];
0c3a8cd0 384 dmsg_msg_t *msg;
4a2e0eae 385 int timeout;
7dc0f844
MD
386 int count;
387 int wi; /* wakeup pipe */
388 int si; /* socket */
389 int ai; /* alt bulk path socket */
9ab15106 390
0c3a8cd0
MD
391 while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
392 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
393 DMSG_IOCOMF_WWORK |
394 DMSG_IOCOMF_PWORK |
395 DMSG_IOCOMF_SWORK |
396 DMSG_IOCOMF_ARWORK |
397 DMSG_IOCOMF_AWWORK)) == 0) {
7dc0f844
MD
398 /*
399 * Only poll if no immediate work is pending.
400 * Otherwise we are just wasting our time calling
401 * poll.
402 */
403 timeout = 5000;
4a2e0eae 404
7dc0f844
MD
405 count = 0;
406 wi = -1;
407 si = -1;
408 ai = -1;
9ab15106 409
7dc0f844
MD
410 /*
411 * Always check the inter-thread pipe, e.g.
412 * for iocom->txmsgq work.
413 */
414 wi = count++;
415 fds[wi].fd = iocom->wakeupfds[0];
416 fds[wi].events = POLLIN;
417 fds[wi].revents = 0;
418
419 /*
420 * Check the socket input/output direction as
421 * requested
422 */
0c3a8cd0
MD
423 if (iocom->flags & (DMSG_IOCOMF_RREQ |
424 DMSG_IOCOMF_WREQ)) {
7dc0f844
MD
425 si = count++;
426 fds[si].fd = iocom->sock_fd;
427 fds[si].events = 0;
428 fds[si].revents = 0;
429
0c3a8cd0 430 if (iocom->flags & DMSG_IOCOMF_RREQ)
7dc0f844 431 fds[si].events |= POLLIN;
0c3a8cd0 432 if (iocom->flags & DMSG_IOCOMF_WREQ)
7dc0f844
MD
433 fds[si].events |= POLLOUT;
434 }
9ab15106 435
7dc0f844
MD
436 /*
437 * Check the alternative fd for work.
438 */
439 if (iocom->alt_fd >= 0) {
440 ai = count++;
441 fds[ai].fd = iocom->alt_fd;
442 fds[ai].events = POLLIN;
443 fds[ai].revents = 0;
444 }
445 poll(fds, count, timeout);
446
447 if (wi >= 0 && (fds[wi].revents & POLLIN))
0c3a8cd0 448 iocom->flags |= DMSG_IOCOMF_PWORK;
7dc0f844 449 if (si >= 0 && (fds[si].revents & POLLIN))
0c3a8cd0 450 iocom->flags |= DMSG_IOCOMF_RWORK;
7dc0f844 451 if (si >= 0 && (fds[si].revents & POLLOUT))
0c3a8cd0 452 iocom->flags |= DMSG_IOCOMF_WWORK;
7dc0f844 453 if (wi >= 0 && (fds[wi].revents & POLLOUT))
0c3a8cd0 454 iocom->flags |= DMSG_IOCOMF_WWORK;
7dc0f844 455 if (ai >= 0 && (fds[ai].revents & POLLIN))
0c3a8cd0 456 iocom->flags |= DMSG_IOCOMF_ARWORK;
9ab15106 457 } else {
7dc0f844
MD
458 /*
459 * Always check the pipe
460 */
0c3a8cd0 461 iocom->flags |= DMSG_IOCOMF_PWORK;
9ab15106 462 }
7dc0f844 463
0c3a8cd0
MD
464 if (iocom->flags & DMSG_IOCOMF_SWORK) {
465 iocom->flags &= ~DMSG_IOCOMF_SWORK;
0d20ec8a 466 iocom->signal_callback(iocom);
5903497c
MD
467 }
468
7dc0f844
MD
469 /*
470 * Pending message queues from other threads wake us up
471 * with a write to the wakeupfds[] pipe. We have to clear
472 * the pipe with a dummy read.
473 */
0c3a8cd0
MD
474 if (iocom->flags & DMSG_IOCOMF_PWORK) {
475 iocom->flags &= ~DMSG_IOCOMF_PWORK;
7dc0f844 476 read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
0c3a8cd0
MD
477 iocom->flags |= DMSG_IOCOMF_RWORK;
478 iocom->flags |= DMSG_IOCOMF_WWORK;
0d20ec8a 479 if (TAILQ_FIRST(&iocom->txmsgq))
0c3a8cd0 480 dmsg_iocom_flush1(iocom);
9ab15106 481 }
7dc0f844
MD
482
483 /*
484 * Message write sequencing
485 */
0c3a8cd0
MD
486 if (iocom->flags & DMSG_IOCOMF_WWORK)
487 dmsg_iocom_flush1(iocom);
7dc0f844
MD
488
489 /*
490 * Message read sequencing. Run this after the write
491 * sequencing in case the write sequencing allowed another
492 * auto-DELETE to occur on the read side.
493 */
0c3a8cd0
MD
494 if (iocom->flags & DMSG_IOCOMF_RWORK) {
495 while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
496 (msg = dmsg_ioq_read(iocom)) != NULL) {
497 if (DMsgDebugOpt) {
81666e1b 498 fprintf(stderr, "receive %s\n",
0c3a8cd0 499 dmsg_msg_str(msg));
81666e1b 500 }
0d20ec8a 501 iocom->rcvmsg_callback(msg);
0c3a8cd0 502 dmsg_state_cleanuprx(iocom, msg);
5903497c
MD
503 }
504 }
7dc0f844 505
0c3a8cd0
MD
506 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
507 iocom->flags &= ~DMSG_IOCOMF_ARWORK;
0d20ec8a 508 iocom->altmsg_callback(iocom);
02454b3e 509 }
9ab15106
MD
510 }
511}
512
3033ecc8
MD
513/*
514 * Make sure there's enough room in the FIFO to hold the
515 * needed data.
516 *
517 * Assume worst case encrypted form is 2x the size of the
518 * plaintext equivalent.
519 */
520static
521size_t
0c3a8cd0 522dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
3033ecc8
MD
523{
524 size_t bytes;
525 size_t nmax;
526
527 bytes = ioq->fifo_cdx - ioq->fifo_beg;
528 nmax = sizeof(ioq->buf) - ioq->fifo_end;
529 if (bytes + nmax / 2 < needed) {
530 if (bytes) {
531 bcopy(ioq->buf + ioq->fifo_beg,
532 ioq->buf,
533 bytes);
534 }
535 ioq->fifo_cdx -= ioq->fifo_beg;
536 ioq->fifo_beg = 0;
537 if (ioq->fifo_cdn < ioq->fifo_end) {
538 bcopy(ioq->buf + ioq->fifo_cdn,
539 ioq->buf + ioq->fifo_cdx,
540 ioq->fifo_end - ioq->fifo_cdn);
541 }
542 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
543 ioq->fifo_cdn = ioq->fifo_cdx;
544 nmax = sizeof(ioq->buf) - ioq->fifo_end;
545 }
546 return(nmax);
547}
548
9ab15106
MD
549/*
550 * Read the next ready message from the ioq, issuing I/O if needed.
551 * Caller should retry on a read-event when NULL is returned.
552 *
5bc5bca2 553 * If an error occurs during reception a DMSG_LNK_ERROR msg will
1b195a98 554 * be returned for each open transaction, then the ioq and iocom
5bc5bca2 555 * will be errored out and a non-transactional DMSG_LNK_ERROR
1b195a98
MD
556 * msg will be returned as the final message. The caller should not call
557 * us again after the final message is returned.
7dc0f844
MD
558 *
559 * Thread localized, iocom->mtx not held.
9ab15106 560 */
0c3a8cd0
MD
561dmsg_msg_t *
562dmsg_ioq_read(dmsg_iocom_t *iocom)
9ab15106 563{
0c3a8cd0
MD
564 dmsg_ioq_t *ioq = &iocom->ioq_rx;
565 dmsg_msg_t *msg;
566 dmsg_state_t *state;
0d20ec8a 567 dmsg_circuit_t *circuit0;
5bc5bca2 568 dmsg_hdr_t *head;
9ab15106 569 ssize_t n;
78476205
MD
570 size_t bytes;
571 size_t nmax;
9ab15106 572 uint32_t xcrc32;
78476205 573 int error;
9ab15106 574
78476205 575again:
0c3a8cd0 576 iocom->flags &= ~(DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
7dc0f844 577
9ab15106
MD
578 /*
579 * If a message is already pending we can just remove and
78476205 580 * return it. Message state has already been processed.
90e8cd1d 581 * (currently not implemented)
9ab15106
MD
582 */
583 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
78476205
MD
584 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
585 return (msg);
9ab15106
MD
586 }
587
90e8cd1d
MD
588 /*
589 * If the stream is errored out we stop processing it.
590 */
cf715800
MD
591 if (ioq->error)
592 goto skip;
593
9ab15106
MD
594 /*
595 * Message read in-progress (msg is NULL at the moment). We don't
596 * allocate a msg until we have its core header.
597 */
5cf97ec5 598 nmax = sizeof(ioq->buf) - ioq->fifo_end;
3033ecc8 599 bytes = ioq->fifo_cdx - ioq->fifo_beg; /* already decrypted */
9ab15106
MD
600 msg = ioq->msg;
601
602 switch(ioq->state) {
0c3a8cd0 603 case DMSG_MSGQ_STATE_HEADER1:
9ab15106
MD
604 /*
605 * Load the primary header, fail on any non-trivial read
606 * error or on EOF. Since the primary header is the same
607 * size is the message alignment it will never straddle
608 * the end of the buffer.
609 */
0c3a8cd0 610 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
3033ecc8 611 if (bytes < sizeof(msg->any.head)) {
9ab15106 612 n = read(iocom->sock_fd,
5cf97ec5 613 ioq->buf + ioq->fifo_end,
9ab15106
MD
614 nmax);
615 if (n <= 0) {
616 if (n == 0) {
0c3a8cd0 617 ioq->error = DMSG_IOQ_ERROR_EOF;
9ab15106
MD
618 break;
619 }
620 if (errno != EINTR &&
621 errno != EINPROGRESS &&
622 errno != EAGAIN) {
0c3a8cd0 623 ioq->error = DMSG_IOQ_ERROR_SOCK;
9ab15106
MD
624 break;
625 }
626 n = 0;
627 /* fall through */
628 }
3033ecc8
MD
629 ioq->fifo_end += (size_t)n;
630 nmax -= (size_t)n;
9ab15106
MD
631 }
632
633 /*
3033ecc8
MD
634 * Decrypt data received so far. Data will be decrypted
635 * in-place but might create gaps in the FIFO. Partial
636 * blocks are not immediately decrypted.
8c280d5d
MD
637 *
638 * WARNING! The header might be in the wrong endian, we
639 * do not fix it up until we get the entire
640 * extended header.
9ab15106 641 */
0c3a8cd0
MD
642 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
643 dmsg_crypto_decrypt(iocom, ioq);
3033ecc8
MD
644 } else {
645 ioq->fifo_cdx = ioq->fifo_end;
646 ioq->fifo_cdn = ioq->fifo_end;
647 }
648 bytes = ioq->fifo_cdx - ioq->fifo_beg;
649
650 /*
651 * Insufficient data accumulated (msg is NULL, caller will
652 * retry on event).
653 */
654 assert(msg == NULL);
655 if (bytes < sizeof(msg->any.head))
656 break;
9ab15106
MD
657
658 /*
659 * Check and fixup the core header. Note that the icrc
660 * has to be calculated before any fixups, but the crc
661 * fields in the msg may have to be swapped like everything
662 * else.
663 */
3033ecc8 664 head = (void *)(ioq->buf + ioq->fifo_beg);
5bc5bca2
MD
665 if (head->magic != DMSG_HDR_MAGIC &&
666 head->magic != DMSG_HDR_MAGIC_REV) {
0c3a8cd0 667 ioq->error = DMSG_IOQ_ERROR_SYNC;
9ab15106
MD
668 break;
669 }
670
9ab15106
MD
671 /*
672 * Calculate the full header size and aux data size
673 */
5bc5bca2
MD
674 if (head->magic == DMSG_HDR_MAGIC_REV) {
675 ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
676 DMSG_ALIGN;
8c280d5d 677 ioq->abytes = bswap32(head->aux_bytes) *
5bc5bca2 678 DMSG_ALIGN;
8c280d5d 679 } else {
5bc5bca2
MD
680 ioq->hbytes = (head->cmd & DMSGF_SIZE) *
681 DMSG_ALIGN;
682 ioq->abytes = head->aux_bytes * DMSG_ALIGN;
8c280d5d 683 }
78476205
MD
684 if (ioq->hbytes < sizeof(msg->any.head) ||
685 ioq->hbytes > sizeof(msg->any) ||
5bc5bca2 686 ioq->abytes > DMSG_AUX_MAX) {
0c3a8cd0 687 ioq->error = DMSG_IOQ_ERROR_FIELD;
9ab15106
MD
688 break;
689 }
690
691 /*
02454b3e 692 * Allocate the message, the next state will fill it in.
9ab15106 693 */
0d20ec8a
MD
694 msg = dmsg_msg_alloc(&iocom->circuit0, ioq->abytes, 0,
695 NULL, NULL);
9ab15106
MD
696 ioq->msg = msg;
697
9ab15106
MD
698 /*
699 * Fall through to the next state. Make sure that the
700 * extended header does not straddle the end of the buffer.
701 * We still want to issue larger reads into our buffer,
702 * book-keeping is easier if we don't bcopy() yet.
3033ecc8
MD
703 *
704 * Make sure there is enough room for bloated encrypt data.
9ab15106 705 */
0c3a8cd0
MD
706 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
707 ioq->state = DMSG_MSGQ_STATE_HEADER2;
9ab15106 708 /* fall through */
0c3a8cd0 709 case DMSG_MSGQ_STATE_HEADER2:
9ab15106
MD
710 /*
711 * Fill out the extended header.
712 */
713 assert(msg != NULL);
714 if (bytes < ioq->hbytes) {
715 n = read(iocom->sock_fd,
02454b3e 716 ioq->buf + ioq->fifo_end,
9ab15106
MD
717 nmax);
718 if (n <= 0) {
719 if (n == 0) {
0c3a8cd0 720 ioq->error = DMSG_IOQ_ERROR_EOF;
9ab15106
MD
721 break;
722 }
723 if (errno != EINTR &&
724 errno != EINPROGRESS &&
725 errno != EAGAIN) {
0c3a8cd0 726 ioq->error = DMSG_IOQ_ERROR_SOCK;
9ab15106
MD
727 break;
728 }
729 n = 0;
730 /* fall through */
731 }
3033ecc8
MD
732 ioq->fifo_end += (size_t)n;
733 nmax -= (size_t)n;
9ab15106
MD
734 }
735
0c3a8cd0
MD
736 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
737 dmsg_crypto_decrypt(iocom, ioq);
3033ecc8
MD
738 } else {
739 ioq->fifo_cdx = ioq->fifo_end;
740 ioq->fifo_cdn = ioq->fifo_end;
741 }
742 bytes = ioq->fifo_cdx - ioq->fifo_beg;
743
9ab15106
MD
744 /*
745 * Insufficient data accumulated (set msg NULL so caller will
746 * retry on event).
747 */
748 if (bytes < ioq->hbytes) {
749 msg = NULL;
750 break;
751 }
752
753 /*
5cf97ec5 754 * Calculate the extended header, decrypt data received
8c280d5d
MD
755 * so far. Handle endian-conversion for the entire extended
756 * header.
9ab15106 757 */
5cf97ec5 758 head = (void *)(ioq->buf + ioq->fifo_beg);
9ab15106
MD
759
760 /*
8c280d5d 761 * Check the CRC.
9ab15106 762 */
5bc5bca2 763 if (head->magic == DMSG_HDR_MAGIC_REV)
8c280d5d
MD
764 xcrc32 = bswap32(head->hdr_crc);
765 else
766 xcrc32 = head->hdr_crc;
767 head->hdr_crc = 0;
0c3a8cd0
MD
768 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
769 ioq->error = DMSG_IOQ_ERROR_XCRC;
81666e1b 770 fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
0c3a8cd0
MD
771 xcrc32, dmsg_icrc32(head, ioq->hbytes),
772 dmsg_msg_str(msg));
02454b3e 773 assert(0);
8c280d5d
MD
774 break;
775 }
776 head->hdr_crc = xcrc32;
777
5bc5bca2 778 if (head->magic == DMSG_HDR_MAGIC_REV) {
0c3a8cd0 779 dmsg_bswap_head(head);
9ab15106
MD
780 }
781
782 /*
783 * Copy the extended header into the msg and adjust the
784 * FIFO.
785 */
786 bcopy(head, &msg->any, ioq->hbytes);
787
788 /*
789 * We are either done or we fall-through.
790 */
791 if (ioq->abytes == 0) {
792 ioq->fifo_beg += ioq->hbytes;
793 break;
794 }
795
796 /*
3033ecc8
MD
797 * Must adjust bytes (and the state) when falling through.
798 * nmax doesn't change.
9ab15106
MD
799 */
800 ioq->fifo_beg += ioq->hbytes;
9ab15106 801 bytes -= ioq->hbytes;
0c3a8cd0 802 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
9ab15106 803 /* fall through */
0c3a8cd0 804 case DMSG_MSGQ_STATE_AUXDATA1:
9ab15106
MD
805 /*
806 * Copy the partial or complete payload from remaining
3033ecc8
MD
807 * bytes in the FIFO in order to optimize the makeroom call
808 * in the AUXDATA2 state. We have to fall-through either
9ab15106 809 * way so we can check the crc.
78476205 810 *
3033ecc8 811 * msg->aux_size tracks our aux data.
9ab15106 812 */
9ab15106 813 if (bytes >= ioq->abytes) {
5cf97ec5 814 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9ab15106
MD
815 ioq->abytes);
816 msg->aux_size = ioq->abytes;
817 ioq->fifo_beg += ioq->abytes;
3033ecc8
MD
818 assert(ioq->fifo_beg <= ioq->fifo_cdx);
819 assert(ioq->fifo_cdx <= ioq->fifo_cdn);
9ab15106
MD
820 bytes -= ioq->abytes;
821 } else if (bytes) {
5cf97ec5 822 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9ab15106
MD
823 bytes);
824 msg->aux_size = bytes;
825 ioq->fifo_beg += bytes;
5cf97ec5
MD
826 if (ioq->fifo_cdx < ioq->fifo_beg)
827 ioq->fifo_cdx = ioq->fifo_beg;
3033ecc8
MD
828 assert(ioq->fifo_beg <= ioq->fifo_cdx);
829 assert(ioq->fifo_cdx <= ioq->fifo_cdn);
9ab15106 830 bytes = 0;
78476205
MD
831 } else {
832 msg->aux_size = 0;
9ab15106 833 }
0c3a8cd0 834 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
9ab15106 835 /* fall through */
0c3a8cd0 836 case DMSG_MSGQ_STATE_AUXDATA2:
9ab15106 837 /*
3033ecc8 838 * Make sure there is enough room for more data.
9ab15106
MD
839 */
840 assert(msg);
0c3a8cd0 841 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
3033ecc8
MD
842
843 /*
844 * Read and decrypt more of the payload.
845 */
9ab15106
MD
846 if (msg->aux_size < ioq->abytes) {
847 assert(bytes == 0);
848 n = read(iocom->sock_fd,
3033ecc8
MD
849 ioq->buf + ioq->fifo_end,
850 nmax);
9ab15106
MD
851 if (n <= 0) {
852 if (n == 0) {
0c3a8cd0 853 ioq->error = DMSG_IOQ_ERROR_EOF;
9ab15106
MD
854 break;
855 }
856 if (errno != EINTR &&
857 errno != EINPROGRESS &&
858 errno != EAGAIN) {
0c3a8cd0 859 ioq->error = DMSG_IOQ_ERROR_SOCK;
9ab15106
MD
860 break;
861 }
862 n = 0;
863 /* fall through */
864 }
3033ecc8
MD
865 ioq->fifo_end += (size_t)n;
866 nmax -= (size_t)n;
867 }
868
0c3a8cd0
MD
869 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
870 dmsg_crypto_decrypt(iocom, ioq);
3033ecc8
MD
871 } else {
872 ioq->fifo_cdx = ioq->fifo_end;
873 ioq->fifo_cdn = ioq->fifo_end;
874 }
875 bytes = ioq->fifo_cdx - ioq->fifo_beg;
876
877 if (bytes > ioq->abytes - msg->aux_size)
878 bytes = ioq->abytes - msg->aux_size;
879
880 if (bytes) {
881 bcopy(ioq->buf + ioq->fifo_beg,
882 msg->aux_data + msg->aux_size,
883 bytes);
884 msg->aux_size += bytes;
885 ioq->fifo_beg += bytes;
9ab15106
MD
886 }
887
888 /*
889 * Insufficient data accumulated (set msg NULL so caller will
890 * retry on event).
891 */
892 if (msg->aux_size < ioq->abytes) {
893 msg = NULL;
894 break;
895 }
896 assert(msg->aux_size == ioq->abytes);
9ab15106
MD
897
898 /*
8c280d5d 899 * Check aux_crc, then we are done.
9ab15106 900 */
0c3a8cd0 901 xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
8c280d5d 902 if (xcrc32 != msg->any.head.aux_crc) {
0c3a8cd0 903 ioq->error = DMSG_IOQ_ERROR_ACRC;
9ab15106
MD
904 break;
905 }
906 break;
0c3a8cd0 907 case DMSG_MSGQ_STATE_ERROR:
1b195a98
MD
908 /*
909 * Continued calls to drain recorded transactions (returning
910 * a LNK_ERROR for each one), before we return the final
911 * LNK_ERROR.
912 */
913 assert(msg == NULL);
914 break;
9ab15106
MD
915 default:
916 /*
917 * We don't double-return errors, the caller should not
918 * have called us again after getting an error msg.
919 */
920 assert(0);
921 break;
922 }
923
5cf97ec5
MD
924 /*
925 * Check the message sequence. The iv[] should prevent any
926 * possibility of a replay but we add this check anyway.
927 */
928 if (msg && ioq->error == 0) {
929 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
0c3a8cd0 930 ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
5cf97ec5
MD
931 } else {
932 ++ioq->seq;
933 }
934 }
935
9ab15106
MD
936 /*
937 * Handle error, RREQ, or completion
938 *
939 * NOTE: nmax and bytes are invalid at this point, we don't bother
940 * to update them when breaking out.
941 */
942 if (ioq->error) {
cf715800 943skip:
9ab15106 944 /*
1b195a98
MD
945 * An unrecoverable error causes all active receive
946 * transactions to be terminated with a LNK_ERROR message.
9ab15106 947 *
1b195a98
MD
948 * Once all active transactions are exhausted we set the
949 * iocom ERROR flag and return a non-transactional LNK_ERROR
950 * message, which should cause master processing loops to
951 * terminate.
9ab15106 952 */
4a2e0eae 953 assert(ioq->msg == msg);
1b195a98 954 if (msg) {
0c3a8cd0 955 dmsg_msg_free(msg);
9ab15106 956 ioq->msg = NULL;
9ab15106 957 }
1b195a98
MD
958
959 /*
960 * No more I/O read processing
961 */
0c3a8cd0 962 ioq->state = DMSG_MSGQ_STATE_ERROR;
1b195a98
MD
963
964 /*
7dc0f844
MD
965 * Simulate a remote LNK_ERROR DELETE msg for any open
966 * transactions, ending with a final non-transactional
967 * LNK_ERROR (that the session can detect) when no
968 * transactions remain.
0d20ec8a
MD
969 *
970 * We only need to scan transactions on circuit0 as these
971 * will contain all circuit forges, and terminating circuit
972 * forges will automatically terminate the transactions on
973 * any other circuits as well as those circuits.
1b195a98 974 */
0d20ec8a
MD
975 circuit0 = &iocom->circuit0;
976 msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
9ab15106 977 msg->any.head.error = ioq->error;
1b195a98 978
7dc0f844 979 pthread_mutex_lock(&iocom->mtx);
0c3a8cd0 980 dmsg_iocom_drain(iocom);
0d20ec8a
MD
981
982 if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
1b195a98 983 /*
7dc0f844
MD
984 * Active remote transactions are still present.
985 * Simulate the other end sending us a DELETE.
1b195a98 986 */
5bc5bca2 987 if (state->rxcmd & DMSGF_DELETE) {
0c3a8cd0 988 dmsg_msg_free(msg);
7dc0f844
MD
989 msg = NULL;
990 } else {
5bc5bca2 991 /*state->txcmd |= DMSGF_DELETE;*/
7dc0f844 992 msg->state = state;
0d20ec8a 993 msg->iocom = iocom;
7dc0f844 994 msg->any.head.msgid = state->msgid;
5bc5bca2
MD
995 msg->any.head.cmd |= DMSGF_ABORT |
996 DMSGF_DELETE;
7dc0f844 997 }
0d20ec8a 998 } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
7dc0f844
MD
999 /*
1000 * Active local transactions are still present.
1001 * Simulate the other end sending us a DELETE.
1002 */
5bc5bca2 1003 if (state->rxcmd & DMSGF_DELETE) {
0c3a8cd0 1004 dmsg_msg_free(msg);
7dc0f844
MD
1005 msg = NULL;
1006 } else {
7dc0f844 1007 msg->state = state;
0d20ec8a 1008 msg->iocom = iocom;
7dc0f844 1009 msg->any.head.msgid = state->msgid;
5bc5bca2
MD
1010 msg->any.head.cmd |= DMSGF_ABORT |
1011 DMSGF_DELETE |
1012 DMSGF_REPLY;
1013 if ((state->rxcmd & DMSGF_CREATE) == 0) {
7dc0f844 1014 msg->any.head.cmd |=
5bc5bca2 1015 DMSGF_CREATE;
7dc0f844
MD
1016 }
1017 }
1b195a98
MD
1018 } else {
1019 /*
7dc0f844
MD
1020 * No active local or remote transactions remain.
1021 * Generate a final LNK_ERROR and flag EOF.
1b195a98
MD
1022 */
1023 msg->state = NULL;
0c3a8cd0 1024 iocom->flags |= DMSG_IOCOMF_EOF;
81666e1b 1025 fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1b195a98 1026 }
7dc0f844
MD
1027 pthread_mutex_unlock(&iocom->mtx);
1028
1029 /*
1030 * For the iocom error case we want to set RWORK to indicate
1031 * that more messages might be pending.
1032 *
1033 * It is possible to return NULL when there is more work to
1034 * do because each message has to be DELETEd in both
1035 * directions before we continue on with the next (though
1036 * this could be optimized). The transmit direction will
1037 * re-set RWORK.
1038 */
1039 if (msg)
0c3a8cd0 1040 iocom->flags |= DMSG_IOCOMF_RWORK;
9ab15106
MD
1041 } else if (msg == NULL) {
1042 /*
1043 * Insufficient data received to finish building the message,
1044 * set RREQ and return NULL.
1045 *
1046 * Leave ioq->msg intact.
1047 * Leave the FIFO intact.
1048 */
0c3a8cd0 1049 iocom->flags |= DMSG_IOCOMF_RREQ;
9ab15106
MD
1050 } else {
1051 /*
0d20ec8a 1052 * Continue processing msg.
9ab15106
MD
1053 *
1054 * The fifo has already been advanced past the message.
1055 * Trivially reset the FIFO indices if possible.
7dc0f844
MD
1056 *
1057 * clear the FIFO if it is now empty and set RREQ to wait
1058 * for more from the socket. If the FIFO is not empty set
1059 * TWORK to bypass the poll so we loop immediately.
9ab15106 1060 */
3033ecc8
MD
1061 if (ioq->fifo_beg == ioq->fifo_cdx &&
1062 ioq->fifo_cdn == ioq->fifo_end) {
0c3a8cd0 1063 iocom->flags |= DMSG_IOCOMF_RREQ;
5cf97ec5 1064 ioq->fifo_cdx = 0;
3033ecc8 1065 ioq->fifo_cdn = 0;
9ab15106
MD
1066 ioq->fifo_beg = 0;
1067 ioq->fifo_end = 0;
1068 } else {
0c3a8cd0 1069 iocom->flags |= DMSG_IOCOMF_RWORK;
9ab15106 1070 }
0c3a8cd0 1071 ioq->state = DMSG_MSGQ_STATE_HEADER1;
9ab15106 1072 ioq->msg = NULL;
0d20ec8a
MD
1073
1074 /*
1075 * Handle message routing. Validates non-zero sources
1076 * and routes message. Error will be 0 if the message is
1077 * destined for us.
1078 *
1079 * State processing only occurs for messages destined for us.
1080 */
1081 if (msg->any.head.circuit)
1082 error = dmsg_circuit_relay(msg);
1083 else
1084 error = dmsg_state_msgrx(msg);
1085
1086 if (error) {
1087 /*
1088 * Abort-after-closure, throw message away and
1089 * start reading another.
1090 */
1091 if (error == DMSG_IOQ_ERROR_EALREADY) {
1092 dmsg_msg_free(msg);
1093 goto again;
1094 }
1095
1096 /*
1097 * msg routed, msg pointer no longer owned by us.
1098 * Go to the top and start reading another.
1099 */
1100 if (error == DMSG_IOQ_ERROR_ROUTED)
1101 goto again;
1102
1103 /*
1104 * Process real error and throw away message.
1105 */
1106 ioq->error = error;
1107 goto skip;
1108 }
1109 /* no error, not routed. Fall through and return msg */
9ab15106
MD
1110 }
1111 return (msg);
1112}
1113
1114/*
1115 * Calculate the header and data crc's and write a low-level message to
8c280d5d 1116 * the connection. If aux_crc is non-zero the aux_data crc is already
9ab15106
MD
1117 * assumed to have been set.
1118 *
1119 * A non-NULL msg is added to the queue but not necessarily flushed.
1120 * Calling this function with msg == NULL will get a flush going.
7dc0f844
MD
1121 *
1122 * Caller must hold iocom->mtx.
9ab15106 1123 */
7dc0f844 1124void
0c3a8cd0 1125dmsg_iocom_flush1(dmsg_iocom_t *iocom)
9ab15106 1126{
0c3a8cd0
MD
1127 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1128 dmsg_msg_t *msg;
9ab15106 1129 uint32_t xcrc32;
4a2e0eae 1130 int hbytes;
0c3a8cd0 1131 dmsg_msg_queue_t tmpq;
7dc0f844 1132
0c3a8cd0 1133 iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
7dc0f844
MD
1134 TAILQ_INIT(&tmpq);
1135 pthread_mutex_lock(&iocom->mtx);
0d20ec8a
MD
1136 while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1137 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
7dc0f844 1138 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
9ab15106 1139 }
7dc0f844 1140 pthread_mutex_unlock(&iocom->mtx);
9ab15106 1141
7dc0f844
MD
1142 while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1143 /*
1144 * Process terminal connection errors.
1145 */
1146 TAILQ_REMOVE(&tmpq, msg, qentry);
1147 if (ioq->error) {
1148 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1149 ++ioq->msgcount;
1150 continue;
1151 }
9ab15106 1152
7dc0f844
MD
1153 /*
1154 * Finish populating the msg fields. The salt ensures that
1155 * the iv[] array is ridiculously randomized and we also
1156 * re-seed our PRNG every 32768 messages just to be sure.
1157 */
5bc5bca2 1158 msg->any.head.magic = DMSG_HDR_MAGIC;
7dc0f844
MD
1159 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1160 ++ioq->seq;
1161 if ((ioq->seq & 32767) == 0)
1162 srandomdev();
9ab15106 1163
7dc0f844
MD
1164 /*
1165 * Calculate aux_crc if 0, then calculate hdr_crc.
1166 */
1167 if (msg->aux_size && msg->any.head.aux_crc == 0) {
5bc5bca2 1168 assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
0c3a8cd0 1169 xcrc32 = dmsg_icrc32(msg->aux_data, msg->aux_size);
7dc0f844
MD
1170 msg->any.head.aux_crc = xcrc32;
1171 }
5bc5bca2
MD
1172 msg->any.head.aux_bytes = msg->aux_size / DMSG_ALIGN;
1173 assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
9ab15106 1174
5bc5bca2
MD
1175 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1176 DMSG_ALIGN;
7dc0f844 1177 msg->any.head.hdr_crc = 0;
0c3a8cd0 1178 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
9ab15106 1179
7dc0f844
MD
1180 /*
1181 * Enqueue the message (the flush codes handles stream
1182 * encryption).
1183 */
1184 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1185 ++ioq->msgcount;
1186 }
0c3a8cd0 1187 dmsg_iocom_flush2(iocom);
4a2e0eae
MD
1188}
1189
7dc0f844
MD
1190/*
1191 * Thread localized, iocom->mtx not held by caller.
1192 */
4a2e0eae 1193void
0c3a8cd0 1194dmsg_iocom_flush2(dmsg_iocom_t *iocom)
4a2e0eae 1195{
0c3a8cd0
MD
1196 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1197 dmsg_msg_t *msg;
3033ecc8 1198 ssize_t n;
0c3a8cd0 1199 struct iovec iov[DMSG_IOQ_MAXIOVEC];
3033ecc8 1200 size_t nact;
78476205
MD
1201 size_t hbytes;
1202 size_t abytes;
02454b3e
MD
1203 size_t hoff;
1204 size_t aoff;
3033ecc8 1205 int iovcnt;
9ab15106 1206
7dc0f844 1207 if (ioq->error) {
0c3a8cd0 1208 dmsg_iocom_drain(iocom);
7dc0f844
MD
1209 return;
1210 }
1211
9ab15106
MD
1212 /*
1213 * Pump messages out the connection by building an iovec.
02454b3e
MD
1214 *
1215 * ioq->hbytes/ioq->abytes tracks how much of the first message
1216 * in the queue has been successfully written out, so we can
1217 * resume writing.
9ab15106 1218 */
3033ecc8
MD
1219 iovcnt = 0;
1220 nact = 0;
02454b3e
MD
1221 hoff = ioq->hbytes;
1222 aoff = ioq->abytes;
9ab15106 1223
78476205 1224 TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
5bc5bca2
MD
1225 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1226 DMSG_ALIGN;
9ab15106 1227 abytes = msg->aux_size;
02454b3e
MD
1228 assert(hoff <= hbytes && aoff <= abytes);
1229
1230 if (hoff < hbytes) {
3033ecc8
MD
1231 iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1232 iov[iovcnt].iov_len = hbytes - hoff;
1233 nact += hbytes - hoff;
1234 ++iovcnt;
0c3a8cd0 1235 if (iovcnt == DMSG_IOQ_MAXIOVEC)
9ab15106
MD
1236 break;
1237 }
02454b3e 1238 if (aoff < abytes) {
9ab15106 1239 assert(msg->aux_data != NULL);
3033ecc8
MD
1240 iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1241 iov[iovcnt].iov_len = abytes - aoff;
1242 nact += abytes - aoff;
1243 ++iovcnt;
0c3a8cd0 1244 if (iovcnt == DMSG_IOQ_MAXIOVEC)
9ab15106
MD
1245 break;
1246 }
02454b3e
MD
1247 hoff = 0;
1248 aoff = 0;
9ab15106 1249 }
3033ecc8 1250 if (iovcnt == 0)
9ab15106
MD
1251 return;
1252
5cf97ec5
MD
1253 /*
1254 * Encrypt and write the data. The crypto code will move the
1255 * data into the fifo and adjust the iov as necessary. If
1256 * encryption is disabled the iov is left alone.
1257 *
02454b3e
MD
1258 * May return a smaller iov (thus a smaller n), with aggregated
1259 * chunks. May reduce nmax to what fits in the FIFO.
3033ecc8
MD
1260 *
1261 * This function sets nact to the number of original bytes now
1262 * encrypted, adding to the FIFO some number of bytes that might
1263 * be greater depending on the crypto mechanic. iov[] is adjusted
1264 * to point at the FIFO if necessary.
1265 *
1266 * NOTE: The return value from the writev() is the post-encrypted
1267 * byte count, not the plaintext count.
5cf97ec5 1268 */
0c3a8cd0 1269 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
3033ecc8
MD
1270 /*
1271 * Make sure the FIFO has a reasonable amount of space
1272 * left (if not completely full).
1273 */
1274 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
5bc5bca2 1275 sizeof(ioq->buf) - ioq->fifo_end >= DMSG_ALIGN * 2) {
3033ecc8
MD
1276 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1277 ioq->fifo_end - ioq->fifo_beg);
1278 ioq->fifo_cdx -= ioq->fifo_beg;
1279 ioq->fifo_cdn -= ioq->fifo_beg;
1280 ioq->fifo_end -= ioq->fifo_beg;
1281 ioq->fifo_beg = 0;
1282 }
5cf97ec5 1283
0c3a8cd0 1284 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
3033ecc8
MD
1285 n = writev(iocom->sock_fd, iov, iovcnt);
1286 if (n > 0) {
1287 ioq->fifo_beg += n;
1288 ioq->fifo_cdn += n;
1289 ioq->fifo_cdx += n;
1290 if (ioq->fifo_beg == ioq->fifo_end) {
1291 ioq->fifo_beg = 0;
1292 ioq->fifo_cdn = 0;
1293 ioq->fifo_cdx = 0;
1294 ioq->fifo_end = 0;
1295 }
9ab15106 1296 }
3033ecc8
MD
1297 } else {
1298 n = writev(iocom->sock_fd, iov, iovcnt);
1299 if (n > 0)
1300 nact = n;
1301 else
1302 nact = 0;
9ab15106 1303 }
7dc0f844 1304
7dc0f844
MD
1305 /*
1306 * Clean out the transmit queue based on what we successfully
3033ecc8
MD
1307 * sent (nact is the plaintext count). ioq->hbytes/abytes
1308 * represents the portion of the first message previously sent.
7dc0f844 1309 */
9ab15106 1310 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
5bc5bca2
MD
1311 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1312 DMSG_ALIGN;
9ab15106
MD
1313 abytes = msg->aux_size;
1314
78476205 1315 if ((size_t)nact < hbytes - ioq->hbytes) {
9ab15106 1316 ioq->hbytes += nact;
1bd13960 1317 nact = 0;
9ab15106
MD
1318 break;
1319 }
1320 nact -= hbytes - ioq->hbytes;
1321 ioq->hbytes = hbytes;
78476205 1322 if ((size_t)nact < abytes - ioq->abytes) {
9ab15106 1323 ioq->abytes += nact;
1bd13960 1324 nact = 0;
9ab15106
MD
1325 break;
1326 }
1327 nact -= abytes - ioq->abytes;
1328
78476205 1329 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
9ab15106
MD
1330 --ioq->msgcount;
1331 ioq->hbytes = 0;
1332 ioq->abytes = 0;
78476205 1333
0c3a8cd0 1334 dmsg_state_cleanuptx(msg);
9ab15106 1335 }
3033ecc8 1336 assert(nact == 0);
81666e1b
MD
1337
1338 /*
3033ecc8 1339 * Process the return value from the write w/regards to blocking.
81666e1b 1340 */
3033ecc8
MD
1341 if (n < 0) {
1342 if (errno != EINTR &&
1343 errno != EINPROGRESS &&
1344 errno != EAGAIN) {
1345 /*
1346 * Fatal write error
1347 */
0c3a8cd0
MD
1348 ioq->error = DMSG_IOQ_ERROR_SOCK;
1349 dmsg_iocom_drain(iocom);
3033ecc8
MD
1350 } else {
1351 /*
1352 * Wait for socket buffer space
1353 */
0c3a8cd0 1354 iocom->flags |= DMSG_IOCOMF_WREQ;
3033ecc8
MD
1355 }
1356 } else {
0c3a8cd0 1357 iocom->flags |= DMSG_IOCOMF_WREQ;
3033ecc8 1358 }
9ab15106 1359 if (ioq->error) {
0c3a8cd0 1360 dmsg_iocom_drain(iocom);
9ab15106
MD
1361 }
1362}
1363
1364/*
1365 * Kill pending msgs on ioq_tx and adjust the flags such that no more
1366 * write events will occur. We don't kill read msgs because we want
1367 * the caller to pull off our contrived terminal error msg to detect
1368 * the connection failure.
7dc0f844
MD
1369 *
1370 * Thread localized, iocom->mtx not held by caller.
9ab15106
MD
1371 */
1372void
0c3a8cd0 1373dmsg_iocom_drain(dmsg_iocom_t *iocom)
9ab15106 1374{
0c3a8cd0
MD
1375 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1376 dmsg_msg_t *msg;
9ab15106 1377
0c3a8cd0 1378 iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
02454b3e
MD
1379 ioq->hbytes = 0;
1380 ioq->abytes = 0;
7dc0f844 1381
9ab15106 1382 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
78476205 1383 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
9ab15106 1384 --ioq->msgcount;
0c3a8cd0 1385 dmsg_state_cleanuptx(msg);
9ab15106 1386 }
9ab15106
MD
1387}
1388
1389/*
8c280d5d 1390 * Write a message to an iocom, with additional state processing.
78476205
MD
1391 */
1392void
0c3a8cd0 1393dmsg_msg_write(dmsg_msg_t *msg)
78476205 1394{
0d20ec8a 1395 dmsg_iocom_t *iocom = msg->iocom;
0c3a8cd0 1396 dmsg_state_t *state;
7dc0f844 1397 char dummy;
78476205 1398
8c280d5d
MD
1399 /*
1400 * Handle state processing, create state if necessary.
1401 */
7dc0f844 1402 pthread_mutex_lock(&iocom->mtx);
8c280d5d 1403 if ((state = msg->state) != NULL) {
78476205 1404 /*
8c280d5d
MD
1405 * Existing transaction (could be reply). It is also
1406 * possible for this to be the first reply (CREATE is set),
1407 * in which case we populate state->txcmd.
29ead430
MD
1408 *
1409 * state->txcmd is adjusted to hold the final message cmd,
1410 * and we also be sure to set the CREATE bit here. We did
0c3a8cd0 1411 * not set it in dmsg_msg_alloc() because that would have
29ead430
MD
1412 * not been serialized (state could have gotten ripped out
1413 * from under the message prior to it being transmitted).
78476205 1414 */
5bc5bca2
MD
1415 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1416 DMSGF_CREATE) {
1417 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
0d20ec8a 1418 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
29ead430 1419 }
8c280d5d 1420 msg->any.head.msgid = state->msgid;
5bc5bca2 1421 assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
0d20ec8a 1422 if (msg->any.head.cmd & DMSGF_CREATE) {
5bc5bca2 1423 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
0d20ec8a
MD
1424 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1425 }
8c280d5d
MD
1426 }
1427
1428 /*
7dc0f844
MD
1429 * Queue it for output, wake up the I/O pthread. Note that the
1430 * I/O thread is responsible for generating the CRCs and encryption.
8c280d5d 1431 */
0d20ec8a 1432 TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
7dc0f844
MD
1433 dummy = 0;
1434 write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */
1435 pthread_mutex_unlock(&iocom->mtx);
8c280d5d
MD
1436}
1437
8c280d5d
MD
1438/*
1439 * This is a shortcut to formulate a reply to msg with a simple error code,
1440 * It can reply to and terminate a transaction, or it can reply to a one-way
5bc5bca2 1441 * messages. A DMSG_LNK_ERROR command code is utilized to encode
8c280d5d 1442 * the error code (which can be 0). Not all transactions are terminated
5bc5bca2 1443 * with DMSG_LNK_ERROR status (the low level only cares about the
8c280d5d
MD
1444 * MSGF_DELETE flag), but most are.
1445 *
1446 * Replies to one-way messages are a bit of an oxymoron but the feature
1447 * is used by the debug (DBG) protocol.
1448 *
1449 * The reply contains no extended data.
1450 */
1451void
0c3a8cd0 1452dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
8c280d5d 1453{
0c3a8cd0
MD
1454 dmsg_state_t *state = msg->state;
1455 dmsg_msg_t *nmsg;
8c280d5d
MD
1456 uint32_t cmd;
1457
1458
1459 /*
1460 * Reply with a simple error code and terminate the transaction.
1461 */
5bc5bca2 1462 cmd = DMSG_LNK_ERROR;
8c280d5d
MD
1463
1464 /*
1465 * Check if our direction has even been initiated yet, set CREATE.
1466 *
1467 * Check what direction this is (command or reply direction). Note
1468 * that txcmd might not have been initiated yet.
1469 *
1470 * If our direction has already been closed we just return without
1471 * doing anything.
1472 */
1473 if (state) {
5bc5bca2 1474 if (state->txcmd & DMSGF_DELETE)
8c280d5d 1475 return;
5bc5bca2
MD
1476 if (state->txcmd & DMSGF_REPLY)
1477 cmd |= DMSGF_REPLY;
1478 cmd |= DMSGF_DELETE;
8c280d5d 1479 } else {
5bc5bca2
MD
1480 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1481 cmd |= DMSGF_REPLY;
78476205 1482 }
8c280d5d 1483
29ead430
MD
1484 /*
1485 * Allocate the message and associate it with the existing state.
0d20ec8a 1486 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
29ead430
MD
1487 * allocate new state. We have our state already.
1488 */
0d20ec8a 1489 nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
29ead430 1490 if (state) {
5bc5bca2
MD
1491 if ((state->txcmd & DMSGF_CREATE) == 0)
1492 nmsg->any.head.cmd |= DMSGF_CREATE;
29ead430 1493 }
78476205 1494 nmsg->any.head.error = error;
0d20ec8a
MD
1495 nmsg->any.head.msgid = msg->any.head.msgid;
1496 nmsg->any.head.circuit = msg->any.head.circuit;
29ead430 1497 nmsg->state = state;
0c3a8cd0 1498 dmsg_msg_write(nmsg);
8c280d5d
MD
1499}
1500
1501/*
0c3a8cd0 1502 * Similar to dmsg_msg_reply() but leave the transaction open. That is,
8c280d5d
MD
1503 * we are generating a streaming reply or an intermediate acknowledgement
1504 * of some sort as part of the higher level protocol, with more to come
1505 * later.
1506 */
1507void
0c3a8cd0 1508dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
8c280d5d 1509{
0c3a8cd0
MD
1510 dmsg_state_t *state = msg->state;
1511 dmsg_msg_t *nmsg;
8c280d5d
MD
1512 uint32_t cmd;
1513
1514
1515 /*
1516 * Reply with a simple error code and terminate the transaction.
1517 */
5bc5bca2 1518 cmd = DMSG_LNK_ERROR;
8c280d5d
MD
1519
1520 /*
1521 * Check if our direction has even been initiated yet, set CREATE.
1522 *
1523 * Check what direction this is (command or reply direction). Note
1524 * that txcmd might not have been initiated yet.
1525 *
1526 * If our direction has already been closed we just return without
1527 * doing anything.
1528 */
1529 if (state) {
5bc5bca2 1530 if (state->txcmd & DMSGF_DELETE)
8c280d5d 1531 return;
5bc5bca2
MD
1532 if (state->txcmd & DMSGF_REPLY)
1533 cmd |= DMSGF_REPLY;
8c280d5d
MD
1534 /* continuing transaction, do not set MSGF_DELETE */
1535 } else {
5bc5bca2
MD
1536 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1537 cmd |= DMSGF_REPLY;
8c280d5d
MD
1538 }
1539
0d20ec8a 1540 nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
29ead430 1541 if (state) {
5bc5bca2
MD
1542 if ((state->txcmd & DMSGF_CREATE) == 0)
1543 nmsg->any.head.cmd |= DMSGF_CREATE;
29ead430 1544 }
8c280d5d 1545 nmsg->any.head.error = error;
0d20ec8a
MD
1546 nmsg->any.head.msgid = msg->any.head.msgid;
1547 nmsg->any.head.circuit = msg->any.head.circuit;
8c280d5d 1548 nmsg->state = state;
0c3a8cd0 1549 dmsg_msg_write(nmsg);
8c280d5d
MD
1550}
1551
1552/*
1553 * Terminate a transaction given a state structure by issuing a DELETE.
1554 */
1555void
0c3a8cd0 1556dmsg_state_reply(dmsg_state_t *state, uint32_t error)
8c280d5d 1557{
0c3a8cd0 1558 dmsg_msg_t *nmsg;
5bc5bca2 1559 uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
8c280d5d
MD
1560
1561 /*
1562 * Nothing to do if we already transmitted a delete
1563 */
5bc5bca2 1564 if (state->txcmd & DMSGF_DELETE)
8c280d5d
MD
1565 return;
1566
8c280d5d
MD
1567 /*
1568 * Set REPLY if the other end initiated the command. Otherwise
1569 * we are the command direction.
1570 */
5bc5bca2
MD
1571 if (state->txcmd & DMSGF_REPLY)
1572 cmd |= DMSGF_REPLY;
8c280d5d 1573
0d20ec8a 1574 nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
29ead430 1575 if (state) {
5bc5bca2
MD
1576 if ((state->txcmd & DMSGF_CREATE) == 0)
1577 nmsg->any.head.cmd |= DMSGF_CREATE;
29ead430 1578 }
8c280d5d 1579 nmsg->any.head.error = error;
0d20ec8a
MD
1580 nmsg->any.head.msgid = state->msgid;
1581 nmsg->any.head.circuit = state->msg->any.head.circuit;
1582 nmsg->state = state;
1583 dmsg_msg_write(nmsg);
1584}
1585
1586/*
1587 * Terminate a transaction given a state structure by issuing a DELETE.
1588 */
1589void
1590dmsg_state_result(dmsg_state_t *state, uint32_t error)
1591{
1592 dmsg_msg_t *nmsg;
1593 uint32_t cmd = DMSG_LNK_ERROR;
1594
1595 /*
1596 * Nothing to do if we already transmitted a delete
1597 */
1598 if (state->txcmd & DMSGF_DELETE)
1599 return;
1600
1601 /*
1602 * Set REPLY if the other end initiated the command. Otherwise
1603 * we are the command direction.
1604 */
1605 if (state->txcmd & DMSGF_REPLY)
1606 cmd |= DMSGF_REPLY;
1607
1608 nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1609 if (state) {
1610 if ((state->txcmd & DMSGF_CREATE) == 0)
1611 nmsg->any.head.cmd |= DMSGF_CREATE;
1612 }
1613 nmsg->any.head.error = error;
1614 nmsg->any.head.msgid = state->msgid;
1615 nmsg->any.head.circuit = state->msg->any.head.circuit;
8c280d5d 1616 nmsg->state = state;
0c3a8cd0 1617 dmsg_msg_write(nmsg);
78476205
MD
1618}
1619
1620/************************************************************************
1621 * TRANSACTION STATE HANDLING *
1622 ************************************************************************
1623 *
1624 */
1625
78476205 1626/*
0d20ec8a
MD
1627 * Process circuit and state tracking for a message after reception, prior
1628 * to execution.
78476205
MD
1629 *
1630 * Called with msglk held and the msg dequeued.
1631 *
1632 * All messages are called with dummy state and return actual state.
1633 * (One-off messages often just return the same dummy state).
1634 *
1635 * May request that caller discard the message by setting *discardp to 1.
1636 * The returned state is not used in this case and is allowed to be NULL.
1637 *
1638 * --
1639 *
1640 * These routines handle persistent and command/reply message state via the
1641 * CREATE and DELETE flags. The first message in a command or reply sequence
1642 * sets CREATE, the last message in a command or reply sequence sets DELETE.
4a2e0eae 1643 *
78476205
MD
1644 * There can be any number of intermediate messages belonging to the same
1645 * sequence sent inbetween the CREATE message and the DELETE message,
1646 * which set neither flag. This represents a streaming command or reply.
1647 *
1648 * Any command message received with CREATE set expects a reply sequence to
1649 * be returned. Reply sequences work the same as command sequences except the
1650 * REPLY bit is also sent. Both the command side and reply side can
1651 * degenerate into a single message with both CREATE and DELETE set. Note
1652 * that one side can be streaming and the other side not, or neither, or both.
1653 *
1654 * The msgid is unique for the initiator. That is, two sides sending a new
1655 * message can use the same msgid without colliding.
1656 *
1657 * --
1658 *
1659 * ABORT sequences work by setting the ABORT flag along with normal message
1660 * state. However, ABORTs can also be sent on half-closed messages, that is
1661 * even if the command or reply side has already sent a DELETE, as long as
1662 * the message has not been fully closed it can still send an ABORT+DELETE
1663 * to terminate the half-closed message state.
1664 *
1665 * Since ABORT+DELETEs can race we silently discard ABORT's for message
1666 * state which has already been fully closed. REPLY+ABORT+DELETEs can
1667 * also race, and in this situation the other side might have already
1668 * initiated a new unrelated command with the same message id. Since
1669 * the abort has not set the CREATE flag the situation can be detected
1670 * and the message will also be discarded.
1671 *
1672 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1673 * The ABORT request is essentially integrated into the command instead
1674 * of being sent later on. In this situation the command implementation
1675 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1676 * special-case non-blocking operation for the command.
1677 *
1678 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
1679 * to be mid-stream aborts for command/reply sequences. ABORTs on
1680 * one-way messages are not supported.
1681 *
1682 * NOTE! If a command sequence does not support aborts the ABORT flag is
1683 * simply ignored.
1684 *
1685 * --
1686 *
1687 * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1688 * set. One-off messages cannot be aborted and typically aren't processed
1689 * by these routines. The REPLY bit can be used to distinguish whether a
1690 * one-off message is a command or reply. For example, one-off replies
1691 * will typically just contain status updates.
9ab15106 1692 */
78476205 1693static int
0c3a8cd0 1694dmsg_state_msgrx(dmsg_msg_t *msg)
78476205 1695{
0d20ec8a
MD
1696 dmsg_iocom_t *iocom = msg->iocom;
1697 dmsg_circuit_t *circuit;
0c3a8cd0 1698 dmsg_state_t *state;
0d20ec8a
MD
1699 dmsg_state_t sdummy;
1700 dmsg_circuit_t cdummy;
78476205
MD
1701 int error;
1702
0d20ec8a
MD
1703 pthread_mutex_lock(&iocom->mtx);
1704
1705 /*
1706 * Locate existing persistent circuit and state, if any.
1707 */
1708 if (msg->any.head.circuit == 0) {
1709 circuit = &iocom->circuit0;
1710 } else {
1711 cdummy.msgid = msg->any.head.circuit;
1712 circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
1713 &cdummy);
1714 if (circuit == NULL)
1715 return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
1716 }
1717 msg->circuit = circuit;
1718 ++circuit->refs;
1719
78476205 1720 /*
78476205
MD
1721 * If received msg is a command state is on staterd_tree.
1722 * If received msg is a reply state is on statewr_tree.
1723 */
0d20ec8a 1724 sdummy.msgid = msg->any.head.msgid;
5bc5bca2 1725 if (msg->any.head.cmd & DMSGF_REPLY) {
0d20ec8a
MD
1726 state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
1727 &sdummy);
78476205 1728 } else {
0d20ec8a
MD
1729 state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
1730 &sdummy);
78476205
MD
1731 }
1732 msg->state = state;
7dc0f844 1733 pthread_mutex_unlock(&iocom->mtx);
78476205
MD
1734
1735 /*
1736 * Short-cut one-off or mid-stream messages (state may be NULL).
1737 */
5bc5bca2
MD
1738 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1739 DMSGF_ABORT)) == 0) {
78476205
MD
1740 return(0);
1741 }
1742
1743 /*
1744 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1745 * inside the case statements.
1746 */
5bc5bca2
MD
1747 switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1748 DMSGF_REPLY)) {
1749 case DMSGF_CREATE:
1750 case DMSGF_CREATE | DMSGF_DELETE:
78476205
MD
1751 /*
1752 * New persistant command received.
1753 */
1754 if (state) {
81666e1b 1755 fprintf(stderr, "duplicate-trans %s\n",
0c3a8cd0
MD
1756 dmsg_msg_str(msg));
1757 error = DMSG_IOQ_ERROR_TRANS;
cf715800 1758 assert(0);
78476205
MD
1759 break;
1760 }
1761 state = malloc(sizeof(*state));
1762 bzero(state, sizeof(*state));
1763 state->iocom = iocom;
0d20ec8a 1764 state->circuit = circuit;
0c3a8cd0 1765 state->flags = DMSG_STATE_DYNAMIC;
78476205 1766 state->msg = msg;
5bc5bca2
MD
1767 state->txcmd = DMSGF_REPLY;
1768 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
0d20ec8a 1769 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
0c3a8cd0 1770 state->flags |= DMSG_STATE_INSERTED;
8c280d5d 1771 state->msgid = msg->any.head.msgid;
78476205 1772 msg->state = state;
cf715800 1773 pthread_mutex_lock(&iocom->mtx);
0d20ec8a 1774 RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
cf715800 1775 pthread_mutex_unlock(&iocom->mtx);
78476205 1776 error = 0;
0c3a8cd0 1777 if (DMsgDebugOpt) {
cf715800
MD
1778 fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1779 state, (uint32_t)state->msgid, iocom);
1780 }
78476205 1781 break;
5bc5bca2 1782 case DMSGF_DELETE:
78476205
MD
1783 /*
1784 * Persistent state is expected but might not exist if an
1785 * ABORT+DELETE races the close.
1786 */
1787 if (state == NULL) {
5bc5bca2 1788 if (msg->any.head.cmd & DMSGF_ABORT) {
0c3a8cd0 1789 error = DMSG_IOQ_ERROR_EALREADY;
78476205 1790 } else {
81666e1b 1791 fprintf(stderr, "missing-state %s\n",
0c3a8cd0
MD
1792 dmsg_msg_str(msg));
1793 error = DMSG_IOQ_ERROR_TRANS;
cf715800 1794 assert(0);
78476205
MD
1795 }
1796 break;
1797 }
1798
1799 /*
1800 * Handle another ABORT+DELETE case if the msgid has already
1801 * been reused.
1802 */
5bc5bca2
MD
1803 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1804 if (msg->any.head.cmd & DMSGF_ABORT) {
0c3a8cd0 1805 error = DMSG_IOQ_ERROR_EALREADY;
78476205 1806 } else {
81666e1b 1807 fprintf(stderr, "reused-state %s\n",
0c3a8cd0
MD
1808 dmsg_msg_str(msg));
1809 error = DMSG_IOQ_ERROR_TRANS;
cf715800 1810 assert(0);
78476205
MD
1811 }
1812 break;
1813 }
1814 error = 0;
1815 break;
1816 default:
1817 /*
1818 * Check for mid-stream ABORT command received, otherwise
1819 * allow.
1820 */
5bc5bca2 1821 if (msg->any.head.cmd & DMSGF_ABORT) {
78476205 1822 if (state == NULL ||
5bc5bca2 1823 (state->rxcmd & DMSGF_CREATE) == 0) {
0c3a8cd0 1824 error = DMSG_IOQ_ERROR_EALREADY;
78476205
MD
1825 break;
1826 }
1827 }
1828 error = 0;
1829 break;
5bc5bca2
MD
1830 case DMSGF_REPLY | DMSGF_CREATE:
1831 case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
78476205
MD
1832 /*
1833 * When receiving a reply with CREATE set the original
1834 * persistent state message should already exist.
1835 */
1836 if (state == NULL) {
81666e1b 1837 fprintf(stderr, "no-state(r) %s\n",
0c3a8cd0
MD
1838 dmsg_msg_str(msg));
1839 error = DMSG_IOQ_ERROR_TRANS;
cf715800 1840 assert(0);
78476205
MD
1841 break;
1842 }
7dc0f844 1843 assert(((state->rxcmd ^ msg->any.head.cmd) &
5bc5bca2
MD
1844 DMSGF_REPLY) == 0);
1845 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
78476205
MD
1846 error = 0;
1847 break;
5bc5bca2 1848 case DMSGF_REPLY | DMSGF_DELETE:
78476205
MD
1849 /*
1850 * Received REPLY+ABORT+DELETE in case where msgid has
1851 * already been fully closed, ignore the message.
1852 */
1853 if (state == NULL) {
5bc5bca2 1854 if (msg->any.head.cmd & DMSGF_ABORT) {
0c3a8cd0 1855 error = DMSG_IOQ_ERROR_EALREADY;
78476205 1856 } else {
81666e1b 1857 fprintf(stderr, "no-state(r,d) %s\n",
0c3a8cd0
MD
1858 dmsg_msg_str(msg));
1859 error = DMSG_IOQ_ERROR_TRANS;
cf715800 1860 assert(0);
78476205
MD
1861 }
1862 break;
1863 }
1864
1865 /*
1866 * Received REPLY+ABORT+DELETE in case where msgid has
1867 * already been reused for an unrelated message,
1868 * ignore the message.
1869 */
5bc5bca2
MD
1870 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1871 if (msg->any.head.cmd & DMSGF_ABORT) {
0c3a8cd0 1872 error = DMSG_IOQ_ERROR_EALREADY;
78476205 1873 } else {
81666e1b 1874 fprintf(stderr, "reused-state(r,d) %s\n",
0c3a8cd0
MD
1875 dmsg_msg_str(msg));
1876 error = DMSG_IOQ_ERROR_TRANS;
cf715800 1877 assert(0);
78476205
MD
1878 }
1879 break;
1880 }
1881 error = 0;
1882 break;
5bc5bca2 1883 case DMSGF_REPLY:
78476205
MD
1884 /*
1885 * Check for mid-stream ABORT reply received to sent command.
1886 */
5bc5bca2 1887 if (msg->any.head.cmd & DMSGF_ABORT) {
78476205 1888 if (state == NULL ||
5bc5bca2 1889 (state->rxcmd & DMSGF_CREATE) == 0) {
0c3a8cd0 1890 error = DMSG_IOQ_ERROR_EALREADY;
78476205
MD
1891 break;
1892 }
1893 }
1894 error = 0;
1895 break;
1896 }
78476205
MD
1897 return (error);
1898}
1899
9ab15106 1900void
0c3a8cd0 1901dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
9ab15106 1902{
0c3a8cd0 1903 dmsg_state_t *state;
78476205
MD
1904
1905 if ((state = msg->state) == NULL) {
1b195a98
MD
1906 /*
1907 * Free a non-transactional message, there is no state
1908 * to worry about.
1909 */
0c3a8cd0 1910 dmsg_msg_free(msg);
5bc5bca2 1911 } else if (msg->any.head.cmd & DMSGF_DELETE) {
1b195a98
MD
1912 /*
1913 * Message terminating transaction, destroy the related
1914 * state, the original message, and this message (if it
1915 * isn't the original message due to a CREATE|DELETE).
1916 */
7dc0f844 1917 pthread_mutex_lock(&iocom->mtx);
5bc5bca2
MD
1918 state->rxcmd |= DMSGF_DELETE;
1919 if (state->txcmd & DMSGF_DELETE) {
78476205
MD
1920 if (state->msg == msg)
1921 state->msg = NULL;
0c3a8cd0 1922 assert(state->flags & DMSG_STATE_INSERTED);
5bc5bca2
MD
1923 if (state->rxcmd & DMSGF_REPLY) {
1924 assert(msg->any.head.cmd & DMSGF_REPLY);
0c3a8cd0 1925 RB_REMOVE(dmsg_state_tree,
0d20ec8a 1926 &msg->circuit->statewr_tree, state);
78476205 1927 } else {
5bc5bca2 1928 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
0c3a8cd0 1929 RB_REMOVE(dmsg_state_tree,
0d20ec8a 1930 &msg->circuit->staterd_tree, state);
78476205 1931 }
0c3a8cd0
MD
1932 state->flags &= ~DMSG_STATE_INSERTED;
1933 dmsg_state_free(state);
78476205 1934 } else {
7dc0f844 1935 ;
78476205 1936 }
7dc0f844 1937 pthread_mutex_unlock(&iocom->mtx);
0c3a8cd0 1938 dmsg_msg_free(msg);
78476205 1939 } else if (state->msg != msg) {
1b195a98
MD
1940 /*
1941 * Message not terminating transaction, leave state intact
1942 * and free message if it isn't the CREATE message.
1943 */
0c3a8cd0 1944 dmsg_msg_free(msg);
9ab15106 1945 }
78476205
MD
1946}
1947
78476205 1948static void
0c3a8cd0 1949dmsg_state_cleanuptx(dmsg_msg_t *msg)
78476205 1950{
0d20ec8a 1951 dmsg_iocom_t *iocom = msg->iocom;
0c3a8cd0 1952 dmsg_state_t *state;
78476205
MD
1953
1954 if ((state = msg->state) == NULL) {
0c3a8cd0 1955 dmsg_msg_free(msg);
5bc5bca2 1956 } else if (msg->any.head.cmd & DMSGF_DELETE) {
7dc0f844 1957 pthread_mutex_lock(&iocom->mtx);
0d20ec8a 1958 assert((state->txcmd & DMSGF_DELETE) == 0);
5bc5bca2
MD
1959 state->txcmd |= DMSGF_DELETE;
1960 if (state->rxcmd & DMSGF_DELETE) {
78476205
MD
1961 if (state->msg == msg)
1962 state->msg = NULL;
0c3a8cd0 1963 assert(state->flags & DMSG_STATE_INSERTED);
5bc5bca2
MD
1964 if (state->txcmd & DMSGF_REPLY) {
1965 assert(msg->any.head.cmd & DMSGF_REPLY);
0c3a8cd0 1966 RB_REMOVE(dmsg_state_tree,
0d20ec8a 1967 &msg->circuit->staterd_tree, state);
78476205 1968 } else {
5bc5bca2 1969 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
0c3a8cd0 1970 RB_REMOVE(dmsg_state_tree,
0d20ec8a 1971 &msg->circuit->statewr_tree, state);
78476205 1972 }
0c3a8cd0
MD
1973 state->flags &= ~DMSG_STATE_INSERTED;
1974 dmsg_state_free(state);
78476205 1975 } else {
7dc0f844 1976 ;
78476205 1977 }
7dc0f844 1978 pthread_mutex_unlock(&iocom->mtx);
0c3a8cd0 1979 dmsg_msg_free(msg);
78476205 1980 } else if (state->msg != msg) {
0c3a8cd0 1981 dmsg_msg_free(msg);
78476205
MD
1982 }
1983}
1984
7dc0f844
MD
1985/*
1986 * Called with iocom locked
1987 */
78476205 1988void
0c3a8cd0 1989dmsg_state_free(dmsg_state_t *state)
78476205 1990{
0c3a8cd0 1991 dmsg_msg_t *msg;
7dc0f844 1992
0c3a8cd0 1993 if (DMsgDebugOpt) {
cf715800
MD
1994 fprintf(stderr, "terminate state %p id=%08x\n",
1995 state, (uint32_t)state->msgid);
81666e1b 1996 }
7dc0f844 1997 assert(state->any.any == NULL);
78476205
MD
1998 msg = state->msg;
1999 state->msg = NULL;
78476205 2000 if (msg)
0c3a8cd0 2001 dmsg_msg_free_locked(msg);
78476205
MD
2002 free(state);
2003}
2004
0d20ec8a
MD
2005/*
2006 * Called with iocom locked
90e8cd1d 2007 */
90e8cd1d 2008void
0d20ec8a 2009dmsg_circuit_drop(dmsg_circuit_t *circuit)
90e8cd1d 2010{
0d20ec8a
MD
2011 dmsg_iocom_t *iocom = circuit->iocom;
2012 char dummy;
90e8cd1d 2013
0d20ec8a
MD
2014 assert(circuit->refs > 0);
2015 assert(iocom);
90e8cd1d 2016
0d20ec8a
MD
2017 /*
2018 * Decrement circuit refs, destroy circuit when refs drops to 0.
2019 */
2020 if (--circuit->refs > 0)
2021 return;
90e8cd1d 2022
0d20ec8a
MD
2023 assert(RB_EMPTY(&circuit->staterd_tree));
2024 assert(RB_EMPTY(&circuit->statewr_tree));
2025 RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2026 circuit->iocom = NULL;
2027 dmsg_free(circuit);
90e8cd1d 2028
0d20ec8a
MD
2029 /*
2030 * When an iocom error is present the rx code will terminate the
2031 * receive side for all transactions and (indirectly) all circuits
2032 * by simulating DELETE messages. The state and related circuits
2033 * don't disappear until the related states are closed in both
2034 * directions
2035 *
2036 * Detect the case where the last circuit is now gone (and thus all
2037 * states for all circuits are gone), and wakeup the rx thread to
2038 * complete the termination.
2039 */
2040 if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2041 dummy = 0;
2042 write(iocom->wakeupfds[1], &dummy, 1);
2043 }
90e8cd1d 2044}
90e8cd1d 2045
0c3a8cd0
MD
2046/*
2047 * This swaps endian for a hammer2_msg_hdr. Note that the extended
2048 * header is not adjusted, just the core header.
2049 */
2050void
2051dmsg_bswap_head(dmsg_hdr_t *head)
81666e1b 2052{
0c3a8cd0
MD
2053 head->magic = bswap16(head->magic);
2054 head->reserved02 = bswap16(head->reserved02);
2055 head->salt = bswap32(head->salt);
2056
2057 head->msgid = bswap64(head->msgid);
0d20ec8a
MD
2058 head->circuit = bswap64(head->circuit);
2059 head->reserved18= bswap64(head->reserved18);
0c3a8cd0
MD
2060
2061 head->cmd = bswap32(head->cmd);
2062 head->aux_crc = bswap32(head->aux_crc);
2063 head->aux_bytes = bswap32(head->aux_bytes);
2064 head->error = bswap32(head->error);
2065 head->aux_descr = bswap64(head->aux_descr);
2066 head->reserved38= bswap32(head->reserved38);
2067 head->hdr_crc = bswap32(head->hdr_crc);
81666e1b 2068}