2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
/*
 * Forward declarations for the file-local transaction state helpers.
 * The msgrx and msgtx helpers return an int error code which
 * hammer2_ioq_read() and hammer2_ioq_write() compare against
 * HAMMER2_IOQ_ERROR_EALREADY. The cleanuptx helper is invoked from
 * hammer2_iocom_flush() for each message that was fully written.
 */
38 static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
39 static int hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
43 * Initialize a low-level ioq
/*
 * Reset an ioq to its initial condition: the whole structure is zeroed,
 * the state is set to HAMMER2_MSGQ_STATE_HEADER1 and the message queue
 * is initialized empty. The iocom argument is not used.
 */
46 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
48 bzero(ioq, sizeof(*ioq));
49 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
50 TAILQ_INIT(&ioq->msgq);
/*
 * Tear down an ioq: every message still queued on ioq->msgq, plus any
 * message held in ioq->msg, is handed to hammer2_msg_free().
 *
 * NOTE(review): iocom is tagged __unused but is passed to
 * hammer2_msg_free() below, so the attribute looks stale - confirm.
 */
54 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
/* Drain the pending message queue. */
58 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
59 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
60 hammer2_msg_free(iocom, msg);
/* Also release the message held in ioq->msg, if any. */
62 if ((msg = ioq->msg) != NULL) {
64 hammer2_msg_free(iocom, msg);
69 * Initialize a low-level communications channel
/*
 * Set up an iocom for the given socket descriptor and alternate
 * descriptor: the structure is zeroed, both state trees and both free
 * queues are initialized, the descriptors are recorded, the flags start
 * out as RREQ | WIDLE, and the rx and tx ioqs are initialized. Session
 * crypto is then negotiated synchronously and O_NONBLOCK is requested
 * through fcntl().
 *
 * NOTE(review): the fcntl() results are discarded, and F_SETFL with a
 * bare O_NONBLOCK replaces the other file status flags instead of
 * OR-ing into the F_GETFL value - confirm that this is intended.
 */
72 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
74 bzero(iocom, sizeof(*iocom));
76 RB_INIT(&iocom->staterd_tree);
77 RB_INIT(&iocom->statewr_tree);
78 TAILQ_INIT(&iocom->freeq);
79 TAILQ_INIT(&iocom->freeq_aux);
80 iocom->sock_fd = sock_fd;
81 iocom->alt_fd = alt_fd;
82 iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
83 hammer2_ioq_init(iocom, &iocom->ioq_rx);
84 hammer2_ioq_init(iocom, &iocom->ioq_tx);
87 * Negotiate session crypto synchronously. This will mark the
88 * connection as error'd if it fails.
90 hammer2_crypto_negotiate(iocom);
93 * Make sure our fds are set to non-blocking for the iocom core.
96 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
98 /* if line buffered our single fgets() should be fine */
100 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
/*
 * Tear down an iocom: both ioqs are finished with hammer2_ioq_done(),
 * then cached messages are pulled off the freeq and freeq_aux lists.
 *
 * NOTE(review): each free list is drained with a single if, not a loop,
 * so at most one entry per list is removed here. hammer2_msg_free()
 * appends to these lists (and hammer2_ioq_done() calls it just below),
 * so any further cached messages appear to be leaked - consider
 * changing both tests to while loops.
 */
105 hammer2_iocom_done(hammer2_iocom_t *iocom)
110 hammer2_ioq_done(iocom, &iocom->ioq_rx);
111 hammer2_ioq_done(iocom, &iocom->ioq_tx);
112 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
113 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
116 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
117 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
119 msg->aux_data = NULL;
125 * Allocate a new one-way message.
/*
 * Obtain a message for command cmd with room for aux_size payload bytes.
 *
 * aux_size is rounded up to the message alignment. A cached message is
 * taken from freeq_aux or freeq when one is available (presumably chosen
 * by whether aux_size is non-zero - the selecting test is not visible
 * here), otherwise a new one is malloc()ed and zeroed. When the aux size
 * of the message differs from the rounded request the aux buffer is
 * replaced. The header length in bytes is the HAMMER2_MSGF_SIZE field of
 * cmd multiplied by HAMMER2_MSG_ALIGN; that many header bytes are
 * cleared before hdr_size, aux_icrc and cmd are filled in.
 *
 * NOTE(review): neither malloc() result is checked in the lines visible
 * here - confirm how allocation failure is meant to be handled.
 */
128 hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
/* Round the payload size up to the message alignment. */
134 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
135 ~HAMMER2_MSG_ALIGNMASK;
136 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
137 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
139 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
140 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
/* Nothing cached: build a fresh zeroed message. */
143 msg = malloc(sizeof(*msg));
144 bzero(msg, sizeof(*msg));
145 msg->aux_data = NULL;
/* Cached aux buffer has the wrong size: replace it. */
148 if (msg->aux_size != aux_size) {
151 msg->aux_data = NULL;
155 msg->aux_data = malloc(aux_size);
156 msg->aux_size = aux_size;
/* Header size is encoded in cmd in units of HAMMER2_MSG_ALIGN. */
159 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
161 bzero(&msg->any.head, hbytes);
162 msg->hdr_size = hbytes;
163 msg->any.head.aux_icrc = 0;
164 msg->any.head.cmd = cmd;
170 * Free a message so it can be reused afresh.
172 * NOTE: aux_size can be 0 with a non-NULL aux_data.
/*
 * Return a message to one of the per-iocom free lists (freeq_aux or
 * freeq) so that hammer2_msg_alloc() can reuse it. In the lines shown
 * the message is only queued, not released to the allocator.
 *
 * NOTE(review): the test choosing between the two lists is not visible
 * here - presumably it keys on msg->aux_data; confirm.
 */
175 hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
178 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
180 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
184 * I/O core loop for an iocom.
/*
 * Main event loop for an iocom. The three callbacks are stored in the
 * iocom, then the loop runs until HAMMER2_IOCOMF_EOF shows up in
 * iocom->flags. Each pass polls sock_fd (POLLIN when RREQ is set,
 * POLLOUT when WIDLE is clear and WREQ is set) and, when alt_fd >= 0,
 * also alt_fd for POLLIN. It then dispatches:
 *   - recvmsg_callback when the socket is readable or RREQ is clear
 *   - sendmsg_callback when WIDLE is clear and either the socket is
 *     writable or WREQ is clear
 *   - altmsg_callback when alt_fd is valid and readable
 *
 * NOTE(review): the poll() return value is discarded in the lines
 * visible here, so a failed poll() is not told apart from a timeout.
 */
187 hammer2_iocom_core(hammer2_iocom_t *iocom,
188 void (*recvmsg_func)(hammer2_iocom_t *),
189 void (*sendmsg_func)(hammer2_iocom_t *),
190 void (*altmsg_func)(hammer2_iocom_t *))
192 struct pollfd fds[2];
195 iocom->recvmsg_callback = recvmsg_func;
196 iocom->sendmsg_callback = sendmsg_func;
197 iocom->altmsg_callback = altmsg_func;
199 while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
202 fds[0].fd = iocom->sock_fd;
/* Only wait for readability when the reader asked for more data. */
206 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
207 fds[0].events |= POLLIN;
/* Only wait for writability when output is pending and blocked. */
210 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
211 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
212 fds[0].events |= POLLOUT;
217 if (iocom->alt_fd >= 0) {
218 fds[1].fd = iocom->alt_fd;
219 fds[1].events |= POLLIN;
221 poll(fds, 2, timeout);
223 poll(fds, 1, timeout);
225 if ((fds[0].revents & POLLIN) ||
226 (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
227 iocom->recvmsg_callback(iocom);
229 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
230 if ((fds[0].revents & POLLOUT) ||
231 (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
232 iocom->sendmsg_callback(iocom);
235 if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
236 iocom->altmsg_callback(iocom);
241 * Read the next ready message from the ioq, issuing I/O if needed.
242 * Caller should retry on a read-event when NULL is returned.
244 * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
245 * be returned (and the caller must not call us again after that).
/*
 * Receive state machine for ioq_rx.
 *
 * A message already queued on ioq->msgq is dequeued first. Otherwise the
 * function works through the ioq states in order:
 *   HEADER1  - read into the FIFO until a core header is buffered,
 *              decrypt, validate the magic (byte-swapping the header when
 *              the reversed magic is seen) and icrc1, derive hbytes and
 *              abytes, range check them, and allocate the message
 *   HEADER2  - accumulate the extended header, check icrc2, and copy the
 *              header into msg->any
 *   AUXDATA1 - copy whatever payload bytes are already in the FIFO
 *   AUXDATA2 - read the rest of the payload directly into msg->aux_data,
 *              decrypt it and check aux_icrc
 * A completed message then has the low 8 bits of its salt compared with
 * ioq->seq and is passed through hammer2_state_msgrx(); a result of
 * HAMMER2_IOQ_ERROR_EALREADY causes the message to be freed.
 *
 * On error a HAMMER2_LNK_ERROR message carrying ioq->error is built, the
 * ioq enters HAMMER2_MSGQ_STATE_ERROR and HAMMER2_IOCOMF_EOF is set. When
 * more data is needed HAMMER2_IOCOMF_RREQ is set.
 *
 * NOTE(review): the HEADER2 read() stores into msg->any.buf +
 * ioq->fifo_end, while the HEADER1 read() and every later access use
 * ioq->buf (fifo_end indexes ioq->buf, see the nmax computation). This
 * looks like it should be ioq->buf + ioq->fifo_end - confirm and fix.
 */
248 hammer2_ioq_read(hammer2_iocom_t *iocom)
250 hammer2_ioq_t *ioq = &iocom->ioq_rx;
252 hammer2_msg_hdr_t *head;
262 * If a message is already pending we can just remove and
263 * return it. Message state has already been processed.
265 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
266 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
271 * Message read in-progress (msg is NULL at the moment). We don't
272 * allocate a msg until we have its core header.
274 bytes = ioq->fifo_end - ioq->fifo_beg;
275 nmax = sizeof(ioq->buf) - ioq->fifo_end;
279 case HAMMER2_MSGQ_STATE_HEADER1:
281 * Load the primary header, fail on any non-trivial read
282 * error or on EOF. Since the primary header is the same
283 * size is the message alignment it will never straddle
284 * the end of the buffer.
286 if (bytes < (int)sizeof(msg->any.head)) {
287 n = read(iocom->sock_fd,
288 ioq->buf + ioq->fifo_end,
292 ioq->error = HAMMER2_IOQ_ERROR_EOF;
295 if (errno != EINTR &&
296 errno != EINPROGRESS &&
298 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
310 * Insufficient data accumulated (msg is NULL, caller will
314 if (bytes < (int)sizeof(msg->any.head))
318 * Calculate the header, decrypt data received so far.
319 * Data will be decrypted in-place. Partial blocks are
320 * not immediately decrypted.
322 hammer2_crypto_decrypt(iocom, ioq);
323 head = (void *)(ioq->buf + ioq->fifo_beg);
326 * Check and fixup the core header. Note that the icrc
327 * has to be calculated before any fixups, but the crc
328 * fields in the msg may have to be swapped like everything
331 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
332 head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
333 ioq->error = HAMMER2_IOQ_ERROR_SYNC;
337 xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
338 HAMMER2_MSGHDR_CRCBYTES);
339 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
340 hammer2_bswap_head(head);
342 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
343 if (xcrc16 != head->icrc1) {
344 ioq->error = HAMMER2_IOQ_ERROR_HCRC;
349 * Calculate the full header size and aux data size
351 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
353 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
354 if (ioq->hbytes < sizeof(msg->any.head) ||
355 ioq->hbytes > sizeof(msg->any) ||
356 ioq->abytes > HAMMER2_MSGAUX_MAX) {
357 ioq->error = HAMMER2_IOQ_ERROR_FIELD;
362 * Finally allocate the message and copy the core header
363 * to the embedded extended header.
365 * Initialize msg->aux_size to 0 and use it to track
366 * the amount of data copied from the stream.
368 msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
372 * We are either done or we fall-through
374 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
375 bcopy(head, &msg->any.head, sizeof(msg->any.head));
376 ioq->fifo_beg += ioq->hbytes;
381 * Fall through to the next state. Make sure that the
382 * extended header does not straddle the end of the buffer.
383 * We still want to issue larger reads into our buffer,
384 * book-keeping is easier if we don't bcopy() yet.
386 if (bytes + nmax < ioq->hbytes) {
387 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
388 ioq->fifo_cdx -= ioq->fifo_beg;
390 ioq->fifo_end = bytes;
391 nmax = sizeof(ioq->buf) - ioq->fifo_end;
393 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
395 case HAMMER2_MSGQ_STATE_HEADER2:
397 * Fill out the extended header.
400 if (bytes < ioq->hbytes) {
/* NOTE(review): destination below is msg->any.buf, not ioq->buf. */
401 n = read(iocom->sock_fd,
402 msg->any.buf + ioq->fifo_end,
406 ioq->error = HAMMER2_IOQ_ERROR_EOF;
409 if (errno != EINTR &&
410 errno != EINPROGRESS &&
412 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
424 * Insufficient data accumulated (set msg NULL so caller will
427 if (bytes < ioq->hbytes) {
433 * Calculate the extended header, decrypt data received
436 hammer2_crypto_decrypt(iocom, ioq);
437 head = (void *)(ioq->buf + ioq->fifo_beg);
440 * Check the crc on the extended header
442 if (ioq->hbytes > sizeof(hammer2_msg_hdr_t)) {
443 xcrc32 = hammer2_icrc32(head + 1,
444 ioq->hbytes - sizeof(*head));
445 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
446 if (head->icrc2 != xcrc16) {
447 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
453 * Copy the extended header into the msg and adjust the
456 bcopy(head, &msg->any, ioq->hbytes);
459 * We are either done or we fall-through.
461 if (ioq->abytes == 0) {
462 ioq->fifo_beg += ioq->hbytes;
467 * Must adjust nmax and bytes (and the state) when falling
470 ioq->fifo_beg += ioq->hbytes;
472 bytes -= ioq->hbytes;
473 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
475 case HAMMER2_MSGQ_STATE_AUXDATA1:
477 * Copy the partial or complete payload from remaining
478 * bytes in the FIFO. We have to fall-through either
479 * way so we can check the crc.
481 * Adjust msg->aux_size to the final actual value.
483 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
484 if (ioq->already > ioq->abytes)
485 ioq->already = ioq->abytes;
486 if (bytes >= ioq->abytes) {
487 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
489 msg->aux_size = ioq->abytes;
490 ioq->fifo_beg += ioq->abytes;
491 if (ioq->fifo_cdx < ioq->fifo_beg)
492 ioq->fifo_cdx = ioq->fifo_beg;
493 bytes -= ioq->abytes;
495 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
497 msg->aux_size = bytes;
498 ioq->fifo_beg += bytes;
499 if (ioq->fifo_cdx < ioq->fifo_beg)
500 ioq->fifo_cdx = ioq->fifo_beg;
505 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
507 case HAMMER2_MSGQ_STATE_AUXDATA2:
509 * Read the remainder of the payload directly into the
510 * msg->aux_data buffer.
513 if (msg->aux_size < ioq->abytes) {
515 n = read(iocom->sock_fd,
516 msg->aux_data + msg->aux_size,
517 ioq->abytes - msg->aux_size);
520 ioq->error = HAMMER2_IOQ_ERROR_EOF;
523 if (errno != EINTR &&
524 errno != EINPROGRESS &&
526 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
536 * Insufficient data accumulated (set msg NULL so caller will
539 if (msg->aux_size < ioq->abytes) {
543 assert(msg->aux_size == ioq->abytes);
544 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
547 * Check aux_icrc, then we are done.
549 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
550 if (xcrc32 != msg->any.head.aux_icrc) {
551 ioq->error = HAMMER2_IOQ_ERROR_ACRC;
555 case HAMMER2_MSGQ_STATE_ERROR:
558 * We don't double-return errors, the caller should not
559 * have called us again after getting an error msg.
566 * Check the message sequence. The iv[] should prevent any
567 * possibility of a replay but we add this check anyway.
569 if (msg && ioq->error == 0) {
570 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
571 ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
578 * Process transactional state for the message.
580 if (msg && ioq->error == 0) {
581 error = hammer2_state_msgrx(iocom, msg);
583 if (error == HAMMER2_IOQ_ERROR_EALREADY) {
584 hammer2_msg_free(iocom, msg);
592 * Handle error, RREQ, or completion
594 * NOTE: nmax and bytes are invalid at this point, we don't bother
595 * to update them when breaking out.
599 * An unrecoverable error occured during processing,
600 * return a special error message. Try to leave the
601 * ioq state alone for post-mortem debugging.
603 * Link error messages are returned as one-way messages,
604 * so no flags get set. Source and target is 0 (link-level),
605 * msgid is 0 (link-level). All we really need to do is
606 * set up magic, cmd, and error.
608 assert(ioq->msg == msg);
610 msg = hammer2_msg_alloc(iocom, 0, 0);
616 msg->aux_data = NULL;
619 bzero(&msg->any.head, sizeof(msg->any.head));
620 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
621 msg->any.head.cmd = HAMMER2_LNK_ERROR;
622 msg->any.head.error = ioq->error;
623 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
624 iocom->flags |= HAMMER2_IOCOMF_EOF;
625 } else if (msg == NULL) {
627 * Insufficient data received to finish building the message,
628 * set RREQ and return NULL.
630 * Leave ioq->msg intact.
631 * Leave the FIFO intact.
633 iocom->flags |= HAMMER2_IOCOMF_RREQ;
641 * Return msg, clear the FIFO if it is now empty.
642 * Flag RREQ if the caller needs to wait for a read-event
645 * The fifo has already been advanced past the message.
646 * Trivially reset the FIFO indices if possible.
648 if (ioq->fifo_beg == ioq->fifo_end) {
649 iocom->flags |= HAMMER2_IOCOMF_RREQ;
654 iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
656 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
663 * Calculate the header and data crc's and write a low-level message to
664 * the connection. If aux_icrc is non-zero the aux_data crc is already
665 * assumed to have been set.
667 * A non-NULL msg is added to the queue but not necessarily flushed.
668 * Calling this function with msg == NULL will get a flush going.
/*
 * Queue msg for transmission on ioq_tx.
 *
 * While the ioq has no error, transactional state is processed first via
 * hammer2_state_msgtx(); a result of HAMMER2_IOQ_ERROR_EALREADY causes
 * the message to be freed. On the terminal error path the message is
 * queued and the tx queue is discarded with hammer2_iocom_drain().
 *
 * Otherwise the header is completed: magic, salt (random() shifted left
 * by 8, OR-ed with the low 8 bits of ioq->seq), aux_icrc when the
 * payload is non-empty and the field is still 0, aux_bytes, icrc2 over
 * any extended header (0 when there is none) and finally icrc1. The
 * message is appended to ioq->msgq, WIDLE is cleared, and
 * hammer2_iocom_flush() is reached only after the WREQ test and the
 * msgcount < HAMMER2_IOQ_MAXIOVEC / 2 test (presumably each returns
 * early to batch writes - the bodies are not visible here).
 *
 * NOTE(review): the block comment above this function says msg may be
 * NULL to start a flush, but no NULL guard is visible before
 * hammer2_state_msgtx() and the header stores dereference msg - confirm.
 */
671 hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
673 hammer2_ioq_t *ioq = &iocom->ioq_tx;
682 * Process transactional state.
684 if (ioq->error == 0) {
685 error = hammer2_state_msgtx(iocom, msg);
687 if (error == HAMMER2_IOQ_ERROR_EALREADY) {
688 hammer2_msg_free(iocom, msg);
696 * Process terminal connection errors.
699 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
701 hammer2_iocom_drain(iocom);
706 * Finish populating the msg fields. The salt ensures that the iv[]
707 * array is ridiculously randomized and we also re-seed our PRNG
708 * every 32768 messages just to be sure.
710 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
711 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
713 if ((ioq->seq & 32767) == 0)
717 * Calculate aux_icrc if 0, calculate icrc2, and finally
720 if (msg->aux_size && msg->any.head.aux_icrc == 0) {
721 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
722 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
723 msg->any.head.aux_icrc = xcrc32;
725 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
726 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
/* Extended header present: fold its 32 bit crc down to 16 bits. */
728 if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
729 sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
730 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
732 hbytes -= sizeof(msg->any.head);
733 xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
734 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
735 msg->any.head.icrc2 = xcrc16;
737 msg->any.head.icrc2 = 0;
/* icrc1 is computed last, after the other header fields are set. */
739 xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
740 HAMMER2_MSGHDR_CRCBYTES);
741 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
742 msg->any.head.icrc1 = xcrc16;
745 * XXX Encrypt the message
749 * Enqueue the message.
751 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
753 iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
756 * Flush if we know we can write (WREQ not set) and if
757 * sufficient messages have accumulated. Otherwise hold
758 * off to avoid piecemeal system calls.
760 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
762 if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
764 hammer2_iocom_flush(iocom);
/*
 * Push queued tx messages out the socket.
 *
 * An iovec is built from the unsent header and aux portions of each
 * message on ioq->msgq, stopping once HAMMER2_IOQ_MAXIOVEC entries are
 * filled. The vector is passed through hammer2_crypto_encrypt() and
 * written with writev(). A writev() failure with an errno outside the
 * tolerated set records HAMMER2_IOQ_ERROR_SOCK and drains the queue.
 * Otherwise the byte count is reported to hammer2_crypto_encrypt_wrote()
 * and walked across the queue: ioq->hbytes and ioq->abytes track
 * progress through a partially sent message, and each fully sent
 * message is removed and handed to hammer2_state_cleanuptx(). WREQ,
 * WIDLE and EOF in iocom->flags are updated to match the outcome.
 */
768 hammer2_iocom_flush(hammer2_iocom_t *iocom)
770 hammer2_ioq_t *ioq = &iocom->ioq_tx;
774 struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
782 * Pump messages out the connection by building an iovec.
787 TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
789 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
792 abytes = msg->aux_size;
797 if (hbytes - hoff > 0) {
798 iov[n].iov_base = (char *)&msg->any.head + hoff;
799 iov[n].iov_len = hbytes - hoff;
800 nmax += hbytes - hoff;
802 if (n == HAMMER2_IOQ_MAXIOVEC)
805 if (abytes - aoff > 0) {
806 assert(msg->aux_data != NULL);
807 iov[n].iov_base = msg->aux_data + aoff;
808 iov[n].iov_len = abytes - aoff;
809 nmax += abytes - aoff;
811 if (n == HAMMER2_IOQ_MAXIOVEC)
819 * Encrypt and write the data. The crypto code will move the
820 * data into the fifo and adjust the iov as necessary. If
821 * encryption is disabled the iov is left alone.
823 * hammer2_crypto_encrypt_wrote()
825 n = hammer2_crypto_encrypt(iocom, ioq, iov, n);
828 * Execute the writev() then figure out what happened.
830 nact = writev(iocom->sock_fd, iov, n);
832 if (errno != EINTR &&
833 errno != EINPROGRESS &&
835 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
836 hammer2_iocom_drain(iocom);
838 iocom->flags |= HAMMER2_IOCOMF_WREQ;
842 hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
844 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
846 iocom->flags |= HAMMER2_IOCOMF_WREQ;
/* Retire messages covered by the nact bytes that were written. */
848 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
849 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
851 abytes = msg->aux_size;
853 if ((size_t)nact < hbytes - ioq->hbytes) {
857 nact -= hbytes - ioq->hbytes;
858 ioq->hbytes = hbytes;
859 if ((size_t)nact < abytes - ioq->abytes) {
863 nact -= abytes - ioq->abytes;
865 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
870 hammer2_state_cleanuptx(iocom, msg);
873 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
874 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
877 iocom->flags |= HAMMER2_IOCOMF_EOF |
878 HAMMER2_IOCOMF_WIDLE;
879 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
884 * Kill pending msgs on ioq_tx and adjust the flags such that no more
885 * write events will occur. We don't kill read msgs because we want
886 * the caller to pull off our contrived terminal error msg to detect
887 * the connection failure.
/*
 * Discard everything queued on ioq_tx: each message is removed from
 * ioq->msgq and handed to hammer2_msg_free(). WIDLE is then set and WREQ
 * cleared, which keeps hammer2_iocom_core() from requesting POLLOUT or
 * invoking the send callback.
 */
890 hammer2_iocom_drain(hammer2_iocom_t *iocom)
892 hammer2_ioq_t *ioq = &iocom->ioq_tx;
895 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
896 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
898 hammer2_msg_free(iocom, msg);
900 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
901 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
905 * This is a shortcut to formulate a reply to msg with a simple error code.
906 * It can reply to transaction or one-way messages, or terminate one side
907 * of a stream. A HAMMER2_LNK_ERROR command code is utilized to encode
908 * the error code (which can be 0).
910 * Replies to one-way messages are a bit of an oxymoron but the feature
911 * is used by the debug (DBG) protocol.
913 * The reply contains no data.
/*
 * Build and queue a payload-free HAMMER2_LNK_ERROR reply to msg.
 *
 * The command word starts as HAMMER2_LNK_ERROR. When msg is not itself a
 * reply, HAMMER2_MSGF_REPLY is added. In both directions CREATE is added
 * when msg->state->rxcmd lacks CREATE, and DELETE is added. The new
 * message is allocated with no aux data, given the error code, and
 * passed to hammer2_ioq_write().
 *
 * NOTE(review): both branches test state->rxcmd, although the comment in
 * the first branch says the reply direction uses txcmd - confirm which
 * field is intended. Whether msg->state is NULL-checked before it is
 * dereferenced is not visible here.
 */
916 hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint16_t error)
921 cmd = HAMMER2_LNK_ERROR;
922 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
924 * Reply to received reply, reply direction uses txcmd.
925 * txcmd will be updated by hammer2_ioq_write().
928 if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
929 cmd |= HAMMER2_MSGF_CREATE;
930 cmd |= HAMMER2_MSGF_DELETE;
934 * Reply to received command, reply direction uses rxcmd.
935 * txcmd will be updated by hammer2_ioq_write().
937 cmd |= HAMMER2_MSGF_REPLY;
939 if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
940 cmd |= HAMMER2_MSGF_CREATE;
941 cmd |= HAMMER2_MSGF_DELETE;
944 nmsg = hammer2_msg_alloc(iocom, 0, cmd);
945 nmsg->any.head.error = error;
946 hammer2_ioq_write(iocom, nmsg);
949 /************************************************************************
950 * TRANSACTION STATE HANDLING *
951 ************************************************************************
/*
 * Instantiate the red-black tree routines for hammer2_state_tree, whose
 * nodes are hammer2_state structures linked through rbnode and ordered
 * by hammer2_state_cmp. The RB_FIND, RB_INSERT and RB_REMOVE calls on
 * staterd_tree and statewr_tree in this file rely on it.
 */
955 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
958 * Process state tracking for a message after reception, prior to
961 * Called with msglk held and the msg dequeued.
963 * All messages are called with dummy state and return actual state.
964 * (One-off messages often just return the same dummy state).
966 * May request that the caller discard the message by returning
967 * HAMMER2_IOQ_ERROR_EALREADY. State is unused then and may be NULL.
971 * These routines handle persistent and command/reply message state via the
972 * CREATE and DELETE flags. The first message in a command or reply sequence
973 * sets CREATE, the last message in a command or reply sequence sets DELETE.
975 * There can be any number of intermediate messages belonging to the same
976 * sequence sent in between the CREATE message and the DELETE message,
977 * which set neither flag. This represents a streaming command or reply.
979 * Any command message received with CREATE set expects a reply sequence to
980 * be returned. Reply sequences work the same as command sequences except the
981 * REPLY bit is also sent. Both the command side and reply side can
982 * degenerate into a single message with both CREATE and DELETE set. Note
983 * that one side can be streaming and the other side not, or neither, or both.
985 * The msgid is unique for the initiator. That is, two sides sending a new
986 * message can use the same msgid without colliding.
990 * ABORT sequences work by setting the ABORT flag along with normal message
991 * state. However, ABORTs can also be sent on half-closed messages, that is
992 * even if the command or reply side has already sent a DELETE, as long as
993 * the message has not been fully closed it can still send an ABORT+DELETE
994 * to terminate the half-closed message state.
996 * Since ABORT+DELETEs can race we silently discard ABORT's for message
997 * state which has already been fully closed. REPLY+ABORT+DELETEs can
998 * also race, and in this situation the other side might have already
999 * initiated a new unrelated command with the same message id. Since
1000 * the abort has not set the CREATE flag the situation can be detected
1001 * and the message will also be discarded.
1003 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1004 * The ABORT request is essentially integrated into the command instead
1005 * of being sent later on. In this situation the command implementation
1006 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1007 * special-case non-blocking operation for the command.
1009 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
1010 * to be mid-stream aborts for command/reply sequences. ABORTs on
1011 * one-way messages are not supported.
1013 * NOTE! If a command sequence does not support aborts the ABORT flag is
1018 * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
1019 * set. One-off messages cannot be aborted and typically aren't processed
1020 * by these routines. The REPLY bit can be used to distinguish whether a
1021 * one-off message is a command or reply. For example, one-off replies
1022 * will typically just contain status updates.
/*
 * Apply transaction state tracking to a message that was just received.
 *
 * A lookup key is built from the msgid, source and target of the message
 * and searched in statewr_tree when the message is a reply, otherwise in
 * staterd_tree. Messages carrying none of CREATE, DELETE or ABORT are
 * short-cut. The remaining cases switch on CREATE | DELETE | REPLY:
 *   CREATE [| DELETE]         - new command; a state is malloc()ed,
 *                               zeroed, flagged DYNAMIC and inserted
 *                               into staterd_tree
 *   DELETE                    - needs existing state with CREATE in rxcmd
 *   REPLY | CREATE [| DELETE] - needs existing state; rxcmd is updated
 *   REPLY | DELETE            - needs existing state with CREATE in rxcmd
 *   neither CREATE nor DELETE - mid-stream ABORT handling
 * Where the state is missing or stale, a message with ABORT set yields
 * HAMMER2_IOQ_ERROR_EALREADY and anything else HAMMER2_IOQ_ERROR_TRANS.
 * The caller in hammer2_ioq_read() frees the message on EALREADY.
 *
 * NOTE(review): the malloc() result for the new state is used without a
 * NULL check in the lines visible here - confirm.
 */
1025 hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1027 hammer2_state_t *state;
1028 hammer2_state_t dummy;
1032 * Lock RB tree and locate existing persistent state, if any.
1034 * If received msg is a command state is on staterd_tree.
1035 * If received msg is a reply state is on statewr_tree.
1037 /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1039 dummy.msgid = msg->any.head.msgid;
1040 dummy.source = msg->any.head.source;
1041 dummy.target = msg->any.head.target;
1042 iocom_printf(iocom, msg->any.head.cmd,
1043 "received msg %08x msgid %u source=%u target=%u\n",
1044 msg->any.head.cmd, msg->any.head.msgid,
1045 msg->any.head.source, msg->any.head.target);
1046 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1047 state = RB_FIND(hammer2_state_tree,
1048 &iocom->statewr_tree, &dummy);
1050 state = RB_FIND(hammer2_state_tree,
1051 &iocom->staterd_tree, &dummy);
1056 * Short-cut one-off or mid-stream messages (state may be NULL).
1058 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1059 HAMMER2_MSGF_ABORT)) == 0) {
1060 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1065 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1066 * inside the case statements.
1068 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1069 HAMMER2_MSGF_REPLY)) {
1070 case HAMMER2_MSGF_CREATE:
1071 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1073 * New persistant command received.
1076 iocom_printf(iocom, msg->any.head.cmd,
1077 "hammer2_state_msgrx: "
1078 "duplicate transaction\n");
1079 error = HAMMER2_IOQ_ERROR_TRANS;
1082 state = malloc(sizeof(*state));
1083 bzero(state, sizeof(*state));
1084 state->iocom = iocom;
1085 state->flags = HAMMER2_STATE_DYNAMIC;
1087 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1088 RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
1089 state->flags |= HAMMER2_STATE_INSERTED;
1093 case HAMMER2_MSGF_DELETE:
1095 * Persistent state is expected but might not exist if an
1096 * ABORT+DELETE races the close.
1098 if (state == NULL) {
1099 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1100 error = HAMMER2_IOQ_ERROR_EALREADY;
1102 iocom_printf(iocom, msg->any.head.cmd,
1103 "hammer2_state_msgrx: "
1104 "no state for DELETE\n");
1105 error = HAMMER2_IOQ_ERROR_TRANS;
1111 * Handle another ABORT+DELETE case if the msgid has already
1114 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1115 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1116 error = HAMMER2_IOQ_ERROR_EALREADY;
1118 iocom_printf(iocom, msg->any.head.cmd,
1119 "hammer2_state_msgrx: "
1120 "state reused for DELETE\n");
1121 error = HAMMER2_IOQ_ERROR_TRANS;
1129 * Check for mid-stream ABORT command received, otherwise
1132 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1133 if (state == NULL ||
1134 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1135 error = HAMMER2_IOQ_ERROR_EALREADY;
1141 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1142 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1144 * When receiving a reply with CREATE set the original
1145 * persistent state message should already exist.
1147 if (state == NULL) {
1148 iocom_printf(iocom, msg->any.head.cmd,
1149 "hammer2_state_msgrx: "
1150 "no state match for REPLY cmd=%08x\n",
1152 error = HAMMER2_IOQ_ERROR_TRANS;
1155 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1158 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1160 * Received REPLY+ABORT+DELETE in case where msgid has
1161 * already been fully closed, ignore the message.
1163 if (state == NULL) {
1164 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1165 error = HAMMER2_IOQ_ERROR_EALREADY;
1167 iocom_printf(iocom, msg->any.head.cmd,
1168 "hammer2_state_msgrx: "
1169 "no state match for "
1171 error = HAMMER2_IOQ_ERROR_TRANS;
1177 * Received REPLY+ABORT+DELETE in case where msgid has
1178 * already been reused for an unrelated message,
1179 * ignore the message.
1181 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1182 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1183 error = HAMMER2_IOQ_ERROR_EALREADY;
1185 iocom_printf(iocom, msg->any.head.cmd,
1186 "hammer2_state_msgrx: "
1187 "state reused for REPLY|DELETE\n");
1188 error = HAMMER2_IOQ_ERROR_TRANS;
1194 case HAMMER2_MSGF_REPLY:
1196 * Check for mid-stream ABORT reply received to sent command.
1198 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1199 if (state == NULL ||
1200 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1201 error = HAMMER2_IOQ_ERROR_EALREADY;
1208 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
/*
 * Post-processing for a received message once it has been handled.
 *
 * With no state attached the message is simply freed. When the message
 * carries DELETE, DELETE is recorded in state->rxcmd; if txcmd already
 * has DELETE as well, both directions are closed, so the state is
 * removed from statewr_tree (for a reply) or staterd_tree (for a
 * command), its INSERTED flag is cleared and it is released with
 * hammer2_state_free(). The message is freed afterwards. In the
 * remaining case the message is freed unless it is the one referenced
 * by state->msg.
 */
1213 hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1215 hammer2_state_t *state;
1217 if ((state = msg->state) == NULL) {
1218 hammer2_msg_free(iocom, msg);
1219 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1220 /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1221 state->rxcmd |= HAMMER2_MSGF_DELETE;
1222 if (state->txcmd & HAMMER2_MSGF_DELETE) {
1223 if (state->msg == msg)
1225 assert(state->flags & HAMMER2_STATE_INSERTED);
1226 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1227 RB_REMOVE(hammer2_state_tree,
1228 &iocom->statewr_tree, state);
1230 RB_REMOVE(hammer2_state_tree,
1231 &iocom->staterd_tree, state);
1233 state->flags &= ~HAMMER2_STATE_INSERTED;
1234 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1235 hammer2_state_free(state);
1237 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1239 hammer2_msg_free(iocom, msg);
1240 } else if (state->msg != msg) {
1241 hammer2_msg_free(iocom, msg);
1246 * Process state tracking for a message prior to transmission.
1248 * Called with msglk held and the msg dequeued.
1250 * One-off messages are usually with dummy state and msg->state may be NULL
1251 * in this situation.
1253 * New transactions (when CREATE is set) will insert the state.
1255 * May request that the caller discard the message by returning
1256 * HAMMER2_IOQ_ERROR_EALREADY. State may be NULL in this case.
1259 hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1261 hammer2_state_t *state;
1265 * Lock RB tree. If persistent state is present it will have already
1266 * been assigned to msg.
1268 /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1272 * Short-cut one-off or mid-stream messages (state may be NULL).
1274 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1275 HAMMER2_MSGF_ABORT)) == 0) {
1276 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1282 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1283 * inside the case statements.
1285 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1286 HAMMER2_MSGF_REPLY)) {
1287 case HAMMER2_MSGF_CREATE:
1288 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1290 * Insert the new persistent message state and mark
1291 * half-closed if DELETE is set. Since this is a new
1292 * message it isn't possible to transition into the fully
1293 * closed state here.
1295 * XXX state must be assigned and inserted by
1296 * hammer2_msg_write(). txcmd is assigned by us
1299 assert(state != NULL);
1301 if (state == NULL) {
1302 state = pmp->freerd_state;
1303 pmp->freerd_state = NULL;
1306 state->msgid = msg->any.head.msgid;
1307 state->source = msg->any.head.source;
1308 state->target = msg->any.head.target;
1310 assert((state->flags & HAMMER2_STATE_INSERTED) == 0);
1311 if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
1312 iocom_printf(iocom, msg->any.head.cmd,
1313 "hammer2_state_msgtx: "
1314 "duplicate transaction\n");
1315 error = HAMMER2_IOQ_ERROR_TRANS;
1318 state->flags |= HAMMER2_STATE_INSERTED;
1320 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1323 case HAMMER2_MSGF_DELETE:
1325 * Sent ABORT+DELETE in case where msgid has already
1326 * been fully closed, ignore the message.
1328 if (state == NULL) {
1329 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1330 error = HAMMER2_IOQ_ERROR_EALREADY;
1332 iocom_printf(iocom, msg->any.head.cmd,
1333 "hammer2_state_msgtx: "
1334 "no state match for DELETE\n");
1335 error = HAMMER2_IOQ_ERROR_TRANS;
1341 * Sent ABORT+DELETE in case where msgid has
1342 * already been reused for an unrelated message,
1343 * ignore the message.
1345 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
1346 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1347 error = HAMMER2_IOQ_ERROR_EALREADY;
1349 iocom_printf(iocom, msg->any.head.cmd,
1350 "hammer2_state_msgtx: "
1351 "state reused for DELETE\n");
1352 error = HAMMER2_IOQ_ERROR_TRANS;
1360 * Check for mid-stream ABORT command sent
1362 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1363 if (state == NULL ||
1364 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
1365 error = HAMMER2_IOQ_ERROR_EALREADY;
1371 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1372 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1374 * When transmitting a reply with CREATE set the original
1375 * persistent state message should already exist.
1377 if (state == NULL) {
1378 iocom_printf(iocom, msg->any.head.cmd,
1379 "hammer2_state_msgtx: no state match "
1380 "for REPLY | CREATE\n");
1381 error = HAMMER2_IOQ_ERROR_TRANS;
1384 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1387 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1389 * When transmitting a reply with DELETE set the original
1390 * persistent state message should already exist.
1392 * This is very similar to the REPLY|CREATE|* case except
1393 * txcmd is already stored, so we just add the DELETE flag.
1395 * Sent REPLY+ABORT+DELETE in case where msgid has
1396 * already been fully closed, ignore the message.
1398 if (state == NULL) {
1399 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1400 error = HAMMER2_IOQ_ERROR_EALREADY;
1402 iocom_printf(iocom, msg->any.head.cmd,
1403 "hammer2_state_msgtx: "
1404 "no state match for "
1405 "REPLY | DELETE\n");
1406 error = HAMMER2_IOQ_ERROR_TRANS;
1412 * Sent REPLY+ABORT+DELETE in case where msgid has already
1413 * been reused for an unrelated message, ignore the message.
1415 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
1416 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1417 error = HAMMER2_IOQ_ERROR_EALREADY;
1419 iocom_printf(iocom, msg->any.head.cmd,
1420 "hammer2_state_msgtx: "
1422 "REPLY | DELETE\n");
1423 error = HAMMER2_IOQ_ERROR_TRANS;
1429 case HAMMER2_MSGF_REPLY:
1431 * Check for mid-stream ABORT reply sent.
1433 * One-off REPLY messages are allowed for e.g. status updates.
1435 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1436 if (state == NULL ||
1437 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
1438 error = HAMMER2_IOQ_ERROR_EALREADY;
1445 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
/*
 * Post-transmit cleanup for msg, driven by the state attached to it
 * (msg->state).
 *
 *  - No state attached: the message is released with hammer2_msg_free().
 *  - DELETE set in the message command: DELETE is recorded in
 *    state->txcmd.  If state->rxcmd already carries DELETE as well, the
 *    state is removed from an RB tree, its INSERTED flag is cleared, and
 *    it is handed to hammer2_state_free().  The message is released with
 *    hammer2_msg_free() in this branch as well.
 *  - Otherwise the message is released only when it is not the message
 *    referenced by state->msg.
 *
 * NOTE(review): this listing has gaps in its embedded line numbering
 * (e.g. 1461, 1466, 1473), so some statements and braces are not visible
 * here; confirm the exact nesting against the complete file.
 */
1450 hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1452 hammer2_state_t *state;
1454 if ((state = msg->state) == NULL) {
1455 hammer2_msg_free(iocom, msg);
1456 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1457 /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
/* Record DELETE in the transmit-side command flags (txcmd). */
1458 state->txcmd |= HAMMER2_MSGF_DELETE;
/*
 * rxcmd carries DELETE too: presumably both directions are now
 * closed, so the state is taken out of its tree and freed.
 */
1459 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
1460 if (state->msg == msg)
1462 assert(state->flags & HAMMER2_STATE_INSERTED);
/*
 * The REPLY bit selects the tree: staterd_tree when REPLY is set.
 * The statewr_tree removal presumably sits in an else branch whose
 * line is not visible in this listing.
 */
1463 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1464 RB_REMOVE(hammer2_state_tree,
1465 &iocom->staterd_tree, state);
1467 RB_REMOVE(hammer2_state_tree,
1468 &iocom->statewr_tree, state);
1470 state->flags &= ~HAMMER2_STATE_INSERTED;
1471 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1472 hammer2_state_free(state);
1474 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1476 hammer2_msg_free(iocom, msg);
/* No DELETE: free msg only if the state is not holding on to it. */
1477 } else if (state->msg != msg) {
1478 hammer2_msg_free(iocom, msg);
/*
 * Releases a message through hammer2_msg_free(), using the iocom recorded
 * in state->iocom.
 *
 * NOTE(review): most of this function is not visible in this listing (the
 * embedded line numbers jump from 1485 to 1492).  `msg` is used below but
 * its declaration and assignment are not shown, and whether the state
 * itself is released cannot be confirmed from here.  Check the complete
 * file before relying on this description.
 */
1483 hammer2_state_free(hammer2_state_t *state)
1485 hammer2_iocom_t *iocom = state->iocom;
1492 hammer2_msg_free(iocom, msg);
1497 * Indexed messages are stored in a red-black tree ordered by source,
1498 * then target, then msgid. Only persistent messages are indexed.
1501 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
1503 if (state1->source < state2->source)
1505 if (state1->source > state2->source)
1507 if (state1->target < state2->target)
1509 if (state1->target > state2->target)
1511 if (state1->msgid < state2->msgid)
1513 if (state1->msgid > state2->msgid)