cluster - Major kernel component work (diskiocom, xdisk, kdmsg)
[dragonfly.git] / lib / libdmsg / msg.c
CommitLineData
9ab15106
MD
1/*
2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
17 * distribution.
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 * SUCH DAMAGE.
34 */
35
0c3a8cd0 36#include "dmsg_local.h"
9ab15106 37
0c3a8cd0
MD
38int DMsgDebugOpt;
39
40static int dmsg_state_msgrx(dmsg_msg_t *msg);
41static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
78476205 42
0d20ec8a
MD
43RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
44RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
45
9ab15106 46/*
0d20ec8a
MD
47 * STATE TREE - Represents open transactions which are indexed by their
48 * { msgid } relative to the governing iocom.
90e8cd1d
MD
49 */
50int
0d20ec8a 51dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
90e8cd1d 52{
0d20ec8a 53 if (state1->msgid < state2->msgid)
90e8cd1d 54 return(-1);
0d20ec8a 55 if (state1->msgid > state2->msgid)
90e8cd1d
MD
56 return(1);
57 return(0);
58}
59
90e8cd1d 60/*
0d20ec8a
MD
61 * CIRCUIT TREE - Represents open circuits which are indexed by their
62 * { msgid } relative to the governing iocom.
cf715800
MD
63 */
64int
0d20ec8a 65dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
cf715800 66{
0d20ec8a 67 if (circuit1->msgid < circuit2->msgid)
cf715800 68 return(-1);
0d20ec8a 69 if (circuit1->msgid > circuit2->msgid)
cf715800
MD
70 return(1);
71 return(0);
72}
73
cf715800 74/*
9ab15106
MD
75 * Initialize a low-level ioq
76 */
77void
0c3a8cd0 78dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
9ab15106
MD
79{
80 bzero(ioq, sizeof(*ioq));
0c3a8cd0 81 ioq->state = DMSG_MSGQ_STATE_HEADER1;
9ab15106
MD
82 TAILQ_INIT(&ioq->msgq);
83}
84
7dc0f844
MD
85/*
86 * Cleanup queue.
87 *
88 * caller holds iocom->mtx.
89 */
9ab15106 90void
0c3a8cd0 91dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
9ab15106 92{
0c3a8cd0 93 dmsg_msg_t *msg;
9ab15106
MD
94
95 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
7dc0f844 96 assert(0); /* shouldn't happen */
78476205 97 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
0c3a8cd0 98 dmsg_msg_free(msg);
9ab15106
MD
99 }
100 if ((msg = ioq->msg) != NULL) {
101 ioq->msg = NULL;
0c3a8cd0 102 dmsg_msg_free(msg);
9ab15106
MD
103 }
104}
105
106/*
5903497c
MD
107 * Initialize a low-level communications channel.
108 *
29ead430 109 * NOTE: The signal_func() is called at least once from the loop and can be
0c3a8cd0 110 * re-armed via dmsg_iocom_restate().
9ab15106
MD
111 */
112void
0c3a8cd0 113dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
0d20ec8a 114 void (*signal_func)(dmsg_iocom_t *),
0c3a8cd0 115 void (*rcvmsg_func)(dmsg_msg_t *),
11f7caf4 116 void (*dbgmsg_func)(dmsg_msg_t *),
0c3a8cd0 117 void (*altmsg_func)(dmsg_iocom_t *))
9ab15106 118{
e1648a68
MD
119 struct stat st;
120
9ab15106
MD
121 bzero(iocom, sizeof(*iocom));
122
f306de83 123 asprintf(&iocom->label, "iocom-%p", iocom);
0d20ec8a
MD
124 iocom->signal_callback = signal_func;
125 iocom->rcvmsg_callback = rcvmsg_func;
126 iocom->altmsg_callback = altmsg_func;
127 iocom->dbgmsg_callback = dbgmsg_func;
5903497c 128
7dc0f844 129 pthread_mutex_init(&iocom->mtx, NULL);
0d20ec8a 130 RB_INIT(&iocom->circuit_tree);
9ab15106
MD
131 TAILQ_INIT(&iocom->freeq);
132 TAILQ_INIT(&iocom->freeq_aux);
0d20ec8a 133 TAILQ_INIT(&iocom->txmsgq);
9ab15106
MD
134 iocom->sock_fd = sock_fd;
135 iocom->alt_fd = alt_fd;
0c3a8cd0 136 iocom->flags = DMSG_IOCOMF_RREQ;
29ead430 137 if (signal_func)
0c3a8cd0
MD
138 iocom->flags |= DMSG_IOCOMF_SWORK;
139 dmsg_ioq_init(iocom, &iocom->ioq_rx);
140 dmsg_ioq_init(iocom, &iocom->ioq_tx);
7dc0f844
MD
141 if (pipe(iocom->wakeupfds) < 0)
142 assert(0);
143 fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
144 fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
9ab15106 145
0d20ec8a
MD
146 dmsg_circuit_init(iocom, &iocom->circuit0);
147
62efe6ec
MD
148 /*
149 * Negotiate session crypto synchronously. This will mark the
e1648a68
MD
150 * connection as error'd if it fails. If this is a pipe it's
151 * a linkage that we set up ourselves to the filesystem and there
152 * is no crypto.
62efe6ec 153 */
e1648a68
MD
154 if (fstat(sock_fd, &st) < 0)
155 assert(0);
156 if (S_ISSOCK(st.st_mode))
0c3a8cd0 157 dmsg_crypto_negotiate(iocom);
62efe6ec
MD
158
159 /*
160 * Make sure our fds are set to non-blocking for the iocom core.
161 */
9ab15106
MD
162 if (sock_fd >= 0)
163 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
164#if 0
165 /* if line buffered our single fgets() should be fine */
166 if (alt_fd >= 0)
167 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
168#endif
169}
170
f306de83
MD
171void
172dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
173{
174 va_list va;
175 char *optr;
176
177 va_start(va, ctl);
178 optr = iocom->label;
179 vasprintf(&iocom->label, ctl, va);
180 va_end(va);
181 if (optr)
182 free(optr);
183}
184
7dc0f844 185/*
5903497c
MD
186 * May only be called from a callback from iocom_core.
187 *
188 * Adjust state machine functions, set flags to guarantee that both
189 * the recevmsg_func and the sendmsg_func is called at least once.
190 */
191void
0d20ec8a
MD
192dmsg_iocom_restate(dmsg_iocom_t *iocom,
193 void (*signal_func)(dmsg_iocom_t *),
0c3a8cd0
MD
194 void (*rcvmsg_func)(dmsg_msg_t *msg),
195 void (*altmsg_func)(dmsg_iocom_t *))
5903497c 196{
0d20ec8a
MD
197 iocom->signal_callback = signal_func;
198 iocom->rcvmsg_callback = rcvmsg_func;
199 iocom->altmsg_callback = altmsg_func;
29ead430 200 if (signal_func)
0d20ec8a 201 iocom->flags |= DMSG_IOCOMF_SWORK;
5903497c 202 else
0d20ec8a 203 iocom->flags &= ~DMSG_IOCOMF_SWORK;
29ead430
MD
204}
205
206void
0d20ec8a 207dmsg_iocom_signal(dmsg_iocom_t *iocom)
29ead430 208{
0d20ec8a
MD
209 if (iocom->signal_callback)
210 iocom->flags |= DMSG_IOCOMF_SWORK;
5903497c
MD
211}
212
213/*
7dc0f844
MD
214 * Cleanup a terminating iocom.
215 *
216 * Caller should not hold iocom->mtx. The iocom has already been disconnected
217 * from all possible references to it.
218 */
9ab15106 219void
0c3a8cd0 220dmsg_iocom_done(dmsg_iocom_t *iocom)
9ab15106 221{
0c3a8cd0 222 dmsg_msg_t *msg;
9ab15106 223
7dc0f844
MD
224 if (iocom->sock_fd >= 0) {
225 close(iocom->sock_fd);
226 iocom->sock_fd = -1;
227 }
228 if (iocom->alt_fd >= 0) {
229 close(iocom->alt_fd);
230 iocom->alt_fd = -1;
231 }
0c3a8cd0
MD
232 dmsg_ioq_done(iocom, &iocom->ioq_rx);
233 dmsg_ioq_done(iocom, &iocom->ioq_tx);
9ab15106 234 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
78476205 235 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
9ab15106
MD
236 free(msg);
237 }
238 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
78476205 239 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
9ab15106
MD
240 free(msg->aux_data);
241 msg->aux_data = NULL;
242 free(msg);
243 }
7dc0f844
MD
244 if (iocom->wakeupfds[0] >= 0) {
245 close(iocom->wakeupfds[0]);
246 iocom->wakeupfds[0] = -1;
247 }
248 if (iocom->wakeupfds[1] >= 0) {
249 close(iocom->wakeupfds[1]);
250 iocom->wakeupfds[1] = -1;
251 }
252 pthread_mutex_destroy(&iocom->mtx);
9ab15106
MD
253}
254
4a2e0eae 255/*
0d20ec8a
MD
256 * Initialize a circuit structure and add it to the iocom's circuit_tree.
257 * circuit0 is left out and will not be added to the tree.
258 */
259void
260dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
261{
262 circuit->iocom = iocom;
263 RB_INIT(&circuit->staterd_tree);
264 RB_INIT(&circuit->statewr_tree);
265 if (circuit->msgid)
266 RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
267}
268
269/*
4a2e0eae
MD
270 * Allocate a new one-way message.
271 */
0c3a8cd0 272dmsg_msg_t *
0d20ec8a
MD
273dmsg_msg_alloc(dmsg_circuit_t *circuit,
274 size_t aux_size, uint32_t cmd,
275 void (*func)(dmsg_msg_t *), void *data)
9ab15106 276{
0d20ec8a 277 dmsg_iocom_t *iocom = circuit->iocom;
0c3a8cd0 278 dmsg_state_t *state = NULL;
0c3a8cd0 279 dmsg_msg_t *msg;
9ab15106 280 int hbytes;
f306de83 281 size_t aligned_size;
9ab15106 282
7dc0f844 283 pthread_mutex_lock(&iocom->mtx);
9ab15106 284 if (aux_size) {
f306de83 285 aligned_size = DMSG_DOALIGN(aux_size);
9ab15106 286 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
78476205 287 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
9ab15106 288 } else {
f306de83 289 aligned_size = 0;
9ab15106 290 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
78476205 291 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
9ab15106 292 }
5bc5bca2 293 if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
29ead430
MD
294 /*
295 * Create state when CREATE is set without REPLY.
0d20ec8a
MD
296 * Assign a unique msgid, in this case simply using
297 * the pointer value for 'state'.
29ead430 298 *
0c3a8cd0
MD
299 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
300 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
0d20ec8a
MD
301 *
302 * NOTE: state initiated by us and state initiated by
303 * a remote create are placed in different RB trees.
304 * The msgid for SPAN state is used in source/target
305 * for message routing as appropriate.
29ead430
MD
306 */
307 state = malloc(sizeof(*state));
308 bzero(state, sizeof(*state));
309 state->iocom = iocom;
0d20ec8a 310 state->circuit = circuit;
0c3a8cd0 311 state->flags = DMSG_STATE_DYNAMIC;
29ead430 312 state->msgid = (uint64_t)(uintptr_t)state;
5bc5bca2
MD
313 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
314 state->rxcmd = DMSGF_REPLY;
0d20ec8a 315 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
29ead430
MD
316 state->func = func;
317 state->any.any = data;
318 pthread_mutex_lock(&iocom->mtx);
0d20ec8a 319 RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
29ead430 320 pthread_mutex_unlock(&iocom->mtx);
0c3a8cd0 321 state->flags |= DMSG_STATE_INSERTED;
29ead430 322 }
7dc0f844 323 pthread_mutex_unlock(&iocom->mtx);
9ab15106
MD
324 if (msg == NULL) {
325 msg = malloc(sizeof(*msg));
78476205 326 bzero(msg, sizeof(*msg));
9ab15106
MD
327 msg->aux_data = NULL;
328 msg->aux_size = 0;
329 }
f306de83
MD
330
331 /*
332 * [re]allocate the auxillary data buffer. The caller knows that
333 * a size-aligned buffer will be allocated but we do not want to
334 * force the caller to zero any tail piece, so we do that ourself.
335 */
9ab15106
MD
336 if (msg->aux_size != aux_size) {
337 if (msg->aux_data) {
338 free(msg->aux_data);
339 msg->aux_data = NULL;
340 msg->aux_size = 0;
341 }
342 if (aux_size) {
f306de83 343 msg->aux_data = malloc(aligned_size);
9ab15106 344 msg->aux_size = aux_size;
f306de83
MD
345 if (aux_size != aligned_size) {
346 bzero(msg->aux_data + aux_size,
347 aligned_size - aux_size);
348 }
9ab15106
MD
349 }
350 }
5bc5bca2 351 hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
4a2e0eae
MD
352 if (hbytes)
353 bzero(&msg->any.head, hbytes);
78476205 354 msg->hdr_size = hbytes;
0d20ec8a 355 msg->any.head.magic = DMSG_HDR_MAGIC;
9ab15106 356 msg->any.head.cmd = cmd;
8c280d5d
MD
357 msg->any.head.aux_descr = 0;
358 msg->any.head.aux_crc = 0;
0d20ec8a
MD
359 msg->any.head.circuit = 0;
360 msg->circuit = circuit;
361 msg->iocom = iocom;
29ead430
MD
362 if (state) {
363 msg->state = state;
364 state->msg = msg;
365 msg->any.head.msgid = state->msgid;
0d20ec8a
MD
366 } else {
367 msg->any.head.msgid = 0;
29ead430 368 }
9ab15106
MD
369 return (msg);
370}
371
4a2e0eae 372/*
4a2e0eae
MD
373 * Free a message so it can be reused afresh.
374 *
375 * NOTE: aux_size can be 0 with a non-NULL aux_data.
376 */
7dc0f844 377static
9ab15106 378void
0c3a8cd0 379dmsg_msg_free_locked(dmsg_msg_t *msg)
9ab15106 380{
0d20ec8a 381 dmsg_iocom_t *iocom = msg->iocom;
29ead430 382
7dc0f844 383 msg->state = NULL;
9ab15106 384 if (msg->aux_data)
78476205 385 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
9ab15106 386 else
78476205 387 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
9ab15106
MD
388}
389
7dc0f844 390void
0c3a8cd0 391dmsg_msg_free(dmsg_msg_t *msg)
7dc0f844 392{
0d20ec8a 393 dmsg_iocom_t *iocom = msg->iocom;
29ead430 394
7dc0f844 395 pthread_mutex_lock(&iocom->mtx);
0c3a8cd0 396 dmsg_msg_free_locked(msg);
7dc0f844
MD
397 pthread_mutex_unlock(&iocom->mtx);
398}
399
9ab15106
MD
400/*
401 * I/O core loop for an iocom.
7dc0f844
MD
402 *
403 * Thread localized, iocom->mtx not held.
9ab15106
MD
404 */
405void
0c3a8cd0 406dmsg_iocom_core(dmsg_iocom_t *iocom)
9ab15106 407{
7dc0f844
MD
408 struct pollfd fds[3];
409 char dummybuf[256];
0c3a8cd0 410 dmsg_msg_t *msg;
4a2e0eae 411 int timeout;
7dc0f844
MD
412 int count;
413 int wi; /* wakeup pipe */
414 int si; /* socket */
415 int ai; /* alt bulk path socket */
9ab15106 416
0c3a8cd0
MD
417 while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
418 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
419 DMSG_IOCOMF_WWORK |
420 DMSG_IOCOMF_PWORK |
421 DMSG_IOCOMF_SWORK |
422 DMSG_IOCOMF_ARWORK |
423 DMSG_IOCOMF_AWWORK)) == 0) {
7dc0f844
MD
424 /*
425 * Only poll if no immediate work is pending.
426 * Otherwise we are just wasting our time calling
427 * poll.
428 */
429 timeout = 5000;
4a2e0eae 430
7dc0f844
MD
431 count = 0;
432 wi = -1;
433 si = -1;
434 ai = -1;
9ab15106 435
7dc0f844
MD
436 /*
437 * Always check the inter-thread pipe, e.g.
438 * for iocom->txmsgq work.
439 */
440 wi = count++;
441 fds[wi].fd = iocom->wakeupfds[0];
442 fds[wi].events = POLLIN;
443 fds[wi].revents = 0;
444
445 /*
446 * Check the socket input/output direction as
447 * requested
448 */
0c3a8cd0
MD
449 if (iocom->flags & (DMSG_IOCOMF_RREQ |
450 DMSG_IOCOMF_WREQ)) {
7dc0f844
MD
451 si = count++;
452 fds[si].fd = iocom->sock_fd;
453 fds[si].events = 0;
454 fds[si].revents = 0;
455
0c3a8cd0 456 if (iocom->flags & DMSG_IOCOMF_RREQ)
7dc0f844 457 fds[si].events |= POLLIN;
0c3a8cd0 458 if (iocom->flags & DMSG_IOCOMF_WREQ)
7dc0f844
MD
459 fds[si].events |= POLLOUT;
460 }
9ab15106 461
7dc0f844
MD
462 /*
463 * Check the alternative fd for work.
464 */
465 if (iocom->alt_fd >= 0) {
466 ai = count++;
467 fds[ai].fd = iocom->alt_fd;
468 fds[ai].events = POLLIN;
469 fds[ai].revents = 0;
470 }
471 poll(fds, count, timeout);
472
473 if (wi >= 0 && (fds[wi].revents & POLLIN))
0c3a8cd0 474 iocom->flags |= DMSG_IOCOMF_PWORK;
7dc0f844 475 if (si >= 0 && (fds[si].revents & POLLIN))
0c3a8cd0 476 iocom->flags |= DMSG_IOCOMF_RWORK;
7dc0f844 477 if (si >= 0 && (fds[si].revents & POLLOUT))
0c3a8cd0 478 iocom->flags |= DMSG_IOCOMF_WWORK;
7dc0f844 479 if (wi >= 0 && (fds[wi].revents & POLLOUT))
0c3a8cd0 480 iocom->flags |= DMSG_IOCOMF_WWORK;
7dc0f844 481 if (ai >= 0 && (fds[ai].revents & POLLIN))
0c3a8cd0 482 iocom->flags |= DMSG_IOCOMF_ARWORK;
9ab15106 483 } else {
7dc0f844
MD
484 /*
485 * Always check the pipe
486 */
0c3a8cd0 487 iocom->flags |= DMSG_IOCOMF_PWORK;
9ab15106 488 }
7dc0f844 489
0c3a8cd0
MD
490 if (iocom->flags & DMSG_IOCOMF_SWORK) {
491 iocom->flags &= ~DMSG_IOCOMF_SWORK;
0d20ec8a 492 iocom->signal_callback(iocom);
5903497c
MD
493 }
494
7dc0f844
MD
495 /*
496 * Pending message queues from other threads wake us up
497 * with a write to the wakeupfds[] pipe. We have to clear
498 * the pipe with a dummy read.
499 */
0c3a8cd0
MD
500 if (iocom->flags & DMSG_IOCOMF_PWORK) {
501 iocom->flags &= ~DMSG_IOCOMF_PWORK;
7dc0f844 502 read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
0c3a8cd0
MD
503 iocom->flags |= DMSG_IOCOMF_RWORK;
504 iocom->flags |= DMSG_IOCOMF_WWORK;
0d20ec8a 505 if (TAILQ_FIRST(&iocom->txmsgq))
0c3a8cd0 506 dmsg_iocom_flush1(iocom);
9ab15106 507 }
7dc0f844
MD
508
509 /*
510 * Message write sequencing
511 */
0c3a8cd0
MD
512 if (iocom->flags & DMSG_IOCOMF_WWORK)
513 dmsg_iocom_flush1(iocom);
7dc0f844
MD
514
515 /*
516 * Message read sequencing. Run this after the write
517 * sequencing in case the write sequencing allowed another
518 * auto-DELETE to occur on the read side.
519 */
0c3a8cd0
MD
520 if (iocom->flags & DMSG_IOCOMF_RWORK) {
521 while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
522 (msg = dmsg_ioq_read(iocom)) != NULL) {
523 if (DMsgDebugOpt) {
81666e1b 524 fprintf(stderr, "receive %s\n",
0c3a8cd0 525 dmsg_msg_str(msg));
81666e1b 526 }
0d20ec8a 527 iocom->rcvmsg_callback(msg);
0c3a8cd0 528 dmsg_state_cleanuprx(iocom, msg);
5903497c
MD
529 }
530 }
7dc0f844 531
0c3a8cd0
MD
532 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
533 iocom->flags &= ~DMSG_IOCOMF_ARWORK;
0d20ec8a 534 iocom->altmsg_callback(iocom);
02454b3e 535 }
9ab15106
MD
536 }
537}
538
539/*
3033ecc8
MD
540 * Make sure there's enough room in the FIFO to hold the
541 * needed data.
542 *
543 * Assume worst case encrypted form is 2x the size of the
544 * plaintext equivalent.
545 */
546static
547size_t
0c3a8cd0 548dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
3033ecc8
MD
549{
550 size_t bytes;
551 size_t nmax;
552
553 bytes = ioq->fifo_cdx - ioq->fifo_beg;
554 nmax = sizeof(ioq->buf) - ioq->fifo_end;
555 if (bytes + nmax / 2 < needed) {
556 if (bytes) {
557 bcopy(ioq->buf + ioq->fifo_beg,
558 ioq->buf,
559 bytes);
560 }
561 ioq->fifo_cdx -= ioq->fifo_beg;
562 ioq->fifo_beg = 0;
563 if (ioq->fifo_cdn < ioq->fifo_end) {
564 bcopy(ioq->buf + ioq->fifo_cdn,
565 ioq->buf + ioq->fifo_cdx,
566 ioq->fifo_end - ioq->fifo_cdn);
567 }
568 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
569 ioq->fifo_cdn = ioq->fifo_cdx;
570 nmax = sizeof(ioq->buf) - ioq->fifo_end;
571 }
572 return(nmax);
573}
574
575/*
9ab15106
MD
576 * Read the next ready message from the ioq, issuing I/O if needed.
577 * Caller should retry on a read-event when NULL is returned.
578 *
5bc5bca2 579 * If an error occurs during reception a DMSG_LNK_ERROR msg will
1b195a98 580 * be returned for each open transaction, then the ioq and iocom
5bc5bca2 581 * will be errored out and a non-transactional DMSG_LNK_ERROR
1b195a98
MD
582 * msg will be returned as the final message. The caller should not call
583 * us again after the final message is returned.
7dc0f844
MD
584 *
585 * Thread localized, iocom->mtx not held.
9ab15106 586 */
0c3a8cd0
MD
587dmsg_msg_t *
588dmsg_ioq_read(dmsg_iocom_t *iocom)
9ab15106 589{
0c3a8cd0
MD
590 dmsg_ioq_t *ioq = &iocom->ioq_rx;
591 dmsg_msg_t *msg;
592 dmsg_state_t *state;
0d20ec8a 593 dmsg_circuit_t *circuit0;
5bc5bca2 594 dmsg_hdr_t *head;
9ab15106 595 ssize_t n;
78476205
MD
596 size_t bytes;
597 size_t nmax;
f306de83 598 uint32_t aux_size;
9ab15106 599 uint32_t xcrc32;
78476205 600 int error;
9ab15106 601
78476205 602again:
0c3a8cd0 603 iocom->flags &= ~(DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
7dc0f844 604
9ab15106
MD
605 /*
606 * If a message is already pending we can just remove and
78476205 607 * return it. Message state has already been processed.
90e8cd1d 608 * (currently not implemented)
9ab15106
MD
609 */
610 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
78476205
MD
611 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
612 return (msg);
9ab15106
MD
613 }
614
90e8cd1d
MD
615 /*
616 * If the stream is errored out we stop processing it.
617 */
cf715800
MD
618 if (ioq->error)
619 goto skip;
620
9ab15106
MD
621 /*
622 * Message read in-progress (msg is NULL at the moment). We don't
623 * allocate a msg until we have its core header.
624 */
5cf97ec5 625 nmax = sizeof(ioq->buf) - ioq->fifo_end;
3033ecc8 626 bytes = ioq->fifo_cdx - ioq->fifo_beg; /* already decrypted */
9ab15106
MD
627 msg = ioq->msg;
628
629 switch(ioq->state) {
0c3a8cd0 630 case DMSG_MSGQ_STATE_HEADER1:
9ab15106
MD
631 /*
632 * Load the primary header, fail on any non-trivial read
633 * error or on EOF. Since the primary header is the same
634 * size is the message alignment it will never straddle
635 * the end of the buffer.
636 */
0c3a8cd0 637 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
3033ecc8 638 if (bytes < sizeof(msg->any.head)) {
9ab15106 639 n = read(iocom->sock_fd,
5cf97ec5 640 ioq->buf + ioq->fifo_end,
9ab15106
MD
641 nmax);
642 if (n <= 0) {
643 if (n == 0) {
0c3a8cd0 644 ioq->error = DMSG_IOQ_ERROR_EOF;
9ab15106
MD
645 break;
646 }
647 if (errno != EINTR &&
648 errno != EINPROGRESS &&
649 errno != EAGAIN) {
0c3a8cd0 650 ioq->error = DMSG_IOQ_ERROR_SOCK;
9ab15106
MD
651 break;
652 }
653 n = 0;
654 /* fall through */
655 }
3033ecc8
MD
656 ioq->fifo_end += (size_t)n;
657 nmax -= (size_t)n;
9ab15106
MD
658 }
659
660 /*
3033ecc8
MD
661 * Decrypt data received so far. Data will be decrypted
662 * in-place but might create gaps in the FIFO. Partial
663 * blocks are not immediately decrypted.
8c280d5d
MD
664 *
665 * WARNING! The header might be in the wrong endian, we
666 * do not fix it up until we get the entire
667 * extended header.
9ab15106 668 */
0c3a8cd0
MD
669 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
670 dmsg_crypto_decrypt(iocom, ioq);
3033ecc8
MD
671 } else {
672 ioq->fifo_cdx = ioq->fifo_end;
673 ioq->fifo_cdn = ioq->fifo_end;
674 }
675 bytes = ioq->fifo_cdx - ioq->fifo_beg;
676
677 /*
678 * Insufficient data accumulated (msg is NULL, caller will
679 * retry on event).
680 */
681 assert(msg == NULL);
682 if (bytes < sizeof(msg->any.head))
683 break;
9ab15106
MD
684
685 /*
686 * Check and fixup the core header. Note that the icrc
687 * has to be calculated before any fixups, but the crc
688 * fields in the msg may have to be swapped like everything
689 * else.
690 */
3033ecc8 691 head = (void *)(ioq->buf + ioq->fifo_beg);
5bc5bca2
MD
692 if (head->magic != DMSG_HDR_MAGIC &&
693 head->magic != DMSG_HDR_MAGIC_REV) {
f306de83
MD
694 fprintf(stderr, "%s: head->magic is bad %02x\n",
695 iocom->label, head->magic);
696 if (iocom->flags & DMSG_IOCOMF_CRYPTED)
697 fprintf(stderr, "(on encrypted link)\n");
0c3a8cd0 698 ioq->error = DMSG_IOQ_ERROR_SYNC;
9ab15106
MD
699 break;
700 }
701
9ab15106
MD
702 /*
703 * Calculate the full header size and aux data size
704 */
5bc5bca2
MD
705 if (head->magic == DMSG_HDR_MAGIC_REV) {
706 ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
707 DMSG_ALIGN;
f306de83 708 aux_size = bswap32(head->aux_bytes);
8c280d5d 709 } else {
5bc5bca2
MD
710 ioq->hbytes = (head->cmd & DMSGF_SIZE) *
711 DMSG_ALIGN;
f306de83 712 aux_size = head->aux_bytes;
8c280d5d 713 }
f306de83
MD
714 ioq->abytes = DMSG_DOALIGN(aux_size);
715 ioq->unaligned_aux_size = aux_size;
78476205
MD
716 if (ioq->hbytes < sizeof(msg->any.head) ||
717 ioq->hbytes > sizeof(msg->any) ||
5bc5bca2 718 ioq->abytes > DMSG_AUX_MAX) {
0c3a8cd0 719 ioq->error = DMSG_IOQ_ERROR_FIELD;
9ab15106
MD
720 break;
721 }
722
723 /*
02454b3e 724 * Allocate the message, the next state will fill it in.
f306de83
MD
725 * Note that the actual buffer will be sized to an aligned
726 * value and the aligned remainder zero'd for convenience.
9ab15106 727 */
f306de83 728 msg = dmsg_msg_alloc(&iocom->circuit0, aux_size, 0,
0d20ec8a 729 NULL, NULL);
9ab15106
MD
730 ioq->msg = msg;
731
732 /*
9ab15106
MD
733 * Fall through to the next state. Make sure that the
734 * extended header does not straddle the end of the buffer.
735 * We still want to issue larger reads into our buffer,
736 * book-keeping is easier if we don't bcopy() yet.
3033ecc8
MD
737 *
738 * Make sure there is enough room for bloated encrypt data.
9ab15106 739 */
0c3a8cd0
MD
740 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
741 ioq->state = DMSG_MSGQ_STATE_HEADER2;
9ab15106 742 /* fall through */
0c3a8cd0 743 case DMSG_MSGQ_STATE_HEADER2:
9ab15106
MD
744 /*
745 * Fill out the extended header.
746 */
747 assert(msg != NULL);
748 if (bytes < ioq->hbytes) {
749 n = read(iocom->sock_fd,
02454b3e 750 ioq->buf + ioq->fifo_end,
9ab15106
MD
751 nmax);
752 if (n <= 0) {
753 if (n == 0) {
0c3a8cd0 754 ioq->error = DMSG_IOQ_ERROR_EOF;
9ab15106
MD
755 break;
756 }
757 if (errno != EINTR &&
758 errno != EINPROGRESS &&
759 errno != EAGAIN) {
0c3a8cd0 760 ioq->error = DMSG_IOQ_ERROR_SOCK;
9ab15106
MD
761 break;
762 }
763 n = 0;
764 /* fall through */
765 }
3033ecc8
MD
766 ioq->fifo_end += (size_t)n;
767 nmax -= (size_t)n;
9ab15106
MD
768 }
769
0c3a8cd0
MD
770 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
771 dmsg_crypto_decrypt(iocom, ioq);
3033ecc8
MD
772 } else {
773 ioq->fifo_cdx = ioq->fifo_end;
774 ioq->fifo_cdn = ioq->fifo_end;
775 }
776 bytes = ioq->fifo_cdx - ioq->fifo_beg;
777
9ab15106
MD
778 /*
779 * Insufficient data accumulated (set msg NULL so caller will
780 * retry on event).
781 */
782 if (bytes < ioq->hbytes) {
783 msg = NULL;
784 break;
785 }
786
787 /*
5cf97ec5 788 * Calculate the extended header, decrypt data received
8c280d5d
MD
789 * so far. Handle endian-conversion for the entire extended
790 * header.
9ab15106 791 */
5cf97ec5 792 head = (void *)(ioq->buf + ioq->fifo_beg);
9ab15106
MD
793
794 /*
8c280d5d 795 * Check the CRC.
9ab15106 796 */
5bc5bca2 797 if (head->magic == DMSG_HDR_MAGIC_REV)
8c280d5d
MD
798 xcrc32 = bswap32(head->hdr_crc);
799 else
800 xcrc32 = head->hdr_crc;
801 head->hdr_crc = 0;
0c3a8cd0
MD
802 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
803 ioq->error = DMSG_IOQ_ERROR_XCRC;
81666e1b 804 fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
0c3a8cd0
MD
805 xcrc32, dmsg_icrc32(head, ioq->hbytes),
806 dmsg_msg_str(msg));
02454b3e 807 assert(0);
8c280d5d
MD
808 break;
809 }
810 head->hdr_crc = xcrc32;
811
5bc5bca2 812 if (head->magic == DMSG_HDR_MAGIC_REV) {
0c3a8cd0 813 dmsg_bswap_head(head);
9ab15106
MD
814 }
815
816 /*
817 * Copy the extended header into the msg and adjust the
818 * FIFO.
819 */
820 bcopy(head, &msg->any, ioq->hbytes);
821
822 /*
823 * We are either done or we fall-through.
824 */
825 if (ioq->abytes == 0) {
826 ioq->fifo_beg += ioq->hbytes;
827 break;
828 }
829
830 /*
3033ecc8
MD
831 * Must adjust bytes (and the state) when falling through.
832 * nmax doesn't change.
9ab15106
MD
833 */
834 ioq->fifo_beg += ioq->hbytes;
9ab15106 835 bytes -= ioq->hbytes;
0c3a8cd0 836 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
9ab15106 837 /* fall through */
0c3a8cd0 838 case DMSG_MSGQ_STATE_AUXDATA1:
9ab15106
MD
839 /*
840 * Copy the partial or complete payload from remaining
3033ecc8
MD
841 * bytes in the FIFO in order to optimize the makeroom call
842 * in the AUXDATA2 state. We have to fall-through either
9ab15106 843 * way so we can check the crc.
78476205 844 *
3033ecc8 845 * msg->aux_size tracks our aux data.
9ab15106 846 */
9ab15106 847 if (bytes >= ioq->abytes) {
5cf97ec5 848 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9ab15106
MD
849 ioq->abytes);
850 msg->aux_size = ioq->abytes;
851 ioq->fifo_beg += ioq->abytes;
3033ecc8
MD
852 assert(ioq->fifo_beg <= ioq->fifo_cdx);
853 assert(ioq->fifo_cdx <= ioq->fifo_cdn);
9ab15106
MD
854 bytes -= ioq->abytes;
855 } else if (bytes) {
5cf97ec5 856 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9ab15106
MD
857 bytes);
858 msg->aux_size = bytes;
859 ioq->fifo_beg += bytes;
5cf97ec5
MD
860 if (ioq->fifo_cdx < ioq->fifo_beg)
861 ioq->fifo_cdx = ioq->fifo_beg;
3033ecc8
MD
862 assert(ioq->fifo_beg <= ioq->fifo_cdx);
863 assert(ioq->fifo_cdx <= ioq->fifo_cdn);
9ab15106 864 bytes = 0;
78476205
MD
865 } else {
866 msg->aux_size = 0;
9ab15106 867 }
0c3a8cd0 868 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
9ab15106 869 /* fall through */
0c3a8cd0 870 case DMSG_MSGQ_STATE_AUXDATA2:
9ab15106 871 /*
3033ecc8 872 * Make sure there is enough room for more data.
9ab15106
MD
873 */
874 assert(msg);
0c3a8cd0 875 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
3033ecc8
MD
876
877 /*
878 * Read and decrypt more of the payload.
879 */
9ab15106
MD
880 if (msg->aux_size < ioq->abytes) {
881 assert(bytes == 0);
882 n = read(iocom->sock_fd,
3033ecc8
MD
883 ioq->buf + ioq->fifo_end,
884 nmax);
9ab15106
MD
885 if (n <= 0) {
886 if (n == 0) {
0c3a8cd0 887 ioq->error = DMSG_IOQ_ERROR_EOF;
9ab15106
MD
888 break;
889 }
890 if (errno != EINTR &&
891 errno != EINPROGRESS &&
892 errno != EAGAIN) {
0c3a8cd0 893 ioq->error = DMSG_IOQ_ERROR_SOCK;
9ab15106
MD
894 break;
895 }
896 n = 0;
897 /* fall through */
898 }
3033ecc8
MD
899 ioq->fifo_end += (size_t)n;
900 nmax -= (size_t)n;
901 }
902
0c3a8cd0
MD
903 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
904 dmsg_crypto_decrypt(iocom, ioq);
3033ecc8
MD
905 } else {
906 ioq->fifo_cdx = ioq->fifo_end;
907 ioq->fifo_cdn = ioq->fifo_end;
908 }
909 bytes = ioq->fifo_cdx - ioq->fifo_beg;
910
911 if (bytes > ioq->abytes - msg->aux_size)
912 bytes = ioq->abytes - msg->aux_size;
913
914 if (bytes) {
915 bcopy(ioq->buf + ioq->fifo_beg,
916 msg->aux_data + msg->aux_size,
917 bytes);
918 msg->aux_size += bytes;
919 ioq->fifo_beg += bytes;
9ab15106
MD
920 }
921
922 /*
923 * Insufficient data accumulated (set msg NULL so caller will
924 * retry on event).
f306de83
MD
925 *
926 * Assert the auxillary data size is correct, then record the
927 * original unaligned size from the message header.
9ab15106
MD
928 */
929 if (msg->aux_size < ioq->abytes) {
930 msg = NULL;
931 break;
932 }
933 assert(msg->aux_size == ioq->abytes);
f306de83 934 msg->aux_size = ioq->unaligned_aux_size;
9ab15106
MD
935
936 /*
f306de83
MD
937 * Check aux_crc, then we are done. Note that the crc
938 * is calculated over the aligned size, not the actual
939 * size.
9ab15106 940 */
f306de83 941 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
8c280d5d 942 if (xcrc32 != msg->any.head.aux_crc) {
0c3a8cd0 943 ioq->error = DMSG_IOQ_ERROR_ACRC;
9ab15106
MD
944 break;
945 }
946 break;
0c3a8cd0 947 case DMSG_MSGQ_STATE_ERROR:
1b195a98
MD
948 /*
949 * Continued calls to drain recorded transactions (returning
950 * a LNK_ERROR for each one), before we return the final
951 * LNK_ERROR.
952 */
953 assert(msg == NULL);
954 break;
9ab15106
MD
955 default:
956 /*
957 * We don't double-return errors, the caller should not
958 * have called us again after getting an error msg.
959 */
960 assert(0);
961 break;
962 }
963
964 /*
5cf97ec5
MD
965 * Check the message sequence. The iv[] should prevent any
966 * possibility of a replay but we add this check anyway.
967 */
968 if (msg && ioq->error == 0) {
969 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
0c3a8cd0 970 ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
5cf97ec5
MD
971 } else {
972 ++ioq->seq;
973 }
974 }
975
976 /*
9ab15106
MD
977 * Handle error, RREQ, or completion
978 *
979 * NOTE: nmax and bytes are invalid at this point, we don't bother
980 * to update them when breaking out.
981 */
982 if (ioq->error) {
cf715800 983skip:
9ab15106 984 /*
1b195a98
MD
985 * An unrecoverable error causes all active receive
986 * transactions to be terminated with a LNK_ERROR message.
9ab15106 987 *
1b195a98
MD
988 * Once all active transactions are exhausted we set the
989 * iocom ERROR flag and return a non-transactional LNK_ERROR
990 * message, which should cause master processing loops to
991 * terminate.
9ab15106 992 */
4a2e0eae 993 assert(ioq->msg == msg);
1b195a98 994 if (msg) {
0c3a8cd0 995 dmsg_msg_free(msg);
9ab15106 996 ioq->msg = NULL;
9ab15106 997 }
1b195a98
MD
998
999 /*
1000 * No more I/O read processing
1001 */
0c3a8cd0 1002 ioq->state = DMSG_MSGQ_STATE_ERROR;
1b195a98
MD
1003
1004 /*
7dc0f844
MD
1005 * Simulate a remote LNK_ERROR DELETE msg for any open
1006 * transactions, ending with a final non-transactional
1007 * LNK_ERROR (that the session can detect) when no
1008 * transactions remain.
0d20ec8a
MD
1009 *
1010 * We only need to scan transactions on circuit0 as these
1011 * will contain all circuit forges, and terminating circuit
1012 * forges will automatically terminate the transactions on
1013 * any other circuits as well as those circuits.
1b195a98 1014 */
0d20ec8a
MD
1015 circuit0 = &iocom->circuit0;
1016 msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
9ab15106 1017 msg->any.head.error = ioq->error;
1b195a98 1018
7dc0f844 1019 pthread_mutex_lock(&iocom->mtx);
0c3a8cd0 1020 dmsg_iocom_drain(iocom);
0d20ec8a
MD
1021
1022 if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
1b195a98 1023 /*
7dc0f844
MD
1024 * Active remote transactions are still present.
1025 * Simulate the other end sending us a DELETE.
1b195a98 1026 */
5bc5bca2 1027 if (state->rxcmd & DMSGF_DELETE) {
0c3a8cd0 1028 dmsg_msg_free(msg);
7dc0f844
MD
1029 msg = NULL;
1030 } else {
5bc5bca2 1031 /*state->txcmd |= DMSGF_DELETE;*/
7dc0f844 1032 msg->state = state;
0d20ec8a 1033 msg->iocom = iocom;
7dc0f844 1034 msg->any.head.msgid = state->msgid;
5bc5bca2
MD
1035 msg->any.head.cmd |= DMSGF_ABORT |
1036 DMSGF_DELETE;
7dc0f844 1037 }
0d20ec8a 1038 } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
7dc0f844
MD
1039 /*
1040 * Active local transactions are still present.
1041 * Simulate the other end sending us a DELETE.
1042 */
5bc5bca2 1043 if (state->rxcmd & DMSGF_DELETE) {
0c3a8cd0 1044 dmsg_msg_free(msg);
7dc0f844
MD
1045 msg = NULL;
1046 } else {
7dc0f844 1047 msg->state = state;
0d20ec8a 1048 msg->iocom = iocom;
7dc0f844 1049 msg->any.head.msgid = state->msgid;
5bc5bca2
MD
1050 msg->any.head.cmd |= DMSGF_ABORT |
1051 DMSGF_DELETE |
1052 DMSGF_REPLY;
1053 if ((state->rxcmd & DMSGF_CREATE) == 0) {
7dc0f844 1054 msg->any.head.cmd |=
5bc5bca2 1055 DMSGF_CREATE;
7dc0f844
MD
1056 }
1057 }
1b195a98
MD
1058 } else {
1059 /*
7dc0f844
MD
1060 * No active local or remote transactions remain.
1061 * Generate a final LNK_ERROR and flag EOF.
1b195a98
MD
1062 */
1063 msg->state = NULL;
0c3a8cd0 1064 iocom->flags |= DMSG_IOCOMF_EOF;
81666e1b 1065 fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1b195a98 1066 }
7dc0f844
MD
1067 pthread_mutex_unlock(&iocom->mtx);
1068
1069 /*
1070 * For the iocom error case we want to set RWORK to indicate
1071 * that more messages might be pending.
1072 *
1073 * It is possible to return NULL when there is more work to
1074 * do because each message has to be DELETEd in both
1075 * directions before we continue on with the next (though
1076 * this could be optimized). The transmit direction will
1077 * re-set RWORK.
1078 */
1079 if (msg)
0c3a8cd0 1080 iocom->flags |= DMSG_IOCOMF_RWORK;
9ab15106
MD
1081 } else if (msg == NULL) {
1082 /*
1083 * Insufficient data received to finish building the message,
1084 * set RREQ and return NULL.
1085 *
1086 * Leave ioq->msg intact.
1087 * Leave the FIFO intact.
1088 */
0c3a8cd0 1089 iocom->flags |= DMSG_IOCOMF_RREQ;
9ab15106
MD
1090 } else {
1091 /*
0d20ec8a 1092 * Continue processing msg.
9ab15106
MD
1093 *
1094 * The fifo has already been advanced past the message.
1095 * Trivially reset the FIFO indices if possible.
7dc0f844
MD
1096 *
1097 * clear the FIFO if it is now empty and set RREQ to wait
1098 * for more from the socket. If the FIFO is not empty set
1099 * TWORK to bypass the poll so we loop immediately.
9ab15106 1100 */
3033ecc8
MD
1101 if (ioq->fifo_beg == ioq->fifo_cdx &&
1102 ioq->fifo_cdn == ioq->fifo_end) {
0c3a8cd0 1103 iocom->flags |= DMSG_IOCOMF_RREQ;
5cf97ec5 1104 ioq->fifo_cdx = 0;
3033ecc8 1105 ioq->fifo_cdn = 0;
9ab15106
MD
1106 ioq->fifo_beg = 0;
1107 ioq->fifo_end = 0;
1108 } else {
0c3a8cd0 1109 iocom->flags |= DMSG_IOCOMF_RWORK;
9ab15106 1110 }
0c3a8cd0 1111 ioq->state = DMSG_MSGQ_STATE_HEADER1;
9ab15106 1112 ioq->msg = NULL;
0d20ec8a
MD
1113
1114 /*
1115 * Handle message routing. Validates non-zero sources
1116 * and routes message. Error will be 0 if the message is
1117 * destined for us.
1118 *
1119 * State processing only occurs for messages destined for us.
1120 */
1121 if (msg->any.head.circuit)
1122 error = dmsg_circuit_relay(msg);
1123 else
1124 error = dmsg_state_msgrx(msg);
1125
1126 if (error) {
1127 /*
1128 * Abort-after-closure, throw message away and
1129 * start reading another.
1130 */
1131 if (error == DMSG_IOQ_ERROR_EALREADY) {
1132 dmsg_msg_free(msg);
1133 goto again;
1134 }
1135
1136 /*
1137 * msg routed, msg pointer no longer owned by us.
1138 * Go to the top and start reading another.
1139 */
1140 if (error == DMSG_IOQ_ERROR_ROUTED)
1141 goto again;
1142
1143 /*
1144 * Process real error and throw away message.
1145 */
1146 ioq->error = error;
1147 goto skip;
1148 }
1149 /* no error, not routed. Fall through and return msg */
9ab15106
MD
1150 }
1151 return (msg);
1152}
1153
1154/*
1155 * Calculate the header and data crc's and write a low-level message to
8c280d5d 1156 * the connection. If aux_crc is non-zero the aux_data crc is already
9ab15106
MD
1157 * assumed to have been set.
1158 *
1159 * A non-NULL msg is added to the queue but not necessarily flushed.
1160 * Calling this function with msg == NULL will get a flush going.
7dc0f844
MD
1161 *
1162 * Caller must hold iocom->mtx.
9ab15106 1163 */
7dc0f844 1164void
0c3a8cd0 1165dmsg_iocom_flush1(dmsg_iocom_t *iocom)
9ab15106 1166{
0c3a8cd0
MD
1167 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1168 dmsg_msg_t *msg;
9ab15106 1169 uint32_t xcrc32;
f306de83
MD
1170 size_t hbytes;
1171 size_t abytes;
0c3a8cd0 1172 dmsg_msg_queue_t tmpq;
7dc0f844 1173
0c3a8cd0 1174 iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
7dc0f844
MD
1175 TAILQ_INIT(&tmpq);
1176 pthread_mutex_lock(&iocom->mtx);
0d20ec8a
MD
1177 while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1178 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
7dc0f844 1179 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
9ab15106 1180 }
7dc0f844 1181 pthread_mutex_unlock(&iocom->mtx);
9ab15106 1182
7dc0f844
MD
1183 while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1184 /*
1185 * Process terminal connection errors.
1186 */
1187 TAILQ_REMOVE(&tmpq, msg, qentry);
1188 if (ioq->error) {
1189 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1190 ++ioq->msgcount;
1191 continue;
1192 }
9ab15106 1193
7dc0f844
MD
1194 /*
1195 * Finish populating the msg fields. The salt ensures that
1196 * the iv[] array is ridiculously randomized and we also
1197 * re-seed our PRNG every 32768 messages just to be sure.
1198 */
5bc5bca2 1199 msg->any.head.magic = DMSG_HDR_MAGIC;
7dc0f844
MD
1200 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1201 ++ioq->seq;
1202 if ((ioq->seq & 32767) == 0)
1203 srandomdev();
9ab15106 1204
7dc0f844
MD
1205 /*
1206 * Calculate aux_crc if 0, then calculate hdr_crc.
1207 */
1208 if (msg->aux_size && msg->any.head.aux_crc == 0) {
f306de83
MD
1209 abytes = DMSG_DOALIGN(msg->aux_size);
1210 xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
7dc0f844
MD
1211 msg->any.head.aux_crc = xcrc32;
1212 }
f306de83 1213 msg->any.head.aux_bytes = msg->aux_size;
9ab15106 1214
5bc5bca2
MD
1215 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1216 DMSG_ALIGN;
7dc0f844 1217 msg->any.head.hdr_crc = 0;
0c3a8cd0 1218 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
9ab15106 1219
7dc0f844
MD
1220 /*
1221 * Enqueue the message (the flush codes handles stream
1222 * encryption).
1223 */
1224 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1225 ++ioq->msgcount;
1226 }
0c3a8cd0 1227 dmsg_iocom_flush2(iocom);
4a2e0eae
MD
1228}
1229
7dc0f844
MD
1230/*
1231 * Thread localized, iocom->mtx not held by caller.
1232 */
4a2e0eae 1233void
0c3a8cd0 1234dmsg_iocom_flush2(dmsg_iocom_t *iocom)
4a2e0eae 1235{
0c3a8cd0
MD
1236 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1237 dmsg_msg_t *msg;
3033ecc8 1238 ssize_t n;
0c3a8cd0 1239 struct iovec iov[DMSG_IOQ_MAXIOVEC];
3033ecc8 1240 size_t nact;
78476205
MD
1241 size_t hbytes;
1242 size_t abytes;
02454b3e
MD
1243 size_t hoff;
1244 size_t aoff;
3033ecc8 1245 int iovcnt;
9ab15106 1246
7dc0f844 1247 if (ioq->error) {
0c3a8cd0 1248 dmsg_iocom_drain(iocom);
7dc0f844
MD
1249 return;
1250 }
1251
9ab15106
MD
1252 /*
1253 * Pump messages out the connection by building an iovec.
02454b3e
MD
1254 *
1255 * ioq->hbytes/ioq->abytes tracks how much of the first message
1256 * in the queue has been successfully written out, so we can
1257 * resume writing.
9ab15106 1258 */
3033ecc8
MD
1259 iovcnt = 0;
1260 nact = 0;
02454b3e
MD
1261 hoff = ioq->hbytes;
1262 aoff = ioq->abytes;
9ab15106 1263
78476205 1264 TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
5bc5bca2
MD
1265 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1266 DMSG_ALIGN;
8d6d37b8 1267 abytes = DMSG_DOALIGN(msg->aux_size);
02454b3e
MD
1268 assert(hoff <= hbytes && aoff <= abytes);
1269
1270 if (hoff < hbytes) {
3033ecc8
MD
1271 iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1272 iov[iovcnt].iov_len = hbytes - hoff;
1273 nact += hbytes - hoff;
1274 ++iovcnt;
0c3a8cd0 1275 if (iovcnt == DMSG_IOQ_MAXIOVEC)
9ab15106
MD
1276 break;
1277 }
02454b3e 1278 if (aoff < abytes) {
9ab15106 1279 assert(msg->aux_data != NULL);
3033ecc8
MD
1280 iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1281 iov[iovcnt].iov_len = abytes - aoff;
1282 nact += abytes - aoff;
1283 ++iovcnt;
0c3a8cd0 1284 if (iovcnt == DMSG_IOQ_MAXIOVEC)
9ab15106
MD
1285 break;
1286 }
02454b3e
MD
1287 hoff = 0;
1288 aoff = 0;
9ab15106 1289 }
3033ecc8 1290 if (iovcnt == 0)
9ab15106
MD
1291 return;
1292
1293 /*
5cf97ec5
MD
1294 * Encrypt and write the data. The crypto code will move the
1295 * data into the fifo and adjust the iov as necessary. If
1296 * encryption is disabled the iov is left alone.
1297 *
02454b3e
MD
1298 * May return a smaller iov (thus a smaller n), with aggregated
1299 * chunks. May reduce nmax to what fits in the FIFO.
3033ecc8
MD
1300 *
1301 * This function sets nact to the number of original bytes now
1302 * encrypted, adding to the FIFO some number of bytes that might
1303 * be greater depending on the crypto mechanic. iov[] is adjusted
1304 * to point at the FIFO if necessary.
1305 *
1306 * NOTE: The return value from the writev() is the post-encrypted
1307 * byte count, not the plaintext count.
5cf97ec5 1308 */
0c3a8cd0 1309 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
3033ecc8
MD
1310 /*
1311 * Make sure the FIFO has a reasonable amount of space
1312 * left (if not completely full).
1313 */
1314 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
5bc5bca2 1315 sizeof(ioq->buf) - ioq->fifo_end >= DMSG_ALIGN * 2) {
3033ecc8
MD
1316 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1317 ioq->fifo_end - ioq->fifo_beg);
1318 ioq->fifo_cdx -= ioq->fifo_beg;
1319 ioq->fifo_cdn -= ioq->fifo_beg;
1320 ioq->fifo_end -= ioq->fifo_beg;
1321 ioq->fifo_beg = 0;
1322 }
5cf97ec5 1323
0c3a8cd0 1324 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
3033ecc8
MD
1325 n = writev(iocom->sock_fd, iov, iovcnt);
1326 if (n > 0) {
1327 ioq->fifo_beg += n;
1328 ioq->fifo_cdn += n;
1329 ioq->fifo_cdx += n;
1330 if (ioq->fifo_beg == ioq->fifo_end) {
1331 ioq->fifo_beg = 0;
1332 ioq->fifo_cdn = 0;
1333 ioq->fifo_cdx = 0;
1334 ioq->fifo_end = 0;
1335 }
8d6d37b8 1336 /* XXX what if interrupted mid-write? */
f306de83
MD
1337 } else {
1338 nact = 0;
9ab15106 1339 }
3033ecc8
MD
1340 } else {
1341 n = writev(iocom->sock_fd, iov, iovcnt);
1342 if (n > 0)
1343 nact = n;
1344 else
1345 nact = 0;
9ab15106 1346 }
7dc0f844
MD
1347
1348 /*
7dc0f844 1349 * Clean out the transmit queue based on what we successfully
3033ecc8
MD
1350 * sent (nact is the plaintext count). ioq->hbytes/abytes
1351 * represents the portion of the first message previously sent.
7dc0f844 1352 */
9ab15106 1353 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
5bc5bca2
MD
1354 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1355 DMSG_ALIGN;
8d6d37b8 1356 abytes = DMSG_DOALIGN(msg->aux_size);
9ab15106 1357
78476205 1358 if ((size_t)nact < hbytes - ioq->hbytes) {
9ab15106 1359 ioq->hbytes += nact;
1bd13960 1360 nact = 0;
9ab15106
MD
1361 break;
1362 }
1363 nact -= hbytes - ioq->hbytes;
1364 ioq->hbytes = hbytes;
78476205 1365 if ((size_t)nact < abytes - ioq->abytes) {
9ab15106 1366 ioq->abytes += nact;
1bd13960 1367 nact = 0;
9ab15106
MD
1368 break;
1369 }
1370 nact -= abytes - ioq->abytes;
1371
78476205 1372 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
9ab15106
MD
1373 --ioq->msgcount;
1374 ioq->hbytes = 0;
1375 ioq->abytes = 0;
78476205 1376
0c3a8cd0 1377 dmsg_state_cleanuptx(msg);
9ab15106 1378 }
3033ecc8 1379 assert(nact == 0);
81666e1b
MD
1380
1381 /*
3033ecc8 1382 * Process the return value from the write w/regards to blocking.
81666e1b 1383 */
3033ecc8
MD
1384 if (n < 0) {
1385 if (errno != EINTR &&
1386 errno != EINPROGRESS &&
1387 errno != EAGAIN) {
1388 /*
1389 * Fatal write error
1390 */
0c3a8cd0
MD
1391 ioq->error = DMSG_IOQ_ERROR_SOCK;
1392 dmsg_iocom_drain(iocom);
3033ecc8
MD
1393 } else {
1394 /*
1395 * Wait for socket buffer space
1396 */
0c3a8cd0 1397 iocom->flags |= DMSG_IOCOMF_WREQ;
3033ecc8
MD
1398 }
1399 } else {
0c3a8cd0 1400 iocom->flags |= DMSG_IOCOMF_WREQ;
3033ecc8 1401 }
9ab15106 1402 if (ioq->error) {
0c3a8cd0 1403 dmsg_iocom_drain(iocom);
9ab15106
MD
1404 }
1405}
1406
1407/*
1408 * Kill pending msgs on ioq_tx and adjust the flags such that no more
1409 * write events will occur. We don't kill read msgs because we want
1410 * the caller to pull off our contrived terminal error msg to detect
1411 * the connection failure.
7dc0f844
MD
1412 *
1413 * Thread localized, iocom->mtx not held by caller.
9ab15106
MD
1414 */
1415void
0c3a8cd0 1416dmsg_iocom_drain(dmsg_iocom_t *iocom)
9ab15106 1417{
0c3a8cd0
MD
1418 dmsg_ioq_t *ioq = &iocom->ioq_tx;
1419 dmsg_msg_t *msg;
9ab15106 1420
0c3a8cd0 1421 iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
02454b3e
MD
1422 ioq->hbytes = 0;
1423 ioq->abytes = 0;
7dc0f844 1424
9ab15106 1425 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
78476205 1426 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
9ab15106 1427 --ioq->msgcount;
0c3a8cd0 1428 dmsg_state_cleanuptx(msg);
9ab15106 1429 }
9ab15106
MD
1430}
1431
1432/*
8c280d5d 1433 * Write a message to an iocom, with additional state processing.
78476205
MD
1434 */
1435void
0c3a8cd0 1436dmsg_msg_write(dmsg_msg_t *msg)
78476205 1437{
0d20ec8a 1438 dmsg_iocom_t *iocom = msg->iocom;
0c3a8cd0 1439 dmsg_state_t *state;
7dc0f844 1440 char dummy;
78476205 1441
8c280d5d
MD
1442 /*
1443 * Handle state processing, create state if necessary.
1444 */
7dc0f844 1445 pthread_mutex_lock(&iocom->mtx);
8c280d5d 1446 if ((state = msg->state) != NULL) {
78476205 1447 /*
8c280d5d
MD
1448 * Existing transaction (could be reply). It is also
1449 * possible for this to be the first reply (CREATE is set),
1450 * in which case we populate state->txcmd.
29ead430
MD
1451 *
1452 * state->txcmd is adjusted to hold the final message cmd,
1453 * and we also be sure to set the CREATE bit here. We did
0c3a8cd0 1454 * not set it in dmsg_msg_alloc() because that would have
29ead430
MD
1455 * not been serialized (state could have gotten ripped out
1456 * from under the message prior to it being transmitted).
78476205 1457 */
5bc5bca2
MD
1458 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1459 DMSGF_CREATE) {
1460 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
0d20ec8a 1461 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
29ead430 1462 }
8c280d5d 1463 msg->any.head.msgid = state->msgid;
5bc5bca2 1464 assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
0d20ec8a 1465 if (msg->any.head.cmd & DMSGF_CREATE) {
5bc5bca2 1466 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
0d20ec8a 1467 }
8c280d5d
MD
1468 }
1469
1470 /*
7dc0f844
MD
1471 * Queue it for output, wake up the I/O pthread. Note that the
1472 * I/O thread is responsible for generating the CRCs and encryption.
8c280d5d 1473 */
0d20ec8a 1474 TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
7dc0f844
MD
1475 dummy = 0;
1476 write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */
1477 pthread_mutex_unlock(&iocom->mtx);
8c280d5d
MD
1478}
1479
8c280d5d
MD
1480/*
1481 * This is a shortcut to formulate a reply to msg with a simple error code,
1482 * It can reply to and terminate a transaction, or it can reply to a one-way
5bc5bca2 1483 * messages. A DMSG_LNK_ERROR command code is utilized to encode
8c280d5d 1484 * the error code (which can be 0). Not all transactions are terminated
5bc5bca2 1485 * with DMSG_LNK_ERROR status (the low level only cares about the
8c280d5d
MD
1486 * MSGF_DELETE flag), but most are.
1487 *
1488 * Replies to one-way messages are a bit of an oxymoron but the feature
1489 * is used by the debug (DBG) protocol.
1490 *
1491 * The reply contains no extended data.
1492 */
1493void
0c3a8cd0 1494dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
8c280d5d 1495{
0c3a8cd0
MD
1496 dmsg_state_t *state = msg->state;
1497 dmsg_msg_t *nmsg;
8c280d5d
MD
1498 uint32_t cmd;
1499
1500
1501 /*
1502 * Reply with a simple error code and terminate the transaction.
1503 */
5bc5bca2 1504 cmd = DMSG_LNK_ERROR;
8c280d5d
MD
1505
1506 /*
1507 * Check if our direction has even been initiated yet, set CREATE.
1508 *
1509 * Check what direction this is (command or reply direction). Note
1510 * that txcmd might not have been initiated yet.
1511 *
1512 * If our direction has already been closed we just return without
1513 * doing anything.
1514 */
1515 if (state) {
5bc5bca2 1516 if (state->txcmd & DMSGF_DELETE)
8c280d5d 1517 return;
5bc5bca2
MD
1518 if (state->txcmd & DMSGF_REPLY)
1519 cmd |= DMSGF_REPLY;
1520 cmd |= DMSGF_DELETE;
8c280d5d 1521 } else {
5bc5bca2
MD
1522 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1523 cmd |= DMSGF_REPLY;
78476205 1524 }
8c280d5d 1525
29ead430
MD
1526 /*
1527 * Allocate the message and associate it with the existing state.
0d20ec8a 1528 * We cannot pass DMSGF_CREATE to msg_alloc() because that may
29ead430
MD
1529 * allocate new state. We have our state already.
1530 */
0d20ec8a 1531 nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
29ead430 1532 if (state) {
5bc5bca2
MD
1533 if ((state->txcmd & DMSGF_CREATE) == 0)
1534 nmsg->any.head.cmd |= DMSGF_CREATE;
29ead430 1535 }
78476205 1536 nmsg->any.head.error = error;
0d20ec8a
MD
1537 nmsg->any.head.msgid = msg->any.head.msgid;
1538 nmsg->any.head.circuit = msg->any.head.circuit;
29ead430 1539 nmsg->state = state;
0c3a8cd0 1540 dmsg_msg_write(nmsg);
8c280d5d
MD
1541}
1542
1543/*
0c3a8cd0 1544 * Similar to dmsg_msg_reply() but leave the transaction open. That is,
8c280d5d
MD
1545 * we are generating a streaming reply or an intermediate acknowledgement
1546 * of some sort as part of the higher level protocol, with more to come
1547 * later.
1548 */
1549void
0c3a8cd0 1550dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
8c280d5d 1551{
0c3a8cd0
MD
1552 dmsg_state_t *state = msg->state;
1553 dmsg_msg_t *nmsg;
8c280d5d
MD
1554 uint32_t cmd;
1555
1556
1557 /*
1558 * Reply with a simple error code and terminate the transaction.
1559 */
5bc5bca2 1560 cmd = DMSG_LNK_ERROR;
8c280d5d
MD
1561
1562 /*
1563 * Check if our direction has even been initiated yet, set CREATE.
1564 *
1565 * Check what direction this is (command or reply direction). Note
1566 * that txcmd might not have been initiated yet.
1567 *
1568 * If our direction has already been closed we just return without
1569 * doing anything.
1570 */
1571 if (state) {
5bc5bca2 1572 if (state->txcmd & DMSGF_DELETE)
8c280d5d 1573 return;
5bc5bca2
MD
1574 if (state->txcmd & DMSGF_REPLY)
1575 cmd |= DMSGF_REPLY;
8c280d5d
MD
1576 /* continuing transaction, do not set MSGF_DELETE */
1577 } else {
5bc5bca2
MD
1578 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1579 cmd |= DMSGF_REPLY;
8c280d5d
MD
1580 }
1581
0d20ec8a 1582 nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
29ead430 1583 if (state) {
5bc5bca2
MD
1584 if ((state->txcmd & DMSGF_CREATE) == 0)
1585 nmsg->any.head.cmd |= DMSGF_CREATE;
29ead430 1586 }
8c280d5d 1587 nmsg->any.head.error = error;
0d20ec8a
MD
1588 nmsg->any.head.msgid = msg->any.head.msgid;
1589 nmsg->any.head.circuit = msg->any.head.circuit;
8c280d5d 1590 nmsg->state = state;
0c3a8cd0 1591 dmsg_msg_write(nmsg);
8c280d5d
MD
1592}
1593
1594/*
1595 * Terminate a transaction given a state structure by issuing a DELETE.
1596 */
1597void
0c3a8cd0 1598dmsg_state_reply(dmsg_state_t *state, uint32_t error)
8c280d5d 1599{
0c3a8cd0 1600 dmsg_msg_t *nmsg;
5bc5bca2 1601 uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
8c280d5d
MD
1602
1603 /*
1604 * Nothing to do if we already transmitted a delete
1605 */
5bc5bca2 1606 if (state->txcmd & DMSGF_DELETE)
8c280d5d
MD
1607 return;
1608
1609 /*
8c280d5d
MD
1610 * Set REPLY if the other end initiated the command. Otherwise
1611 * we are the command direction.
1612 */
5bc5bca2
MD
1613 if (state->txcmd & DMSGF_REPLY)
1614 cmd |= DMSGF_REPLY;
8c280d5d 1615
0d20ec8a 1616 nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
29ead430 1617 if (state) {
5bc5bca2
MD
1618 if ((state->txcmd & DMSGF_CREATE) == 0)
1619 nmsg->any.head.cmd |= DMSGF_CREATE;
29ead430 1620 }
8c280d5d 1621 nmsg->any.head.error = error;
0d20ec8a
MD
1622 nmsg->any.head.msgid = state->msgid;
1623 nmsg->any.head.circuit = state->msg->any.head.circuit;
1624 nmsg->state = state;
1625 dmsg_msg_write(nmsg);
1626}
1627
1628/*
1629 * Terminate a transaction given a state structure by issuing a DELETE.
1630 */
1631void
1632dmsg_state_result(dmsg_state_t *state, uint32_t error)
1633{
1634 dmsg_msg_t *nmsg;
1635 uint32_t cmd = DMSG_LNK_ERROR;
1636
1637 /*
1638 * Nothing to do if we already transmitted a delete
1639 */
1640 if (state->txcmd & DMSGF_DELETE)
1641 return;
1642
1643 /*
1644 * Set REPLY if the other end initiated the command. Otherwise
1645 * we are the command direction.
1646 */
1647 if (state->txcmd & DMSGF_REPLY)
1648 cmd |= DMSGF_REPLY;
1649
1650 nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1651 if (state) {
1652 if ((state->txcmd & DMSGF_CREATE) == 0)
1653 nmsg->any.head.cmd |= DMSGF_CREATE;
1654 }
1655 nmsg->any.head.error = error;
1656 nmsg->any.head.msgid = state->msgid;
1657 nmsg->any.head.circuit = state->msg->any.head.circuit;
8c280d5d 1658 nmsg->state = state;
0c3a8cd0 1659 dmsg_msg_write(nmsg);
78476205
MD
1660}
1661
1662/************************************************************************
1663 * TRANSACTION STATE HANDLING *
1664 ************************************************************************
1665 *
1666 */
1667
/*
 * Process circuit and state tracking for a received message, prior to
 * execution.
 *
 * Looks up the message's circuit (circuit0 when head.circuit is 0) and any
 * existing transaction state, then stores both in msg->circuit and
 * msg->state.  The circuit's reference count is incremented.  A new command
 * (CREATE without REPLY) allocates state and inserts it on the circuit's
 * staterd_tree.
 *
 * Locking: iocom->mtx is acquired internally around the tree lookups and
 * the insertion, so this must be called without that mutex held.
 *
 * Returns:
 *	0				process the message locally
 *	DMSG_IOQ_ERROR_EALREADY		discard the message (an ABORT for a
 *					transaction that is already closed or
 *					whose msgid was reused)
 *	DMSG_IOQ_ERROR_TRANS		transaction protocol violation, or no
 *					memory for new state
 *	DMSG_IOQ_ERROR_BAD_CIRCUIT	unknown non-zero circuit id
 *
 * --
 *
 * These routines handle persistent and command/reply message state via the
 * CREATE and DELETE flags.  The first message in a command or reply sequence
 * sets CREATE, and the last message in a command or reply sequence sets
 * DELETE.
 *
 * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message, and
 * these set neither flag.  This represents a streaming command or reply.
 *
 * Any command message received with CREATE set expects a reply sequence to
 * be returned.  Reply sequences work the same as command sequences, except
 * that the REPLY bit is also set.  Both the command side and the reply side
 * can degenerate into a single message with both CREATE and DELETE set.
 * Note that one side can be streaming and the other side not, or neither,
 * or both.
 *
 * The msgid is unique for the initiator.  That is, two sides sending a new
 * message can use the same msgid without colliding.
 *
 * --
 *
 * ABORT sequences work by setting the ABORT flag along with normal message
 * state.  However, ABORTs can also be sent on half-closed messages, that is,
 * even if the command or reply side has already sent a DELETE.  As long as
 * the message has not been fully closed, it can still send an ABORT+DELETE
 * to terminate the half-closed message state.
 *
 * Since ABORT+DELETEs can race, we silently discard ABORTs for message
 * state which has already been fully closed.  REPLY+ABORT+DELETEs can also
 * race, and in this situation the other side might have already initiated
 * a new unrelated command with the same message id.  Since the abort has
 * not set the CREATE flag, the situation can be detected and the message
 * will also be discarded.
 *
 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].  The
 * ABORT request is essentially integrated into the command instead of
 * being sent later on.  In this situation the command implementation
 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
 * special-case non-blocking operation for the command.
 *
 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
 *	 to be mid-stream aborts for command/reply sequences.  ABORTs on
 *	 one-way messages are not supported.
 *
 * NOTE! If a command sequence does not support aborts, the ABORT flag is
 *	 simply ignored.
 *
 * --
 *
 * One-off messages (no reply expected) are sent with neither CREATE nor
 * DELETE set.  One-off messages cannot be aborted and typically aren't
 * processed by these routines.  The REPLY bit can be used to distinguish
 * whether a one-off message is a command or a reply.  For example, one-off
 * replies will typically just contain status updates.
 */
