hammer2 - spanning tree and messaging work
[dragonfly.git] / sbin / hammer2 / msg.c
CommitLineData
9ab15106
MD
1/*
2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7 *
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
10 * are met:
11 *
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
17 * distribution.
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
21 *
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33 * SUCH DAMAGE.
34 */
35
36#include "hammer2.h"
37
78476205 38static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
78476205
MD
39static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40
9ab15106
MD
41/*
42 * Initialize a low-level ioq
43 */
44void
45hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
46{
47 bzero(ioq, sizeof(*ioq));
48 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
49 TAILQ_INIT(&ioq->msgq);
50}
51
7dc0f844
MD
52/*
53 * Cleanup queue.
54 *
55 * caller holds iocom->mtx.
56 */
9ab15106 57void
4a2e0eae 58hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
9ab15106
MD
59{
60 hammer2_msg_t *msg;
61
62 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
7dc0f844 63 assert(0); /* shouldn't happen */
78476205
MD
64 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
65 hammer2_msg_free(iocom, msg);
9ab15106
MD
66 }
67 if ((msg = ioq->msg) != NULL) {
68 ioq->msg = NULL;
78476205 69 hammer2_msg_free(iocom, msg);
9ab15106
MD
70 }
71}
72
73/*
74 * Initialize a low-level communications channel
75 */
76void
77hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
78{
79 bzero(iocom, sizeof(*iocom));
80
7dc0f844 81 pthread_mutex_init(&iocom->mtx, NULL);
78476205
MD
82 RB_INIT(&iocom->staterd_tree);
83 RB_INIT(&iocom->statewr_tree);
9ab15106
MD
84 TAILQ_INIT(&iocom->freeq);
85 TAILQ_INIT(&iocom->freeq_aux);
8c280d5d 86 TAILQ_INIT(&iocom->addrq);
7dc0f844 87 TAILQ_INIT(&iocom->txmsgq);
9ab15106
MD
88 iocom->sock_fd = sock_fd;
89 iocom->alt_fd = alt_fd;
7dc0f844 90 iocom->flags = HAMMER2_IOCOMF_RREQ;
9ab15106
MD
91 hammer2_ioq_init(iocom, &iocom->ioq_rx);
92 hammer2_ioq_init(iocom, &iocom->ioq_tx);
7dc0f844
MD
93 if (pipe(iocom->wakeupfds) < 0)
94 assert(0);
95 fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
96 fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
9ab15106 97
62efe6ec
MD
98 /*
99 * Negotiate session crypto synchronously. This will mark the
100 * connection as error'd if it fails.
101 */
102 hammer2_crypto_negotiate(iocom);
103
104 /*
105 * Make sure our fds are set to non-blocking for the iocom core.
106 */
9ab15106
MD
107 if (sock_fd >= 0)
108 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
109#if 0
110 /* if line buffered our single fgets() should be fine */
111 if (alt_fd >= 0)
112 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
113#endif
114}
115
7dc0f844
MD
116/*
117 * Cleanup a terminating iocom.
118 *
119 * Caller should not hold iocom->mtx. The iocom has already been disconnected
120 * from all possible references to it.
121 */
9ab15106
MD
122void
123hammer2_iocom_done(hammer2_iocom_t *iocom)
124{
125 hammer2_msg_t *msg;
126
7dc0f844
MD
127 if (iocom->sock_fd >= 0) {
128 close(iocom->sock_fd);
129 iocom->sock_fd = -1;
130 }
131 if (iocom->alt_fd >= 0) {
132 close(iocom->alt_fd);
133 iocom->alt_fd = -1;
134 }
9ab15106
MD
135 hammer2_ioq_done(iocom, &iocom->ioq_rx);
136 hammer2_ioq_done(iocom, &iocom->ioq_tx);
137 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
78476205 138 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
9ab15106
MD
139 free(msg);
140 }
141 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
78476205 142 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
9ab15106
MD
143 free(msg->aux_data);
144 msg->aux_data = NULL;
145 free(msg);
146 }
7dc0f844
MD
147 if (iocom->wakeupfds[0] >= 0) {
148 close(iocom->wakeupfds[0]);
149 iocom->wakeupfds[0] = -1;
150 }
151 if (iocom->wakeupfds[1] >= 0) {
152 close(iocom->wakeupfds[1]);
153 iocom->wakeupfds[1] = -1;
154 }
155 pthread_mutex_destroy(&iocom->mtx);
9ab15106
MD
156}
157
4a2e0eae
MD
158/*
159 * Allocate a new one-way message.
160 */
9ab15106 161hammer2_msg_t *
78476205 162hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
9ab15106
MD
163{
164 hammer2_msg_t *msg;
165 int hbytes;
166
7dc0f844 167 pthread_mutex_lock(&iocom->mtx);
9ab15106
MD
168 if (aux_size) {
169 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
170 ~HAMMER2_MSG_ALIGNMASK;
171 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
78476205 172 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
9ab15106
MD
173 } else {
174 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
78476205 175 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
9ab15106 176 }
7dc0f844 177 pthread_mutex_unlock(&iocom->mtx);
9ab15106
MD
178 if (msg == NULL) {
179 msg = malloc(sizeof(*msg));
78476205 180 bzero(msg, sizeof(*msg));
9ab15106
MD
181 msg->aux_data = NULL;
182 msg->aux_size = 0;
183 }
184 if (msg->aux_size != aux_size) {
185 if (msg->aux_data) {
186 free(msg->aux_data);
187 msg->aux_data = NULL;
188 msg->aux_size = 0;
189 }
190 if (aux_size) {
191 msg->aux_data = malloc(aux_size);
192 msg->aux_size = aux_size;
193 }
194 }
9ab15106 195 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
4a2e0eae
MD
196 if (hbytes)
197 bzero(&msg->any.head, hbytes);
78476205 198 msg->hdr_size = hbytes;
9ab15106 199 msg->any.head.cmd = cmd;
8c280d5d
MD
200 msg->any.head.aux_descr = 0;
201 msg->any.head.aux_crc = 0;
9ab15106
MD
202
203 return (msg);
204}
205
4a2e0eae 206/*
4a2e0eae
MD
207 * Free a message so it can be reused afresh.
208 *
209 * NOTE: aux_size can be 0 with a non-NULL aux_data.
210 */
7dc0f844 211static
9ab15106 212void
7dc0f844 213hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
9ab15106 214{
7dc0f844 215 msg->state = NULL;
9ab15106 216 if (msg->aux_data)
78476205 217 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
9ab15106 218 else
78476205 219 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
9ab15106
MD
220}
221
7dc0f844
MD
222void
223hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
224{
225 pthread_mutex_lock(&iocom->mtx);
226 hammer2_msg_free_locked(iocom, msg);
227 pthread_mutex_unlock(&iocom->mtx);
228}
229
9ab15106
MD
230/*
231 * I/O core loop for an iocom.
7dc0f844
MD
232 *
233 * Thread localized, iocom->mtx not held.
9ab15106
MD
234 */
235void
236hammer2_iocom_core(hammer2_iocom_t *iocom,
237 void (*recvmsg_func)(hammer2_iocom_t *),
238 void (*sendmsg_func)(hammer2_iocom_t *),
239 void (*altmsg_func)(hammer2_iocom_t *))
240{
7dc0f844
MD
241 struct pollfd fds[3];
242 char dummybuf[256];
4a2e0eae 243 int timeout;
7dc0f844
MD
244 int count;
245 int wi; /* wakeup pipe */
246 int si; /* socket */
247 int ai; /* alt bulk path socket */
9ab15106
MD
248
249 iocom->recvmsg_callback = recvmsg_func;
250 iocom->sendmsg_callback = sendmsg_func;
251 iocom->altmsg_callback = altmsg_func;
252
253 while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
7dc0f844
MD
254 if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
255 HAMMER2_IOCOMF_WWORK |
256 HAMMER2_IOCOMF_PWORK |
257 HAMMER2_IOCOMF_ARWORK |
258 HAMMER2_IOCOMF_AWWORK)) == 0) {
259 /*
260 * Only poll if no immediate work is pending.
261 * Otherwise we are just wasting our time calling
262 * poll.
263 */
264 timeout = 5000;
4a2e0eae 265
7dc0f844
MD
266 count = 0;
267 wi = -1;
268 si = -1;
269 ai = -1;
9ab15106 270
7dc0f844
MD
271 /*
272 * Always check the inter-thread pipe, e.g.
273 * for iocom->txmsgq work.
274 */
275 wi = count++;
276 fds[wi].fd = iocom->wakeupfds[0];
277 fds[wi].events = POLLIN;
278 fds[wi].revents = 0;
279
280 /*
281 * Check the socket input/output direction as
282 * requested
283 */
284 if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
285 HAMMER2_IOCOMF_WREQ)) {
286 si = count++;
287 fds[si].fd = iocom->sock_fd;
288 fds[si].events = 0;
289 fds[si].revents = 0;
290
291 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
292 fds[si].events |= POLLIN;
293 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
294 fds[si].events |= POLLOUT;
295 }
9ab15106 296
7dc0f844
MD
297 /*
298 * Check the alternative fd for work.
299 */
300 if (iocom->alt_fd >= 0) {
301 ai = count++;
302 fds[ai].fd = iocom->alt_fd;
303 fds[ai].events = POLLIN;
304 fds[ai].revents = 0;
305 }
306 poll(fds, count, timeout);
307
308 if (wi >= 0 && (fds[wi].revents & POLLIN))
309 iocom->flags |= HAMMER2_IOCOMF_PWORK;
310 if (si >= 0 && (fds[si].revents & POLLIN))
311 iocom->flags |= HAMMER2_IOCOMF_RWORK;
312 if (si >= 0 && (fds[si].revents & POLLOUT))
313 iocom->flags |= HAMMER2_IOCOMF_WWORK;
314 if (wi >= 0 && (fds[wi].revents & POLLOUT))
315 iocom->flags |= HAMMER2_IOCOMF_WWORK;
316 if (ai >= 0 && (fds[ai].revents & POLLIN))
317 iocom->flags |= HAMMER2_IOCOMF_ARWORK;
9ab15106 318 } else {
7dc0f844
MD
319 /*
320 * Always check the pipe
321 */
322 iocom->flags |= HAMMER2_IOCOMF_PWORK;
9ab15106 323 }
7dc0f844
MD
324
325 /*
326 * Pending message queues from other threads wake us up
327 * with a write to the wakeupfds[] pipe. We have to clear
328 * the pipe with a dummy read.
329 */
330 if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
331 iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
332 read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
333 iocom->flags |= HAMMER2_IOCOMF_RWORK;
334 iocom->flags |= HAMMER2_IOCOMF_WWORK;
335 if (TAILQ_FIRST(&iocom->txmsgq))
9ab15106 336 iocom->sendmsg_callback(iocom);
9ab15106 337 }
7dc0f844
MD
338
339 /*
340 * Message write sequencing
341 */
342 if (iocom->flags & HAMMER2_IOCOMF_WWORK)
343 iocom->sendmsg_callback(iocom);
344
345 /*
346 * Message read sequencing. Run this after the write
347 * sequencing in case the write sequencing allowed another
348 * auto-DELETE to occur on the read side.
349 */
350 if (iocom->flags & HAMMER2_IOCOMF_RWORK)
351 iocom->recvmsg_callback(iocom);
352
353 if (iocom->flags & HAMMER2_IOCOMF_ARWORK)
9ab15106
MD
354 iocom->altmsg_callback(iocom);
355 }
356}
357
358/*
359 * Read the next ready message from the ioq, issuing I/O if needed.
360 * Caller should retry on a read-event when NULL is returned.
361 *
362 * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
1b195a98
MD
363 * be returned for each open transaction, then the ioq and iocom
364 * will be errored out and a non-transactional HAMMER2_LNK_ERROR
365 * msg will be returned as the final message. The caller should not call
366 * us again after the final message is returned.
7dc0f844
MD
367 *
368 * Thread localized, iocom->mtx not held.
9ab15106
MD
369 */
370hammer2_msg_t *
371hammer2_ioq_read(hammer2_iocom_t *iocom)
372{
373 hammer2_ioq_t *ioq = &iocom->ioq_rx;
374 hammer2_msg_t *msg;
375 hammer2_msg_hdr_t *head;
1b195a98 376 hammer2_state_t *state;
9ab15106 377 ssize_t n;
78476205
MD
378 size_t bytes;
379 size_t nmax;
9ab15106 380 uint32_t xcrc32;
78476205 381 int error;
9ab15106 382
78476205 383again:
7dc0f844
MD
384 iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
385
9ab15106
MD
386 /*
387 * If a message is already pending we can just remove and
78476205 388 * return it. Message state has already been processed.
9ab15106
MD
389 */
390 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
78476205
MD
391 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
392 return (msg);
9ab15106
MD
393 }
394
395 /*
396 * Message read in-progress (msg is NULL at the moment). We don't
397 * allocate a msg until we have its core header.
398 */
399 bytes = ioq->fifo_end - ioq->fifo_beg;
5cf97ec5 400 nmax = sizeof(ioq->buf) - ioq->fifo_end;
9ab15106
MD
401 msg = ioq->msg;
402
403 switch(ioq->state) {
404 case HAMMER2_MSGQ_STATE_HEADER1:
405 /*
406 * Load the primary header, fail on any non-trivial read
407 * error or on EOF. Since the primary header is the same
408 * size is the message alignment it will never straddle
409 * the end of the buffer.
410 */
411 if (bytes < (int)sizeof(msg->any.head)) {
412 n = read(iocom->sock_fd,
5cf97ec5 413 ioq->buf + ioq->fifo_end,
9ab15106
MD
414 nmax);
415 if (n <= 0) {
416 if (n == 0) {
417 ioq->error = HAMMER2_IOQ_ERROR_EOF;
418 break;
419 }
420 if (errno != EINTR &&
421 errno != EINPROGRESS &&
422 errno != EAGAIN) {
423 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
424 break;
425 }
426 n = 0;
427 /* fall through */
428 }
429 ioq->fifo_end += n;
430 bytes += n;
431 nmax -= n;
432 }
433
434 /*
435 * Insufficient data accumulated (msg is NULL, caller will
436 * retry on event).
437 */
438 assert(msg == NULL);
439 if (bytes < (int)sizeof(msg->any.head))
440 break;
441
9ab15106 442 /*
5cf97ec5
MD
443 * Calculate the header, decrypt data received so far.
444 * Data will be decrypted in-place. Partial blocks are
445 * not immediately decrypted.
8c280d5d
MD
446 *
447 * WARNING! The header might be in the wrong endian, we
448 * do not fix it up until we get the entire
449 * extended header.
9ab15106 450 */
5cf97ec5 451 hammer2_crypto_decrypt(iocom, ioq);
5cf97ec5 452 head = (void *)(ioq->buf + ioq->fifo_beg);
9ab15106
MD
453
454 /*
455 * Check and fixup the core header. Note that the icrc
456 * has to be calculated before any fixups, but the crc
457 * fields in the msg may have to be swapped like everything
458 * else.
459 */
460 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
461 head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
462 ioq->error = HAMMER2_IOQ_ERROR_SYNC;
463 break;
464 }
465
9ab15106
MD
466 /*
467 * Calculate the full header size and aux data size
468 */
8c280d5d
MD
469 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
470 ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
471 HAMMER2_MSG_ALIGN;
472 ioq->abytes = bswap32(head->aux_bytes) *
473 HAMMER2_MSG_ALIGN;
474 } else {
475 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
476 HAMMER2_MSG_ALIGN;
477 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
478 }
78476205
MD
479 if (ioq->hbytes < sizeof(msg->any.head) ||
480 ioq->hbytes > sizeof(msg->any) ||
9ab15106
MD
481 ioq->abytes > HAMMER2_MSGAUX_MAX) {
482 ioq->error = HAMMER2_IOQ_ERROR_FIELD;
483 break;
484 }
485
486 /*
487 * Finally allocate the message and copy the core header
488 * to the embedded extended header.
4a2e0eae
MD
489 *
490 * Initialize msg->aux_size to 0 and use it to track
491 * the amount of data copied from the stream.
9ab15106 492 */
78476205 493 msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
9ab15106
MD
494 ioq->msg = msg;
495
496 /*
497 * We are either done or we fall-through
498 */
499 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
500 bcopy(head, &msg->any.head, sizeof(msg->any.head));
501 ioq->fifo_beg += ioq->hbytes;
502 break;
503 }
504
505 /*
506 * Fall through to the next state. Make sure that the
507 * extended header does not straddle the end of the buffer.
508 * We still want to issue larger reads into our buffer,
509 * book-keeping is easier if we don't bcopy() yet.
510 */
511 if (bytes + nmax < ioq->hbytes) {
5cf97ec5
MD
512 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
513 ioq->fifo_cdx -= ioq->fifo_beg;
9ab15106
MD
514 ioq->fifo_beg = 0;
515 ioq->fifo_end = bytes;
5cf97ec5 516 nmax = sizeof(ioq->buf) - ioq->fifo_end;
9ab15106
MD
517 }
518 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
519 /* fall through */
520 case HAMMER2_MSGQ_STATE_HEADER2:
521 /*
522 * Fill out the extended header.
523 */
524 assert(msg != NULL);
525 if (bytes < ioq->hbytes) {
526 n = read(iocom->sock_fd,
527 msg->any.buf + ioq->fifo_end,
528 nmax);
529 if (n <= 0) {
530 if (n == 0) {
531 ioq->error = HAMMER2_IOQ_ERROR_EOF;
532 break;
533 }
534 if (errno != EINTR &&
535 errno != EINPROGRESS &&
536 errno != EAGAIN) {
537 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
538 break;
539 }
540 n = 0;
541 /* fall through */
542 }
543 ioq->fifo_end += n;
544 bytes += n;
545 nmax -= n;
546 }
547
548 /*
549 * Insufficient data accumulated (set msg NULL so caller will
550 * retry on event).
551 */
552 if (bytes < ioq->hbytes) {
553 msg = NULL;
554 break;
555 }
556
557 /*
5cf97ec5 558 * Calculate the extended header, decrypt data received
8c280d5d
MD
559 * so far. Handle endian-conversion for the entire extended
560 * header.
9ab15106 561 */
5cf97ec5
MD
562 hammer2_crypto_decrypt(iocom, ioq);
563 head = (void *)(ioq->buf + ioq->fifo_beg);
9ab15106
MD
564
565 /*
8c280d5d 566 * Check the CRC.
9ab15106 567 */
8c280d5d
MD
568 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
569 xcrc32 = bswap32(head->hdr_crc);
570 else
571 xcrc32 = head->hdr_crc;
572 head->hdr_crc = 0;
573 if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
574 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
575 break;
576 }
577 head->hdr_crc = xcrc32;
578
579 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
580 hammer2_bswap_head(head);
9ab15106
MD
581 }
582
583 /*
584 * Copy the extended header into the msg and adjust the
585 * FIFO.
586 */
587 bcopy(head, &msg->any, ioq->hbytes);
588
589 /*
590 * We are either done or we fall-through.
591 */
592 if (ioq->abytes == 0) {
593 ioq->fifo_beg += ioq->hbytes;
594 break;
595 }
596
597 /*
598 * Must adjust nmax and bytes (and the state) when falling
599 * through.
600 */
601 ioq->fifo_beg += ioq->hbytes;
602 nmax -= ioq->hbytes;
603 bytes -= ioq->hbytes;
604 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
605 /* fall through */
606 case HAMMER2_MSGQ_STATE_AUXDATA1:
607 /*
608 * Copy the partial or complete payload from remaining
609 * bytes in the FIFO. We have to fall-through either
610 * way so we can check the crc.
78476205
MD
611 *
612 * Adjust msg->aux_size to the final actual value.
9ab15106 613 */
5cf97ec5
MD
614 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
615 if (ioq->already > ioq->abytes)
616 ioq->already = ioq->abytes;
9ab15106 617 if (bytes >= ioq->abytes) {
5cf97ec5 618 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9ab15106
MD
619 ioq->abytes);
620 msg->aux_size = ioq->abytes;
621 ioq->fifo_beg += ioq->abytes;
5cf97ec5
MD
622 if (ioq->fifo_cdx < ioq->fifo_beg)
623 ioq->fifo_cdx = ioq->fifo_beg;
9ab15106
MD
624 bytes -= ioq->abytes;
625 } else if (bytes) {
5cf97ec5 626 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
9ab15106
MD
627 bytes);
628 msg->aux_size = bytes;
629 ioq->fifo_beg += bytes;
5cf97ec5
MD
630 if (ioq->fifo_cdx < ioq->fifo_beg)
631 ioq->fifo_cdx = ioq->fifo_beg;
9ab15106 632 bytes = 0;
78476205
MD
633 } else {
634 msg->aux_size = 0;
9ab15106
MD
635 }
636 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
637 /* fall through */
638 case HAMMER2_MSGQ_STATE_AUXDATA2:
639 /*
640 * Read the remainder of the payload directly into the
641 * msg->aux_data buffer.
642 */
643 assert(msg);
644 if (msg->aux_size < ioq->abytes) {
645 assert(bytes == 0);
646 n = read(iocom->sock_fd,
647 msg->aux_data + msg->aux_size,
648 ioq->abytes - msg->aux_size);
649 if (n <= 0) {
650 if (n == 0) {
651 ioq->error = HAMMER2_IOQ_ERROR_EOF;
652 break;
653 }
654 if (errno != EINTR &&
655 errno != EINPROGRESS &&
656 errno != EAGAIN) {
657 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
658 break;
659 }
660 n = 0;
661 /* fall through */
662 }
663 msg->aux_size += n;
664 }
665
666 /*
667 * Insufficient data accumulated (set msg NULL so caller will
668 * retry on event).
669 */
670 if (msg->aux_size < ioq->abytes) {
671 msg = NULL;
672 break;
673 }
674 assert(msg->aux_size == ioq->abytes);
5cf97ec5 675 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
9ab15106
MD
676
677 /*
8c280d5d 678 * Check aux_crc, then we are done.
9ab15106
MD
679 */
680 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
8c280d5d 681 if (xcrc32 != msg->any.head.aux_crc) {
9ab15106
MD
682 ioq->error = HAMMER2_IOQ_ERROR_ACRC;
683 break;
684 }
685 break;
686 case HAMMER2_MSGQ_STATE_ERROR:
1b195a98
MD
687 /*
688 * Continued calls to drain recorded transactions (returning
689 * a LNK_ERROR for each one), before we return the final
690 * LNK_ERROR.
691 */
692 assert(msg == NULL);
693 break;
9ab15106
MD
694 default:
695 /*
696 * We don't double-return errors, the caller should not
697 * have called us again after getting an error msg.
698 */
699 assert(0);
700 break;
701 }
702
703 /*
5cf97ec5
MD
704 * Check the message sequence. The iv[] should prevent any
705 * possibility of a replay but we add this check anyway.
706 */
707 if (msg && ioq->error == 0) {
708 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
709 ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
710 } else {
711 ++ioq->seq;
712 }
713 }
714
715 /*
78476205
MD
716 * Process transactional state for the message.
717 */
718 if (msg && ioq->error == 0) {
719 error = hammer2_state_msgrx(iocom, msg);
720 if (error) {
721 if (error == HAMMER2_IOQ_ERROR_EALREADY) {
722 hammer2_msg_free(iocom, msg);
723 goto again;
724 }
725 ioq->error = error;
726 }
727 }
728
729 /*
9ab15106
MD
730 * Handle error, RREQ, or completion
731 *
732 * NOTE: nmax and bytes are invalid at this point, we don't bother
733 * to update them when breaking out.
734 */
735 if (ioq->error) {
736 /*
1b195a98
MD
737 * An unrecoverable error causes all active receive
738 * transactions to be terminated with a LNK_ERROR message.
9ab15106 739 *
1b195a98
MD
740 * Once all active transactions are exhausted we set the
741 * iocom ERROR flag and return a non-transactional LNK_ERROR
742 * message, which should cause master processing loops to
743 * terminate.
9ab15106 744 */
4a2e0eae 745 assert(ioq->msg == msg);
1b195a98
MD
746 if (msg) {
747 hammer2_msg_free(iocom, msg);
9ab15106 748 ioq->msg = NULL;
9ab15106 749 }
1b195a98
MD
750
751 /*
752 * No more I/O read processing
753 */
754 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
755
756 /*
7dc0f844
MD
757 * Simulate a remote LNK_ERROR DELETE msg for any open
758 * transactions, ending with a final non-transactional
759 * LNK_ERROR (that the session can detect) when no
760 * transactions remain.
1b195a98
MD
761 */
762 msg = hammer2_msg_alloc(iocom, 0, 0);
9ab15106
MD
763 bzero(&msg->any.head, sizeof(msg->any.head));
764 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
765 msg->any.head.cmd = HAMMER2_LNK_ERROR;
766 msg->any.head.error = ioq->error;
1b195a98 767
7dc0f844 768 pthread_mutex_lock(&iocom->mtx);
1b195a98
MD
769 if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
770 /*
7dc0f844
MD
771 * Active remote transactions are still present.
772 * Simulate the other end sending us a DELETE.
1b195a98 773 */
7dc0f844
MD
774 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
775 fprintf(stderr, "SIMULATE DELETION RCONT %p\n", state);
776 hammer2_msg_free(iocom, msg);
777 msg = NULL;
778 } else {
779 fprintf(stderr, "SIMULATE DELETION %p RD RXCMD %08x\n", state, state->rxcmd);
780 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
781 msg->state = state;
782 msg->any.head.spanid = state->spanid;
783 msg->any.head.msgid = state->msgid;
784 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
785 HAMMER2_MSGF_DELETE;
786 }
787 } else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) {
788 /*
789 * Active local transactions are still present.
790 * Simulate the other end sending us a DELETE.
791 */
792 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
793 fprintf(stderr, "SIMULATE DELETION WCONT\n");
794 hammer2_msg_free(iocom, msg);
795 msg = NULL;
796 } else {
797 fprintf(stderr, "SIMULATE DELETION WD RXCMD %08x\n", state->txcmd);
798 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
799 msg->state = state;
800 msg->any.head.spanid = state->spanid;
801 msg->any.head.msgid = state->msgid;
802 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
803 HAMMER2_MSGF_DELETE |
804 HAMMER2_MSGF_REPLY;
805 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
806 msg->any.head.cmd |=
807 HAMMER2_MSGF_CREATE;
808 }
809 }
1b195a98
MD
810 } else {
811 /*
7dc0f844
MD
812 * No active local or remote transactions remain.
813 * Generate a final LNK_ERROR and flag EOF.
1b195a98
MD
814 */
815 msg->state = NULL;
816 iocom->flags |= HAMMER2_IOCOMF_EOF;
7dc0f844 817 fprintf(stderr, "EOF ON SOCKET\n");
1b195a98 818 }
7dc0f844
MD
819 pthread_mutex_unlock(&iocom->mtx);
820
821 /*
822 * For the iocom error case we want to set RWORK to indicate
823 * that more messages might be pending.
824 *
825 * It is possible to return NULL when there is more work to
826 * do because each message has to be DELETEd in both
827 * directions before we continue on with the next (though
828 * this could be optimized). The transmit direction will
829 * re-set RWORK.
830 */
831 if (msg)
832 iocom->flags |= HAMMER2_IOCOMF_RWORK;
9ab15106
MD
833 } else if (msg == NULL) {
834 /*
835 * Insufficient data received to finish building the message,
836 * set RREQ and return NULL.
837 *
838 * Leave ioq->msg intact.
839 * Leave the FIFO intact.
840 */
841 iocom->flags |= HAMMER2_IOCOMF_RREQ;
9ab15106
MD
842 } else {
843 /*
7dc0f844 844 * Return msg.
9ab15106
MD
845 *
846 * The fifo has already been advanced past the message.
847 * Trivially reset the FIFO indices if possible.
7dc0f844
MD
848 *
849 * clear the FIFO if it is now empty and set RREQ to wait
850 * for more from the socket. If the FIFO is not empty set
851 * TWORK to bypass the poll so we loop immediately.
9ab15106
MD
852 */
853 if (ioq->fifo_beg == ioq->fifo_end) {
854 iocom->flags |= HAMMER2_IOCOMF_RREQ;
5cf97ec5 855 ioq->fifo_cdx = 0;
9ab15106
MD
856 ioq->fifo_beg = 0;
857 ioq->fifo_end = 0;
858 } else {
7dc0f844 859 iocom->flags |= HAMMER2_IOCOMF_RWORK;
9ab15106
MD
860 }
861 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
862 ioq->msg = NULL;
863 }
864 return (msg);
865}
866
867/*
868 * Calculate the header and data crc's and write a low-level message to
8c280d5d 869 * the connection. If aux_crc is non-zero the aux_data crc is already
9ab15106
MD
870 * assumed to have been set.
871 *
872 * A non-NULL msg is added to the queue but not necessarily flushed.
873 * Calling this function with msg == NULL will get a flush going.
7dc0f844
MD
874 *
875 * Caller must hold iocom->mtx.
9ab15106 876 */
7dc0f844
MD
877void
878hammer2_iocom_flush1(hammer2_iocom_t *iocom)
9ab15106
MD
879{
880 hammer2_ioq_t *ioq = &iocom->ioq_tx;
7dc0f844 881 hammer2_msg_t *msg;
9ab15106 882 uint32_t xcrc32;
4a2e0eae 883 int hbytes;
7dc0f844
MD
884 hammer2_msg_queue_t tmpq;
885
886 iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
887 TAILQ_INIT(&tmpq);
888 pthread_mutex_lock(&iocom->mtx);
889 while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
890 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
891 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
9ab15106 892 }
7dc0f844 893 pthread_mutex_unlock(&iocom->mtx);
9ab15106 894
7dc0f844
MD
895 while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
896 /*
897 * Process terminal connection errors.
898 */
899 TAILQ_REMOVE(&tmpq, msg, qentry);
900 if (ioq->error) {
901 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
902 ++ioq->msgcount;
903 continue;
904 }
9ab15106 905
7dc0f844
MD
906 /*
907 * Finish populating the msg fields. The salt ensures that
908 * the iv[] array is ridiculously randomized and we also
909 * re-seed our PRNG every 32768 messages just to be sure.
910 */
911 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
912 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
913 ++ioq->seq;
914 if ((ioq->seq & 32767) == 0)
915 srandomdev();
9ab15106 916
7dc0f844
MD
917 /*
918 * Calculate aux_crc if 0, then calculate hdr_crc.
919 */
920 if (msg->aux_size && msg->any.head.aux_crc == 0) {
921 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
922 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
923 msg->any.head.aux_crc = xcrc32;
924 }
925 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
926 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
9ab15106 927
7dc0f844
MD
928 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
929 HAMMER2_MSG_ALIGN;
930 msg->any.head.hdr_crc = 0;
931 msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
9ab15106 932
7dc0f844
MD
933 /*
934 * Enqueue the message (the flush codes handles stream
935 * encryption).
936 */
937 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
938 ++ioq->msgcount;
939 }
940 hammer2_iocom_flush2(iocom);
4a2e0eae
MD
941}
942
7dc0f844
MD
943/*
944 * Thread localized, iocom->mtx not held by caller.
945 */
4a2e0eae 946void
7dc0f844 947hammer2_iocom_flush2(hammer2_iocom_t *iocom)
4a2e0eae
MD
948{
949 hammer2_ioq_t *ioq = &iocom->ioq_tx;
950 hammer2_msg_t *msg;
951 ssize_t nmax;
952 ssize_t nact;
953 struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
78476205
MD
954 size_t hbytes;
955 size_t abytes;
4a2e0eae
MD
956 int hoff;
957 int aoff;
958 int n;
9ab15106 959
7dc0f844
MD
960 if (ioq->error) {
961 hammer2_iocom_drain(iocom);
962 return;
963 }
964
9ab15106
MD
965 /*
966 * Pump messages out the connection by building an iovec.
967 */
968 n = 0;
969 nmax = 0;
970
78476205 971 TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
9ab15106
MD
972 hoff = 0;
973 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
974 HAMMER2_MSG_ALIGN;
975 aoff = 0;
976 abytes = msg->aux_size;
977 if (n == 0) {
978 hoff += ioq->hbytes;
979 aoff += ioq->abytes;
980 }
981 if (hbytes - hoff > 0) {
982 iov[n].iov_base = (char *)&msg->any.head + hoff;
983 iov[n].iov_len = hbytes - hoff;
984 nmax += hbytes - hoff;
985 ++n;
986 if (n == HAMMER2_IOQ_MAXIOVEC)
987 break;
988 }
989 if (abytes - aoff > 0) {
990 assert(msg->aux_data != NULL);
991 iov[n].iov_base = msg->aux_data + aoff;
992 iov[n].iov_len = abytes - aoff;
993 nmax += abytes - aoff;
994 ++n;
995 if (n == HAMMER2_IOQ_MAXIOVEC)
996 break;
997 }
998 }
999 if (n == 0)
1000 return;
1001
1002 /*
5cf97ec5
MD
1003 * Encrypt and write the data. The crypto code will move the
1004 * data into the fifo and adjust the iov as necessary. If
1005 * encryption is disabled the iov is left alone.
1006 *
1007 * hammer2_crypto_encrypt_wrote()
1008 */
1009 n = hammer2_crypto_encrypt(iocom, ioq, iov, n);
1010
1011 /*
9ab15106
MD
1012 * Execute the writev() then figure out what happened.
1013 */
1014 nact = writev(iocom->sock_fd, iov, n);
1015 if (nact < 0) {
1016 if (errno != EINTR &&
1017 errno != EINPROGRESS &&
1018 errno != EAGAIN) {
7dc0f844
MD
1019 /*
1020 * Fatal write error
1021 */
9ab15106 1022 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
4a2e0eae 1023 hammer2_iocom_drain(iocom);
9ab15106 1024 } else {
7dc0f844
MD
1025 /*
1026 * Wait for socket buffer space
1027 */
9ab15106
MD
1028 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1029 }
1030 return;
1031 }
7dc0f844
MD
1032
1033 /*
1034 * Indicate bytes written successfully. If we were unable to
1035 * write the entire iov array then set WREQ to wait for more
1036 * socket buffer space.
1037 */
5cf97ec5 1038 hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
7dc0f844 1039 if (nact != nmax)
9ab15106
MD
1040 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1041
7dc0f844
MD
1042 /*
1043 * Clean out the transmit queue based on what we successfully
1044 * sent.
1045 */
9ab15106
MD
1046 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1047 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1048 HAMMER2_MSG_ALIGN;
1049 abytes = msg->aux_size;
1050
78476205 1051 if ((size_t)nact < hbytes - ioq->hbytes) {
9ab15106
MD
1052 ioq->hbytes += nact;
1053 break;
1054 }
1055 nact -= hbytes - ioq->hbytes;
1056 ioq->hbytes = hbytes;
78476205 1057 if ((size_t)nact < abytes - ioq->abytes) {
9ab15106
MD
1058 ioq->abytes += nact;
1059 break;
1060 }
1061 nact -= abytes - ioq->abytes;
1062
78476205 1063 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
9ab15106
MD
1064 --ioq->msgcount;
1065 ioq->hbytes = 0;
1066 ioq->abytes = 0;
78476205
MD
1067
1068 hammer2_state_cleanuptx(iocom, msg);
9ab15106 1069 }
9ab15106 1070 if (ioq->error) {
7dc0f844 1071 hammer2_iocom_drain(iocom);
9ab15106
MD
1072 }
1073}
1074
1075/*
1076 * Kill pending msgs on ioq_tx and adjust the flags such that no more
1077 * write events will occur. We don't kill read msgs because we want
1078 * the caller to pull off our contrived terminal error msg to detect
1079 * the connection failure.
7dc0f844
MD
1080 *
1081 * Thread localized, iocom->mtx not held by caller.
9ab15106
MD
1082 */
1083void
4a2e0eae 1084hammer2_iocom_drain(hammer2_iocom_t *iocom)
9ab15106
MD
1085{
1086 hammer2_ioq_t *ioq = &iocom->ioq_tx;
1087 hammer2_msg_t *msg;
1088
7dc0f844
MD
1089 iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
1090
9ab15106 1091 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
78476205 1092 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
9ab15106 1093 --ioq->msgcount;
7dc0f844 1094 hammer2_state_cleanuptx(iocom, msg);
9ab15106 1095 }
9ab15106
MD
1096}
1097
1098/*
8c280d5d 1099 * Write a message to an iocom, with additional state processing.
78476205
MD
1100 */
1101void
8c280d5d
MD
1102hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
1103 void (*func)(hammer2_state_t *, hammer2_msg_t *),
7dc0f844
MD
1104 void *data,
1105 hammer2_state_t **statep)
78476205 1106{
8c280d5d 1107 hammer2_state_t *state;
7dc0f844 1108 char dummy;
78476205 1109
8c280d5d
MD
1110 /*
1111 * Handle state processing, create state if necessary.
1112 */
7dc0f844 1113 pthread_mutex_lock(&iocom->mtx);
8c280d5d 1114 if ((state = msg->state) != NULL) {
78476205 1115 /*
8c280d5d
MD
1116 * Existing transaction (could be reply). It is also
1117 * possible for this to be the first reply (CREATE is set),
1118 * in which case we populate state->txcmd.
78476205 1119 */
8c280d5d
MD
1120 msg->any.head.msgid = state->msgid;
1121 msg->any.head.spanid = state->spanid;
1122 if (func) {
1123 state->func = func;
1124 state->any.any = data;
78476205 1125 }
7dc0f844
MD
1126 assert(((state->txcmd ^ msg->any.head.cmd) &
1127 HAMMER2_MSGF_REPLY) == 0);
8c280d5d
MD
1128 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1129 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
8c280d5d 1130 } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
8c280d5d
MD
1131 /*
1132 * No existing state and CREATE is set, create new
1133 * state for outgoing command. This can't happen if
1134 * REPLY is set as the state would already exist for
1135 * a transaction reply.
1136 */
1137 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1138
1139 state = malloc(sizeof(*state));
1140 bzero(state, sizeof(*state));
1141 state->iocom = iocom;
1142 state->flags = HAMMER2_STATE_DYNAMIC;
1143 state->msg = msg;
1144 state->msgid = (uint64_t)(uintptr_t)state;
1145 state->spanid = msg->any.head.spanid;
1146 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
7dc0f844 1147 state->rxcmd = HAMMER2_MSGF_REPLY;
8c280d5d
MD
1148 state->func = func;
1149 state->any.any = data;
1150 RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
1151 state->flags |= HAMMER2_STATE_INSERTED;
1152 msg->state = state;
1153 msg->any.head.msgid = state->msgid;
1154 /* spanid set by caller */
78476205 1155 } else {
8c280d5d
MD
1156 msg->any.head.msgid = 0;
1157 /* spanid set by caller */
1158 }
1159
7dc0f844
MD
1160 if (statep)
1161 *statep = state;
1162
8c280d5d 1163 /*
7dc0f844
MD
1164 * Queue it for output, wake up the I/O pthread. Note that the
1165 * I/O thread is responsible for generating the CRCs and encryption.
8c280d5d 1166 */
7dc0f844
MD
1167 TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1168 dummy = 0;
1169 write(iocom->wakeupfds[1], &dummy, 1); /* XXX optimize me */
1170 pthread_mutex_unlock(&iocom->mtx);
8c280d5d
MD
1171}
1172
8c280d5d
MD
1173/*
1174 * This is a shortcut to formulate a reply to msg with a simple error code,
1175 * It can reply to and terminate a transaction, or it can reply to a one-way
1176 * messages. A HAMMER2_LNK_ERROR command code is utilized to encode
1177 * the error code (which can be 0). Not all transactions are terminated
1178 * with HAMMER2_LNK_ERROR status (the low level only cares about the
1179 * MSGF_DELETE flag), but most are.
1180 *
1181 * Replies to one-way messages are a bit of an oxymoron but the feature
1182 * is used by the debug (DBG) protocol.
1183 *
1184 * The reply contains no extended data.
1185 */
1186void
1187hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1188{
1189 hammer2_state_t *state = msg->state;
1190 hammer2_msg_t *nmsg;
1191 uint32_t cmd;
1192
1193
1194 /*
1195 * Reply with a simple error code and terminate the transaction.
1196 */
1197 cmd = HAMMER2_LNK_ERROR;
1198
1199 /*
1200 * Check if our direction has even been initiated yet, set CREATE.
1201 *
1202 * Check what direction this is (command or reply direction). Note
1203 * that txcmd might not have been initiated yet.
1204 *
1205 * If our direction has already been closed we just return without
1206 * doing anything.
1207 */
1208 if (state) {
1209 if (state->txcmd & HAMMER2_MSGF_DELETE)
1210 return;
1211 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1212 cmd |= HAMMER2_MSGF_CREATE;
7dc0f844 1213 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
1214 cmd |= HAMMER2_MSGF_REPLY;
1215 cmd |= HAMMER2_MSGF_DELETE;
1216 } else {
1217 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1218 cmd |= HAMMER2_MSGF_REPLY;
78476205 1219 }
8c280d5d 1220
78476205
MD
1221 nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1222 nmsg->any.head.error = error;
8c280d5d 1223 nmsg->state = msg->state;
7dc0f844 1224 hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
8c280d5d
MD
1225}
1226
1227/*
1228 * Similar to hammer2_msg_reply() but leave the transaction open. That is,
1229 * we are generating a streaming reply or an intermediate acknowledgement
1230 * of some sort as part of the higher level protocol, with more to come
1231 * later.
1232 */
1233void
1234hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1235{
1236 hammer2_state_t *state = msg->state;
1237 hammer2_msg_t *nmsg;
1238 uint32_t cmd;
1239
1240
1241 /*
1242 * Reply with a simple error code and terminate the transaction.
1243 */
1244 cmd = HAMMER2_LNK_ERROR;
1245
1246 /*
1247 * Check if our direction has even been initiated yet, set CREATE.
1248 *
1249 * Check what direction this is (command or reply direction). Note
1250 * that txcmd might not have been initiated yet.
1251 *
1252 * If our direction has already been closed we just return without
1253 * doing anything.
1254 */
1255 if (state) {
1256 if (state->txcmd & HAMMER2_MSGF_DELETE)
1257 return;
1258 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1259 cmd |= HAMMER2_MSGF_CREATE;
7dc0f844 1260 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
1261 cmd |= HAMMER2_MSGF_REPLY;
1262 /* continuing transaction, do not set MSGF_DELETE */
1263 } else {
1264 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1265 cmd |= HAMMER2_MSGF_REPLY;
1266 }
1267
1268 nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1269 nmsg->any.head.error = error;
1270 nmsg->state = state;
7dc0f844 1271 hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
8c280d5d
MD
1272}
1273
1274/*
1275 * Terminate a transaction given a state structure by issuing a DELETE.
1276 */
1277void
1278hammer2_state_reply(hammer2_state_t *state, uint32_t error)
1279{
1280 hammer2_msg_t *nmsg;
1281 uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
1282
1283 /*
1284 * Nothing to do if we already transmitted a delete
1285 */
1286 if (state->txcmd & HAMMER2_MSGF_DELETE)
1287 return;
1288
1289 /*
1290 * We must also set CREATE if this is our first response to a
1291 * remote command.
1292 */
1293 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1294 cmd |= HAMMER2_MSGF_CREATE;
1295
1296 /*
1297 * Set REPLY if the other end initiated the command. Otherwise
1298 * we are the command direction.
1299 */
7dc0f844 1300 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
1301 cmd |= HAMMER2_MSGF_REPLY;
1302
1303 nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
1304 nmsg->any.head.error = error;
1305 nmsg->state = state;
7dc0f844 1306 hammer2_msg_write(state->iocom, nmsg, NULL, NULL, NULL);
78476205
MD
1307}
1308
/************************************************************************
 *			TRANSACTION STATE HANDLING			*
 ************************************************************************
 *
 * Transaction states live in red-black trees of type hammer2_state_tree
 * (iocom->staterd_tree and iocom->statewr_tree), ordered by
 * hammer2_state_cmp().
 */

RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
1316
/*
 * Process state tracking for a message after reception, prior to
 * execution.
 *
 * Called with the msg dequeued.  This function acquires iocom->mtx itself
 * around the RB tree lookups/insertions, so the caller presumably must
 * not hold it.
 *
 * On return msg->state is the state found by lookup (NULL if none), or
 * the newly created state when a new command (CREATE) was received.
 *
 * Returns 0 if the message should be processed,
 * HAMMER2_IOQ_ERROR_EALREADY when an ABORT raced the close (or reuse) of
 * its transaction (NOTE(review): presumably the caller silently discards
 * these -- confirm against the caller), or HAMMER2_IOQ_ERROR_TRANS on a
 * transaction protocol violation.
 *
 * --
 *
 * These routines handle persistent and command/reply message state via the
 * CREATE and DELETE flags.  The first message in a command or reply sequence
 * sets CREATE, the last message in a command or reply sequence sets DELETE.
 *
 * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message,
 * which set neither flag.  This represents a streaming command or reply.
 *
 * Any command message received with CREATE set expects a reply sequence to
 * be returned.  Reply sequences work the same as command sequences except the
 * REPLY bit is also sent.  Both the command side and reply side can
 * degenerate into a single message with both CREATE and DELETE set.  Note
 * that one side can be streaming and the other side not, or neither, or both.
 *
 * The msgid is unique for the initiator.  That is, two sides sending a new
 * message can use the same msgid without colliding.
 *
 * --
 *
 * ABORT sequences work by setting the ABORT flag along with normal message
 * state.  However, ABORTs can also be sent on half-closed messages, that is
 * even if the command or reply side has already sent a DELETE, as long as
 * the message has not been fully closed it can still send an ABORT+DELETE
 * to terminate the half-closed message state.
 *
 * Since ABORT+DELETEs can race we silently discard ABORTs for message
 * state which has already been fully closed.  REPLY+ABORT+DELETEs can
 * also race, and in this situation the other side might have already
 * initiated a new unrelated command with the same message id.  Since
 * the abort has not set the CREATE flag the situation can be detected
 * and the message will also be discarded.
 *
 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
 * The ABORT request is essentially integrated into the command instead
 * of being sent later on.  In this situation the command implementation
 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
 * special-case non-blocking operation for the command.
 *
 * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
 *	  to be mid-stream aborts for command/reply sequences.  ABORTs on
 *	  one-way messages are not supported.
 *
 * NOTE!  If a command sequence does not support aborts the ABORT flag is
 *	  simply ignored.
 *
 * --
 *
 * One-off messages (no reply expected) are sent with neither CREATE nor
 * DELETE set.  One-off messages cannot be aborted and typically aren't
 * processed by these routines.  The REPLY bit can be used to distinguish
 * whether a one-off message is a command or reply.  For example, one-off
 * replies will typically just contain status updates.
 */
