2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39 * Initialize a low-level ioq
/*
 * hammer2_ioq_init: put an ioq into its starting state.
 *
 * iocom - marked __unused; not touched by the visible statements.
 * ioq   - queue structure to reset.
 *
 * NOTE(review): the return type line and the braces are not present in
 * this listing; the comments cover only the statements shown.
 */
42 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
/* Zero the whole structure so every field starts from a known value. */
44 bzero(ioq, sizeof(*ioq));
/* The state machine begins at the first header state. */
45 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
/* Set up the head of the (empty) message queue. */
46 TAILQ_INIT(&ioq->msgq);
/*
 * hammer2_ioq_done: tear down an ioq.
 *
 * iocom - marked __unused.
 * ioq   - queue whose pending messages are removed.
 *
 * The visible statements unlink every message still on ioq->msgq and
 * then test the ioq->msg pointer.
 *
 * NOTE(review): this listing omits lines (return type, braces, the local
 * declaration of msg, and whatever is done with each message after it is
 * unlinked), so how the messages are released cannot be confirmed here.
 */
50 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
/* Pop from the head of the queue until it is empty. */
54 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
55 TAILQ_REMOVE(&ioq->msgq, msg, entry);
/* Also pick up the message referenced by ioq->msg, if there is one. */
58 if ((msg = ioq->msg) != NULL) {
65 * Initialize a low-level communications channel
/*
 * hammer2_iocom_init: prepare an iocom for use on the given descriptors.
 *
 * iocom   - structure to initialize; it is zeroed first.
 * sock_fd - stored in iocom->sock_fd and switched to non-blocking mode.
 * alt_fd  - stored in iocom->alt_fd; also passed to fcntl() below.
 *
 * Sequence of the visible statements: zero the structure, set up both
 * free lists, record the descriptors, set the initial flags (RREQ and
 * WIDLE), initialize the receive and transmit ioqs, run the synchronous
 * crypto negotiation, then set O_NONBLOCK on the descriptors.
 *
 * NOTE(review): F_SETFL is given O_NONBLOCK alone, which replaces the
 * descriptor status flags rather than adding to the ones already set
 * (there is no F_GETFL first), and the fcntl() result is not checked.
 *
 * NOTE(review): lines are missing from this listing (return type, braces
 * and apparently the condition guarding the alt_fd fcntl call), so only
 * the visible statements are described.
 */
68 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
70 bzero(iocom, sizeof(*iocom));
/* Both free lists start out empty. */
72 TAILQ_INIT(&iocom->freeq);
73 TAILQ_INIT(&iocom->freeq_aux);
74 iocom->sock_fd = sock_fd;
75 iocom->alt_fd = alt_fd;
/* Initial state: a read event is wanted and the writer is idle. */
76 iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
/* Receive and transmit queues share the same initializer. */
77 hammer2_ioq_init(iocom, &iocom->ioq_rx);
78 hammer2_ioq_init(iocom, &iocom->ioq_tx);
81 * Negotiate session crypto synchronously. This will mark the
82 * connection as error'd if it fails.
84 hammer2_crypto_negotiate(iocom);
87 * Make sure our fds are set to non-blocking for the iocom core.
90 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
92 /* if line buffered our single fgets() should be fine */
94 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
/*
 * hammer2_iocom_done: release what an iocom holds.
 *
 * Both ioqs are torn down with hammer2_ioq_done(), then the two free
 * lists (freeq and freeq_aux) are visited.
 *
 * NOTE(review): each free list is tested with if rather than with a loop,
 * so as written only the first entry of each list is unlinked per call.
 * The other queue-draining code in this file uses while for the same
 * TAILQ_FIRST/TAILQ_REMOVE pattern.  This looks like it leaves any
 * remaining cached messages behind - confirm and convert to while if so.
 *
 * NOTE(review): this listing omits lines (return type, braces, the msg
 * declaration and the statements that actually free each entry).
 */
99 hammer2_iocom_done(hammer2_iocom_t *iocom)
/* Drop anything still queued for receive and for transmit. */
104 hammer2_ioq_done(iocom, &iocom->ioq_rx);
105 hammer2_ioq_done(iocom, &iocom->ioq_tx);
/* First entry of the plain free list. */
106 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
107 TAILQ_REMOVE(&iocom->freeq, msg, entry);
/* First entry of the free list for messages that kept an aux buffer. */
110 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
111 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
/* Pointer cleared; presumably the buffer is freed on a line not shown. */
113 msg->aux_data = NULL;
119 * Allocate a new one-way message.
/*
 * hammer2_allocmsg: obtain a message structure, reusing a cached one from
 * the iocom free lists when one is available and calling malloc()
 * otherwise.
 *
 * iocom    - owner of the free lists consulted here.
 * cmd      - stored in the header; its HAMMER2_MSGF_SIZE field also
 *            selects how many header bytes are zeroed.
 * aux_size - requested auxiliary buffer size in bytes; rounded up to the
 *            message alignment before use.
 *
 * NOTE(review): lines are missing from this listing (return type, braces,
 * local declarations, the conditions selecting between the branches
 * below, and the return statement).  In particular no check of either
 * malloc() result is visible - confirm against the full source.
 */
122 hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
/* Mask-based round-up of aux_size to a multiple of the alignment. */
128 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
129 ~HAMMER2_MSG_ALIGNMASK;
/* Take the first cached message from freeq_aux if there is one ... */
130 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
131 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
/* ... or from freeq (which branch applies when is not visible here). */
133 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
134 TAILQ_REMOVE(&iocom->freeq, msg, entry);
/* Fresh allocation; it starts without an aux buffer. */
137 msg = malloc(sizeof(*msg));
139 msg->aux_data = NULL;
/* Cached size differs from the request: the aux_data pointer is reset. */
142 if (msg->aux_size != aux_size) {
145 msg->aux_data = NULL;
/* Allocate the aux buffer at the rounded size and record that size. */
149 msg->aux_data = malloc(aux_size);
150 msg->aux_size = aux_size;
/* Header length in bytes = size field of cmd times HAMMER2_MSG_ALIGN. */
154 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
/* Clear that many header bytes, then fill in the fields set here. */
156 bzero(&msg->any.head, hbytes);
157 msg->any.head.aux_icrc = 0;
158 msg->any.head.cmd = cmd;
164 * Allocate a one-way or streaming reply to a message. The message is
165 * not modified. This function may be used to allocate multiple replies.
167 * If cmd is 0 then msg->any.head.cmd is used to formulate the reply command.
/*
 * hammer2_allocreply: build a reply message for msg.
 *
 * msg      - message being replied to; asserted not to be a reply itself.
 *            Its header is copied, msg itself is not written to except
 *            through msg->persist (see below).
 * cmd      - reply command; msg->any.head.cmd is substituted on one path
 *            (per the comment ahead of this function: when cmd is 0).
 * aux_size - forwarded to hammer2_allocmsg().
 *
 * NOTE(review): lines are missing from this listing (return type, braces,
 * the rmsg declaration, the test around the cmd default and the return
 * statement), so only the visible statements are described.
 */
170 hammer2_allocreply(hammer2_msg_t *msg, uint32_t cmd, int aux_size)
173 hammer2_persist_t *pers;
/* Replying to a reply is not allowed. */
175 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
177 cmd = msg->any.head.cmd;
179 rmsg = hammer2_allocmsg(msg->iocom, cmd, aux_size);
/* Start from a copy of the original header ... */
180 rmsg->any.head = msg->any.head;
/* ... then mark it as a reply with CREATE and DELETE stripped. */
181 rmsg->any.head.cmd = (cmd | HAMMER2_MSGF_REPLY) &
182 ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
/* The copied aux crc does not apply to the reply. */
183 rmsg->any.head.aux_icrc = 0;
/*
 * With persistent state attached: the CREATE bit held in pers->lrep is
 * transferred to this reply and then cleared from lrep, so only the
 * first reply carries it.  DELETE must still be set in lrep.
 */
185 if ((pers = msg->persist) != NULL) {
186 assert(pers->lrep & HAMMER2_MSGF_DELETE);
187 rmsg->any.head.cmd |= pers->lrep & HAMMER2_MSGF_CREATE;
188 pers->lrep &= ~HAMMER2_MSGF_CREATE;
189 /* do not clear DELETE */
195 * Free a message so it can be reused afresh.
197 * NOTE: aux_size can be 0 with a non-NULL aux_data.
/*
 * hammer2_freemsg: hand a message back to its iocom for later reuse.
 *
 * msg - message to cache; its owning iocom is taken from msg->iocom.
 *
 * The message is appended to either iocom->freeq_aux or iocom->freeq.
 *
 * NOTE(review): the test choosing between the two lists is not visible
 * in this listing (nor are the return type and braces).  Presumably it
 * depends on whether msg->aux_data is set - confirm against the full
 * source.
 */
200 hammer2_freemsg(hammer2_msg_t *msg)
202 hammer2_iocom_t *iocom = msg->iocom;
205 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
207 TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
211 * I/O core loop for an iocom.
/*
 * hammer2_iocom_core: poll-driven event loop for an iocom.
 *
 * The three callbacks are stored in the iocom and invoked from the loop:
 *   recvmsg_func - when sock_fd is readable, or whenever RREQ is clear.
 *   sendmsg_func - only while WIDLE is clear: when sock_fd is writable,
 *                  or whenever WREQ is clear.
 *   altmsg_func  - when alt_fd is valid (>= 0) and readable.
 * The loop runs until HAMMER2_IOCOMF_EOF is set in iocom->flags.
 *
 * NOTE(review): the poll() results are discarded in the visible calls,
 * so the revents tests that follow cannot tell a failed poll from a
 * successful one - confirm whether that is intended.
 *
 * NOTE(review): lines are missing from this listing (return type, braces,
 * the timeout declaration and setup, the initialisation of the events
 * fields and the else keywords), so only the visible statements are
 * described.
 */
214 hammer2_iocom_core(hammer2_iocom_t *iocom,
215 void (*recvmsg_func)(hammer2_iocom_t *),
216 void (*sendmsg_func)(hammer2_iocom_t *),
217 void (*altmsg_func)(hammer2_iocom_t *))
/* Slot 0 is the socket, slot 1 the optional alternate descriptor. */
219 struct pollfd fds[2];
222 iocom->recvmsg_callback = recvmsg_func;
223 iocom->sendmsg_callback = sendmsg_func;
224 iocom->altmsg_callback = altmsg_func;
226 while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
229 fds[0].fd = iocom->sock_fd;
/* Ask for read readiness only when a read event is wanted. */
233 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
234 fds[0].events |= POLLIN;
/* Ask for write readiness only when not idle and a write event is wanted. */
237 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
238 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
239 fds[0].events |= POLLOUT;
/* Poll both descriptors when alt_fd is valid, the socket alone otherwise. */
244 if (iocom->alt_fd >= 0) {
245 fds[1].fd = iocom->alt_fd;
/* NOTE(review): or-assignment; the reset of fds[1].events is not visible. */
246 fds[1].events |= POLLIN;
248 poll(fds, 2, timeout);
250 poll(fds, 1, timeout);
/* Receive when readable, or unconditionally when no read event was requested. */
252 if ((fds[0].revents & POLLIN) ||
253 (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
254 iocom->recvmsg_callback(iocom);
/* Same shape for the transmit side, but skipped entirely while idle. */
256 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
257 if ((fds[0].revents & POLLOUT) ||
258 (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
259 iocom->sendmsg_callback(iocom);
262 if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
263 iocom->altmsg_callback(iocom);
268 * Read the next ready message from the ioq, issuing I/O if needed.
269 * Caller should retry on a read-event when NULL is returned.
271 * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
272 * be returned (and the caller must not call us again after that).
/*
 * hammer2_ioq_read: pull the next complete message off the receive ioq.
 *
 * iocom - connection to read from; the receive queue is iocom->ioq_rx.
 *
 * The function is a state machine over ioq->state.  The case labels
 * visible below are, in order:
 *   HEADER1  - accumulate the core header in the FIFO, validate its
 *              magic, icrc1 and size fields, then allocate the message.
 *   HEADER2  - accumulate the extended header and check icrc2.
 *   AUXDATA1 - copy whatever payload bytes are already in the FIFO.
 *   AUXDATA2 - read the rest of the payload straight into msg->aux_data,
 *              decrypt it and check aux_icrc.
 *   ERROR    - state recorded once an error message has been built.
 *
 * Every failure path stores a HAMMER2_IOQ_ERROR_* code in ioq->error.
 * The tail of the function then builds a HAMMER2_LNK_ERROR message that
 * carries that code and sets HAMMER2_IOCOMF_EOF.  When no complete
 * message is available RREQ is set so the caller waits for a read event.
 *
 * NOTE(review): many lines are missing from this listing (return type,
 * braces, local declarations, break and return statements, else
 * keywords and several call arguments), so the control flow between the
 * visible statements is partly inferred from the surviving comments and
 * must be confirmed against the full source.
 */
275 hammer2_ioq_read(hammer2_iocom_t *iocom)
277 hammer2_ioq_t *ioq = &iocom->ioq_rx;
279 hammer2_msg_hdr_t *head;
288 * If a message is already pending we can just remove and
291 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
292 TAILQ_REMOVE(&ioq->msgq, msg, entry);
297 * Message read in-progress (msg is NULL at the moment). We don't
298 * allocate a msg until we have its core header.
300 bytes = ioq->fifo_end - ioq->fifo_beg;
301 nmax = sizeof(ioq->buf) - ioq->fifo_end;
305 case HAMMER2_MSGQ_STATE_HEADER1:
307 * Load the primary header, fail on any non-trivial read
308 * error or on EOF. Since the primary header is the same
309 * size is the message alignment it will never straddle
310 * the end of the buffer.
312 if (bytes < (int)sizeof(msg->any.head)) {
313 n = read(iocom->sock_fd,
314 ioq->buf + ioq->fifo_end,
318 ioq->error = HAMMER2_IOQ_ERROR_EOF;
321 if (errno != EINTR &&
322 errno != EINPROGRESS &&
324 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
336 * Insufficient data accumulated (msg is NULL, caller will
340 if (bytes < (int)sizeof(msg->any.head))
344 * Calculate the header, decrypt data received so far.
345 * Data will be decrypted in-place. Partial blocks are
346 * not immediately decrypted.
348 hammer2_crypto_decrypt(iocom, ioq);
350 head = (void *)(ioq->buf + ioq->fifo_beg);
353 * Check and fixup the core header. Note that the icrc
354 * has to be calculated before any fixups, but the crc
355 * fields in the msg may have to be swapped like everything
358 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
359 head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
360 ioq->error = HAMMER2_IOQ_ERROR_SYNC;
364 xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
365 HAMMER2_MSGHDR_CRCBYTES);
366 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
367 hammer2_bswap_head(head);
368 flags |= HAMMER2_MSGX_BSWAPPED;
370 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
371 if (xcrc16 != head->icrc1) {
372 ioq->error = HAMMER2_IOQ_ERROR_HCRC;
377 * Calculate the full header size and aux data size
379 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
381 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
382 if (ioq->hbytes < (int)sizeof(msg->any.head) ||
383 ioq->hbytes > (int)sizeof(msg->any) ||
384 ioq->abytes > HAMMER2_MSGAUX_MAX) {
385 ioq->error = HAMMER2_IOQ_ERROR_FIELD;
390 * Finally allocate the message and copy the core header
391 * to the embedded extended header.
393 * Initialize msg->aux_size to 0 and use it to track
394 * the amount of data copied from the stream.
396 msg = hammer2_allocmsg(iocom, 0, ioq->abytes);
402 * We are either done or we fall-through
404 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
405 bcopy(head, &msg->any.head, sizeof(msg->any.head));
406 ioq->fifo_beg += ioq->hbytes;
411 * Fall through to the next state. Make sure that the
412 * extended header does not straddle the end of the buffer.
413 * We still want to issue larger reads into our buffer,
414 * book-keeping is easier if we don't bcopy() yet.
416 if (bytes + nmax < ioq->hbytes) {
417 bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
418 ioq->fifo_cdx -= ioq->fifo_beg;
420 ioq->fifo_end = bytes;
421 nmax = sizeof(ioq->buf) - ioq->fifo_end;
423 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
425 case HAMMER2_MSGQ_STATE_HEADER2:
427 * Fill out the extended header.
/*
 * NOTE(review): the read below targets msg->any.buf + ioq->fifo_end,
 * whereas the HEADER1 read targets ioq->buf + ioq->fifo_end and the
 * header is decoded from ioq->buf right after this step.  The
 * destination looks inconsistent with the FIFO bookkeeping - confirm
 * against the full source.
 */
430 if (bytes < ioq->hbytes) {
431 n = read(iocom->sock_fd,
432 msg->any.buf + ioq->fifo_end,
436 ioq->error = HAMMER2_IOQ_ERROR_EOF;
439 if (errno != EINTR &&
440 errno != EINPROGRESS &&
442 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
454 * Insufficient data accumulated (set msg NULL so caller will
457 if (bytes < ioq->hbytes) {
463 * Calculate the extended header, decrypt data received
466 hammer2_crypto_decrypt(iocom, ioq);
467 head = (void *)(ioq->buf + ioq->fifo_beg);
470 * Check the crc on the extended header
/* Only headers longer than the core header have a part covered by icrc2. */
472 if (ioq->hbytes > (int)sizeof(hammer2_msg_hdr_t)) {
473 xcrc32 = hammer2_icrc32(head + 1,
474 ioq->hbytes - sizeof(*head));
/* Fold the 32-bit crc to 16 bits by xoring its two halves. */
475 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
476 if (head->icrc2 != xcrc16) {
477 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
483 * Copy the extended header into the msg and adjust the
486 bcopy(head, &msg->any, ioq->hbytes);
489 * We are either done or we fall-through.
491 if (ioq->abytes == 0) {
492 ioq->fifo_beg += ioq->hbytes;
497 * Must adjust nmax and bytes (and the state) when falling
500 ioq->fifo_beg += ioq->hbytes;
502 bytes -= ioq->hbytes;
503 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
505 case HAMMER2_MSGQ_STATE_AUXDATA1:
507 * Copy the partial or complete payload from remaining
508 * bytes in the FIFO. We have to fall-through either
509 * way so we can check the crc.
511 assert(msg->aux_size == 0);
/*
 * already = distance from fifo_beg to fifo_cdx, capped at the payload
 * size; it is handed to hammer2_crypto_decrypt_aux() in AUXDATA2.
 * Presumably fifo_cdx marks how far the FIFO has been decrypted -
 * confirm.
 */
512 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
513 if (ioq->already > ioq->abytes)
514 ioq->already = ioq->abytes;
/* Case 1: the FIFO already holds the entire payload. */
515 if (bytes >= ioq->abytes) {
516 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
518 msg->aux_size = ioq->abytes;
519 ioq->fifo_beg += ioq->abytes;
/* Keep fifo_cdx from falling behind the new start of the FIFO. */
520 if (ioq->fifo_cdx < ioq->fifo_beg)
521 ioq->fifo_cdx = ioq->fifo_beg;
522 bytes -= ioq->abytes;
/* Case 2: only part of the payload is buffered; take what is there. */
524 bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
526 msg->aux_size = bytes;
527 ioq->fifo_beg += bytes;
528 if (ioq->fifo_cdx < ioq->fifo_beg)
529 ioq->fifo_cdx = ioq->fifo_beg;
532 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
534 case HAMMER2_MSGQ_STATE_AUXDATA2:
536 * Read the remainder of the payload directly into the
537 * msg->aux_data buffer.
/* msg->aux_size counts payload bytes received so far; read only the rest. */
540 if (msg->aux_size < ioq->abytes) {
542 n = read(iocom->sock_fd,
543 msg->aux_data + msg->aux_size,
544 ioq->abytes - msg->aux_size);
547 ioq->error = HAMMER2_IOQ_ERROR_EOF;
550 if (errno != EINTR &&
551 errno != EINPROGRESS &&
553 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
563 * Insufficient data accumulated (set msg NULL so caller will
566 if (msg->aux_size < ioq->abytes) {
570 assert(msg->aux_size == ioq->abytes);
571 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
574 * Check aux_icrc, then we are done.
576 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
577 if (xcrc32 != msg->any.head.aux_icrc) {
578 ioq->error = HAMMER2_IOQ_ERROR_ACRC;
582 case HAMMER2_MSGQ_STATE_ERROR:
585 * We don't double-return errors, the caller should not
586 * have called us again after getting an error msg.
593 * Check the message sequence. The iv[] should prevent any
594 * possibility of a replay but we add this check anyway.
596 if (msg && ioq->error == 0) {
597 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
598 ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
605 * Handle error, RREQ, or completion
607 * NOTE: nmax and bytes are invalid at this point, we don't bother
608 * to update them when breaking out.
612 * An unrecoverable error occured during processing,
613 * return a special error message. Try to leave the
614 * ioq state alone for post-mortem debugging.
616 * Link error messages are returned as one-way messages,
617 * so no flags get set. Source and target is 0 (link-level),
618 * msgid is 0 (link-level). All we really need to do is
619 * set up magic, cmd, and error.
621 assert(ioq->msg == msg);
623 msg = hammer2_allocmsg(iocom, 0, 0);
629 msg->aux_data = NULL;
/* Build the synthetic link-error message: magic, cmd and error only. */
632 bzero(&msg->any.head, sizeof(msg->any.head));
633 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
634 msg->any.head.cmd = HAMMER2_LNK_ERROR;
635 msg->any.head.error = ioq->error;
/* Latch the error state and flag EOF so the core loop terminates. */
636 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
637 iocom->flags |= HAMMER2_IOCOMF_EOF;
638 } else if (msg == NULL) {
640 * Insufficient data received to finish building the message,
641 * set RREQ and return NULL.
643 * Leave ioq->msg intact.
644 * Leave the FIFO intact.
646 iocom->flags |= HAMMER2_IOCOMF_RREQ;
654 * Return msg, clear the FIFO if it is now empty.
655 * Flag RREQ if the caller needs to wait for a read-event
658 * The fifo has already been advanced past the message.
659 * Trivially reset the FIFO indices if possible.
/* FIFO fully consumed: more data can only arrive with a read event. */
661 if (ioq->fifo_beg == ioq->fifo_end) {
662 iocom->flags |= HAMMER2_IOCOMF_RREQ;
/* RREQ cleared here; presumably the else branch (data still buffered). */
667 iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
/* Ready for the next message. */
669 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
676 * Calculate the header and data crc's and write a low-level message to
677 * the connection. If aux_icrc is non-zero the aux_data crc is already
678 * assumed to have been set.
680 * A non-NULL msg is added to the queue but not necessarily flushed.
681 * Calling this function with msg == NULL will get a flush going.
/*
 * hammer2_ioq_write: finish a message header and queue the message on the
 * transmit ioq of its iocom, flushing when enough has accumulated.
 *
 * msg - message to send; the owning iocom is taken from msg->iocom.
 *
 * Visible steps: stamp magic and salt, fill in aux_icrc (only when it is
 * still 0 and there is aux data), aux_bytes, icrc2 and icrc1, append the
 * message to ioq->msgq, clear WIDLE, then flush unless WREQ is set or
 * fewer than HAMMER2_IOQ_MAXIOVEC / 2 messages are queued.
 *
 * NOTE(review): the first visible statement dereferences msg, so passing
 * msg == NULL, as the comment ahead of this function suggests, does not
 * look safe with the code shown - confirm and correct whichever is stale.
 *
 * NOTE(review): lines are missing from this listing (return type, braces,
 * local declarations, the test guarding the early drain path, the
 * sequence and message-count updates, else keywords and return
 * statements), so only the visible statements are described.
 */
684 hammer2_ioq_write(hammer2_msg_t *msg)
686 hammer2_iocom_t *iocom = msg->iocom;
687 hammer2_ioq_t *ioq = &iocom->ioq_tx;
/*
 * Early path: the message is queued and the transmit queue is drained
 * at once.  Presumably taken when ioq->error is already set - the
 * guarding test is not visible.
 */
694 TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
696 hammer2_iocom_drain(iocom);
701 * Finish populating the msg fields. The salt ensures that the iv[]
702 * array is ridiculously randomized and we also re-seed our PRNG
703 * every 32768 messages just to be sure.
705 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
/* Low 8 bits carry the sequence number checked by the reader; the rest is random. */
706 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
708 if ((ioq->seq & 32767) == 0)
712 * Calculate aux_icrc if 0, calculate icrc2, and finally
/* A non-zero aux_icrc is taken as already computed by the caller. */
715 if (msg->aux_size && msg->any.head.aux_icrc == 0) {
716 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
717 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
718 msg->any.head.aux_icrc = xcrc32;
/* aux_bytes is expressed in alignment units, not bytes. */
720 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
/* NOTE(review): same assertion as inside the branch above; this one is unconditional. */
721 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
/* icrc2 covers the header bytes that follow the core header, when there are any. */
723 if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
724 sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
725 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
727 hbytes -= sizeof(msg->any.head);
728 xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
729 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
730 msg->any.head.icrc2 = xcrc16;
732 msg->any.head.icrc2 = 0;
/*
 * icrc1 is computed last, over HAMMER2_MSGHDR_CRCBYTES bytes starting at
 * HAMMER2_MSGHDR_CRCOFF; presumably that range includes the fields set
 * above - confirm against the header layout.
 */
734 xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
735 HAMMER2_MSGHDR_CRCBYTES);
736 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
737 msg->any.head.icrc1 = xcrc16;
740 * XXX Encrypt the message
744 * Enqueue the message.
746 TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
/* There is now pending output, so the writer is no longer idle. */
748 iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
751 * Flush if we know we can write (WREQ not set) and if
752 * sufficient messages have accumulated. Otherwise hold
753 * off to avoid piecemeal system calls.
755 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
757 if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
759 hammer2_iocom_flush(iocom);
/*
 * hammer2_iocom_flush: write as much of the transmit queue as the socket
 * will take in one writev() call.
 *
 * iocom - connection to flush; the transmit queue is iocom->ioq_tx.
 *
 * Visible steps:
 *   1. Walk ioq->msgq and add up to HAMMER2_IOQ_MAXIOVEC iovec entries,
 *      one for the unsent part of each header and one for the unsent
 *      part of each aux buffer.
 *   2. Pass the iovec through hammer2_crypto_encrypt() and writev() it.
 *   3. On a writev() failure either record HAMMER2_IOQ_ERROR_SOCK and
 *      drain the queue, or set WREQ, depending on errno.
 *   4. Otherwise walk the queue again, retiring every message that was
 *      written completely and returning it to a free list.
 *   5. Update WIDLE, WREQ and EOF to match the outcome.
 *
 * NOTE(review): lines are missing from this listing (return type, braces,
 * local declarations, the hoff and aoff setup, iovec index increments,
 * break and else keywords, and the tests selecting the final flag
 * updates), so the description is limited to the visible statements.
 */
763 hammer2_iocom_flush(hammer2_iocom_t *iocom)
765 hammer2_ioq_t *ioq = &iocom->ioq_tx;
769 struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
777 * Pump messages out the connection by building an iovec.
782 TAILQ_FOREACH(msg, &ioq->msgq, entry) {
784 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
787 abytes = msg->aux_size;
/* Header bytes beyond offset hoff still have to go out. */
792 if (hbytes - hoff > 0) {
793 iov[n].iov_base = (char *)&msg->any.head + hoff;
794 iov[n].iov_len = hbytes - hoff;
/* nmax accumulates the total number of bytes placed in the iovec. */
795 nmax += hbytes - hoff;
/* Stop collecting once the iovec array is full. */
797 if (n == HAMMER2_IOQ_MAXIOVEC)
/* Same for the aux payload beyond offset aoff. */
800 if (abytes - aoff > 0) {
801 assert(msg->aux_data != NULL);
802 iov[n].iov_base = msg->aux_data + aoff;
803 iov[n].iov_len = abytes - aoff;
804 nmax += abytes - aoff;
806 if (n == HAMMER2_IOQ_MAXIOVEC)
814 * Encrypt and write the data. The crypto code will move the
815 * data into the fifo and adjust the iov as necessary. If
816 * encryption is disabled the iov is left alone.
818 * hammer2_crypto_encrypt_wrote()
/* The iovec entry count may change, so n is taken from the return value. */
820 n = hammer2_crypto_encrypt(iocom, ioq, iov, n);
823 * Execute the writev() then figure out what happened.
825 nact = writev(iocom->sock_fd, iov, n);
/* Failure handling: the listed errno values are treated as retryable. */
827 if (errno != EINTR &&
828 errno != EINPROGRESS &&
830 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
831 hammer2_iocom_drain(iocom);
833 iocom->flags |= HAMMER2_IOCOMF_WREQ;
/* Tell the crypto layer how many bytes actually went out. */
837 hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
839 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
841 iocom->flags |= HAMMER2_IOCOMF_WREQ;
/*
 * Account for the nact bytes written, message by message, from the head
 * of the queue.  ioq->hbytes and ioq->abytes hold how much of the head
 * message was already sent by earlier calls.
 */
843 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
844 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
846 abytes = msg->aux_size;
/* Header only partly written: the message stays queued. */
848 if (nact < hbytes - ioq->hbytes) {
852 nact -= hbytes - ioq->hbytes;
853 ioq->hbytes = hbytes;
/* Aux data only partly written: the message stays queued. */
854 if (nact < abytes - ioq->abytes) {
858 nact -= abytes - ioq->abytes;
/* Fully written: unlink it and cache it on one of the free lists. */
860 TAILQ_REMOVE(&ioq->msgq, msg, entry);
865 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
867 TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
/* Writer goes idle with no write event wanted; the guarding test is not visible. */
870 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
871 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
/* Terminal case: EOF is flagged as well; presumably taken when ioq->error is set. */
874 iocom->flags |= HAMMER2_IOCOMF_EOF |
875 HAMMER2_IOCOMF_WIDLE;
876 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
881 * Kill pending msgs on ioq_tx and adjust the flags such that no more
882 * write events will occur. We don't kill read msgs because we want
883 * the caller to pull off our contrived terminal error msg to detect
884 * the connection failure.
/*
 * hammer2_iocom_drain: discard everything queued for transmit.
 *
 * iocom - connection whose ioq_tx queue is emptied.
 *
 * Each queued message is unlinked and handed to hammer2_freemsg(); the
 * flags are then set so that the core loop stops asking for write
 * events (WIDLE set, WREQ cleared).  The receive queue is not touched.
 *
 * NOTE(review): the return type, braces, the msg declaration and at
 * least one statement inside the loop are not present in this listing.
 */
887 hammer2_iocom_drain(hammer2_iocom_t *iocom)
889 hammer2_ioq_t *ioq = &iocom->ioq_tx;
/* Pop from the head until the transmit queue is empty. */
892 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
893 TAILQ_REMOVE(&ioq->msgq, msg, entry);
895 hammer2_freemsg(msg);
/* Nothing left to write: mark idle and drop any pending write request. */
897 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
898 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
902 * This is a shortcut to the normal hammer2_allocreply() mechanic which
903 * uses the received message to formulate a final reply and error code.
904 * Can be used to issue a final reply for one-way, one-off, or streaming
907 * Replies to one-way messages are a bit of an oxymoron but the feature
908 * is used by the debug (DBG) protocol.
910 * The reply contains no data.
912 * (msg) is eaten up by this function.
/*
 * hammer2_replymsg: turn a received message into its own final reply and
 * queue it for transmit.
 *
 * msg   - received message; asserted not to be a reply already.  Its
 *         header is modified in place and the message is passed to
 *         hammer2_ioq_write(), so the caller must not use it afterwards.
 * error - value stored in the error field of the reply header.
 *
 * NOTE(review): the return type, braces and possibly other statements
 * are not present in this listing; only the visible ones are described.
 */
915 hammer2_replymsg(hammer2_msg_t *msg, uint16_t error)
917 hammer2_persist_t *pers;
/* Replying to a reply is not allowed. */
919 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
921 msg->any.head.error = error;
922 msg->any.head.cmd |= HAMMER2_MSGF_REPLY;
/*
 * With persistent state attached: move whichever of CREATE and DELETE
 * are still pending in pers->lrep onto this reply and clear both from
 * lrep, which makes this the last reply for that state.
 */
924 if ((pers = msg->persist) != NULL) {
925 assert(pers->lrep & HAMMER2_MSGF_DELETE);
926 msg->any.head.cmd |= pers->lrep & (HAMMER2_MSGF_CREATE |
927 HAMMER2_MSGF_DELETE);
928 pers->lrep &= ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
930 hammer2_ioq_write(msg);