static int
dmsg_state_msgrx(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->iocom;
	dmsg_circuit_t *circuit;
	dmsg_state_t *state;
	dmsg_state_t sdummy;
	dmsg_circuit_t cdummy;
	int error;

	pthread_mutex_lock(&iocom->mtx);

	/*
	 * Locate the existing persistent circuit and state, if any.
	 */
	if (msg->any.head.circuit == 0) {
		circuit = &iocom->circuit0;
	} else {
		cdummy.msgid = msg->any.head.circuit;
		circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
				  &cdummy);
		if (circuit == NULL) {
			/* Do not leak iocom->mtx on the error return. */
			pthread_mutex_unlock(&iocom->mtx);
			return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
		}
	}
	msg->circuit = circuit;
	++circuit->refs;

	/*
	 * If the received msg is a command, its state is on staterd_tree.
	 * If the received msg is a reply, its state is on statewr_tree.
	 */
	sdummy.msgid = msg->any.head.msgid;
	if (msg->any.head.cmd & DMSGF_REPLY) {
		state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
				&sdummy);
	} else {
		state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
				&sdummy);
	}
	msg->state = state;
	pthread_mutex_unlock(&iocom->mtx);

	/*
	 * Short-cut one-off or mid-stream messages (state may be NULL).
	 */
	if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				  DMSGF_ABORT)) == 0) {
		return(0);
	}

	/*
	 * Switch on CREATE, DELETE, and REPLY, and handle ABORT from inside
	 * the case statements.
	 */
	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				    DMSGF_REPLY)) {
	case DMSGF_CREATE:
	case DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * New persistent command received.
		 */
		if (state) {
			fprintf(stderr, "duplicate-trans %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		state = calloc(1, sizeof(*state));
		if (state == NULL) {
			/* Out of memory: treat it as a transaction failure. */
			fprintf(stderr, "no-mem-state %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			break;
		}
		state->iocom = iocom;
		state->circuit = circuit;
		state->flags = DMSG_STATE_DYNAMIC;
		state->msg = msg;
		state->txcmd = DMSGF_REPLY;
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
		state->flags |= DMSG_STATE_INSERTED;
		state->msgid = msg->any.head.msgid;
		msg->state = state;
		pthread_mutex_lock(&iocom->mtx);
		RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
		pthread_mutex_unlock(&iocom->mtx);
		error = 0;
		if (DMsgDebugOpt) {
			fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
				state, (uint32_t)state->msgid, iocom);
		}
		break;
	case DMSGF_DELETE:
		/*
		 * Persistent state is expected, but might not exist if an
		 * ABORT+DELETE races the close.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "missing-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Handle the other ABORT+DELETE case, where the msgid has
		 * already been reused.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_CREATE:
	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * When a reply with CREATE set is received, the original
		 * persistent state message should already exist.
		 */
		if (state == NULL) {
			fprintf(stderr, "no-state(r) %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		assert(((state->rxcmd ^ msg->any.head.cmd) &
			DMSGF_REPLY) == 0);
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_DELETE:
		/*
		 * REPLY+ABORT+DELETE received where the msgid has already
		 * been fully closed: ignore the message.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "no-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * REPLY+ABORT+DELETE received where the msgid has already
		 * been reused for an unrelated message: ignore the message.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	case DMSGF_REPLY:
		/*
		 * Check for a mid-stream ABORT reply received to a sent
		 * command.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	default:
		/*
		 * Neither CREATE, DELETE nor REPLY is set, so ABORT must be
		 * (see the short-cut above): a mid-stream ABORT command.
		 * Discard it if the transaction no longer exists or was
		 * reused; otherwise allow it.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	}
	return (error);
}
1941
9ab15106 1942void
0c3a8cd0 1943dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
9ab15106 1944{
0c3a8cd0 1945 dmsg_state_t *state;
78476205
MD
1946
1947 if ((state = msg->state) == NULL) {
1b195a98
MD
1948 /*
1949 * Free a non-transactional message, there is no state
1950 * to worry about.
1951 */
0c3a8cd0 1952 dmsg_msg_free(msg);
5bc5bca2 1953 } else if (msg->any.head.cmd & DMSGF_DELETE) {
1b195a98
MD
1954 /*
1955 * Message terminating transaction, destroy the related
1956 * state, the original message, and this message (if it
1957 * isn't the original message due to a CREATE|DELETE).
1958 */
7dc0f844 1959 pthread_mutex_lock(&iocom->mtx);
5bc5bca2
MD
1960 state->rxcmd |= DMSGF_DELETE;
1961 if (state->txcmd & DMSGF_DELETE) {
78476205
MD
1962 if (state->msg == msg)
1963 state->msg = NULL;
0c3a8cd0 1964 assert(state->flags & DMSG_STATE_INSERTED);
5bc5bca2
MD
1965 if (state->rxcmd & DMSGF_REPLY) {
1966 assert(msg->any.head.cmd & DMSGF_REPLY);
0c3a8cd0 1967 RB_REMOVE(dmsg_state_tree,
0d20ec8a 1968 &msg->circuit->statewr_tree, state);
78476205 1969 } else {
5bc5bca2 1970 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
0c3a8cd0 1971 RB_REMOVE(dmsg_state_tree,
0d20ec8a 1972 &msg->circuit->staterd_tree, state);
78476205 1973 }
0c3a8cd0
MD
1974 state->flags &= ~DMSG_STATE_INSERTED;
1975 dmsg_state_free(state);
78476205 1976 } else {
7dc0f844 1977 ;
78476205 1978 }
7dc0f844 1979 pthread_mutex_unlock(&iocom->mtx);
0c3a8cd0 1980 dmsg_msg_free(msg);
78476205 1981 } else if (state->msg != msg) {
1b195a98
MD
1982 /*
1983 * Message not terminating transaction, leave state intact
1984 * and free message if it isn't the CREATE message.
1985 */
0c3a8cd0 1986 dmsg_msg_free(msg);
9ab15106 1987 }
78476205
MD
1988}
1989
78476205 1990static void
0c3a8cd0 1991dmsg_state_cleanuptx(dmsg_msg_t *msg)
78476205 1992{
0d20ec8a 1993 dmsg_iocom_t *iocom = msg->iocom;
0c3a8cd0 1994 dmsg_state_t *state;
78476205
MD
1995
1996 if ((state = msg->state) == NULL) {
0c3a8cd0 1997 dmsg_msg_free(msg);
5bc5bca2 1998 } else if (msg->any.head.cmd & DMSGF_DELETE) {
7dc0f844 1999 pthread_mutex_lock(&iocom->mtx);
0d20ec8a 2000 assert((state->txcmd & DMSGF_DELETE) == 0);
5bc5bca2
MD
2001 state->txcmd |= DMSGF_DELETE;
2002 if (state->rxcmd & DMSGF_DELETE) {
78476205
MD
2003 if (state->msg == msg)
2004 state->msg = NULL;
0c3a8cd0 2005 assert(state->flags & DMSG_STATE_INSERTED);
5bc5bca2
MD
2006 if (state->txcmd & DMSGF_REPLY) {
2007 assert(msg->any.head.cmd & DMSGF_REPLY);
0c3a8cd0 2008 RB_REMOVE(dmsg_state_tree,
0d20ec8a 2009 &msg->circuit->staterd_tree, state);
78476205 2010 } else {
5bc5bca2 2011 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
0c3a8cd0 2012 RB_REMOVE(dmsg_state_tree,
0d20ec8a 2013 &msg->circuit->statewr_tree, state);
78476205 2014 }
0c3a8cd0
MD
2015 state->flags &= ~DMSG_STATE_INSERTED;
2016 dmsg_state_free(state);
78476205 2017 } else {
7dc0f844 2018 ;
78476205 2019 }
7dc0f844 2020 pthread_mutex_unlock(&iocom->mtx);
0c3a8cd0 2021 dmsg_msg_free(msg);
78476205 2022 } else if (state->msg != msg) {
0c3a8cd0 2023 dmsg_msg_free(msg);
78476205
MD
2024 }
2025}
2026
7dc0f844
MD
2027/*
2028 * Called with iocom locked
2029 */
78476205 2030void
0c3a8cd0 2031dmsg_state_free(dmsg_state_t *state)
78476205 2032{
0c3a8cd0 2033 dmsg_msg_t *msg;
7dc0f844 2034
0c3a8cd0 2035 if (DMsgDebugOpt) {
cf715800
MD
2036 fprintf(stderr, "terminate state %p id=%08x\n",
2037 state, (uint32_t)state->msgid);
81666e1b 2038 }
f306de83
MD
2039 if (state->any.any != NULL) /* XXX avoid deadlock w/exit & kernel */
2040 closefrom(3);
7dc0f844 2041 assert(state->any.any == NULL);
78476205
MD
2042 msg = state->msg;
2043 state->msg = NULL;
78476205 2044 if (msg)
0c3a8cd0 2045 dmsg_msg_free_locked(msg);
78476205
MD
2046 free(state);
2047}
2048
0d20ec8a
MD
2049/*
2050 * Called with iocom locked
90e8cd1d 2051 */
90e8cd1d 2052void
0d20ec8a 2053dmsg_circuit_drop(dmsg_circuit_t *circuit)
90e8cd1d 2054{
0d20ec8a
MD
2055 dmsg_iocom_t *iocom = circuit->iocom;
2056 char dummy;
90e8cd1d 2057
0d20ec8a
MD
2058 assert(circuit->refs > 0);
2059 assert(iocom);
90e8cd1d 2060
0d20ec8a
MD
2061 /*
2062 * Decrement circuit refs, destroy circuit when refs drops to 0.
2063 */
2064 if (--circuit->refs > 0)
2065 return;
90e8cd1d 2066
0d20ec8a
MD
2067 assert(RB_EMPTY(&circuit->staterd_tree));
2068 assert(RB_EMPTY(&circuit->statewr_tree));
2069 RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2070 circuit->iocom = NULL;
2071 dmsg_free(circuit);
90e8cd1d 2072
0d20ec8a
MD
2073 /*
2074 * When an iocom error is present the rx code will terminate the
2075 * receive side for all transactions and (indirectly) all circuits
2076 * by simulating DELETE messages. The state and related circuits
2077 * don't disappear until the related states are closed in both
2078 * directions
2079 *
2080 * Detect the case where the last circuit is now gone (and thus all
2081 * states for all circuits are gone), and wakeup the rx thread to
2082 * complete the termination.
2083 */
2084 if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2085 dummy = 0;
2086 write(iocom->wakeupfds[1], &dummy, 1);
2087 }
90e8cd1d 2088}
90e8cd1d 2089
0c3a8cd0
MD
2090/*
2091 * This swaps endian for a hammer2_msg_hdr. Note that the extended
2092 * header is not adjusted, just the core header.
2093 */
2094void
2095dmsg_bswap_head(dmsg_hdr_t *head)
81666e1b 2096{
0c3a8cd0
MD
2097 head->magic = bswap16(head->magic);
2098 head->reserved02 = bswap16(head->reserved02);
2099 head->salt = bswap32(head->salt);
2100
2101 head->msgid = bswap64(head->msgid);
0d20ec8a
MD
2102 head->circuit = bswap64(head->circuit);
2103 head->reserved18= bswap64(head->reserved18);
0c3a8cd0
MD
2104
2105 head->cmd = bswap32(head->cmd);
2106 head->aux_crc = bswap32(head->aux_crc);
2107 head->aux_bytes = bswap32(head->aux_bytes);
2108 head->error = bswap32(head->error);
2109 head->aux_descr = bswap64(head->aux_descr);
2110 head->reserved38= bswap32(head->reserved38);
2111 head->hdr_crc = bswap32(head->hdr_crc);
81666e1b 2112}