static int
hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
{
	hammer2_state_t *state;
	hammer2_state_t dummy;
	int error;

	/*
	 * Lock RB tree and locate existing persistent state, if any.
	 *
	 * If received msg is a command state is on staterd_tree.
	 * If received msg is a reply state is on statewr_tree.
	 */
	dummy.msgid = msg->any.head.msgid;
	dummy.spanid = msg->any.head.spanid;
#if 0
	iocom_printf(iocom, msg->any.head.cmd,
		     "received msg %08x msgid %jx spanid=%jx\n",
		     msg->any.head.cmd,
		     (intmax_t)msg->any.head.msgid,
		     (intmax_t)msg->any.head.spanid);
#endif
	pthread_mutex_lock(&iocom->mtx);
	if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
		state = RB_FIND(hammer2_state_tree,
				&iocom->statewr_tree, &dummy);
	} else {
		state = RB_FIND(hammer2_state_tree,
				&iocom->staterd_tree, &dummy);
	}
	msg->state = state;
	pthread_mutex_unlock(&iocom->mtx);

	/*
	 * Short-cut one-off or mid-stream messages (state may be NULL).
	 */
	if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
				  HAMMER2_MSGF_ABORT)) == 0) {
		return(0);
	}

	/*
	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
	 * inside the case statements.
	 */
	switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
				    HAMMER2_MSGF_REPLY)) {
	case HAMMER2_MSGF_CREATE:
	case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
		/*
		 * New persistent command received.
		 */
		if (state) {
			iocom_printf(iocom, msg->any.head.cmd,
				     "hammer2_state_msgrx: "
				     "duplicate transaction\n");
			error = HAMMER2_IOQ_ERROR_TRANS;
			break;
		}
		state = malloc(sizeof(*state));
		bzero(state, sizeof(*state));
		state->iocom = iocom;
		state->flags = HAMMER2_STATE_DYNAMIC;
		state->msg = msg;
		state->txcmd = HAMMER2_MSGF_REPLY;
		state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;

		/*
		 * The RB tree is ordered on (spanid, msgid), so the key must
		 * be filled in BEFORE the state is inserted.  Inserting first
		 * would file the state under the zeroed key and corrupt the
		 * tree ordering, making later RB_FIND()s miss it.
		 */
		state->msgid = msg->any.head.msgid;
		state->spanid = msg->any.head.spanid;

		pthread_mutex_lock(&iocom->mtx);
		RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
		state->flags |= HAMMER2_STATE_INSERTED;
		pthread_mutex_unlock(&iocom->mtx);
		msg->state = state;
		error = 0;
		break;
	case HAMMER2_MSGF_DELETE:
		/*
		 * Persistent state is expected but might not exist if an
		 * ABORT+DELETE races the close.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
			} else {
				iocom_printf(iocom, msg->any.head.cmd,
					     "hammer2_state_msgrx: "
					     "no state for DELETE\n");
				error = HAMMER2_IOQ_ERROR_TRANS;
			}
			break;
		}

		/*
		 * Handle another ABORT+DELETE case if the msgid has already
		 * been reused.
		 */
		if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
			} else {
				iocom_printf(iocom, msg->any.head.cmd,
					     "hammer2_state_msgrx: "
					     "state reused for DELETE\n");
				error = HAMMER2_IOQ_ERROR_TRANS;
			}
			break;
		}
		error = 0;
		break;
	case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
	case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
		/*
		 * When receiving a reply with CREATE set the original
		 * persistent state message should already exist.
		 */
		if (state == NULL) {
			iocom_printf(iocom, msg->any.head.cmd,
				     "hammer2_state_msgrx: "
				     "no state match for REPLY cmd=%08x\n",
				     msg->any.head.cmd);
			error = HAMMER2_IOQ_ERROR_TRANS;
			break;
		}
		assert(((state->rxcmd ^ msg->any.head.cmd) &
			HAMMER2_MSGF_REPLY) == 0);
		state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
		error = 0;
		break;
	case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been fully closed, ignore the message.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
			} else {
				iocom_printf(iocom, msg->any.head.cmd,
					     "hammer2_state_msgrx: "
					     "no state match for "
					     "REPLY|DELETE\n");
				error = HAMMER2_IOQ_ERROR_TRANS;
			}
			break;
		}

		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been reused for an unrelated message,
		 * ignore the message.
		 */
		if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
			} else {
				iocom_printf(iocom, msg->any.head.cmd,
					     "hammer2_state_msgrx: "
					     "state reused for REPLY|DELETE\n");
				error = HAMMER2_IOQ_ERROR_TRANS;
			}
			break;
		}
		error = 0;
		break;
	case HAMMER2_MSGF_REPLY:
		/*
		 * Check for mid-stream ABORT reply received to sent command.
		 */
		if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	default:
		/*
		 * Mid-stream command (no CREATE/DELETE/REPLY, ABORT set).
		 * Discard the ABORT if the transaction is already gone or
		 * its msgid has been reused, otherwise allow.
		 */
		if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	}
	return (error);
}
1579
/*
 * Release a received message once it has been processed, updating the
 * transaction state it belongs to.
 *
 *  - No state (non-transactional message): the message is freed.
 *  - DELETE set: the receive side of the transaction is marked closed.
 *    If the transmit side was already closed too, the state is removed
 *    from its RB tree and destroyed (which also frees the original
 *    CREATE message if that is a different message).  The received
 *    message itself is always freed.
 *  - Otherwise the transaction stays open and the message is freed
 *    unless it is the state's original (CREATE) message.
 */
