2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39 * Initialize a low-level ioq
42 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
44 bzero(ioq, sizeof(*ioq));
45 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
46 TAILQ_INIT(&ioq->msgq);
50 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
54 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
55 TAILQ_REMOVE(&ioq->msgq, msg, entry);
58 if ((msg = ioq->msg) != NULL) {
65 * Initialize a low-level communications channel
68 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
70 bzero(iocom, sizeof(*iocom));
72 TAILQ_INIT(&iocom->freeq);
73 TAILQ_INIT(&iocom->freeq_aux);
74 iocom->sock_fd = sock_fd;
75 iocom->alt_fd = alt_fd;
76 iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
77 hammer2_ioq_init(iocom, &iocom->ioq_rx);
78 hammer2_ioq_init(iocom, &iocom->ioq_tx);
81 * Negotiate session crypto synchronously. This will mark the
82 * connection as error'd if it fails.
84 hammer2_crypto_negotiate(iocom);
87 * Make sure our fds are set to non-blocking for the iocom core.
90 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
92 /* if line buffered our single fgets() should be fine */
94 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
99 hammer2_iocom_done(hammer2_iocom_t *iocom)
104 hammer2_ioq_done(iocom, &iocom->ioq_rx);
105 hammer2_ioq_done(iocom, &iocom->ioq_tx);
106 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
107 TAILQ_REMOVE(&iocom->freeq, msg, entry);
110 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
111 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
113 msg->aux_data = NULL;
119 * Allocate a new one-way message.
122 hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
128 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
129 ~HAMMER2_MSG_ALIGNMASK;
130 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
131 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
133 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
134 TAILQ_REMOVE(&iocom->freeq, msg, entry);
137 msg = malloc(sizeof(*msg));
139 msg->aux_data = NULL;
142 if (msg->aux_size != aux_size) {
145 msg->aux_data = NULL;
149 msg->aux_data = malloc(aux_size);
150 msg->aux_size = aux_size;
154 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
156 bzero(&msg->any.head, hbytes);
157 msg->any.head.aux_icrc = 0;
158 msg->any.head.cmd = cmd;
164 * Allocate a one-way or streaming reply to a message. The message is
165 * not modified. This function may be used to allocate multiple replies.
167 * If cmd is 0 then msg->any.head.cmd is used to formulate the reply command.
170 hammer2_allocreply(hammer2_msg_t *msg, uint32_t cmd, int aux_size)
173 hammer2_persist_t *pers;
175 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
177 cmd = msg->any.head.cmd;
179 rmsg = hammer2_allocmsg(msg->iocom, cmd, aux_size);
180 rmsg->any.head = msg->any.head;
181 rmsg->any.head.source = msg->any.head.target;
182 rmsg->any.head.target = msg->any.head.source;
183 rmsg->any.head.cmd = (cmd | HAMMER2_MSGF_REPLY) &
184 ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
185 rmsg->any.head.aux_icrc = 0;
187 if ((pers = msg->persist) != NULL) {
188 assert(pers->lrep & HAMMER2_MSGF_DELETE);
189 rmsg->any.head.cmd |= pers->lrep & HAMMER2_MSGF_CREATE;
190 pers->lrep &= ~HAMMER2_MSGF_CREATE;
191 /* do not clear DELETE */
197 * Free a message so it can be reused afresh.
199 * NOTE: aux_size can be 0 with a non-NULL aux_data.
202 hammer2_freemsg(hammer2_msg_t *msg)
204 hammer2_iocom_t *iocom = msg->iocom;
207 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
209 TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
213 * I/O core loop for an iocom.
216 hammer2_iocom_core(hammer2_iocom_t *iocom,
217 void (*recvmsg_func)(hammer2_iocom_t *),
218 void (*sendmsg_func)(hammer2_iocom_t *),
219 void (*altmsg_func)(hammer2_iocom_t *))
221 struct pollfd fds[2];
224 iocom->recvmsg_callback = recvmsg_func;
225 iocom->sendmsg_callback = sendmsg_func;
226 iocom->altmsg_callback = altmsg_func;
228 while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
231 fds[0].fd = iocom->sock_fd;
235 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
236 fds[0].events |= POLLIN;
239 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
240 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
241 fds[0].events |= POLLOUT;
246 if (iocom->alt_fd >= 0) {
247 fds[1].fd = iocom->alt_fd;
248 fds[1].events |= POLLIN;
250 poll(fds, 2, timeout);
252 poll(fds, 1, timeout);
254 if ((fds[0].revents & POLLIN) ||
255 (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
256 iocom->recvmsg_callback(iocom);
258 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
259 if ((fds[0].revents & POLLOUT) ||
260 (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
261 iocom->sendmsg_callback(iocom);
264 if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
265 iocom->altmsg_callback(iocom);
270 * Read the next ready message from the ioq, issuing I/O if needed.
271 * Caller should retry on a read-event when NULL is returned.
273 * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
274 * be returned (and the caller must not call us again after that).
277 hammer2_ioq_read(hammer2_iocom_t *iocom)
279 hammer2_ioq_t *ioq = &iocom->ioq_rx;
281 hammer2_msg_hdr_t *head;
290 * If a message is already pending we can just remove and
293 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
294 TAILQ_REMOVE(&ioq->msgq, msg, entry);
299 * Message read in-progress (msg is NULL at the moment). We don't
300 * allocate a msg until we have its core header.
302 bytes = ioq->fifo_end - ioq->fifo_beg;
303 nmax = sizeof(ioq->buf) - ioq->fifo_end;
307 case HAMMER2_MSGQ_STATE_HEADER1:
309 * Load the primary header, fail on any non-trivial read
310 * error or on EOF. Since the primary header is the same
311 * size is the message alignment it will never straddle
312 * the end of the buffer.
314 if (bytes < (int)sizeof(msg->any.head)) {
315 n = read(iocom->sock_fd,
316 ioq->buf + ioq->fifo_end,
320 ioq->error = HAMMER2_IOQ_ERROR_EOF;
323 if (errno != EINTR &&
324 errno != EINPROGRESS &&
326 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
338 * Insufficient data accumulated (msg is NULL, caller will
342 if (bytes < (int)sizeof(msg->any.head))
346 * Calculate the header, decrypt data received so far.
347 * Data will be decrypted in-place. Partial blocks are
348 * not immediately decrypted.
350 hammer2_crypto_decrypt(iocom, ioq);
352 head = (void *)(ioq->buf + ioq->fifo_beg);
355 * Check and fixup the core header. Note that the icrc
356 * has to be calculated before any fixups, but the crc
357 * fields in the msg may have to be swapped like everything
360 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
361 head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
362 ioq->error = HAMMER2_IOQ_ERROR_SYNC;
366 xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
367 HAMMER2_MSGHDR_CRCBYTES);
368 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
369 hammer2_bswap_head(head);
370 flags |= HAMMER2_MSGX_BSWAPPED;
372 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
373 if (xcrc16 != head->icrc1) {
374 ioq->error = HAMMER2_IOQ_ERROR_HCRC;
379 * Calculate the full header size and aux data size
381 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
383 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
384 if (ioq->hbytes < (int)sizeof(msg->any.head) ||
385 ioq->hbytes > (int)sizeof(msg->any) ||
386 ioq->abytes > HAMMER2_MSGAUX_MAX) {
387 ioq->error = HAMMER2_IOQ_ERROR_FIELD;
392 * Finally allocate the message and copy the core header
393 * to the embedded extended header.
395 * Initialize msg->aux_size to 0 and use it to track
396 * the amount of data copied from the stream.
398 msg = hammer2_allocmsg(iocom, 0, ioq->abytes);
404 * We are either done or we fall-through
406 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
407 bcopy(head, &msg->any.head, sizeof(msg->any.head));
408 ioq->fifo_beg += ioq->hbytes;
413 * Fall through to the next state. Make sure that the
414 * extended header does not straddle the end of the buffer.
415 * We still want to issue larger reads into our buffer,
416 * book-keeping is easier if we don't bcopy() yet.
418 if (bytes + nmax < ioq->hbytes) {
419 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
420 ioq->fifo_cdx -= ioq->fifo_beg;
422 ioq->fifo_end = bytes;
423 nmax = sizeof(ioq->buf) - ioq->fifo_end;
425 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
427 case HAMMER2_MSGQ_STATE_HEADER2:
429 * Fill out the extended header.
432 if (bytes < ioq->hbytes) {
433 n = read(iocom->sock_fd,
434 msg->any.buf + ioq->fifo_end,
438 ioq->error = HAMMER2_IOQ_ERROR_EOF;
441 if (errno != EINTR &&
442 errno != EINPROGRESS &&
444 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
456 * Insufficient data accumulated (set msg NULL so caller will
459 if (bytes < ioq->hbytes) {
465 * Calculate the extended header, decrypt data received
468 hammer2_crypto_decrypt(iocom, ioq);
469 head = (void *)(ioq->buf + ioq->fifo_beg);
472 * Check the crc on the extended header
474 if (ioq->hbytes > (int)sizeof(hammer2_msg_hdr_t)) {
475 xcrc32 = hammer2_icrc32(head + 1,
476 ioq->hbytes - sizeof(*head));
477 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
478 if (head->icrc2 != xcrc16) {
479 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
485 * Copy the extended header into the msg and adjust the
488 bcopy(head, &msg->any, ioq->hbytes);
491 * We are either done or we fall-through.
493 if (ioq->abytes == 0) {
494 ioq->fifo_beg += ioq->hbytes;
499 * Must adjust nmax and bytes (and the state) when falling
502 ioq->fifo_beg += ioq->hbytes;
504 bytes -= ioq->hbytes;
505 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
507 case HAMMER2_MSGQ_STATE_AUXDATA1:
509 * Copy the partial or complete payload from remaining
510 * bytes in the FIFO. We have to fall-through either
511 * way so we can check the crc.
513 assert(msg->aux_size == 0);
514 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
515 if (ioq->already > ioq->abytes)
516 ioq->already = ioq->abytes;
517 if (bytes >= ioq->abytes) {
518 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
520 msg->aux_size = ioq->abytes;
521 ioq->fifo_beg += ioq->abytes;
522 if (ioq->fifo_cdx < ioq->fifo_beg)
523 ioq->fifo_cdx = ioq->fifo_beg;
524 bytes -= ioq->abytes;
526 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
528 msg->aux_size = bytes;
529 ioq->fifo_beg += bytes;
530 if (ioq->fifo_cdx < ioq->fifo_beg)
531 ioq->fifo_cdx = ioq->fifo_beg;
534 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
536 case HAMMER2_MSGQ_STATE_AUXDATA2:
538 * Read the remainder of the payload directly into the
539 * msg->aux_data buffer.
542 if (msg->aux_size < ioq->abytes) {
544 n = read(iocom->sock_fd,
545 msg->aux_data + msg->aux_size,
546 ioq->abytes - msg->aux_size);
549 ioq->error = HAMMER2_IOQ_ERROR_EOF;
552 if (errno != EINTR &&
553 errno != EINPROGRESS &&
555 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
565 * Insufficient data accumulated (set msg NULL so caller will
568 if (msg->aux_size < ioq->abytes) {
572 assert(msg->aux_size == ioq->abytes);
573 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
576 * Check aux_icrc, then we are done.
578 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
579 if (xcrc32 != msg->any.head.aux_icrc) {
580 ioq->error = HAMMER2_IOQ_ERROR_ACRC;
584 case HAMMER2_MSGQ_STATE_ERROR:
587 * We don't double-return errors, the caller should not
588 * have called us again after getting an error msg.
595 * Check the message sequence. The iv[] should prevent any
596 * possibility of a replay but we add this check anyway.
598 if (msg && ioq->error == 0) {
599 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
600 ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
607 * Handle error, RREQ, or completion
609 * NOTE: nmax and bytes are invalid at this point, we don't bother
610 * to update them when breaking out.
614 * An unrecoverable error occured during processing,
615 * return a special error message. Try to leave the
616 * ioq state alone for post-mortem debugging.
618 * Link error messages are returned as one-way messages,
619 * so no flags get set. Source and target is 0 (link-level),
620 * msgid is 0 (link-level). All we really need to do is
621 * set up magic, cmd, and error.
623 assert(ioq->msg == msg);
625 msg = hammer2_allocmsg(iocom, 0, 0);
631 msg->aux_data = NULL;
634 bzero(&msg->any.head, sizeof(msg->any.head));
635 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
636 msg->any.head.cmd = HAMMER2_LNK_ERROR;
637 msg->any.head.error = ioq->error;
638 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
639 iocom->flags |= HAMMER2_IOCOMF_EOF;
640 } else if (msg == NULL) {
642 * Insufficient data received to finish building the message,
643 * set RREQ and return NULL.
645 * Leave ioq->msg intact.
646 * Leave the FIFO intact.
648 iocom->flags |= HAMMER2_IOCOMF_RREQ;
656 * Return msg, clear the FIFO if it is now empty.
657 * Flag RREQ if the caller needs to wait for a read-event
660 * The fifo has already been advanced past the message.
661 * Trivially reset the FIFO indices if possible.
663 if (ioq->fifo_beg == ioq->fifo_end) {
664 iocom->flags |= HAMMER2_IOCOMF_RREQ;
669 iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
671 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
678 * Calculate the header and data crc's and write a low-level message to
679 * the connection. If aux_icrc is non-zero the aux_data crc is already
680 * assumed to have been set.
682 * A non-NULL msg is added to the queue but not necessarily flushed.
683 * Calling this function with msg == NULL will get a flush going.
686 hammer2_ioq_write(hammer2_msg_t *msg)
688 hammer2_iocom_t *iocom = msg->iocom;
689 hammer2_ioq_t *ioq = &iocom->ioq_tx;
696 TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
698 hammer2_iocom_drain(iocom);
703 * Finish populating the msg fields. The salt ensures that the iv[]
704 * array is ridiculously randomized and we also re-seed our PRNG
705 * every 32768 messages just to be sure.
707 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
708 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
710 if ((ioq->seq & 32767) == 0)
714 * Calculate aux_icrc if 0, calculate icrc2, and finally
717 if (msg->aux_size && msg->any.head.aux_icrc == 0) {
718 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
719 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
720 msg->any.head.aux_icrc = xcrc32;
722 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
723 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
725 if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
726 sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
727 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
729 hbytes -= sizeof(msg->any.head);
730 xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
731 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
732 msg->any.head.icrc2 = xcrc16;
734 msg->any.head.icrc2 = 0;
736 xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
737 HAMMER2_MSGHDR_CRCBYTES);
738 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
739 msg->any.head.icrc1 = xcrc16;
742 * XXX Encrypt the message
746 * Enqueue the message.
748 TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
750 iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
753 * Flush if we know we can write (WREQ not set) and if
754 * sufficient messages have accumulated. Otherwise hold
755 * off to avoid piecemeal system calls.
757 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
759 if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
761 hammer2_iocom_flush(iocom);
765 hammer2_iocom_flush(hammer2_iocom_t *iocom)
767 hammer2_ioq_t *ioq = &iocom->ioq_tx;
771 struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
779 * Pump messages out the connection by building an iovec.
784 TAILQ_FOREACH(msg, &ioq->msgq, entry) {
786 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
789 abytes = msg->aux_size;
794 if (hbytes - hoff > 0) {
795 iov[n].iov_base = (char *)&msg->any.head + hoff;
796 iov[n].iov_len = hbytes - hoff;
797 nmax += hbytes - hoff;
799 if (n == HAMMER2_IOQ_MAXIOVEC)
802 if (abytes - aoff > 0) {
803 assert(msg->aux_data != NULL);
804 iov[n].iov_base = msg->aux_data + aoff;
805 iov[n].iov_len = abytes - aoff;
806 nmax += abytes - aoff;
808 if (n == HAMMER2_IOQ_MAXIOVEC)
816 * Encrypt and write the data. The crypto code will move the
817 * data into the fifo and adjust the iov as necessary. If
818 * encryption is disabled the iov is left alone.
820 * hammer2_crypto_encrypt_wrote()
822 n = hammer2_crypto_encrypt(iocom, ioq, iov, n);
825 * Execute the writev() then figure out what happened.
827 nact = writev(iocom->sock_fd, iov, n);
829 if (errno != EINTR &&
830 errno != EINPROGRESS &&
832 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
833 hammer2_iocom_drain(iocom);
835 iocom->flags |= HAMMER2_IOCOMF_WREQ;
839 hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
841 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
843 iocom->flags |= HAMMER2_IOCOMF_WREQ;
845 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
846 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
848 abytes = msg->aux_size;
850 if (nact < hbytes - ioq->hbytes) {
854 nact -= hbytes - ioq->hbytes;
855 ioq->hbytes = hbytes;
856 if (nact < abytes - ioq->abytes) {
860 nact -= abytes - ioq->abytes;
862 TAILQ_REMOVE(&ioq->msgq, msg, entry);
867 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
869 TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
872 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
873 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
876 iocom->flags |= HAMMER2_IOCOMF_EOF |
877 HAMMER2_IOCOMF_WIDLE;
878 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
883 * Kill pending msgs on ioq_tx and adjust the flags such that no more
884 * write events will occur. We don't kill read msgs because we want
885 * the caller to pull off our contrived terminal error msg to detect
886 * the connection failure.
889 hammer2_iocom_drain(hammer2_iocom_t *iocom)
891 hammer2_ioq_t *ioq = &iocom->ioq_tx;
894 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
895 TAILQ_REMOVE(&ioq->msgq, msg, entry);
897 hammer2_freemsg(msg);
899 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
900 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
904 * This is a shortcut to the normal hammer2_allocreply() mechanic which
905 * uses the received message to formulate a final reply and error code.
906 * Can be used to issue a final reply for one-way, one-off, or streaming
909 * Replies to one-way messages are a bit of an oxymoron but the feature
910 * is used by the debug (DBG) protocol.
912 * The reply contains no data.
914 * (msg) is eaten up by this function.
917 hammer2_replymsg(hammer2_msg_t *msg, uint16_t error)
919 hammer2_persist_t *pers;
922 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
924 t16 = msg->any.head.source;
925 msg->any.head.source = msg->any.head.target;
926 msg->any.head.target = t16;
927 msg->any.head.error = error;
928 msg->any.head.cmd |= HAMMER2_MSGF_REPLY;
930 if ((pers = msg->persist) != NULL) {
931 assert(pers->lrep & HAMMER2_MSGF_DELETE);
932 msg->any.head.cmd |= pers->lrep & (HAMMER2_MSGF_CREATE |
933 HAMMER2_MSGF_DELETE);
934 pers->lrep &= ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
936 hammer2_ioq_write(msg);