Merge branches 'hammer2' and 'master' of ssh://crater.dragonflybsd.org/repository...
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 /*
39  * Initialize a low-level ioq
40  */
41 void
42 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
43 {
44         bzero(ioq, sizeof(*ioq));
45         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
46         TAILQ_INIT(&ioq->msgq);
47 }
48
49 void
50 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
51 {
52         hammer2_msg_t *msg;
53
54         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
55                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
56                 hammer2_freemsg(msg);
57         }
58         if ((msg = ioq->msg) != NULL) {
59                 ioq->msg = NULL;
60                 hammer2_freemsg(msg);
61         }
62 }
63
64 /*
65  * Initialize a low-level communications channel
66  */
67 void
68 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
69 {
70         bzero(iocom, sizeof(*iocom));
71
72         TAILQ_INIT(&iocom->freeq);
73         TAILQ_INIT(&iocom->freeq_aux);
74         iocom->sock_fd = sock_fd;
75         iocom->alt_fd = alt_fd;
76         iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
77         hammer2_ioq_init(iocom, &iocom->ioq_rx);
78         hammer2_ioq_init(iocom, &iocom->ioq_tx);
79
80         if (sock_fd >= 0)
81                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
82 #if 0
83         /* if line buffered our single fgets() should be fine */
84         if (alt_fd >= 0)
85                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
86 #endif
87 }
88
89 void
90 hammer2_iocom_done(hammer2_iocom_t *iocom)
91 {
92         hammer2_msg_t *msg;
93
94         iocom->sock_fd = -1;
95         hammer2_ioq_done(iocom, &iocom->ioq_rx);
96         hammer2_ioq_done(iocom, &iocom->ioq_tx);
97         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
98                 TAILQ_REMOVE(&iocom->freeq, msg, entry);
99                 free(msg);
100         }
101         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
102                 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
103                 free(msg->aux_data);
104                 msg->aux_data = NULL;
105                 free(msg);
106         }
107 }
108
109 /*
110  * Allocate a new one-way message.
111  */
112 hammer2_msg_t *
113 hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
114 {
115         hammer2_msg_t *msg;
116         int hbytes;
117
118         if (aux_size) {
119                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
120                            ~HAMMER2_MSG_ALIGNMASK;
121                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
122                         TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
123         } else {
124                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
125                         TAILQ_REMOVE(&iocom->freeq, msg, entry);
126         }
127         if (msg == NULL) {
128                 msg = malloc(sizeof(*msg));
129                 msg->iocom = iocom;
130                 msg->aux_data = NULL;
131                 msg->aux_size = 0;
132         }
133         if (msg->aux_size != aux_size) {
134                 if (msg->aux_data) {
135                         free(msg->aux_data);
136                         msg->aux_data = NULL;
137                         msg->aux_size = 0;
138                 }
139                 if (aux_size) {
140                         msg->aux_data = malloc(aux_size);
141                         msg->aux_size = aux_size;
142                 }
143         }
144         msg->flags = 0;
145         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
146         if (hbytes)
147                 bzero(&msg->any.head, hbytes);
148         msg->any.head.aux_icrc = 0;
149         msg->any.head.cmd = cmd;
150
151         return (msg);
152 }
153
154 /*
155  * Allocate a one-way or streaming reply to a message.  The message is
156  * not modified.  This function may be used to allocate multiple replies.
157  *
158  * If cmd is 0 then msg->any.head.cmd is used to formulate the reply command.
159  */
160 hammer2_msg_t *
161 hammer2_allocreply(hammer2_msg_t *msg, uint32_t cmd, int aux_size)
162 {
163         hammer2_msg_t *rmsg;
164         hammer2_persist_t *pers;
165
166         assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
167         if (cmd == 0)
168                 cmd = msg->any.head.cmd;
169
170         rmsg = hammer2_allocmsg(msg->iocom, cmd, aux_size);
171         rmsg->any.head = msg->any.head;
172         rmsg->any.head.source = msg->any.head.target;
173         rmsg->any.head.target = msg->any.head.source;
174         rmsg->any.head.cmd = (cmd | HAMMER2_MSGF_REPLY) &
175                              ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
176         rmsg->any.head.aux_icrc = 0;
177
178         if ((pers = msg->persist) != NULL) {
179                 assert(pers->lrep & HAMMER2_MSGF_DELETE);
180                 rmsg->any.head.cmd |= pers->lrep & HAMMER2_MSGF_CREATE;
181                 pers->lrep &= ~HAMMER2_MSGF_CREATE;
182                 /* do not clear DELETE */
183         }
184         return (rmsg);
185 }
186
187 /*
188  * Free a message so it can be reused afresh.
189  *
190  * NOTE: aux_size can be 0 with a non-NULL aux_data.
191  */
192 void
193 hammer2_freemsg(hammer2_msg_t *msg)
194 {
195         hammer2_iocom_t *iocom = msg->iocom;
196
197         if (msg->aux_data)
198                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
199         else
200                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
201 }
202
203 /*
204  * I/O core loop for an iocom.
205  */
206 void
207 hammer2_iocom_core(hammer2_iocom_t *iocom,
208                    void (*recvmsg_func)(hammer2_iocom_t *),
209                    void (*sendmsg_func)(hammer2_iocom_t *),
210                    void (*altmsg_func)(hammer2_iocom_t *))
211 {
212         struct pollfd fds[2];
213         int timeout;
214
215         iocom->recvmsg_callback = recvmsg_func;
216         iocom->sendmsg_callback = sendmsg_func;
217         iocom->altmsg_callback = altmsg_func;
218
219         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
220                 timeout = 5000;
221
222                 fds[0].fd = iocom->sock_fd;
223                 fds[0].events = 0;
224                 fds[0].revents = 0;
225
226                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
227                         fds[0].events |= POLLIN;
228                 else
229                         timeout = 0;
230                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
231                         if (iocom->flags & HAMMER2_IOCOMF_WREQ)
232                                 fds[0].events |= POLLOUT;
233                         else
234                                 timeout = 0;
235                 }
236
237                 if (iocom->alt_fd >= 0) {
238                         fds[1].fd = iocom->alt_fd;
239                         fds[1].events |= POLLIN;
240                         fds[1].revents = 0;
241                         poll(fds, 2, timeout);
242                 } else {
243                         poll(fds, 1, timeout);
244                 }
245                 if ((fds[0].revents & POLLIN) ||
246                     (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
247                         iocom->recvmsg_callback(iocom);
248                 }
249                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
250                         if ((fds[0].revents & POLLOUT) ||
251                             (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
252                                 iocom->sendmsg_callback(iocom);
253                         }
254                 }
255                 if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
256                         iocom->altmsg_callback(iocom);
257         }
258 }
259
260 /*
261  * Read the next ready message from the ioq, issuing I/O if needed.
262  * Caller should retry on a read-event when NULL is returned.
263  *
264  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
265  * be returned (and the caller must not call us again after that).
266  */
267 hammer2_msg_t *
268 hammer2_ioq_read(hammer2_iocom_t *iocom)
269 {
270         hammer2_ioq_t *ioq = &iocom->ioq_rx;
271         hammer2_msg_t *msg;
272         hammer2_msg_hdr_t *head;
273         ssize_t n;
274         int bytes;
275         int flags;
276         int nmax;
277         uint16_t xcrc16;
278         uint32_t xcrc32;
279
280         /*
281          * If a message is already pending we can just remove and
282          * return it.
283          */
284         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
285                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
286                 return(msg);
287         }
288
289         /*
290          * Message read in-progress (msg is NULL at the moment).  We don't
291          * allocate a msg until we have its core header.
292          */
293         bytes = ioq->fifo_end - ioq->fifo_beg;
294         nmax = sizeof(iocom->rxbuf) - ioq->fifo_end;
295         msg = ioq->msg;
296
297         switch(ioq->state) {
298         case HAMMER2_MSGQ_STATE_HEADER1:
299                 /*
300                  * Load the primary header, fail on any non-trivial read
301                  * error or on EOF.  Since the primary header is the same
302                  * size is the message alignment it will never straddle
303                  * the end of the buffer.
304                  */
305                 if (bytes < (int)sizeof(msg->any.head)) {
306                         n = read(iocom->sock_fd,
307                                  iocom->rxbuf + ioq->fifo_end,
308                                  nmax);
309                         if (n <= 0) {
310                                 if (n == 0) {
311                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
312                                         break;
313                                 }
314                                 if (errno != EINTR &&
315                                     errno != EINPROGRESS &&
316                                     errno != EAGAIN) {
317                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
318                                         break;
319                                 }
320                                 n = 0;
321                                 /* fall through */
322                         }
323                         ioq->fifo_end += n;
324                         bytes += n;
325                         nmax -= n;
326                 }
327
328                 /*
329                  * Insufficient data accumulated (msg is NULL, caller will
330                  * retry on event).
331                  */
332                 assert(msg == NULL);
333                 if (bytes < (int)sizeof(msg->any.head))
334                         break;
335
336                 flags = 0;
337                 head = (void *)(iocom->rxbuf + ioq->fifo_beg);
338
339                 /*
340                  * XXX Decrypt the core header
341                  */
342
343                 /*
344                  * Check and fixup the core header.  Note that the icrc
345                  * has to be calculated before any fixups, but the crc
346                  * fields in the msg may have to be swapped like everything
347                  * else.
348                  */
349                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
350                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
351                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
352                         break;
353                 }
354
355                 xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
356                                         HAMMER2_MSGHDR_CRCBYTES);
357                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
358                         hammer2_bswap_head(head);
359                         flags |= HAMMER2_MSGX_BSWAPPED;
360                 }
361                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
362                 if (xcrc16 != head->icrc1) {
363                         ioq->error = HAMMER2_IOQ_ERROR_HCRC;
364                         break;
365                 }
366
367                 /*
368                  * Calculate the full header size and aux data size
369                  */
370                 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
371                               HAMMER2_MSG_ALIGN;
372                 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
373                 if (ioq->hbytes < (int)sizeof(msg->any.head) ||
374                     ioq->hbytes > (int)sizeof(msg->any) ||
375                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
376                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
377                         break;
378                 }
379
380                 /*
381                  * Finally allocate the message and copy the core header
382                  * to the embedded extended header.
383                  *
384                  * Initialize msg->aux_size to 0 and use it to track
385                  * the amount of data copied from the stream.
386                  */
387                 msg = hammer2_allocmsg(iocom, 0, ioq->abytes);
388                 msg->aux_size = 0;
389                 msg->flags = flags;
390                 ioq->msg = msg;
391
392                 /*
393                  * We are either done or we fall-through
394                  */
395                 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
396                         bcopy(head, &msg->any.head, sizeof(msg->any.head));
397                         ioq->fifo_beg += ioq->hbytes;
398                         break;
399                 }
400
401                 /*
402                  * Fall through to the next state.  Make sure that the
403                  * extended header does not straddle the end of the buffer.
404                  * We still want to issue larger reads into our buffer,
405                  * book-keeping is easier if we don't bcopy() yet.
406                  */
407                 if (bytes + nmax < ioq->hbytes) {
408                         bcopy(iocom->rxbuf + ioq->fifo_beg, iocom->rxbuf,
409                               bytes);
410                         ioq->fifo_beg = 0;
411                         ioq->fifo_end = bytes;
412                         nmax = sizeof(iocom->rxbuf) - ioq->fifo_end;
413                 }
414                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
415                 /* fall through */
416         case HAMMER2_MSGQ_STATE_HEADER2:
417                 /*
418                  * Fill out the extended header.
419                  */
420                 assert(msg != NULL);
421                 if (bytes < ioq->hbytes) {
422                         n = read(iocom->sock_fd,
423                                  msg->any.buf + ioq->fifo_end,
424                                  nmax);
425                         if (n <= 0) {
426                                 if (n == 0) {
427                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
428                                         break;
429                                 }
430                                 if (errno != EINTR &&
431                                     errno != EINPROGRESS &&
432                                     errno != EAGAIN) {
433                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
434                                         break;
435                                 }
436                                 n = 0;
437                                 /* fall through */
438                         }
439                         ioq->fifo_end += n;
440                         bytes += n;
441                         nmax -= n;
442                 }
443
444                 /*
445                  * Insufficient data accumulated (set msg NULL so caller will
446                  * retry on event).
447                  */
448                 if (bytes < ioq->hbytes) {
449                         msg = NULL;
450                         break;
451                 }
452
453                 /*
454                  * XXX Decrypt the extended header
455                  */
456                 head = (void *)(iocom->rxbuf + ioq->fifo_beg);
457
458                 /*
459                  * Check the crc on the extended header
460                  */
461                 if (ioq->hbytes > (int)sizeof(hammer2_msg_hdr_t)) {
462                         xcrc32 = hammer2_icrc32(head + 1,
463                                                 ioq->hbytes - sizeof(*head));
464                         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
465                         if (head->icrc2 != xcrc16) {
466                                 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
467                                 break;
468                         }
469                 }
470
471                 /*
472                  * Copy the extended header into the msg and adjust the
473                  * FIFO.
474                  */
475                 bcopy(head, &msg->any, ioq->hbytes);
476
477                 /*
478                  * We are either done or we fall-through.
479                  */
480                 if (ioq->abytes == 0) {
481                         ioq->fifo_beg += ioq->hbytes;
482                         break;
483                 }
484
485                 /*
486                  * Must adjust nmax and bytes (and the state) when falling
487                  * through.
488                  */
489                 ioq->fifo_beg += ioq->hbytes;
490                 nmax -= ioq->hbytes;
491                 bytes -= ioq->hbytes;
492                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
493                 /* fall through */
494         case HAMMER2_MSGQ_STATE_AUXDATA1:
495                 /*
496                  * Copy the partial or complete payload from remaining
497                  * bytes in the FIFO.  We have to fall-through either
498                  * way so we can check the crc.
499                  */
500                 assert(msg->aux_size == 0);
501                 if (bytes >= ioq->abytes) {
502                         bcopy(iocom->rxbuf + ioq->fifo_beg, msg->aux_data,
503                               ioq->abytes);
504                         msg->aux_size = ioq->abytes;
505                         ioq->fifo_beg += ioq->abytes;
506                         bytes -= ioq->abytes;
507                 } else if (bytes) {
508                         bcopy(iocom->rxbuf + ioq->fifo_beg, msg->aux_data,
509                               bytes);
510                         msg->aux_size = bytes;
511                         ioq->fifo_beg += bytes;
512                         bytes = 0;
513                 }
514                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
515                 /* fall through */
516         case HAMMER2_MSGQ_STATE_AUXDATA2:
517                 /*
518                  * Read the remainder of the payload directly into the
519                  * msg->aux_data buffer.
520                  */
521                 assert(msg);
522                 if (msg->aux_size < ioq->abytes) {
523                         assert(bytes == 0);
524                         n = read(iocom->sock_fd,
525                                  msg->aux_data + msg->aux_size,
526                                  ioq->abytes - msg->aux_size);
527                         if (n <= 0) {
528                                 if (n == 0) {
529                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
530                                         break;
531                                 }
532                                 if (errno != EINTR &&
533                                     errno != EINPROGRESS &&
534                                     errno != EAGAIN) {
535                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
536                                         break;
537                                 }
538                                 n = 0;
539                                 /* fall through */
540                         }
541                         msg->aux_size += n;
542                 }
543
544                 /*
545                  * Insufficient data accumulated (set msg NULL so caller will
546                  * retry on event).
547                  */
548                 if (msg->aux_size < ioq->abytes) {
549                         msg = NULL;
550                         break;
551                 }
552                 assert(msg->aux_size == ioq->abytes);
553
554                 /*
555                  * XXX Decrypt the data
556                  */
557
558                 /*
559                  * Check aux_icrc, then we are done.
560                  */
561                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
562                 if (xcrc32 != msg->any.head.aux_icrc) {
563                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
564                         break;
565                 }
566                 break;
567         case HAMMER2_MSGQ_STATE_ERROR:
568         default:
569                 /*
570                  * We don't double-return errors, the caller should not
571                  * have called us again after getting an error msg.
572                  */
573                 assert(0);
574                 break;
575         }
576
577         /*
578          * Handle error, RREQ, or completion
579          *
580          * NOTE: nmax and bytes are invalid at this point, we don't bother
581          *       to update them when breaking out.
582          */
583         if (ioq->error) {
584                 /*
585                  * An unrecoverable error occured during processing,
586                  * return a special error message.  Try to leave the
587                  * ioq state alone for post-mortem debugging.
588                  *
589                  * Link error messages are returned as one-way messages,
590                  * so no flags get set.  Source and target is 0 (link-level),
591                  * msgid is 0 (link-level).  All we really need to do is
592                  * set up magic, cmd, and error.
593                  */
594                 assert(ioq->msg == msg);
595                 if (msg == NULL)
596                         msg = hammer2_allocmsg(iocom, 0, 0);
597                 else
598                         ioq->msg = NULL;
599
600                 if (msg->aux_data) {
601                         free(msg->aux_data);
602                         msg->aux_data = NULL;
603                         msg->aux_size = 0;
604                 }
605                 bzero(&msg->any.head, sizeof(msg->any.head));
606                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
607                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
608                 msg->any.head.error = ioq->error;
609                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
610                 iocom->flags |= HAMMER2_IOCOMF_EOF;
611         } else if (msg == NULL) {
612                 /*
613                  * Insufficient data received to finish building the message,
614                  * set RREQ and return NULL.
615                  *
616                  * Leave ioq->msg intact.
617                  * Leave the FIFO intact.
618                  */
619                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
620                 ioq->fifo_beg = 0;
621                 ioq->fifo_end = 0;
622         } else {
623                 /*
624                  * Return msg, clear the FIFO if it is now empty.
625                  * Flag RREQ if the caller needs to wait for a read-event
626                  * or not.
627                  *
628                  * The fifo has already been advanced past the message.
629                  * Trivially reset the FIFO indices if possible.
630                  */
631                 if (ioq->fifo_beg == ioq->fifo_end) {
632                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
633                         ioq->fifo_beg = 0;
634                         ioq->fifo_end = 0;
635                 } else {
636                         iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
637                 }
638                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
639                 ioq->msg = NULL;
640         }
641         return (msg);
642 }
643
644 /*
645  * Calculate the header and data crc's and write a low-level message to
646  * the connection.  If aux_icrc is non-zero the aux_data crc is already
647  * assumed to have been set.
648  *
649  * A non-NULL msg is added to the queue but not necessarily flushed.
650  * Calling this function with msg == NULL will get a flush going.
651  */
652 void
653 hammer2_ioq_write(hammer2_msg_t *msg)
654 {
655         hammer2_iocom_t *iocom = msg->iocom;
656         hammer2_ioq_t *ioq = &iocom->ioq_tx;
657         uint16_t xcrc16;
658         uint32_t xcrc32;
659         int hbytes;
660
661         assert(msg);
662         if (ioq->error) {
663                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
664                 ++ioq->msgcount;
665                 hammer2_iocom_drain(iocom);
666                 return;
667         }
668
669         /*
670          * Finish populating the msg fields
671          */
672         msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
673         msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
674         ++ioq->seq;
675
676         /*
677          * Calculate aux_icrc if 0, calculate icrc2, and finally
678          * calculate icrc1.
679          */
680         if (msg->aux_size && msg->any.head.aux_icrc == 0) {
681                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
682                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
683                 msg->any.head.aux_icrc = xcrc32;
684         }
685         msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
686         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
687
688         if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
689             sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
690                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
691                         HAMMER2_MSG_ALIGN;
692                 hbytes -= sizeof(msg->any.head);
693                 xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
694                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
695                 msg->any.head.icrc2 = xcrc16;
696         } else {
697                 msg->any.head.icrc2 = 0;
698         }
699         xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
700                                 HAMMER2_MSGHDR_CRCBYTES);
701         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
702         msg->any.head.icrc1 = xcrc16;
703
704         /*
705          * XXX Encrypt the message
706          */
707
708         /*
709          * Enqueue the message.
710          */
711         TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
712         ++ioq->msgcount;
713         iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
714
715         /*
716          * Flush if we know we can write (WREQ not set) and if
717          * sufficient messages have accumulated.  Otherwise hold
718          * off to avoid piecemeal system calls.
719          */
720         if (iocom->flags & HAMMER2_IOCOMF_WREQ)
721                 return;
722         if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
723                 return;
724         hammer2_iocom_flush(iocom);
725 }
726
727 void
728 hammer2_iocom_flush(hammer2_iocom_t *iocom)
729 {
730         hammer2_ioq_t *ioq = &iocom->ioq_tx;
731         hammer2_msg_t *msg;
732         ssize_t nmax;
733         ssize_t nact;
734         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
735         int hbytes;
736         int abytes;
737         int hoff;
738         int aoff;
739         int n;
740
741         /*
742          * Pump messages out the connection by building an iovec.
743          */
744         n = 0;
745         nmax = 0;
746
747         TAILQ_FOREACH(msg, &ioq->msgq, entry) {
748                 hoff = 0;
749                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
750                          HAMMER2_MSG_ALIGN;
751                 aoff = 0;
752                 abytes = msg->aux_size;
753                 if (n == 0) {
754                         hoff += ioq->hbytes;
755                         aoff += ioq->abytes;
756                 }
757                 if (hbytes - hoff > 0) {
758                         iov[n].iov_base = (char *)&msg->any.head + hoff;
759                         iov[n].iov_len = hbytes - hoff;
760                         nmax += hbytes - hoff;
761                         ++n;
762                         if (n == HAMMER2_IOQ_MAXIOVEC)
763                                 break;
764                 }
765                 if (abytes - aoff > 0) {
766                         assert(msg->aux_data != NULL);
767                         iov[n].iov_base = msg->aux_data + aoff;
768                         iov[n].iov_len = abytes - aoff;
769                         nmax += abytes - aoff;
770                         ++n;
771                         if (n == HAMMER2_IOQ_MAXIOVEC)
772                                 break;
773                 }
774         }
775         if (n == 0)
776                 return;
777
778         /*
779          * Execute the writev() then figure out what happened.
780          */
781         nact = writev(iocom->sock_fd, iov, n);
782         if (nact < 0) {
783                 if (errno != EINTR &&
784                     errno != EINPROGRESS &&
785                     errno != EAGAIN) {
786                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
787                         hammer2_iocom_drain(iocom);
788                 } else {
789                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
790                 }
791                 return;
792         }
793         if (nact == nmax)
794                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
795         else
796                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
797
798         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
799                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
800                          HAMMER2_MSG_ALIGN;
801                 abytes = msg->aux_size;
802
803                 if (nact < hbytes - ioq->hbytes) {
804                         ioq->hbytes += nact;
805                         break;
806                 }
807                 nact -= hbytes - ioq->hbytes;
808                 ioq->hbytes = hbytes;
809                 if (nact < abytes - ioq->abytes) {
810                         ioq->abytes += nact;
811                         break;
812                 }
813                 nact -= abytes - ioq->abytes;
814
815                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
816                 --ioq->msgcount;
817                 ioq->hbytes = 0;
818                 ioq->abytes = 0;
819                 if (msg->aux_data)
820                         TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
821                 else
822                         TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
823         }
824         if (msg == NULL) {
825                 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
826                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
827         }
828         if (ioq->error) {
829                 iocom->flags |= HAMMER2_IOCOMF_EOF |
830                                 HAMMER2_IOCOMF_WIDLE;
831                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
832         }
833 }
834
835 /*
836  * Kill pending msgs on ioq_tx and adjust the flags such that no more
837  * write events will occur.  We don't kill read msgs because we want
838  * the caller to pull off our contrived terminal error msg to detect
839  * the connection failure.
840  */
841 void
842 hammer2_iocom_drain(hammer2_iocom_t *iocom)
843 {
844         hammer2_ioq_t *ioq = &iocom->ioq_tx;
845         hammer2_msg_t *msg;
846
847         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
848                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
849                 --ioq->msgcount;
850                 hammer2_freemsg(msg);
851         }
852         iocom->flags |= HAMMER2_IOCOMF_WIDLE;
853         iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
854 }
855
856 /*
857  * This is a shortcut to the normal hammer2_allocreply() mechanic which
858  * uses the received message to formulate a final reply and error code.
859  * Can be used to issue a final reply for one-way, one-off, or streaming
860  * commands.
861  *
862  * Replies to one-way messages are a bit of an oxymoron but the feature
863  * is used by the debug (DBG) protocol.
864  *
865  * The reply contains no data.
866  *
867  * (msg) is eaten up by this function.
868  */
869 void
870 hammer2_replymsg(hammer2_msg_t *msg, uint16_t error)
871 {
872         hammer2_persist_t *pers;
873         uint16_t t16;
874
875         assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
876
877         t16 = msg->any.head.source;
878         msg->any.head.source = msg->any.head.target;
879         msg->any.head.target = t16;
880         msg->any.head.error = error;
881         msg->any.head.cmd |= HAMMER2_MSGF_REPLY;
882         msg->aux_size = 0;
883         if ((pers = msg->persist) != NULL) {
884                 assert(pers->lrep & HAMMER2_MSGF_DELETE);
885                 msg->any.head.cmd |= pers->lrep & (HAMMER2_MSGF_CREATE |
886                                                    HAMMER2_MSGF_DELETE);
887                 pers->lrep &= ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
888         }
889         hammer2_ioq_write(msg);
890 }