void
hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
{
	hammer2_state_t *state;

	if ((state = msg->state) == NULL) {
		/* Non-transactional message, no state to worry about. */
		hammer2_msg_free(iocom, msg);
		return;
	}

	if ((msg->any.head.cmd & HAMMER2_MSGF_DELETE) == 0) {
		/* Transaction continues; keep the CREATE message. */
		if (state->msg != msg)
			hammer2_msg_free(iocom, msg);
		return;
	}

	pthread_mutex_lock(&iocom->mtx);
	state->rxcmd |= HAMMER2_MSGF_DELETE;

	/*
	 * msg is freed below in every case.  If it is also the state's
	 * original message (CREATE|DELETE in a single message) the state
	 * must drop its pointer now.  Otherwise a half-closed state would
	 * keep a dangling pointer that hammer2_state_free() frees a second
	 * time once the transmit side closes.
	 */
	if (state->msg == msg)
		state->msg = NULL;

	if (state->txcmd & HAMMER2_MSGF_DELETE) {
		/* Both directions closed, tear down the state. */
		assert(state->flags & HAMMER2_STATE_INSERTED);
		if (state->rxcmd & HAMMER2_MSGF_REPLY) {
			assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
			RB_REMOVE(hammer2_state_tree,
				  &iocom->statewr_tree, state);
		} else {
			assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
			RB_REMOVE(hammer2_state_tree,
				  &iocom->staterd_tree, state);
		}
		state->flags &= ~HAMMER2_STATE_INSERTED;
		hammer2_state_free(state);
	}
	pthread_mutex_unlock(&iocom->mtx);
	hammer2_msg_free(iocom, msg);
}
1627
78476205
MD
/*
 * Release a transmitted message once it has been fully written, updating
 * the transaction state it belongs to.  Mirror image of
 * hammer2_state_cleanuprx():
 *
 *  - No state: the message is freed.
 *  - DELETE set: the transmit side is marked closed.  If the receive side
 *    was already closed the state is removed from its RB tree and
 *    destroyed.  The message itself is always freed.
 *  - Otherwise the message is freed unless it is the state's original
 *    (CREATE) message.
 */
