hammer2 - SPAN protocol work
[dragonfly.git] / sbin / hammer2 / msg.c
CommitLineData
9ab15106
MD
1/*
2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
17 * distribution.
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 * SUCH DAMAGE.
34 */
35
36#include "hammer2.h"
37
78476205 38static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
78476205
MD
39static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40
9ab15106 41/*
cf715800
MD
42 * Indexed messages are stored in a red-black tree indexed by their
43 * msgid. Only persistent messages are indexed.
44 */
45int
46hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
47{
48 if (state1->spanid < state2->spanid)
49 return(-1);
50 if (state1->spanid > state2->spanid)
51 return(1);
52 if (state1->msgid < state2->msgid)
53 return(-1);
54 if (state1->msgid > state2->msgid)
55 return(1);
56 return(0);
57}
58
59RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
60
61/*
9ab15106
MD
62 * Initialize a low-level ioq
63 */
64void
65hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
66{
67 bzero(ioq, sizeof(*ioq));
68 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
69 TAILQ_INIT(&ioq->msgq);
70}
71
7dc0f844
MD
72/*
73 * Cleanup queue.
74 *
75 * caller holds iocom->mtx.
76 */
9ab15106 77void
4a2e0eae 78hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
9ab15106
MD
79{
80 hammer2_msg_t *msg;
81
82 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
7dc0f844 83 assert(0); /* shouldn't happen */
78476205
MD
84 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
85 hammer2_msg_free(iocom, msg);
9ab15106
MD
86 }
87 if ((msg = ioq->msg) != NULL) {
88 ioq->msg = NULL;
78476205 89 hammer2_msg_free(iocom, msg);
9ab15106
MD
90 }
91}
92
93/*
5903497c
MD
94 * Initialize a low-level communications channel.
95 *
96 * NOTE: The state_func() is called at least once from the loop and can be
97 * re-armed via hammer2_iocom_restate().
9ab15106
MD
98 */
99void
5903497c
MD
100hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
101 void (*state_func)(hammer2_iocom_t *),
102 void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
103 void (*altmsg_func)(hammer2_iocom_t *))
9ab15106
MD
104{
105 bzero(iocom, sizeof(*iocom));
106
5903497c
MD
107 iocom->state_callback = state_func;
108 iocom->rcvmsg_callback = rcvmsg_func;
109 iocom->altmsg_callback = altmsg_func;
110
7dc0f844 111 pthread_mutex_init(&iocom->mtx, NULL);
78476205
MD
112 RB_INIT(&iocom->staterd_tree);
113 RB_INIT(&iocom->statewr_tree);
9ab15106
MD
114 TAILQ_INIT(&iocom->freeq);
115 TAILQ_INIT(&iocom->freeq_aux);
8c280d5d 116 TAILQ_INIT(&iocom->addrq);
7dc0f844 117 TAILQ_INIT(&iocom->txmsgq);
9ab15106
MD
118 iocom->sock_fd = sock_fd;
119 iocom->alt_fd = alt_fd;
7dc0f844 120 iocom->flags = HAMMER2_IOCOMF_RREQ;
5903497c
MD
121 if (state_func)
122 iocom->flags |= HAMMER2_IOCOMF_SWORK;
9ab15106
MD
123 hammer2_ioq_init(iocom, &iocom->ioq_rx);
124 hammer2_ioq_init(iocom, &iocom->ioq_tx);
7dc0f844
MD
125 if (pipe(iocom->wakeupfds) < 0)
126 assert(0);
127 fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
128 fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
9ab15106 129
62efe6ec
MD
130 /*
131 * Negotiate session crypto synchronously. This will mark the
132 * connection as error'd if it fails.
133 */
134 hammer2_crypto_negotiate(iocom);
135
136 /*
137 * Make sure our fds are set to non-blocking for the iocom core.
138 */
9ab15106
MD
139 if (sock_fd >= 0)
140 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
141#if 0
142 /* if line buffered our single fgets() should be fine */
143 if (alt_fd >= 0)
144 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
145#endif
146}
147
7dc0f844 148/*
5903497c
MD
149 * May only be called from a callback from iocom_core.
150 *
151 * Adjust state machine functions, set flags to guarantee that both
152 * the recevmsg_func and the sendmsg_func is called at least once.
153 */
154void
155hammer2_iocom_restate(hammer2_iocom_t *iocom,
156 void (*state_func)(hammer2_iocom_t *),
157 void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
158 void (*altmsg_func)(hammer2_iocom_t *))
159{
160 iocom->state_callback = state_func;
161 iocom->rcvmsg_callback = rcvmsg_func;
162 iocom->altmsg_callback = altmsg_func;
163 if (state_func)
164 iocom->flags |= HAMMER2_IOCOMF_SWORK;
165 else
166 iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
167}
168
169/*
7dc0f844
MD
170 * Cleanup a terminating iocom.
171 *
172 * Caller should not hold iocom->mtx. The iocom has already been disconnected
173 * from all possible references to it.
174 */
9ab15106
MD
175void
176hammer2_iocom_done(hammer2_iocom_t *iocom)
177{
178 hammer2_msg_t *msg;
179
7dc0f844
MD
180 if (iocom->sock_fd >= 0) {
181 close(iocom->sock_fd);
182 iocom->sock_fd = -1;
183 }
184 if (iocom->alt_fd >= 0) {
185 close(iocom->alt_fd);
186 iocom->alt_fd = -1;
187 }
9ab15106
MD
188 hammer2_ioq_done(iocom, &iocom->ioq_rx);
189 hammer2_ioq_done(iocom, &iocom->ioq_tx);
190 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
78476205 191 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
9ab15106
MD
192 free(msg);
193 }
194 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
78476205 195 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
9ab15106
MD
196 free(msg->aux_data);
197 msg->aux_data = NULL;
198 free(msg);
199 }
7dc0f844
MD
200 if (iocom->wakeupfds[0] >= 0) {
201 close(iocom->wakeupfds[0]);
202 iocom->wakeupfds[0] = -1;
203 }
204 if (iocom->wakeupfds[1] >= 0) {
205 close(iocom->wakeupfds[1]);
206 iocom->wakeupfds[1] = -1;
207 }
208 pthread_mutex_destroy(&iocom->mtx);
9ab15106
MD
209}
210
4a2e0eae
MD
211/*
212 * Allocate a new one-way message.
213 */
9ab15106 214hammer2_msg_t *
78476205 215hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
9ab15106
MD
216{
217 hammer2_msg_t *msg;
218 int hbytes;
219
7dc0f844 220 pthread_mutex_lock(&iocom->mtx);
9ab15106
MD
221 if (aux_size) {
222 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
223 ~HAMMER2_MSG_ALIGNMASK;
224 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
78476205 225 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
9ab15106
MD
226 } else {
227 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
78476205 228 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
9ab15106 229 }
7dc0f844 230 pthread_mutex_unlock(&iocom->mtx);
9ab15106
MD
231 if (msg == NULL) {
232 msg = malloc(sizeof(*msg));
78476205 233 bzero(msg, sizeof(*msg));
9ab15106
MD
234 msg->aux_data = NULL;
235 msg->aux_size = 0;
236 }
237 if (msg->aux_size != aux_size) {
238 if (msg->aux_data) {
239 free(msg->aux_data);
240 msg->aux_data = NULL;
241 msg->aux_size = 0;
242 }
243 if (aux_size) {
244 msg->aux_data = malloc(aux_size);
245 msg->aux_size = aux_size;
246 }
247 }
9ab15106 248 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
4a2e0eae
MD
249 if (hbytes)
250 bzero(&msg->any.head, hbytes);
78476205 251 msg->hdr_size = hbytes;
9ab15106 252 msg->any.head.cmd = cmd;
8c280d5d
MD
253 msg->any.head.aux_descr = 0;
254 msg->any.head.aux_crc = 0;
9ab15106
MD
255
256 return (msg);
257}
258
4a2e0eae 259/*
4a2e0eae
MD
260 * Free a message so it can be reused afresh.
261 *
262 * NOTE: aux_size can be 0 with a non-NULL aux_data.
263 */
7dc0f844 264static
9ab15106 265void
7dc0f844 266hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
9ab15106 267{
7dc0f844 268 msg->state = NULL;
9ab15106 269 if (msg->aux_data)
78476205 270 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
9ab15106 271 else
78476205 272 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
9ab15106
MD
273}
274
7dc0f844
MD
275void
276hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
277{
278 pthread_mutex_lock(&iocom->mtx);
279 hammer2_msg_free_locked(iocom, msg);
280 pthread_mutex_unlock(&iocom->mtx);
281}
282
9ab15106
MD
283/*
284 * I/O core loop for an iocom.
7dc0f844
MD
285 *
286 * Thread localized, iocom->mtx not held.
9ab15106
MD
287 */
288void
5903497c 289hammer2_iocom_core(hammer2_iocom_t *iocom)
9ab15106 290{
7dc0f844
MD
291 struct pollfd fds[3];
292 char dummybuf[256];
5903497c 293 hammer2_msg_t *msg;
4a2e0eae 294 int timeout;
7dc0f844
MD
295 int count;
296 int wi; /* wakeup pipe */
297 int si; /* socket */
298 int ai; /* alt bulk path socket */
9ab15106 299
9ab15106 300 while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
7dc0f844
MD
301 if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
302 HAMMER2_IOCOMF_WWORK |
303 HAMMER2_IOCOMF_PWORK |
5903497c 304 HAMMER2_IOCOMF_SWORK |
7dc0f844
MD
305 HAMMER2_IOCOMF_ARWORK |
306 HAMMER2_IOCOMF_AWWORK)) == 0) {
307 /*
308 * Only poll if no immediate work is pending.
309 * Otherwise we are just wasting our time calling
310 * poll.
311 */
312 timeout = 5000;
4a2e0eae 313
7dc0f844
MD
314 count = 0;
315 wi = -1;
316 si = -1;
317 ai = -1;
9ab15106 318
7dc0f844
MD
319 /*
320 * Always check the inter-thread pipe, e.g.
321 * for iocom->txmsgq work.
322 */
323 wi = count++;
324 fds[wi].fd = iocom->wakeupfds[0];
325 fds[wi].events = POLLIN;
326 fds[wi].revents = 0;
327
328 /*
329 * Check the socket input/output direction as
330 * requested
331 */
332 if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
333 HAMMER2_IOCOMF_WREQ)) {
334 si = count++;
335 fds[si].fd = iocom->sock_fd;
336 fds[si].events = 0;
337 fds[si].revents = 0;
338
339 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
340 fds[si].events |= POLLIN;
341 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
342 fds[si].events |= POLLOUT;
343 }
9ab15106 344
7dc0f844
MD
345 /*
346 * Check the alternative fd for work.
347 */
348 if (iocom->alt_fd >= 0) {
349 ai = count++;
350 fds[ai].fd = iocom->alt_fd;
351 fds[ai].events = POLLIN;
352 fds[ai].revents = 0;
353 }
354 poll(fds, count, timeout);
355
356 if (wi >= 0 && (fds[wi].revents & POLLIN))
357 iocom->flags |= HAMMER2_IOCOMF_PWORK;
358 if (si >= 0 && (fds[si].revents & POLLIN))
359 iocom->flags |= HAMMER2_IOCOMF_RWORK;
360 if (si >= 0 && (fds[si].revents & POLLOUT))
361 iocom->flags |= HAMMER2_IOCOMF_WWORK;
362 if (wi >= 0 && (fds[wi].revents & POLLOUT))
363 iocom->flags |= HAMMER2_IOCOMF_WWORK;
364 if (ai >= 0 && (fds[ai].revents & POLLIN))
365 iocom->flags |= HAMMER2_IOCOMF_ARWORK;
9ab15106 366 } else {
7dc0f844
MD
367 /*
368 * Always check the pipe
369 */
370 iocom->flags |= HAMMER2_IOCOMF_PWORK;
9ab15106 371 }
7dc0f844 372
5903497c
MD
373 if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
374 iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
375 iocom->state_callback(iocom);
376 }
377
7dc0f844
MD
378 /*
379 * Pending message queues from other threads wake us up
380 * with a write to the wakeupfds[] pipe. We have to clear
381 * the pipe with a dummy read.
382 */
383 if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
384 iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
385 read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
386 iocom->flags |= HAMMER2_IOCOMF_RWORK;
387 iocom->flags |= HAMMER2_IOCOMF_WWORK;
388 if (TAILQ_FIRST(&iocom->txmsgq))
5903497c 389 hammer2_iocom_flush1(iocom);
9ab15106 390 }
7dc0f844
MD
391
392 /*
393 * Message write sequencing
394 */
395 if (iocom->flags & HAMMER2_IOCOMF_WWORK)
5903497c 396 hammer2_iocom_flush1(iocom);
7dc0f844
MD
397
398 /*
399 * Message read sequencing. Run this after the write
400 * sequencing in case the write sequencing allowed another
401 * auto-DELETE to occur on the read side.
402 */
5903497c
MD
403 if (iocom->flags & HAMMER2_IOCOMF_RWORK) {
404 while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
405 (msg = hammer2_ioq_read(iocom)) != NULL) {
81666e1b
MD
406 if (DebugOpt) {
407 fprintf(stderr, "receive %s\n",
408 hammer2_msg_str(msg));
409 }
5903497c
MD
410 iocom->rcvmsg_callback(iocom, msg);
411 hammer2_state_cleanuprx(iocom, msg);
412 }
413 }
7dc0f844 414
02454b3e
MD
415 if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
416 iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
9ab15106 417 iocom->altmsg_callback(iocom);
02454b3e 418 }
9ab15106
MD
419 }
420}
421
422/*
423 * Read the next ready message from the ioq, issuing I/O if needed.
424 * Caller should retry on a read-event when NULL is returned.
425 *
426 * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
1b195a98
MD
427 * be returned for each open transaction, then the ioq and iocom
428 * will be errored out and a non-transactional HAMMER2_LNK_ERROR
429 * msg will be returned as the final message. The caller should not call
430 * us again after the final message is returned.
7dc0f844
MD
431 *
432 * Thread localized, iocom->mtx not held.
9ab15106
MD
433 */
434hammer2_msg_t *
435hammer2_ioq_read(hammer2_iocom_t *iocom)
436{
437 hammer2_ioq_t *ioq = &iocom->ioq_rx;
438 hammer2_msg_t *msg;
439 hammer2_msg_hdr_t *head;
1b195a98 440 hammer2_state_t *state;
9ab15106 441 ssize_t n;
78476205
MD
442 size_t bytes;
443 size_t nmax;
9ab15106 444 uint32_t xcrc32;
78476205 445 int error;
9ab15106 446
78476205 447again:
7dc0f844
MD
448 iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
449
9ab15106
MD
450 /*
451 * If a message is already pending we can just remove and
78476205 452 * return it. Message state has already been processed.
9ab15106
MD
453 */
454 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
78476205
MD
455 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
456 return (msg);
9ab15106
MD
457 }
458
cf715800
MD
459 if (ioq->error)
460 goto skip;
461
9ab15106
MD
462 /*
463 * Message read in-progress (msg is NULL at the moment). We don't
464 * allocate a msg until we have its core header.
465 */
466 bytes = ioq->fifo_end - ioq->fifo_beg;
5cf97ec5 467 nmax = sizeof(ioq->buf) - ioq->fifo_end;
9ab15106
MD
468 msg = ioq->msg;
469
470 switch(ioq->state) {
471 case HAMMER2_MSGQ_STATE_HEADER1:
472 /*
473 * Load the primary header, fail on any non-trivial read
474 * error or on EOF. Since the primary header is the same
475 * size is the message alignment it will never straddle
476 * the end of the buffer.
477 */
478 if (bytes < (int)sizeof(msg->any.head)) {
479 n = read(iocom->sock_fd,
5cf97ec5 480 ioq->buf + ioq->fifo_end,
9ab15106
MD
481 nmax);
482 if (n <= 0) {
483 if (n == 0) {
484 ioq->error = HAMMER2_IOQ_ERROR_EOF;
485 break;
486 }
487 if (errno != EINTR &&
488 errno != EINPROGRESS &&
489 errno != EAGAIN) {
490 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
491 break;
492 }
493 n = 0;
494 /* fall through */
495 }
496 ioq->fifo_end += n;
497 bytes += n;
498 nmax -= n;
499 }
500
501 /*
502 * Insufficient data accumulated (msg is NULL, caller will
503 * retry on event).
504 */
505 assert(msg == NULL);
506 if (bytes < (int)sizeof(msg->any.head))
507 break;
508
9ab15106 509 /*
5cf97ec5
MD
510 * Calculate the header, decrypt data received so far.
511 * Data will be decrypted in-place. Partial blocks are
512 * not immediately decrypted.
8c280d5d
MD
513 *
514 * WARNING! The header might be in the wrong endian, we
515 * do not fix it up until we get the entire
516 * extended header.
9ab15106 517 */
5cf97ec5 518 hammer2_crypto_decrypt(iocom, ioq);
5cf97ec5 519 head = (void *)(ioq->buf + ioq->fifo_beg);
9ab15106
MD
520
521 /*
522 * Check and fixup the core header. Note that the icrc
523 * has to be calculated before any fixups, but the crc
524 * fields in the msg may have to be swapped like everything
525 * else.
526 */
527 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
528 head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
529 ioq->error = HAMMER2_IOQ_ERROR_SYNC;
530 break;
531 }
532
9ab15106
MD
533 /*
534 * Calculate the full header size and aux data size
535 */
8c280d5d
MD
536 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
537 ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
538 HAMMER2_MSG_ALIGN;
539 ioq->abytes = bswap32(head->aux_bytes) *
540 HAMMER2_MSG_ALIGN;
541 } else {
542 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
543 HAMMER2_MSG_ALIGN;
544 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
545 }
78476205
MD
546 if (ioq->hbytes < sizeof(msg->any.head) ||
547 ioq->hbytes > sizeof(msg->any) ||
9ab15106
MD
548 ioq->abytes > HAMMER2_MSGAUX_MAX) {
549 ioq->error = HAMMER2_IOQ_ERROR_FIELD;
550 break;
551 }
552
553 /*
02454b3e 554 * Allocate the message, the next state will fill it in.
9ab15106 555 */
78476205 556 msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
9ab15106
MD
557 ioq->msg = msg;
558
559 /*
9ab15106
MD
560 * Fall through to the next state. Make sure that the
561 * extended header does not straddle the end of the buffer.
562 * We still want to issue larger reads into our buffer,
563 * book-keeping is easier if we don't bcopy() yet.
564 */
565 if (bytes + nmax < ioq->hbytes) {
5cf97ec5
MD
566 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
567 ioq->fifo_cdx -= ioq->fifo_beg;
9ab15106
MD
568 ioq->fifo_beg = 0;
569 ioq->fifo_end = bytes;
5cf97ec5 570 nmax = sizeof(ioq->buf) - ioq->fifo_end;
9ab15106
MD
571 }
572 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
573 /* fall through */
574 case HAMMER2_MSGQ_STATE_HEADER2:
575 /*
576 * Fill out the extended header.
577 */
578 assert(msg != NULL);
579 if (bytes < ioq->hbytes) {
580 n = read(iocom->sock_fd,
02454b3e 581 ioq->buf + ioq->fifo_end,
9ab15106
MD
582 nmax);
583 if (n <= 0) {
584 if (n == 0) {
585 ioq->error = HAMMER2_IOQ_ERROR_EOF;
586 break;
587 }
588 if (errno != EINTR &&
589 errno != EINPROGRESS &&
590 errno != EAGAIN) {
591 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
592 break;
593 }
594 n = 0;
595 /* fall through */
596 }
597 ioq->fifo_end += n;
598 bytes += n;
599 nmax -= n;
600 }
601
602 /*
603 * Insufficient data accumulated (set msg NULL so caller will
604 * retry on event).
605 */
606 if (bytes < ioq->hbytes) {
607 msg = NULL;
608 break;
609 }
610
611 /*
5cf97ec5 612 * Calculate the extended header, decrypt data received
8c280d5d
MD
613 * so far. Handle endian-conversion for the entire extended
614 * header.
9ab15106 615 */
5cf97ec5
MD
616 hammer2_crypto_decrypt(iocom, ioq);
617 head = (void *)(ioq->buf + ioq->fifo_beg);
9ab15106
MD
618
619 /*
8c280d5d 620 * Check the CRC.
9ab15106 621 */
8c280d5d
MD
622 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
623 xcrc32 = bswap32(head->hdr_crc);
624 else
625 xcrc32 = head->hdr_crc;
626 head->hdr_crc = 0;
627 if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
628 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
81666e1b
MD
629 fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
630 xcrc32, hammer2_icrc32(head, ioq->hbytes),
631 hammer2_msg_str(msg));
02454b3e 632 assert(0);
8c280d5d
MD
633 break;
634 }
635 head->hdr_crc = xcrc32;
636
637 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
638 hammer2_bswap_head(head);
9ab15106
MD
639 }
640
641 /*
642 * Copy the extended header into the msg and adjust the
643 * FIFO.
644 */
645 bcopy(head, &msg->any, ioq->hbytes);
646
647 /*
648 * We are either done or we fall-through.
649 */
650 if (ioq->abytes == 0) {
651 ioq->fifo_beg += ioq->hbytes;
652 break;
653 }
654
655 /*
656 * Must adjust nmax and bytes (and the state) when falling
657 * through.
658 */
659 ioq->fifo_beg += ioq->hbytes;
660 nmax -= ioq->hbytes;
661 bytes -= ioq->hbytes;
662 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
663 /* fall through */
664 case HAMMER2_MSGQ_STATE_AUXDATA1:
665 /*
666 * Copy the partial or complete payload from remaining
667 * bytes in the FIFO. We have to fall-through either
668 * way so we can check the crc.
78476205
MD
669 *
670 * Adjust msg->aux_size to the final actual value.
9ab15106 671 */
5cf97ec5
MD
672 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
673 if (ioq->already > ioq->abytes)
674 ioq->already = ioq->abytes;
9ab15106 675 if (bytes >= ioq->abytes) {
5cf97ec5 676 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9ab15106
MD
677 ioq->abytes);
678 msg->aux_size = ioq->abytes;
679 ioq->fifo_beg += ioq->abytes;
5cf97ec5
MD
680 if (ioq->fifo_cdx < ioq->fifo_beg)
681 ioq->fifo_cdx = ioq->fifo_beg;
9ab15106
MD
682 bytes -= ioq->abytes;
683 } else if (bytes) {
5cf97ec5 684 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9ab15106
MD
685 bytes);
686 msg->aux_size = bytes;
687 ioq->fifo_beg += bytes;
5cf97ec5
MD
688 if (ioq->fifo_cdx < ioq->fifo_beg)
689 ioq->fifo_cdx = ioq->fifo_beg;
9ab15106 690 bytes = 0;
78476205
MD
691 } else {
692 msg->aux_size = 0;
9ab15106
MD
693 }
694 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
695 /* fall through */
696 case HAMMER2_MSGQ_STATE_AUXDATA2:
697 /*
698 * Read the remainder of the payload directly into the
699 * msg->aux_data buffer.
700 */
701 assert(msg);
702 if (msg->aux_size < ioq->abytes) {
703 assert(bytes == 0);
704 n = read(iocom->sock_fd,
705 msg->aux_data + msg->aux_size,
706 ioq->abytes - msg->aux_size);
707 if (n <= 0) {
708 if (n == 0) {
709 ioq->error = HAMMER2_IOQ_ERROR_EOF;
710 break;
711 }
712 if (errno != EINTR &&
713 errno != EINPROGRESS &&
714 errno != EAGAIN) {
715 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
716 break;
717 }
718 n = 0;
719 /* fall through */
720 }
721 msg->aux_size += n;
722 }
723
724 /*
725 * Insufficient data accumulated (set msg NULL so caller will
726 * retry on event).
727 */
728 if (msg->aux_size < ioq->abytes) {
729 msg = NULL;
730 break;
731 }
732 assert(msg->aux_size == ioq->abytes);
5cf97ec5 733 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
9ab15106
MD
734
735 /*
8c280d5d 736 * Check aux_crc, then we are done.
9ab15106
MD
737 */
738 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
8c280d5d 739 if (xcrc32 != msg->any.head.aux_crc) {
9ab15106
MD
740 ioq->error = HAMMER2_IOQ_ERROR_ACRC;
741 break;
742 }
743 break;
744 case HAMMER2_MSGQ_STATE_ERROR:
1b195a98
MD
745 /*
746 * Continued calls to drain recorded transactions (returning
747 * a LNK_ERROR for each one), before we return the final
748 * LNK_ERROR.
749 */
750 assert(msg == NULL);
751 break;
9ab15106
MD
752 default:
753 /*
754 * We don't double-return errors, the caller should not
755 * have called us again after getting an error msg.
756 */
757 assert(0);
758 break;
759 }
760
761 /*
5cf97ec5
MD
762 * Check the message sequence. The iv[] should prevent any
763 * possibility of a replay but we add this check anyway.
764 */
765 if (msg && ioq->error == 0) {
766 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
767 ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
768 } else {
769 ++ioq->seq;
770 }
771 }
772
773 /*
78476205
MD
774 * Process transactional state for the message.
775 */
776 if (msg && ioq->error == 0) {
777 error = hammer2_state_msgrx(iocom, msg);
778 if (error) {
779 if (error == HAMMER2_IOQ_ERROR_EALREADY) {
780 hammer2_msg_free(iocom, msg);
781 goto again;
782 }
783 ioq->error = error;
784 }
785 }
786
787 /*
9ab15106
MD
788 * Handle error, RREQ, or completion
789 *
790 * NOTE: nmax and bytes are invalid at this point, we don't bother
791 * to update them when breaking out.
792 */
793 if (ioq->error) {
cf715800 794skip:
9ab15106 795 /*
1b195a98
MD
796 * An unrecoverable error causes all active receive
797 * transactions to be terminated with a LNK_ERROR message.
9ab15106 798 *
1b195a98
MD
799 * Once all active transactions are exhausted we set the
800 * iocom ERROR flag and return a non-transactional LNK_ERROR
801 * message, which should cause master processing loops to
802 * terminate.
9ab15106 803 */
4a2e0eae 804 assert(ioq->msg == msg);
1b195a98
MD
805 if (msg) {
806 hammer2_msg_free(iocom, msg);
9ab15106 807 ioq->msg = NULL;
9ab15106 808 }
1b195a98
MD
809
810 /*
811 * No more I/O read processing
812 */
813 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
814
815 /*
7dc0f844
MD
816 * Simulate a remote LNK_ERROR DELETE msg for any open
817 * transactions, ending with a final non-transactional
818 * LNK_ERROR (that the session can detect) when no
819 * transactions remain.
1b195a98
MD
820 */
821 msg = hammer2_msg_alloc(iocom, 0, 0);
9ab15106
MD
822 bzero(&msg->any.head, sizeof(msg->any.head));
823 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
824 msg->any.head.cmd = HAMMER2_LNK_ERROR;
825 msg->any.head.error = ioq->error;
1b195a98 826
7dc0f844 827 pthread_mutex_lock(&iocom->mtx);
cf715800 828 hammer2_iocom_drain(iocom);
1b195a98
MD
829 if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
830 /*
7dc0f844
MD
831 * Active remote transactions are still present.
832 * Simulate the other end sending us a DELETE.
1b195a98 833 */
7dc0f844 834 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
7dc0f844
MD
835 hammer2_msg_free(iocom, msg);
836 msg = NULL;
837 } else {
7dc0f844
MD
838 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
839 msg->state = state;
840 msg->any.head.spanid = state->spanid;
841 msg->any.head.msgid = state->msgid;
842 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
843 HAMMER2_MSGF_DELETE;
844 }
845 } else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) {
846 /*
847 * Active local transactions are still present.
848 * Simulate the other end sending us a DELETE.
849 */
850 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
7dc0f844
MD
851 hammer2_msg_free(iocom, msg);
852 msg = NULL;
853 } else {
7dc0f844
MD
854 msg->state = state;
855 msg->any.head.spanid = state->spanid;
856 msg->any.head.msgid = state->msgid;
857 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
858 HAMMER2_MSGF_DELETE |
859 HAMMER2_MSGF_REPLY;
860 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
861 msg->any.head.cmd |=
862 HAMMER2_MSGF_CREATE;
863 }
864 }
1b195a98
MD
865 } else {
866 /*
7dc0f844
MD
867 * No active local or remote transactions remain.
868 * Generate a final LNK_ERROR and flag EOF.
1b195a98
MD
869 */
870 msg->state = NULL;
871 iocom->flags |= HAMMER2_IOCOMF_EOF;
81666e1b 872 fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1b195a98 873 }
7dc0f844
MD
874 pthread_mutex_unlock(&iocom->mtx);
875
876 /*
877 * For the iocom error case we want to set RWORK to indicate
878 * that more messages might be pending.
879 *
880 * It is possible to return NULL when there is more work to
881 * do because each message has to be DELETEd in both
882 * directions before we continue on with the next (though
883 * this could be optimized). The transmit direction will
884 * re-set RWORK.
885 */
886 if (msg)
887 iocom->flags |= HAMMER2_IOCOMF_RWORK;
9ab15106
MD
888 } else if (msg == NULL) {
889 /*
890 * Insufficient data received to finish building the message,
891 * set RREQ and return NULL.
892 *
893 * Leave ioq->msg intact.
894 * Leave the FIFO intact.
895 */
896 iocom->flags |= HAMMER2_IOCOMF_RREQ;
9ab15106
MD
897 } else {
898 /*
7dc0f844 899 * Return msg.
9ab15106
MD
900 *
901 * The fifo has already been advanced past the message.
902 * Trivially reset the FIFO indices if possible.
7dc0f844
MD
903 *
904 * clear the FIFO if it is now empty and set RREQ to wait
905 * for more from the socket. If the FIFO is not empty set
906 * TWORK to bypass the poll so we loop immediately.
9ab15106
MD
907 */
908 if (ioq->fifo_beg == ioq->fifo_end) {
909 iocom->flags |= HAMMER2_IOCOMF_RREQ;
5cf97ec5 910 ioq->fifo_cdx = 0;
9ab15106
MD
911 ioq->fifo_beg = 0;
912 ioq->fifo_end = 0;
913 } else {
7dc0f844 914 iocom->flags |= HAMMER2_IOCOMF_RWORK;
9ab15106
MD
915 }
916 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
917 ioq->msg = NULL;
918 }
919 return (msg);
920}
921
922/*
923 * Calculate the header and data crc's and write a low-level message to
8c280d5d 924 * the connection. If aux_crc is non-zero the aux_data crc is already
9ab15106
MD
925 * assumed to have been set.
926 *
927 * A non-NULL msg is added to the queue but not necessarily flushed.
928 * Calling this function with msg == NULL will get a flush going.
7dc0f844
MD
929 *
930 * Caller must hold iocom->mtx.
9ab15106 931 */
7dc0f844
MD
932void
933hammer2_iocom_flush1(hammer2_iocom_t *iocom)
9ab15106
MD
934{
935 hammer2_ioq_t *ioq = &iocom->ioq_tx;
7dc0f844 936 hammer2_msg_t *msg;
9ab15106 937 uint32_t xcrc32;
4a2e0eae 938 int hbytes;
7dc0f844
MD
939 hammer2_msg_queue_t tmpq;
940
941 iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
942 TAILQ_INIT(&tmpq);
943 pthread_mutex_lock(&iocom->mtx);
944 while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
945 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
946 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
9ab15106 947 }
7dc0f844 948 pthread_mutex_unlock(&iocom->mtx);
9ab15106 949
7dc0f844
MD
950 while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
951 /*
952 * Process terminal connection errors.
953 */
954 TAILQ_REMOVE(&tmpq, msg, qentry);
955 if (ioq->error) {
956 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
957 ++ioq->msgcount;
958 continue;
959 }
9ab15106 960
7dc0f844
MD
961 /*
962 * Finish populating the msg fields. The salt ensures that
963 * the iv[] array is ridiculously randomized and we also
964 * re-seed our PRNG every 32768 messages just to be sure.
965 */
966 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
967 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
968 ++ioq->seq;
969 if ((ioq->seq & 32767) == 0)
970 srandomdev();
9ab15106 971
7dc0f844
MD
972 /*
973 * Calculate aux_crc if 0, then calculate hdr_crc.
974 */
975 if (msg->aux_size && msg->any.head.aux_crc == 0) {
976 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
977 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
978 msg->any.head.aux_crc = xcrc32;
979 }
980 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
981 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
9ab15106 982
7dc0f844
MD
983 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
984 HAMMER2_MSG_ALIGN;
985 msg->any.head.hdr_crc = 0;
986 msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
9ab15106 987
7dc0f844
MD
988 /*
989 * Enqueue the message (the flush codes handles stream
990 * encryption).
991 */
992 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
993 ++ioq->msgcount;
994 }
995 hammer2_iocom_flush2(iocom);
4a2e0eae
MD
996}
997
7dc0f844
MD
998/*
999 * Thread localized, iocom->mtx not held by caller.
1000 */
4a2e0eae 1001void
7dc0f844 1002hammer2_iocom_flush2(hammer2_iocom_t *iocom)
4a2e0eae
MD
1003{
1004 hammer2_ioq_t *ioq = &iocom->ioq_tx;
1005 hammer2_msg_t *msg;
1006 ssize_t nmax;
02454b3e 1007 ssize_t omax;
4a2e0eae
MD
1008 ssize_t nact;
1009 struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
78476205
MD
1010 size_t hbytes;
1011 size_t abytes;
02454b3e
MD
1012 size_t hoff;
1013 size_t aoff;
4a2e0eae 1014 int n;
9ab15106 1015
7dc0f844
MD
1016 if (ioq->error) {
1017 hammer2_iocom_drain(iocom);
1018 return;
1019 }
1020
9ab15106
MD
1021 /*
1022 * Pump messages out the connection by building an iovec.
02454b3e
MD
1023 *
1024 * ioq->hbytes/ioq->abytes tracks how much of the first message
1025 * in the queue has been successfully written out, so we can
1026 * resume writing.
9ab15106
MD
1027 */
1028 n = 0;
1029 nmax = 0;
02454b3e
MD
1030 hoff = ioq->hbytes;
1031 aoff = ioq->abytes;
9ab15106 1032
78476205 1033 TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
9ab15106
MD
1034 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1035 HAMMER2_MSG_ALIGN;
9ab15106 1036 abytes = msg->aux_size;
02454b3e
MD
1037 assert(hoff <= hbytes && aoff <= abytes);
1038
1039 if (hoff < hbytes) {
9ab15106
MD
1040 iov[n].iov_base = (char *)&msg->any.head + hoff;
1041 iov[n].iov_len = hbytes - hoff;
1042 nmax += hbytes - hoff;
1043 ++n;
1044 if (n == HAMMER2_IOQ_MAXIOVEC)
1045 break;
1046 }
02454b3e 1047 if (aoff < abytes) {
9ab15106 1048 assert(msg->aux_data != NULL);
02454b3e 1049 iov[n].iov_base = (char *)msg->aux_data + aoff;
9ab15106
MD
1050 iov[n].iov_len = abytes - aoff;
1051 nmax += abytes - aoff;
1052 ++n;
1053 if (n == HAMMER2_IOQ_MAXIOVEC)
1054 break;
1055 }
02454b3e
MD
1056 hoff = 0;
1057 aoff = 0;
9ab15106
MD
1058 }
1059 if (n == 0)
1060 return;
1061
1062 /*
5cf97ec5
MD
1063 * Encrypt and write the data. The crypto code will move the
1064 * data into the fifo and adjust the iov as necessary. If
1065 * encryption is disabled the iov is left alone.
1066 *
02454b3e
MD
1067 * May return a smaller iov (thus a smaller n), with aggregated
1068 * chunks. May reduce nmax to what fits in the FIFO.
5cf97ec5 1069 */
02454b3e
MD
1070 omax = nmax;
1071 n = hammer2_crypto_encrypt(iocom, ioq, iov, n, &nmax);
5cf97ec5
MD
1072
1073 /*
9ab15106
MD
1074 * Execute the writev() then figure out what happened.
1075 */
1076 nact = writev(iocom->sock_fd, iov, n);
1077 if (nact < 0) {
1078 if (errno != EINTR &&
1079 errno != EINPROGRESS &&
1080 errno != EAGAIN) {
7dc0f844
MD
1081 /*
1082 * Fatal write error
1083 */
9ab15106 1084 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
4a2e0eae 1085 hammer2_iocom_drain(iocom);
9ab15106 1086 } else {
7dc0f844
MD
1087 /*
1088 * Wait for socket buffer space
1089 */
9ab15106
MD
1090 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1091 }
1092 return;
1093 }
7dc0f844
MD
1094
1095 /*
02454b3e
MD
1096 * Indicate bytes written successfully.
1097 *
1098 * If we were unable to write the entire iov array then set WREQ
1099 * to wait for more socket buffer space.
1100 *
1101 * If the FIFO space was insufficient to fully drain all messages
1102 * set WWORK to cause the core to call us again for the next batch.
7dc0f844 1103 */
5cf97ec5 1104 hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
7dc0f844 1105 if (nact != nmax)
9ab15106
MD
1106 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1107
7dc0f844
MD
1108 /*
1109 * Clean out the transmit queue based on what we successfully
02454b3e
MD
1110 * sent. ioq->hbytes/abytes represents the portion of the first
1111 * message previously sent.
7dc0f844 1112 */
9ab15106
MD
1113 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1114 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1115 HAMMER2_MSG_ALIGN;
1116 abytes = msg->aux_size;
1117
78476205 1118 if ((size_t)nact < hbytes - ioq->hbytes) {
9ab15106 1119 ioq->hbytes += nact;
02454b3e 1120 /* nact = 0; */
9ab15106
MD
1121 break;
1122 }
1123 nact -= hbytes - ioq->hbytes;
1124 ioq->hbytes = hbytes;
78476205 1125 if ((size_t)nact < abytes - ioq->abytes) {
9ab15106 1126 ioq->abytes += nact;
02454b3e 1127 /* nact = 0; */
9ab15106
MD
1128 break;
1129 }
1130 nact -= abytes - ioq->abytes;
1131
78476205 1132 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
9ab15106
MD
1133 --ioq->msgcount;
1134 ioq->hbytes = 0;
1135 ioq->abytes = 0;
78476205
MD
1136
1137 hammer2_state_cleanuptx(iocom, msg);
9ab15106 1138 }
81666e1b
MD
1139
1140 /*
1141 * If more messages are pending on WREQ wasn't set we must
1142 * ensure that WWORK gets set.
1143 */
1144 if (msg && (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0)
1145 iocom->flags |= HAMMER2_IOCOMF_WWORK;
02454b3e 1146 assert(nact == 0);
9ab15106 1147 if (ioq->error) {
7dc0f844 1148 hammer2_iocom_drain(iocom);
9ab15106
MD
1149 }
1150}
1151
1152/*
1153 * Kill pending msgs on ioq_tx and adjust the flags such that no more
1154 * write events will occur. We don't kill read msgs because we want
1155 * the caller to pull off our contrived terminal error msg to detect
1156 * the connection failure.
7dc0f844
MD
1157 *
1158 * Thread localized, iocom->mtx not held by caller.
9ab15106
MD
1159 */
1160void
4a2e0eae 1161hammer2_iocom_drain(hammer2_iocom_t *iocom)
9ab15106
MD
1162{
1163 hammer2_ioq_t *ioq = &iocom->ioq_tx;
1164 hammer2_msg_t *msg;
1165
7dc0f844 1166 iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
02454b3e
MD
1167 ioq->hbytes = 0;
1168 ioq->abytes = 0;
7dc0f844 1169
9ab15106 1170 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
78476205 1171 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
9ab15106 1172 --ioq->msgcount;
7dc0f844 1173 hammer2_state_cleanuptx(iocom, msg);
9ab15106 1174 }
9ab15106
MD
1175}
1176
1177/*
8c280d5d 1178 * Write a message to an iocom, with additional state processing.
78476205
MD
1179 */
1180void
8c280d5d
MD
1181hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
1182 void (*func)(hammer2_state_t *, hammer2_msg_t *),
7dc0f844
MD
1183 void *data,
1184 hammer2_state_t **statep)
78476205 1185{
8c280d5d 1186 hammer2_state_t *state;
7dc0f844 1187 char dummy;
78476205 1188
8c280d5d
MD
1189 /*
1190 * Handle state processing, create state if necessary.
1191 */
7dc0f844 1192 pthread_mutex_lock(&iocom->mtx);
8c280d5d 1193 if ((state = msg->state) != NULL) {
78476205 1194 /*
8c280d5d
MD
1195 * Existing transaction (could be reply). It is also
1196 * possible for this to be the first reply (CREATE is set),
1197 * in which case we populate state->txcmd.
78476205 1198 */
8c280d5d
MD
1199 msg->any.head.msgid = state->msgid;
1200 msg->any.head.spanid = state->spanid;
1201 if (func) {
1202 state->func = func;
1203 state->any.any = data;
78476205 1204 }
7dc0f844
MD
1205 assert(((state->txcmd ^ msg->any.head.cmd) &
1206 HAMMER2_MSGF_REPLY) == 0);
8c280d5d
MD
1207 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1208 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
8c280d5d 1209 } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
8c280d5d
MD
1210 /*
1211 * No existing state and CREATE is set, create new
1212 * state for outgoing command. This can't happen if
1213 * REPLY is set as the state would already exist for
1214 * a transaction reply.
1215 */
1216 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1217
1218 state = malloc(sizeof(*state));
1219 bzero(state, sizeof(*state));
1220 state->iocom = iocom;
1221 state->flags = HAMMER2_STATE_DYNAMIC;
1222 state->msg = msg;
1223 state->msgid = (uint64_t)(uintptr_t)state;
1224 state->spanid = msg->any.head.spanid;
1225 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
7dc0f844 1226 state->rxcmd = HAMMER2_MSGF_REPLY;
8c280d5d
MD
1227 state->func = func;
1228 state->any.any = data;
cf715800 1229 pthread_mutex_lock(&iocom->mtx);
8c280d5d 1230 RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
cf715800 1231 pthread_mutex_unlock(&iocom->mtx);
8c280d5d
MD
1232 state->flags |= HAMMER2_STATE_INSERTED;
1233 msg->state = state;
1234 msg->any.head.msgid = state->msgid;
1235 /* spanid set by caller */
78476205 1236 } else {
8c280d5d
MD
1237 msg->any.head.msgid = 0;
1238 /* spanid set by caller */
1239 }
1240
7dc0f844
MD
1241 if (statep)
1242 *statep = state;
1243
8c280d5d 1244 /*
7dc0f844
MD
1245 * Queue it for output, wake up the I/O pthread. Note that the
1246 * I/O thread is responsible for generating the CRCs and encryption.
8c280d5d 1247 */
7dc0f844
MD
1248 TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1249 dummy = 0;
1250 write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */
1251 pthread_mutex_unlock(&iocom->mtx);
8c280d5d
MD
1252}
1253
8c280d5d
MD
1254/*
1255 * This is a shortcut to formulate a reply to msg with a simple error code,
1256 * It can reply to and terminate a transaction, or it can reply to a one-way
1257 * messages. A HAMMER2_LNK_ERROR command code is utilized to encode
1258 * the error code (which can be 0). Not all transactions are terminated
1259 * with HAMMER2_LNK_ERROR status (the low level only cares about the
1260 * MSGF_DELETE flag), but most are.
1261 *
1262 * Replies to one-way messages are a bit of an oxymoron but the feature
1263 * is used by the debug (DBG) protocol.
1264 *
1265 * The reply contains no extended data.
1266 */
1267void
1268hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1269{
1270 hammer2_state_t *state = msg->state;
1271 hammer2_msg_t *nmsg;
1272 uint32_t cmd;
1273
1274
1275 /*
1276 * Reply with a simple error code and terminate the transaction.
1277 */
1278 cmd = HAMMER2_LNK_ERROR;
1279
1280 /*
1281 * Check if our direction has even been initiated yet, set CREATE.
1282 *
1283 * Check what direction this is (command or reply direction). Note
1284 * that txcmd might not have been initiated yet.
1285 *
1286 * If our direction has already been closed we just return without
1287 * doing anything.
1288 */
1289 if (state) {
1290 if (state->txcmd & HAMMER2_MSGF_DELETE)
1291 return;
1292 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1293 cmd |= HAMMER2_MSGF_CREATE;
7dc0f844 1294 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
1295 cmd |= HAMMER2_MSGF_REPLY;
1296 cmd |= HAMMER2_MSGF_DELETE;
1297 } else {
1298 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1299 cmd |= HAMMER2_MSGF_REPLY;
78476205 1300 }
8c280d5d 1301
78476205
MD
1302 nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1303 nmsg->any.head.error = error;
8c280d5d 1304 nmsg->state = msg->state;
7dc0f844 1305 hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
8c280d5d
MD
1306}
1307
1308/*
1309 * Similar to hammer2_msg_reply() but leave the transaction open. That is,
1310 * we are generating a streaming reply or an intermediate acknowledgement
1311 * of some sort as part of the higher level protocol, with more to come
1312 * later.
1313 */
1314void
1315hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1316{
1317 hammer2_state_t *state = msg->state;
1318 hammer2_msg_t *nmsg;
1319 uint32_t cmd;
1320
1321
1322 /*
1323 * Reply with a simple error code and terminate the transaction.
1324 */
1325 cmd = HAMMER2_LNK_ERROR;
1326
1327 /*
1328 * Check if our direction has even been initiated yet, set CREATE.
1329 *
1330 * Check what direction this is (command or reply direction). Note
1331 * that txcmd might not have been initiated yet.
1332 *
1333 * If our direction has already been closed we just return without
1334 * doing anything.
1335 */
1336 if (state) {
1337 if (state->txcmd & HAMMER2_MSGF_DELETE)
1338 return;
1339 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1340 cmd |= HAMMER2_MSGF_CREATE;
7dc0f844 1341 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
1342 cmd |= HAMMER2_MSGF_REPLY;
1343 /* continuing transaction, do not set MSGF_DELETE */
1344 } else {
1345 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1346 cmd |= HAMMER2_MSGF_REPLY;
1347 }
1348
1349 nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1350 nmsg->any.head.error = error;
1351 nmsg->state = state;
7dc0f844 1352 hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
8c280d5d
MD
1353}
1354
1355/*
1356 * Terminate a transaction given a state structure by issuing a DELETE.
1357 */
1358void
1359hammer2_state_reply(hammer2_state_t *state, uint32_t error)
1360{
1361 hammer2_msg_t *nmsg;
1362 uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
1363
1364 /*
1365 * Nothing to do if we already transmitted a delete
1366 */
1367 if (state->txcmd & HAMMER2_MSGF_DELETE)
1368 return;
1369
1370 /*
1371 * We must also set CREATE if this is our first response to a
1372 * remote command.
1373 */
1374 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1375 cmd |= HAMMER2_MSGF_CREATE;
1376
1377 /*
1378 * Set REPLY if the other end initiated the command. Otherwise
1379 * we are the command direction.
1380 */
7dc0f844 1381 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
1382 cmd |= HAMMER2_MSGF_REPLY;
1383
1384 nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
1385 nmsg->any.head.error = error;
1386 nmsg->state = state;
7dc0f844 1387 hammer2_msg_write(state->iocom, nmsg, NULL, NULL, NULL);
78476205
MD
1388}
1389
1390/************************************************************************
1391 * TRANSACTION STATE HANDLING *
1392 ************************************************************************
1393 *
1394 */
1395
78476205
MD
1396/*
1397 * Process state tracking for a message after reception, prior to
1398 * execution.
1399 *
1400 * Called with msglk held and the msg dequeued.
1401 *
1402 * All messages are called with dummy state and return actual state.
1403 * (One-off messages often just return the same dummy state).
1404 *
1405 * May request that caller discard the message by setting *discardp to 1.
1406 * The returned state is not used in this case and is allowed to be NULL.
1407 *
1408 * --
1409 *
1410 * These routines handle persistent and command/reply message state via the
1411 * CREATE and DELETE flags. The first message in a command or reply sequence
1412 * sets CREATE, the last message in a command or reply sequence sets DELETE.
4a2e0eae 1413 *
78476205
MD
1414 * There can be any number of intermediate messages belonging to the same
1415 * sequence sent inbetween the CREATE message and the DELETE message,
1416 * which set neither flag. This represents a streaming command or reply.
1417 *
1418 * Any command message received with CREATE set expects a reply sequence to
1419 * be returned. Reply sequences work the same as command sequences except the
1420 * REPLY bit is also sent. Both the command side and reply side can
1421 * degenerate into a single message with both CREATE and DELETE set. Note
1422 * that one side can be streaming and the other side not, or neither, or both.
1423 *
1424 * The msgid is unique for the initiator. That is, two sides sending a new
1425 * message can use the same msgid without colliding.
1426 *
1427 * --
1428 *
1429 * ABORT sequences work by setting the ABORT flag along with normal message
1430 * state. However, ABORTs can also be sent on half-closed messages, that is
1431 * even if the command or reply side has already sent a DELETE, as long as
1432 * the message has not been fully closed it can still send an ABORT+DELETE
1433 * to terminate the half-closed message state.
1434 *
1435 * Since ABORT+DELETEs can race we silently discard ABORT's for message
1436 * state which has already been fully closed. REPLY+ABORT+DELETEs can
1437 * also race, and in this situation the other side might have already
1438 * initiated a new unrelated command with the same message id. Since
1439 * the abort has not set the CREATE flag the situation can be detected
1440 * and the message will also be discarded.
1441 *
1442 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1443 * The ABORT request is essentially integrated into the command instead
1444 * of being sent later on. In this situation the command implementation
1445 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1446 * special-case non-blocking operation for the command.
1447 *
1448 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
1449 * to be mid-stream aborts for command/reply sequences. ABORTs on
1450 * one-way messages are not supported.
1451 *
1452 * NOTE! If a command sequence does not support aborts the ABORT flag is
1453 * simply ignored.
1454 *
1455 * --
1456 *
1457 * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1458 * set. One-off messages cannot be aborted and typically aren't processed
1459 * by these routines. The REPLY bit can be used to distinguish whether a
1460 * one-off message is a command or reply. For example, one-off replies
1461 * will typically just contain status updates.
9ab15106 1462 */
78476205
MD
1463static int
1464hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1465{
1466 hammer2_state_t *state;
1467 hammer2_state_t dummy;
1468 int error;
1469
1470 /*
1471 * Lock RB tree and locate existing persistent state, if any.
1472 *
1473 * If received msg is a command state is on staterd_tree.
1474 * If received msg is a reply state is on statewr_tree.
1475 */
78476205
MD
1476
1477 dummy.msgid = msg->any.head.msgid;
8c280d5d 1478 dummy.spanid = msg->any.head.spanid;
7dc0f844 1479 pthread_mutex_lock(&iocom->mtx);
78476205
MD
1480 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1481 state = RB_FIND(hammer2_state_tree,
1482 &iocom->statewr_tree, &dummy);
1483 } else {
1484 state = RB_FIND(hammer2_state_tree,
1485 &iocom->staterd_tree, &dummy);
1486 }
1487 msg->state = state;
7dc0f844 1488 pthread_mutex_unlock(&iocom->mtx);
78476205
MD
1489
1490 /*
1491 * Short-cut one-off or mid-stream messages (state may be NULL).
1492 */
1493 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1494 HAMMER2_MSGF_ABORT)) == 0) {
78476205
MD
1495 return(0);
1496 }
1497
1498 /*
1499 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1500 * inside the case statements.
1501 */
1502 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1503 HAMMER2_MSGF_REPLY)) {
1504 case HAMMER2_MSGF_CREATE:
1505 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1506 /*
1507 * New persistant command received.
1508 */
1509 if (state) {
81666e1b
MD
1510 fprintf(stderr, "duplicate-trans %s\n",
1511 hammer2_msg_str(msg));
78476205 1512 error = HAMMER2_IOQ_ERROR_TRANS;
cf715800 1513 assert(0);
78476205
MD
1514 break;
1515 }
1516 state = malloc(sizeof(*state));
1517 bzero(state, sizeof(*state));
1518 state->iocom = iocom;
1519 state->flags = HAMMER2_STATE_DYNAMIC;
1520 state->msg = msg;
7dc0f844 1521 state->txcmd = HAMMER2_MSGF_REPLY;
78476205 1522 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
78476205 1523 state->flags |= HAMMER2_STATE_INSERTED;
8c280d5d
MD
1524 state->msgid = msg->any.head.msgid;
1525 state->spanid = msg->any.head.spanid;
78476205 1526 msg->state = state;
cf715800
MD
1527 pthread_mutex_lock(&iocom->mtx);
1528 RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
1529 pthread_mutex_unlock(&iocom->mtx);
78476205 1530 error = 0;
cf715800
MD
1531 if (DebugOpt) {
1532 fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1533 state, (uint32_t)state->msgid, iocom);
1534 }
78476205
MD
1535 break;
1536 case HAMMER2_MSGF_DELETE:
1537 /*
1538 * Persistent state is expected but might not exist if an
1539 * ABORT+DELETE races the close.
1540 */
1541 if (state == NULL) {
1542 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1543 error = HAMMER2_IOQ_ERROR_EALREADY;
1544 } else {
81666e1b
MD
1545 fprintf(stderr, "missing-state %s\n",
1546 hammer2_msg_str(msg));
78476205 1547 error = HAMMER2_IOQ_ERROR_TRANS;
cf715800 1548 assert(0);
78476205
MD
1549 }
1550 break;
1551 }
1552
1553 /*
1554 * Handle another ABORT+DELETE case if the msgid has already
1555 * been reused.
1556 */
1557 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1558 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1559 error = HAMMER2_IOQ_ERROR_EALREADY;
1560 } else {
81666e1b
MD
1561 fprintf(stderr, "reused-state %s\n",
1562 hammer2_msg_str(msg));
78476205 1563 error = HAMMER2_IOQ_ERROR_TRANS;
cf715800 1564 assert(0);
78476205
MD
1565 }
1566 break;
1567 }
1568 error = 0;
1569 break;
1570 default:
1571 /*
1572 * Check for mid-stream ABORT command received, otherwise
1573 * allow.
1574 */
1575 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1576 if (state == NULL ||
1577 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1578 error = HAMMER2_IOQ_ERROR_EALREADY;
1579 break;
1580 }
1581 }
1582 error = 0;
1583 break;
1584 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1585 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1586 /*
1587 * When receiving a reply with CREATE set the original
1588 * persistent state message should already exist.
1589 */
1590 if (state == NULL) {
81666e1b
MD
1591 fprintf(stderr, "no-state(r) %s\n",
1592 hammer2_msg_str(msg));
78476205 1593 error = HAMMER2_IOQ_ERROR_TRANS;
cf715800 1594 assert(0);
78476205
MD
1595 break;
1596 }
7dc0f844
MD
1597 assert(((state->rxcmd ^ msg->any.head.cmd) &
1598 HAMMER2_MSGF_REPLY) == 0);
78476205
MD
1599 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1600 error = 0;
1601 break;
1602 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1603 /*
1604 * Received REPLY+ABORT+DELETE in case where msgid has
1605 * already been fully closed, ignore the message.
1606 */
1607 if (state == NULL) {
1608 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1609 error = HAMMER2_IOQ_ERROR_EALREADY;
1610 } else {
81666e1b
MD
1611 fprintf(stderr, "no-state(r,d) %s\n",
1612 hammer2_msg_str(msg));
78476205 1613 error = HAMMER2_IOQ_ERROR_TRANS;
cf715800 1614 assert(0);
78476205
MD
1615 }
1616 break;
1617 }
1618
1619 /*
1620 * Received REPLY+ABORT+DELETE in case where msgid has
1621 * already been reused for an unrelated message,
1622 * ignore the message.
1623 */
1624 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1625 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1626 error = HAMMER2_IOQ_ERROR_EALREADY;
1627 } else {
81666e1b
MD
1628 fprintf(stderr, "reused-state(r,d) %s\n",
1629 hammer2_msg_str(msg));
78476205 1630 error = HAMMER2_IOQ_ERROR_TRANS;
cf715800 1631 assert(0);
78476205
MD
1632 }
1633 break;
1634 }
1635 error = 0;
1636 break;
1637 case HAMMER2_MSGF_REPLY:
1638 /*
1639 * Check for mid-stream ABORT reply received to sent command.
1640 */
1641 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1642 if (state == NULL ||
1643 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1644 error = HAMMER2_IOQ_ERROR_EALREADY;
1645 break;
1646 }
1647 }
1648 error = 0;
1649 break;
1650 }
78476205
MD
1651 return (error);
1652}
1653
9ab15106 1654void
78476205 1655hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
9ab15106 1656{
78476205
MD
1657 hammer2_state_t *state;
1658
1659 if ((state = msg->state) == NULL) {
1b195a98
MD
1660 /*
1661 * Free a non-transactional message, there is no state
1662 * to worry about.
1663 */
78476205
MD
1664 hammer2_msg_free(iocom, msg);
1665 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1b195a98
MD
1666 /*
1667 * Message terminating transaction, destroy the related
1668 * state, the original message, and this message (if it
1669 * isn't the original message due to a CREATE|DELETE).
1670 */
7dc0f844 1671 pthread_mutex_lock(&iocom->mtx);
78476205
MD
1672 state->rxcmd |= HAMMER2_MSGF_DELETE;
1673 if (state->txcmd & HAMMER2_MSGF_DELETE) {
1674 if (state->msg == msg)
1675 state->msg = NULL;
1676 assert(state->flags & HAMMER2_STATE_INSERTED);
7dc0f844
MD
1677 if (state->rxcmd & HAMMER2_MSGF_REPLY) {
1678 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
78476205
MD
1679 RB_REMOVE(hammer2_state_tree,
1680 &iocom->statewr_tree, state);
1681 } else {
7dc0f844 1682 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
78476205
MD
1683 RB_REMOVE(hammer2_state_tree,
1684 &iocom->staterd_tree, state);
1685 }
1686 state->flags &= ~HAMMER2_STATE_INSERTED;
78476205
MD
1687 hammer2_state_free(state);
1688 } else {
7dc0f844 1689 ;
78476205 1690 }
7dc0f844 1691 pthread_mutex_unlock(&iocom->mtx);
78476205
MD
1692 hammer2_msg_free(iocom, msg);
1693 } else if (state->msg != msg) {
1b195a98
MD
1694 /*
1695 * Message not terminating transaction, leave state intact
1696 * and free message if it isn't the CREATE message.
1697 */
78476205 1698 hammer2_msg_free(iocom, msg);
9ab15106 1699 }
78476205
MD
1700}
1701
78476205
MD
1702static void
1703hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1704{
1705 hammer2_state_t *state;
1706
1707 if ((state = msg->state) == NULL) {
1708 hammer2_msg_free(iocom, msg);
1709 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
7dc0f844 1710 pthread_mutex_lock(&iocom->mtx);
78476205
MD
1711 state->txcmd |= HAMMER2_MSGF_DELETE;
1712 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
1713 if (state->msg == msg)
1714 state->msg = NULL;
1715 assert(state->flags & HAMMER2_STATE_INSERTED);
7dc0f844
MD
1716 if (state->txcmd & HAMMER2_MSGF_REPLY) {
1717 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
78476205
MD
1718 RB_REMOVE(hammer2_state_tree,
1719 &iocom->staterd_tree, state);
1720 } else {
7dc0f844 1721 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
78476205
MD
1722 RB_REMOVE(hammer2_state_tree,
1723 &iocom->statewr_tree, state);
1724 }
1725 state->flags &= ~HAMMER2_STATE_INSERTED;
78476205
MD
1726 hammer2_state_free(state);
1727 } else {
7dc0f844 1728 ;
78476205 1729 }
7dc0f844 1730 pthread_mutex_unlock(&iocom->mtx);
78476205
MD
1731 hammer2_msg_free(iocom, msg);
1732 } else if (state->msg != msg) {
1733 hammer2_msg_free(iocom, msg);
1734 }
1735}
1736
7dc0f844
MD
1737/*
1738 * Called with iocom locked
1739 */
78476205
MD
1740void
1741hammer2_state_free(hammer2_state_t *state)
1742{
1743 hammer2_iocom_t *iocom = state->iocom;
1744 hammer2_msg_t *msg;
7dc0f844
MD
1745 char dummy;
1746
81666e1b 1747 if (DebugOpt) {
cf715800
MD
1748 fprintf(stderr, "terminate state %p id=%08x\n",
1749 state, (uint32_t)state->msgid);
81666e1b 1750 }
7dc0f844 1751 assert(state->any.any == NULL);
78476205
MD
1752 msg = state->msg;
1753 state->msg = NULL;
78476205 1754 if (msg)
7dc0f844 1755 hammer2_msg_free_locked(iocom, msg);
78476205 1756 free(state);
7dc0f844
MD
1757
1758 /*
1759 * When an iocom error is present we are trying to close down the
1760 * iocom, but we have to wait for all states to terminate before
1761 * we can do so. The iocom rx code will terminate the receive side
1762 * for all transactions by simulating incoming DELETE messages,
1763 * but the state doesn't go away until both sides are terminated.
1764 *
1765 * We may have to wake up the rx code.
1766 */
1767 if (iocom->ioq_rx.error &&
1768 RB_EMPTY(&iocom->staterd_tree) &&
1769 RB_EMPTY(&iocom->statewr_tree)) {
1770 dummy = 0;
1771 write(iocom->wakeupfds[1], &dummy, 1);
1772 }
78476205
MD
1773}
1774
81666e1b
MD
1775const char *
1776hammer2_basecmd_str(uint32_t cmd)
1777{
1778 static char buf[64];
1779 char protobuf[32];
1780 char cmdbuf[32];
1781 const char *protostr;
1782 const char *cmdstr;
1783
1784 switch(cmd & HAMMER2_MSGF_PROTOS) {
1785 case HAMMER2_MSG_PROTO_LNK:
1786 protostr = "LNK_";
1787 break;
1788 case HAMMER2_MSG_PROTO_DBG:
1789 protostr = "DBG_";
1790 break;
1791 case HAMMER2_MSG_PROTO_DOM:
1792 protostr = "DOM_";
1793 break;
1794 case HAMMER2_MSG_PROTO_CAC:
1795 protostr = "CAC_";
1796 break;
1797 case HAMMER2_MSG_PROTO_QRM:
1798 protostr = "QRM_";
1799 break;
1800 case HAMMER2_MSG_PROTO_BLK:
1801 protostr = "BLK_";
1802 break;
1803 case HAMMER2_MSG_PROTO_VOP:
1804 protostr = "VOP_";
1805 break;
1806 default:
1807 snprintf(protobuf, sizeof(protobuf), "%x_",
1808 (cmd & HAMMER2_MSGF_PROTOS) >> 20);
1809 protostr = protobuf;
1810 break;
1811 }
1812
1813 switch(cmd & (HAMMER2_MSGF_PROTOS |
1814 HAMMER2_MSGF_CMDS |
1815 HAMMER2_MSGF_SIZE)) {
1816 case HAMMER2_LNK_PAD:
1817 cmdstr = "PAD";
1818 break;
1819 case HAMMER2_LNK_PING:
1820 cmdstr = "PING";
1821 break;
1822 case HAMMER2_LNK_AUTH:
1823 cmdstr = "AUTH";
1824 break;
1825 case HAMMER2_LNK_CONN:
1826 cmdstr = "CONN";
1827 break;
1828 case HAMMER2_LNK_SPAN:
1829 cmdstr = "SPAN";
1830 break;
1831 case HAMMER2_LNK_ERROR:
1832 if (cmd & HAMMER2_MSGF_DELETE)
1833 cmdstr = "RETURN";
1834 else
1835 cmdstr = "RESULT";
1836 break;
1837 case HAMMER2_DBG_SHELL:
1838 cmdstr = "SHELL";
1839 break;
1840 default:
1841 snprintf(cmdbuf, sizeof(cmdbuf),
1842 "%06x", (cmd & (HAMMER2_MSGF_PROTOS |
1843 HAMMER2_MSGF_CMDS |
1844 HAMMER2_MSGF_SIZE)));
1845 cmdstr = cmdbuf;
1846 break;
1847 }
1848 snprintf(buf, sizeof(buf), "%s%s", protostr, cmdstr);
1849 return (buf);
1850}
1851
1852const char *
1853hammer2_msg_str(hammer2_msg_t *msg)
1854{
1855 hammer2_state_t *state;
1856 static char buf[256];
1857 char errbuf[16];
1858 char statebuf[64];
1859 char flagbuf[64];
1860 const char *statestr;
1861 const char *errstr;
1862 uint32_t basecmd;
1863 int i;
1864
1865 /*
1866 * Parse the state
1867 */
1868 if ((state = msg->state) != NULL) {
1869 basecmd = (state->rxcmd & HAMMER2_MSGF_REPLY) ?
1870 state->txcmd : state->rxcmd;
1871 snprintf(statebuf, sizeof(statebuf),
1872 " %s=%s,L=%s%s,R=%s%s",
1873 ((state->txcmd & HAMMER2_MSGF_REPLY) ?
1874 "rcvcmd" : "sndcmd"),
1875 hammer2_basecmd_str(basecmd),
1876 ((state->txcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
1877 ((state->txcmd & HAMMER2_MSGF_DELETE) ? "D" : ""),
1878 ((state->rxcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
1879 ((state->rxcmd & HAMMER2_MSGF_DELETE) ? "D" : "")
1880 );
1881 statestr = statebuf;
1882 } else {
1883 statestr = "";
1884 }
1885
1886 /*
1887 * Parse the error
1888 */
1889 switch(msg->any.head.error) {
1890 case 0:
1891 errstr = "";
1892 break;
1893 case HAMMER2_IOQ_ERROR_SYNC:
1894 errstr = "err=IOQ:NOSYNC";
1895 break;
1896 case HAMMER2_IOQ_ERROR_EOF:
1897 errstr = "err=IOQ:STREAMEOF";
1898 break;
1899 case HAMMER2_IOQ_ERROR_SOCK:
1900 errstr = "err=IOQ:SOCKERR";
1901 break;
1902 case HAMMER2_IOQ_ERROR_FIELD:
1903 errstr = "err=IOQ:BADFIELD";
1904 break;
1905 case HAMMER2_IOQ_ERROR_HCRC:
1906 errstr = "err=IOQ:BADHCRC";
1907 break;
1908 case HAMMER2_IOQ_ERROR_XCRC:
1909 errstr = "err=IOQ:BADXCRC";
1910 break;
1911 case HAMMER2_IOQ_ERROR_ACRC:
1912 errstr = "err=IOQ:BADACRC";
1913 break;
1914 case HAMMER2_IOQ_ERROR_STATE:
1915 errstr = "err=IOQ:BADSTATE";
1916 break;
1917 case HAMMER2_IOQ_ERROR_NOPEER:
1918 errstr = "err=IOQ:PEERCONFIG";
1919 break;
1920 case HAMMER2_IOQ_ERROR_NORKEY:
1921 errstr = "err=IOQ:BADRKEY";
1922 break;
1923 case HAMMER2_IOQ_ERROR_NOLKEY:
1924 errstr = "err=IOQ:BADLKEY";
1925 break;
1926 case HAMMER2_IOQ_ERROR_KEYXCHGFAIL:
1927 errstr = "err=IOQ:BADKEYXCHG";
1928 break;
1929 case HAMMER2_IOQ_ERROR_KEYFMT:
1930 errstr = "err=IOQ:BADFMT";
1931 break;
1932 case HAMMER2_IOQ_ERROR_BADURANDOM:
1933 errstr = "err=IOQ:BADRANDOM";
1934 break;
1935 case HAMMER2_IOQ_ERROR_MSGSEQ:
1936 errstr = "err=IOQ:BADSEQ";
1937 break;
1938 case HAMMER2_IOQ_ERROR_EALREADY:
1939 errstr = "err=IOQ:DUPMSG";
1940 break;
1941 case HAMMER2_IOQ_ERROR_TRANS:
1942 errstr = "err=IOQ:BADTRANS";
1943 break;
1944 case HAMMER2_MSG_ERR_NOSUPP:
1945 errstr = "err=NOSUPPORT";
1946 break;
1947 default:
1948 snprintf(errbuf, sizeof(errbuf),
1949 " err=%d", msg->any.head.error);
1950 errstr = errbuf;
1951 break;
1952 }
1953
1954 /*
1955 * Message flags
1956 */
1957 i = 0;
1958 if (msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1959 HAMMER2_MSGF_ABORT | HAMMER2_MSGF_REPLY)) {
1960 flagbuf[i++] = '|';
1961 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1962 flagbuf[i++] = 'C';
1963 if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
1964 flagbuf[i++] = 'D';
1965 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
1966 flagbuf[i++] = 'R';
1967 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT)
1968 flagbuf[i++] = 'A';
1969 }
1970 flagbuf[i] = 0;
1971
1972 /*
1973 * Generate the buf
1974 */
1975 snprintf(buf, sizeof(buf),
1976 "msg=%s%s %s id=%08x span=%08x %s",
1977 hammer2_basecmd_str(msg->any.head.cmd),
1978 flagbuf,
1979 errstr,
1980 (uint32_t)(intmax_t)msg->any.head.msgid, /* for brevity */
1981 (uint32_t)(intmax_t)msg->any.head.spanid, /* for brevity */
1982 statestr);
1983
1984 return(buf);
1985}