Merge branches 'hammer2' and 'master' of ssh://crater.dragonflybsd.org/repository...
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 /*
39  * Initialize a low-level ioq
40  */
41 void
42 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
43 {
44         bzero(ioq, sizeof(*ioq));
45         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
46         TAILQ_INIT(&ioq->msgq);
47 }
48
49 void
50 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
51 {
52         hammer2_msg_t *msg;
53
54         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
55                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
56                 hammer2_freemsg(msg);
57         }
58         if ((msg = ioq->msg) != NULL) {
59                 ioq->msg = NULL;
60                 hammer2_freemsg(msg);
61         }
62 }
63
64 /*
65  * Initialize a low-level communications channel
66  */
67 void
68 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
69 {
70         bzero(iocom, sizeof(*iocom));
71
72         TAILQ_INIT(&iocom->freeq);
73         TAILQ_INIT(&iocom->freeq_aux);
74         iocom->sock_fd = sock_fd;
75         iocom->alt_fd = alt_fd;
76         iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
77         hammer2_ioq_init(iocom, &iocom->ioq_rx);
78         hammer2_ioq_init(iocom, &iocom->ioq_tx);
79
80         /*
81          * Negotiate session crypto synchronously.  This will mark the
82          * connection as error'd if it fails.
83          */
84         hammer2_crypto_negotiate(iocom);
85
86         /*
87          * Make sure our fds are set to non-blocking for the iocom core.
88          */
89         if (sock_fd >= 0)
90                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
91 #if 0
92         /* if line buffered our single fgets() should be fine */
93         if (alt_fd >= 0)
94                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
95 #endif
96 }
97
98 void
99 hammer2_iocom_done(hammer2_iocom_t *iocom)
100 {
101         hammer2_msg_t *msg;
102
103         iocom->sock_fd = -1;
104         hammer2_ioq_done(iocom, &iocom->ioq_rx);
105         hammer2_ioq_done(iocom, &iocom->ioq_tx);
106         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
107                 TAILQ_REMOVE(&iocom->freeq, msg, entry);
108                 free(msg);
109         }
110         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
111                 TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
112                 free(msg->aux_data);
113                 msg->aux_data = NULL;
114                 free(msg);
115         }
116 }
117
118 /*
119  * Allocate a new one-way message.
120  */
121 hammer2_msg_t *
122 hammer2_allocmsg(hammer2_iocom_t *iocom, uint32_t cmd, int aux_size)
123 {
124         hammer2_msg_t *msg;
125         int hbytes;
126
127         if (aux_size) {
128                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
129                            ~HAMMER2_MSG_ALIGNMASK;
130                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
131                         TAILQ_REMOVE(&iocom->freeq_aux, msg, entry);
132         } else {
133                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
134                         TAILQ_REMOVE(&iocom->freeq, msg, entry);
135         }
136         if (msg == NULL) {
137                 msg = malloc(sizeof(*msg));
138                 msg->iocom = iocom;
139                 msg->aux_data = NULL;
140                 msg->aux_size = 0;
141         }
142         if (msg->aux_size != aux_size) {
143                 if (msg->aux_data) {
144                         free(msg->aux_data);
145                         msg->aux_data = NULL;
146                         msg->aux_size = 0;
147                 }
148                 if (aux_size) {
149                         msg->aux_data = malloc(aux_size);
150                         msg->aux_size = aux_size;
151                 }
152         }
153         msg->flags = 0;
154         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
155         if (hbytes)
156                 bzero(&msg->any.head, hbytes);
157         msg->any.head.aux_icrc = 0;
158         msg->any.head.cmd = cmd;
159
160         return (msg);
161 }
162
163 /*
164  * Allocate a one-way or streaming reply to a message.  The message is
165  * not modified.  This function may be used to allocate multiple replies.
166  *
167  * If cmd is 0 then msg->any.head.cmd is used to formulate the reply command.
168  */
169 hammer2_msg_t *
170 hammer2_allocreply(hammer2_msg_t *msg, uint32_t cmd, int aux_size)
171 {
172         hammer2_msg_t *rmsg;
173         hammer2_persist_t *pers;
174
175         assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
176         if (cmd == 0)
177                 cmd = msg->any.head.cmd;
178
179         rmsg = hammer2_allocmsg(msg->iocom, cmd, aux_size);
180         rmsg->any.head = msg->any.head;
181         rmsg->any.head.source = msg->any.head.target;
182         rmsg->any.head.target = msg->any.head.source;
183         rmsg->any.head.cmd = (cmd | HAMMER2_MSGF_REPLY) &
184                              ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
185         rmsg->any.head.aux_icrc = 0;
186
187         if ((pers = msg->persist) != NULL) {
188                 assert(pers->lrep & HAMMER2_MSGF_DELETE);
189                 rmsg->any.head.cmd |= pers->lrep & HAMMER2_MSGF_CREATE;
190                 pers->lrep &= ~HAMMER2_MSGF_CREATE;
191                 /* do not clear DELETE */
192         }
193         return (rmsg);
194 }
195
196 /*
197  * Free a message so it can be reused afresh.
198  *
199  * NOTE: aux_size can be 0 with a non-NULL aux_data.
200  */
201 void
202 hammer2_freemsg(hammer2_msg_t *msg)
203 {
204         hammer2_iocom_t *iocom = msg->iocom;
205
206         if (msg->aux_data)
207                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
208         else
209                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
210 }
211
212 /*
213  * I/O core loop for an iocom.
214  */
215 void
216 hammer2_iocom_core(hammer2_iocom_t *iocom,
217                    void (*recvmsg_func)(hammer2_iocom_t *),
218                    void (*sendmsg_func)(hammer2_iocom_t *),
219                    void (*altmsg_func)(hammer2_iocom_t *))
220 {
221         struct pollfd fds[2];
222         int timeout;
223
224         iocom->recvmsg_callback = recvmsg_func;
225         iocom->sendmsg_callback = sendmsg_func;
226         iocom->altmsg_callback = altmsg_func;
227
228         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
229                 timeout = 5000;
230
231                 fds[0].fd = iocom->sock_fd;
232                 fds[0].events = 0;
233                 fds[0].revents = 0;
234
235                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
236                         fds[0].events |= POLLIN;
237                 else
238                         timeout = 0;
239                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
240                         if (iocom->flags & HAMMER2_IOCOMF_WREQ)
241                                 fds[0].events |= POLLOUT;
242                         else
243                                 timeout = 0;
244                 }
245
246                 if (iocom->alt_fd >= 0) {
247                         fds[1].fd = iocom->alt_fd;
248                         fds[1].events |= POLLIN;
249                         fds[1].revents = 0;
250                         poll(fds, 2, timeout);
251                 } else {
252                         poll(fds, 1, timeout);
253                 }
254                 if ((fds[0].revents & POLLIN) ||
255                     (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
256                         iocom->recvmsg_callback(iocom);
257                 }
258                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
259                         if ((fds[0].revents & POLLOUT) ||
260                             (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
261                                 iocom->sendmsg_callback(iocom);
262                         }
263                 }
264                 if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
265                         iocom->altmsg_callback(iocom);
266         }
267 }
268
269 /*
270  * Read the next ready message from the ioq, issuing I/O if needed.
271  * Caller should retry on a read-event when NULL is returned.
272  *
273  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
274  * be returned (and the caller must not call us again after that).
275  */
276 hammer2_msg_t *
277 hammer2_ioq_read(hammer2_iocom_t *iocom)
278 {
279         hammer2_ioq_t *ioq = &iocom->ioq_rx;
280         hammer2_msg_t *msg;
281         hammer2_msg_hdr_t *head;
282         ssize_t n;
283         int bytes;
284         int flags;
285         int nmax;
286         uint16_t xcrc16;
287         uint32_t xcrc32;
288
289         /*
290          * If a message is already pending we can just remove and
291          * return it.
292          */
293         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
294                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
295                 return(msg);
296         }
297
298         /*
299          * Message read in-progress (msg is NULL at the moment).  We don't
300          * allocate a msg until we have its core header.
301          */
302         bytes = ioq->fifo_end - ioq->fifo_beg;
303         nmax = sizeof(ioq->buf) - ioq->fifo_end;
304         msg = ioq->msg;
305
306         switch(ioq->state) {
307         case HAMMER2_MSGQ_STATE_HEADER1:
308                 /*
309                  * Load the primary header, fail on any non-trivial read
310                  * error or on EOF.  Since the primary header is the same
311                  * size is the message alignment it will never straddle
312                  * the end of the buffer.
313                  */
314                 if (bytes < (int)sizeof(msg->any.head)) {
315                         n = read(iocom->sock_fd,
316                                  ioq->buf + ioq->fifo_end,
317                                  nmax);
318                         if (n <= 0) {
319                                 if (n == 0) {
320                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
321                                         break;
322                                 }
323                                 if (errno != EINTR &&
324                                     errno != EINPROGRESS &&
325                                     errno != EAGAIN) {
326                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
327                                         break;
328                                 }
329                                 n = 0;
330                                 /* fall through */
331                         }
332                         ioq->fifo_end += n;
333                         bytes += n;
334                         nmax -= n;
335                 }
336
337                 /*
338                  * Insufficient data accumulated (msg is NULL, caller will
339                  * retry on event).
340                  */
341                 assert(msg == NULL);
342                 if (bytes < (int)sizeof(msg->any.head))
343                         break;
344
345                 /*
346                  * Calculate the header, decrypt data received so far.
347                  * Data will be decrypted in-place.  Partial blocks are
348                  * not immediately decrypted.
349                  */
350                 hammer2_crypto_decrypt(iocom, ioq);
351                 flags = 0;
352                 head = (void *)(ioq->buf + ioq->fifo_beg);
353
354                 /*
355                  * Check and fixup the core header.  Note that the icrc
356                  * has to be calculated before any fixups, but the crc
357                  * fields in the msg may have to be swapped like everything
358                  * else.
359                  */
360                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
361                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
362                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
363                         break;
364                 }
365
366                 xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
367                                         HAMMER2_MSGHDR_CRCBYTES);
368                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
369                         hammer2_bswap_head(head);
370                         flags |= HAMMER2_MSGX_BSWAPPED;
371                 }
372                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
373                 if (xcrc16 != head->icrc1) {
374                         ioq->error = HAMMER2_IOQ_ERROR_HCRC;
375                         break;
376                 }
377
378                 /*
379                  * Calculate the full header size and aux data size
380                  */
381                 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
382                               HAMMER2_MSG_ALIGN;
383                 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
384                 if (ioq->hbytes < (int)sizeof(msg->any.head) ||
385                     ioq->hbytes > (int)sizeof(msg->any) ||
386                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
387                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
388                         break;
389                 }
390
391                 /*
392                  * Finally allocate the message and copy the core header
393                  * to the embedded extended header.
394                  *
395                  * Initialize msg->aux_size to 0 and use it to track
396                  * the amount of data copied from the stream.
397                  */
398                 msg = hammer2_allocmsg(iocom, 0, ioq->abytes);
399                 msg->aux_size = 0;
400                 msg->flags = flags;
401                 ioq->msg = msg;
402
403                 /*
404                  * We are either done or we fall-through
405                  */
406                 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
407                         bcopy(head, &msg->any.head, sizeof(msg->any.head));
408                         ioq->fifo_beg += ioq->hbytes;
409                         break;
410                 }
411
412                 /*
413                  * Fall through to the next state.  Make sure that the
414                  * extended header does not straddle the end of the buffer.
415                  * We still want to issue larger reads into our buffer,
416                  * book-keeping is easier if we don't bcopy() yet.
417                  */
418                 if (bytes + nmax < ioq->hbytes) {
419                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
420                         ioq->fifo_cdx -= ioq->fifo_beg;
421                         ioq->fifo_beg = 0;
422                         ioq->fifo_end = bytes;
423                         nmax = sizeof(ioq->buf) - ioq->fifo_end;
424                 }
425                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
426                 /* fall through */
427         case HAMMER2_MSGQ_STATE_HEADER2:
428                 /*
429                  * Fill out the extended header.
430                  */
431                 assert(msg != NULL);
432                 if (bytes < ioq->hbytes) {
433                         n = read(iocom->sock_fd,
434                                  msg->any.buf + ioq->fifo_end,
435                                  nmax);
436                         if (n <= 0) {
437                                 if (n == 0) {
438                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
439                                         break;
440                                 }
441                                 if (errno != EINTR &&
442                                     errno != EINPROGRESS &&
443                                     errno != EAGAIN) {
444                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
445                                         break;
446                                 }
447                                 n = 0;
448                                 /* fall through */
449                         }
450                         ioq->fifo_end += n;
451                         bytes += n;
452                         nmax -= n;
453                 }
454
455                 /*
456                  * Insufficient data accumulated (set msg NULL so caller will
457                  * retry on event).
458                  */
459                 if (bytes < ioq->hbytes) {
460                         msg = NULL;
461                         break;
462                 }
463
464                 /*
465                  * Calculate the extended header, decrypt data received
466                  * so far.
467                  */
468                 hammer2_crypto_decrypt(iocom, ioq);
469                 head = (void *)(ioq->buf + ioq->fifo_beg);
470
471                 /*
472                  * Check the crc on the extended header
473                  */
474                 if (ioq->hbytes > (int)sizeof(hammer2_msg_hdr_t)) {
475                         xcrc32 = hammer2_icrc32(head + 1,
476                                                 ioq->hbytes - sizeof(*head));
477                         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
478                         if (head->icrc2 != xcrc16) {
479                                 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
480                                 break;
481                         }
482                 }
483
484                 /*
485                  * Copy the extended header into the msg and adjust the
486                  * FIFO.
487                  */
488                 bcopy(head, &msg->any, ioq->hbytes);
489
490                 /*
491                  * We are either done or we fall-through.
492                  */
493                 if (ioq->abytes == 0) {
494                         ioq->fifo_beg += ioq->hbytes;
495                         break;
496                 }
497
498                 /*
499                  * Must adjust nmax and bytes (and the state) when falling
500                  * through.
501                  */
502                 ioq->fifo_beg += ioq->hbytes;
503                 nmax -= ioq->hbytes;
504                 bytes -= ioq->hbytes;
505                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
506                 /* fall through */
507         case HAMMER2_MSGQ_STATE_AUXDATA1:
508                 /*
509                  * Copy the partial or complete payload from remaining
510                  * bytes in the FIFO.  We have to fall-through either
511                  * way so we can check the crc.
512                  */
513                 assert(msg->aux_size == 0);
514                 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
515                 if (ioq->already > ioq->abytes)
516                         ioq->already = ioq->abytes;
517                 if (bytes >= ioq->abytes) {
518                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
519                               ioq->abytes);
520                         msg->aux_size = ioq->abytes;
521                         ioq->fifo_beg += ioq->abytes;
522                         if (ioq->fifo_cdx < ioq->fifo_beg)
523                                 ioq->fifo_cdx = ioq->fifo_beg;
524                         bytes -= ioq->abytes;
525                 } else if (bytes) {
526                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
527                               bytes);
528                         msg->aux_size = bytes;
529                         ioq->fifo_beg += bytes;
530                         if (ioq->fifo_cdx < ioq->fifo_beg)
531                                 ioq->fifo_cdx = ioq->fifo_beg;
532                         bytes = 0;
533                 }
534                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
535                 /* fall through */
536         case HAMMER2_MSGQ_STATE_AUXDATA2:
537                 /*
538                  * Read the remainder of the payload directly into the
539                  * msg->aux_data buffer.
540                  */
541                 assert(msg);
542                 if (msg->aux_size < ioq->abytes) {
543                         assert(bytes == 0);
544                         n = read(iocom->sock_fd,
545                                  msg->aux_data + msg->aux_size,
546                                  ioq->abytes - msg->aux_size);
547                         if (n <= 0) {
548                                 if (n == 0) {
549                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
550                                         break;
551                                 }
552                                 if (errno != EINTR &&
553                                     errno != EINPROGRESS &&
554                                     errno != EAGAIN) {
555                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
556                                         break;
557                                 }
558                                 n = 0;
559                                 /* fall through */
560                         }
561                         msg->aux_size += n;
562                 }
563
564                 /*
565                  * Insufficient data accumulated (set msg NULL so caller will
566                  * retry on event).
567                  */
568                 if (msg->aux_size < ioq->abytes) {
569                         msg = NULL;
570                         break;
571                 }
572                 assert(msg->aux_size == ioq->abytes);
573                 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
574
575                 /*
576                  * Check aux_icrc, then we are done.
577                  */
578                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
579                 if (xcrc32 != msg->any.head.aux_icrc) {
580                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
581                         break;
582                 }
583                 break;
584         case HAMMER2_MSGQ_STATE_ERROR:
585         default:
586                 /*
587                  * We don't double-return errors, the caller should not
588                  * have called us again after getting an error msg.
589                  */
590                 assert(0);
591                 break;
592         }
593
594         /*
595          * Check the message sequence.  The iv[] should prevent any
596          * possibility of a replay but we add this check anyway.
597          */
598         if (msg && ioq->error == 0) {
599                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
600                         ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
601                 } else {
602                         ++ioq->seq;
603                 }
604         }
605
606         /*
607          * Handle error, RREQ, or completion
608          *
609          * NOTE: nmax and bytes are invalid at this point, we don't bother
610          *       to update them when breaking out.
611          */
612         if (ioq->error) {
613                 /*
614                  * An unrecoverable error occured during processing,
615                  * return a special error message.  Try to leave the
616                  * ioq state alone for post-mortem debugging.
617                  *
618                  * Link error messages are returned as one-way messages,
619                  * so no flags get set.  Source and target is 0 (link-level),
620                  * msgid is 0 (link-level).  All we really need to do is
621                  * set up magic, cmd, and error.
622                  */
623                 assert(ioq->msg == msg);
624                 if (msg == NULL)
625                         msg = hammer2_allocmsg(iocom, 0, 0);
626                 else
627                         ioq->msg = NULL;
628
629                 if (msg->aux_data) {
630                         free(msg->aux_data);
631                         msg->aux_data = NULL;
632                         msg->aux_size = 0;
633                 }
634                 bzero(&msg->any.head, sizeof(msg->any.head));
635                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
636                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
637                 msg->any.head.error = ioq->error;
638                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
639                 iocom->flags |= HAMMER2_IOCOMF_EOF;
640         } else if (msg == NULL) {
641                 /*
642                  * Insufficient data received to finish building the message,
643                  * set RREQ and return NULL.
644                  *
645                  * Leave ioq->msg intact.
646                  * Leave the FIFO intact.
647                  */
648                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
649 #if 0
650                 ioq->fifo_cdx = 0;
651                 ioq->fifo_beg = 0;
652                 ioq->fifo_end = 0;
653 #endif
654         } else {
655                 /*
656                  * Return msg, clear the FIFO if it is now empty.
657                  * Flag RREQ if the caller needs to wait for a read-event
658                  * or not.
659                  *
660                  * The fifo has already been advanced past the message.
661                  * Trivially reset the FIFO indices if possible.
662                  */
663                 if (ioq->fifo_beg == ioq->fifo_end) {
664                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
665                         ioq->fifo_cdx = 0;
666                         ioq->fifo_beg = 0;
667                         ioq->fifo_end = 0;
668                 } else {
669                         iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
670                 }
671                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
672                 ioq->msg = NULL;
673         }
674         return (msg);
675 }
676
677 /*
678  * Calculate the header and data crc's and write a low-level message to
679  * the connection.  If aux_icrc is non-zero the aux_data crc is already
680  * assumed to have been set.
681  *
682  * A non-NULL msg is added to the queue but not necessarily flushed.
683  * Calling this function with msg == NULL will get a flush going.
684  */
685 void
686 hammer2_ioq_write(hammer2_msg_t *msg)
687 {
688         hammer2_iocom_t *iocom = msg->iocom;
689         hammer2_ioq_t *ioq = &iocom->ioq_tx;
690         uint16_t xcrc16;
691         uint32_t xcrc32;
692         int hbytes;
693
694         assert(msg);
695         if (ioq->error) {
696                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
697                 ++ioq->msgcount;
698                 hammer2_iocom_drain(iocom);
699                 return;
700         }
701
702         /*
703          * Finish populating the msg fields.  The salt ensures that the iv[]
704          * array is ridiculously randomized and we also re-seed our PRNG
705          * every 32768 messages just to be sure.
706          */
707         msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
708         msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
709         ++ioq->seq;
710         if ((ioq->seq & 32767) == 0)
711                 srandomdev();
712
713         /*
714          * Calculate aux_icrc if 0, calculate icrc2, and finally
715          * calculate icrc1.
716          */
717         if (msg->aux_size && msg->any.head.aux_icrc == 0) {
718                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
719                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
720                 msg->any.head.aux_icrc = xcrc32;
721         }
722         msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
723         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
724
725         if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
726             sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
727                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
728                         HAMMER2_MSG_ALIGN;
729                 hbytes -= sizeof(msg->any.head);
730                 xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
731                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
732                 msg->any.head.icrc2 = xcrc16;
733         } else {
734                 msg->any.head.icrc2 = 0;
735         }
736         xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
737                                 HAMMER2_MSGHDR_CRCBYTES);
738         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
739         msg->any.head.icrc1 = xcrc16;
740
741         /*
742          * XXX Encrypt the message
743          */
744
745         /*
746          * Enqueue the message.
747          */
748         TAILQ_INSERT_TAIL(&ioq->msgq, msg, entry);
749         ++ioq->msgcount;
750         iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
751
752         /*
753          * Flush if we know we can write (WREQ not set) and if
754          * sufficient messages have accumulated.  Otherwise hold
755          * off to avoid piecemeal system calls.
756          */
757         if (iocom->flags & HAMMER2_IOCOMF_WREQ)
758                 return;
759         if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
760                 return;
761         hammer2_iocom_flush(iocom);
762 }
763
764 void
765 hammer2_iocom_flush(hammer2_iocom_t *iocom)
766 {
767         hammer2_ioq_t *ioq = &iocom->ioq_tx;
768         hammer2_msg_t *msg;
769         ssize_t nmax;
770         ssize_t nact;
771         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
772         int hbytes;
773         int abytes;
774         int hoff;
775         int aoff;
776         int n;
777
778         /*
779          * Pump messages out the connection by building an iovec.
780          */
781         n = 0;
782         nmax = 0;
783
784         TAILQ_FOREACH(msg, &ioq->msgq, entry) {
785                 hoff = 0;
786                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
787                          HAMMER2_MSG_ALIGN;
788                 aoff = 0;
789                 abytes = msg->aux_size;
790                 if (n == 0) {
791                         hoff += ioq->hbytes;
792                         aoff += ioq->abytes;
793                 }
794                 if (hbytes - hoff > 0) {
795                         iov[n].iov_base = (char *)&msg->any.head + hoff;
796                         iov[n].iov_len = hbytes - hoff;
797                         nmax += hbytes - hoff;
798                         ++n;
799                         if (n == HAMMER2_IOQ_MAXIOVEC)
800                                 break;
801                 }
802                 if (abytes - aoff > 0) {
803                         assert(msg->aux_data != NULL);
804                         iov[n].iov_base = msg->aux_data + aoff;
805                         iov[n].iov_len = abytes - aoff;
806                         nmax += abytes - aoff;
807                         ++n;
808                         if (n == HAMMER2_IOQ_MAXIOVEC)
809                                 break;
810                 }
811         }
812         if (n == 0)
813                 return;
814
815         /*
816          * Encrypt and write the data.  The crypto code will move the
817          * data into the fifo and adjust the iov as necessary.  If
818          * encryption is disabled the iov is left alone.
819          *
820          * hammer2_crypto_encrypt_wrote()
821          */
822         n = hammer2_crypto_encrypt(iocom, ioq, iov, n);
823
824         /*
825          * Execute the writev() then figure out what happened.
826          */
827         nact = writev(iocom->sock_fd, iov, n);
828         if (nact < 0) {
829                 if (errno != EINTR &&
830                     errno != EINPROGRESS &&
831                     errno != EAGAIN) {
832                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
833                         hammer2_iocom_drain(iocom);
834                 } else {
835                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
836                 }
837                 return;
838         }
839         hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
840         if (nact == nmax)
841                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
842         else
843                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
844
845         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
846                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
847                          HAMMER2_MSG_ALIGN;
848                 abytes = msg->aux_size;
849
850                 if (nact < hbytes - ioq->hbytes) {
851                         ioq->hbytes += nact;
852                         break;
853                 }
854                 nact -= hbytes - ioq->hbytes;
855                 ioq->hbytes = hbytes;
856                 if (nact < abytes - ioq->abytes) {
857                         ioq->abytes += nact;
858                         break;
859                 }
860                 nact -= abytes - ioq->abytes;
861
862                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
863                 --ioq->msgcount;
864                 ioq->hbytes = 0;
865                 ioq->abytes = 0;
866                 if (msg->aux_data)
867                         TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, entry);
868                 else
869                         TAILQ_INSERT_TAIL(&iocom->freeq, msg, entry);
870         }
871         if (msg == NULL) {
872                 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
873                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
874         }
875         if (ioq->error) {
876                 iocom->flags |= HAMMER2_IOCOMF_EOF |
877                                 HAMMER2_IOCOMF_WIDLE;
878                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
879         }
880 }
881
882 /*
883  * Kill pending msgs on ioq_tx and adjust the flags such that no more
884  * write events will occur.  We don't kill read msgs because we want
885  * the caller to pull off our contrived terminal error msg to detect
886  * the connection failure.
887  */
888 void
889 hammer2_iocom_drain(hammer2_iocom_t *iocom)
890 {
891         hammer2_ioq_t *ioq = &iocom->ioq_tx;
892         hammer2_msg_t *msg;
893
894         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
895                 TAILQ_REMOVE(&ioq->msgq, msg, entry);
896                 --ioq->msgcount;
897                 hammer2_freemsg(msg);
898         }
899         iocom->flags |= HAMMER2_IOCOMF_WIDLE;
900         iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
901 }
902
903 /*
904  * This is a shortcut to the normal hammer2_allocreply() mechanic which
905  * uses the received message to formulate a final reply and error code.
906  * Can be used to issue a final reply for one-way, one-off, or streaming
907  * commands.
908  *
909  * Replies to one-way messages are a bit of an oxymoron but the feature
910  * is used by the debug (DBG) protocol.
911  *
912  * The reply contains no data.
913  *
914  * (msg) is eaten up by this function.
915  */
916 void
917 hammer2_replymsg(hammer2_msg_t *msg, uint16_t error)
918 {
919         hammer2_persist_t *pers;
920         uint16_t t16;
921
922         assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
923
924         t16 = msg->any.head.source;
925         msg->any.head.source = msg->any.head.target;
926         msg->any.head.target = t16;
927         msg->any.head.error = error;
928         msg->any.head.cmd |= HAMMER2_MSGF_REPLY;
929         msg->aux_size = 0;
930         if ((pers = msg->persist) != NULL) {
931                 assert(pers->lrep & HAMMER2_MSGF_DELETE);
932                 msg->any.head.cmd |= pers->lrep & (HAMMER2_MSGF_CREATE |
933                                                    HAMMER2_MSGF_DELETE);
934                 pers->lrep &= ~(HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE);
935         }
936         hammer2_ioq_write(msg);
937 }