static void
hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
{
	hammer2_state_t *state;

	if ((state = msg->state) == NULL) {
		hammer2_msg_free(iocom, msg);
		return;
	}

	if ((msg->any.head.cmd & HAMMER2_MSGF_DELETE) == 0) {
		if (state->msg != msg)
			hammer2_msg_free(iocom, msg);
		return;
	}

	pthread_mutex_lock(&iocom->mtx);
	state->txcmd |= HAMMER2_MSGF_DELETE;

	/*
	 * msg is freed below in every case.  If it is the state's original
	 * message (e.g. a CREATE|DELETE one-shot command we sent), drop the
	 * state's pointer to it.  Otherwise the still-open receive side would
	 * leave a dangling pointer for hammer2_state_free() to free again.
	 */
	if (state->msg == msg)
		state->msg = NULL;

	if (state->rxcmd & HAMMER2_MSGF_DELETE) {
		/* Both directions closed, tear down the state. */
		assert(state->flags & HAMMER2_STATE_INSERTED);
		if (state->txcmd & HAMMER2_MSGF_REPLY) {
			assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
			RB_REMOVE(hammer2_state_tree,
				  &iocom->staterd_tree, state);
		} else {
			assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
			RB_REMOVE(hammer2_state_tree,
				  &iocom->statewr_tree, state);
		}
		state->flags &= ~HAMMER2_STATE_INSERTED;
		hammer2_state_free(state);
	}
	pthread_mutex_unlock(&iocom->mtx);
	hammer2_msg_free(iocom, msg);
}
1662
7dc0f844
MD
/*
 * Destroy a transaction state, releasing the original message it still
 * references, if any.  The state's private data (any.any) must already
 * have been cleared.
 *
 * Called with iocom locked (the message is released through
 * hammer2_msg_free_locked()).
 *
 * While the iocom is being shut down (ioq_rx.error set) the rx code
 * simulates incoming DELETEs for every transaction, but it cannot finish
 * until every state is gone (both directions terminated).  Once both
 * state trees are empty a byte is written to wakeupfds[1] to wake it up.
 */
