1e33c809e243bf79dead34ef659a1a6984335f5a
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
39 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40
41 /*
42  * Indexed messages are stored in a red-black tree indexed by their
43  * msgid.  Only persistent messages are indexed.
44  */
45 int
46 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
47 {
48         if (state1->spanid < state2->spanid)
49                 return(-1);
50         if (state1->spanid > state2->spanid)
51                 return(1);
52         if (state1->msgid < state2->msgid)
53                 return(-1);
54         if (state1->msgid > state2->msgid)
55                 return(1);
56         return(0);
57 }
58
59 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
60
61 /*
62  * Initialize a low-level ioq
63  */
64 void
65 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
66 {
67         bzero(ioq, sizeof(*ioq));
68         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
69         TAILQ_INIT(&ioq->msgq);
70 }
71
72 /*
73  * Cleanup queue.
74  *
75  * caller holds iocom->mtx.
76  */
77 void
78 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
79 {
80         hammer2_msg_t *msg;
81
82         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
83                 assert(0);      /* shouldn't happen */
84                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
85                 hammer2_msg_free(iocom, msg);
86         }
87         if ((msg = ioq->msg) != NULL) {
88                 ioq->msg = NULL;
89                 hammer2_msg_free(iocom, msg);
90         }
91 }
92
93 /*
94  * Initialize a low-level communications channel.
95  *
96  * NOTE: The state_func() is called at least once from the loop and can be
97  *       re-armed via hammer2_iocom_restate().
98  */
99 void
100 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
101                    void (*state_func)(hammer2_iocom_t *),
102                    void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
103                    void (*altmsg_func)(hammer2_iocom_t *))
104 {
105         bzero(iocom, sizeof(*iocom));
106
107         iocom->state_callback = state_func;
108         iocom->rcvmsg_callback = rcvmsg_func;
109         iocom->altmsg_callback = altmsg_func;
110
111         pthread_mutex_init(&iocom->mtx, NULL);
112         RB_INIT(&iocom->staterd_tree);
113         RB_INIT(&iocom->statewr_tree);
114         TAILQ_INIT(&iocom->freeq);
115         TAILQ_INIT(&iocom->freeq_aux);
116         TAILQ_INIT(&iocom->addrq);
117         TAILQ_INIT(&iocom->txmsgq);
118         iocom->sock_fd = sock_fd;
119         iocom->alt_fd = alt_fd;
120         iocom->flags = HAMMER2_IOCOMF_RREQ;
121         if (state_func)
122                 iocom->flags |= HAMMER2_IOCOMF_SWORK;
123         hammer2_ioq_init(iocom, &iocom->ioq_rx);
124         hammer2_ioq_init(iocom, &iocom->ioq_tx);
125         if (pipe(iocom->wakeupfds) < 0)
126                 assert(0);
127         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
128         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
129
130         /*
131          * Negotiate session crypto synchronously.  This will mark the
132          * connection as error'd if it fails.
133          */
134         hammer2_crypto_negotiate(iocom);
135
136         /*
137          * Make sure our fds are set to non-blocking for the iocom core.
138          */
139         if (sock_fd >= 0)
140                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
141 #if 0
142         /* if line buffered our single fgets() should be fine */
143         if (alt_fd >= 0)
144                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
145 #endif
146 }
147
148 /*
149  * May only be called from a callback from iocom_core.
150  *
151  * Adjust state machine functions, set flags to guarantee that both
152  * the recevmsg_func and the sendmsg_func is called at least once.
153  */
154 void
155 hammer2_iocom_restate(hammer2_iocom_t *iocom,
156                    void (*state_func)(hammer2_iocom_t *),
157                    void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
158                    void (*altmsg_func)(hammer2_iocom_t *))
159 {
160         iocom->state_callback = state_func;
161         iocom->rcvmsg_callback = rcvmsg_func;
162         iocom->altmsg_callback = altmsg_func;
163         if (state_func)
164                 iocom->flags |= HAMMER2_IOCOMF_SWORK;
165         else
166                 iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
167 }
168
169 /*
170  * Cleanup a terminating iocom.
171  *
172  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
173  * from all possible references to it.
174  */
175 void
176 hammer2_iocom_done(hammer2_iocom_t *iocom)
177 {
178         hammer2_msg_t *msg;
179
180         if (iocom->sock_fd >= 0) {
181                 close(iocom->sock_fd);
182                 iocom->sock_fd = -1;
183         }
184         if (iocom->alt_fd >= 0) {
185                 close(iocom->alt_fd);
186                 iocom->alt_fd = -1;
187         }
188         hammer2_ioq_done(iocom, &iocom->ioq_rx);
189         hammer2_ioq_done(iocom, &iocom->ioq_tx);
190         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
191                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
192                 free(msg);
193         }
194         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
195                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
196                 free(msg->aux_data);
197                 msg->aux_data = NULL;
198                 free(msg);
199         }
200         if (iocom->wakeupfds[0] >= 0) {
201                 close(iocom->wakeupfds[0]);
202                 iocom->wakeupfds[0] = -1;
203         }
204         if (iocom->wakeupfds[1] >= 0) {
205                 close(iocom->wakeupfds[1]);
206                 iocom->wakeupfds[1] = -1;
207         }
208         pthread_mutex_destroy(&iocom->mtx);
209 }
210
211 /*
212  * Allocate a new one-way message.
213  */
214 hammer2_msg_t *
215 hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
216 {
217         hammer2_msg_t *msg;
218         int hbytes;
219
220         pthread_mutex_lock(&iocom->mtx);
221         if (aux_size) {
222                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
223                            ~HAMMER2_MSG_ALIGNMASK;
224                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
225                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
226         } else {
227                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
228                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
229         }
230         pthread_mutex_unlock(&iocom->mtx);
231         if (msg == NULL) {
232                 msg = malloc(sizeof(*msg));
233                 bzero(msg, sizeof(*msg));
234                 msg->aux_data = NULL;
235                 msg->aux_size = 0;
236         }
237         if (msg->aux_size != aux_size) {
238                 if (msg->aux_data) {
239                         free(msg->aux_data);
240                         msg->aux_data = NULL;
241                         msg->aux_size = 0;
242                 }
243                 if (aux_size) {
244                         msg->aux_data = malloc(aux_size);
245                         msg->aux_size = aux_size;
246                 }
247         }
248         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
249         if (hbytes)
250                 bzero(&msg->any.head, hbytes);
251         msg->hdr_size = hbytes;
252         msg->any.head.cmd = cmd;
253         msg->any.head.aux_descr = 0;
254         msg->any.head.aux_crc = 0;
255
256         return (msg);
257 }
258
259 /*
260  * Free a message so it can be reused afresh.
261  *
262  * NOTE: aux_size can be 0 with a non-NULL aux_data.
263  */
264 static
265 void
266 hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
267 {
268         msg->state = NULL;
269         if (msg->aux_data)
270                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
271         else
272                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
273 }
274
275 void
276 hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
277 {
278         pthread_mutex_lock(&iocom->mtx);
279         hammer2_msg_free_locked(iocom, msg);
280         pthread_mutex_unlock(&iocom->mtx);
281 }
282
283 /*
284  * I/O core loop for an iocom.
285  *
286  * Thread localized, iocom->mtx not held.
287  */
288 void
289 hammer2_iocom_core(hammer2_iocom_t *iocom)
290 {
291         struct pollfd fds[3];
292         char dummybuf[256];
293         hammer2_msg_t *msg;
294         int timeout;
295         int count;
296         int wi; /* wakeup pipe */
297         int si; /* socket */
298         int ai; /* alt bulk path socket */
299
300         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
301                 if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
302                                      HAMMER2_IOCOMF_WWORK |
303                                      HAMMER2_IOCOMF_PWORK |
304                                      HAMMER2_IOCOMF_SWORK |
305                                      HAMMER2_IOCOMF_ARWORK |
306                                      HAMMER2_IOCOMF_AWWORK)) == 0) {
307                         /*
308                          * Only poll if no immediate work is pending.
309                          * Otherwise we are just wasting our time calling
310                          * poll.
311                          */
312                         timeout = 5000;
313
314                         count = 0;
315                         wi = -1;
316                         si = -1;
317                         ai = -1;
318
319                         /*
320                          * Always check the inter-thread pipe, e.g.
321                          * for iocom->txmsgq work.
322                          */
323                         wi = count++;
324                         fds[wi].fd = iocom->wakeupfds[0];
325                         fds[wi].events = POLLIN;
326                         fds[wi].revents = 0;
327
328                         /*
329                          * Check the socket input/output direction as
330                          * requested
331                          */
332                         if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
333                                             HAMMER2_IOCOMF_WREQ)) {
334                                 si = count++;
335                                 fds[si].fd = iocom->sock_fd;
336                                 fds[si].events = 0;
337                                 fds[si].revents = 0;
338
339                                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
340                                         fds[si].events |= POLLIN;
341                                 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
342                                         fds[si].events |= POLLOUT;
343                         }
344
345                         /*
346                          * Check the alternative fd for work.
347                          */
348                         if (iocom->alt_fd >= 0) {
349                                 ai = count++;
350                                 fds[ai].fd = iocom->alt_fd;
351                                 fds[ai].events = POLLIN;
352                                 fds[ai].revents = 0;
353                         }
354                         poll(fds, count, timeout);
355
356                         if (wi >= 0 && (fds[wi].revents & POLLIN))
357                                 iocom->flags |= HAMMER2_IOCOMF_PWORK;
358                         if (si >= 0 && (fds[si].revents & POLLIN))
359                                 iocom->flags |= HAMMER2_IOCOMF_RWORK;
360                         if (si >= 0 && (fds[si].revents & POLLOUT))
361                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
362                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
363                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
364                         if (ai >= 0 && (fds[ai].revents & POLLIN))
365                                 iocom->flags |= HAMMER2_IOCOMF_ARWORK;
366                 } else {
367                         /*
368                          * Always check the pipe
369                          */
370                         iocom->flags |= HAMMER2_IOCOMF_PWORK;
371                 }
372
373                 if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
374                         iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
375                         iocom->state_callback(iocom);
376                 }
377
378                 /*
379                  * Pending message queues from other threads wake us up
380                  * with a write to the wakeupfds[] pipe.  We have to clear
381                  * the pipe with a dummy read.
382                  */
383                 if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
384                         iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
385                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
386                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
387                         iocom->flags |= HAMMER2_IOCOMF_WWORK;
388                         if (TAILQ_FIRST(&iocom->txmsgq))
389                                 hammer2_iocom_flush1(iocom);
390                 }
391
392                 /*
393                  * Message write sequencing
394                  */
395                 if (iocom->flags & HAMMER2_IOCOMF_WWORK)
396                         hammer2_iocom_flush1(iocom);
397
398                 /*
399                  * Message read sequencing.  Run this after the write
400                  * sequencing in case the write sequencing allowed another
401                  * auto-DELETE to occur on the read side.
402                  */
403                 if (iocom->flags & HAMMER2_IOCOMF_RWORK) {
404                         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
405                                (msg = hammer2_ioq_read(iocom)) != NULL) {
406                                 if (DebugOpt) {
407                                         fprintf(stderr, "receive %s\n",
408                                                 hammer2_msg_str(msg));
409                                 }
410                                 iocom->rcvmsg_callback(iocom, msg);
411                                 hammer2_state_cleanuprx(iocom, msg);
412                         }
413                 }
414
415                 if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
416                         iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
417                         iocom->altmsg_callback(iocom);
418                 }
419         }
420 }
421
422 /*
423  * Read the next ready message from the ioq, issuing I/O if needed.
424  * Caller should retry on a read-event when NULL is returned.
425  *
426  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
427  * be returned for each open transaction, then the ioq and iocom
428  * will be errored out and a non-transactional HAMMER2_LNK_ERROR
429  * msg will be returned as the final message.  The caller should not call
430  * us again after the final message is returned.
431  *
432  * Thread localized, iocom->mtx not held.
433  */
434 hammer2_msg_t *
435 hammer2_ioq_read(hammer2_iocom_t *iocom)
436 {
437         hammer2_ioq_t *ioq = &iocom->ioq_rx;
438         hammer2_msg_t *msg;
439         hammer2_msg_hdr_t *head;
440         hammer2_state_t *state;
441         ssize_t n;
442         size_t bytes;
443         size_t nmax;
444         uint32_t xcrc32;
445         int error;
446
447 again:
448         iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
449
450         /*
451          * If a message is already pending we can just remove and
452          * return it.  Message state has already been processed.
453          */
454         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
455                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
456                 return (msg);
457         }
458
459         if (ioq->error)
460                 goto skip;
461
462         /*
463          * Message read in-progress (msg is NULL at the moment).  We don't
464          * allocate a msg until we have its core header.
465          */
466         bytes = ioq->fifo_end - ioq->fifo_beg;
467         nmax = sizeof(ioq->buf) - ioq->fifo_end;
468         msg = ioq->msg;
469
470         switch(ioq->state) {
471         case HAMMER2_MSGQ_STATE_HEADER1:
472                 /*
473                  * Load the primary header, fail on any non-trivial read
474                  * error or on EOF.  Since the primary header is the same
475                  * size is the message alignment it will never straddle
476                  * the end of the buffer.
477                  */
478                 if (bytes < (int)sizeof(msg->any.head)) {
479                         n = read(iocom->sock_fd,
480                                  ioq->buf + ioq->fifo_end,
481                                  nmax);
482                         if (n <= 0) {
483                                 if (n == 0) {
484                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
485                                         break;
486                                 }
487                                 if (errno != EINTR &&
488                                     errno != EINPROGRESS &&
489                                     errno != EAGAIN) {
490                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
491                                         break;
492                                 }
493                                 n = 0;
494                                 /* fall through */
495                         }
496                         ioq->fifo_end += n;
497                         bytes += n;
498                         nmax -= n;
499                 }
500
501                 /*
502                  * Insufficient data accumulated (msg is NULL, caller will
503                  * retry on event).
504                  */
505                 assert(msg == NULL);
506                 if (bytes < (int)sizeof(msg->any.head))
507                         break;
508
509                 /*
510                  * Calculate the header, decrypt data received so far.
511                  * Data will be decrypted in-place.  Partial blocks are
512                  * not immediately decrypted.
513                  *
514                  * WARNING!  The header might be in the wrong endian, we
515                  *           do not fix it up until we get the entire
516                  *           extended header.
517                  */
518                 hammer2_crypto_decrypt(iocom, ioq);
519                 head = (void *)(ioq->buf + ioq->fifo_beg);
520
521                 /*
522                  * Check and fixup the core header.  Note that the icrc
523                  * has to be calculated before any fixups, but the crc
524                  * fields in the msg may have to be swapped like everything
525                  * else.
526                  */
527                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
528                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
529                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
530                         break;
531                 }
532
533                 /*
534                  * Calculate the full header size and aux data size
535                  */
536                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
537                         ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
538                                       HAMMER2_MSG_ALIGN;
539                         ioq->abytes = bswap32(head->aux_bytes) *
540                                       HAMMER2_MSG_ALIGN;
541                 } else {
542                         ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
543                                       HAMMER2_MSG_ALIGN;
544                         ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
545                 }
546                 if (ioq->hbytes < sizeof(msg->any.head) ||
547                     ioq->hbytes > sizeof(msg->any) ||
548                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
549                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
550                         break;
551                 }
552
553                 /*
554                  * Allocate the message, the next state will fill it in.
555                  */
556                 msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
557                 ioq->msg = msg;
558
559                 /*
560                  * Fall through to the next state.  Make sure that the
561                  * extended header does not straddle the end of the buffer.
562                  * We still want to issue larger reads into our buffer,
563                  * book-keeping is easier if we don't bcopy() yet.
564                  */
565                 if (bytes + nmax < ioq->hbytes) {
566                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
567                         ioq->fifo_cdx -= ioq->fifo_beg;
568                         ioq->fifo_beg = 0;
569                         ioq->fifo_end = bytes;
570                         nmax = sizeof(ioq->buf) - ioq->fifo_end;
571                 }
572                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
573                 /* fall through */
574         case HAMMER2_MSGQ_STATE_HEADER2:
575                 /*
576                  * Fill out the extended header.
577                  */
578                 assert(msg != NULL);
579                 if (bytes < ioq->hbytes) {
580                         n = read(iocom->sock_fd,
581                                  ioq->buf + ioq->fifo_end,
582                                  nmax);
583                         if (n <= 0) {
584                                 if (n == 0) {
585                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
586                                         break;
587                                 }
588                                 if (errno != EINTR &&
589                                     errno != EINPROGRESS &&
590                                     errno != EAGAIN) {
591                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
592                                         break;
593                                 }
594                                 n = 0;
595                                 /* fall through */
596                         }
597                         ioq->fifo_end += n;
598                         bytes += n;
599                         nmax -= n;
600                 }
601
602                 /*
603                  * Insufficient data accumulated (set msg NULL so caller will
604                  * retry on event).
605                  */
606                 if (bytes < ioq->hbytes) {
607                         msg = NULL;
608                         break;
609                 }
610
611                 /*
612                  * Calculate the extended header, decrypt data received
613                  * so far.  Handle endian-conversion for the entire extended
614                  * header.
615                  */
616                 hammer2_crypto_decrypt(iocom, ioq);
617                 head = (void *)(ioq->buf + ioq->fifo_beg);
618
619                 /*
620                  * Check the CRC.
621                  */
622                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
623                         xcrc32 = bswap32(head->hdr_crc);
624                 else
625                         xcrc32 = head->hdr_crc;
626                 head->hdr_crc = 0;
627                 if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
628                         ioq->error = HAMMER2_IOQ_ERROR_XCRC;
629                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
630                                 xcrc32, hammer2_icrc32(head, ioq->hbytes),
631                                 hammer2_msg_str(msg));
632                         assert(0);
633                         break;
634                 }
635                 head->hdr_crc = xcrc32;
636
637                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
638                         hammer2_bswap_head(head);
639                 }
640
641                 /*
642                  * Copy the extended header into the msg and adjust the
643                  * FIFO.
644                  */
645                 bcopy(head, &msg->any, ioq->hbytes);
646
647                 /*
648                  * We are either done or we fall-through.
649                  */
650                 if (ioq->abytes == 0) {
651                         ioq->fifo_beg += ioq->hbytes;
652                         break;
653                 }
654
655                 /*
656                  * Must adjust nmax and bytes (and the state) when falling
657                  * through.
658                  */
659                 ioq->fifo_beg += ioq->hbytes;
660                 nmax -= ioq->hbytes;
661                 bytes -= ioq->hbytes;
662                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
663                 /* fall through */
664         case HAMMER2_MSGQ_STATE_AUXDATA1:
665                 /*
666                  * Copy the partial or complete payload from remaining
667                  * bytes in the FIFO.  We have to fall-through either
668                  * way so we can check the crc.
669                  *
670                  * Adjust msg->aux_size to the final actual value.
671                  */
672                 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
673                 if (ioq->already > ioq->abytes)
674                         ioq->already = ioq->abytes;
675                 if (bytes >= ioq->abytes) {
676                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
677                               ioq->abytes);
678                         msg->aux_size = ioq->abytes;
679                         ioq->fifo_beg += ioq->abytes;
680                         if (ioq->fifo_cdx < ioq->fifo_beg)
681                                 ioq->fifo_cdx = ioq->fifo_beg;
682                         bytes -= ioq->abytes;
683                 } else if (bytes) {
684                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
685                               bytes);
686                         msg->aux_size = bytes;
687                         ioq->fifo_beg += bytes;
688                         if (ioq->fifo_cdx < ioq->fifo_beg)
689                                 ioq->fifo_cdx = ioq->fifo_beg;
690                         bytes = 0;
691                 } else {
692                         msg->aux_size = 0;
693                 }
694                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
695                 /* fall through */
696         case HAMMER2_MSGQ_STATE_AUXDATA2:
697                 /*
698                  * Read the remainder of the payload directly into the
699                  * msg->aux_data buffer.
700                  */
701                 assert(msg);
702                 if (msg->aux_size < ioq->abytes) {
703                         assert(bytes == 0);
704                         n = read(iocom->sock_fd,
705                                  msg->aux_data + msg->aux_size,
706                                  ioq->abytes - msg->aux_size);
707                         if (n <= 0) {
708                                 if (n == 0) {
709                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
710                                         break;
711                                 }
712                                 if (errno != EINTR &&
713                                     errno != EINPROGRESS &&
714                                     errno != EAGAIN) {
715                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
716                                         break;
717                                 }
718                                 n = 0;
719                                 /* fall through */
720                         }
721                         msg->aux_size += n;
722                 }
723
724                 /*
725                  * Insufficient data accumulated (set msg NULL so caller will
726                  * retry on event).
727                  */
728                 if (msg->aux_size < ioq->abytes) {
729                         msg = NULL;
730                         break;
731                 }
732                 assert(msg->aux_size == ioq->abytes);
733                 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
734
735                 /*
736                  * Check aux_crc, then we are done.
737                  */
738                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
739                 if (xcrc32 != msg->any.head.aux_crc) {
740                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
741                         break;
742                 }
743                 break;
744         case HAMMER2_MSGQ_STATE_ERROR:
745                 /*
746                  * Continued calls to drain recorded transactions (returning
747                  * a LNK_ERROR for each one), before we return the final
748                  * LNK_ERROR.
749                  */
750                 assert(msg == NULL);
751                 break;
752         default:
753                 /*
754                  * We don't double-return errors, the caller should not
755                  * have called us again after getting an error msg.
756                  */
757                 assert(0);
758                 break;
759         }
760
761         /*
762          * Check the message sequence.  The iv[] should prevent any
763          * possibility of a replay but we add this check anyway.
764          */
765         if (msg && ioq->error == 0) {
766                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
767                         ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
768                 } else {
769                         ++ioq->seq;
770                 }
771         }
772
773         /*
774          * Process transactional state for the message.
775          */
776         if (msg && ioq->error == 0) {
777                 error = hammer2_state_msgrx(iocom, msg);
778                 if (error) {
779                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
780                                 hammer2_msg_free(iocom, msg);
781                                 goto again;
782                         }
783                         ioq->error = error;
784                 }
785         }
786
787         /*
788          * Handle error, RREQ, or completion
789          *
790          * NOTE: nmax and bytes are invalid at this point, we don't bother
791          *       to update them when breaking out.
792          */
793         if (ioq->error) {
794 skip:
795                 /*
796                  * An unrecoverable error causes all active receive
797                  * transactions to be terminated with a LNK_ERROR message.
798                  *
799                  * Once all active transactions are exhausted we set the
800                  * iocom ERROR flag and return a non-transactional LNK_ERROR
801                  * message, which should cause master processing loops to
802                  * terminate.
803                  */
804                 assert(ioq->msg == msg);
805                 if (msg) {
806                         hammer2_msg_free(iocom, msg);
807                         ioq->msg = NULL;
808                 }
809
810                 /*
811                  * No more I/O read processing
812                  */
813                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
814
815                 /*
816                  * Simulate a remote LNK_ERROR DELETE msg for any open
817                  * transactions, ending with a final non-transactional
818                  * LNK_ERROR (that the session can detect) when no
819                  * transactions remain.
820                  */
821                 msg = hammer2_msg_alloc(iocom, 0, 0);
822                 bzero(&msg->any.head, sizeof(msg->any.head));
823                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
824                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
825                 msg->any.head.error = ioq->error;
826
827                 pthread_mutex_lock(&iocom->mtx);
828                 hammer2_iocom_drain(iocom);
829                 if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
830                         /*
831                          * Active remote transactions are still present.
832                          * Simulate the other end sending us a DELETE.
833                          */
834                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
835                                 hammer2_msg_free(iocom, msg);
836                                 msg = NULL;
837                         } else {
838                                 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
839                                 msg->state = state;
840                                 msg->any.head.spanid = state->spanid;
841                                 msg->any.head.msgid = state->msgid;
842                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
843                                                      HAMMER2_MSGF_DELETE;
844                         }
845                 } else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) {
846                         /*
847                          * Active local transactions are still present.
848                          * Simulate the other end sending us a DELETE.
849                          */
850                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
851                                 hammer2_msg_free(iocom, msg);
852                                 msg = NULL;
853                         } else {
854                                 msg->state = state;
855                                 msg->any.head.spanid = state->spanid;
856                                 msg->any.head.msgid = state->msgid;
857                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
858                                                      HAMMER2_MSGF_DELETE |
859                                                      HAMMER2_MSGF_REPLY;
860                                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
861                                         msg->any.head.cmd |=
862                                                      HAMMER2_MSGF_CREATE;
863                                 }
864                         }
865                 } else {
866                         /*
867                          * No active local or remote transactions remain.
868                          * Generate a final LNK_ERROR and flag EOF.
869                          */
870                         msg->state = NULL;
871                         iocom->flags |= HAMMER2_IOCOMF_EOF;
872                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
873                 }
874                 pthread_mutex_unlock(&iocom->mtx);
875
876                 /*
877                  * For the iocom error case we want to set RWORK to indicate
878                  * that more messages might be pending.
879                  *
880                  * It is possible to return NULL when there is more work to
881                  * do because each message has to be DELETEd in both
882                  * directions before we continue on with the next (though
883                  * this could be optimized).  The transmit direction will
884                  * re-set RWORK.
885                  */
886                 if (msg)
887                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
888         } else if (msg == NULL) {
889                 /*
890                  * Insufficient data received to finish building the message,
891                  * set RREQ and return NULL.
892                  *
893                  * Leave ioq->msg intact.
894                  * Leave the FIFO intact.
895                  */
896                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
897         } else {
898                 /*
899                  * Return msg.
900                  *
901                  * The fifo has already been advanced past the message.
902                  * Trivially reset the FIFO indices if possible.
903                  *
904                  * clear the FIFO if it is now empty and set RREQ to wait
905                  * for more from the socket.  If the FIFO is not empty set
906                  * TWORK to bypass the poll so we loop immediately.
907                  */
908                 if (ioq->fifo_beg == ioq->fifo_end) {
909                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
910                         ioq->fifo_cdx = 0;
911                         ioq->fifo_beg = 0;
912                         ioq->fifo_end = 0;
913                 } else {
914                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
915                 }
916                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
917                 ioq->msg = NULL;
918         }
919         return (msg);
920 }
921
922 /*
923  * Calculate the header and data crc's and write a low-level message to
924  * the connection.  If aux_crc is non-zero the aux_data crc is already
925  * assumed to have been set.
926  *
927  * A non-NULL msg is added to the queue but not necessarily flushed.
928  * Calling this function with msg == NULL will get a flush going.
929  *
930  * Caller must hold iocom->mtx.
931  */
932 void
933 hammer2_iocom_flush1(hammer2_iocom_t *iocom)
934 {
935         hammer2_ioq_t *ioq = &iocom->ioq_tx;
936         hammer2_msg_t *msg;
937         uint32_t xcrc32;
938         int hbytes;
939         hammer2_msg_queue_t tmpq;
940
941         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
942         TAILQ_INIT(&tmpq);
943         pthread_mutex_lock(&iocom->mtx);
944         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
945                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
946                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
947         }
948         pthread_mutex_unlock(&iocom->mtx);
949
950         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
951                 /*
952                  * Process terminal connection errors.
953                  */
954                 TAILQ_REMOVE(&tmpq, msg, qentry);
955                 if (ioq->error) {
956                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
957                         ++ioq->msgcount;
958                         continue;
959                 }
960
961                 /*
962                  * Finish populating the msg fields.  The salt ensures that
963                  * the iv[] array is ridiculously randomized and we also
964                  * re-seed our PRNG every 32768 messages just to be sure.
965                  */
966                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
967                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
968                 ++ioq->seq;
969                 if ((ioq->seq & 32767) == 0)
970                         srandomdev();
971
972                 /*
973                  * Calculate aux_crc if 0, then calculate hdr_crc.
974                  */
975                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
976                         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
977                         xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
978                         msg->any.head.aux_crc = xcrc32;
979                 }
980                 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
981                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
982
983                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
984                          HAMMER2_MSG_ALIGN;
985                 msg->any.head.hdr_crc = 0;
986                 msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
987
988                 /*
989                  * Enqueue the message (the flush codes handles stream
990                  * encryption).
991                  */
992                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
993                 ++ioq->msgcount;
994         }
995         hammer2_iocom_flush2(iocom);
996 }
997
998 /*
999  * Thread localized, iocom->mtx not held by caller.
1000  */
1001 void
1002 hammer2_iocom_flush2(hammer2_iocom_t *iocom)
1003 {
1004         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1005         hammer2_msg_t *msg;
1006         ssize_t nmax;
1007         ssize_t omax;
1008         ssize_t nact;
1009         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
1010         size_t hbytes;
1011         size_t abytes;
1012         size_t hoff;
1013         size_t aoff;
1014         int n;
1015
1016         if (ioq->error) {
1017                 hammer2_iocom_drain(iocom);
1018                 return;
1019         }
1020
1021         /*
1022          * Pump messages out the connection by building an iovec.
1023          *
1024          * ioq->hbytes/ioq->abytes tracks how much of the first message
1025          * in the queue has been successfully written out, so we can
1026          * resume writing.
1027          */
1028         n = 0;
1029         nmax = 0;
1030         hoff = ioq->hbytes;
1031         aoff = ioq->abytes;
1032
1033         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1034                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1035                          HAMMER2_MSG_ALIGN;
1036                 abytes = msg->aux_size;
1037                 assert(hoff <= hbytes && aoff <= abytes);
1038
1039                 if (hoff < hbytes) {
1040                         iov[n].iov_base = (char *)&msg->any.head + hoff;
1041                         iov[n].iov_len = hbytes - hoff;
1042                         nmax += hbytes - hoff;
1043                         ++n;
1044                         if (n == HAMMER2_IOQ_MAXIOVEC)
1045                                 break;
1046                 }
1047                 if (aoff < abytes) {
1048                         assert(msg->aux_data != NULL);
1049                         iov[n].iov_base = (char *)msg->aux_data + aoff;
1050                         iov[n].iov_len = abytes - aoff;
1051                         nmax += abytes - aoff;
1052                         ++n;
1053                         if (n == HAMMER2_IOQ_MAXIOVEC)
1054                                 break;
1055                 }
1056                 hoff = 0;
1057                 aoff = 0;
1058         }
1059         if (n == 0)
1060                 return;
1061
1062         /*
1063          * Encrypt and write the data.  The crypto code will move the
1064          * data into the fifo and adjust the iov as necessary.  If
1065          * encryption is disabled the iov is left alone.
1066          *
1067          * May return a smaller iov (thus a smaller n), with aggregated
1068          * chunks.  May reduce nmax to what fits in the FIFO.
1069          */
1070         omax = nmax;
1071         n = hammer2_crypto_encrypt(iocom, ioq, iov, n, &nmax);
1072
1073         /*
1074          * Execute the writev() then figure out what happened.
1075          */
1076         nact = writev(iocom->sock_fd, iov, n);
1077         if (nact < 0) {
1078                 if (errno != EINTR &&
1079                     errno != EINPROGRESS &&
1080                     errno != EAGAIN) {
1081                         /*
1082                          * Fatal write error
1083                          */
1084                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
1085                         hammer2_iocom_drain(iocom);
1086                 } else {
1087                         /*
1088                          * Wait for socket buffer space
1089                          */
1090                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
1091                 }
1092                 return;
1093         }
1094
1095         /*
1096          * Indicate bytes written successfully.
1097          *
1098          * If we were unable to write the entire iov array then set WREQ
1099          * to wait for more socket buffer space.
1100          *
1101          * If the FIFO space was insufficient to fully drain all messages
1102          * set WWORK to cause the core to call us again for the next batch.
1103          */
1104         hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
1105         if (nact != nmax)
1106                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1107
1108         /*
1109          * Clean out the transmit queue based on what we successfully
1110          * sent.  ioq->hbytes/abytes represents the portion of the first
1111          * message previously sent.
1112          */
1113         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1114                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1115                          HAMMER2_MSG_ALIGN;
1116                 abytes = msg->aux_size;
1117
1118                 if ((size_t)nact < hbytes - ioq->hbytes) {
1119                         ioq->hbytes += nact;
1120                         /* nact = 0; */
1121                         break;
1122                 }
1123                 nact -= hbytes - ioq->hbytes;
1124                 ioq->hbytes = hbytes;
1125                 if ((size_t)nact < abytes - ioq->abytes) {
1126                         ioq->abytes += nact;
1127                         /* nact = 0; */
1128                         break;
1129                 }
1130                 nact -= abytes - ioq->abytes;
1131
1132                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1133                 --ioq->msgcount;
1134                 ioq->hbytes = 0;
1135                 ioq->abytes = 0;
1136
1137                 hammer2_state_cleanuptx(iocom, msg);
1138         }
1139
1140         /*
1141          * If more messages are pending on WREQ wasn't set we must
1142          * ensure that WWORK gets set.
1143          */
1144         if (msg && (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0)
1145                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
1146         assert(nact == 0);
1147         if (ioq->error) {
1148                 hammer2_iocom_drain(iocom);
1149         }
1150 }
1151
1152 /*
1153  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1154  * write events will occur.  We don't kill read msgs because we want
1155  * the caller to pull off our contrived terminal error msg to detect
1156  * the connection failure.
1157  *
1158  * Thread localized, iocom->mtx not held by caller.
1159  */
1160 void
1161 hammer2_iocom_drain(hammer2_iocom_t *iocom)
1162 {
1163         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1164         hammer2_msg_t *msg;
1165
1166         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
1167         ioq->hbytes = 0;
1168         ioq->abytes = 0;
1169
1170         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1171                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1172                 --ioq->msgcount;
1173                 hammer2_state_cleanuptx(iocom, msg);
1174         }
1175 }
1176
1177 /*
1178  * Write a message to an iocom, with additional state processing.
1179  */
1180 void
1181 hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
1182                   void (*func)(hammer2_state_t *, hammer2_msg_t *),
1183                   void *data,
1184                   hammer2_state_t **statep)
1185 {
1186         hammer2_state_t *state;
1187         char dummy;
1188
1189         /*
1190          * Handle state processing, create state if necessary.
1191          */
1192         pthread_mutex_lock(&iocom->mtx);
1193         if ((state = msg->state) != NULL) {
1194                 /*
1195                  * Existing transaction (could be reply).  It is also
1196                  * possible for this to be the first reply (CREATE is set),
1197                  * in which case we populate state->txcmd.
1198                  */
1199                 msg->any.head.msgid = state->msgid;
1200                 msg->any.head.spanid = state->spanid;
1201                 if (func) {
1202                         state->func = func;
1203                         state->any.any = data;
1204                 }
1205                 assert(((state->txcmd ^ msg->any.head.cmd) &
1206                         HAMMER2_MSGF_REPLY) == 0);
1207                 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1208                         state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1209         } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
1210                 /*
1211                  * No existing state and CREATE is set, create new
1212                  * state for outgoing command.  This can't happen if
1213                  * REPLY is set as the state would already exist for
1214                  * a transaction reply.
1215                  */
1216                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1217
1218                 state = malloc(sizeof(*state));
1219                 bzero(state, sizeof(*state));
1220                 state->iocom = iocom;
1221                 state->flags = HAMMER2_STATE_DYNAMIC;
1222                 state->msg = msg;
1223                 state->msgid = (uint64_t)(uintptr_t)state;
1224                 state->spanid = msg->any.head.spanid;
1225                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1226                 state->rxcmd = HAMMER2_MSGF_REPLY;
1227                 state->func = func;
1228                 state->any.any = data;
1229                 pthread_mutex_lock(&iocom->mtx);
1230                 RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
1231                 pthread_mutex_unlock(&iocom->mtx);
1232                 state->flags |= HAMMER2_STATE_INSERTED;
1233                 msg->state = state;
1234                 msg->any.head.msgid = state->msgid;
1235                 /* spanid set by caller */
1236         } else {
1237                 msg->any.head.msgid = 0;
1238                 /* spanid set by caller */
1239         }
1240
1241         if (statep)
1242                 *statep = state;
1243
1244         /*
1245          * Queue it for output, wake up the I/O pthread.  Note that the
1246          * I/O thread is responsible for generating the CRCs and encryption.
1247          */
1248         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1249         dummy = 0;
1250         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1251         pthread_mutex_unlock(&iocom->mtx);
1252 }
1253
1254 /*
1255  * This is a shortcut to formulate a reply to msg with a simple error code,
1256  * It can reply to and terminate a transaction, or it can reply to a one-way
1257  * messages.  A HAMMER2_LNK_ERROR command code is utilized to encode
1258  * the error code (which can be 0).  Not all transactions are terminated
1259  * with HAMMER2_LNK_ERROR status (the low level only cares about the
1260  * MSGF_DELETE flag), but most are.
1261  *
1262  * Replies to one-way messages are a bit of an oxymoron but the feature
1263  * is used by the debug (DBG) protocol.
1264  *
1265  * The reply contains no extended data.
1266  */
1267 void
1268 hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1269 {
1270         hammer2_state_t *state = msg->state;
1271         hammer2_msg_t *nmsg;
1272         uint32_t cmd;
1273
1274
1275         /*
1276          * Reply with a simple error code and terminate the transaction.
1277          */
1278         cmd = HAMMER2_LNK_ERROR;
1279
1280         /*
1281          * Check if our direction has even been initiated yet, set CREATE.
1282          *
1283          * Check what direction this is (command or reply direction).  Note
1284          * that txcmd might not have been initiated yet.
1285          *
1286          * If our direction has already been closed we just return without
1287          * doing anything.
1288          */
1289         if (state) {
1290                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1291                         return;
1292                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1293                         cmd |= HAMMER2_MSGF_CREATE;
1294                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1295                         cmd |= HAMMER2_MSGF_REPLY;
1296                 cmd |= HAMMER2_MSGF_DELETE;
1297         } else {
1298                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1299                         cmd |= HAMMER2_MSGF_REPLY;
1300         }
1301
1302         nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1303         nmsg->any.head.error = error;
1304         nmsg->state = msg->state;
1305         hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
1306 }
1307
1308 /*
1309  * Similar to hammer2_msg_reply() but leave the transaction open.  That is,
1310  * we are generating a streaming reply or an intermediate acknowledgement
1311  * of some sort as part of the higher level protocol, with more to come
1312  * later.
1313  */
1314 void
1315 hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1316 {
1317         hammer2_state_t *state = msg->state;
1318         hammer2_msg_t *nmsg;
1319         uint32_t cmd;
1320
1321
1322         /*
1323          * Reply with a simple error code and terminate the transaction.
1324          */
1325         cmd = HAMMER2_LNK_ERROR;
1326
1327         /*
1328          * Check if our direction has even been initiated yet, set CREATE.
1329          *
1330          * Check what direction this is (command or reply direction).  Note
1331          * that txcmd might not have been initiated yet.
1332          *
1333          * If our direction has already been closed we just return without
1334          * doing anything.
1335          */
1336         if (state) {
1337                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1338                         return;
1339                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1340                         cmd |= HAMMER2_MSGF_CREATE;
1341                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1342                         cmd |= HAMMER2_MSGF_REPLY;
1343                 /* continuing transaction, do not set MSGF_DELETE */
1344         } else {
1345                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1346                         cmd |= HAMMER2_MSGF_REPLY;
1347         }
1348
1349         nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1350         nmsg->any.head.error = error;
1351         nmsg->state = state;
1352         hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
1353 }
1354
1355 /*
1356  * Terminate a transaction given a state structure by issuing a DELETE.
1357  */
1358 void
1359 hammer2_state_reply(hammer2_state_t *state, uint32_t error)
1360 {
1361         hammer2_msg_t *nmsg;
1362         uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
1363
1364         /*
1365          * Nothing to do if we already transmitted a delete
1366          */
1367         if (state->txcmd & HAMMER2_MSGF_DELETE)
1368                 return;
1369
1370         /*
1371          * We must also set CREATE if this is our first response to a
1372          * remote command.
1373          */
1374         if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1375                 cmd |= HAMMER2_MSGF_CREATE;
1376
1377         /*
1378          * Set REPLY if the other end initiated the command.  Otherwise
1379          * we are the command direction.
1380          */
1381         if (state->txcmd & HAMMER2_MSGF_REPLY)
1382                 cmd |= HAMMER2_MSGF_REPLY;
1383
1384         nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
1385         nmsg->any.head.error = error;
1386         nmsg->state = state;
1387         hammer2_msg_write(state->iocom, nmsg, NULL, NULL, NULL);
1388 }
1389
1390 /************************************************************************
1391  *                      TRANSACTION STATE HANDLING                      *
1392  ************************************************************************
1393  *
1394  */
1395
1396 /*
1397  * Process state tracking for a message after reception, prior to
1398  * execution.
1399  *
1400  * Called with msglk held and the msg dequeued.
1401  *
1402  * All messages are called with dummy state and return actual state.
1403  * (One-off messages often just return the same dummy state).
1404  *
1405  * May request that caller discard the message by setting *discardp to 1.
1406  * The returned state is not used in this case and is allowed to be NULL.
1407  *
1408  * --
1409  *
1410  * These routines handle persistent and command/reply message state via the
1411  * CREATE and DELETE flags.  The first message in a command or reply sequence
1412  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1413  *
1414  * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message,
1416  * which set neither flag.  This represents a streaming command or reply.
1417  *
1418  * Any command message received with CREATE set expects a reply sequence to
1419  * be returned.  Reply sequences work the same as command sequences except the
 * REPLY bit is also set.  Both the command side and reply side can
1421  * degenerate into a single message with both CREATE and DELETE set.  Note
1422  * that one side can be streaming and the other side not, or neither, or both.
1423  *
1424  * The msgid is unique for the initiator.  That is, two sides sending a new
1425  * message can use the same msgid without colliding.
1426  *
1427  * --
1428  *
1429  * ABORT sequences work by setting the ABORT flag along with normal message
1430  * state.  However, ABORTs can also be sent on half-closed messages, that is
1431  * even if the command or reply side has already sent a DELETE, as long as
1432  * the message has not been fully closed it can still send an ABORT+DELETE
1433  * to terminate the half-closed message state.
1434  *
1435  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1436  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1437  * also race, and in this situation the other side might have already
1438  * initiated a new unrelated command with the same message id.  Since
1439  * the abort has not set the CREATE flag the situation can be detected
1440  * and the message will also be discarded.
1441  *
1442  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1443  * The ABORT request is essentially integrated into the command instead
1444  * of being sent later on.  In this situation the command implementation
1445  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1446  * special-case non-blocking operation for the command.
1447  *
1448  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1449  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1450  *        one-way messages are not supported.
1451  *
1452  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1453  *        simply ignored.
1454  *
1455  * --
1456  *
 * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
1458  * set.  One-off messages cannot be aborted and typically aren't processed
1459  * by these routines.  The REPLY bit can be used to distinguish whether a
1460  * one-off message is a command or reply.  For example, one-off replies
1461  * will typically just contain status updates.
1462  */
1463 static int
1464 hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1465 {
1466         hammer2_state_t *state;
1467         hammer2_state_t dummy;
1468         int error;
1469
1470         /*
1471          * Lock RB tree and locate existing persistent state, if any.
1472          *
1473          * If received msg is a command state is on staterd_tree.
1474          * If received msg is a reply state is on statewr_tree.
1475          */
1476
1477         dummy.msgid = msg->any.head.msgid;
1478         dummy.spanid = msg->any.head.spanid;
1479         pthread_mutex_lock(&iocom->mtx);
1480         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1481                 state = RB_FIND(hammer2_state_tree,
1482                                 &iocom->statewr_tree, &dummy);
1483         } else {
1484                 state = RB_FIND(hammer2_state_tree,
1485                                 &iocom->staterd_tree, &dummy);
1486         }
1487         msg->state = state;
1488         pthread_mutex_unlock(&iocom->mtx);
1489
1490         /*
1491          * Short-cut one-off or mid-stream messages (state may be NULL).
1492          */
1493         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1494                                   HAMMER2_MSGF_ABORT)) == 0) {
1495                 return(0);
1496         }
1497
1498         /*
1499          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1500          * inside the case statements.
1501          */
1502         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1503                                     HAMMER2_MSGF_REPLY)) {
1504         case HAMMER2_MSGF_CREATE:
1505         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1506                 /*
1507                  * New persistant command received.
1508                  */
1509                 if (state) {
1510                         fprintf(stderr, "duplicate-trans %s\n",
1511                                 hammer2_msg_str(msg));
1512                         error = HAMMER2_IOQ_ERROR_TRANS;
1513                         assert(0);
1514                         break;
1515                 }
1516                 state = malloc(sizeof(*state));
1517                 bzero(state, sizeof(*state));
1518                 state->iocom = iocom;
1519                 state->flags = HAMMER2_STATE_DYNAMIC;
1520                 state->msg = msg;
1521                 state->txcmd = HAMMER2_MSGF_REPLY;
1522                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1523                 state->flags |= HAMMER2_STATE_INSERTED;
1524                 state->msgid = msg->any.head.msgid;
1525                 state->spanid = msg->any.head.spanid;
1526                 msg->state = state;
1527                 pthread_mutex_lock(&iocom->mtx);
1528                 RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
1529                 pthread_mutex_unlock(&iocom->mtx);
1530                 error = 0;
1531                 if (DebugOpt) {
1532                         fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1533                                 state, (uint32_t)state->msgid, iocom);
1534                 }
1535                 break;
1536         case HAMMER2_MSGF_DELETE:
1537                 /*
1538                  * Persistent state is expected but might not exist if an
1539                  * ABORT+DELETE races the close.
1540                  */
1541                 if (state == NULL) {
1542                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1543                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1544                         } else {
1545                                 fprintf(stderr, "missing-state %s\n",
1546                                         hammer2_msg_str(msg));
1547                                 error = HAMMER2_IOQ_ERROR_TRANS;
1548                         assert(0);
1549                         }
1550                         break;
1551                 }
1552
1553                 /*
1554                  * Handle another ABORT+DELETE case if the msgid has already
1555                  * been reused.
1556                  */
1557                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1558                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1559                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1560                         } else {
1561                                 fprintf(stderr, "reused-state %s\n",
1562                                         hammer2_msg_str(msg));
1563                                 error = HAMMER2_IOQ_ERROR_TRANS;
1564                         assert(0);
1565                         }
1566                         break;
1567                 }
1568                 error = 0;
1569                 break;
1570         default:
1571                 /*
1572                  * Check for mid-stream ABORT command received, otherwise
1573                  * allow.
1574                  */
1575                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1576                         if (state == NULL ||
1577                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1578                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1579                                 break;
1580                         }
1581                 }
1582                 error = 0;
1583                 break;
1584         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1585         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1586                 /*
1587                  * When receiving a reply with CREATE set the original
1588                  * persistent state message should already exist.
1589                  */
1590                 if (state == NULL) {
1591                         fprintf(stderr, "no-state(r) %s\n",
1592                                 hammer2_msg_str(msg));
1593                         error = HAMMER2_IOQ_ERROR_TRANS;
1594                         assert(0);
1595                         break;
1596                 }
1597                 assert(((state->rxcmd ^ msg->any.head.cmd) &
1598                         HAMMER2_MSGF_REPLY) == 0);
1599                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1600                 error = 0;
1601                 break;
1602         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1603                 /*
1604                  * Received REPLY+ABORT+DELETE in case where msgid has
1605                  * already been fully closed, ignore the message.
1606                  */
1607                 if (state == NULL) {
1608                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1609                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1610                         } else {
1611                                 fprintf(stderr, "no-state(r,d) %s\n",
1612                                         hammer2_msg_str(msg));
1613                                 error = HAMMER2_IOQ_ERROR_TRANS;
1614                         assert(0);
1615                         }
1616                         break;
1617                 }
1618
1619                 /*
1620                  * Received REPLY+ABORT+DELETE in case where msgid has
1621                  * already been reused for an unrelated message,
1622                  * ignore the message.
1623                  */
1624                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1625                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1626                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1627                         } else {
1628                                 fprintf(stderr, "reused-state(r,d) %s\n",
1629                                         hammer2_msg_str(msg));
1630                                 error = HAMMER2_IOQ_ERROR_TRANS;
1631                         assert(0);
1632                         }
1633                         break;
1634                 }
1635                 error = 0;
1636                 break;
1637         case HAMMER2_MSGF_REPLY:
1638                 /*
1639                  * Check for mid-stream ABORT reply received to sent command.
1640                  */
1641                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1642                         if (state == NULL ||
1643                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1644                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1645                                 break;
1646                         }
1647                 }
1648                 error = 0;
1649                 break;
1650         }
1651         return (error);
1652 }
1653
1654 void
1655 hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1656 {
1657         hammer2_state_t *state;
1658
1659         if ((state = msg->state) == NULL) {
1660                 /*
1661                  * Free a non-transactional message, there is no state
1662                  * to worry about.
1663                  */
1664                 hammer2_msg_free(iocom, msg);
1665         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1666                 /*
1667                  * Message terminating transaction, destroy the related
1668                  * state, the original message, and this message (if it
1669                  * isn't the original message due to a CREATE|DELETE).
1670                  */
1671                 pthread_mutex_lock(&iocom->mtx);
1672                 state->rxcmd |= HAMMER2_MSGF_DELETE;
1673                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
1674                         if (state->msg == msg)
1675                                 state->msg = NULL;
1676                         assert(state->flags & HAMMER2_STATE_INSERTED);
1677                         if (state->rxcmd & HAMMER2_MSGF_REPLY) {
1678                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1679                                 RB_REMOVE(hammer2_state_tree,
1680                                           &iocom->statewr_tree, state);
1681                         } else {
1682                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1683                                 RB_REMOVE(hammer2_state_tree,
1684                                           &iocom->staterd_tree, state);
1685                         }
1686                         state->flags &= ~HAMMER2_STATE_INSERTED;
1687                         hammer2_state_free(state);
1688                 } else {
1689                         ;
1690                 }
1691                 pthread_mutex_unlock(&iocom->mtx);
1692                 hammer2_msg_free(iocom, msg);
1693         } else if (state->msg != msg) {
1694                 /*
1695                  * Message not terminating transaction, leave state intact
1696                  * and free message if it isn't the CREATE message.
1697                  */
1698                 hammer2_msg_free(iocom, msg);
1699         }
1700 }
1701
1702 static void
1703 hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1704 {
1705         hammer2_state_t *state;
1706
1707         if ((state = msg->state) == NULL) {
1708                 hammer2_msg_free(iocom, msg);
1709         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1710                 pthread_mutex_lock(&iocom->mtx);
1711                 state->txcmd |= HAMMER2_MSGF_DELETE;
1712                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
1713                         if (state->msg == msg)
1714                                 state->msg = NULL;
1715                         assert(state->flags & HAMMER2_STATE_INSERTED);
1716                         if (state->txcmd & HAMMER2_MSGF_REPLY) {
1717                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1718                                 RB_REMOVE(hammer2_state_tree,
1719                                           &iocom->staterd_tree, state);
1720                         } else {
1721                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1722                                 RB_REMOVE(hammer2_state_tree,
1723                                           &iocom->statewr_tree, state);
1724                         }
1725                         state->flags &= ~HAMMER2_STATE_INSERTED;
1726                         hammer2_state_free(state);
1727                 } else {
1728                         ;
1729                 }
1730                 pthread_mutex_unlock(&iocom->mtx);
1731                 hammer2_msg_free(iocom, msg);
1732         } else if (state->msg != msg) {
1733                 hammer2_msg_free(iocom, msg);
1734         }
1735 }
1736
1737 /*
1738  * Called with iocom locked
1739  */
1740 void
1741 hammer2_state_free(hammer2_state_t *state)
1742 {
1743         hammer2_iocom_t *iocom = state->iocom;
1744         hammer2_msg_t *msg;
1745         char dummy;
1746
1747         if (DebugOpt) {
1748                 fprintf(stderr, "terminate state %p id=%08x\n",
1749                         state, (uint32_t)state->msgid);
1750         }
1751         assert(state->any.any == NULL);
1752         msg = state->msg;
1753         state->msg = NULL;
1754         if (msg)
1755                 hammer2_msg_free_locked(iocom, msg);
1756         free(state);
1757
1758         /*
1759          * When an iocom error is present we are trying to close down the
1760          * iocom, but we have to wait for all states to terminate before
1761          * we can do so.  The iocom rx code will terminate the receive side
1762          * for all transactions by simulating incoming DELETE messages,
1763          * but the state doesn't go away until both sides are terminated.
1764          *
1765          * We may have to wake up the rx code.
1766          */
1767         if (iocom->ioq_rx.error &&
1768             RB_EMPTY(&iocom->staterd_tree) &&
1769             RB_EMPTY(&iocom->statewr_tree)) {
1770                 dummy = 0;
1771                 write(iocom->wakeupfds[1], &dummy, 1);
1772         }
1773 }
1774
1775 const char *
1776 hammer2_basecmd_str(uint32_t cmd)
1777 {
1778         static char buf[64];
1779         char protobuf[32];
1780         char cmdbuf[32];
1781         const char *protostr;
1782         const char *cmdstr;
1783
1784         switch(cmd & HAMMER2_MSGF_PROTOS) {
1785         case HAMMER2_MSG_PROTO_LNK:
1786                 protostr = "LNK_";
1787                 break;
1788         case HAMMER2_MSG_PROTO_DBG:
1789                 protostr = "DBG_";
1790                 break;
1791         case HAMMER2_MSG_PROTO_DOM:
1792                 protostr = "DOM_";
1793                 break;
1794         case HAMMER2_MSG_PROTO_CAC:
1795                 protostr = "CAC_";
1796                 break;
1797         case HAMMER2_MSG_PROTO_QRM:
1798                 protostr = "QRM_";
1799                 break;
1800         case HAMMER2_MSG_PROTO_BLK:
1801                 protostr = "BLK_";
1802                 break;
1803         case HAMMER2_MSG_PROTO_VOP:
1804                 protostr = "VOP_";
1805                 break;
1806         default:
1807                 snprintf(protobuf, sizeof(protobuf), "%x_",
1808                         (cmd & HAMMER2_MSGF_PROTOS) >> 20);
1809                 protostr = protobuf;
1810                 break;
1811         }
1812
1813         switch(cmd & (HAMMER2_MSGF_PROTOS |
1814                       HAMMER2_MSGF_CMDS |
1815                       HAMMER2_MSGF_SIZE)) {
1816         case HAMMER2_LNK_PAD:
1817                 cmdstr = "PAD";
1818                 break;
1819         case HAMMER2_LNK_PING:
1820                 cmdstr = "PING";
1821                 break;
1822         case HAMMER2_LNK_AUTH:
1823                 cmdstr = "AUTH";
1824                 break;
1825         case HAMMER2_LNK_CONN:
1826                 cmdstr = "CONN";
1827                 break;
1828         case HAMMER2_LNK_SPAN:
1829                 cmdstr = "SPAN";
1830                 break;
1831         case HAMMER2_LNK_ERROR:
1832                 if (cmd & HAMMER2_MSGF_DELETE)
1833                         cmdstr = "RETURN";
1834                 else
1835                         cmdstr = "RESULT";
1836                 break;
1837         case HAMMER2_DBG_SHELL:
1838                 cmdstr = "SHELL";
1839                 break;
1840         default:
1841                 snprintf(cmdbuf, sizeof(cmdbuf),
1842                          "%06x", (cmd & (HAMMER2_MSGF_PROTOS |
1843                                          HAMMER2_MSGF_CMDS |
1844                                          HAMMER2_MSGF_SIZE)));
1845                 cmdstr = cmdbuf;
1846                 break;
1847         }
1848         snprintf(buf, sizeof(buf), "%s%s", protostr, cmdstr);
1849         return (buf);
1850 }
1851
1852 const char *
1853 hammer2_msg_str(hammer2_msg_t *msg)
1854 {
1855         hammer2_state_t *state;
1856         static char buf[256];
1857         char errbuf[16];
1858         char statebuf[64];
1859         char flagbuf[64];
1860         const char *statestr;
1861         const char *errstr;
1862         uint32_t basecmd;
1863         int i;
1864
1865         /*
1866          * Parse the state
1867          */
1868         if ((state = msg->state) != NULL) {
1869                 basecmd = (state->rxcmd & HAMMER2_MSGF_REPLY) ?
1870                           state->txcmd : state->rxcmd;
1871                 snprintf(statebuf, sizeof(statebuf),
1872                          " %s=%s,L=%s%s,R=%s%s",
1873                          ((state->txcmd & HAMMER2_MSGF_REPLY) ?
1874                                 "rcvcmd" : "sndcmd"),
1875                          hammer2_basecmd_str(basecmd),
1876                          ((state->txcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
1877                          ((state->txcmd & HAMMER2_MSGF_DELETE) ? "D" : ""),
1878                          ((state->rxcmd & HAMMER2_MSGF_CREATE) ? "C" : ""),
1879                          ((state->rxcmd & HAMMER2_MSGF_DELETE) ? "D" : "")
1880                 );
1881                 statestr = statebuf;
1882         } else {
1883                 statestr = "";
1884         }
1885
1886         /*
1887          * Parse the error
1888          */
1889         switch(msg->any.head.error) {
1890         case 0:
1891                 errstr = "";
1892                 break;
1893         case HAMMER2_IOQ_ERROR_SYNC:
1894                 errstr = "err=IOQ:NOSYNC";
1895                 break;
1896         case HAMMER2_IOQ_ERROR_EOF:
1897                 errstr = "err=IOQ:STREAMEOF";
1898                 break;
1899         case HAMMER2_IOQ_ERROR_SOCK:
1900                 errstr = "err=IOQ:SOCKERR";
1901                 break;
1902         case HAMMER2_IOQ_ERROR_FIELD:
1903                 errstr = "err=IOQ:BADFIELD";
1904                 break;
1905         case HAMMER2_IOQ_ERROR_HCRC:
1906                 errstr = "err=IOQ:BADHCRC";
1907                 break;
1908         case HAMMER2_IOQ_ERROR_XCRC:
1909                 errstr = "err=IOQ:BADXCRC";
1910                 break;
1911         case HAMMER2_IOQ_ERROR_ACRC:
1912                 errstr = "err=IOQ:BADACRC";
1913                 break;
1914         case HAMMER2_IOQ_ERROR_STATE:
1915                 errstr = "err=IOQ:BADSTATE";
1916                 break;
1917         case HAMMER2_IOQ_ERROR_NOPEER:
1918                 errstr = "err=IOQ:PEERCONFIG";
1919                 break;
1920         case HAMMER2_IOQ_ERROR_NORKEY:
1921                 errstr = "err=IOQ:BADRKEY";
1922                 break;
1923         case HAMMER2_IOQ_ERROR_NOLKEY:
1924                 errstr = "err=IOQ:BADLKEY";
1925                 break;
1926         case HAMMER2_IOQ_ERROR_KEYXCHGFAIL:
1927                 errstr = "err=IOQ:BADKEYXCHG";
1928                 break;
1929         case HAMMER2_IOQ_ERROR_KEYFMT:
1930                 errstr = "err=IOQ:BADFMT";
1931                 break;
1932         case HAMMER2_IOQ_ERROR_BADURANDOM:
1933                 errstr = "err=IOQ:BADRANDOM";
1934                 break;
1935         case HAMMER2_IOQ_ERROR_MSGSEQ:
1936                 errstr = "err=IOQ:BADSEQ";
1937                 break;
1938         case HAMMER2_IOQ_ERROR_EALREADY:
1939                 errstr = "err=IOQ:DUPMSG";
1940                 break;
1941         case HAMMER2_IOQ_ERROR_TRANS:
1942                 errstr = "err=IOQ:BADTRANS";
1943                 break;
1944         case HAMMER2_MSG_ERR_NOSUPP:
1945                 errstr = "err=NOSUPPORT";
1946                 break;
1947         default:
1948                 snprintf(errbuf, sizeof(errbuf),
1949                          " err=%d", msg->any.head.error);
1950                 errstr = errbuf;
1951                 break;
1952         }
1953
1954         /*
1955          * Message flags
1956          */
1957         i = 0;
1958         if (msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1959                                  HAMMER2_MSGF_ABORT | HAMMER2_MSGF_REPLY)) {
1960                 flagbuf[i++] = '|';
1961                 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1962                         flagbuf[i++] = 'C';
1963                 if (msg->any.head.cmd & HAMMER2_MSGF_DELETE)
1964                         flagbuf[i++] = 'D';
1965                 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
1966                         flagbuf[i++] = 'R';
1967                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT)
1968                         flagbuf[i++] = 'A';
1969         }
1970         flagbuf[i] = 0;
1971
1972         /*
1973          * Generate the buf
1974          */
1975         snprintf(buf, sizeof(buf),
1976                 "msg=%s%s %s id=%08x span=%08x %s",
1977                  hammer2_basecmd_str(msg->any.head.cmd),
1978                  flagbuf,
1979                  errstr,
1980                  (uint32_t)(intmax_t)msg->any.head.msgid,   /* for brevity */
1981                  (uint32_t)(intmax_t)msg->any.head.spanid,  /* for brevity */
1982                  statestr);
1983
1984         return(buf);
1985 }