2 * Copyright (c) 2011-2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@dragonflybsd.org>
6 * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
8 * Redistribution and use in source and binary forms, with or without
9 * modification, are permitted provided that the following conditions
12 * 1. Redistributions of source code must retain the above copyright
13 * notice, this list of conditions and the following disclaimer.
14 * 2. Redistributions in binary form must reproduce the above copyright
15 * notice, this list of conditions and the following disclaimer in
16 * the documentation and/or other materials provided with the
18 * 3. Neither the name of The DragonFly Project nor the names of its
19 * contributors may be used to endorse or promote products derived
20 * from this software without specific, prior written permission.
22 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
26 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
39 * Initialize a low-level ioq
42 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
44 bzero(ioq, sizeof(*ioq));
45 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
46 TAILQ_INIT(&ioq->msgq);
50 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
54 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
55 TAILQ_REMOVE(&ioq->msgq, msg, entry);
58 if ((msg = ioq->msg) != NULL) {
65 * Initialize a low-level communications channel
68 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
70 bzero(iocom, sizeof(*iocom));
72 TAILQ_INIT(&iocom->freeq);
73 TAILQ_INIT(&iocom->freeq_aux);
74 iocom->sock_fd = sock_fd;
75 iocom->alt_fd = alt_fd;
76 iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
77 hammer2_ioq_init(iocom, &iocom->ioq_rx);
78 hammer2_ioq_init(iocom, &iocom->ioq_tx);
81 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
83 /* if line buffered our single fgets() should be fine */
85 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
90 hammer2_iocom_done(hammer2_iocom_t *iocom)
95 hammer2_ioq_done(iocom, &iocom->ioq_rx);
96 hammer2_ioq_done(iocom, &iocom->ioq_tx);
97 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
98 TAILQ_REMOVE(&iocom->freeq, msg, entry);
101 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
102 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
104 msg->aux_data = NULL;
110 * Allocate a new one-way message.
113 hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
119 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
120 ~HAMMER2_MSG_ALIGNMASK;
121 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
122 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
124 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
125 TAILQ_REMOVE(&iocom->freeq, msg, entry);
128 msg = malloc(sizeof(*msg));
130 msg->aux_data = NULL;
133 if (msg->aux_size != aux_size) {
136 msg->aux_data = NULL;
140 msg->aux_data = malloc(aux_size);
141 msg->aux_size = aux_size;
145 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
147 bzero(&msg->any.head, hbytes);
148 msg->any.head.aux_icrc = 0;
149 msg->any.head.cmd = cmd;
155 * Allocate a one-way or streaming reply to a message. The message is
156 * not modified. This function may be used to allocate multiple replies.
158 * If cmd is 0 then msg->any.head.cmd is used to formulate the reply command.
161 hammer2_allocreply(hammer2_msg_t *msg, uint32_t cmd, int aux_size)
164 hammer2_persist_t *pers;
166 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
168 cmd = msg->any.head.cmd;
170 rmsg = hammer2_allocmsg(msg->iocom, cmd, aux_size);
171 rmsg->any.head = msg->any.head;
172 rmsg->any.head.source = msg->any.head.target;
173 rmsg->any.head.target = msg->any.head.source;
174 rmsg->any.head.cmd = (cmd | HAMMER2_MSGF_REPLY) &
175 ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
176 rmsg->any.head.aux_icrc = 0;
178 if ((pers = msg->persist) != NULL) {
179 assert(pers->lrep & HAMMER2_MSGF_DELETE);
180 rmsg->any.head.cmd |= pers->lrep & HAMMER2_MSGF_CREATE;
181 pers->lrep &= ~HAMMER2_MSGF_CREATE;
182 /* do not clear DELETE */
188 * Free a message so it can be reused afresh.
190 * NOTE: aux_size can be 0 with a non-NULL aux_data.
193 hammer2_freemsg(hammer2_msg_t *msg)
195 hammer2_iocom_t *iocom = msg->iocom;
198 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
200 TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
204 * I/O core loop for an iocom.
207 hammer2_iocom_core(hammer2_iocom_t *iocom,
208 void (*recvmsg_func)(hammer2_iocom_t *),
209 void (*sendmsg_func)(hammer2_iocom_t *),
210 void (*altmsg_func)(hammer2_iocom_t *))
212 struct pollfd fds[2];
215 iocom->recvmsg_callback = recvmsg_func;
216 iocom->sendmsg_callback = sendmsg_func;
217 iocom->altmsg_callback = altmsg_func;
219 while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
222 fds[0].fd = iocom->sock_fd;
226 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
227 fds[0].events |= POLLIN;
230 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
231 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
232 fds[0].events |= POLLOUT;
237 if (iocom->alt_fd >= 0) {
238 fds[1].fd = iocom->alt_fd;
239 fds[1].events |= POLLIN;
241 poll(fds, 2, timeout);
243 poll(fds, 1, timeout);
245 if ((fds[0].revents & POLLIN) ||
246 (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
247 iocom->recvmsg_callback(iocom);
249 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
250 if ((fds[0].revents & POLLOUT) ||
251 (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
252 iocom->sendmsg_callback(iocom);
255 if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
256 iocom->altmsg_callback(iocom);
261 * Read the next ready message from the ioq, issuing I/O if needed.
262 * Caller should retry on a read-event when NULL is returned.
264 * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
265 * be returned (and the caller must not call us again after that).
268 hammer2_ioq_read(hammer2_iocom_t *iocom)
270 hammer2_ioq_t *ioq = &iocom->ioq_rx;
272 hammer2_msg_hdr_t *head;
281 * If a message is already pending we can just remove and
284 if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
285 TAILQ_REMOVE(&ioq->msgq, msg, entry);
290 * Message read in-progress (msg is NULL at the moment). We don't
291 * allocate a msg until we have its core header.
293 bytes = ioq->fifo_end - ioq->fifo_beg;
294 nmax = sizeof(iocom->rxbuf) - ioq->fifo_end;
298 case HAMMER2_MSGQ_STATE_HEADER1:
300 * Load the primary header, fail on any non-trivial read
301 * error or on EOF. Since the primary header is the same
302 * size is the message alignment it will never straddle
303 * the end of the buffer.
305 if (bytes < (int)sizeof(msg->any.head)) {
306 n = read(iocom->sock_fd,
307 iocom->rxbuf + ioq->fifo_end,
311 ioq->error = HAMMER2_IOQ_ERROR_EOF;
314 if (errno != EINTR &&
315 errno != EINPROGRESS &&
317 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
329 * Insufficient data accumulated (msg is NULL, caller will
333 if (bytes < (int)sizeof(msg->any.head))
337 head = (void *)(iocom->rxbuf + ioq->fifo_beg);
340 * XXX Decrypt the core header
344 * Check and fixup the core header. Note that the icrc
345 * has to be calculated before any fixups, but the crc
346 * fields in the msg may have to be swapped like everything
349 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
350 head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
351 ioq->error = HAMMER2_IOQ_ERROR_SYNC;
355 xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
356 HAMMER2_MSGHDR_CRCBYTES);
357 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
358 hammer2_bswap_head(head);
359 flags |= HAMMER2_MSGX_BSWAPPED;
361 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
362 if (xcrc16 != head->icrc1) {
363 ioq->error = HAMMER2_IOQ_ERROR_HCRC;
368 * Calculate the full header size and aux data size
370 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
372 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
373 if (ioq->hbytes < (int)sizeof(msg->any.head) ||
374 ioq->hbytes > (int)sizeof(msg->any) ||
375 ioq->abytes > HAMMER2_MSGAUX_MAX) {
376 ioq->error = HAMMER2_IOQ_ERROR_FIELD;
381 * Finally allocate the message and copy the core header
382 * to the embedded extended header.
384 * Initialize msg->aux_size to 0 and use it to track
385 * the amount of data copied from the stream.
387 msg = hammer2_allocmsg(iocom, 0, ioq->abytes);
393 * We are either done or we fall-through
395 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
396 bcopy(head, &msg->any.head, sizeof(msg->any.head));
397 ioq->fifo_beg += ioq->hbytes;
402 * Fall through to the next state. Make sure that the
403 * extended header does not straddle the end of the buffer.
404 * We still want to issue larger reads into our buffer,
405 * book-keeping is easier if we don't bcopy() yet.
407 if (bytes + nmax < ioq->hbytes) {
408 bcopy(iocom->rxbuf + ioq->fifo_beg, iocom->rxbuf,
411 ioq->fifo_end = bytes;
412 nmax = sizeof(iocom->rxbuf) - ioq->fifo_end;
414 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
416 case HAMMER2_MSGQ_STATE_HEADER2:
418 * Fill out the extended header.
421 if (bytes < ioq->hbytes) {
422 n = read(iocom->sock_fd,
423 msg->any.buf + ioq->fifo_end,
427 ioq->error = HAMMER2_IOQ_ERROR_EOF;
430 if (errno != EINTR &&
431 errno != EINPROGRESS &&
433 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
445 * Insufficient data accumulated (set msg NULL so caller will
448 if (bytes < ioq->hbytes) {
454 * XXX Decrypt the extended header
456 head = (void *)(iocom->rxbuf + ioq->fifo_beg);
459 * Check the crc on the extended header
461 if (ioq->hbytes > (int)sizeof(hammer2_msg_hdr_t)) {
462 xcrc32 = hammer2_icrc32(head + 1,
463 ioq->hbytes - sizeof(*head));
464 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
465 if (head->icrc2 != xcrc16) {
466 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
472 * Copy the extended header into the msg and adjust the
475 bcopy(head, &msg->any, ioq->hbytes);
478 * We are either done or we fall-through.
480 if (ioq->abytes == 0) {
481 ioq->fifo_beg += ioq->hbytes;
486 * Must adjust nmax and bytes (and the state) when falling
489 ioq->fifo_beg += ioq->hbytes;
491 bytes -= ioq->hbytes;
492 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
494 case HAMMER2_MSGQ_STATE_AUXDATA1:
496 * Copy the partial or complete payload from remaining
497 * bytes in the FIFO. We have to fall-through either
498 * way so we can check the crc.
500 assert(msg->aux_size == 0);
501 if (bytes >= ioq->abytes) {
502 bcopy(iocom->rxbuf + ioq->fifo_beg, msg->aux_data,
504 msg->aux_size = ioq->abytes;
505 ioq->fifo_beg += ioq->abytes;
506 bytes -= ioq->abytes;
508 bcopy(iocom->rxbuf + ioq->fifo_beg, msg->aux_data,
510 msg->aux_size = bytes;
511 ioq->fifo_beg += bytes;
514 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
516 case HAMMER2_MSGQ_STATE_AUXDATA2:
518 * Read the remainder of the payload directly into the
519 * msg->aux_data buffer.
522 if (msg->aux_size < ioq->abytes) {
524 n = read(iocom->sock_fd,
525 msg->aux_data + msg->aux_size,
526 ioq->abytes - msg->aux_size);
529 ioq->error = HAMMER2_IOQ_ERROR_EOF;
532 if (errno != EINTR &&
533 errno != EINPROGRESS &&
535 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
545 * Insufficient data accumulated (set msg NULL so caller will
548 if (msg->aux_size < ioq->abytes) {
552 assert(msg->aux_size == ioq->abytes);
555 * XXX Decrypt the data
559 * Check aux_icrc, then we are done.
561 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
562 if (xcrc32 != msg->any.head.aux_icrc) {
563 ioq->error = HAMMER2_IOQ_ERROR_ACRC;
567 case HAMMER2_MSGQ_STATE_ERROR:
570 * We don't double-return errors, the caller should not
571 * have called us again after getting an error msg.
578 * Handle error, RREQ, or completion
580 * NOTE: nmax and bytes are invalid at this point, we don't bother
581 * to update them when breaking out.
585 * An unrecoverable error occured during processing,
586 * return a special error message. Try to leave the
587 * ioq state alone for post-mortem debugging.
589 * Link error messages are returned as one-way messages,
590 * so no flags get set. Source and target is 0 (link-level),
591 * msgid is 0 (link-level). All we really need to do is
592 * set up magic, cmd, and error.
594 assert(ioq->msg == msg);
596 msg = hammer2_allocmsg(iocom, 0, 0);
602 msg->aux_data = NULL;
605 bzero(&msg->any.head, sizeof(msg->any.head));
606 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
607 msg->any.head.cmd = HAMMER2_LNK_ERROR;
608 msg->any.head.error = ioq->error;
609 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
610 iocom->flags |= HAMMER2_IOCOMF_EOF;
611 } else if (msg == NULL) {
613 * Insufficient data received to finish building the message,
614 * set RREQ and return NULL.
616 * Leave ioq->msg intact.
617 * Leave the FIFO intact.
619 iocom->flags |= HAMMER2_IOCOMF_RREQ;
624 * Return msg, clear the FIFO if it is now empty.
625 * Flag RREQ if the caller needs to wait for a read-event
628 * The fifo has already been advanced past the message.
629 * Trivially reset the FIFO indices if possible.
631 if (ioq->fifo_beg == ioq->fifo_end) {
632 iocom->flags |= HAMMER2_IOCOMF_RREQ;
636 iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
638 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
645 * Calculate the header and data crc's and write a low-level message to
646 * the connection. If aux_icrc is non-zero the aux_data crc is already
647 * assumed to have been set.
649 * A non-NULL msg is added to the queue but not necessarily flushed.
650 * Calling this function with msg == NULL will get a flush going.
653 hammer2_ioq_write(hammer2_msg_t *msg)
655 hammer2_iocom_t *iocom = msg->iocom;
656 hammer2_ioq_t *ioq = &iocom->ioq_tx;
663 TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
665 hammer2_iocom_drain(iocom);
670 * Finish populating the msg fields
672 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
673 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
677 * Calculate aux_icrc if 0, calculate icrc2, and finally
680 if (msg->aux_size && msg->any.head.aux_icrc == 0) {
681 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
682 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
683 msg->any.head.aux_icrc = xcrc32;
685 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
686 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
688 if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
689 sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
690 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
692 hbytes -= sizeof(msg->any.head);
693 xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
694 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
695 msg->any.head.icrc2 = xcrc16;
697 msg->any.head.icrc2 = 0;
699 xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
700 HAMMER2_MSGHDR_CRCBYTES);
701 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
702 msg->any.head.icrc1 = xcrc16;
705 * XXX Encrypt the message
709 * Enqueue the message.
711 TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
713 iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
716 * Flush if we know we can write (WREQ not set) and if
717 * sufficient messages have accumulated. Otherwise hold
718 * off to avoid piecemeal system calls.
720 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
722 if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
724 hammer2_iocom_flush(iocom);
728 hammer2_iocom_flush(hammer2_iocom_t *iocom)
730 hammer2_ioq_t *ioq = &iocom->ioq_tx;
734 struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
742 * Pump messages out the connection by building an iovec.
747 TAILQ_FOREACH(msg, &ioq->msgq, entry) {
749 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
752 abytes = msg->aux_size;
757 if (hbytes - hoff > 0) {
758 iov[n].iov_base = (char *)&msg->any.head + hoff;
759 iov[n].iov_len = hbytes - hoff;
760 nmax += hbytes - hoff;
762 if (n == HAMMER2_IOQ_MAXIOVEC)
765 if (abytes - aoff > 0) {
766 assert(msg->aux_data != NULL);
767 iov[n].iov_base = msg->aux_data + aoff;
768 iov[n].iov_len = abytes - aoff;
769 nmax += abytes - aoff;
771 if (n == HAMMER2_IOQ_MAXIOVEC)
779 * Execute the writev() then figure out what happened.
781 nact = writev(iocom->sock_fd, iov, n);
783 if (errno != EINTR &&
784 errno != EINPROGRESS &&
786 ioq->error = HAMMER2_IOQ_ERROR_SOCK;
787 hammer2_iocom_drain(iocom);
789 iocom->flags |= HAMMER2_IOCOMF_WREQ;
794 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
796 iocom->flags |= HAMMER2_IOCOMF_WREQ;
798 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
799 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
801 abytes = msg->aux_size;
803 if (nact < hbytes - ioq->hbytes) {
807 nact -= hbytes - ioq->hbytes;
808 ioq->hbytes = hbytes;
809 if (nact < abytes - ioq->abytes) {
813 nact -= abytes - ioq->abytes;
815 TAILQ_REMOVE(&ioq->msgq, msg, entry);
820 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
822 TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
825 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
826 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
829 iocom->flags |= HAMMER2_IOCOMF_EOF |
830 HAMMER2_IOCOMF_WIDLE;
831 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
836 * Kill pending msgs on ioq_tx and adjust the flags such that no more
837 * write events will occur. We don't kill read msgs because we want
838 * the caller to pull off our contrived terminal error msg to detect
839 * the connection failure.
842 hammer2_iocom_drain(hammer2_iocom_t *iocom)
844 hammer2_ioq_t *ioq = &iocom->ioq_tx;
847 while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
848 TAILQ_REMOVE(&ioq->msgq, msg, entry);
850 hammer2_freemsg(msg);
852 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
853 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
857 * This is a shortcut to the normal hammer2_allocreply() mechanic which
858 * uses the received message to formulate a final reply and error code.
859 * Can be used to issue a final reply for one-way, one-off, or streaming
862 * Replies to one-way messages are a bit of an oxymoron but the feature
863 * is used by the debug (DBG) protocol.
865 * The reply contains no data.
867 * (msg) is eaten up by this function.
870 hammer2_replymsg(hammer2_msg_t *msg, uint16_t error)
872 hammer2_persist_t *pers;
875 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
877 t16 = msg->any.head.source;
878 msg->any.head.source = msg->any.head.target;
879 msg->any.head.target = t16;
880 msg->any.head.error = error;
881 msg->any.head.cmd |= HAMMER2_MSGF_REPLY;
883 if ((pers = msg->persist) != NULL) {
884 assert(pers->lrep & HAMMER2_MSGF_DELETE);
885 msg->any.head.cmd |= pers->lrep & (HAMMER2_MSGF_CREATE |
886 HAMMER2_MSGF_DELETE);
887 pers->lrep &= ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
889 hammer2_ioq_write(msg);