ea975481f0f9dafeb5a51fc94cccb5d433567cef
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
39 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40
41 /*
42  * Indexed messages are stored in a red-black tree indexed by their
43  * msgid.  Only persistent messages are indexed.
44  */
45 int
46 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
47 {
48         if (state1->spanid < state2->spanid)
49                 return(-1);
50         if (state1->spanid > state2->spanid)
51                 return(1);
52         if (state1->msgid < state2->msgid)
53                 return(-1);
54         if (state1->msgid > state2->msgid)
55                 return(1);
56         return(0);
57 }
58
59 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
60
61 /*
62  * Initialize a low-level ioq
63  */
64 void
65 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
66 {
67         bzero(ioq, sizeof(*ioq));
68         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
69         TAILQ_INIT(&ioq->msgq);
70 }
71
72 /*
73  * Cleanup queue.
74  *
75  * caller holds iocom->mtx.
76  */
77 void
78 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
79 {
80         hammer2_msg_t *msg;
81
82         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
83                 assert(0);      /* shouldn't happen */
84                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
85                 hammer2_msg_free(msg);
86         }
87         if ((msg = ioq->msg) != NULL) {
88                 ioq->msg = NULL;
89                 hammer2_msg_free(msg);
90         }
91 }
92
93 /*
94  * Initialize a low-level communications channel.
95  *
96  * NOTE: The signal_func() is called at least once from the loop and can be
97  *       re-armed via hammer2_iocom_restate().
98  */
99 void
100 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
101                    void (*signal_func)(hammer2_router_t *),
102                    void (*rcvmsg_func)(hammer2_msg_t *),
103                    void (*altmsg_func)(hammer2_iocom_t *))
104 {
105         bzero(iocom, sizeof(*iocom));
106
107         iocom->router.signal_callback = signal_func;
108         iocom->router.rcvmsg_callback = rcvmsg_func;
109         iocom->router.altmsg_callback = altmsg_func;
110
111         pthread_mutex_init(&iocom->mtx, NULL);
112         RB_INIT(&iocom->router.staterd_tree);
113         RB_INIT(&iocom->router.statewr_tree);
114         TAILQ_INIT(&iocom->freeq);
115         TAILQ_INIT(&iocom->freeq_aux);
116         TAILQ_INIT(&iocom->addrq);
117         TAILQ_INIT(&iocom->router.txmsgq);
118         iocom->router.iocom = iocom;
119         iocom->sock_fd = sock_fd;
120         iocom->alt_fd = alt_fd;
121         iocom->flags = HAMMER2_IOCOMF_RREQ;
122         if (signal_func)
123                 iocom->flags |= HAMMER2_IOCOMF_SWORK;
124         hammer2_ioq_init(iocom, &iocom->ioq_rx);
125         hammer2_ioq_init(iocom, &iocom->ioq_tx);
126         if (pipe(iocom->wakeupfds) < 0)
127                 assert(0);
128         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
129         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
130
131         /*
132          * Negotiate session crypto synchronously.  This will mark the
133          * connection as error'd if it fails.
134          */
135         hammer2_crypto_negotiate(iocom);
136
137         /*
138          * Make sure our fds are set to non-blocking for the iocom core.
139          */
140         if (sock_fd >= 0)
141                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
142 #if 0
143         /* if line buffered our single fgets() should be fine */
144         if (alt_fd >= 0)
145                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
146 #endif
147 }
148
149 /*
150  * May only be called from a callback from iocom_core.
151  *
152  * Adjust state machine functions, set flags to guarantee that both
153  * the recevmsg_func and the sendmsg_func is called at least once.
154  */
155 void
156 hammer2_router_restate(hammer2_router_t *router,
157                    void (*signal_func)(hammer2_router_t *),
158                    void (*rcvmsg_func)(hammer2_msg_t *msg),
159                    void (*altmsg_func)(hammer2_iocom_t *))
160 {
161         router->signal_callback = signal_func;
162         router->rcvmsg_callback = rcvmsg_func;
163         router->altmsg_callback = altmsg_func;
164         if (signal_func)
165                 router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
166         else
167                 router->iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
168 }
169
170 void
171 hammer2_router_signal(hammer2_router_t *router)
172 {
173         if (router->signal_callback)
174                 router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
175 }
176
177 /*
178  * Cleanup a terminating iocom.
179  *
180  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
181  * from all possible references to it.
182  */
183 void
184 hammer2_iocom_done(hammer2_iocom_t *iocom)
185 {
186         hammer2_msg_t *msg;
187
188         if (iocom->sock_fd >= 0) {
189                 close(iocom->sock_fd);
190                 iocom->sock_fd = -1;
191         }
192         if (iocom->alt_fd >= 0) {
193                 close(iocom->alt_fd);
194                 iocom->alt_fd = -1;
195         }
196         hammer2_ioq_done(iocom, &iocom->ioq_rx);
197         hammer2_ioq_done(iocom, &iocom->ioq_tx);
198         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
199                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
200                 free(msg);
201         }
202         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
203                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
204                 free(msg->aux_data);
205                 msg->aux_data = NULL;
206                 free(msg);
207         }
208         if (iocom->wakeupfds[0] >= 0) {
209                 close(iocom->wakeupfds[0]);
210                 iocom->wakeupfds[0] = -1;
211         }
212         if (iocom->wakeupfds[1] >= 0) {
213                 close(iocom->wakeupfds[1]);
214                 iocom->wakeupfds[1] = -1;
215         }
216         pthread_mutex_destroy(&iocom->mtx);
217 }
218
219 /*
220  * Allocate a new one-way message.
221  */
222 hammer2_msg_t *
223 hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd,
224                   void (*func)(hammer2_msg_t *), void *data)
225 {
226         hammer2_state_t *state = NULL;
227         hammer2_iocom_t *iocom = router->iocom;
228         hammer2_msg_t *msg;
229         int hbytes;
230
231         pthread_mutex_lock(&iocom->mtx);
232         if (aux_size) {
233                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
234                            ~HAMMER2_MSG_ALIGNMASK;
235                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
236                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
237         } else {
238                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
239                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
240         }
241         if ((cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_REPLY)) ==
242             HAMMER2_MSGF_CREATE) {
243                 /*
244                  * Create state when CREATE is set without REPLY.
245                  *
246                  * NOTE: CREATE in txcmd handled by hammer2_msg_write()
247                  * NOTE: DELETE in txcmd handled by hammer2_state_cleanuptx()
248                  */
249                 state = malloc(sizeof(*state));
250                 bzero(state, sizeof(*state));
251                 state->iocom = iocom;
252                 state->flags = HAMMER2_STATE_DYNAMIC;
253                 state->msgid = (uint64_t)(uintptr_t)state;
254                 /* XXX set state->spanid from router */
255                 state->txcmd = cmd & ~(HAMMER2_MSGF_CREATE |
256                                        HAMMER2_MSGF_DELETE);
257                 state->rxcmd = HAMMER2_MSGF_REPLY;
258                 state->func = func;
259                 state->any.any = data;
260                 pthread_mutex_lock(&iocom->mtx);
261                 RB_INSERT(hammer2_state_tree, &iocom->router.statewr_tree, state);
262                 pthread_mutex_unlock(&iocom->mtx);
263                 state->flags |= HAMMER2_STATE_INSERTED;
264         }
265         pthread_mutex_unlock(&iocom->mtx);
266         if (msg == NULL) {
267                 msg = malloc(sizeof(*msg));
268                 bzero(msg, sizeof(*msg));
269                 msg->aux_data = NULL;
270                 msg->aux_size = 0;
271         }
272         if (msg->aux_size != aux_size) {
273                 if (msg->aux_data) {
274                         free(msg->aux_data);
275                         msg->aux_data = NULL;
276                         msg->aux_size = 0;
277                 }
278                 if (aux_size) {
279                         msg->aux_data = malloc(aux_size);
280                         msg->aux_size = aux_size;
281                 }
282         }
283         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
284         if (hbytes)
285                 bzero(&msg->any.head, hbytes);
286         msg->hdr_size = hbytes;
287         msg->any.head.cmd = cmd;
288         msg->any.head.aux_descr = 0;
289         msg->any.head.aux_crc = 0;
290         msg->router = router;
291         if (state) {
292                 msg->state = state;
293                 state->msg = msg;
294                 msg->any.head.msgid = state->msgid;
295         }
296         return (msg);
297 }
298
299 /*
300  * Free a message so it can be reused afresh.
301  *
302  * NOTE: aux_size can be 0 with a non-NULL aux_data.
303  */
304 static
305 void
306 hammer2_msg_free_locked(hammer2_msg_t *msg)
307 {
308         hammer2_iocom_t *iocom = msg->router->iocom;
309
310         msg->state = NULL;
311         if (msg->aux_data)
312                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
313         else
314                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
315 }
316
317 void
318 hammer2_msg_free(hammer2_msg_t *msg)
319 {
320         hammer2_iocom_t *iocom = msg->router->iocom;
321
322         pthread_mutex_lock(&iocom->mtx);
323         hammer2_msg_free_locked(msg);
324         pthread_mutex_unlock(&iocom->mtx);
325 }
326
327 /*
328  * I/O core loop for an iocom.
329  *
330  * Thread localized, iocom->mtx not held.
331  */
332 void
333 hammer2_iocom_core(hammer2_iocom_t *iocom)
334 {
335         struct pollfd fds[3];
336         char dummybuf[256];
337         hammer2_msg_t *msg;
338         int timeout;
339         int count;
340         int wi; /* wakeup pipe */
341         int si; /* socket */
342         int ai; /* alt bulk path socket */
343
344         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
345                 if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
346                                      HAMMER2_IOCOMF_WWORK |
347                                      HAMMER2_IOCOMF_PWORK |
348                                      HAMMER2_IOCOMF_SWORK |
349                                      HAMMER2_IOCOMF_ARWORK |
350                                      HAMMER2_IOCOMF_AWWORK)) == 0) {
351                         /*
352                          * Only poll if no immediate work is pending.
353                          * Otherwise we are just wasting our time calling
354                          * poll.
355                          */
356                         timeout = 5000;
357
358                         count = 0;
359                         wi = -1;
360                         si = -1;
361                         ai = -1;
362
363                         /*
364                          * Always check the inter-thread pipe, e.g.
365                          * for iocom->txmsgq work.
366                          */
367                         wi = count++;
368                         fds[wi].fd = iocom->wakeupfds[0];
369                         fds[wi].events = POLLIN;
370                         fds[wi].revents = 0;
371
372                         /*
373                          * Check the socket input/output direction as
374                          * requested
375                          */
376                         if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
377                                             HAMMER2_IOCOMF_WREQ)) {
378                                 si = count++;
379                                 fds[si].fd = iocom->sock_fd;
380                                 fds[si].events = 0;
381                                 fds[si].revents = 0;
382
383                                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
384                                         fds[si].events |= POLLIN;
385                                 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
386                                         fds[si].events |= POLLOUT;
387                         }
388
389                         /*
390                          * Check the alternative fd for work.
391                          */
392                         if (iocom->alt_fd >= 0) {
393                                 ai = count++;
394                                 fds[ai].fd = iocom->alt_fd;
395                                 fds[ai].events = POLLIN;
396                                 fds[ai].revents = 0;
397                         }
398                         poll(fds, count, timeout);
399
400                         if (wi >= 0 && (fds[wi].revents & POLLIN))
401                                 iocom->flags |= HAMMER2_IOCOMF_PWORK;
402                         if (si >= 0 && (fds[si].revents & POLLIN))
403                                 iocom->flags |= HAMMER2_IOCOMF_RWORK;
404                         if (si >= 0 && (fds[si].revents & POLLOUT))
405                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
406                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
407                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
408                         if (ai >= 0 && (fds[ai].revents & POLLIN))
409                                 iocom->flags |= HAMMER2_IOCOMF_ARWORK;
410                 } else {
411                         /*
412                          * Always check the pipe
413                          */
414                         iocom->flags |= HAMMER2_IOCOMF_PWORK;
415                 }
416
417                 if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
418                         iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
419                         iocom->router.signal_callback(&iocom->router);
420                 }
421
422                 /*
423                  * Pending message queues from other threads wake us up
424                  * with a write to the wakeupfds[] pipe.  We have to clear
425                  * the pipe with a dummy read.
426                  */
427                 if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
428                         iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
429                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
430                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
431                         iocom->flags |= HAMMER2_IOCOMF_WWORK;
432                         if (TAILQ_FIRST(&iocom->router.txmsgq))
433                                 hammer2_iocom_flush1(iocom);
434                 }
435
436                 /*
437                  * Message write sequencing
438                  */
439                 if (iocom->flags & HAMMER2_IOCOMF_WWORK)
440                         hammer2_iocom_flush1(iocom);
441
442                 /*
443                  * Message read sequencing.  Run this after the write
444                  * sequencing in case the write sequencing allowed another
445                  * auto-DELETE to occur on the read side.
446                  */
447                 if (iocom->flags & HAMMER2_IOCOMF_RWORK) {
448                         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
449                                (msg = hammer2_ioq_read(iocom)) != NULL) {
450                                 if (DebugOpt) {
451                                         fprintf(stderr, "receive %s\n",
452                                                 hammer2_msg_str(msg));
453                                 }
454                                 iocom->router.rcvmsg_callback(msg);
455                                 hammer2_state_cleanuprx(iocom, msg);
456                         }
457                 }
458
459                 if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
460                         iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
461                         iocom->router.altmsg_callback(iocom);
462                 }
463         }
464 }
465
466 /*
467  * Read the next ready message from the ioq, issuing I/O if needed.
468  * Caller should retry on a read-event when NULL is returned.
469  *
470  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
471  * be returned for each open transaction, then the ioq and iocom
472  * will be errored out and a non-transactional HAMMER2_LNK_ERROR
473  * msg will be returned as the final message.  The caller should not call
474  * us again after the final message is returned.
475  *
476  * Thread localized, iocom->mtx not held.
477  */
478 hammer2_msg_t *
479 hammer2_ioq_read(hammer2_iocom_t *iocom)
480 {
481         hammer2_ioq_t *ioq = &iocom->ioq_rx;
482         hammer2_msg_t *msg;
483         hammer2_msg_hdr_t *head;
484         hammer2_state_t *state;
485         ssize_t n;
486         size_t bytes;
487         size_t nmax;
488         uint32_t xcrc32;
489         int error;
490
491 again:
492         iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
493
494         /*
495          * If a message is already pending we can just remove and
496          * return it.  Message state has already been processed.
497          */
498         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
499                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
500                 return (msg);
501         }
502
503         if (ioq->error)
504                 goto skip;
505
506         /*
507          * Message read in-progress (msg is NULL at the moment).  We don't
508          * allocate a msg until we have its core header.
509          */
510         bytes = ioq->fifo_end - ioq->fifo_beg;
511         nmax = sizeof(ioq->buf) - ioq->fifo_end;
512         msg = ioq->msg;
513
514         switch(ioq->state) {
515         case HAMMER2_MSGQ_STATE_HEADER1:
516                 /*
517                  * Load the primary header, fail on any non-trivial read
518                  * error or on EOF.  Since the primary header is the same
519                  * size is the message alignment it will never straddle
520                  * the end of the buffer.
521                  */
522                 if (bytes < (int)sizeof(msg->any.head)) {
523                         n = read(iocom->sock_fd,
524                                  ioq->buf + ioq->fifo_end,
525                                  nmax);
526                         if (n <= 0) {
527                                 if (n == 0) {
528                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
529                                         break;
530                                 }
531                                 if (errno != EINTR &&
532                                     errno != EINPROGRESS &&
533                                     errno != EAGAIN) {
534                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
535                                         break;
536                                 }
537                                 n = 0;
538                                 /* fall through */
539                         }
540                         ioq->fifo_end += n;
541                         bytes += n;
542                         nmax -= n;
543                 }
544
545                 /*
546                  * Insufficient data accumulated (msg is NULL, caller will
547                  * retry on event).
548                  */
549                 assert(msg == NULL);
550                 if (bytes < (int)sizeof(msg->any.head))
551                         break;
552
553                 /*
554                  * Calculate the header, decrypt data received so far.
555                  * Data will be decrypted in-place.  Partial blocks are
556                  * not immediately decrypted.
557                  *
558                  * WARNING!  The header might be in the wrong endian, we
559                  *           do not fix it up until we get the entire
560                  *           extended header.
561                  */
562                 hammer2_crypto_decrypt(iocom, ioq);
563                 head = (void *)(ioq->buf + ioq->fifo_beg);
564
565                 /*
566                  * Check and fixup the core header.  Note that the icrc
567                  * has to be calculated before any fixups, but the crc
568                  * fields in the msg may have to be swapped like everything
569                  * else.
570                  */
571                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
572                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
573                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
574                         break;
575                 }
576
577                 /*
578                  * Calculate the full header size and aux data size
579                  */
580                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
581                         ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
582                                       HAMMER2_MSG_ALIGN;
583                         ioq->abytes = bswap32(head->aux_bytes) *
584                                       HAMMER2_MSG_ALIGN;
585                 } else {
586                         ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
587                                       HAMMER2_MSG_ALIGN;
588                         ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
589                 }
590                 if (ioq->hbytes < sizeof(msg->any.head) ||
591                     ioq->hbytes > sizeof(msg->any) ||
592                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
593                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
594                         break;
595                 }
596
597                 /*
598                  * Allocate the message, the next state will fill it in.
599                  */
600                 msg = hammer2_msg_alloc(&iocom->router, ioq->abytes, 0,
601                                         NULL, NULL);
602                 ioq->msg = msg;
603
604                 /*
605                  * Fall through to the next state.  Make sure that the
606                  * extended header does not straddle the end of the buffer.
607                  * We still want to issue larger reads into our buffer,
608                  * book-keeping is easier if we don't bcopy() yet.
609                  */
610                 if (bytes + nmax < ioq->hbytes) {
611                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
612                         ioq->fifo_cdx -= ioq->fifo_beg;
613                         ioq->fifo_beg = 0;
614                         ioq->fifo_end = bytes;
615                         nmax = sizeof(ioq->buf) - ioq->fifo_end;
616                 }
617                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
618                 /* fall through */
619         case HAMMER2_MSGQ_STATE_HEADER2:
620                 /*
621                  * Fill out the extended header.
622                  */
623                 assert(msg != NULL);
624                 if (bytes < ioq->hbytes) {
625                         n = read(iocom->sock_fd,
626                                  ioq->buf + ioq->fifo_end,
627                                  nmax);
628                         if (n <= 0) {
629                                 if (n == 0) {
630                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
631                                         break;
632                                 }
633                                 if (errno != EINTR &&
634                                     errno != EINPROGRESS &&
635                                     errno != EAGAIN) {
636                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
637                                         break;
638                                 }
639                                 n = 0;
640                                 /* fall through */
641                         }
642                         ioq->fifo_end += n;
643                         bytes += n;
644                         nmax -= n;
645                 }
646
647                 /*
648                  * Insufficient data accumulated (set msg NULL so caller will
649                  * retry on event).
650                  */
651                 if (bytes < ioq->hbytes) {
652                         msg = NULL;
653                         break;
654                 }
655
656                 /*
657                  * Calculate the extended header, decrypt data received
658                  * so far.  Handle endian-conversion for the entire extended
659                  * header.
660                  */
661                 hammer2_crypto_decrypt(iocom, ioq);
662                 head = (void *)(ioq->buf + ioq->fifo_beg);
663
664                 /*
665                  * Check the CRC.
666                  */
667                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
668                         xcrc32 = bswap32(head->hdr_crc);
669                 else
670                         xcrc32 = head->hdr_crc;
671                 head->hdr_crc = 0;
672                 if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
673                         ioq->error = HAMMER2_IOQ_ERROR_XCRC;
674                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
675                                 xcrc32, hammer2_icrc32(head, ioq->hbytes),
676                                 hammer2_msg_str(msg));
677                         assert(0);
678                         break;
679                 }
680                 head->hdr_crc = xcrc32;
681
682                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
683                         hammer2_bswap_head(head);
684                 }
685
686                 /*
687                  * Copy the extended header into the msg and adjust the
688                  * FIFO.
689                  */
690                 bcopy(head, &msg->any, ioq->hbytes);
691
692                 /*
693                  * We are either done or we fall-through.
694                  */
695                 if (ioq->abytes == 0) {
696                         ioq->fifo_beg += ioq->hbytes;
697                         break;
698                 }
699
700                 /*
701                  * Must adjust nmax and bytes (and the state) when falling
702                  * through.
703                  */
704                 ioq->fifo_beg += ioq->hbytes;
705                 nmax -= ioq->hbytes;
706                 bytes -= ioq->hbytes;
707                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
708                 /* fall through */
709         case HAMMER2_MSGQ_STATE_AUXDATA1:
710                 /*
711                  * Copy the partial or complete payload from remaining
712                  * bytes in the FIFO.  We have to fall-through either
713                  * way so we can check the crc.
714                  *
715                  * Adjust msg->aux_size to the final actual value.
716                  */
717                 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
718                 if (ioq->already > ioq->abytes)
719                         ioq->already = ioq->abytes;
720                 if (bytes >= ioq->abytes) {
721                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
722                               ioq->abytes);
723                         msg->aux_size = ioq->abytes;
724                         ioq->fifo_beg += ioq->abytes;
725                         if (ioq->fifo_cdx < ioq->fifo_beg)
726                                 ioq->fifo_cdx = ioq->fifo_beg;
727                         bytes -= ioq->abytes;
728                 } else if (bytes) {
729                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
730                               bytes);
731                         msg->aux_size = bytes;
732                         ioq->fifo_beg += bytes;
733                         if (ioq->fifo_cdx < ioq->fifo_beg)
734                                 ioq->fifo_cdx = ioq->fifo_beg;
735                         bytes = 0;
736                 } else {
737                         msg->aux_size = 0;
738                 }
739                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
740                 /* fall through */
741         case HAMMER2_MSGQ_STATE_AUXDATA2:
742                 /*
743                  * Read the remainder of the payload directly into the
744                  * msg->aux_data buffer.
745                  */
746                 assert(msg);
747                 if (msg->aux_size < ioq->abytes) {
748                         assert(bytes == 0);
749                         n = read(iocom->sock_fd,
750                                  msg->aux_data + msg->aux_size,
751                                  ioq->abytes - msg->aux_size);
752                         if (n <= 0) {
753                                 if (n == 0) {
754                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
755                                         break;
756                                 }
757                                 if (errno != EINTR &&
758                                     errno != EINPROGRESS &&
759                                     errno != EAGAIN) {
760                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
761                                         break;
762                                 }
763                                 n = 0;
764                                 /* fall through */
765                         }
766                         msg->aux_size += n;
767                 }
768
769                 /*
770                  * Insufficient data accumulated (set msg NULL so caller will
771                  * retry on event).
772                  */
773                 if (msg->aux_size < ioq->abytes) {
774                         msg = NULL;
775                         break;
776                 }
777                 assert(msg->aux_size == ioq->abytes);
778                 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
779
780                 /*
781                  * Check aux_crc, then we are done.
782                  */
783                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
784                 if (xcrc32 != msg->any.head.aux_crc) {
785                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
786                         break;
787                 }
788                 break;
789         case HAMMER2_MSGQ_STATE_ERROR:
790                 /*
791                  * Continued calls to drain recorded transactions (returning
792                  * a LNK_ERROR for each one), before we return the final
793                  * LNK_ERROR.
794                  */
795                 assert(msg == NULL);
796                 break;
797         default:
798                 /*
799                  * We don't double-return errors, the caller should not
800                  * have called us again after getting an error msg.
801                  */
802                 assert(0);
803                 break;
804         }
805
806         /*
807          * Check the message sequence.  The iv[] should prevent any
808          * possibility of a replay but we add this check anyway.
809          */
810         if (msg && ioq->error == 0) {
811                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
812                         ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
813                 } else {
814                         ++ioq->seq;
815                 }
816         }
817
818         /*
819          * Process transactional state for the message.
820          */
821         if (msg && ioq->error == 0) {
822                 error = hammer2_state_msgrx(iocom, msg);
823                 if (error) {
824                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
825                                 hammer2_msg_free(msg);
826                                 goto again;
827                         }
828                         ioq->error = error;
829                 }
830         }
831
832         /*
833          * Handle error, RREQ, or completion
834          *
835          * NOTE: nmax and bytes are invalid at this point, we don't bother
836          *       to update them when breaking out.
837          */
838         if (ioq->error) {
839 skip:
840                 /*
841                  * An unrecoverable error causes all active receive
842                  * transactions to be terminated with a LNK_ERROR message.
843                  *
844                  * Once all active transactions are exhausted we set the
845                  * iocom ERROR flag and return a non-transactional LNK_ERROR
846                  * message, which should cause master processing loops to
847                  * terminate.
848                  */
849                 assert(ioq->msg == msg);
850                 if (msg) {
851                         hammer2_msg_free(msg);
852                         ioq->msg = NULL;
853                 }
854
855                 /*
856                  * No more I/O read processing
857                  */
858                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
859
860                 /*
861                  * Simulate a remote LNK_ERROR DELETE msg for any open
862                  * transactions, ending with a final non-transactional
863                  * LNK_ERROR (that the session can detect) when no
864                  * transactions remain.
865                  */
866                 msg = hammer2_msg_alloc(&iocom->router, 0, 0, NULL, NULL);
867                 bzero(&msg->any.head, sizeof(msg->any.head));
868                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
869                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
870                 msg->any.head.error = ioq->error;
871
872                 pthread_mutex_lock(&iocom->mtx);
873                 hammer2_iocom_drain(iocom);
874                 if ((state = RB_ROOT(&iocom->router.staterd_tree)) != NULL) {
875                         /*
876                          * Active remote transactions are still present.
877                          * Simulate the other end sending us a DELETE.
878                          */
879                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
880                                 hammer2_msg_free(msg);
881                                 msg = NULL;
882                         } else {
883                                 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
884                                 msg->state = state;
885                                 msg->any.head.spanid = state->spanid;
886                                 msg->any.head.msgid = state->msgid;
887                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
888                                                      HAMMER2_MSGF_DELETE;
889                         }
890                 } else if ((state = RB_ROOT(&iocom->router.statewr_tree)) != NULL) {
891                         /*
892                          * Active local transactions are still present.
893                          * Simulate the other end sending us a DELETE.
894                          */
895                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
896                                 hammer2_msg_free(msg);
897                                 msg = NULL;
898                         } else {
899                                 msg->state = state;
900                                 msg->any.head.spanid = state->spanid;
901                                 msg->any.head.msgid = state->msgid;
902                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
903                                                      HAMMER2_MSGF_DELETE |
904                                                      HAMMER2_MSGF_REPLY;
905                                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
906                                         msg->any.head.cmd |=
907                                                      HAMMER2_MSGF_CREATE;
908                                 }
909                         }
910                 } else {
911                         /*
912                          * No active local or remote transactions remain.
913                          * Generate a final LNK_ERROR and flag EOF.
914                          */
915                         msg->state = NULL;
916                         iocom->flags |= HAMMER2_IOCOMF_EOF;
917                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
918                 }
919                 pthread_mutex_unlock(&iocom->mtx);
920
921                 /*
922                  * For the iocom error case we want to set RWORK to indicate
923                  * that more messages might be pending.
924                  *
925                  * It is possible to return NULL when there is more work to
926                  * do because each message has to be DELETEd in both
927                  * directions before we continue on with the next (though
928                  * this could be optimized).  The transmit direction will
929                  * re-set RWORK.
930                  */
931                 if (msg)
932                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
933         } else if (msg == NULL) {
934                 /*
935                  * Insufficient data received to finish building the message,
936                  * set RREQ and return NULL.
937                  *
938                  * Leave ioq->msg intact.
939                  * Leave the FIFO intact.
940                  */
941                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
942         } else {
943                 /*
944                  * Return msg.
945                  *
946                  * The fifo has already been advanced past the message.
947                  * Trivially reset the FIFO indices if possible.
948                  *
949                  * clear the FIFO if it is now empty and set RREQ to wait
950                  * for more from the socket.  If the FIFO is not empty set
951                  * TWORK to bypass the poll so we loop immediately.
952                  */
953                 if (ioq->fifo_beg == ioq->fifo_end) {
954                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
955                         ioq->fifo_cdx = 0;
956                         ioq->fifo_beg = 0;
957                         ioq->fifo_end = 0;
958                 } else {
959                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
960                 }
961                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
962                 ioq->msg = NULL;
963         }
964         return (msg);
965 }
966
967 /*
968  * Calculate the header and data crc's and write a low-level message to
969  * the connection.  If aux_crc is non-zero the aux_data crc is already
970  * assumed to have been set.
971  *
972  * A non-NULL msg is added to the queue but not necessarily flushed.
973  * Calling this function with msg == NULL will get a flush going.
974  *
975  * Caller must hold iocom->mtx.
976  */
977 void
978 hammer2_iocom_flush1(hammer2_iocom_t *iocom)
979 {
980         hammer2_ioq_t *ioq = &iocom->ioq_tx;
981         hammer2_msg_t *msg;
982         uint32_t xcrc32;
983         int hbytes;
984         hammer2_msg_queue_t tmpq;
985
986         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
987         TAILQ_INIT(&tmpq);
988         pthread_mutex_lock(&iocom->mtx);
989         while ((msg = TAILQ_FIRST(&iocom->router.txmsgq)) != NULL) {
990                 TAILQ_REMOVE(&iocom->router.txmsgq, msg, qentry);
991                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
992         }
993         pthread_mutex_unlock(&iocom->mtx);
994
995         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
996                 /*
997                  * Process terminal connection errors.
998                  */
999                 TAILQ_REMOVE(&tmpq, msg, qentry);
1000                 if (ioq->error) {
1001                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1002                         ++ioq->msgcount;
1003                         continue;
1004                 }
1005
1006                 /*
1007                  * Finish populating the msg fields.  The salt ensures that
1008                  * the iv[] array is ridiculously randomized and we also
1009                  * re-seed our PRNG every 32768 messages just to be sure.
1010                  */
1011                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
1012                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1013                 ++ioq->seq;
1014                 if ((ioq->seq & 32767) == 0)
1015                         srandomdev();
1016
1017                 /*
1018                  * Calculate aux_crc if 0, then calculate hdr_crc.
1019                  */
1020                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1021                         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
1022                         xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
1023                         msg->any.head.aux_crc = xcrc32;
1024                 }
1025                 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
1026                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
1027
1028                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1029                          HAMMER2_MSG_ALIGN;
1030                 msg->any.head.hdr_crc = 0;
1031                 msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
1032
1033                 /*
1034                  * Enqueue the message (the flush codes handles stream
1035                  * encryption).
1036                  */
1037                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1038                 ++ioq->msgcount;
1039         }
1040         hammer2_iocom_flush2(iocom);
1041 }
1042
1043 /*
1044  * Thread localized, iocom->mtx not held by caller.
1045  */
1046 void
1047 hammer2_iocom_flush2(hammer2_iocom_t *iocom)
1048 {
1049         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1050         hammer2_msg_t *msg;
1051         ssize_t nmax;
1052         ssize_t omax;
1053         ssize_t nact;
1054         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
1055         size_t hbytes;
1056         size_t abytes;
1057         size_t hoff;
1058         size_t aoff;
1059         int n;
1060
1061         if (ioq->error) {
1062                 hammer2_iocom_drain(iocom);
1063                 return;
1064         }
1065
1066         /*
1067          * Pump messages out the connection by building an iovec.
1068          *
1069          * ioq->hbytes/ioq->abytes tracks how much of the first message
1070          * in the queue has been successfully written out, so we can
1071          * resume writing.
1072          */
1073         n = 0;
1074         nmax = 0;
1075         hoff = ioq->hbytes;
1076         aoff = ioq->abytes;
1077
1078         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1079                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1080                          HAMMER2_MSG_ALIGN;
1081                 abytes = msg->aux_size;
1082                 assert(hoff <= hbytes && aoff <= abytes);
1083
1084                 if (hoff < hbytes) {
1085                         iov[n].iov_base = (char *)&msg->any.head + hoff;
1086                         iov[n].iov_len = hbytes - hoff;
1087                         nmax += hbytes - hoff;
1088                         ++n;
1089                         if (n == HAMMER2_IOQ_MAXIOVEC)
1090                                 break;
1091                 }
1092                 if (aoff < abytes) {
1093                         assert(msg->aux_data != NULL);
1094                         iov[n].iov_base = (char *)msg->aux_data + aoff;
1095                         iov[n].iov_len = abytes - aoff;
1096                         nmax += abytes - aoff;
1097                         ++n;
1098                         if (n == HAMMER2_IOQ_MAXIOVEC)
1099                                 break;
1100                 }
1101                 hoff = 0;
1102                 aoff = 0;
1103         }
1104         if (n == 0)
1105                 return;
1106
1107         /*
1108          * Encrypt and write the data.  The crypto code will move the
1109          * data into the fifo and adjust the iov as necessary.  If
1110          * encryption is disabled the iov is left alone.
1111          *
1112          * May return a smaller iov (thus a smaller n), with aggregated
1113          * chunks.  May reduce nmax to what fits in the FIFO.
1114          */
1115         omax = nmax;
1116         n = hammer2_crypto_encrypt(iocom, ioq, iov, n, &nmax);
1117
1118         /*
1119          * Execute the writev() then figure out what happened.
1120          */
1121         nact = writev(iocom->sock_fd, iov, n);
1122         if (nact < 0) {
1123                 if (errno != EINTR &&
1124                     errno != EINPROGRESS &&
1125                     errno != EAGAIN) {
1126                         /*
1127                          * Fatal write error
1128                          */
1129                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
1130                         hammer2_iocom_drain(iocom);
1131                 } else {
1132                         /*
1133                          * Wait for socket buffer space
1134                          */
1135                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
1136                 }
1137                 return;
1138         }
1139
1140         /*
1141          * Indicate bytes written successfully.
1142          *
1143          * If we were unable to write the entire iov array then set WREQ
1144          * to wait for more socket buffer space.
1145          *
1146          * If the FIFO space was insufficient to fully drain all messages
1147          * set WWORK to cause the core to call us again for the next batch.
1148          */
1149         hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
1150         if (nact != nmax)
1151                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1152
1153         /*
1154          * Clean out the transmit queue based on what we successfully
1155          * sent.  ioq->hbytes/abytes represents the portion of the first
1156          * message previously sent.
1157          */
1158         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1159                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1160                          HAMMER2_MSG_ALIGN;
1161                 abytes = msg->aux_size;
1162
1163                 if ((size_t)nact < hbytes - ioq->hbytes) {
1164                         ioq->hbytes += nact;
1165                         /* nact = 0; */
1166                         break;
1167                 }
1168                 nact -= hbytes - ioq->hbytes;
1169                 ioq->hbytes = hbytes;
1170                 if ((size_t)nact < abytes - ioq->abytes) {
1171                         ioq->abytes += nact;
1172                         /* nact = 0; */
1173                         break;
1174                 }
1175                 nact -= abytes - ioq->abytes;
1176
1177                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1178                 --ioq->msgcount;
1179                 ioq->hbytes = 0;
1180                 ioq->abytes = 0;
1181
1182                 hammer2_state_cleanuptx(iocom, msg);
1183         }
1184
1185         /*
1186          * If more messages are pending on WREQ wasn't set we must
1187          * ensure that WWORK gets set.
1188          */
1189         if (msg && (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0)
1190                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
1191         assert(nact == 0);
1192         if (ioq->error) {
1193                 hammer2_iocom_drain(iocom);
1194         }
1195 }
1196
1197 /*
1198  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1199  * write events will occur.  We don't kill read msgs because we want
1200  * the caller to pull off our contrived terminal error msg to detect
1201  * the connection failure.
1202  *
1203  * Thread localized, iocom->mtx not held by caller.
1204  */
1205 void
1206 hammer2_iocom_drain(hammer2_iocom_t *iocom)
1207 {
1208         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1209         hammer2_msg_t *msg;
1210
1211         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
1212         ioq->hbytes = 0;
1213         ioq->abytes = 0;
1214
1215         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1216                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1217                 --ioq->msgcount;
1218                 hammer2_state_cleanuptx(iocom, msg);
1219         }
1220 }
1221
1222 /*
1223  * Write a message to an iocom, with additional state processing.
1224  */
1225 void
1226 hammer2_msg_write(hammer2_msg_t *msg)
1227 {
1228         hammer2_iocom_t *iocom = msg->router->iocom;
1229         hammer2_state_t *state;
1230         char dummy;
1231
1232         /*
1233          * Handle state processing, create state if necessary.
1234          */
1235         pthread_mutex_lock(&iocom->mtx);
1236         if ((state = msg->state) != NULL) {
1237                 /*
1238                  * Existing transaction (could be reply).  It is also
1239                  * possible for this to be the first reply (CREATE is set),
1240                  * in which case we populate state->txcmd.
1241                  *
1242                  * state->txcmd is adjusted to hold the final message cmd,
1243                  * and we also be sure to set the CREATE bit here.  We did
1244                  * not set it in hammer2_msg_alloc() because that would have
1245                  * not been serialized (state could have gotten ripped out
1246                  * from under the message prior to it being transmitted).
1247                  */
1248                 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE |
1249                                           HAMMER2_MSGF_REPLY)) ==
1250                     HAMMER2_MSGF_CREATE) {
1251                         state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1252                 }
1253                 msg->any.head.msgid = state->msgid;
1254                 msg->any.head.spanid = state->spanid;
1255                 assert(((state->txcmd ^ msg->any.head.cmd) &
1256                         HAMMER2_MSGF_REPLY) == 0);
1257                 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1258                         state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1259         } else {
1260                 msg->any.head.msgid = 0;
1261                 /* XXX set spanid by router */
1262         }
1263
1264         /*
1265          * Queue it for output, wake up the I/O pthread.  Note that the
1266          * I/O thread is responsible for generating the CRCs and encryption.
1267          */
1268         TAILQ_INSERT_TAIL(&iocom->router.txmsgq, msg, qentry);
1269         dummy = 0;
1270         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1271         pthread_mutex_unlock(&iocom->mtx);
1272 }
1273
1274 /*
1275  * This is a shortcut to formulate a reply to msg with a simple error code,
1276  * It can reply to and terminate a transaction, or it can reply to a one-way
1277  * messages.  A HAMMER2_LNK_ERROR command code is utilized to encode
1278  * the error code (which can be 0).  Not all transactions are terminated
1279  * with HAMMER2_LNK_ERROR status (the low level only cares about the
1280  * MSGF_DELETE flag), but most are.
1281  *
1282  * Replies to one-way messages are a bit of an oxymoron but the feature
1283  * is used by the debug (DBG) protocol.
1284  *
1285  * The reply contains no extended data.
1286  */
1287 void
1288 hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
1289 {
1290         hammer2_iocom_t *iocom = msg->router->iocom;
1291         hammer2_state_t *state = msg->state;
1292         hammer2_msg_t *nmsg;
1293         uint32_t cmd;
1294
1295
1296         /*
1297          * Reply with a simple error code and terminate the transaction.
1298          */
1299         cmd = HAMMER2_LNK_ERROR;
1300
1301         /*
1302          * Check if our direction has even been initiated yet, set CREATE.
1303          *
1304          * Check what direction this is (command or reply direction).  Note
1305          * that txcmd might not have been initiated yet.
1306          *
1307          * If our direction has already been closed we just return without
1308          * doing anything.
1309          */
1310         if (state) {
1311                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1312                         return;
1313                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1314                         cmd |= HAMMER2_MSGF_REPLY;
1315                 cmd |= HAMMER2_MSGF_DELETE;
1316         } else {
1317                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1318                         cmd |= HAMMER2_MSGF_REPLY;
1319         }
1320
1321         /*
1322          * Allocate the message and associate it with the existing state.
1323          * We cannot pass MSGF_CREATE to msg_alloc() because that may
1324          * allocate new state.  We have our state already.
1325          */
1326         nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
1327         if (state) {
1328                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1329                         nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
1330         }
1331         nmsg->any.head.error = error;
1332         nmsg->state = state;
1333         hammer2_msg_write(nmsg);
1334 }
1335
1336 /*
1337  * Similar to hammer2_msg_reply() but leave the transaction open.  That is,
1338  * we are generating a streaming reply or an intermediate acknowledgement
1339  * of some sort as part of the higher level protocol, with more to come
1340  * later.
1341  */
1342 void
1343 hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
1344 {
1345         hammer2_iocom_t *iocom = msg->router->iocom;
1346         hammer2_state_t *state = msg->state;
1347         hammer2_msg_t *nmsg;
1348         uint32_t cmd;
1349
1350
1351         /*
1352          * Reply with a simple error code and terminate the transaction.
1353          */
1354         cmd = HAMMER2_LNK_ERROR;
1355
1356         /*
1357          * Check if our direction has even been initiated yet, set CREATE.
1358          *
1359          * Check what direction this is (command or reply direction).  Note
1360          * that txcmd might not have been initiated yet.
1361          *
1362          * If our direction has already been closed we just return without
1363          * doing anything.
1364          */
1365         if (state) {
1366                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1367                         return;
1368                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1369                         cmd |= HAMMER2_MSGF_REPLY;
1370                 /* continuing transaction, do not set MSGF_DELETE */
1371         } else {
1372                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1373                         cmd |= HAMMER2_MSGF_REPLY;
1374         }
1375
1376         nmsg = hammer2_msg_alloc(&iocom->router, 0, cmd, NULL, NULL);
1377         if (state) {
1378                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1379                         nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
1380         }
1381         nmsg->any.head.error = error;
1382         nmsg->state = state;
1383         hammer2_msg_write(nmsg);
1384 }
1385
1386 /*
1387  * Terminate a transaction given a state structure by issuing a DELETE.
1388  */
1389 void
1390 hammer2_state_reply(hammer2_state_t *state, uint32_t error)
1391 {
1392         hammer2_msg_t *nmsg;
1393         uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
1394
1395         /*
1396          * Nothing to do if we already transmitted a delete
1397          */
1398         if (state->txcmd & HAMMER2_MSGF_DELETE)
1399                 return;
1400
1401         /*
1402          * Set REPLY if the other end initiated the command.  Otherwise
1403          * we are the command direction.
1404          */
1405         if (state->txcmd & HAMMER2_MSGF_REPLY)
1406                 cmd |= HAMMER2_MSGF_REPLY;
1407
1408         nmsg = hammer2_msg_alloc(&state->iocom->router, 0, cmd, NULL, NULL);
1409         if (state) {
1410                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1411                         nmsg->any.head.cmd |= HAMMER2_MSGF_CREATE;
1412         }
1413         nmsg->any.head.error = error;
1414         nmsg->state = state;
1415         hammer2_msg_write(nmsg);
1416 }
1417
1418 /************************************************************************
1419  *                      TRANSACTION STATE HANDLING                      *
1420  ************************************************************************
1421  *
1422  */
1423
1424 /*
1425  * Process state tracking for a message after reception, prior to
1426  * execution.
1427  *
1428  * Called with msglk held and the msg dequeued.
1429  *
1430  * All messages are called with dummy state and return actual state.
1431  * (One-off messages often just return the same dummy state).
1432  *
1433  * May request that caller discard the message by setting *discardp to 1.
1434  * The returned state is not used in this case and is allowed to be NULL.
1435  *
1436  * --
1437  *
1438  * These routines handle persistent and command/reply message state via the
1439  * CREATE and DELETE flags.  The first message in a command or reply sequence
1440  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1441  *
1442  * There can be any number of intermediate messages belonging to the same
1443  * sequence sent inbetween the CREATE message and the DELETE message,
1444  * which set neither flag.  This represents a streaming command or reply.
1445  *
1446  * Any command message received with CREATE set expects a reply sequence to
1447  * be returned.  Reply sequences work the same as command sequences except the
1448  * REPLY bit is also sent.  Both the command side and reply side can
1449  * degenerate into a single message with both CREATE and DELETE set.  Note
1450  * that one side can be streaming and the other side not, or neither, or both.
1451  *
1452  * The msgid is unique for the initiator.  That is, two sides sending a new
1453  * message can use the same msgid without colliding.
1454  *
1455  * --
1456  *
1457  * ABORT sequences work by setting the ABORT flag along with normal message
1458  * state.  However, ABORTs can also be sent on half-closed messages, that is
1459  * even if the command or reply side has already sent a DELETE, as long as
1460  * the message has not been fully closed it can still send an ABORT+DELETE
1461  * to terminate the half-closed message state.
1462  *
1463  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1464  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1465  * also race, and in this situation the other side might have already
1466  * initiated a new unrelated command with the same message id.  Since
1467  * the abort has not set the CREATE flag the situation can be detected
1468  * and the message will also be discarded.
1469  *
1470  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1471  * The ABORT request is essentially integrated into the command instead
1472  * of being sent later on.  In this situation the command implementation
1473  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1474  * special-case non-blocking operation for the command.
1475  *
1476  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1477  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1478  *        one-way messages are not supported.
1479  *
1480  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1481  *        simply ignored.
1482  *
1483  * --
1484  *
1485  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1486  * set.  One-off messages cannot be aborted and typically aren't processed
1487  * by these routines.  The REPLY bit can be used to distinguish whether a
1488  * one-off message is a command or reply.  For example, one-off replies
1489  * will typically just contain status updates.
1490  */
1491 static int
1492 hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1493 {
1494         hammer2_state_t *state;
1495         hammer2_state_t dummy;
1496         int error;
1497
1498         /*
1499          * Lock RB tree and locate existing persistent state, if any.
1500          *
1501          * If received msg is a command state is on staterd_tree.
1502          * If received msg is a reply state is on statewr_tree.
1503          */
1504
1505         dummy.msgid = msg->any.head.msgid;
1506         dummy.spanid = msg->any.head.spanid;
1507         pthread_mutex_lock(&iocom->mtx);
1508         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1509                 state = RB_FIND(hammer2_state_tree,
1510                                 &iocom->router.statewr_tree, &dummy);
1511         } else {
1512                 state = RB_FIND(hammer2_state_tree,
1513                                 &iocom->router.staterd_tree, &dummy);
1514         }
1515         msg->state = state;
1516         pthread_mutex_unlock(&iocom->mtx);
1517
1518         /*
1519          * Short-cut one-off or mid-stream messages (state may be NULL).
1520          */
1521         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1522                                   HAMMER2_MSGF_ABORT)) == 0) {
1523                 return(0);
1524         }
1525
1526         /*
1527          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1528          * inside the case statements.
1529          */
1530         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1531                                     HAMMER2_MSGF_REPLY)) {
1532         case HAMMER2_MSGF_CREATE:
1533         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1534                 /*
1535                  * New persistant command received.
1536                  */
1537                 if (state) {
1538                         fprintf(stderr, "duplicate-trans %s\n",
1539                                 hammer2_msg_str(msg));
1540                         error = HAMMER2_IOQ_ERROR_TRANS;
1541                         assert(0);
1542                         break;
1543                 }
1544                 state = malloc(sizeof(*state));
1545                 bzero(state, sizeof(*state));
1546                 state->iocom = iocom;
1547                 state->flags = HAMMER2_STATE_DYNAMIC;
1548                 state->msg = msg;
1549                 state->txcmd = HAMMER2_MSGF_REPLY;
1550                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1551                 state->flags |= HAMMER2_STATE_INSERTED;
1552                 state->msgid = msg->any.head.msgid;
1553                 state->spanid = msg->any.head.spanid;
1554                 msg->state = state;
1555                 pthread_mutex_lock(&iocom->mtx);
1556                 RB_INSERT(hammer2_state_tree,
1557                           &iocom->router.staterd_tree, state);
1558                 pthread_mutex_unlock(&iocom->mtx);
1559                 error = 0;
1560                 if (DebugOpt) {
1561                         fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1562                                 state, (uint32_t)state->msgid, iocom);
1563                 }
1564                 break;
1565         case HAMMER2_MSGF_DELETE:
1566                 /*
1567                  * Persistent state is expected but might not exist if an
1568                  * ABORT+DELETE races the close.
1569                  */
1570                 if (state == NULL) {
1571                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1572                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1573                         } else {
1574                                 fprintf(stderr, "missing-state %s\n",
1575                                         hammer2_msg_str(msg));
1576                                 error = HAMMER2_IOQ_ERROR_TRANS;
1577                         assert(0);
1578                         }
1579                         break;
1580                 }
1581
1582                 /*
1583                  * Handle another ABORT+DELETE case if the msgid has already
1584                  * been reused.
1585                  */
1586                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1587                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1588                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1589                         } else {
1590                                 fprintf(stderr, "reused-state %s\n",
1591                                         hammer2_msg_str(msg));
1592                                 error = HAMMER2_IOQ_ERROR_TRANS;
1593                         assert(0);
1594                         }
1595                         break;
1596                 }
1597                 error = 0;
1598                 break;
1599         default:
1600                 /*
1601                  * Check for mid-stream ABORT command received, otherwise
1602                  * allow.
1603                  */
1604                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1605                         if (state == NULL ||
1606                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1607                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1608                                 break;
1609                         }
1610                 }
1611                 error = 0;
1612                 break;
1613         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1614         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1615                 /*
1616                  * When receiving a reply with CREATE set the original
1617                  * persistent state message should already exist.
1618                  */
1619                 if (state == NULL) {
1620                         fprintf(stderr, "no-state(r) %s\n",
1621                                 hammer2_msg_str(msg));
1622                         error = HAMMER2_IOQ_ERROR_TRANS;
1623                         assert(0);
1624                         break;
1625                 }
1626                 assert(((state->rxcmd ^ msg->any.head.cmd) &
1627                         HAMMER2_MSGF_REPLY) == 0);
1628                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1629                 error = 0;
1630                 break;
1631         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1632                 /*
1633                  * Received REPLY+ABORT+DELETE in case where msgid has
1634                  * already been fully closed, ignore the message.
1635                  */
1636                 if (state == NULL) {
1637                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1638                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1639                         } else {
1640                                 fprintf(stderr, "no-state(r,d) %s\n",
1641                                         hammer2_msg_str(msg));
1642                                 error = HAMMER2_IOQ_ERROR_TRANS;
1643                         assert(0);
1644                         }
1645                         break;
1646                 }
1647
1648                 /*
1649                  * Received REPLY+ABORT+DELETE in case where msgid has
1650                  * already been reused for an unrelated message,
1651                  * ignore the message.
1652                  */
1653                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1654                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1655                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1656                         } else {
1657                                 fprintf(stderr, "reused-state(r,d) %s\n",
1658                                         hammer2_msg_str(msg));
1659                                 error = HAMMER2_IOQ_ERROR_TRANS;
1660                         assert(0);
1661                         }
1662                         break;
1663                 }
1664                 error = 0;
1665                 break;
1666         case HAMMER2_MSGF_REPLY:
1667                 /*
1668                  * Check for mid-stream ABORT reply received to sent command.
1669                  */
1670                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1671                         if (state == NULL ||
1672                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1673                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1674                                 break;
1675                         }
1676                 }
1677                 error = 0;
1678                 break;
1679         }
1680         return (error);
1681 }
1682
1683 void
1684 hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1685 {
1686         hammer2_state_t *state;
1687
1688         if ((state = msg->state) == NULL) {
1689                 /*
1690                  * Free a non-transactional message, there is no state
1691                  * to worry about.
1692                  */
1693                 hammer2_msg_free(msg);
1694         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1695                 /*
1696                  * Message terminating transaction, destroy the related
1697                  * state, the original message, and this message (if it
1698                  * isn't the original message due to a CREATE|DELETE).
1699                  */
1700                 pthread_mutex_lock(&iocom->mtx);
1701                 state->rxcmd |= HAMMER2_MSGF_DELETE;
1702                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
1703                         if (state->msg == msg)
1704                                 state->msg = NULL;
1705                         assert(state->flags & HAMMER2_STATE_INSERTED);
1706                         if (state->rxcmd & HAMMER2_MSGF_REPLY) {
1707                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1708                                 RB_REMOVE(hammer2_state_tree,
1709                                           &iocom->router.statewr_tree, state);
1710                         } else {
1711                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1712                                 RB_REMOVE(hammer2_state_tree,
1713                                           &iocom->router.staterd_tree, state);
1714                         }
1715                         state->flags &= ~HAMMER2_STATE_INSERTED;
1716                         hammer2_state_free(state);
1717                 } else {
1718                         ;
1719                 }
1720                 pthread_mutex_unlock(&iocom->mtx);
1721                 hammer2_msg_free(msg);
1722         } else if (state->msg != msg) {
1723                 /*
1724                  * Message not terminating transaction, leave state intact
1725                  * and free message if it isn't the CREATE message.
1726                  */
1727                 hammer2_msg_free(msg);
1728         }
1729 }
1730
1731 static void
1732 hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1733 {
1734         hammer2_state_t *state;
1735
1736         if ((state = msg->state) == NULL) {
1737                 hammer2_msg_free(msg);
1738         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1739                 pthread_mutex_lock(&iocom->mtx);
1740                 state->txcmd |= HAMMER2_MSGF_DELETE;
1741                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
1742                         if (state->msg == msg)
1743                                 state->msg = NULL;
1744                         assert(state->flags & HAMMER2_STATE_INSERTED);
1745                         if (state->txcmd & HAMMER2_MSGF_REPLY) {
1746                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1747                                 RB_REMOVE(hammer2_state_tree,
1748                                           &iocom->router.staterd_tree, state);
1749                         } else {
1750                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1751                                 RB_REMOVE(hammer2_state_tree,
1752                                           &iocom->router.statewr_tree, state);
1753                         }
1754                         state->flags &= ~HAMMER2_STATE_INSERTED;
1755                         hammer2_state_free(state);
1756                 } else {
1757                         ;
1758                 }
1759                 pthread_mutex_unlock(&iocom->mtx);
1760                 hammer2_msg_free(msg);
1761         } else if (state->msg != msg) {
1762                 hammer2_msg_free(msg);
1763         }
1764 }
1765
1766 /*
1767  * Called with iocom locked
1768  */
1769 void
1770 hammer2_state_free(hammer2_state_t *state)
1771 {
1772         hammer2_iocom_t *iocom = state->iocom;
1773         hammer2_msg_t *msg;
1774         char dummy;
1775
1776         if (DebugOpt) {
1777                 fprintf(stderr, "terminate state %p id=%08x\n",
1778                         state, (uint32_t)state->msgid);
1779         }
1780         assert(state->any.any == NULL);
1781         msg = state->msg;
1782         state->msg = NULL;
1783         if (msg)
1784                 hammer2_msg_free_locked(msg);
1785         free(state);
1786
1787         /*
1788          * When an iocom error is present we are trying to close down the
1789          * iocom, but we have to wait for all states to terminate before
1790          * we can do so.  The iocom rx code will terminate the receive side
1791          * for all transactions by simulating incoming DELETE messages,
1792          * but the state doesn't go away until both sides are terminated.
1793          *
1794          * We may have to wake up the rx code.
1795          */
1796         if (iocom->ioq_rx.error &&
1797             RB_EMPTY(&iocom->router.staterd_tree) &&
1798             RB_EMPTY(&iocom->router.statewr_tree)) {
1799                 dummy = 0;
1800                 write(iocom->wakeupfds[1], &dummy, 1);
1801         }
1802 }
1803
1804 const char *
1805 hammer2_basecmd_str(uint32_t cmd)
1806 {
1807         static char buf[64];
1808         char protobuf[32];
1809         char cmdbuf[32];
1810         const char *protostr;
1811         const char *cmdstr;
1812
1813         switch(cmd & HAMMER2_MSGF_PROTOS) {
1814         case HAMMER2_MSG_PROTO_LNK:
1815                 protostr = "LNK_";
1816                 break;
1817         case HAMMER2_MSG_PROTO_DBG:
1818                 protostr = "DBG_";
1819                 break;
1820         case HAMMER2_MSG_PROTO_DOM:
1821                 protostr = "DOM_";
1822                 break;
1823         case HAMMER2_MSG_PROTO_CAC:
1824                 protostr = "CAC_";
1825                 break;
1826         case HAMMER2_MSG_PROTO_QRM:
1827                 protostr = "QRM_";
1828                 break;
1829         case HAMMER2_MSG_PROTO_BLK:
1830                 protostr = "BLK_";
1831                 break;
1832         case HAMMER2_MSG_PROTO_VOP:
1833                 protostr = "VOP_";
1834                 break;
1835         default:
1836                 snprintf(protobuf, sizeof(protobuf), "%x_",
1837                         (cmd & HAMMER2_MSGF_PROTOS) >> 20);
1838                 protostr = protobuf;
1839                 break;
1840         }
1841
1842         switch(cmd & (HAMMER2_MSGF_PROTOS |
1843                       HAMMER2_MSGF_CMDS |
1844                       HAMMER2_MSGF_SIZE)) {
1845         case HAMMER2_LNK_PAD:
1846                 cmdstr = "PAD";
1847                 break;
1848         case HAMMER2_LNK_PING:
1849                 cmdstr = "PING";
1850                 break;
1851         case HAMMER2_LNK_AUTH:
1852                 cmdstr = "AUTH";
1853                 break;
1854         case HAMMER2_LNK_CONN:
1855                 cmdstr = "CONN";
1856                 break;
1857         case HAMMER2_LNK_SPAN:
1858                 cmdstr = "SPAN";
1859                 break;
1860         case HAMMER2_LNK_ERROR:
1861                 if (cmd & HAMMER2_MSGF_DELETE)
1862                         cmdstr = "RETURN";
1863                 else
1864                         cmdstr = "RESULT";
1865                 break;
1866         case HAMMER2_DBG_SHELL:
1867                 cmdstr = "SHELL";
1868                 break;
1869         default:
1870                 snprintf(cmdbuf, sizeof(cmdbuf),
1871                          "%06x", (cmd & (HAMMER2_MSGF_PROTOS |
1872                                          HAMMER2_MSGF_CMDS |
1873                                          HAMMER2_MSGF_SIZE)));
1874                 cmdstr = cmdbuf;
1875                 break;
1876         }
1877         snprintf(buf, sizeof(buf), "%s%s", protostr, cmdstr);
1878         return (buf);
1879 }
1880
1881 const char *
1882 hammer2_msg_str(hammer2_msg_t *msg)
1883 {
1884         hammer2_state_t *state;
1885         static char buf[256];
1886         char errbuf[16];
1887         char statebuf[64];
1888         char flagbuf[64];
1889         const char *statestr;
1890         const char *errstr;
1891         uint32_t basecmd;
1892         int i;
1893
1894         /*
1895          * Parse the state
1896          */
1897         if ((state = msg->state) != NULL) {
1898                 basecmd = (state->rxcmd & HAMMER2_MSGF_REPLY) ?
1899                           state->txcmd : state->rxcmd;
1900                 snprintf(statebuf, sizeof(statebuf),
1901                          " %s=%s,L=%s%s,R=%s%s",
1902                          ((state->txcmd & HAMMER2_MSGF_REPLY) ?
1903                                 "rcvcmd" : "sndcmd"),
1904                          hammer2_basecmd_str(basecmd),
1905                          ((state->txcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
1906                          ((state->txcmd & HAMMER2_MSGF_DELETE) ? "D" : ""),
1907                          ((state->rxcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
1908                          ((state->rxcmd & HAMMER2_MSGF_DELETE) ? "D" : "")
1909                 );
1910                 statestr = statebuf;
1911         } else {
1912                 statestr = "";
1913         }
1914
1915         /*
1916          * Parse the error
1917          */
1918         switch(msg->any.head.error) {
1919         case 0:
1920                 errstr = "";
1921                 break;
1922         case HAMMER2_IOQ_ERROR_SYNC:
1923                 errstr = "err=IOQ:NOSYNC";
1924                 break;
1925         case HAMMER2_IOQ_ERROR_EOF:
1926                 errstr = "err=IOQ:STREAMEOF";
1927                 break;
1928         case HAMMER2_IOQ_ERROR_SOCK:
1929                 errstr = "err=IOQ:SOCKERR";
1930                 break;
1931         case HAMMER2_IOQ_ERROR_FIELD:
1932                 errstr = "err=IOQ:BADFIELD";
1933                 break;
1934         case HAMMER2_IOQ_ERROR_HCRC:
1935                 errstr = "err=IOQ:BADHCRC";
1936                 break;
1937         case HAMMER2_IOQ_ERROR_XCRC:
1938                 errstr = "err=IOQ:BADXCRC";
1939                 break;
1940         case HAMMER2_IOQ_ERROR_ACRC:
1941                 errstr = "err=IOQ:BADACRC";
1942                 break;
1943         case HAMMER2_IOQ_ERROR_STATE:
1944                 errstr = "err=IOQ:BADSTATE";
1945                 break;
1946         case HAMMER2_IOQ_ERROR_NOPEER:
1947                 errstr = "err=IOQ:PEERCONFIG";
1948                 break;
1949         case HAMMER2_IOQ_ERROR_NORKEY:
1950                 errstr = "err=IOQ:BADRKEY";
1951                 break;
1952         case HAMMER2_IOQ_ERROR_NOLKEY:
1953                 errstr = "err=IOQ:BADLKEY";
1954                 break;
1955         case HAMMER2_IOQ_ERROR_KEYXCHGFAIL:
1956                 errstr = "err=IOQ:BADKEYXCHG";
1957                 break;
1958         case HAMMER2_IOQ_ERROR_KEYFMT:
1959                 errstr = "err=IOQ:BADFMT";
1960                 break;
1961         case HAMMER2_IOQ_ERROR_BADURANDOM:
1962                 errstr = "err=IOQ:BADRANDOM";
1963                 break;
1964         case HAMMER2_IOQ_ERROR_MSGSEQ:
1965                 errstr = "err=IOQ:BADSEQ";
1966                 break;
1967         case HAMMER2_IOQ_ERROR_EALREADY:
1968                 errstr = "err=IOQ:DUPMSG";
1969                 break;
1970         case HAMMER2_IOQ_ERROR_TRANS:
1971                 errstr = "err=IOQ:BADTRANS";
1972                 break;
1973         case HAMMER2_MSG_ERR_NOSUPP:
1974                 errstr = "err=NOSUPPORT";
1975                 break;
1976         default:
1977                 snprintf(errbuf, sizeof(errbuf),
1978                          " err=%d", msg->any.head.error);
1979                 errstr = errbuf;
1980                 break;
1981         }
1982
1983         /*
1984          * Message flags
1985          */
1986         i = 0;
1987         if (msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1988                                  HAMMER2_MSGF_ABORT | HAMMER2_MSGF_REPLY)) {
1989                 flagbuf[i++] = '|';
1990                 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1991                         flagbuf[i++] = 'C';
1992                 if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
1993                         flagbuf[i++] = 'D';
1994                 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
1995                         flagbuf[i++] = 'R';
1996                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT)
1997                         flagbuf[i++] = 'A';
1998         }
1999         flagbuf[i] = 0;
2000
2001         /*
2002          * Generate the buf
2003          */
2004         snprintf(buf, sizeof(buf),
2005                 "msg=%s%s %s id=%08x span=%08x %s",
2006                  hammer2_basecmd_str(msg->any.head.cmd),
2007                  flagbuf,
2008                  errstr,
2009                  (uint32_t)(intmax_t)msg->any.head.msgid,   /* for brevity */
2010                  (uint32_t)(intmax_t)msg->any.head.spanid,  /* for brevity */
2011                  statestr);
2012
2013         return(buf);
2014 }