Merge branches 'hammer2' and 'master' of ssh://crater.dragonflybsd.org/repository...
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
39 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40
41 /*
42  * Indexed messages are stored in a red-black tree indexed by their
43  * msgid.  Only persistent messages are indexed.
44  */
45 int
46 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
47 {
48         if (state1->spanid < state2->spanid)
49                 return(-1);
50         if (state1->spanid > state2->spanid)
51                 return(1);
52         if (state1->msgid < state2->msgid)
53                 return(-1);
54         if (state1->msgid > state2->msgid)
55                 return(1);
56         return(0);
57 }
58
59 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
60
61 /*
62  * Initialize a low-level ioq
63  */
64 void
65 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
66 {
67         bzero(ioq, sizeof(*ioq));
68         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
69         TAILQ_INIT(&ioq->msgq);
70 }
71
72 /*
73  * Cleanup queue.
74  *
75  * caller holds iocom->mtx.
76  */
77 void
78 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
79 {
80         hammer2_msg_t *msg;
81
82         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
83                 assert(0);      /* shouldn't happen */
84                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
85                 hammer2_msg_free(msg);
86         }
87         if ((msg = ioq->msg) != NULL) {
88                 ioq->msg = NULL;
89                 hammer2_msg_free(msg);
90         }
91 }
92
93 /*
94  * Initialize a low-level communications channel.
95  *
96  * NOTE: The signal_func() is called at least once from the loop and can be
97  *       re-armed via hammer2_iocom_restate().
98  */
99 void
100 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
101                    void (*signal_func)(hammer2_router_t *),
102                    void (*rcvmsg_func)(hammer2_msg_t *),
103                    void (*altmsg_func)(hammer2_iocom_t *))
104 {
105         bzero(iocom, sizeof(*iocom));
106
107         iocom->router.signal_callback = signal_func;
108         iocom->router.rcvmsg_callback = rcvmsg_func;
109         iocom->router.altmsg_callback = altmsg_func;
110
111         pthread_mutex_init(&iocom->mtx, NULL);
112         RB_INIT(&iocom->router.staterd_tree);
113         RB_INIT(&iocom->router.statewr_tree);
114         TAILQ_INIT(&iocom->freeq);
115         TAILQ_INIT(&iocom->freeq_aux);
116         TAILQ_INIT(&iocom->addrq);
117         TAILQ_INIT(&iocom->router.txmsgq);
118         iocom->router.iocom = iocom;
119         iocom->sock_fd = sock_fd;
120         iocom->alt_fd = alt_fd;
121         iocom->flags = HAMMER2_IOCOMF_RREQ;
122         if (signal_func)
123                 iocom->flags |= HAMMER2_IOCOMF_SWORK;
124         hammer2_ioq_init(iocom, &iocom->ioq_rx);
125         hammer2_ioq_init(iocom, &iocom->ioq_tx);
126         if (pipe(iocom->wakeupfds) < 0)
127                 assert(0);
128         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
129         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
130
131         /*
132          * Negotiate session crypto synchronously.  This will mark the
133          * connection as error'd if it fails.
134          */
135         hammer2_crypto_negotiate(iocom);
136
137         /*
138          * Make sure our fds are set to non-blocking for the iocom core.
139          */
140         if (sock_fd >= 0)
141                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
142 #if 0
143         /* if line buffered our single fgets() should be fine */
144         if (alt_fd >= 0)
145                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
146 #endif
147 }
148
149 /*
150  * May only be called from a callback from iocom_core.
151  *
152  * Adjust state machine functions, set flags to guarantee that both
153  * the recevmsg_func and the sendmsg_func is called at least once.
154  */
155 void
156 hammer2_router_restate(hammer2_router_t *router,
157                    void (*signal_func)(hammer2_router_t *),
158                    void (*rcvmsg_func)(hammer2_msg_t *msg),
159                    void (*altmsg_func)(hammer2_iocom_t *))
160 {
161         router->signal_callback = signal_func;
162         router->rcvmsg_callback = rcvmsg_func;
163         router->altmsg_callback = altmsg_func;
164         if (signal_func)
165                 router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
166         else
167                 router->iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
168 }
169
170 void
171 hammer2_router_signal(hammer2_router_t *router)
172 {
173         if (router->signal_callback)
174                 router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
175 }
176
177 /*
178  * Cleanup a terminating iocom.
179  *
180  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
181  * from all possible references to it.
182  */
183 void
184 hammer2_iocom_done(hammer2_iocom_t *iocom)
185 {
186         hammer2_msg_t *msg;
187
188         if (iocom->sock_fd >= 0) {
189                 close(iocom->sock_fd);
190                 iocom->sock_fd = -1;
191         }
192         if (iocom->alt_fd >= 0) {
193                 close(iocom->alt_fd);
194                 iocom->alt_fd = -1;
195         }
196         hammer2_ioq_done(iocom, &iocom->ioq_rx);
197         hammer2_ioq_done(iocom, &iocom->ioq_tx);
198         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
199                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
200                 free(msg);
201         }
202         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
203                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
204                 free(msg->aux_data);
205                 msg->aux_data = NULL;
206                 free(msg);
207         }
208         if (iocom->wakeupfds[0] >= 0) {
209                 close(iocom->wakeupfds[0]);
210                 iocom->wakeupfds[0] = -1;
211         }
212         if (iocom->wakeupfds[1] >= 0) {
213                 close(iocom->wakeupfds[1]);
214                 iocom->wakeupfds[1] = -1;
215         }
216         pthread_mutex_destroy(&iocom->mtx);
217 }
218
219 /*
220  * Allocate a new one-way message.
221  */
222 hammer2_msg_t *
223 hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd,
224                   void (*func)(hammer2_msg_t *), void *data)
225 {
226         hammer2_state_t *state = NULL;
227         hammer2_iocom_t *iocom = router->iocom;
228         hammer2_msg_t *msg;
229         int hbytes;
230
231         pthread_mutex_lock(&iocom->mtx);
232         if (aux_size) {
233                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
234                            ~HAMMER2_MSG_ALIGNMASK;
235                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
236                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
237         } else {
238                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
239                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
240         }
241         if ((cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY)) ==
242             HAMMER2_MSGF_CREATE) {
243                 /*
244                  * Create state when CREATE is set without REPLY.
245                  *
246                  * NOTE: CREATE in txcmd handled by hammer2_msg_write()
247                  * NOTE: DELETE in txcmd handled by hammer2_state_cleanuptx()
248                  */
249                 state = malloc(sizeof(*state));
250                 bzero(state, sizeof(*state));
251                 state->iocom = iocom;
252                 state->flags = HAMMER2_STATE_DYNAMIC;
253                 state->msgid = (uint64_t)(uintptr_t)state;
254                 /* XXX set state->spanid from router */
255                 state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
256                                        HAMMER2_MSGF_DELETE);
257                 state->rxcmd = HAMMER2_MSGF_REPLY;
258                 state->func = func;
259                 state->any.any = data;
260                 pthread_mutex_lock(&iocom->mtx);
261                 RB_INSERT(hammer2_state_tree, &iocom->router.statewr_tree, state);
262                 pthread_mutex_unlock(&iocom->mtx);
263                 state->flags |= HAMMER2_STATE_INSERTED;
264         }
265         pthread_mutex_unlock(&iocom->mtx);
266         if (msg == NULL) {
267                 msg = malloc(sizeof(*msg));
268                 bzero(msg, sizeof(*msg));
269                 msg->aux_data = NULL;
270                 msg->aux_size = 0;
271         }
272         if (msg->aux_size != aux_size) {
273                 if (msg->aux_data) {
274                         free(msg->aux_data);
275                         msg->aux_data = NULL;
276                         msg->aux_size = 0;
277                 }
278                 if (aux_size) {
279                         msg->aux_data = malloc(aux_size);
280                         msg->aux_size = aux_size;
281                 }
282         }
283         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
284         if (hbytes)
285                 bzero(&msg->any.head, hbytes);
286         msg->hdr_size = hbytes;
287         msg->any.head.cmd = cmd;
288         msg->any.head.aux_descr = 0;
289         msg->any.head.aux_crc = 0;
290         msg->router = router;
291         if (state) {
292                 msg->state = state;
293                 state->msg = msg;
294                 msg->any.head.msgid = state->msgid;
295         }
296         return (msg);
297 }
298
299 /*
300  * Free a message so it can be reused afresh.
301  *
302  * NOTE: aux_size can be 0 with a non-NULL aux_data.
303  */
304 static
305 void
306 hammer2_msg_free_locked(hammer2_msg_t *msg)
307 {
308         hammer2_iocom_t *iocom = msg->router->iocom;
309
310         msg->state = NULL;
311         if (msg->aux_data)
312                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
313         else
314                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
315 }
316
317 void
318 hammer2_msg_free(hammer2_msg_t *msg)
319 {
320         hammer2_iocom_t *iocom = msg->router->iocom;
321
322         pthread_mutex_lock(&iocom->mtx);
323         hammer2_msg_free_locked(msg);
324         pthread_mutex_unlock(&iocom->mtx);
325 }
326
327 /*
328  * I/O core loop for an iocom.
329  *
330  * Thread localized, iocom->mtx not held.
331  */
332 void
333 hammer2_iocom_core(hammer2_iocom_t *iocom)
334 {
335         struct pollfd fds[3];
336         char dummybuf[256];
337         hammer2_msg_t *msg;
338         int timeout;
339         int count;
340         int wi; /* wakeup pipe */
341         int si; /* socket */
342         int ai; /* alt bulk path socket */
343
344         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
345                 if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
346                                      HAMMER2_IOCOMF_WWORK |
347                                      HAMMER2_IOCOMF_PWORK |
348                                      HAMMER2_IOCOMF_SWORK |
349                                      HAMMER2_IOCOMF_ARWORK |
350                                      HAMMER2_IOCOMF_AWWORK)) == 0) {
351                         /*
352                          * Only poll if no immediate work is pending.
353                          * Otherwise we are just wasting our time calling
354                          * poll.
355                          */
356                         timeout = 5000;
357
358                         count = 0;
359                         wi = -1;
360                         si = -1;
361                         ai = -1;
362
363                         /*
364                          * Always check the inter-thread pipe, e.g.
365                          * for iocom->txmsgq work.
366                          */
367                         wi = count++;
368                         fds[wi].fd = iocom->wakeupfds[0];
369                         fds[wi].events = POLLIN;
370                         fds[wi].revents = 0;
371
372                         /*
373                          * Check the socket input/output direction as
374                          * requested
375                          */
376                         if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
377                                             HAMMER2_IOCOMF_WREQ)) {
378                                 si = count++;
379                                 fds[si].fd = iocom->sock_fd;
380                                 fds[si].events = 0;
381                                 fds[si].revents = 0;
382
383                                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
384                                         fds[si].events |= POLLIN;
385                                 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
386                                         fds[si].events |= POLLOUT;
387                         }
388
389                         /*
390                          * Check the alternative fd for work.
391                          */
392                         if (iocom->alt_fd >= 0) {
393                                 ai = count++;
394                                 fds[ai].fd = iocom->alt_fd;
395                                 fds[ai].events = POLLIN;
396                                 fds[ai].revents = 0;
397                         }
398                         poll(fds, count, timeout);
399
400                         if (wi >= 0 && (fds[wi].revents & POLLIN))
401                                 iocom->flags |= HAMMER2_IOCOMF_PWORK;
402                         if (si >= 0 && (fds[si].revents & POLLIN))
403                                 iocom->flags |= HAMMER2_IOCOMF_RWORK;
404                         if (si >= 0 && (fds[si].revents & POLLOUT))
405                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
406                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
407                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
408                         if (ai >= 0 && (fds[ai].revents & POLLIN))
409                                 iocom->flags |= HAMMER2_IOCOMF_ARWORK;
410                 } else {
411                         /*
412                          * Always check the pipe
413                          */
414                         iocom->flags |= HAMMER2_IOCOMF_PWORK;
415                 }
416
417                 if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
418                         iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
419                         iocom->router.signal_callback(&iocom->router);
420                 }
421
422                 /*
423                  * Pending message queues from other threads wake us up
424                  * with a write to the wakeupfds[] pipe.  We have to clear
425                  * the pipe with a dummy read.
426                  */
427                 if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
428                         iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
429                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
430                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
431                         iocom->flags |= HAMMER2_IOCOMF_WWORK;
432                         if (TAILQ_FIRST(&iocom->router.txmsgq))
433                                 hammer2_iocom_flush1(iocom);
434                 }
435
436                 /*
437                  * Message write sequencing
438                  */
439                 if (iocom->flags & HAMMER2_IOCOMF_WWORK)
440                         hammer2_iocom_flush1(iocom);
441
442                 /*
443                  * Message read sequencing.  Run this after the write
444                  * sequencing in case the write sequencing allowed another
445                  * auto-DELETE to occur on the read side.
446                  */
447                 if (iocom->flags & HAMMER2_IOCOMF_RWORK) {
448                         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
449                                (msg = hammer2_ioq_read(iocom)) != NULL) {
450                                 if (DebugOpt) {
451                                         fprintf(stderr, "receive %s\n",
452                                                 hammer2_msg_str(msg));
453                                 }
454                                 iocom->router.rcvmsg_callback(msg);
455                                 hammer2_state_cleanuprx(iocom, msg);
456                         }
457                 }
458
459                 if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
460                         iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
461                         iocom->router.altmsg_callback(iocom);
462                 }
463         }
464 }
465
466 /*
467  * Make sure there's enough room in the FIFO to hold the
468  * needed data.
469  *
470  * Assume worst case encrypted form is 2x the size of the
471  * plaintext equivalent.
472  */
473 static
474 size_t
475 hammer2_ioq_makeroom(hammer2_ioq_t *ioq, size_t needed)
476 {
477         size_t bytes;
478         size_t nmax;
479
480         bytes = ioq->fifo_cdx - ioq->fifo_beg;
481         nmax = sizeof(ioq->buf) - ioq->fifo_end;
482         if (bytes + nmax / 2 < needed) {
483                 if (bytes) {
484                         bcopy(ioq->buf + ioq->fifo_beg,
485                               ioq->buf,
486                               bytes);
487                 }
488                 ioq->fifo_cdx -= ioq->fifo_beg;
489                 ioq->fifo_beg = 0;
490                 if (ioq->fifo_cdn < ioq->fifo_end) {
491                         bcopy(ioq->buf + ioq->fifo_cdn,
492                               ioq->buf + ioq->fifo_cdx,
493                               ioq->fifo_end - ioq->fifo_cdn);
494                 }
495                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
496                 ioq->fifo_cdn = ioq->fifo_cdx;
497                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
498         }
499         return(nmax);
500 }
501
502 /*
503  * Read the next ready message from the ioq, issuing I/O if needed.
504  * Caller should retry on a read-event when NULL is returned.
505  *
506  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
507  * be returned for each open transaction, then the ioq and iocom
508  * will be errored out and a non-transactional HAMMER2_LNK_ERROR
509  * msg will be returned as the final message.  The caller should not call
510  * us again after the final message is returned.
511  *
512  * Thread localized, iocom->mtx not held.
513  */
514 hammer2_msg_t *
515 hammer2_ioq_read(hammer2_iocom_t *iocom)
516 {
517         hammer2_ioq_t *ioq = &iocom->ioq_rx;
518         hammer2_msg_t *msg;
519         hammer2_msg_hdr_t *head;
520         hammer2_state_t *state;
521         ssize_t n;
522         size_t bytes;
523         size_t nmax;
524         uint32_t xcrc32;
525         int error;
526
527 again:
528         iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
529
530         /*
531          * If a message is already pending we can just remove and
532          * return it.  Message state has already been processed.
533          */
534         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
535                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
536                 return (msg);
537         }
538
539         if (ioq->error)
540                 goto skip;
541
542         /*
543          * Message read in-progress (msg is NULL at the moment).  We don't
544          * allocate a msg until we have its core header.
545          */
546         nmax = sizeof(ioq->buf) - ioq->fifo_end;
547         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
548         msg = ioq->msg;
549
550         switch(ioq->state) {
551         case HAMMER2_MSGQ_STATE_HEADER1:
552                 /*
553                  * Load the primary header, fail on any non-trivial read
554                  * error or on EOF.  Since the primary header is the same
555                  * size is the message alignment it will never straddle
556                  * the end of the buffer.
557                  */
558                 nmax = hammer2_ioq_makeroom(ioq, sizeof(msg->any.head));
559                 if (bytes < sizeof(msg->any.head)) {
560                         n = read(iocom->sock_fd,
561                                  ioq->buf + ioq->fifo_end,
562                                  nmax);
563                         if (n <= 0) {
564                                 if (n == 0) {
565                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
566                                         break;
567                                 }
568                                 if (errno != EINTR &&
569                                     errno != EINPROGRESS &&
570                                     errno != EAGAIN) {
571                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
572                                         break;
573                                 }
574                                 n = 0;
575                                 /* fall through */
576                         }
577                         ioq->fifo_end += (size_t)n;
578                         nmax -= (size_t)n;
579                 }
580
581                 /*
582                  * Decrypt data received so far.  Data will be decrypted
583                  * in-place but might create gaps in the FIFO.  Partial
584                  * blocks are not immediately decrypted.
585                  *
586                  * WARNING!  The header might be in the wrong endian, we
587                  *           do not fix it up until we get the entire
588                  *           extended header.
589                  */
590                 if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
591                         hammer2_crypto_decrypt(iocom, ioq);
592                 } else {
593                         ioq->fifo_cdx = ioq->fifo_end;
594                         ioq->fifo_cdn = ioq->fifo_end;
595                 }
596                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
597
598                 /*
599                  * Insufficient data accumulated (msg is NULL, caller will
600                  * retry on event).
601                  */
602                 assert(msg == NULL);
603                 if (bytes < sizeof(msg->any.head))
604                         break;
605
606                 /*
607                  * Check and fixup the core header.  Note that the icrc
608                  * has to be calculated before any fixups, but the crc
609                  * fields in the msg may have to be swapped like everything
610                  * else.
611                  */
612                 head = (void *)(ioq->buf + ioq->fifo_beg);
613                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
614                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
615                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
616                         break;
617                 }
618
619                 /*
620                  * Calculate the full header size and aux data size
621                  */
622                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
623                         ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
624                                       HAMMER2_MSG_ALIGN;
625                         ioq->abytes = bswap32(head->aux_bytes) *
626                                       HAMMER2_MSG_ALIGN;
627                 } else {
628                         ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
629                                       HAMMER2_MSG_ALIGN;
630                         ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
631                 }
632                 if (ioq->hbytes < sizeof(msg->any.head) ||
633                     ioq->hbytes > sizeof(msg->any) ||
634                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
635                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
636                         break;
637                 }
638
639                 /*
640                  * Allocate the message, the next state will fill it in.
641                  */
642                 msg = hammer2_msg_alloc(&iocom->router, ioq->abytes, 0,
643                                         NULL, NULL);
644                 ioq->msg = msg;
645
646                 /*
647                  * Fall through to the next state.  Make sure that the
648                  * extended header does not straddle the end of the buffer.
649                  * We still want to issue larger reads into our buffer,
650                  * book-keeping is easier if we don't bcopy() yet.
651                  *
652                  * Make sure there is enough room for bloated encrypt data.
653                  */
654                 nmax = hammer2_ioq_makeroom(ioq, ioq->hbytes);
655                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
656                 /* fall through */
657         case HAMMER2_MSGQ_STATE_HEADER2:
658                 /*
659                  * Fill out the extended header.
660                  */
661                 assert(msg != NULL);
662                 if (bytes < ioq->hbytes) {
663                         n = read(iocom->sock_fd,
664                                  ioq->buf + ioq->fifo_end,
665                                  nmax);
666                         if (n <= 0) {
667                                 if (n == 0) {
668                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
669                                         break;
670                                 }
671                                 if (errno != EINTR &&
672                                     errno != EINPROGRESS &&
673                                     errno != EAGAIN) {
674                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
675                                         break;
676                                 }
677                                 n = 0;
678                                 /* fall through */
679                         }
680                         ioq->fifo_end += (size_t)n;
681                         nmax -= (size_t)n;
682                 }
683
684                 if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
685                         hammer2_crypto_decrypt(iocom, ioq);
686                 } else {
687                         ioq->fifo_cdx = ioq->fifo_end;
688                         ioq->fifo_cdn = ioq->fifo_end;
689                 }
690                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
691
692                 /*
693                  * Insufficient data accumulated (set msg NULL so caller will
694                  * retry on event).
695                  */
696                 if (bytes < ioq->hbytes) {
697                         msg = NULL;
698                         break;
699                 }
700
701                 /*
702                  * Calculate the extended header, decrypt data received
703                  * so far.  Handle endian-conversion for the entire extended
704                  * header.
705                  */
706                 head = (void *)(ioq->buf + ioq->fifo_beg);
707
708                 /*
709                  * Check the CRC.
710                  */
711                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
712                         xcrc32 = bswap32(head->hdr_crc);
713                 else
714                         xcrc32 = head->hdr_crc;
715                 head->hdr_crc = 0;
716                 if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
717                         ioq->error = HAMMER2_IOQ_ERROR_XCRC;
718                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
719                                 xcrc32, hammer2_icrc32(head, ioq->hbytes),
720                                 hammer2_msg_str(msg));
721                         assert(0);
722                         break;
723                 }
724                 head->hdr_crc = xcrc32;
725
726                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
727                         hammer2_bswap_head(head);
728                 }
729
730                 /*
731                  * Copy the extended header into the msg and adjust the
732                  * FIFO.
733                  */
734                 bcopy(head, &msg->any, ioq->hbytes);
735
736                 /*
737                  * We are either done or we fall-through.
738                  */
739                 if (ioq->abytes == 0) {
740                         ioq->fifo_beg += ioq->hbytes;
741                         break;
742                 }
743
744                 /*
745                  * Must adjust bytes (and the state) when falling through.
746                  * nmax doesn't change.
747                  */
748                 ioq->fifo_beg += ioq->hbytes;
749                 bytes -= ioq->hbytes;
750                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
751                 /* fall through */
752         case HAMMER2_MSGQ_STATE_AUXDATA1:
753                 /*
754                  * Copy the partial or complete payload from remaining
755                  * bytes in the FIFO in order to optimize the makeroom call
756                  * in the AUXDATA2 state.  We have to fall-through either
757                  * way so we can check the crc.
758                  *
759                  * msg->aux_size tracks our aux data.
760                  */
761                 if (bytes >= ioq->abytes) {
762                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
763                               ioq->abytes);
764                         msg->aux_size = ioq->abytes;
765                         ioq->fifo_beg += ioq->abytes;
766                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
767                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
768                         bytes -= ioq->abytes;
769                 } else if (bytes) {
770                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
771                               bytes);
772                         msg->aux_size = bytes;
773                         ioq->fifo_beg += bytes;
774                         if (ioq->fifo_cdx < ioq->fifo_beg)
775                                 ioq->fifo_cdx = ioq->fifo_beg;
776                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
777                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
778                         bytes = 0;
779                 } else {
780                         msg->aux_size = 0;
781                 }
782                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
783                 /* fall through */
784         case HAMMER2_MSGQ_STATE_AUXDATA2:
785                 /*
786                  * Make sure there is enough room for more data.
787                  */
788                 assert(msg);
789                 nmax = hammer2_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
790
791                 /*
792                  * Read and decrypt more of the payload.
793                  */
794                 if (msg->aux_size < ioq->abytes) {
795                         assert(bytes == 0);
796                         n = read(iocom->sock_fd,
797                                  ioq->buf + ioq->fifo_end,
798                                  nmax);
799                         if (n <= 0) {
800                                 if (n == 0) {
801                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
802                                         break;
803                                 }
804                                 if (errno != EINTR &&
805                                     errno != EINPROGRESS &&
806                                     errno != EAGAIN) {
807                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
808                                         break;
809                                 }
810                                 n = 0;
811                                 /* fall through */
812                         }
813                         ioq->fifo_end += (size_t)n;
814                         nmax -= (size_t)n;
815                 }
816
817                 if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
818                         hammer2_crypto_decrypt(iocom, ioq);
819                 } else {
820                         ioq->fifo_cdx = ioq->fifo_end;
821                         ioq->fifo_cdn = ioq->fifo_end;
822                 }
823                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
824
825                 if (bytes > ioq->abytes - msg->aux_size)
826                         bytes = ioq->abytes - msg->aux_size;
827
828                 if (bytes) {
829                         bcopy(ioq->buf + ioq->fifo_beg,
830                               msg->aux_data + msg->aux_size,
831                               bytes);
832                         msg->aux_size += bytes;
833                         ioq->fifo_beg += bytes;
834                 }
835
836                 /*
837                  * Insufficient data accumulated (set msg NULL so caller will
838                  * retry on event).
839                  */
840                 if (msg->aux_size < ioq->abytes) {
841                         msg = NULL;
842                         break;
843                 }
844                 assert(msg->aux_size == ioq->abytes);
845
846                 /*
847                  * Check aux_crc, then we are done.
848                  */
849                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
850                 if (xcrc32 != msg->any.head.aux_crc) {
851                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
852                         break;
853                 }
854                 break;
855         case HAMMER2_MSGQ_STATE_ERROR:
856                 /*
857                  * Continued calls to drain recorded transactions (returning
858                  * a LNK_ERROR for each one), before we return the final
859                  * LNK_ERROR.
860                  */
861                 assert(msg == NULL);
862                 break;
863         default:
864                 /*
865                  * We don't double-return errors, the caller should not
866                  * have called us again after getting an error msg.
867                  */
868                 assert(0);
869                 break;
870         }
871
872         /*
873          * Check the message sequence.  The iv[] should prevent any
874          * possibility of a replay but we add this check anyway.
875          */
876         if (msg && ioq->error == 0) {
877                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
878                         ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
879                 } else {
880                         ++ioq->seq;
881                 }
882         }
883
884         /*
885          * Process transactional state for the message.
886          */
887         if (msg && ioq->error == 0) {
888                 error = hammer2_state_msgrx(iocom, msg);
889                 if (error) {
890                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
891                                 hammer2_msg_free(msg);
892                                 goto again;
893                         }
894                         ioq->error = error;
895                 }
896         }
897
898         /*
899          * Handle error, RREQ, or completion
900          *
901          * NOTE: nmax and bytes are invalid at this point, we don't bother
902          *       to update them when breaking out.
903          */
904         if (ioq->error) {
905 skip:
906                 /*
907                  * An unrecoverable error causes all active receive
908                  * transactions to be terminated with a LNK_ERROR message.
909                  *
910                  * Once all active transactions are exhausted we set the
911                  * iocom ERROR flag and return a non-transactional LNK_ERROR
912                  * message, which should cause master processing loops to
913                  * terminate.
914                  */
915                 assert(ioq->msg == msg);
916                 if (msg) {
917                         hammer2_msg_free(msg);
918                         ioq->msg = NULL;
919                 }
920
921                 /*
922                  * No more I/O read processing
923                  */
924                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
925
926                 /*
927                  * Simulate a remote LNK_ERROR DELETE msg for any open
928                  * transactions, ending with a final non-transactional
929                  * LNK_ERROR (that the session can detect) when no
930                  * transactions remain.
931                  */
932                 msg = hammer2_msg_alloc(&iocom->router, 0, 0, NULL, NULL);
933                 bzero(&msg->any.head, sizeof(msg->any.head));
934                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
935                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
936                 msg->any.head.error = ioq->error;
937
938                 pthread_mutex_lock(&iocom->mtx);
939                 hammer2_iocom_drain(iocom);
940                 if ((state = RB_ROOT(&iocom->router.staterd_tree)) != NULL) {
941                         /*
942                          * Active remote transactions are still present.
943                          * Simulate the other end sending us a DELETE.
944                          */
945                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
946                                 hammer2_msg_free(msg);
947                                 msg = NULL;
948                         } else {
949                                 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
950                                 msg->state = state;
951                                 msg->any.head.spanid = state->spanid;
952                                 msg->any.head.msgid = state->msgid;
953                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
954                                                      HAMMER2_MSGF_DELETE;
955                         }
956                 } else if ((state = RB_ROOT(&iocom->router.statewr_tree)) != NULL) {
957                         /*
958                          * Active local transactions are still present.
959                          * Simulate the other end sending us a DELETE.
960                          */
961                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
962                                 hammer2_msg_free(msg);
963                                 msg = NULL;
964                         } else {
965                                 msg->state = state;
966                                 msg->any.head.spanid = state->spanid;
967                                 msg->any.head.msgid = state->msgid;
968                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
969                                                      HAMMER2_MSGF_DELETE |
970                                                      HAMMER2_MSGF_REPLY;
971                                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
972                                         msg->any.head.cmd |=
973                                                      HAMMER2_MSGF_CREATE;
974                                 }
975                         }
976                 } else {
977                         /*
978                          * No active local or remote transactions remain.
979                          * Generate a final LNK_ERROR and flag EOF.
980                          */
981                         msg->state = NULL;
982                         iocom->flags |= HAMMER2_IOCOMF_EOF;
983                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
984                 }
985                 pthread_mutex_unlock(&iocom->mtx);
986
987                 /*
988                  * For the iocom error case we want to set RWORK to indicate
989                  * that more messages might be pending.
990                  *
991                  * It is possible to return NULL when there is more work to
992                  * do because each message has to be DELETEd in both
993                  * directions before we continue on with the next (though
994                  * this could be optimized).  The transmit direction will
995                  * re-set RWORK.
996                  */
997                 if (msg)
998                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
999         } else if (msg == NULL) {
1000                 /*
1001                  * Insufficient data received to finish building the message,
1002                  * set RREQ and return NULL.
1003                  *
1004                  * Leave ioq->msg intact.
1005                  * Leave the FIFO intact.
1006                  */
1007                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
1008         } else {
1009                 /*
1010                  * Return msg.
1011                  *
1012                  * The fifo has already been advanced past the message.
1013                  * Trivially reset the FIFO indices if possible.
1014                  *
1015                  * clear the FIFO if it is now empty and set RREQ to wait
1016                  * for more from the socket.  If the FIFO is not empty set
1017                  * TWORK to bypass the poll so we loop immediately.
1018                  */
1019                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1020                     ioq->fifo_cdn == ioq->fifo_end) {
1021                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
1022                         ioq->fifo_cdx = 0;
1023                         ioq->fifo_cdn = 0;
1024                         ioq->fifo_beg = 0;
1025                         ioq->fifo_end = 0;
1026                 } else {
1027                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
1028                 }
1029                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
1030                 ioq->msg = NULL;
1031         }
1032         return (msg);
1033 }
1034
1035 /*
1036  * Calculate the header and data crc's and write a low-level message to
1037  * the connection.  If aux_crc is non-zero the aux_data crc is already
1038  * assumed to have been set.
1039  *
1040  * A non-NULL msg is added to the queue but not necessarily flushed.
1041  * Calling this function with msg == NULL will get a flush going.
1042  *
1043  * Caller must hold iocom->mtx.
1044  */
1045 void
1046 hammer2_iocom_flush1(hammer2_iocom_t *iocom)
1047 {
1048         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1049         hammer2_msg_t *msg;
1050         uint32_t xcrc32;
1051         int hbytes;
1052         hammer2_msg_queue_t tmpq;
1053
1054         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
1055         TAILQ_INIT(&tmpq);
1056         pthread_mutex_lock(&iocom->mtx);
1057         while ((msg = TAILQ_FIRST(&iocom->router.txmsgq)) != NULL) {
1058                 TAILQ_REMOVE(&iocom->router.txmsgq, msg, qentry);
1059                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1060         }
1061         pthread_mutex_unlock(&iocom->mtx);
1062
1063         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1064                 /*
1065                  * Process terminal connection errors.
1066                  */
1067                 TAILQ_REMOVE(&tmpq, msg, qentry);
1068                 if (ioq->error) {
1069                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1070                         ++ioq->msgcount;
1071                         continue;
1072                 }
1073
1074                 /*
1075                  * Finish populating the msg fields.  The salt ensures that
1076                  * the iv[] array is ridiculously randomized and we also
1077                  * re-seed our PRNG every 32768 messages just to be sure.
1078                  */
1079                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
1080                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1081                 ++ioq->seq;
1082                 if ((ioq->seq & 32767) == 0)
1083                         srandomdev();
1084
1085                 /*
1086                  * Calculate aux_crc if 0, then calculate hdr_crc.
1087                  */
1088                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1089                         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
1090                         xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
1091                         msg->any.head.aux_crc = xcrc32;
1092                 }
1093                 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
1094                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
1095
1096                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1097                          HAMMER2_MSG_ALIGN;
1098                 msg->any.head.hdr_crc = 0;
1099                 msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
1100
1101                 /*
1102                  * Enqueue the message (the flush codes handles stream
1103                  * encryption).
1104                  */
1105                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1106                 ++ioq->msgcount;
1107         }
1108         hammer2_iocom_flush2(iocom);
1109 }
1110
1111 /*
1112  * Thread localized, iocom->mtx not held by caller.
1113  */
1114 void
1115 hammer2_iocom_flush2(hammer2_iocom_t *iocom)
1116 {
1117         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1118         hammer2_msg_t *msg;
1119         ssize_t n;
1120         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
1121         size_t nact;
1122         size_t hbytes;
1123         size_t abytes;
1124         size_t hoff;
1125         size_t aoff;
1126         int iovcnt;
1127
1128         if (ioq->error) {
1129                 hammer2_iocom_drain(iocom);
1130                 return;
1131         }
1132
1133         /*
1134          * Pump messages out the connection by building an iovec.
1135          *
1136          * ioq->hbytes/ioq->abytes tracks how much of the first message
1137          * in the queue has been successfully written out, so we can
1138          * resume writing.
1139          */
1140         iovcnt = 0;
1141         nact = 0;
1142         hoff = ioq->hbytes;
1143         aoff = ioq->abytes;
1144
1145         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1146                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1147                          HAMMER2_MSG_ALIGN;
1148                 abytes = msg->aux_size;
1149                 assert(hoff <= hbytes && aoff <= abytes);
1150
1151                 if (hoff < hbytes) {
1152                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1153                         iov[iovcnt].iov_len = hbytes - hoff;
1154                         nact += hbytes - hoff;
1155                         ++iovcnt;
1156                         if (iovcnt == HAMMER2_IOQ_MAXIOVEC)
1157                                 break;
1158                 }
1159                 if (aoff < abytes) {
1160                         assert(msg->aux_data != NULL);
1161                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1162                         iov[iovcnt].iov_len = abytes - aoff;
1163                         nact += abytes - aoff;
1164                         ++iovcnt;
1165                         if (iovcnt == HAMMER2_IOQ_MAXIOVEC)
1166                                 break;
1167                 }
1168                 hoff = 0;
1169                 aoff = 0;
1170         }
1171         if (iovcnt == 0)
1172                 return;
1173
1174         /*
1175          * Encrypt and write the data.  The crypto code will move the
1176          * data into the fifo and adjust the iov as necessary.  If
1177          * encryption is disabled the iov is left alone.
1178          *
1179          * May return a smaller iov (thus a smaller n), with aggregated
1180          * chunks.  May reduce nmax to what fits in the FIFO.
1181          *
1182          * This function sets nact to the number of original bytes now
1183          * encrypted, adding to the FIFO some number of bytes that might
1184          * be greater depending on the crypto mechanic.  iov[] is adjusted
1185          * to point at the FIFO if necessary.
1186          *
1187          * NOTE: The return value from the writev() is the post-encrypted
1188          *       byte count, not the plaintext count.
1189          */
1190         if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
1191                 /*
1192                  * Make sure the FIFO has a reasonable amount of space
1193                  * left (if not completely full).
1194                  */
1195                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1196                     sizeof(ioq->buf) - ioq->fifo_end >= HAMMER2_MSG_ALIGN * 2) {
1197                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1198                               ioq->fifo_end - ioq->fifo_beg);
1199                         ioq->fifo_cdx -= ioq->fifo_beg;
1200                         ioq->fifo_cdn -= ioq->fifo_beg;
1201                         ioq->fifo_end -= ioq->fifo_beg;
1202                         ioq->fifo_beg = 0;
1203                 }
1204
1205                 iovcnt = hammer2_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1206                 n = writev(iocom->sock_fd, iov, iovcnt);
1207                 if (n > 0) {
1208                         ioq->fifo_beg += n;
1209                         ioq->fifo_cdn += n;
1210                         ioq->fifo_cdx += n;
1211                         if (ioq->fifo_beg == ioq->fifo_end) {
1212                                 ioq->fifo_beg = 0;
1213                                 ioq->fifo_cdn = 0;
1214                                 ioq->fifo_cdx = 0;
1215                                 ioq->fifo_end = 0;
1216                         }
1217                 }
1218         } else {
1219                 n = writev(iocom->sock_fd, iov, iovcnt);
1220                 if (n > 0)
1221                         nact = n;
1222                 else
1223                         nact = 0;
1224         }
1225
1226         /*
1227          * Clean out the transmit queue based on what we successfully
1228          * sent (nact is the plaintext count).  ioq->hbytes/abytes
1229          * represents the portion of the first message previously sent.
1230          */
1231         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1232                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1233                          HAMMER2_MSG_ALIGN;
1234                 abytes = msg->aux_size;
1235
1236                 if ((size_t)nact < hbytes - ioq->hbytes) {
1237                         ioq->hbytes += nact;
1238                         /* nact = 0; */
1239                         break;
1240                 }
1241                 nact -= hbytes - ioq->hbytes;
1242                 ioq->hbytes = hbytes;
1243                 if ((size_t)nact < abytes - ioq->abytes) {
1244                         ioq->abytes += nact;
1245                         /* nact = 0; */
1246                         break;
1247                 }
1248                 nact -= abytes - ioq->abytes;
1249
1250                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1251                 --ioq->msgcount;
1252                 ioq->hbytes = 0;
1253                 ioq->abytes = 0;
1254
1255                 hammer2_state_cleanuptx(iocom, msg);
1256         }
1257         assert(nact == 0);
1258
1259         /*
1260          * Process the return value from the write w/regards to blocking.
1261          */
1262         if (n < 0) {
1263                 if (errno != EINTR &&
1264                     errno != EINPROGRESS &&
1265                     errno != EAGAIN) {
1266                         /*
1267                          * Fatal write error
1268                          */
1269                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
1270                         hammer2_iocom_drain(iocom);
1271                 } else {
1272                         /*
1273                          * Wait for socket buffer space
1274                          */
1275                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
1276                 }
1277         } else {
1278                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1279         }
1280         if (ioq->error) {
1281                 hammer2_iocom_drain(iocom);
1282         }
1283 }
1284
1285 /*
1286  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1287  * write events will occur.  We don't kill read msgs because we want
1288  * the caller to pull off our contrived terminal error msg to detect
1289  * the connection failure.
1290  *
1291  * Thread localized, iocom->mtx not held by caller.
1292  */
1293 void
1294 hammer2_iocom_drain(hammer2_iocom_t *iocom)
1295 {
1296         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1297         hammer2_msg_t *msg;
1298
1299         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
1300         ioq->hbytes = 0;
1301         ioq->abytes = 0;
1302
1303         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1304                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1305                 --ioq->msgcount;
1306                 hammer2_state_cleanuptx(iocom, msg);
1307         }
1308 }
1309
1310 /*
1311  * Write a message to an iocom, with additional state processing.
1312  */
1313 void
1314 hammer2_msg_write(hammer2_msg_t *msg)
1315 {
1316         hammer2_iocom_t *iocom = msg->router->iocom;
1317         hammer2_state_t *state;
1318         char dummy;
1319
1320         /*
1321          * Handle state processing, create state if necessary.
1322          */
1323         pthread_mutex_lock(&iocom->mtx);
1324         if ((state = msg->state) != NULL) {
1325                 /*
1326                  * Existing transaction (could be reply).  It is also
1327                  * possible for this to be the first reply (CREATE is set),
1328                  * in which case we populate state->txcmd.
1329                  *
1330                  * state->txcmd is adjusted to hold the final message cmd,
1331                  * and we also be sure to set the CREATE bit here.  We did
1332                  * not set it in hammer2_msg_alloc() because that would have
1333                  * not been serialized (state could have gotten ripped out
1334                  * from under the message prior to it being transmitted).
1335                  */
1336                 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE |
1337                                           HAMMER2_MSGF_REPLY)) ==
1338                     HAMMER2_MSGF_CREATE) {
1339                         state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1340                 }
1341                 msg->any.head.msgid = state->msgid;
1342                 msg->any.head.spanid = state->spanid;
1343                 assert(((state->txcmd ^ msg->any.head.cmd) &
1344                         HAMMER2_MSGF_REPLY) == 0);
1345                 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1346                         state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1347         } else {
1348                 msg->any.head.msgid = 0;
1349                 /* XXX set spanid by router */
1350         }
1351
1352         /*
1353          * Queue it for output, wake up the I/O pthread.  Note that the
1354          * I/O thread is responsible for generating the CRCs and encryption.
1355          */
1356         TAILQ_INSERT_TAIL(&iocom->router.txmsgq, msg, qentry);
1357         dummy = 0;
1358         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1359         pthread_mutex_unlock(&iocom->mtx);
1360 }
1361
1362 /*
1363  * This is a shortcut to formulate a reply to msg with a simple error code,
1364  * It can reply to and terminate a transaction, or it can reply to a one-way
1365  * messages.  A HAMMER2_LNK_ERROR command code is utilized to encode
1366  * the error code (which can be 0).  Not all transactions are terminated
1367  * with HAMMER2_LNK_ERROR status (the low level only cares about the
1368  * MSGF_DELETE flag), but most are.
1369  *
1370  * Replies to one-way messages are a bit of an oxymoron but the feature
1371  * is used by the debug (DBG) protocol.
1372  *
1373  * The reply contains no extended data.
1374  */
1375 void
1376 hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
1377 {
1378         hammer2_iocom_t *iocom = msg->router->iocom;
1379         hammer2_state_t *state = msg->state;
1380         hammer2_msg_t *nmsg;
1381         uint32_t cmd;
1382
1383
1384         /*
1385          * Reply with a simple error code and terminate the transaction.
1386          */
1387         cmd = HAMMER2_LNK_ERROR;
1388
1389         /*
1390          * Check if our direction has even been initiated yet, set CREATE.
1391          *
1392          * Check what direction this is (command or reply direction).  Note
1393          * that txcmd might not have been initiated yet.
1394          *
1395          * If our direction has already been closed we just return without
1396          * doing anything.
1397          */
1398         if (state) {
1399                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1400                         return;
1401                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1402                         cmd |= HAMMER2_MSGF_REPLY;
1403                 cmd |= HAMMER2_MSGF_DELETE;
1404         } else {
1405                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1406                         cmd |= HAMMER2_MSGF_REPLY;
1407         }
1408
1409         /*
1410          * Allocate the message and associate it with the existing state.
1411          * We cannot pass MSGF_CREATE to msg_alloc() because that may
1412          * allocate new state.  We have our state already.
1413          */
1414         nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
1415         if (state) {
1416                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1417                         nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
1418         }
1419         nmsg->any.head.error = error;
1420         nmsg->state = state;
1421         hammer2_msg_write(nmsg);
1422 }
1423
1424 /*
1425  * Similar to hammer2_msg_reply() but leave the transaction open.  That is,
1426  * we are generating a streaming reply or an intermediate acknowledgement
1427  * of some sort as part of the higher level protocol, with more to come
1428  * later.
1429  */
1430 void
1431 hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
1432 {
1433         hammer2_iocom_t *iocom = msg->router->iocom;
1434         hammer2_state_t *state = msg->state;
1435         hammer2_msg_t *nmsg;
1436         uint32_t cmd;
1437
1438
1439         /*
1440          * Reply with a simple error code and terminate the transaction.
1441          */
1442         cmd = HAMMER2_LNK_ERROR;
1443
1444         /*
1445          * Check if our direction has even been initiated yet, set CREATE.
1446          *
1447          * Check what direction this is (command or reply direction).  Note
1448          * that txcmd might not have been initiated yet.
1449          *
1450          * If our direction has already been closed we just return without
1451          * doing anything.
1452          */
1453         if (state) {
1454                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1455                         return;
1456                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1457                         cmd |= HAMMER2_MSGF_REPLY;
1458                 /* continuing transaction, do not set MSGF_DELETE */
1459         } else {
1460                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1461                         cmd |= HAMMER2_MSGF_REPLY;
1462         }
1463
1464         nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
1465         if (state) {
1466                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1467                         nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
1468         }
1469         nmsg->any.head.error = error;
1470         nmsg->state = state;
1471         hammer2_msg_write(nmsg);
1472 }
1473
1474 /*
1475  * Terminate a transaction given a state structure by issuing a DELETE.
1476  */
1477 void
1478 hammer2_state_reply(hammer2_state_t *state, uint32_t error)
1479 {
1480         hammer2_msg_t *nmsg;
1481         uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
1482
1483         /*
1484          * Nothing to do if we already transmitted a delete
1485          */
1486         if (state->txcmd & HAMMER2_MSGF_DELETE)
1487                 return;
1488
1489         /*
1490          * Set REPLY if the other end initiated the command.  Otherwise
1491          * we are the command direction.
1492          */
1493         if (state->txcmd & HAMMER2_MSGF_REPLY)
1494                 cmd |= HAMMER2_MSGF_REPLY;
1495
1496         nmsg = hammer2_msg_alloc(&state->iocom->router, 0, cmd, NULL, NULL);
1497         if (state) {
1498                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1499                         nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
1500         }
1501         nmsg->any.head.error = error;
1502         nmsg->state = state;
1503         hammer2_msg_write(nmsg);
1504 }
1505
1506 /************************************************************************
1507  *                      TRANSACTION STATE HANDLING                      *
1508  ************************************************************************
1509  *
1510  */
1511
1512 /*
1513  * Process state tracking for a message after reception, prior to
1514  * execution.
1515  *
1516  * Called with msglk held and the msg dequeued.
1517  *
1518  * The matching state is looked up (or created for a new command) and
1519  * stored in msg->state; it stays NULL when no state exists.
1520  *
1521  * Returns 0 if the message is acceptable, else a HAMMER2_IOQ_ERROR_* code
1522  * (the message then does not fit the transaction state it refers to).
1523  *
1524  * --
1525  *
1526  * These routines handle persistent and command/reply message state via the
1527  * CREATE and DELETE flags.  The first message in a command or reply sequence
1528  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1529  *
1530  * There can be any number of intermediate messages belonging to the same
1531  * sequence sent in between the CREATE message and the DELETE message,
1532  * which set neither flag.  This represents a streaming command or reply.
1533  *
1534  * Any command message received with CREATE set expects a reply sequence to
1535  * be returned.  Reply sequences work the same as command sequences except the
1536  * REPLY bit is also sent.  Both the command side and reply side can
1537  * degenerate into a single message with both CREATE and DELETE set.  Note
1538  * that one side can be streaming and the other side not, or neither, or both.
1539  *
1540  * The msgid is unique for the initiator.  That is, two sides sending a new
1541  * message can use the same msgid without colliding.
1542  *
1543  * --
1544  *
1545  * ABORT sequences work by setting the ABORT flag along with normal message
1546  * state.  However, ABORTs can also be sent on half-closed messages, that is
1547  * even if the command or reply side has already sent a DELETE, as long as
1548  * the message has not been fully closed it can still send an ABORT+DELETE
1549  * to terminate the half-closed message state.
1550  *
1551  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1552  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1553  * also race, and in this situation the other side might have already
1554  * initiated a new unrelated command with the same message id.  Since
1555  * the abort has not set the CREATE flag the situation can be detected
1556  * and the message will also be discarded.
1557  *
1558  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1559  * The ABORT request is essentially integrated into the command instead
1560  * of being sent later on.  In this situation the command implementation
1561  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1562  * special-case non-blocking operation for the command.
1563  *
1564  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1565  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1566  *        one-way messages are not supported.
1567  *
1568  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1569  *        simply ignored.
1570  *
1571  * --
1572  *
1573  * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
1574  * set.  One-off messages cannot be aborted and typically aren't processed
1575  * by these routines.  The REPLY bit can be used to distinguish whether a
1576  * one-off message is a command or reply.  For example, one-off replies
1577  * will typically just contain status updates.
1578  */
1579 static int
1580 hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1581 {
1582         hammer2_state_t *state;
1583         hammer2_state_t dummy;
1584         int error;
1585
1586         /*
1587          * Lock RB tree and locate existing persistent state, if any.
1588          *
1589          * If received msg is a command state is on staterd_tree.
1590          * If received msg is a reply state is on statewr_tree.
1591          */
1592
1593         dummy.msgid = msg->any.head.msgid;
1594         dummy.spanid = msg->any.head.spanid;
1595         pthread_mutex_lock(&iocom->mtx);
1596         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1597                 state = RB_FIND(hammer2_state_tree,
1598                                 &iocom->router.statewr_tree, &dummy);
1599         } else {
1600                 state = RB_FIND(hammer2_state_tree,
1601                                 &iocom->router.staterd_tree, &dummy);
1602         }
1603         msg->state = state;
1604         pthread_mutex_unlock(&iocom->mtx);
1605
1606         /*
1607          * Short-cut one-off or mid-stream messages (state may be NULL).
1608          */
1609         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1610                                   HAMMER2_MSGF_ABORT)) == 0) {
1611                 return(0);
1612         }
1613
1614         /*
1615          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1616          * inside the case statements.
1617          */
1618         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1619                                     HAMMER2_MSGF_REPLY)) {
1620         case HAMMER2_MSGF_CREATE:
1621         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1622                 /*
1623                  * New persistant command received.
1624                  */
1625                 if (state) {
1626                         fprintf(stderr, "duplicate-trans %s\n",
1627                                 hammer2_msg_str(msg));
1628                         error = HAMMER2_IOQ_ERROR_TRANS;
1629                         assert(0);
1630                         break;
1631                 }
1632                 state = malloc(sizeof(*state));
1633                 bzero(state, sizeof(*state));
1634                 state->iocom = iocom;
1635                 state->flags = HAMMER2_STATE_DYNAMIC;
1636                 state->msg = msg;
1637                 state->txcmd = HAMMER2_MSGF_REPLY;
1638                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1639                 state->flags |= HAMMER2_STATE_INSERTED;
1640                 state->msgid = msg->any.head.msgid;
1641                 state->spanid = msg->any.head.spanid;
1642                 msg->state = state;
1643                 pthread_mutex_lock(&iocom->mtx);
1644                 RB_INSERT(hammer2_state_tree,
1645                           &iocom->router.staterd_tree, state);
1646                 pthread_mutex_unlock(&iocom->mtx);
1647                 error = 0;
1648                 if (DebugOpt) {
1649                         fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1650                                 state, (uint32_t)state->msgid, iocom);
1651                 }
1652                 break;
1653         case HAMMER2_MSGF_DELETE:
1654                 /*
1655                  * Persistent state is expected but might not exist if an
1656                  * ABORT+DELETE races the close.
1657                  */
1658                 if (state == NULL) {
1659                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1660                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1661                         } else {
1662                                 fprintf(stderr, "missing-state %s\n",
1663                                         hammer2_msg_str(msg));
1664                                 error = HAMMER2_IOQ_ERROR_TRANS;
1665                         assert(0);
1666                         }
1667                         break;
1668                 }
1669
1670                 /*
1671                  * Handle another ABORT+DELETE case if the msgid has already
1672                  * been reused.
1673                  */
1674                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1675                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1676                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1677                         } else {
1678                                 fprintf(stderr, "reused-state %s\n",
1679                                         hammer2_msg_str(msg));
1680                                 error = HAMMER2_IOQ_ERROR_TRANS;
1681                         assert(0);
1682                         }
1683                         break;
1684                 }
1685                 error = 0;
1686                 break;
1687         default:
1688                 /*
1689                  * Check for mid-stream ABORT command received, otherwise
1690                  * allow.
1691                  */
1692                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1693                         if (state == NULL ||
1694                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1695                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1696                                 break;
1697                         }
1698                 }
1699                 error = 0;
1700                 break;
1701         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1702         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1703                 /*
1704                  * When receiving a reply with CREATE set the original
1705                  * persistent state message should already exist.
1706                  */
1707                 if (state == NULL) {
1708                         fprintf(stderr, "no-state(r) %s\n",
1709                                 hammer2_msg_str(msg));
1710                         error = HAMMER2_IOQ_ERROR_TRANS;
1711                         assert(0);
1712                         break;
1713                 }
1714                 assert(((state->rxcmd ^ msg->any.head.cmd) &
1715                         HAMMER2_MSGF_REPLY) == 0);
1716                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1717                 error = 0;
1718                 break;
1719         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1720                 /*
1721                  * Received REPLY+ABORT+DELETE in case where msgid has
1722                  * already been fully closed, ignore the message.
1723                  */
1724                 if (state == NULL) {
1725                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1726                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1727                         } else {
1728                                 fprintf(stderr, "no-state(r,d) %s\n",
1729                                         hammer2_msg_str(msg));
1730                                 error = HAMMER2_IOQ_ERROR_TRANS;
1731                         assert(0);
1732                         }
1733                         break;
1734                 }
1735
1736                 /*
1737                  * Received REPLY+ABORT+DELETE in case where msgid has
1738                  * already been reused for an unrelated message,
1739                  * ignore the message.
1740                  */
1741                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1742                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1743                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1744                         } else {
1745                                 fprintf(stderr, "reused-state(r,d) %s\n",
1746                                         hammer2_msg_str(msg));
1747                                 error = HAMMER2_IOQ_ERROR_TRANS;
1748                         assert(0);
1749                         }
1750                         break;
1751                 }
1752                 error = 0;
1753                 break;
1754         case HAMMER2_MSGF_REPLY:
1755                 /*
1756                  * Check for mid-stream ABORT reply received to sent command.
1757                  */
1758                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1759                         if (state == NULL ||
1760                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1761                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1762                                 break;
1763                         }
1764                 }
1765                 error = 0;
1766                 break;
1767         }
1768         return (error);
1769 }
1770
1771 void
1772 hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1773 {
1774         hammer2_state_t *state;
1775
1776         if ((state = msg->state) == NULL) {
1777                 /*
1778                  * Free a non-transactional message, there is no state
1779                  * to worry about.
1780                  */
1781                 hammer2_msg_free(msg);
1782         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1783                 /*
1784                  * Message terminating transaction, destroy the related
1785                  * state, the original message, and this message (if it
1786                  * isn't the original message due to a CREATE|DELETE).
1787                  */
1788                 pthread_mutex_lock(&iocom->mtx);
1789                 state->rxcmd |= HAMMER2_MSGF_DELETE;
1790                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
1791                         if (state->msg == msg)
1792                                 state->msg = NULL;
1793                         assert(state->flags & HAMMER2_STATE_INSERTED);
1794                         if (state->rxcmd & HAMMER2_MSGF_REPLY) {
1795                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1796                                 RB_REMOVE(hammer2_state_tree,
1797                                           &iocom->router.statewr_tree, state);
1798                         } else {
1799                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1800                                 RB_REMOVE(hammer2_state_tree,
1801                                           &iocom->router.staterd_tree, state);
1802                         }
1803                         state->flags &= ~HAMMER2_STATE_INSERTED;
1804                         hammer2_state_free(state);
1805                 } else {
1806                         ;
1807                 }
1808                 pthread_mutex_unlock(&iocom->mtx);
1809                 hammer2_msg_free(msg);
1810         } else if (state->msg != msg) {
1811                 /*
1812                  * Message not terminating transaction, leave state intact
1813                  * and free message if it isn't the CREATE message.
1814                  */
1815                 hammer2_msg_free(msg);
1816         }
1817 }
1818
1819 static void
1820 hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1821 {
1822         hammer2_state_t *state;
1823
1824         if ((state = msg->state) == NULL) {
1825                 hammer2_msg_free(msg);
1826         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1827                 pthread_mutex_lock(&iocom->mtx);
1828                 state->txcmd |= HAMMER2_MSGF_DELETE;
1829                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
1830                         if (state->msg == msg)
1831                                 state->msg = NULL;
1832                         assert(state->flags & HAMMER2_STATE_INSERTED);
1833                         if (state->txcmd & HAMMER2_MSGF_REPLY) {
1834                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1835                                 RB_REMOVE(hammer2_state_tree,
1836                                           &iocom->router.staterd_tree, state);
1837                         } else {
1838                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1839                                 RB_REMOVE(hammer2_state_tree,
1840                                           &iocom->router.statewr_tree, state);
1841                         }
1842                         state->flags &= ~HAMMER2_STATE_INSERTED;
1843                         hammer2_state_free(state);
1844                 } else {
1845                         ;
1846                 }
1847                 pthread_mutex_unlock(&iocom->mtx);
1848                 hammer2_msg_free(msg);
1849         } else if (state->msg != msg) {
1850                 hammer2_msg_free(msg);
1851         }
1852 }
1853
1854 /*
1855  * Called with iocom locked
1856  */
1857 void
1858 hammer2_state_free(hammer2_state_t *state)
1859 {
1860         hammer2_iocom_t *iocom = state->iocom;
1861         hammer2_msg_t *msg;
1862         char dummy;
1863
1864         if (DebugOpt) {
1865                 fprintf(stderr, "terminate state %p id=%08x\n",
1866                         state, (uint32_t)state->msgid);
1867         }
1868         assert(state->any.any == NULL);
1869         msg = state->msg;
1870         state->msg = NULL;
1871         if (msg)
1872                 hammer2_msg_free_locked(msg);
1873         free(state);
1874
1875         /*
1876          * When an iocom error is present we are trying to close down the
1877          * iocom, but we have to wait for all states to terminate before
1878          * we can do so.  The iocom rx code will terminate the receive side
1879          * for all transactions by simulating incoming DELETE messages,
1880          * but the state doesn't go away until both sides are terminated.
1881          *
1882          * We may have to wake up the rx code.
1883          */
1884         if (iocom->ioq_rx.error &&
1885             RB_EMPTY(&iocom->router.staterd_tree) &&
1886             RB_EMPTY(&iocom->router.statewr_tree)) {
1887                 dummy = 0;
1888                 write(iocom->wakeupfds[1], &dummy, 1);
1889         }
1890 }
1891
1892 const char *
1893 hammer2_basecmd_str(uint32_t cmd)
1894 {
1895         static char buf[64];
1896         char protobuf[32];
1897         char cmdbuf[32];
1898         const char *protostr;
1899         const char *cmdstr;
1900
1901         switch(cmd & HAMMER2_MSGF_PROTOS) {
1902         case HAMMER2_MSG_PROTO_LNK:
1903                 protostr = "LNK_";
1904                 break;
1905         case HAMMER2_MSG_PROTO_DBG:
1906                 protostr = "DBG_";
1907                 break;
1908         case HAMMER2_MSG_PROTO_DOM:
1909                 protostr = "DOM_";
1910                 break;
1911         case HAMMER2_MSG_PROTO_CAC:
1912                 protostr = "CAC_";
1913                 break;
1914         case HAMMER2_MSG_PROTO_QRM:
1915                 protostr = "QRM_";
1916                 break;
1917         case HAMMER2_MSG_PROTO_BLK:
1918                 protostr = "BLK_";
1919                 break;
1920         case HAMMER2_MSG_PROTO_VOP:
1921                 protostr = "VOP_";
1922                 break;
1923         default:
1924                 snprintf(protobuf, sizeof(protobuf), "%x_",
1925                         (cmd & HAMMER2_MSGF_PROTOS) >> 20);
1926                 protostr = protobuf;
1927                 break;
1928         }
1929
1930         switch(cmd & (HAMMER2_MSGF_PROTOS |
1931                       HAMMER2_MSGF_CMDS |
1932                       HAMMER2_MSGF_SIZE)) {
1933         case HAMMER2_LNK_PAD:
1934                 cmdstr = "PAD";
1935                 break;
1936         case HAMMER2_LNK_PING:
1937                 cmdstr = "PING";
1938                 break;
1939         case HAMMER2_LNK_AUTH:
1940                 cmdstr = "AUTH";
1941                 break;
1942         case HAMMER2_LNK_CONN:
1943                 cmdstr = "CONN";
1944                 break;
1945         case HAMMER2_LNK_SPAN:
1946                 cmdstr = "SPAN";
1947                 break;
1948         case HAMMER2_LNK_ERROR:
1949                 if (cmd & HAMMER2_MSGF_DELETE)
1950                         cmdstr = "RETURN";
1951                 else
1952                         cmdstr = "RESULT";
1953                 break;
1954         case HAMMER2_DBG_SHELL:
1955                 cmdstr = "SHELL";
1956                 break;
1957         default:
1958                 snprintf(cmdbuf, sizeof(cmdbuf),
1959                          "%06x", (cmd & (HAMMER2_MSGF_PROTOS |
1960                                          HAMMER2_MSGF_CMDS |
1961                                          HAMMER2_MSGF_SIZE)));
1962                 cmdstr = cmdbuf;
1963                 break;
1964         }
1965         snprintf(buf, sizeof(buf), "%s%s", protostr, cmdstr);
1966         return (buf);
1967 }
1968
1969 const char *
1970 hammer2_msg_str(hammer2_msg_t *msg)
1971 {
1972         hammer2_state_t *state;
1973         static char buf[256];
1974         char errbuf[16];
1975         char statebuf[64];
1976         char flagbuf[64];
1977         const char *statestr;
1978         const char *errstr;
1979         uint32_t basecmd;
1980         int i;
1981
1982         /*
1983          * Parse the state
1984          */
1985         if ((state = msg->state) != NULL) {
1986                 basecmd = (state->rxcmd & HAMMER2_MSGF_REPLY) ?
1987                           state->txcmd : state->rxcmd;
1988                 snprintf(statebuf, sizeof(statebuf),
1989                          " %s=%s,L=%s%s,R=%s%s",
1990                          ((state->txcmd & HAMMER2_MSGF_REPLY) ?
1991                                 "rcvcmd" : "sndcmd"),
1992                          hammer2_basecmd_str(basecmd),
1993                          ((state->txcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
1994                          ((state->txcmd & HAMMER2_MSGF_DELETE) ? "D" : ""),
1995                          ((state->rxcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
1996                          ((state->rxcmd & HAMMER2_MSGF_DELETE) ? "D" : "")
1997                 );
1998                 statestr = statebuf;
1999         } else {
2000                 statestr = "";
2001         }
2002
2003         /*
2004          * Parse the error
2005          */
2006         switch(msg->any.head.error) {
2007         case 0:
2008                 errstr = "";
2009                 break;
2010         case HAMMER2_IOQ_ERROR_SYNC:
2011                 errstr = "err=IOQ:NOSYNC";
2012                 break;
2013         case HAMMER2_IOQ_ERROR_EOF:
2014                 errstr = "err=IOQ:STREAMEOF";
2015                 break;
2016         case HAMMER2_IOQ_ERROR_SOCK:
2017                 errstr = "err=IOQ:SOCKERR";
2018                 break;
2019         case HAMMER2_IOQ_ERROR_FIELD:
2020                 errstr = "err=IOQ:BADFIELD";
2021                 break;
2022         case HAMMER2_IOQ_ERROR_HCRC:
2023                 errstr = "err=IOQ:BADHCRC";
2024                 break;
2025         case HAMMER2_IOQ_ERROR_XCRC:
2026                 errstr = "err=IOQ:BADXCRC";
2027                 break;
2028         case HAMMER2_IOQ_ERROR_ACRC:
2029                 errstr = "err=IOQ:BADACRC";
2030                 break;
2031         case HAMMER2_IOQ_ERROR_STATE:
2032                 errstr = "err=IOQ:BADSTATE";
2033                 break;
2034         case HAMMER2_IOQ_ERROR_NOPEER:
2035                 errstr = "err=IOQ:PEERCONFIG";
2036                 break;
2037         case HAMMER2_IOQ_ERROR_NORKEY:
2038                 errstr = "err=IOQ:BADRKEY";
2039                 break;
2040         case HAMMER2_IOQ_ERROR_NOLKEY:
2041                 errstr = "err=IOQ:BADLKEY";
2042                 break;
2043         case HAMMER2_IOQ_ERROR_KEYXCHGFAIL:
2044                 errstr = "err=IOQ:BADKEYXCHG";
2045                 break;
2046         case HAMMER2_IOQ_ERROR_KEYFMT:
2047                 errstr = "err=IOQ:BADFMT";
2048                 break;
2049         case HAMMER2_IOQ_ERROR_BADURANDOM:
2050                 errstr = "err=IOQ:BADRANDOM";
2051                 break;
2052         case HAMMER2_IOQ_ERROR_MSGSEQ:
2053                 errstr = "err=IOQ:BADSEQ";
2054                 break;
2055         case HAMMER2_IOQ_ERROR_EALREADY:
2056                 errstr = "err=IOQ:DUPMSG";
2057                 break;
2058         case HAMMER2_IOQ_ERROR_TRANS:
2059                 errstr = "err=IOQ:BADTRANS";
2060                 break;
2061         case HAMMER2_MSG_ERR_NOSUPP:
2062                 errstr = "err=NOSUPPORT";
2063                 break;
2064         default:
2065                 snprintf(errbuf, sizeof(errbuf),
2066                          " err=%d", msg->any.head.error);
2067                 errstr = errbuf;
2068                 break;
2069         }
2070
2071         /*
2072          * Message flags
2073          */
2074         i = 0;
2075         if (msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
2076                                  HAMMER2_MSGF_ABORT | HAMMER2_MSGF_REPLY)) {
2077                 flagbuf[i++] = '|';
2078                 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
2079                         flagbuf[i++] = 'C';
2080                 if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
2081                         flagbuf[i++] = 'D';
2082                 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
2083                         flagbuf[i++] = 'R';
2084                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT)
2085                         flagbuf[i++] = 'A';
2086         }
2087         flagbuf[i] = 0;
2088
2089         /*
2090          * Generate the buf
2091          */
2092         snprintf(buf, sizeof(buf),
2093                 "msg=%s%s %s id=%08x span=%08x %s",
2094                  hammer2_basecmd_str(msg->any.head.cmd),
2095                  flagbuf,
2096                  errstr,
2097                  (uint32_t)(intmax_t)msg->any.head.msgid,   /* for brevity */
2098                  (uint32_t)(intmax_t)msg->any.head.spanid,  /* for brevity */
2099                  statestr);
2100
2101         return(buf);
2102 }