void
hammer2_state_free(hammer2_state_t *state)
{
	hammer2_iocom_t *iocom = state->iocom;
	hammer2_msg_t *orig_msg;
	char wake;

	fprintf(stderr, "STATE FREE %p\n", state);

	assert(state->any.any == NULL);
	orig_msg = state->msg;
	state->msg = NULL;
	if (orig_msg != NULL)
		hammer2_msg_free_locked(iocom, orig_msg);
	free(state);

	/* Only an iocom that is shutting down needs the rx side woken. */
	if (iocom->ioq_rx.error == 0)
		return;
	if (!RB_EMPTY(&iocom->staterd_tree) ||
	    !RB_EMPTY(&iocom->statewr_tree))
		return;

	wake = 0;
	write(iocom->wakeupfds[1], &wake, 1);
}
1698
1699/*
1700 * Indexed messages are stored in a red-black tree indexed by their
1701 * msgid. Only persistent messages are indexed.
1702 */
1703int
1704hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
1705{
8c280d5d 1706 if (state1->spanid < state2->spanid)
78476205 1707 return(-1);
8c280d5d 1708 if (state1->spanid > state2->spanid)
78476205
MD
1709 return(1);
1710 if (state1->msgid < state2->msgid)
1711 return(-1);
1712 if (state1->msgid > state2->msgid)
1713 return(1);
1714 return(0);
9ab15106 1715}