hammer2 - Early messaging infrastructure
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 /*
39  * Initialize a low-level ioq
40  */
41 void
42 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
43 {
44         bzero(ioq, sizeof(*ioq));
45         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
46         TAILQ_INIT(&ioq->msgq);
47 }
48
49 void
50 hammer2_ioq_done(hammer2_iocom_t *iocom, hammer2_ioq_t *ioq)
51 {
52         hammer2_msg_t *msg;
53
54         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
55                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
56                 hammer2_iocom_freemsg(iocom, msg);
57         }
58         if ((msg = ioq->msg) != NULL) {
59                 ioq->msg = NULL;
60                 hammer2_iocom_freemsg(iocom, msg);
61         }
62 }
63
64 /*
65  * Initialize a low-level communications channel
66  */
67 void
68 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
69 {
70         bzero(iocom, sizeof(*iocom));
71
72         TAILQ_INIT(&iocom->freeq);
73         TAILQ_INIT(&iocom->freeq_aux);
74         iocom->sock_fd = sock_fd;
75         iocom->alt_fd = alt_fd;
76         iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
77         hammer2_ioq_init(iocom, &iocom->ioq_rx);
78         hammer2_ioq_init(iocom, &iocom->ioq_tx);
79
80         if (sock_fd >= 0)
81                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
82 #if 0
83         /* if line buffered our single fgets() should be fine */
84         if (alt_fd >= 0)
85                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
86 #endif
87 }
88
89 void
90 hammer2_iocom_done(hammer2_iocom_t *iocom)
91 {
92         hammer2_msg_t *msg;
93
94         iocom->sock_fd = -1;
95         hammer2_ioq_done(iocom, &iocom->ioq_rx);
96         hammer2_ioq_done(iocom, &iocom->ioq_tx);
97         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
98                 TAILQ_REMOVE(&iocom->freeq, msg, entry);
99                 free(msg);
100         }
101         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
102                 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
103                 free(msg->aux_data);
104                 msg->aux_data = NULL;
105                 free(msg);
106         }
107 }
108
109 hammer2_msg_t *
110 hammer2_iocom_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
111 {
112         hammer2_msg_t *msg;
113         int hbytes;
114
115         if (aux_size) {
116                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
117                            ~HAMMER2_MSG_ALIGNMASK;
118                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
119                         TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
120         } else {
121                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
122                         TAILQ_REMOVE(&iocom->freeq, msg, entry);
123         }
124         if (msg == NULL) {
125                 msg = malloc(sizeof(*msg));
126                 msg->aux_data = NULL;
127                 msg->aux_size = 0;
128         }
129         if (msg->aux_size != aux_size) {
130                 if (msg->aux_data) {
131                         free(msg->aux_data);
132                         msg->aux_data = NULL;
133                         msg->aux_size = 0;
134                 }
135                 if (aux_size) {
136                         msg->aux_data = malloc(aux_size);
137                         msg->aux_size = aux_size;
138                 }
139         }
140         msg->flags = 0;
141         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
142         bzero(&msg->any.head, hbytes);
143         msg->any.head.cmd = cmd;
144
145         return (msg);
146 }
147
148 void
149 hammer2_iocom_reallocmsg(hammer2_iocom_t *iocom __unused, hammer2_msg_t *msg,
150                          int aux_size)
151 {
152         aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) & ~HAMMER2_MSG_ALIGNMASK;
153         if (aux_size && msg->aux_size != aux_size) {
154                 if (msg->aux_data) {
155                         free(msg->aux_data);
156                         msg->aux_data = NULL;
157                 }
158                 msg->aux_data = malloc(aux_size);
159                 msg->aux_size = aux_size;
160         }
161         msg->flags = 0;
162 }
163
164 void
165 hammer2_iocom_freemsg(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
166 {
167         if (msg->aux_data)
168                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
169         else
170                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
171 }
172
173 /*
174  * I/O core loop for an iocom.
175  */
176 void
177 hammer2_iocom_core(hammer2_iocom_t *iocom,
178                    void (*recvmsg_func)(hammer2_iocom_t *),
179                    void (*sendmsg_func)(hammer2_iocom_t *),
180                    void (*altmsg_func)(hammer2_iocom_t *))
181 {
182         struct pollfd fds[2];
183         int timeout = 5000;
184
185         iocom->recvmsg_callback = recvmsg_func;
186         iocom->sendmsg_callback = sendmsg_func;
187         iocom->altmsg_callback = altmsg_func;
188
189         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
190                 fds[0].fd = iocom->sock_fd;
191                 fds[0].events = 0;
192                 fds[0].revents = 0;
193
194                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
195                         fds[0].events |= POLLIN;
196                 else
197                         timeout = 0;
198                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
199                         if (iocom->flags & HAMMER2_IOCOMF_WREQ)
200                                 fds[0].events |= POLLOUT;
201                         else
202                                 timeout = 0;
203                 }
204
205                 if (iocom->alt_fd >= 0) {
206                         fds[1].fd = iocom->alt_fd;
207                         fds[1].events |= POLLIN;
208                         fds[1].revents = 0;
209                         poll(fds, 2, timeout);
210                 } else {
211                         poll(fds, 1, timeout);
212                 }
213                 if ((fds[0].revents & POLLIN) ||
214                     (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
215                         iocom->recvmsg_callback(iocom);
216                 }
217                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
218                         if ((fds[0].revents & POLLOUT) ||
219                             (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
220                                 iocom->sendmsg_callback(iocom);
221                         }
222                 }
223                 if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
224                         iocom->altmsg_callback(iocom);
225         }
226 }
227
228 /*
229  * Read the next ready message from the ioq, issuing I/O if needed.
230  * Caller should retry on a read-event when NULL is returned.
231  *
232  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
233  * be returned (and the caller must not call us again after that).
234  */
235 hammer2_msg_t *
236 hammer2_ioq_read(hammer2_iocom_t *iocom)
237 {
238         hammer2_ioq_t *ioq = &iocom->ioq_rx;
239         hammer2_msg_t *msg;
240         hammer2_msg_hdr_t *head;
241         ssize_t n;
242         int bytes;
243         int flags;
244         int nmax;
245         uint16_t xcrc16;
246         uint32_t xcrc32;
247
248         /*
249          * If a message is already pending we can just remove and
250          * return it.
251          */
252         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
253                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
254                 return(msg);
255         }
256
257         /*
258          * Message read in-progress (msg is NULL at the moment).  We don't
259          * allocate a msg until we have its core header.
260          */
261         bytes = ioq->fifo_end - ioq->fifo_beg;
262         nmax = sizeof(iocom->rxbuf) - ioq->fifo_end;
263         msg = ioq->msg;
264
265         switch(ioq->state) {
266         case HAMMER2_MSGQ_STATE_HEADER1:
267                 /*
268                  * Load the primary header, fail on any non-trivial read
269                  * error or on EOF.  Since the primary header is the same
270                  * size is the message alignment it will never straddle
271                  * the end of the buffer.
272                  */
273                 if (bytes < (int)sizeof(msg->any.head)) {
274                         n = read(iocom->sock_fd,
275                                  iocom->rxbuf + ioq->fifo_end,
276                                  nmax);
277                         if (n <= 0) {
278                                 if (n == 0) {
279                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
280                                         break;
281                                 }
282                                 if (errno != EINTR &&
283                                     errno != EINPROGRESS &&
284                                     errno != EAGAIN) {
285                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
286                                         break;
287                                 }
288                                 n = 0;
289                                 /* fall through */
290                         }
291                         ioq->fifo_end += n;
292                         bytes += n;
293                         nmax -= n;
294                 }
295
296                 /*
297                  * Insufficient data accumulated (msg is NULL, caller will
298                  * retry on event).
299                  */
300                 assert(msg == NULL);
301                 if (bytes < (int)sizeof(msg->any.head))
302                         break;
303
304                 flags = 0;
305                 head = (void *)(iocom->rxbuf + ioq->fifo_beg);
306
307                 /*
308                  * XXX Decrypt the core header
309                  */
310
311                 /*
312                  * Check and fixup the core header.  Note that the icrc
313                  * has to be calculated before any fixups, but the crc
314                  * fields in the msg may have to be swapped like everything
315                  * else.
316                  */
317                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
318                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
319                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
320                         break;
321                 }
322
323                 xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
324                                         HAMMER2_MSGHDR_CRCBYTES);
325                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
326                         hammer2_bswap_head(head);
327                         flags |= HAMMER2_MSGX_BSWAPPED;
328                 }
329                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
330                 if (xcrc16 != head->icrc1) {
331                         ioq->error = HAMMER2_IOQ_ERROR_HCRC;
332                         break;
333                 }
334
335                 /*
336                  * Calculate the full header size and aux data size
337                  */
338                 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
339                               HAMMER2_MSG_ALIGN;
340                 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
341                 if (ioq->hbytes < (int)sizeof(msg->any.head) ||
342                     ioq->hbytes > (int)sizeof(msg->any) ||
343                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
344                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
345                         break;
346                 }
347
348                 /*
349                  * Finally allocate the message and copy the core header
350                  * to the embedded extended header.
351                  */
352                 if (ioq->abytes) {
353                         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
354                                 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
355                         } else {
356                                 msg = malloc(sizeof(*msg));
357                                 msg->aux_data = NULL;
358                                 msg->aux_size = 0;
359                         }
360                         if (msg->aux_size != ioq->abytes) {
361                                 if (msg->aux_data) {
362                                         free(msg->aux_data);
363                                         msg->aux_data = NULL;
364                                 }
365                                 msg->aux_data = malloc(ioq->abytes);
366                                 /* msg->aux_size = ioq->abytes; */
367                         }
368                 } else {
369                         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
370                                 TAILQ_REMOVE(&iocom->freeq, msg, entry);
371                         } else {
372                                 msg = malloc(sizeof(*msg));
373                                 msg->aux_data = NULL;
374                                 /* msg->aux_size = 0; */
375                         }
376                 }
377                 msg->aux_size = 0;      /* data copied so far */
378                 msg->flags = flags;
379                 ioq->msg = msg;
380
381                 /*
382                  * We are either done or we fall-through
383                  */
384                 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
385                         bcopy(head, &msg->any.head, sizeof(msg->any.head));
386                         ioq->fifo_beg += ioq->hbytes;
387                         break;
388                 }
389
390                 /*
391                  * Fall through to the next state.  Make sure that the
392                  * extended header does not straddle the end of the buffer.
393                  * We still want to issue larger reads into our buffer,
394                  * book-keeping is easier if we don't bcopy() yet.
395                  */
396                 if (bytes + nmax < ioq->hbytes) {
397                         bcopy(iocom->rxbuf + ioq->fifo_beg, iocom->rxbuf,
398                               bytes);
399                         ioq->fifo_beg = 0;
400                         ioq->fifo_end = bytes;
401                         nmax = sizeof(iocom->rxbuf) - ioq->fifo_end;
402                 }
403                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
404                 /* fall through */
405         case HAMMER2_MSGQ_STATE_HEADER2:
406                 /*
407                  * Fill out the extended header.
408                  */
409                 assert(msg != NULL);
410                 if (bytes < ioq->hbytes) {
411                         n = read(iocom->sock_fd,
412                                  msg->any.buf + ioq->fifo_end,
413                                  nmax);
414                         if (n <= 0) {
415                                 if (n == 0) {
416                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
417                                         break;
418                                 }
419                                 if (errno != EINTR &&
420                                     errno != EINPROGRESS &&
421                                     errno != EAGAIN) {
422                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
423                                         break;
424                                 }
425                                 n = 0;
426                                 /* fall through */
427                         }
428                         ioq->fifo_end += n;
429                         bytes += n;
430                         nmax -= n;
431                 }
432
433                 /*
434                  * Insufficient data accumulated (set msg NULL so caller will
435                  * retry on event).
436                  */
437                 if (bytes < ioq->hbytes) {
438                         msg = NULL;
439                         break;
440                 }
441
442                 /*
443                  * XXX Decrypt the extended header
444                  */
445                 head = (void *)(iocom->rxbuf + ioq->fifo_beg);
446
447                 /*
448                  * Check the crc on the extended header
449                  */
450                 if (ioq->hbytes > (int)sizeof(hammer2_msg_hdr_t)) {
451                         xcrc32 = hammer2_icrc32(head + 1,
452                                                 ioq->hbytes - sizeof(*head));
453                         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
454                         if (head->icrc2 != xcrc16) {
455                                 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
456                                 break;
457                         }
458                 }
459
460                 /*
461                  * Copy the extended header into the msg and adjust the
462                  * FIFO.
463                  */
464                 bcopy(head, &msg->any, ioq->hbytes);
465
466                 /*
467                  * We are either done or we fall-through.
468                  */
469                 if (ioq->abytes == 0) {
470                         ioq->fifo_beg += ioq->hbytes;
471                         break;
472                 }
473
474                 /*
475                  * Must adjust nmax and bytes (and the state) when falling
476                  * through.
477                  */
478                 ioq->fifo_beg += ioq->hbytes;
479                 nmax -= ioq->hbytes;
480                 bytes -= ioq->hbytes;
481                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
482                 /* fall through */
483         case HAMMER2_MSGQ_STATE_AUXDATA1:
484                 /*
485                  * Copy the partial or complete payload from remaining
486                  * bytes in the FIFO.  We have to fall-through either
487                  * way so we can check the crc.
488                  */
489                 assert(msg->aux_size == 0);
490                 if (bytes >= ioq->abytes) {
491                         bcopy(iocom->rxbuf + ioq->fifo_beg, msg->aux_data,
492                               ioq->abytes);
493                         msg->aux_size = ioq->abytes;
494                         ioq->fifo_beg += ioq->abytes;
495                         bytes -= ioq->abytes;
496                 } else if (bytes) {
497                         bcopy(iocom->rxbuf + ioq->fifo_beg, msg->aux_data,
498                               bytes);
499                         msg->aux_size = bytes;
500                         ioq->fifo_beg += bytes;
501                         bytes = 0;
502                 }
503                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
504                 /* fall through */
505         case HAMMER2_MSGQ_STATE_AUXDATA2:
506                 /*
507                  * Read the remainder of the payload directly into the
508                  * msg->aux_data buffer.
509                  */
510                 assert(msg);
511                 if (msg->aux_size < ioq->abytes) {
512                         assert(bytes == 0);
513                         n = read(iocom->sock_fd,
514                                  msg->aux_data + msg->aux_size,
515                                  ioq->abytes - msg->aux_size);
516                         if (n <= 0) {
517                                 if (n == 0) {
518                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
519                                         break;
520                                 }
521                                 if (errno != EINTR &&
522                                     errno != EINPROGRESS &&
523                                     errno != EAGAIN) {
524                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
525                                         break;
526                                 }
527                                 n = 0;
528                                 /* fall through */
529                         }
530                         msg->aux_size += n;
531                 }
532
533                 /*
534                  * Insufficient data accumulated (set msg NULL so caller will
535                  * retry on event).
536                  */
537                 if (msg->aux_size < ioq->abytes) {
538                         msg = NULL;
539                         break;
540                 }
541                 assert(msg->aux_size == ioq->abytes);
542
543                 /*
544                  * XXX Decrypt the data
545                  */
546
547                 /*
548                  * Check aux_icrc, then we are done.
549                  */
550                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
551                 if (xcrc32 != msg->any.head.aux_icrc) {
552                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
553                         break;
554                 }
555                 break;
556         case HAMMER2_MSGQ_STATE_ERROR:
557         default:
558                 /*
559                  * We don't double-return errors, the caller should not
560                  * have called us again after getting an error msg.
561                  */
562                 assert(0);
563                 break;
564         }
565
566         /*
567          * Handle error, RREQ, or completion
568          *
569          * NOTE: nmax and bytes are invalid at this point, we don't bother
570          *       to update them when breaking out.
571          */
572         if (ioq->error) {
573                 /*
574                  * An unrecoverable error occured during processing,
575                  * return a special error message.  Try to leave the
576                  * ioq state alone for post-mortem debugging.
577                  *
578                  * Link error messages are returned as one-way messages,
579                  * so no flags get set.  Source and target is 0 (link-level),
580                  * msgid is 0 (link-level).  All we really need to do is
581                  * set up magic, cmd, and error.
582                  */
583                 if (msg == NULL) {
584                         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
585                                 TAILQ_REMOVE(&iocom->freeq, msg, entry);
586                         } else {
587                                 msg = malloc(sizeof(*msg));
588                                 msg->aux_data = NULL;
589                                 msg->aux_size = 0;
590                         }
591                         assert(ioq->msg == NULL);
592                 } else {
593                         assert(ioq->msg == msg);
594                         ioq->msg = NULL;
595                 }
596                 if (msg->aux_data) {
597                         free(msg->aux_data);
598                         msg->aux_data = NULL;
599                         msg->aux_size = 0;
600                 }
601                 bzero(&msg->any.head, sizeof(msg->any.head));
602                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
603                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
604                 msg->any.head.error = ioq->error;
605                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
606                 iocom->flags |= HAMMER2_IOCOMF_EOF;
607         } else if (msg == NULL) {
608                 /*
609                  * Insufficient data received to finish building the message,
610                  * set RREQ and return NULL.
611                  *
612                  * Leave ioq->msg intact.
613                  * Leave the FIFO intact.
614                  */
615                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
616                 ioq->fifo_beg = 0;
617                 ioq->fifo_end = 0;
618         } else {
619                 /*
620                  * Return msg, clear the FIFO if it is now empty.
621                  * Flag RREQ if the caller needs to wait for a read-event
622                  * or not.
623                  *
624                  * The fifo has already been advanced past the message.
625                  * Trivially reset the FIFO indices if possible.
626                  */
627                 if (ioq->fifo_beg == ioq->fifo_end) {
628                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
629                         ioq->fifo_beg = 0;
630                         ioq->fifo_end = 0;
631                 } else {
632                         iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
633                 }
634                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
635                 ioq->msg = NULL;
636         }
637         return (msg);
638 }
639
640 /*
641  * Calculate the header and data crc's and write a low-level message to
642  * the connection.  If aux_icrc is non-zero the aux_data crc is already
643  * assumed to have been set.
644  *
645  * A non-NULL msg is added to the queue but not necessarily flushed.
646  * Calling this function with msg == NULL will get a flush going.
647  */
648 void
649 hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
650 {
651         hammer2_ioq_t *ioq = &iocom->ioq_tx;
652         ssize_t nmax;
653         ssize_t nact;
654         int hbytes;
655         int abytes;
656         int hoff;
657         int aoff;
658         uint16_t xcrc16;
659         uint32_t xcrc32;
660         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
661         int n;
662
663         if (ioq->error) {
664                 if (msg) {
665                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
666                         ++ioq->msgcount;
667                 }
668                 hammer2_ioq_write_drain(iocom);
669                 return;
670         }
671
672         if (msg) {
673                 /*
674                  * Finish populating the msg fields
675                  */
676                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
677                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
678                 ++ioq->seq;
679
680                 /*
681                  * Calculate aux_icrc if 0, calculate icrc2, and finally
682                  * calculate icrc1.
683                  */
684                 if (msg->aux_size && msg->any.head.aux_icrc == 0) {
685                         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
686                         xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
687                         msg->any.head.aux_icrc = xcrc32;
688                 }
689                 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
690                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
691
692                 if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
693                     sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
694                         hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
695                                 HAMMER2_MSG_ALIGN;
696                         hbytes -= sizeof(msg->any.head);
697                         xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
698                         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
699                         msg->any.head.icrc2 = xcrc16;
700                 } else {
701                         msg->any.head.icrc2 = 0;
702                 }
703                 xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
704                                         HAMMER2_MSGHDR_CRCBYTES);
705                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
706                 msg->any.head.icrc1 = xcrc16;
707
708                 /*
709                  * XXX Encrypt the message
710                  */
711
712                 /*
713                  * Enqueue the message, stop now if we already know that
714                  * we can't write.
715                  */
716                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
717                 ++ioq->msgcount;
718                 iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
719                 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
720                         return;
721
722                 /*
723                  * Flush if we can aggregate several msgs, otherwise
724                  * we will wait for the global flush (msg == NULL).
725                  */
726                 if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
727                         return;
728         } else if (iocom->flags &= HAMMER2_IOCOMF_WIDLE) {
729                 /*
730                  * Nothing to do if WIDLE is set.
731                  */
732                 assert(TAILQ_FIRST(&ioq->msgq) == NULL);
733                 return;
734         }
735
736         /*
737          * Pump messages out the connection by building an iovec.
738          */
739         n = 0;
740         nmax = 0;
741
742         TAILQ_FOREACH(msg, &ioq->msgq, entry) {
743                 hoff = 0;
744                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
745                          HAMMER2_MSG_ALIGN;
746                 aoff = 0;
747                 abytes = msg->aux_size;
748                 if (n == 0) {
749                         hoff += ioq->hbytes;
750                         aoff += ioq->abytes;
751                 }
752                 if (hbytes - hoff > 0) {
753                         iov[n].iov_base = (char *)&msg->any.head + hoff;
754                         iov[n].iov_len = hbytes - hoff;
755                         nmax += hbytes - hoff;
756                         ++n;
757                         if (n == HAMMER2_IOQ_MAXIOVEC)
758                                 break;
759                 }
760                 if (abytes - aoff > 0) {
761                         assert(msg->aux_data != NULL);
762                         iov[n].iov_base = msg->aux_data + aoff;
763                         iov[n].iov_len = abytes - aoff;
764                         nmax += abytes - aoff;
765                         ++n;
766                         if (n == HAMMER2_IOQ_MAXIOVEC)
767                                 break;
768                 }
769         }
770         if (n == 0)
771                 return;
772
773         /*
774          * Execute the writev() then figure out what happened.
775          */
776         nact = writev(iocom->sock_fd, iov, n);
777         if (nact < 0) {
778                 if (errno != EINTR &&
779                     errno != EINPROGRESS &&
780                     errno != EAGAIN) {
781                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
782                         hammer2_ioq_write_drain(iocom);
783                 } else {
784                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
785                 }
786                 return;
787         }
788         if (nact == nmax)
789                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
790         else
791                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
792
793         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
794                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
795                          HAMMER2_MSG_ALIGN;
796                 abytes = msg->aux_size;
797
798                 if (nact < hbytes - ioq->hbytes) {
799                         ioq->hbytes += nact;
800                         break;
801                 }
802                 nact -= hbytes - ioq->hbytes;
803                 ioq->hbytes = hbytes;
804                 if (nact < abytes - ioq->abytes) {
805                         ioq->abytes += nact;
806                         break;
807                 }
808                 nact -= abytes - ioq->abytes;
809
810                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
811                 --ioq->msgcount;
812                 ioq->hbytes = 0;
813                 ioq->abytes = 0;
814                 if (msg->aux_data)
815                         TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
816                 else
817                         TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
818         }
819         if (msg == NULL) {
820                 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
821                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
822         }
823         if (ioq->error) {
824                 iocom->flags |= HAMMER2_IOCOMF_EOF |
825                                 HAMMER2_IOCOMF_WIDLE;
826                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
827         }
828 }
829
830 /*
831  * Kill pending msgs on ioq_tx and adjust the flags such that no more
832  * write events will occur.  We don't kill read msgs because we want
833  * the caller to pull off our contrived terminal error msg to detect
834  * the connection failure.
835  */
836 void
837 hammer2_ioq_write_drain(hammer2_iocom_t *iocom)
838 {
839         hammer2_ioq_t *ioq = &iocom->ioq_tx;
840         hammer2_msg_t *msg;
841
842         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
843                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
844                 --ioq->msgcount;
845                 hammer2_iocom_freemsg(iocom, msg);
846         }
847         iocom->flags |= HAMMER2_IOCOMF_WIDLE;
848         iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
849 }
850
851 /*
852  * Reply to a message after setting various fields appropriately.
853  * This function will swap (source) and (target) and enqueue the
854  * message for transmission.
855  */
856 void
857 hammer2_ioq_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
858 {
859         uint16_t t16;
860
861         t16 = msg->any.head.source;
862         msg->any.head.source = msg->any.head.target;
863         msg->any.head.target = t16;
864         msg->any.head.cmd ^= HAMMER2_MSGF_REPLY;
865         hammer2_ioq_write(iocom, msg);
866 }
867
868 void
869 hammer2_ioq_reply_term(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
870                        uint16_t error)
871 {
872         if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
873                 msg->any.head.cmd |= HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE;
874                 msg->any.head.error = error;
875                 hammer2_ioq_reply(iocom, msg);
876         }
877 }