hammer2 - Messaging layer separation work part 1
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 static int hammer2_state_msgrx(hammer2_msg_t *msg);
39 static void hammer2_state_cleanuptx(hammer2_msg_t *msg);
40
41 /*
42  * ROUTER TREE - Represents available routes for message routing, indexed
43  *               by their spanid.  The router structure is embedded in
44  *               either an iocom, h2span_link, or h2span_relay (see msg_lnk.c).
45  */
46 int
47 hammer2_router_cmp(hammer2_router_t *router1, hammer2_router_t *router2)
48 {
49         if (router1->target < router2->target)
50                 return(-1);
51         if (router1->target > router2->target)
52                 return(1);
53         return(0);
54 }
55
56 RB_GENERATE(hammer2_router_tree, hammer2_router, rbnode, hammer2_router_cmp);
57
58 static pthread_mutex_t router_mtx;
59 static struct hammer2_router_tree router_ltree = RB_INITIALIZER(router_ltree);
60 static struct hammer2_router_tree router_rtree = RB_INITIALIZER(router_rtree);
61
62 /*
63  * STATE TREE - Represents open transactions which are indexed by their
64  *              {router,msgid} relative to the governing iocom.
65  *
66  *              router is usually iocom->router since state isn't stored
67  *              for relayed messages.
68  */
69 int
70 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
71 {
72 #if 0
73         if (state1->router < state2->router)
74                 return(-1);
75         if (state1->router > state2->router)
76                 return(1);
77 #endif
78         if (state1->msgid < state2->msgid)
79                 return(-1);
80         if (state1->msgid > state2->msgid)
81                 return(1);
82         return(0);
83 }
84
85 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
86
87 /*
88  * Initialize a low-level ioq
89  */
90 void
91 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
92 {
93         bzero(ioq, sizeof(*ioq));
94         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
95         TAILQ_INIT(&ioq->msgq);
96 }
97
98 /*
99  * Cleanup queue.
100  *
101  * caller holds iocom->mtx.
102  */
103 void
104 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
105 {
106         hammer2_msg_t *msg;
107
108         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
109                 assert(0);      /* shouldn't happen */
110                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
111                 hammer2_msg_free(msg);
112         }
113         if ((msg = ioq->msg) != NULL) {
114                 ioq->msg = NULL;
115                 hammer2_msg_free(msg);
116         }
117 }
118
119 /*
120  * Initialize a low-level communications channel.
121  *
122  * NOTE: The signal_func() is called at least once from the loop and can be
123  *       re-armed via hammer2_iocom_restate().
124  */
125 void
126 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
127                    void (*signal_func)(hammer2_router_t *),
128                    void (*rcvmsg_func)(hammer2_msg_t *),
129                    void (*altmsg_func)(hammer2_iocom_t *))
130 {
131         struct stat st;
132
133         bzero(iocom, sizeof(*iocom));
134
135         iocom->router = hammer2_router_alloc();
136         iocom->router->signal_callback = signal_func;
137         iocom->router->rcvmsg_callback = rcvmsg_func;
138         iocom->router->altmsg_callback = altmsg_func;
139         /* we do not call hammer2_router_connect() for iocom routers */
140
141         pthread_mutex_init(&iocom->mtx, NULL);
142         RB_INIT(&iocom->router->staterd_tree);
143         RB_INIT(&iocom->router->statewr_tree);
144         TAILQ_INIT(&iocom->freeq);
145         TAILQ_INIT(&iocom->freeq_aux);
146         TAILQ_INIT(&iocom->router->txmsgq);
147         iocom->router->iocom = iocom;
148         iocom->sock_fd = sock_fd;
149         iocom->alt_fd = alt_fd;
150         iocom->flags = HAMMER2_IOCOMF_RREQ;
151         if (signal_func)
152                 iocom->flags |= HAMMER2_IOCOMF_SWORK;
153         hammer2_ioq_init(iocom, &iocom->ioq_rx);
154         hammer2_ioq_init(iocom, &iocom->ioq_tx);
155         if (pipe(iocom->wakeupfds) < 0)
156                 assert(0);
157         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
158         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
159
160         /*
161          * Negotiate session crypto synchronously.  This will mark the
162          * connection as error'd if it fails.  If this is a pipe it's
163          * a linkage that we set up ourselves to the filesystem and there
164          * is no crypto.
165          */
166         if (fstat(sock_fd, &st) < 0)
167                 assert(0);
168         if (S_ISSOCK(st.st_mode))
169                 hammer2_crypto_negotiate(iocom);
170
171         /*
172          * Make sure our fds are set to non-blocking for the iocom core.
173          */
174         if (sock_fd >= 0)
175                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
176 #if 0
177         /* if line buffered our single fgets() should be fine */
178         if (alt_fd >= 0)
179                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
180 #endif
181 }
182
183 /*
184  * May only be called from a callback from iocom_core.
185  *
186  * Adjust state machine functions, set flags to guarantee that both
187  * the recevmsg_func and the sendmsg_func is called at least once.
188  */
189 void
190 hammer2_router_restate(hammer2_router_t *router,
191                    void (*signal_func)(hammer2_router_t *),
192                    void (*rcvmsg_func)(hammer2_msg_t *msg),
193                    void (*altmsg_func)(hammer2_iocom_t *))
194 {
195         router->signal_callback = signal_func;
196         router->rcvmsg_callback = rcvmsg_func;
197         router->altmsg_callback = altmsg_func;
198         if (signal_func)
199                 router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
200         else
201                 router->iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
202 }
203
204 void
205 hammer2_router_signal(hammer2_router_t *router)
206 {
207         if (router->signal_callback)
208                 router->iocom->flags |= HAMMER2_IOCOMF_SWORK;
209 }
210
211 /*
212  * Cleanup a terminating iocom.
213  *
214  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
215  * from all possible references to it.
216  */
217 void
218 hammer2_iocom_done(hammer2_iocom_t *iocom)
219 {
220         hammer2_msg_t *msg;
221
222         if (iocom->sock_fd >= 0) {
223                 close(iocom->sock_fd);
224                 iocom->sock_fd = -1;
225         }
226         if (iocom->alt_fd >= 0) {
227                 close(iocom->alt_fd);
228                 iocom->alt_fd = -1;
229         }
230         hammer2_ioq_done(iocom, &iocom->ioq_rx);
231         hammer2_ioq_done(iocom, &iocom->ioq_tx);
232         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
233                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
234                 free(msg);
235         }
236         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
237                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
238                 free(msg->aux_data);
239                 msg->aux_data = NULL;
240                 free(msg);
241         }
242         if (iocom->wakeupfds[0] >= 0) {
243                 close(iocom->wakeupfds[0]);
244                 iocom->wakeupfds[0] = -1;
245         }
246         if (iocom->wakeupfds[1] >= 0) {
247                 close(iocom->wakeupfds[1]);
248                 iocom->wakeupfds[1] = -1;
249         }
250         pthread_mutex_destroy(&iocom->mtx);
251 }
252
253 /*
254  * Allocate a new one-way message.
255  */
256 hammer2_msg_t *
257 hammer2_msg_alloc(hammer2_router_t *router, size_t aux_size, uint32_t cmd,
258                   void (*func)(hammer2_msg_t *), void *data)
259 {
260         hammer2_state_t *state = NULL;
261         hammer2_iocom_t *iocom = router->iocom;
262         hammer2_msg_t *msg;
263         int hbytes;
264
265         pthread_mutex_lock(&iocom->mtx);
266         if (aux_size) {
267                 aux_size = (aux_size + DMSG_ALIGNMASK) &
268                            ~DMSG_ALIGNMASK;
269                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
270                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
271         } else {
272                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
273                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
274         }
275         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
276                 /*
277                  * Create state when CREATE is set without REPLY.
278                  *
279                  * NOTE: CREATE in txcmd handled by hammer2_msg_write()
280                  * NOTE: DELETE in txcmd handled by hammer2_state_cleanuptx()
281                  */
282                 state = malloc(sizeof(*state));
283                 bzero(state, sizeof(*state));
284                 state->iocom = iocom;
285                 state->flags = HAMMER2_STATE_DYNAMIC;
286                 state->msgid = (uint64_t)(uintptr_t)state;
287                 state->router = router;
288                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
289                 state->rxcmd = DMSGF_REPLY;
290                 state->func = func;
291                 state->any.any = data;
292                 pthread_mutex_lock(&iocom->mtx);
293                 RB_INSERT(hammer2_state_tree,
294                           &iocom->router->statewr_tree,
295                           state);
296                 pthread_mutex_unlock(&iocom->mtx);
297                 state->flags |= HAMMER2_STATE_INSERTED;
298         }
299         pthread_mutex_unlock(&iocom->mtx);
300         if (msg == NULL) {
301                 msg = malloc(sizeof(*msg));
302                 bzero(msg, sizeof(*msg));
303                 msg->aux_data = NULL;
304                 msg->aux_size = 0;
305         }
306         if (msg->aux_size != aux_size) {
307                 if (msg->aux_data) {
308                         free(msg->aux_data);
309                         msg->aux_data = NULL;
310                         msg->aux_size = 0;
311                 }
312                 if (aux_size) {
313                         msg->aux_data = malloc(aux_size);
314                         msg->aux_size = aux_size;
315                 }
316         }
317         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
318         if (hbytes)
319                 bzero(&msg->any.head, hbytes);
320         msg->hdr_size = hbytes;
321         msg->any.head.cmd = cmd;
322         msg->any.head.aux_descr = 0;
323         msg->any.head.aux_crc = 0;
324         msg->router = router;
325         if (state) {
326                 msg->state = state;
327                 state->msg = msg;
328                 msg->any.head.msgid = state->msgid;
329         }
330         return (msg);
331 }
332
333 /*
334  * Free a message so it can be reused afresh.
335  *
336  * NOTE: aux_size can be 0 with a non-NULL aux_data.
337  */
338 static
339 void
340 hammer2_msg_free_locked(hammer2_msg_t *msg)
341 {
342         hammer2_iocom_t *iocom = msg->router->iocom;
343
344         msg->state = NULL;
345         if (msg->aux_data)
346                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
347         else
348                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
349 }
350
351 void
352 hammer2_msg_free(hammer2_msg_t *msg)
353 {
354         hammer2_iocom_t *iocom = msg->router->iocom;
355
356         pthread_mutex_lock(&iocom->mtx);
357         hammer2_msg_free_locked(msg);
358         pthread_mutex_unlock(&iocom->mtx);
359 }
360
361 /*
362  * I/O core loop for an iocom.
363  *
364  * Thread localized, iocom->mtx not held.
365  */
366 void
367 hammer2_iocom_core(hammer2_iocom_t *iocom)
368 {
369         struct pollfd fds[3];
370         char dummybuf[256];
371         hammer2_msg_t *msg;
372         int timeout;
373         int count;
374         int wi; /* wakeup pipe */
375         int si; /* socket */
376         int ai; /* alt bulk path socket */
377
378         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
379                 if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
380                                      HAMMER2_IOCOMF_WWORK |
381                                      HAMMER2_IOCOMF_PWORK |
382                                      HAMMER2_IOCOMF_SWORK |
383                                      HAMMER2_IOCOMF_ARWORK |
384                                      HAMMER2_IOCOMF_AWWORK)) == 0) {
385                         /*
386                          * Only poll if no immediate work is pending.
387                          * Otherwise we are just wasting our time calling
388                          * poll.
389                          */
390                         timeout = 5000;
391
392                         count = 0;
393                         wi = -1;
394                         si = -1;
395                         ai = -1;
396
397                         /*
398                          * Always check the inter-thread pipe, e.g.
399                          * for iocom->txmsgq work.
400                          */
401                         wi = count++;
402                         fds[wi].fd = iocom->wakeupfds[0];
403                         fds[wi].events = POLLIN;
404                         fds[wi].revents = 0;
405
406                         /*
407                          * Check the socket input/output direction as
408                          * requested
409                          */
410                         if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
411                                             HAMMER2_IOCOMF_WREQ)) {
412                                 si = count++;
413                                 fds[si].fd = iocom->sock_fd;
414                                 fds[si].events = 0;
415                                 fds[si].revents = 0;
416
417                                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
418                                         fds[si].events |= POLLIN;
419                                 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
420                                         fds[si].events |= POLLOUT;
421                         }
422
423                         /*
424                          * Check the alternative fd for work.
425                          */
426                         if (iocom->alt_fd >= 0) {
427                                 ai = count++;
428                                 fds[ai].fd = iocom->alt_fd;
429                                 fds[ai].events = POLLIN;
430                                 fds[ai].revents = 0;
431                         }
432                         poll(fds, count, timeout);
433
434                         if (wi >= 0 && (fds[wi].revents & POLLIN))
435                                 iocom->flags |= HAMMER2_IOCOMF_PWORK;
436                         if (si >= 0 && (fds[si].revents & POLLIN))
437                                 iocom->flags |= HAMMER2_IOCOMF_RWORK;
438                         if (si >= 0 && (fds[si].revents & POLLOUT))
439                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
440                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
441                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
442                         if (ai >= 0 && (fds[ai].revents & POLLIN))
443                                 iocom->flags |= HAMMER2_IOCOMF_ARWORK;
444                 } else {
445                         /*
446                          * Always check the pipe
447                          */
448                         iocom->flags |= HAMMER2_IOCOMF_PWORK;
449                 }
450
451                 if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
452                         iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
453                         iocom->router->signal_callback(iocom->router);
454                 }
455
456                 /*
457                  * Pending message queues from other threads wake us up
458                  * with a write to the wakeupfds[] pipe.  We have to clear
459                  * the pipe with a dummy read.
460                  */
461                 if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
462                         iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
463                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
464                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
465                         iocom->flags |= HAMMER2_IOCOMF_WWORK;
466                         if (TAILQ_FIRST(&iocom->router->txmsgq))
467                                 hammer2_iocom_flush1(iocom);
468                 }
469
470                 /*
471                  * Message write sequencing
472                  */
473                 if (iocom->flags & HAMMER2_IOCOMF_WWORK)
474                         hammer2_iocom_flush1(iocom);
475
476                 /*
477                  * Message read sequencing.  Run this after the write
478                  * sequencing in case the write sequencing allowed another
479                  * auto-DELETE to occur on the read side.
480                  */
481                 if (iocom->flags & HAMMER2_IOCOMF_RWORK) {
482                         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
483                                (msg = hammer2_ioq_read(iocom)) != NULL) {
484                                 if (DebugOpt) {
485                                         fprintf(stderr, "receive %s\n",
486                                                 hammer2_msg_str(msg));
487                                 }
488                                 iocom->router->rcvmsg_callback(msg);
489                                 hammer2_state_cleanuprx(iocom, msg);
490                         }
491                 }
492
493                 if (iocom->flags & HAMMER2_IOCOMF_ARWORK) {
494                         iocom->flags &= ~HAMMER2_IOCOMF_ARWORK;
495                         iocom->router->altmsg_callback(iocom);
496                 }
497         }
498 }
499
500 /*
501  * Make sure there's enough room in the FIFO to hold the
502  * needed data.
503  *
504  * Assume worst case encrypted form is 2x the size of the
505  * plaintext equivalent.
506  */
507 static
508 size_t
509 hammer2_ioq_makeroom(hammer2_ioq_t *ioq, size_t needed)
510 {
511         size_t bytes;
512         size_t nmax;
513
514         bytes = ioq->fifo_cdx - ioq->fifo_beg;
515         nmax = sizeof(ioq->buf) - ioq->fifo_end;
516         if (bytes + nmax / 2 < needed) {
517                 if (bytes) {
518                         bcopy(ioq->buf + ioq->fifo_beg,
519                               ioq->buf,
520                               bytes);
521                 }
522                 ioq->fifo_cdx -= ioq->fifo_beg;
523                 ioq->fifo_beg = 0;
524                 if (ioq->fifo_cdn < ioq->fifo_end) {
525                         bcopy(ioq->buf + ioq->fifo_cdn,
526                               ioq->buf + ioq->fifo_cdx,
527                               ioq->fifo_end - ioq->fifo_cdn);
528                 }
529                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
530                 ioq->fifo_cdn = ioq->fifo_cdx;
531                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
532         }
533         return(nmax);
534 }
535
536 /*
537  * Read the next ready message from the ioq, issuing I/O if needed.
538  * Caller should retry on a read-event when NULL is returned.
539  *
540  * If an error occurs during reception a DMSG_LNK_ERROR msg will
541  * be returned for each open transaction, then the ioq and iocom
542  * will be errored out and a non-transactional DMSG_LNK_ERROR
543  * msg will be returned as the final message.  The caller should not call
544  * us again after the final message is returned.
545  *
546  * Thread localized, iocom->mtx not held.
547  */
548 hammer2_msg_t *
549 hammer2_ioq_read(hammer2_iocom_t *iocom)
550 {
551         hammer2_ioq_t *ioq = &iocom->ioq_rx;
552         hammer2_msg_t *msg;
553         hammer2_state_t *state;
554         dmsg_hdr_t *head;
555         ssize_t n;
556         size_t bytes;
557         size_t nmax;
558         uint32_t xcrc32;
559         int error;
560
561 again:
562         iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
563
564         /*
565          * If a message is already pending we can just remove and
566          * return it.  Message state has already been processed.
567          * (currently not implemented)
568          */
569         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
570                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
571                 return (msg);
572         }
573
574         /*
575          * If the stream is errored out we stop processing it.
576          */
577         if (ioq->error)
578                 goto skip;
579
580         /*
581          * Message read in-progress (msg is NULL at the moment).  We don't
582          * allocate a msg until we have its core header.
583          */
584         nmax = sizeof(ioq->buf) - ioq->fifo_end;
585         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
586         msg = ioq->msg;
587
588         switch(ioq->state) {
589         case HAMMER2_MSGQ_STATE_HEADER1:
590                 /*
591                  * Load the primary header, fail on any non-trivial read
592                  * error or on EOF.  Since the primary header is the same
593                  * size is the message alignment it will never straddle
594                  * the end of the buffer.
595                  */
596                 nmax = hammer2_ioq_makeroom(ioq, sizeof(msg->any.head));
597                 if (bytes < sizeof(msg->any.head)) {
598                         n = read(iocom->sock_fd,
599                                  ioq->buf + ioq->fifo_end,
600                                  nmax);
601                         if (n <= 0) {
602                                 if (n == 0) {
603                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
604                                         break;
605                                 }
606                                 if (errno != EINTR &&
607                                     errno != EINPROGRESS &&
608                                     errno != EAGAIN) {
609                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
610                                         break;
611                                 }
612                                 n = 0;
613                                 /* fall through */
614                         }
615                         ioq->fifo_end += (size_t)n;
616                         nmax -= (size_t)n;
617                 }
618
619                 /*
620                  * Decrypt data received so far.  Data will be decrypted
621                  * in-place but might create gaps in the FIFO.  Partial
622                  * blocks are not immediately decrypted.
623                  *
624                  * WARNING!  The header might be in the wrong endian, we
625                  *           do not fix it up until we get the entire
626                  *           extended header.
627                  */
628                 if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
629                         hammer2_crypto_decrypt(iocom, ioq);
630                 } else {
631                         ioq->fifo_cdx = ioq->fifo_end;
632                         ioq->fifo_cdn = ioq->fifo_end;
633                 }
634                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
635
636                 /*
637                  * Insufficient data accumulated (msg is NULL, caller will
638                  * retry on event).
639                  */
640                 assert(msg == NULL);
641                 if (bytes < sizeof(msg->any.head))
642                         break;
643
644                 /*
645                  * Check and fixup the core header.  Note that the icrc
646                  * has to be calculated before any fixups, but the crc
647                  * fields in the msg may have to be swapped like everything
648                  * else.
649                  */
650                 head = (void *)(ioq->buf + ioq->fifo_beg);
651                 if (head->magic != DMSG_HDR_MAGIC &&
652                     head->magic != DMSG_HDR_MAGIC_REV) {
653                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
654                         break;
655                 }
656
657                 /*
658                  * Calculate the full header size and aux data size
659                  */
660                 if (head->magic == DMSG_HDR_MAGIC_REV) {
661                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
662                                       DMSG_ALIGN;
663                         ioq->abytes = bswap32(head->aux_bytes) *
664                                       DMSG_ALIGN;
665                 } else {
666                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
667                                       DMSG_ALIGN;
668                         ioq->abytes = head->aux_bytes * DMSG_ALIGN;
669                 }
670                 if (ioq->hbytes < sizeof(msg->any.head) ||
671                     ioq->hbytes > sizeof(msg->any) ||
672                     ioq->abytes > DMSG_AUX_MAX) {
673                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
674                         break;
675                 }
676
677                 /*
678                  * Allocate the message, the next state will fill it in.
679                  */
680                 msg = hammer2_msg_alloc(iocom->router, ioq->abytes, 0,
681                                         NULL, NULL);
682                 ioq->msg = msg;
683
684                 /*
685                  * Fall through to the next state.  Make sure that the
686                  * extended header does not straddle the end of the buffer.
687                  * We still want to issue larger reads into our buffer,
688                  * book-keeping is easier if we don't bcopy() yet.
689                  *
690                  * Make sure there is enough room for bloated encrypt data.
691                  */
692                 nmax = hammer2_ioq_makeroom(ioq, ioq->hbytes);
693                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
694                 /* fall through */
695         case HAMMER2_MSGQ_STATE_HEADER2:
696                 /*
697                  * Fill out the extended header.
698                  */
699                 assert(msg != NULL);
700                 if (bytes < ioq->hbytes) {
701                         n = read(iocom->sock_fd,
702                                  ioq->buf + ioq->fifo_end,
703                                  nmax);
704                         if (n <= 0) {
705                                 if (n == 0) {
706                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
707                                         break;
708                                 }
709                                 if (errno != EINTR &&
710                                     errno != EINPROGRESS &&
711                                     errno != EAGAIN) {
712                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
713                                         break;
714                                 }
715                                 n = 0;
716                                 /* fall through */
717                         }
718                         ioq->fifo_end += (size_t)n;
719                         nmax -= (size_t)n;
720                 }
721
722                 if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
723                         hammer2_crypto_decrypt(iocom, ioq);
724                 } else {
725                         ioq->fifo_cdx = ioq->fifo_end;
726                         ioq->fifo_cdn = ioq->fifo_end;
727                 }
728                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
729
730                 /*
731                  * Insufficient data accumulated (set msg NULL so caller will
732                  * retry on event).
733                  */
734                 if (bytes < ioq->hbytes) {
735                         msg = NULL;
736                         break;
737                 }
738
739                 /*
740                  * Calculate the extended header, decrypt data received
741                  * so far.  Handle endian-conversion for the entire extended
742                  * header.
743                  */
744                 head = (void *)(ioq->buf + ioq->fifo_beg);
745
746                 /*
747                  * Check the CRC.
748                  */
749                 if (head->magic == DMSG_HDR_MAGIC_REV)
750                         xcrc32 = bswap32(head->hdr_crc);
751                 else
752                         xcrc32 = head->hdr_crc;
753                 head->hdr_crc = 0;
754                 if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
755                         ioq->error = HAMMER2_IOQ_ERROR_XCRC;
756                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
757                                 xcrc32, hammer2_icrc32(head, ioq->hbytes),
758                                 hammer2_msg_str(msg));
759                         assert(0);
760                         break;
761                 }
762                 head->hdr_crc = xcrc32;
763
764                 if (head->magic == DMSG_HDR_MAGIC_REV) {
765                         hammer2_bswap_head(head);
766                 }
767
768                 /*
769                  * Copy the extended header into the msg and adjust the
770                  * FIFO.
771                  */
772                 bcopy(head, &msg->any, ioq->hbytes);
773
774                 /*
775                  * We are either done or we fall-through.
776                  */
777                 if (ioq->abytes == 0) {
778                         ioq->fifo_beg += ioq->hbytes;
779                         break;
780                 }
781
782                 /*
783                  * Must adjust bytes (and the state) when falling through.
784                  * nmax doesn't change.
785                  */
786                 ioq->fifo_beg += ioq->hbytes;
787                 bytes -= ioq->hbytes;
788                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
789                 /* fall through */
790         case HAMMER2_MSGQ_STATE_AUXDATA1:
791                 /*
792                  * Copy the partial or complete payload from remaining
793                  * bytes in the FIFO in order to optimize the makeroom call
794                  * in the AUXDATA2 state.  We have to fall-through either
795                  * way so we can check the crc.
796                  *
797                  * msg->aux_size tracks our aux data.
798                  */
799                 if (bytes >= ioq->abytes) {
800                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
801                               ioq->abytes);
802                         msg->aux_size = ioq->abytes;
803                         ioq->fifo_beg += ioq->abytes;
804                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
805                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
806                         bytes -= ioq->abytes;
807                 } else if (bytes) {
808                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
809                               bytes);
810                         msg->aux_size = bytes;
811                         ioq->fifo_beg += bytes;
812                         if (ioq->fifo_cdx < ioq->fifo_beg)
813                                 ioq->fifo_cdx = ioq->fifo_beg;
814                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
815                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
816                         bytes = 0;
817                 } else {
818                         msg->aux_size = 0;
819                 }
820                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
821                 /* fall through */
822         case HAMMER2_MSGQ_STATE_AUXDATA2:
823                 /*
824                  * Make sure there is enough room for more data.
825                  */
826                 assert(msg);
827                 nmax = hammer2_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
828
829                 /*
830                  * Read and decrypt more of the payload.
831                  */
832                 if (msg->aux_size < ioq->abytes) {
833                         assert(bytes == 0);
834                         n = read(iocom->sock_fd,
835                                  ioq->buf + ioq->fifo_end,
836                                  nmax);
837                         if (n <= 0) {
838                                 if (n == 0) {
839                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
840                                         break;
841                                 }
842                                 if (errno != EINTR &&
843                                     errno != EINPROGRESS &&
844                                     errno != EAGAIN) {
845                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
846                                         break;
847                                 }
848                                 n = 0;
849                                 /* fall through */
850                         }
851                         ioq->fifo_end += (size_t)n;
852                         nmax -= (size_t)n;
853                 }
854
855                 if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
856                         hammer2_crypto_decrypt(iocom, ioq);
857                 } else {
858                         ioq->fifo_cdx = ioq->fifo_end;
859                         ioq->fifo_cdn = ioq->fifo_end;
860                 }
861                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
862
863                 if (bytes > ioq->abytes - msg->aux_size)
864                         bytes = ioq->abytes - msg->aux_size;
865
866                 if (bytes) {
867                         bcopy(ioq->buf + ioq->fifo_beg,
868                               msg->aux_data + msg->aux_size,
869                               bytes);
870                         msg->aux_size += bytes;
871                         ioq->fifo_beg += bytes;
872                 }
873
874                 /*
875                  * Insufficient data accumulated (set msg NULL so caller will
876                  * retry on event).
877                  */
878                 if (msg->aux_size < ioq->abytes) {
879                         msg = NULL;
880                         break;
881                 }
882                 assert(msg->aux_size == ioq->abytes);
883
884                 /*
885                  * Check aux_crc, then we are done.
886                  */
887                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
888                 if (xcrc32 != msg->any.head.aux_crc) {
889                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
890                         break;
891                 }
892                 break;
893         case HAMMER2_MSGQ_STATE_ERROR:
894                 /*
895                  * Continued calls to drain recorded transactions (returning
896                  * a LNK_ERROR for each one), before we return the final
897                  * LNK_ERROR.
898                  */
899                 assert(msg == NULL);
900                 break;
901         default:
902                 /*
903                  * We don't double-return errors, the caller should not
904                  * have called us again after getting an error msg.
905                  */
906                 assert(0);
907                 break;
908         }
909
910         /*
911          * Check the message sequence.  The iv[] should prevent any
912          * possibility of a replay but we add this check anyway.
913          */
914         if (msg && ioq->error == 0) {
915                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
916                         ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
917                 } else {
918                         ++ioq->seq;
919                 }
920         }
921
922         /*
923          * Process transactional state for the message.
924          */
925         if (msg && ioq->error == 0) {
926                 error = hammer2_state_msgrx(msg);
927                 if (error) {
928                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
929                                 hammer2_msg_free(msg);
930                                 goto again;
931                         }
932                         ioq->error = error;
933                 }
934         }
935
936         /*
937          * Handle error, RREQ, or completion
938          *
939          * NOTE: nmax and bytes are invalid at this point, we don't bother
940          *       to update them when breaking out.
941          */
942         if (ioq->error) {
943 skip:
944                 /*
945                  * An unrecoverable error causes all active receive
946                  * transactions to be terminated with a LNK_ERROR message.
947                  *
948                  * Once all active transactions are exhausted we set the
949                  * iocom ERROR flag and return a non-transactional LNK_ERROR
950                  * message, which should cause master processing loops to
951                  * terminate.
952                  */
953                 assert(ioq->msg == msg);
954                 if (msg) {
955                         hammer2_msg_free(msg);
956                         ioq->msg = NULL;
957                 }
958
959                 /*
960                  * No more I/O read processing
961                  */
962                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
963
964                 /*
965                  * Simulate a remote LNK_ERROR DELETE msg for any open
966                  * transactions, ending with a final non-transactional
967                  * LNK_ERROR (that the session can detect) when no
968                  * transactions remain.
969                  */
970                 msg = hammer2_msg_alloc(iocom->router, 0, 0, NULL, NULL);
971                 bzero(&msg->any.head, sizeof(msg->any.head));
972                 msg->any.head.magic = DMSG_HDR_MAGIC;
973                 msg->any.head.cmd = DMSG_LNK_ERROR;
974                 msg->any.head.error = ioq->error;
975
976                 pthread_mutex_lock(&iocom->mtx);
977                 hammer2_iocom_drain(iocom);
978                 if ((state = RB_ROOT(&iocom->router->staterd_tree)) != NULL) {
979                         /*
980                          * Active remote transactions are still present.
981                          * Simulate the other end sending us a DELETE.
982                          */
983                         if (state->rxcmd & DMSGF_DELETE) {
984                                 hammer2_msg_free(msg);
985                                 msg = NULL;
986                         } else {
987                                 /*state->txcmd |= DMSGF_DELETE;*/
988                                 msg->state = state;
989                                 msg->router = state->router;
990                                 msg->any.head.msgid = state->msgid;
991                                 msg->any.head.cmd |= DMSGF_ABORT |
992                                                      DMSGF_DELETE;
993                         }
994                 } else if ((state = RB_ROOT(&iocom->router->statewr_tree)) !=
995                            NULL) {
996                         /*
997                          * Active local transactions are still present.
998                          * Simulate the other end sending us a DELETE.
999                          */
1000                         if (state->rxcmd & DMSGF_DELETE) {
1001                                 hammer2_msg_free(msg);
1002                                 msg = NULL;
1003                         } else {
1004                                 msg->state = state;
1005                                 msg->router = state->router;
1006                                 msg->any.head.msgid = state->msgid;
1007                                 msg->any.head.cmd |= DMSGF_ABORT |
1008                                                      DMSGF_DELETE |
1009                                                      DMSGF_REPLY;
1010                                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1011                                         msg->any.head.cmd |=
1012                                                      DMSGF_CREATE;
1013                                 }
1014                         }
1015                 } else {
1016                         /*
1017                          * No active local or remote transactions remain.
1018                          * Generate a final LNK_ERROR and flag EOF.
1019                          */
1020                         msg->state = NULL;
1021                         iocom->flags |= HAMMER2_IOCOMF_EOF;
1022                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1023                 }
1024                 pthread_mutex_unlock(&iocom->mtx);
1025
1026                 /*
1027                  * For the iocom error case we want to set RWORK to indicate
1028                  * that more messages might be pending.
1029                  *
1030                  * It is possible to return NULL when there is more work to
1031                  * do because each message has to be DELETEd in both
1032                  * directions before we continue on with the next (though
1033                  * this could be optimized).  The transmit direction will
1034                  * re-set RWORK.
1035                  */
1036                 if (msg)
1037                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
1038         } else if (msg == NULL) {
1039                 /*
1040                  * Insufficient data received to finish building the message,
1041                  * set RREQ and return NULL.
1042                  *
1043                  * Leave ioq->msg intact.
1044                  * Leave the FIFO intact.
1045                  */
1046                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
1047         } else {
1048                 /*
1049                  * Return msg.
1050                  *
1051                  * The fifo has already been advanced past the message.
1052                  * Trivially reset the FIFO indices if possible.
1053                  *
1054                  * clear the FIFO if it is now empty and set RREQ to wait
1055                  * for more from the socket.  If the FIFO is not empty set
1056                  * TWORK to bypass the poll so we loop immediately.
1057                  */
1058                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1059                     ioq->fifo_cdn == ioq->fifo_end) {
1060                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
1061                         ioq->fifo_cdx = 0;
1062                         ioq->fifo_cdn = 0;
1063                         ioq->fifo_beg = 0;
1064                         ioq->fifo_end = 0;
1065                 } else {
1066                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
1067                 }
1068                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
1069                 ioq->msg = NULL;
1070         }
1071         return (msg);
1072 }
1073
1074 /*
1075  * Calculate the header and data crc's and write a low-level message to
1076  * the connection.  If aux_crc is non-zero the aux_data crc is already
1077  * assumed to have been set.
1078  *
1079  * A non-NULL msg is added to the queue but not necessarily flushed.
1080  * Calling this function with msg == NULL will get a flush going.
1081  *
1082  * Caller must hold iocom->mtx.
1083  */
1084 void
1085 hammer2_iocom_flush1(hammer2_iocom_t *iocom)
1086 {
1087         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1088         hammer2_msg_t *msg;
1089         uint32_t xcrc32;
1090         int hbytes;
1091         hammer2_msg_queue_t tmpq;
1092
1093         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
1094         TAILQ_INIT(&tmpq);
1095         pthread_mutex_lock(&iocom->mtx);
1096         while ((msg = TAILQ_FIRST(&iocom->router->txmsgq)) != NULL) {
1097                 TAILQ_REMOVE(&iocom->router->txmsgq, msg, qentry);
1098                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1099         }
1100         pthread_mutex_unlock(&iocom->mtx);
1101
1102         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1103                 /*
1104                  * Process terminal connection errors.
1105                  */
1106                 TAILQ_REMOVE(&tmpq, msg, qentry);
1107                 if (ioq->error) {
1108                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1109                         ++ioq->msgcount;
1110                         continue;
1111                 }
1112
1113                 /*
1114                  * Finish populating the msg fields.  The salt ensures that
1115                  * the iv[] array is ridiculously randomized and we also
1116                  * re-seed our PRNG every 32768 messages just to be sure.
1117                  */
1118                 msg->any.head.magic = DMSG_HDR_MAGIC;
1119                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1120                 ++ioq->seq;
1121                 if ((ioq->seq & 32767) == 0)
1122                         srandomdev();
1123
1124                 /*
1125                  * Calculate aux_crc if 0, then calculate hdr_crc.
1126                  */
1127                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1128                         assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
1129                         xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
1130                         msg->any.head.aux_crc = xcrc32;
1131                 }
1132                 msg->any.head.aux_bytes = msg->aux_size / DMSG_ALIGN;
1133                 assert((msg->aux_size & DMSG_ALIGNMASK) == 0);
1134
1135                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1136                          DMSG_ALIGN;
1137                 msg->any.head.hdr_crc = 0;
1138                 msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
1139
1140                 /*
1141                  * Enqueue the message (the flush codes handles stream
1142                  * encryption).
1143                  */
1144                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1145                 ++ioq->msgcount;
1146         }
1147         hammer2_iocom_flush2(iocom);
1148 }
1149
1150 /*
1151  * Thread localized, iocom->mtx not held by caller.
1152  */
1153 void
1154 hammer2_iocom_flush2(hammer2_iocom_t *iocom)
1155 {
1156         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1157         hammer2_msg_t *msg;
1158         ssize_t n;
1159         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
1160         size_t nact;
1161         size_t hbytes;
1162         size_t abytes;
1163         size_t hoff;
1164         size_t aoff;
1165         int iovcnt;
1166
1167         if (ioq->error) {
1168                 hammer2_iocom_drain(iocom);
1169                 return;
1170         }
1171
1172         /*
1173          * Pump messages out the connection by building an iovec.
1174          *
1175          * ioq->hbytes/ioq->abytes tracks how much of the first message
1176          * in the queue has been successfully written out, so we can
1177          * resume writing.
1178          */
1179         iovcnt = 0;
1180         nact = 0;
1181         hoff = ioq->hbytes;
1182         aoff = ioq->abytes;
1183
1184         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1185                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1186                          DMSG_ALIGN;
1187                 abytes = msg->aux_size;
1188                 assert(hoff <= hbytes && aoff <= abytes);
1189
1190                 if (hoff < hbytes) {
1191                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1192                         iov[iovcnt].iov_len = hbytes - hoff;
1193                         nact += hbytes - hoff;
1194                         ++iovcnt;
1195                         if (iovcnt == HAMMER2_IOQ_MAXIOVEC)
1196                                 break;
1197                 }
1198                 if (aoff < abytes) {
1199                         assert(msg->aux_data != NULL);
1200                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1201                         iov[iovcnt].iov_len = abytes - aoff;
1202                         nact += abytes - aoff;
1203                         ++iovcnt;
1204                         if (iovcnt == HAMMER2_IOQ_MAXIOVEC)
1205                                 break;
1206                 }
1207                 hoff = 0;
1208                 aoff = 0;
1209         }
1210         if (iovcnt == 0)
1211                 return;
1212
1213         /*
1214          * Encrypt and write the data.  The crypto code will move the
1215          * data into the fifo and adjust the iov as necessary.  If
1216          * encryption is disabled the iov is left alone.
1217          *
1218          * May return a smaller iov (thus a smaller n), with aggregated
1219          * chunks.  May reduce nmax to what fits in the FIFO.
1220          *
1221          * This function sets nact to the number of original bytes now
1222          * encrypted, adding to the FIFO some number of bytes that might
1223          * be greater depending on the crypto mechanic.  iov[] is adjusted
1224          * to point at the FIFO if necessary.
1225          *
1226          * NOTE: The return value from the writev() is the post-encrypted
1227          *       byte count, not the plaintext count.
1228          */
1229         if (iocom->flags & HAMMER2_IOCOMF_CRYPTED) {
1230                 /*
1231                  * Make sure the FIFO has a reasonable amount of space
1232                  * left (if not completely full).
1233                  */
1234                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1235                     sizeof(ioq->buf) - ioq->fifo_end >= DMSG_ALIGN * 2) {
1236                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1237                               ioq->fifo_end - ioq->fifo_beg);
1238                         ioq->fifo_cdx -= ioq->fifo_beg;
1239                         ioq->fifo_cdn -= ioq->fifo_beg;
1240                         ioq->fifo_end -= ioq->fifo_beg;
1241                         ioq->fifo_beg = 0;
1242                 }
1243
1244                 iovcnt = hammer2_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1245                 n = writev(iocom->sock_fd, iov, iovcnt);
1246                 if (n > 0) {
1247                         ioq->fifo_beg += n;
1248                         ioq->fifo_cdn += n;
1249                         ioq->fifo_cdx += n;
1250                         if (ioq->fifo_beg == ioq->fifo_end) {
1251                                 ioq->fifo_beg = 0;
1252                                 ioq->fifo_cdn = 0;
1253                                 ioq->fifo_cdx = 0;
1254                                 ioq->fifo_end = 0;
1255                         }
1256                 }
1257         } else {
1258                 n = writev(iocom->sock_fd, iov, iovcnt);
1259                 if (n > 0)
1260                         nact = n;
1261                 else
1262                         nact = 0;
1263         }
1264
1265         /*
1266          * Clean out the transmit queue based on what we successfully
1267          * sent (nact is the plaintext count).  ioq->hbytes/abytes
1268          * represents the portion of the first message previously sent.
1269          */
1270         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1271                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1272                          DMSG_ALIGN;
1273                 abytes = msg->aux_size;
1274
1275                 if ((size_t)nact < hbytes - ioq->hbytes) {
1276                         ioq->hbytes += nact;
1277                         nact = 0;
1278                         break;
1279                 }
1280                 nact -= hbytes - ioq->hbytes;
1281                 ioq->hbytes = hbytes;
1282                 if ((size_t)nact < abytes - ioq->abytes) {
1283                         ioq->abytes += nact;
1284                         nact = 0;
1285                         break;
1286                 }
1287                 nact -= abytes - ioq->abytes;
1288
1289                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1290                 --ioq->msgcount;
1291                 ioq->hbytes = 0;
1292                 ioq->abytes = 0;
1293
1294                 hammer2_state_cleanuptx(msg);
1295         }
1296         assert(nact == 0);
1297
1298         /*
1299          * Process the return value from the write w/regards to blocking.
1300          */
1301         if (n < 0) {
1302                 if (errno != EINTR &&
1303                     errno != EINPROGRESS &&
1304                     errno != EAGAIN) {
1305                         /*
1306                          * Fatal write error
1307                          */
1308                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
1309                         hammer2_iocom_drain(iocom);
1310                 } else {
1311                         /*
1312                          * Wait for socket buffer space
1313                          */
1314                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
1315                 }
1316         } else {
1317                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1318         }
1319         if (ioq->error) {
1320                 hammer2_iocom_drain(iocom);
1321         }
1322 }
1323
1324 /*
1325  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1326  * write events will occur.  We don't kill read msgs because we want
1327  * the caller to pull off our contrived terminal error msg to detect
1328  * the connection failure.
1329  *
1330  * Thread localized, iocom->mtx not held by caller.
1331  */
1332 void
1333 hammer2_iocom_drain(hammer2_iocom_t *iocom)
1334 {
1335         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1336         hammer2_msg_t *msg;
1337
1338         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
1339         ioq->hbytes = 0;
1340         ioq->abytes = 0;
1341
1342         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1343                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1344                 --ioq->msgcount;
1345                 hammer2_state_cleanuptx(msg);
1346         }
1347 }
1348
1349 /*
1350  * Write a message to an iocom, with additional state processing.
1351  */
1352 void
1353 hammer2_msg_write(hammer2_msg_t *msg)
1354 {
1355         hammer2_iocom_t *iocom = msg->router->iocom;
1356         hammer2_state_t *state;
1357         char dummy;
1358
1359         /*
1360          * Handle state processing, create state if necessary.
1361          */
1362         pthread_mutex_lock(&iocom->mtx);
1363         if ((state = msg->state) != NULL) {
1364                 /*
1365                  * Existing transaction (could be reply).  It is also
1366                  * possible for this to be the first reply (CREATE is set),
1367                  * in which case we populate state->txcmd.
1368                  *
1369                  * state->txcmd is adjusted to hold the final message cmd,
1370                  * and we also be sure to set the CREATE bit here.  We did
1371                  * not set it in hammer2_msg_alloc() because that would have
1372                  * not been serialized (state could have gotten ripped out
1373                  * from under the message prior to it being transmitted).
1374                  */
1375                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1376                     DMSGF_CREATE) {
1377                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1378                 }
1379                 msg->any.head.msgid = state->msgid;
1380                 assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
1381                 if (msg->any.head.cmd & DMSGF_CREATE)
1382                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1383         } else {
1384                 msg->any.head.msgid = 0;
1385                 /* XXX set spanid by router */
1386         }
1387         msg->any.head.source = 0;
1388         msg->any.head.target = msg->router->target;
1389
1390         /*
1391          * Queue it for output, wake up the I/O pthread.  Note that the
1392          * I/O thread is responsible for generating the CRCs and encryption.
1393          */
1394         TAILQ_INSERT_TAIL(&iocom->router->txmsgq, msg, qentry);
1395         dummy = 0;
1396         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1397         pthread_mutex_unlock(&iocom->mtx);
1398 }
1399
1400 /*
1401  * This is a shortcut to formulate a reply to msg with a simple error code,
1402  * It can reply to and terminate a transaction, or it can reply to a one-way
1403  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1404  * the error code (which can be 0).  Not all transactions are terminated
1405  * with DMSG_LNK_ERROR status (the low level only cares about the
1406  * MSGF_DELETE flag), but most are.
1407  *
1408  * Replies to one-way messages are a bit of an oxymoron but the feature
1409  * is used by the debug (DBG) protocol.
1410  *
1411  * The reply contains no extended data.
1412  */
1413 void
1414 hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
1415 {
1416         hammer2_iocom_t *iocom = msg->router->iocom;
1417         hammer2_state_t *state = msg->state;
1418         hammer2_msg_t *nmsg;
1419         uint32_t cmd;
1420
1421
1422         /*
1423          * Reply with a simple error code and terminate the transaction.
1424          */
1425         cmd = DMSG_LNK_ERROR;
1426
1427         /*
1428          * Check if our direction has even been initiated yet, set CREATE.
1429          *
1430          * Check what direction this is (command or reply direction).  Note
1431          * that txcmd might not have been initiated yet.
1432          *
1433          * If our direction has already been closed we just return without
1434          * doing anything.
1435          */
1436         if (state) {
1437                 if (state->txcmd & DMSGF_DELETE)
1438                         return;
1439                 if (state->txcmd & DMSGF_REPLY)
1440                         cmd |= DMSGF_REPLY;
1441                 cmd |= DMSGF_DELETE;
1442         } else {
1443                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1444                         cmd |= DMSGF_REPLY;
1445         }
1446
1447         /*
1448          * Allocate the message and associate it with the existing state.
1449          * We cannot pass MSGF_CREATE to msg_alloc() because that may
1450          * allocate new state.  We have our state already.
1451          */
1452         nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
1453         if (state) {
1454                 if ((state->txcmd & DMSGF_CREATE) == 0)
1455                         nmsg->any.head.cmd |= DMSGF_CREATE;
1456         }
1457         nmsg->any.head.error = error;
1458         nmsg->state = state;
1459         hammer2_msg_write(nmsg);
1460 }
1461
1462 /*
1463  * Similar to hammer2_msg_reply() but leave the transaction open.  That is,
1464  * we are generating a streaming reply or an intermediate acknowledgement
1465  * of some sort as part of the higher level protocol, with more to come
1466  * later.
1467  */
1468 void
1469 hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
1470 {
1471         hammer2_iocom_t *iocom = msg->router->iocom;
1472         hammer2_state_t *state = msg->state;
1473         hammer2_msg_t *nmsg;
1474         uint32_t cmd;
1475
1476
1477         /*
1478          * Reply with a simple error code and terminate the transaction.
1479          */
1480         cmd = DMSG_LNK_ERROR;
1481
1482         /*
1483          * Check if our direction has even been initiated yet, set CREATE.
1484          *
1485          * Check what direction this is (command or reply direction).  Note
1486          * that txcmd might not have been initiated yet.
1487          *
1488          * If our direction has already been closed we just return without
1489          * doing anything.
1490          */
1491         if (state) {
1492                 if (state->txcmd & DMSGF_DELETE)
1493                         return;
1494                 if (state->txcmd & DMSGF_REPLY)
1495                         cmd |= DMSGF_REPLY;
1496                 /* continuing transaction, do not set MSGF_DELETE */
1497         } else {
1498                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1499                         cmd |= DMSGF_REPLY;
1500         }
1501
1502         nmsg = hammer2_msg_alloc(iocom->router, 0, cmd, NULL, NULL);
1503         if (state) {
1504                 if ((state->txcmd & DMSGF_CREATE) == 0)
1505                         nmsg->any.head.cmd |= DMSGF_CREATE;
1506         }
1507         nmsg->any.head.error = error;
1508         nmsg->state = state;
1509         hammer2_msg_write(nmsg);
1510 }
1511
1512 /*
1513  * Terminate a transaction given a state structure by issuing a DELETE.
1514  */
1515 void
1516 hammer2_state_reply(hammer2_state_t *state, uint32_t error)
1517 {
1518         hammer2_msg_t *nmsg;
1519         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1520
1521         /*
1522          * Nothing to do if we already transmitted a delete
1523          */
1524         if (state->txcmd & DMSGF_DELETE)
1525                 return;
1526
1527         /*
1528          * Set REPLY if the other end initiated the command.  Otherwise
1529          * we are the command direction.
1530          */
1531         if (state->txcmd & DMSGF_REPLY)
1532                 cmd |= DMSGF_REPLY;
1533
1534         nmsg = hammer2_msg_alloc(state->iocom->router, 0, cmd, NULL, NULL);
1535         if (state) {
1536                 if ((state->txcmd & DMSGF_CREATE) == 0)
1537                         nmsg->any.head.cmd |= DMSGF_CREATE;
1538         }
1539         nmsg->any.head.error = error;
1540         nmsg->state = state;
1541         hammer2_msg_write(nmsg);
1542 }
1543
1544 /************************************************************************
1545  *                      TRANSACTION STATE HANDLING                      *
1546  ************************************************************************
1547  *
1548  */
1549
1550 /*
1551  * Process state tracking for a message after reception, prior to
1552  * execution.
1553  *
1554  * Called with msglk held and the msg dequeued.
1555  *
1556  * All messages are called with dummy state and return actual state.
1557  * (One-off messages often just return the same dummy state).
1558  *
1559  * May request that caller discard the message by setting *discardp to 1.
1560  * The returned state is not used in this case and is allowed to be NULL.
1561  *
1562  * --
1563  *
1564  * These routines handle persistent and command/reply message state via the
1565  * CREATE and DELETE flags.  The first message in a command or reply sequence
1566  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1567  *
1568  * There can be any number of intermediate messages belonging to the same
1569  * sequence sent in between the CREATE message and the DELETE message,
1570  * which set neither flag.  This represents a streaming command or reply.
1571  *
1572  * Any command message received with CREATE set expects a reply sequence to
1573  * be returned.  Reply sequences work the same as command sequences except the
1574  * REPLY bit is also sent.  Both the command side and reply side can
1575  * degenerate into a single message with both CREATE and DELETE set.  Note
1576  * that one side can be streaming and the other side not, or neither, or both.
1577  *
1578  * The msgid is unique for the initiator.  That is, two sides sending a new
1579  * message can use the same msgid without colliding.
1580  *
1581  * --
1582  *
1583  * ABORT sequences work by setting the ABORT flag along with normal message
1584  * state.  However, ABORTs can also be sent on half-closed messages, that is
1585  * even if the command or reply side has already sent a DELETE, as long as
1586  * the message has not been fully closed it can still send an ABORT+DELETE
1587  * to terminate the half-closed message state.
1588  *
1589  * Since ABORT+DELETEs can race we silently discard ABORTs for message
1590  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1591  * also race, and in this situation the other side might have already
1592  * initiated a new unrelated command with the same message id.  Since
1593  * the abort has not set the CREATE flag the situation can be detected
1594  * and the message will also be discarded.
1595  *
1596  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1597  * The ABORT request is essentially integrated into the command instead
1598  * of being sent later on.  In this situation the command implementation
1599  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1600  * special-case non-blocking operation for the command.
1601  *
1602  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1603  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1604  *        one-way messages are not supported.
1605  *
1606  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1607  *        simply ignored.
1608  *
1609  * --
1610  *
1611  * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
1612  * set.  One-off messages cannot be aborted and typically aren't processed
1613  * by these routines.  The REPLY bit can be used to distinguish whether a
1614  * one-off message is a command or reply.  For example, one-off replies
1615  * will typically just contain status updates.
1616  */
1617 static int
1618 hammer2_state_msgrx(hammer2_msg_t *msg)
1619 {
1620         hammer2_iocom_t *iocom = msg->router->iocom;
1621         hammer2_state_t *state;
1622         hammer2_state_t dummy;
1623         int error;
1624
1625         /*
1626          * Lock RB tree and locate existing persistent state, if any.
1627          *
1628          * If received msg is a command state is on staterd_tree.
1629          * If received msg is a reply state is on statewr_tree.
1630          */
1631         dummy.msgid = msg->any.head.msgid;
1632         pthread_mutex_lock(&iocom->mtx);
1633         if (msg->any.head.cmd & DMSGF_REPLY) {
1634                 state = RB_FIND(hammer2_state_tree,
1635                                 &iocom->router->statewr_tree, &dummy);
1636         } else {
1637                 state = RB_FIND(hammer2_state_tree,
1638                                 &iocom->router->staterd_tree, &dummy);
1639         }
1640         msg->state = state;
1641         pthread_mutex_unlock(&iocom->mtx);
1642
1643         /*
1644          * Short-cut one-off or mid-stream messages (state may be NULL).
1645          */
1646         if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1647                                   DMSGF_ABORT)) == 0) {
1648                 return(0);
1649         }
1650
1651         /*
1652          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1653          * inside the case statements.
1654          */
1655         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1656                                     DMSGF_REPLY)) {
1657         case DMSGF_CREATE:
1658         case DMSGF_CREATE | DMSGF_DELETE:
1659                 /*
1660                  * New persistant command received.
1661                  */
1662                 if (state) {
1663                         fprintf(stderr, "duplicate-trans %s\n",
1664                                 hammer2_msg_str(msg));
1665                         error = HAMMER2_IOQ_ERROR_TRANS;
1666                         assert(0);
1667                         break;
1668                 }
1669                 state = malloc(sizeof(*state));
1670                 bzero(state, sizeof(*state));
1671                 state->iocom = iocom;
1672                 state->flags = HAMMER2_STATE_DYNAMIC;
1673                 state->msg = msg;
1674                 state->txcmd = DMSGF_REPLY;
1675                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1676                 state->flags |= HAMMER2_STATE_INSERTED;
1677                 state->msgid = msg->any.head.msgid;
1678                 state->router = msg->router;
1679                 msg->state = state;
1680                 pthread_mutex_lock(&iocom->mtx);
1681                 RB_INSERT(hammer2_state_tree,
1682                           &iocom->router->staterd_tree, state);
1683                 pthread_mutex_unlock(&iocom->mtx);
1684                 error = 0;
1685                 if (DebugOpt) {
1686                         fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1687                                 state, (uint32_t)state->msgid, iocom);
1688                 }
1689                 break;
1690         case DMSGF_DELETE:
1691                 /*
1692                  * Persistent state is expected but might not exist if an
1693                  * ABORT+DELETE races the close.
1694                  */
1695                 if (state == NULL) {
1696                         if (msg->any.head.cmd & DMSGF_ABORT) {
1697                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1698                         } else {
1699                                 fprintf(stderr, "missing-state %s\n",
1700                                         hammer2_msg_str(msg));
1701                                 error = HAMMER2_IOQ_ERROR_TRANS;
1702                         assert(0);
1703                         }
1704                         break;
1705                 }
1706
1707                 /*
1708                  * Handle another ABORT+DELETE case if the msgid has already
1709                  * been reused.
1710                  */
1711                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1712                         if (msg->any.head.cmd & DMSGF_ABORT) {
1713                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1714                         } else {
1715                                 fprintf(stderr, "reused-state %s\n",
1716                                         hammer2_msg_str(msg));
1717                                 error = HAMMER2_IOQ_ERROR_TRANS;
1718                         assert(0);
1719                         }
1720                         break;
1721                 }
1722                 error = 0;
1723                 break;
1724         default:
1725                 /*
1726                  * Check for mid-stream ABORT command received, otherwise
1727                  * allow.
1728                  */
1729                 if (msg->any.head.cmd & DMSGF_ABORT) {
1730                         if (state == NULL ||
1731                             (state->rxcmd & DMSGF_CREATE) == 0) {
1732                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1733                                 break;
1734                         }
1735                 }
1736                 error = 0;
1737                 break;
1738         case DMSGF_REPLY | DMSGF_CREATE:
1739         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
1740                 /*
1741                  * When receiving a reply with CREATE set the original
1742                  * persistent state message should already exist.
1743                  */
1744                 if (state == NULL) {
1745                         fprintf(stderr, "no-state(r) %s\n",
1746                                 hammer2_msg_str(msg));
1747                         error = HAMMER2_IOQ_ERROR_TRANS;
1748                         assert(0);
1749                         break;
1750                 }
1751                 assert(((state->rxcmd ^ msg->any.head.cmd) &
1752                         DMSGF_REPLY) == 0);
1753                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1754                 error = 0;
1755                 break;
1756         case DMSGF_REPLY | DMSGF_DELETE:
1757                 /*
1758                  * Received REPLY+ABORT+DELETE in case where msgid has
1759                  * already been fully closed, ignore the message.
1760                  */
1761                 if (state == NULL) {
1762                         if (msg->any.head.cmd & DMSGF_ABORT) {
1763                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1764                         } else {
1765                                 fprintf(stderr, "no-state(r,d) %s\n",
1766                                         hammer2_msg_str(msg));
1767                                 error = HAMMER2_IOQ_ERROR_TRANS;
1768                         assert(0);
1769                         }
1770                         break;
1771                 }
1772
1773                 /*
1774                  * Received REPLY+ABORT+DELETE in case where msgid has
1775                  * already been reused for an unrelated message,
1776                  * ignore the message.
1777                  */
1778                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1779                         if (msg->any.head.cmd & DMSGF_ABORT) {
1780                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1781                         } else {
1782                                 fprintf(stderr, "reused-state(r,d) %s\n",
1783                                         hammer2_msg_str(msg));
1784                                 error = HAMMER2_IOQ_ERROR_TRANS;
1785                         assert(0);
1786                         }
1787                         break;
1788                 }
1789                 error = 0;
1790                 break;
1791         case DMSGF_REPLY:
1792                 /*
1793                  * Check for mid-stream ABORT reply received to sent command.
1794                  */
1795                 if (msg->any.head.cmd & DMSGF_ABORT) {
1796                         if (state == NULL ||
1797                             (state->rxcmd & DMSGF_CREATE) == 0) {
1798                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1799                                 break;
1800                         }
1801                 }
1802                 error = 0;
1803                 break;
1804         }
1805         return (error);
1806 }
1807
1808 void
1809 hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1810 {
1811         hammer2_state_t *state;
1812
1813         if ((state = msg->state) == NULL) {
1814                 /*
1815                  * Free a non-transactional message, there is no state
1816                  * to worry about.
1817                  */
1818                 hammer2_msg_free(msg);
1819         } else if (msg->any.head.cmd & DMSGF_DELETE) {
1820                 /*
1821                  * Message terminating transaction, destroy the related
1822                  * state, the original message, and this message (if it
1823                  * isn't the original message due to a CREATE|DELETE).
1824                  */
1825                 pthread_mutex_lock(&iocom->mtx);
1826                 state->rxcmd |= DMSGF_DELETE;
1827                 if (state->txcmd & DMSGF_DELETE) {
1828                         if (state->msg == msg)
1829                                 state->msg = NULL;
1830                         assert(state->flags & HAMMER2_STATE_INSERTED);
1831                         if (state->rxcmd & DMSGF_REPLY) {
1832                                 assert(msg->any.head.cmd & DMSGF_REPLY);
1833                                 RB_REMOVE(hammer2_state_tree,
1834                                           &iocom->router->statewr_tree, state);
1835                         } else {
1836                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
1837                                 RB_REMOVE(hammer2_state_tree,
1838                                           &iocom->router->staterd_tree, state);
1839                         }
1840                         state->flags &= ~HAMMER2_STATE_INSERTED;
1841                         hammer2_state_free(state);
1842                 } else {
1843                         ;
1844                 }
1845                 pthread_mutex_unlock(&iocom->mtx);
1846                 hammer2_msg_free(msg);
1847         } else if (state->msg != msg) {
1848                 /*
1849                  * Message not terminating transaction, leave state intact
1850                  * and free message if it isn't the CREATE message.
1851                  */
1852                 hammer2_msg_free(msg);
1853         }
1854 }
1855
1856 static void
1857 hammer2_state_cleanuptx(hammer2_msg_t *msg)
1858 {
1859         hammer2_iocom_t *iocom = msg->router->iocom;
1860         hammer2_state_t *state;
1861
1862         if ((state = msg->state) == NULL) {
1863                 hammer2_msg_free(msg);
1864         } else if (msg->any.head.cmd & DMSGF_DELETE) {
1865                 pthread_mutex_lock(&iocom->mtx);
1866                 state->txcmd |= DMSGF_DELETE;
1867                 if (state->rxcmd & DMSGF_DELETE) {
1868                         if (state->msg == msg)
1869                                 state->msg = NULL;
1870                         assert(state->flags & HAMMER2_STATE_INSERTED);
1871                         if (state->txcmd & DMSGF_REPLY) {
1872                                 assert(msg->any.head.cmd & DMSGF_REPLY);
1873                                 RB_REMOVE(hammer2_state_tree,
1874                                           &iocom->router->staterd_tree, state);
1875                         } else {
1876                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
1877                                 RB_REMOVE(hammer2_state_tree,
1878                                           &iocom->router->statewr_tree, state);
1879                         }
1880                         state->flags &= ~HAMMER2_STATE_INSERTED;
1881                         hammer2_state_free(state);
1882                 } else {
1883                         ;
1884                 }
1885                 pthread_mutex_unlock(&iocom->mtx);
1886                 hammer2_msg_free(msg);
1887         } else if (state->msg != msg) {
1888                 hammer2_msg_free(msg);
1889         }
1890 }
1891
1892 /*
1893  * Called with iocom locked
1894  */
1895 void
1896 hammer2_state_free(hammer2_state_t *state)
1897 {
1898         hammer2_iocom_t *iocom = state->iocom;
1899         hammer2_msg_t *msg;
1900         char dummy;
1901
1902         if (DebugOpt) {
1903                 fprintf(stderr, "terminate state %p id=%08x\n",
1904                         state, (uint32_t)state->msgid);
1905         }
1906         assert(state->any.any == NULL);
1907         msg = state->msg;
1908         state->msg = NULL;
1909         if (msg)
1910                 hammer2_msg_free_locked(msg);
1911         free(state);
1912
1913         /*
1914          * When an iocom error is present we are trying to close down the
1915          * iocom, but we have to wait for all states to terminate before
1916          * we can do so.  The iocom rx code will terminate the receive side
1917          * for all transactions by simulating incoming DELETE messages,
1918          * but the state doesn't go away until both sides are terminated.
1919          *
1920          * We may have to wake up the rx code.
1921          */
1922         if (iocom->ioq_rx.error &&
1923             RB_EMPTY(&iocom->router->staterd_tree) &&
1924             RB_EMPTY(&iocom->router->statewr_tree)) {
1925                 dummy = 0;
1926                 write(iocom->wakeupfds[1], &dummy, 1);
1927         }
1928 }
1929
1930 /************************************************************************
1931  *                              ROUTING                                 *
1932  ************************************************************************
1933  *
1934  * Incoming messages are routed by their spanid, matched up against
1935  * outgoing LNK_SPANs managed by h2span_relay structures (see msg_lnk.c).
1936  * Any replies run through the same router.
1937  *
1938  * Originated messages are routed by their spanid, matched up against
1939  * incoming LNK_SPANs managed by h2span_link structures (see msg_lnk.c).
1940  * Replies come back through the same route.
1941  *
1942  * Keep in mind that ALL MESSAGE TRAFFIC pertaining to a particular
1943  * transaction runs through the same route.  Commands and replies both.
1944  *
1945  * An originated message will use a different routing spanid to
1946  * reach a target node than a message which originates from that node.
1947  * They might use the same physical pipes (each pipe can have multiple
1948  * SPANs and RELAYs), but the routes are distinct from the perspective
1949  * of the router.
1950  */
1951 hammer2_router_t *
1952 hammer2_router_alloc(void)
1953 {
1954         hammer2_router_t *router;
1955
1956         router = hammer2_alloc(sizeof(*router));
1957         TAILQ_INIT(&router->txmsgq);
1958         return (router);
1959 }
1960
1961 void
1962 hammer2_router_connect(hammer2_router_t *router)
1963 {
1964         hammer2_router_t *tmp;
1965
1966         assert(router->link || router->relay);
1967         assert((router->flags & HAMMER2_ROUTER_CONNECTED) == 0);
1968
1969         pthread_mutex_lock(&router_mtx);
1970         if (router->link)
1971                 tmp = RB_INSERT(hammer2_router_tree, &router_ltree, router);
1972         else
1973                 tmp = RB_INSERT(hammer2_router_tree, &router_rtree, router);
1974         assert(tmp == NULL);
1975         router->flags |= HAMMER2_ROUTER_CONNECTED;
1976         pthread_mutex_unlock(&router_mtx);
1977 }
1978
1979 void
1980 hammer2_router_disconnect(hammer2_router_t **routerp)
1981 {
1982         hammer2_router_t *router;
1983
1984         router = *routerp;
1985         assert(router->link || router->relay);
1986         assert(router->flags & HAMMER2_ROUTER_CONNECTED);
1987
1988         pthread_mutex_lock(&router_mtx);
1989         if (router->link)
1990                 RB_REMOVE(hammer2_router_tree, &router_ltree, router);
1991         else
1992                 RB_REMOVE(hammer2_router_tree, &router_rtree, router);
1993         router->flags &= ~HAMMER2_ROUTER_CONNECTED;
1994         *routerp = NULL;
1995         pthread_mutex_unlock(&router_mtx);
1996 }
1997
1998 #if 0
1999 /*
2000  * XXX
2001  */
2002 hammer2_router_t *
2003 hammer2_route_msg(hammer2_msg_t *msg)
2004 {
2005 }
2006 #endif
2007
2008 /************************************************************************
2009  *                              DEBUGGING                               *
2010  ************************************************************************/
2011
2012 const char *
2013 hammer2_basecmd_str(uint32_t cmd)
2014 {
2015         static char buf[64];
2016         char protobuf[32];
2017         char cmdbuf[32];
2018         const char *protostr;
2019         const char *cmdstr;
2020
2021         switch(cmd & DMSGF_PROTOS) {
2022         case DMSG_PROTO_LNK:
2023                 protostr = "LNK_";
2024                 break;
2025         case DMSG_PROTO_DBG:
2026                 protostr = "DBG_";
2027                 break;
2028         case DMSG_PROTO_DOM:
2029                 protostr = "DOM_";
2030                 break;
2031         case DMSG_PROTO_CAC:
2032                 protostr = "CAC_";
2033                 break;
2034         case DMSG_PROTO_QRM:
2035                 protostr = "QRM_";
2036                 break;
2037         case DMSG_PROTO_BLK:
2038                 protostr = "BLK_";
2039                 break;
2040         case DMSG_PROTO_VOP:
2041                 protostr = "VOP_";
2042                 break;
2043         default:
2044                 snprintf(protobuf, sizeof(protobuf), "%x_",
2045                         (cmd & DMSGF_PROTOS) >> 20);
2046                 protostr = protobuf;
2047                 break;
2048         }
2049
2050         switch(cmd & (DMSGF_PROTOS |
2051                       DMSGF_CMDS |
2052                       DMSGF_SIZE)) {
2053         case DMSG_LNK_PAD:
2054                 cmdstr = "PAD";
2055                 break;
2056         case DMSG_LNK_PING:
2057                 cmdstr = "PING";
2058                 break;
2059         case DMSG_LNK_AUTH:
2060                 cmdstr = "AUTH";
2061                 break;
2062         case DMSG_LNK_CONN:
2063                 cmdstr = "CONN";
2064                 break;
2065         case DMSG_LNK_SPAN:
2066                 cmdstr = "SPAN";
2067                 break;
2068         case DMSG_LNK_VOLCONF:
2069                 cmdstr = "VOLCONF";
2070                 break;
2071         case DMSG_LNK_ERROR:
2072                 if (cmd & DMSGF_DELETE)
2073                         cmdstr = "RETURN";
2074                 else
2075                         cmdstr = "RESULT";
2076                 break;
2077         case DMSG_DBG_SHELL:
2078                 cmdstr = "SHELL";
2079                 break;
2080         default:
2081                 snprintf(cmdbuf, sizeof(cmdbuf),
2082                          "%06x", (cmd & (DMSGF_PROTOS |
2083                                          DMSGF_CMDS |
2084                                          DMSGF_SIZE)));
2085                 cmdstr = cmdbuf;
2086                 break;
2087         }
2088         snprintf(buf, sizeof(buf), "%s%s", protostr, cmdstr);
2089         return (buf);
2090 }
2091
2092 const char *
2093 hammer2_msg_str(hammer2_msg_t *msg)
2094 {
2095         hammer2_state_t *state;
2096         static char buf[256];
2097         char errbuf[16];
2098         char statebuf[64];
2099         char flagbuf[64];
2100         const char *statestr;
2101         const char *errstr;
2102         uint32_t basecmd;
2103         int i;
2104
2105         /*
2106          * Parse the state
2107          */
2108         if ((state = msg->state) != NULL) {
2109                 basecmd = (state->rxcmd & DMSGF_REPLY) ?
2110                           state->txcmd : state->rxcmd;
2111                 snprintf(statebuf, sizeof(statebuf),
2112                          " %s=%s,L=%s%s,R=%s%s",
2113                          ((state->txcmd & DMSGF_REPLY) ?
2114                                 "rcvcmd" : "sndcmd"),
2115                          hammer2_basecmd_str(basecmd),
2116                          ((state->txcmd & DMSGF_CREATE) ? "C" : ""),
2117                          ((state->txcmd & DMSGF_DELETE) ? "D" : ""),
2118                          ((state->rxcmd & DMSGF_CREATE) ? "C" : ""),
2119                          ((state->rxcmd & DMSGF_DELETE) ? "D" : "")
2120                 );
2121                 statestr = statebuf;
2122         } else {
2123                 statestr = "";
2124         }
2125
2126         /*
2127          * Parse the error
2128          */
2129         switch(msg->any.head.error) {
2130         case 0:
2131                 errstr = "";
2132                 break;
2133         case HAMMER2_IOQ_ERROR_SYNC:
2134                 errstr = "err=IOQ:NOSYNC";
2135                 break;
2136         case HAMMER2_IOQ_ERROR_EOF:
2137                 errstr = "err=IOQ:STREAMEOF";
2138                 break;
2139         case HAMMER2_IOQ_ERROR_SOCK:
2140                 errstr = "err=IOQ:SOCKERR";
2141                 break;
2142         case HAMMER2_IOQ_ERROR_FIELD:
2143                 errstr = "err=IOQ:BADFIELD";
2144                 break;
2145         case HAMMER2_IOQ_ERROR_HCRC:
2146                 errstr = "err=IOQ:BADHCRC";
2147                 break;
2148         case HAMMER2_IOQ_ERROR_XCRC:
2149                 errstr = "err=IOQ:BADXCRC";
2150                 break;
2151         case HAMMER2_IOQ_ERROR_ACRC:
2152                 errstr = "err=IOQ:BADACRC";
2153                 break;
2154         case HAMMER2_IOQ_ERROR_STATE:
2155                 errstr = "err=IOQ:BADSTATE";
2156                 break;
2157         case HAMMER2_IOQ_ERROR_NOPEER:
2158                 errstr = "err=IOQ:PEERCONFIG";
2159                 break;
2160         case HAMMER2_IOQ_ERROR_NORKEY:
2161                 errstr = "err=IOQ:BADRKEY";
2162                 break;
2163         case HAMMER2_IOQ_ERROR_NOLKEY:
2164                 errstr = "err=IOQ:BADLKEY";
2165                 break;
2166         case HAMMER2_IOQ_ERROR_KEYXCHGFAIL:
2167                 errstr = "err=IOQ:BADKEYXCHG";
2168                 break;
2169         case HAMMER2_IOQ_ERROR_KEYFMT:
2170                 errstr = "err=IOQ:BADFMT";
2171                 break;
2172         case HAMMER2_IOQ_ERROR_BADURANDOM:
2173                 errstr = "err=IOQ:BADRANDOM";
2174                 break;
2175         case HAMMER2_IOQ_ERROR_MSGSEQ:
2176                 errstr = "err=IOQ:BADSEQ";
2177                 break;
2178         case HAMMER2_IOQ_ERROR_EALREADY:
2179                 errstr = "err=IOQ:DUPMSG";
2180                 break;
2181         case HAMMER2_IOQ_ERROR_TRANS:
2182                 errstr = "err=IOQ:BADTRANS";
2183                 break;
2184         case HAMMER2_IOQ_ERROR_IVWRAP:
2185                 errstr = "err=IOQ:IVWRAP";
2186                 break;
2187         case HAMMER2_IOQ_ERROR_MACFAIL:
2188                 errstr = "err=IOQ:MACFAIL";
2189                 break;
2190         case HAMMER2_IOQ_ERROR_ALGO:
2191                 errstr = "err=IOQ:ALGOFAIL";
2192                 break;
2193         case DMSG_ERR_NOSUPP:
2194                 errstr = "err=NOSUPPORT";
2195                 break;
2196         default:
2197                 snprintf(errbuf, sizeof(errbuf),
2198                          " err=%d", msg->any.head.error);
2199                 errstr = errbuf;
2200                 break;
2201         }
2202
2203         /*
2204          * Message flags
2205          */
2206         i = 0;
2207         if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
2208                                  DMSGF_ABORT | DMSGF_REPLY)) {
2209                 flagbuf[i++] = '|';
2210                 if (msg->any.head.cmd & DMSGF_CREATE)
2211                         flagbuf[i++] = 'C';
2212                 if (msg->any.head.cmd & DMSGF_DELETE)
2213                         flagbuf[i++] = 'D';
2214                 if (msg->any.head.cmd & DMSGF_REPLY)
2215                         flagbuf[i++] = 'R';
2216                 if (msg->any.head.cmd & DMSGF_ABORT)
2217                         flagbuf[i++] = 'A';
2218         }
2219         flagbuf[i] = 0;
2220
2221         /*
2222          * Generate the buf
2223          */
2224         snprintf(buf, sizeof(buf),
2225                 "msg=%s%s %s id=%08x src=%08x tgt=%08x %s",
2226                  hammer2_basecmd_str(msg->any.head.cmd),
2227                  flagbuf,
2228                  errstr,
2229                  (uint32_t)(intmax_t)msg->any.head.msgid,   /* for brevity */
2230                  (uint32_t)(intmax_t)msg->any.head.source,  /* for brevity */
2231                  (uint32_t)(intmax_t)msg->any.head.target,  /* for brevity */
2232                  statestr);
2233
2234         return(buf);
2235 }