cluster - more libdmsg work
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
38 int DMsgDebugOpt;
39
40 static int dmsg_state_msgrx(dmsg_msg_t *msg);
41 static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
42 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
43
44 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
45 RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
46
47 /*
48  * STATE TREE - Represents open transactions which are indexed by their
49  *              { msgid } relative to the governing iocom.
50  */
51 int
52 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
53 {
54         if (state1->msgid < state2->msgid)
55                 return(-1);
56         if (state1->msgid > state2->msgid)
57                 return(1);
58         return(0);
59 }
60
61 /*
62  * CIRCUIT TREE - Represents open circuits which are indexed by their
63  *                { msgid } relative to the governing iocom.
64  */
65 int
66 dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
67 {
68         if (circuit1->msgid < circuit2->msgid)
69                 return(-1);
70         if (circuit1->msgid > circuit2->msgid)
71                 return(1);
72         return(0);
73 }
74
75 /*
76  * Initialize a low-level ioq
77  */
78 void
79 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
80 {
81         bzero(ioq, sizeof(*ioq));
82         ioq->state = DMSG_MSGQ_STATE_HEADER1;
83         TAILQ_INIT(&ioq->msgq);
84 }
85
86 /*
87  * Cleanup queue.
88  *
89  * caller holds iocom->mtx.
90  */
91 void
92 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
93 {
94         dmsg_msg_t *msg;
95
96         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
97                 assert(0);      /* shouldn't happen */
98                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
99                 dmsg_msg_free(msg);
100         }
101         if ((msg = ioq->msg) != NULL) {
102                 ioq->msg = NULL;
103                 dmsg_msg_free(msg);
104         }
105 }
106
107 /*
108  * Initialize a low-level communications channel.
109  *
110  * NOTE: The signal_func() is called at least once from the loop and can be
111  *       re-armed via dmsg_iocom_restate().
112  */
113 void
114 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
115                    void (*signal_func)(dmsg_iocom_t *),
116                    void (*rcvmsg_func)(dmsg_msg_t *),
117                    void (*dbgmsg_func)(dmsg_msg_t *),
118                    void (*altmsg_func)(dmsg_iocom_t *))
119 {
120         struct stat st;
121
122         bzero(iocom, sizeof(*iocom));
123
124         asprintf(&iocom->label, "iocom-%p", iocom);
125         iocom->signal_callback = signal_func;
126         iocom->rcvmsg_callback = rcvmsg_func;
127         iocom->altmsg_callback = altmsg_func;
128         iocom->dbgmsg_callback = dbgmsg_func;
129
130         pthread_mutex_init(&iocom->mtx, NULL);
131         RB_INIT(&iocom->circuit_tree);
132         TAILQ_INIT(&iocom->freeq);
133         TAILQ_INIT(&iocom->freeq_aux);
134         TAILQ_INIT(&iocom->txmsgq);
135         iocom->sock_fd = sock_fd;
136         iocom->alt_fd = alt_fd;
137         iocom->flags = DMSG_IOCOMF_RREQ;
138         if (signal_func)
139                 iocom->flags |= DMSG_IOCOMF_SWORK;
140         dmsg_ioq_init(iocom, &iocom->ioq_rx);
141         dmsg_ioq_init(iocom, &iocom->ioq_tx);
142         if (pipe(iocom->wakeupfds) < 0)
143                 assert(0);
144         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
145         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
146
147         dmsg_circuit_init(iocom, &iocom->circuit0);
148
149         /*
150          * Negotiate session crypto synchronously.  This will mark the
151          * connection as error'd if it fails.  If this is a pipe it's
152          * a linkage that we set up ourselves to the filesystem and there
153          * is no crypto.
154          */
155         if (fstat(sock_fd, &st) < 0)
156                 assert(0);
157         if (S_ISSOCK(st.st_mode))
158                 dmsg_crypto_negotiate(iocom);
159
160         /*
161          * Make sure our fds are set to non-blocking for the iocom core.
162          */
163         if (sock_fd >= 0)
164                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
165 #if 0
166         /* if line buffered our single fgets() should be fine */
167         if (alt_fd >= 0)
168                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
169 #endif
170 }
171
172 void
173 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
174 {
175         va_list va;
176         char *optr;
177
178         va_start(va, ctl);
179         optr = iocom->label;
180         vasprintf(&iocom->label, ctl, va);
181         va_end(va);
182         if (optr)
183                 free(optr);
184 }
185
186 /*
187  * May only be called from a callback from iocom_core.
188  *
189  * Adjust state machine functions, set flags to guarantee that both
190  * the recevmsg_func and the sendmsg_func is called at least once.
191  */
192 void
193 dmsg_iocom_restate(dmsg_iocom_t *iocom,
194                    void (*signal_func)(dmsg_iocom_t *),
195                    void (*rcvmsg_func)(dmsg_msg_t *msg),
196                    void (*altmsg_func)(dmsg_iocom_t *))
197 {
198         pthread_mutex_lock(&iocom->mtx);
199         iocom->signal_callback = signal_func;
200         iocom->rcvmsg_callback = rcvmsg_func;
201         iocom->altmsg_callback = altmsg_func;
202         if (signal_func)
203                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
204         else
205                 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
206         pthread_mutex_unlock(&iocom->mtx);
207 }
208
209 void
210 dmsg_iocom_signal(dmsg_iocom_t *iocom)
211 {
212         pthread_mutex_lock(&iocom->mtx);
213         if (iocom->signal_callback)
214                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
215         pthread_mutex_unlock(&iocom->mtx);
216 }
217
218 /*
219  * Cleanup a terminating iocom.
220  *
221  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
222  * from all possible references to it.
223  */
224 void
225 dmsg_iocom_done(dmsg_iocom_t *iocom)
226 {
227         dmsg_msg_t *msg;
228
229         if (iocom->sock_fd >= 0) {
230                 close(iocom->sock_fd);
231                 iocom->sock_fd = -1;
232         }
233         if (iocom->alt_fd >= 0) {
234                 close(iocom->alt_fd);
235                 iocom->alt_fd = -1;
236         }
237         dmsg_ioq_done(iocom, &iocom->ioq_rx);
238         dmsg_ioq_done(iocom, &iocom->ioq_tx);
239         while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
240                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
241                 free(msg);
242         }
243         while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
244                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
245                 free(msg->aux_data);
246                 msg->aux_data = NULL;
247                 free(msg);
248         }
249         if (iocom->wakeupfds[0] >= 0) {
250                 close(iocom->wakeupfds[0]);
251                 iocom->wakeupfds[0] = -1;
252         }
253         if (iocom->wakeupfds[1] >= 0) {
254                 close(iocom->wakeupfds[1]);
255                 iocom->wakeupfds[1] = -1;
256         }
257         pthread_mutex_destroy(&iocom->mtx);
258 }
259
260 /*
261  * Basic initialization of a circuit structure.
262  *
263  * The circuit structure is initialized with one ref.
264  */
265 void
266 dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
267 {
268         circuit->refs = 1;
269         circuit->iocom = iocom;
270         RB_INIT(&circuit->staterd_tree);
271         RB_INIT(&circuit->statewr_tree);
272 }
273
274 /*
275  * Allocate a new one-way message.
276  */
277 dmsg_msg_t *
278 dmsg_msg_alloc(dmsg_circuit_t *circuit,
279                size_t aux_size, uint32_t cmd,
280                void (*func)(dmsg_msg_t *), void *data)
281 {
282         dmsg_iocom_t *iocom = circuit->iocom;
283         dmsg_state_t *state = NULL;
284         dmsg_msg_t *msg;
285         int hbytes;
286         size_t aligned_size;
287
288         pthread_mutex_lock(&iocom->mtx);
289 #if 0
290         if (aux_size) {
291                 aligned_size = DMSG_DOALIGN(aux_size);
292                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
293                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
294         } else {
295                 aligned_size = 0;
296                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
297                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
298         }
299 #endif
300         aligned_size = DMSG_DOALIGN(aux_size);
301         msg = NULL;
302         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
303                 /*
304                  * Create state when CREATE is set without REPLY.
305                  * Assign a unique msgid, in this case simply using
306                  * the pointer value for 'state'.
307                  *
308                  * NOTE: CREATE in txcmd handled by dmsg_msg_write()
309                  * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
310                  *
311                  * NOTE: state initiated by us and state initiated by
312                  *       a remote create are placed in different RB trees.
313                  *       The msgid for SPAN state is used in source/target
314                  *       for message routing as appropriate.
315                  */
316                 state = malloc(sizeof(*state));
317                 bzero(state, sizeof(*state));
318                 state->iocom = iocom;
319                 state->circuit = circuit;
320                 state->flags = DMSG_STATE_DYNAMIC;
321                 state->msgid = (uint64_t)(uintptr_t)state;
322                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
323                 state->rxcmd = DMSGF_REPLY;
324                 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
325                 state->func = func;
326                 state->any.any = data;
327                 pthread_mutex_lock(&iocom->mtx);
328                 RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
329                 pthread_mutex_unlock(&iocom->mtx);
330                 state->flags |= DMSG_STATE_INSERTED;
331         }
332         /* XXX SMP race for state */
333         pthread_mutex_unlock(&iocom->mtx);
334         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
335         if (msg == NULL) {
336                 msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
337                 bzero(msg, offsetof(struct dmsg_msg, any.head));
338                 *(int *)((char *)msg +
339                          offsetof(struct dmsg_msg, any.head) + hbytes) =
340                                  0x71B2C3D4;
341 #if 0
342                 msg = malloc(sizeof(*msg));
343                 bzero(msg, sizeof(*msg));
344 #endif
345         }
346
347         /*
348          * [re]allocate the auxillary data buffer.  The caller knows that
349          * a size-aligned buffer will be allocated but we do not want to
350          * force the caller to zero any tail piece, so we do that ourself.
351          */
352         if (msg->aux_size != aux_size) {
353                 if (msg->aux_data) {
354                         free(msg->aux_data);
355                         msg->aux_data = NULL;
356                         msg->aux_size = 0;
357                 }
358                 if (aux_size) {
359                         msg->aux_data = malloc(aligned_size);
360                         msg->aux_size = aux_size;
361                         if (aux_size != aligned_size) {
362                                 bzero(msg->aux_data + aux_size,
363                                       aligned_size - aux_size);
364                         }
365                 }
366         }
367         if (hbytes)
368                 bzero(&msg->any.head, hbytes);
369         msg->hdr_size = hbytes;
370         msg->any.head.magic = DMSG_HDR_MAGIC;
371         msg->any.head.cmd = cmd;
372         msg->any.head.aux_descr = 0;
373         msg->any.head.aux_crc = 0;
374         msg->any.head.circuit = 0;
375         msg->circuit = circuit;
376         msg->iocom = iocom;
377         dmsg_circuit_hold(circuit);
378         if (state) {
379                 msg->state = state;
380                 state->msg = msg;
381                 msg->any.head.msgid = state->msgid;
382         } else {
383                 msg->any.head.msgid = 0;
384         }
385         return (msg);
386 }
387
388 /*
389  * Free a message so it can be reused afresh.
390  *
391  * NOTE: aux_size can be 0 with a non-NULL aux_data.
392  */
393 static
394 void
395 dmsg_msg_free_locked(dmsg_msg_t *msg)
396 {
397         /*dmsg_iocom_t *iocom = msg->iocom;*/
398 #if 1
399         int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
400         if (*(int *)((char *)msg +
401                      offsetof(struct  dmsg_msg, any.head) + hbytes) !=
402              0x71B2C3D4) {
403                 fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
404                 assert(0);
405         }
406 #endif
407         if (msg->circuit) {
408                 dmsg_circuit_drop_locked(msg->circuit);
409                 msg->circuit = NULL;
410         }
411         msg->state = NULL;
412         if (msg->aux_data) {
413                 free(msg->aux_data);
414                 msg->aux_data = NULL;
415         }
416         msg->aux_size = 0;
417         free (msg);
418 #if 0
419         if (msg->aux_data)
420                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
421         else
422                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
423 #endif
424 }
425
426 void
427 dmsg_msg_free(dmsg_msg_t *msg)
428 {
429         dmsg_iocom_t *iocom = msg->iocom;
430
431         pthread_mutex_lock(&iocom->mtx);
432         dmsg_msg_free_locked(msg);
433         pthread_mutex_unlock(&iocom->mtx);
434 }
435
436 /*
437  * I/O core loop for an iocom.
438  *
439  * Thread localized, iocom->mtx not held.
440  */
441 void
442 dmsg_iocom_core(dmsg_iocom_t *iocom)
443 {
444         struct pollfd fds[3];
445         char dummybuf[256];
446         dmsg_msg_t *msg;
447         int timeout;
448         int count;
449         int wi; /* wakeup pipe */
450         int si; /* socket */
451         int ai; /* alt bulk path socket */
452
453         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
454                 /*
455                  * These iocom->flags are only manipulated within the
456                  * context of the current thread.  However, modifications
457                  * still require atomic ops.
458                  */
459                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
460                                      DMSG_IOCOMF_WWORK |
461                                      DMSG_IOCOMF_PWORK |
462                                      DMSG_IOCOMF_SWORK |
463                                      DMSG_IOCOMF_ARWORK |
464                                      DMSG_IOCOMF_AWWORK)) == 0) {
465                         /*
466                          * Only poll if no immediate work is pending.
467                          * Otherwise we are just wasting our time calling
468                          * poll.
469                          */
470                         timeout = 5000;
471
472                         count = 0;
473                         wi = -1;
474                         si = -1;
475                         ai = -1;
476
477                         /*
478                          * Always check the inter-thread pipe, e.g.
479                          * for iocom->txmsgq work.
480                          */
481                         wi = count++;
482                         fds[wi].fd = iocom->wakeupfds[0];
483                         fds[wi].events = POLLIN;
484                         fds[wi].revents = 0;
485
486                         /*
487                          * Check the socket input/output direction as
488                          * requested
489                          */
490                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
491                                             DMSG_IOCOMF_WREQ)) {
492                                 si = count++;
493                                 fds[si].fd = iocom->sock_fd;
494                                 fds[si].events = 0;
495                                 fds[si].revents = 0;
496
497                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
498                                         fds[si].events |= POLLIN;
499                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
500                                         fds[si].events |= POLLOUT;
501                         }
502
503                         /*
504                          * Check the alternative fd for work.
505                          */
506                         if (iocom->alt_fd >= 0) {
507                                 ai = count++;
508                                 fds[ai].fd = iocom->alt_fd;
509                                 fds[ai].events = POLLIN;
510                                 fds[ai].revents = 0;
511                         }
512                         poll(fds, count, timeout);
513
514                         if (wi >= 0 && (fds[wi].revents & POLLIN))
515                                 atomic_set_int(&iocom->flags,
516                                                DMSG_IOCOMF_PWORK);
517                         if (si >= 0 && (fds[si].revents & POLLIN))
518                                 atomic_set_int(&iocom->flags,
519                                                DMSG_IOCOMF_RWORK);
520                         if (si >= 0 && (fds[si].revents & POLLOUT))
521                                 atomic_set_int(&iocom->flags,
522                                                DMSG_IOCOMF_WWORK);
523                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
524                                 atomic_set_int(&iocom->flags,
525                                                DMSG_IOCOMF_WWORK);
526                         if (ai >= 0 && (fds[ai].revents & POLLIN))
527                                 atomic_set_int(&iocom->flags,
528                                                DMSG_IOCOMF_ARWORK);
529                 } else {
530                         /*
531                          * Always check the pipe
532                          */
533                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
534                 }
535
536                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
537                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
538                         iocom->signal_callback(iocom);
539                 }
540
541                 /*
542                  * Pending message queues from other threads wake us up
543                  * with a write to the wakeupfds[] pipe.  We have to clear
544                  * the pipe with a dummy read.
545                  */
546                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
547                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
548                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
549                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
550                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
551                         if (TAILQ_FIRST(&iocom->txmsgq))
552                                 dmsg_iocom_flush1(iocom);
553                 }
554
555                 /*
556                  * Message write sequencing
557                  */
558                 if (iocom->flags & DMSG_IOCOMF_WWORK)
559                         dmsg_iocom_flush1(iocom);
560
561                 /*
562                  * Message read sequencing.  Run this after the write
563                  * sequencing in case the write sequencing allowed another
564                  * auto-DELETE to occur on the read side.
565                  */
566                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
567                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
568                                (msg = dmsg_ioq_read(iocom)) != NULL) {
569                                 if (DMsgDebugOpt) {
570                                         fprintf(stderr, "receive %s\n",
571                                                 dmsg_msg_str(msg));
572                                 }
573                                 iocom->rcvmsg_callback(msg);
574                                 dmsg_state_cleanuprx(iocom, msg);
575                         }
576                 }
577
578                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
579                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
580                         iocom->altmsg_callback(iocom);
581                 }
582         }
583 }
584
585 /*
586  * Make sure there's enough room in the FIFO to hold the
587  * needed data.
588  *
589  * Assume worst case encrypted form is 2x the size of the
590  * plaintext equivalent.
591  */
592 static
593 size_t
594 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
595 {
596         size_t bytes;
597         size_t nmax;
598
599         bytes = ioq->fifo_cdx - ioq->fifo_beg;
600         nmax = sizeof(ioq->buf) - ioq->fifo_end;
601         if (bytes + nmax / 2 < needed) {
602                 if (bytes) {
603                         bcopy(ioq->buf + ioq->fifo_beg,
604                               ioq->buf,
605                               bytes);
606                 }
607                 ioq->fifo_cdx -= ioq->fifo_beg;
608                 ioq->fifo_beg = 0;
609                 if (ioq->fifo_cdn < ioq->fifo_end) {
610                         bcopy(ioq->buf + ioq->fifo_cdn,
611                               ioq->buf + ioq->fifo_cdx,
612                               ioq->fifo_end - ioq->fifo_cdn);
613                 }
614                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
615                 ioq->fifo_cdn = ioq->fifo_cdx;
616                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
617         }
618         return(nmax);
619 }
620
621 /*
622  * Read the next ready message from the ioq, issuing I/O if needed.
623  * Caller should retry on a read-event when NULL is returned.
624  *
625  * If an error occurs during reception a DMSG_LNK_ERROR msg will
626  * be returned for each open transaction, then the ioq and iocom
627  * will be errored out and a non-transactional DMSG_LNK_ERROR
628  * msg will be returned as the final message.  The caller should not call
629  * us again after the final message is returned.
630  *
631  * Thread localized, iocom->mtx not held.
632  */
633 dmsg_msg_t *
634 dmsg_ioq_read(dmsg_iocom_t *iocom)
635 {
636         dmsg_ioq_t *ioq = &iocom->ioq_rx;
637         dmsg_msg_t *msg;
638         dmsg_state_t *state;
639         dmsg_circuit_t *circuit0;
640         dmsg_hdr_t *head;
641         ssize_t n;
642         size_t bytes;
643         size_t nmax;
644         uint32_t aux_size;
645         uint32_t xcrc32;
646         int error;
647
648 again:
649         /*
650          * If a message is already pending we can just remove and
651          * return it.  Message state has already been processed.
652          * (currently not implemented)
653          */
654         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
655                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
656                 return (msg);
657         }
658         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
659
660         /*
661          * If the stream is errored out we stop processing it.
662          */
663         if (ioq->error)
664                 goto skip;
665
666         /*
667          * Message read in-progress (msg is NULL at the moment).  We don't
668          * allocate a msg until we have its core header.
669          */
670         nmax = sizeof(ioq->buf) - ioq->fifo_end;
671         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
672         msg = ioq->msg;
673
674         switch(ioq->state) {
675         case DMSG_MSGQ_STATE_HEADER1:
676                 /*
677                  * Load the primary header, fail on any non-trivial read
678                  * error or on EOF.  Since the primary header is the same
679                  * size is the message alignment it will never straddle
680                  * the end of the buffer.
681                  */
682                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
683                 if (bytes < sizeof(msg->any.head)) {
684                         n = read(iocom->sock_fd,
685                                  ioq->buf + ioq->fifo_end,
686                                  nmax);
687                         if (n <= 0) {
688                                 if (n == 0) {
689                                         ioq->error = DMSG_IOQ_ERROR_EOF;
690                                         break;
691                                 }
692                                 if (errno != EINTR &&
693                                     errno != EINPROGRESS &&
694                                     errno != EAGAIN) {
695                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
696                                         break;
697                                 }
698                                 n = 0;
699                                 /* fall through */
700                         }
701                         ioq->fifo_end += (size_t)n;
702                         nmax -= (size_t)n;
703                 }
704
705                 /*
706                  * Decrypt data received so far.  Data will be decrypted
707                  * in-place but might create gaps in the FIFO.  Partial
708                  * blocks are not immediately decrypted.
709                  *
710                  * WARNING!  The header might be in the wrong endian, we
711                  *           do not fix it up until we get the entire
712                  *           extended header.
713                  */
714                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
715                         dmsg_crypto_decrypt(iocom, ioq);
716                 } else {
717                         ioq->fifo_cdx = ioq->fifo_end;
718                         ioq->fifo_cdn = ioq->fifo_end;
719                 }
720                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
721
722                 /*
723                  * Insufficient data accumulated (msg is NULL, caller will
724                  * retry on event).
725                  */
726                 assert(msg == NULL);
727                 if (bytes < sizeof(msg->any.head))
728                         break;
729
730                 /*
731                  * Check and fixup the core header.  Note that the icrc
732                  * has to be calculated before any fixups, but the crc
733                  * fields in the msg may have to be swapped like everything
734                  * else.
735                  */
736                 head = (void *)(ioq->buf + ioq->fifo_beg);
737                 if (head->magic != DMSG_HDR_MAGIC &&
738                     head->magic != DMSG_HDR_MAGIC_REV) {
739                         fprintf(stderr, "%s: head->magic is bad %02x\n",
740                                 iocom->label, head->magic);
741                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
742                                 fprintf(stderr, "(on encrypted link)\n");
743                         ioq->error = DMSG_IOQ_ERROR_SYNC;
744                         break;
745                 }
746
747                 /*
748                  * Calculate the full header size and aux data size
749                  */
750                 if (head->magic == DMSG_HDR_MAGIC_REV) {
751                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
752                                       DMSG_ALIGN;
753                         aux_size = bswap32(head->aux_bytes);
754                 } else {
755                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
756                                       DMSG_ALIGN;
757                         aux_size = head->aux_bytes;
758                 }
759                 ioq->abytes = DMSG_DOALIGN(aux_size);
760                 ioq->unaligned_aux_size = aux_size;
761                 if (ioq->hbytes < sizeof(msg->any.head) ||
762                     ioq->hbytes > sizeof(msg->any) ||
763                     ioq->abytes > DMSG_AUX_MAX) {
764                         ioq->error = DMSG_IOQ_ERROR_FIELD;
765                         break;
766                 }
767
768                 /*
769                  * Allocate the message, the next state will fill it in.
770                  * Note that the aux_data buffer will be sized to an aligned
771                  * value and the aligned remainder zero'd for convenience.
772                  */
773                 msg = dmsg_msg_alloc(&iocom->circuit0, aux_size,
774                                      ioq->hbytes / DMSG_ALIGN,
775                                      NULL, NULL);
776                 ioq->msg = msg;
777
778                 /*
779                  * Fall through to the next state.  Make sure that the
780                  * extended header does not straddle the end of the buffer.
781                  * We still want to issue larger reads into our buffer,
782                  * book-keeping is easier if we don't bcopy() yet.
783                  *
784                  * Make sure there is enough room for bloated encrypt data.
785                  */
786                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
787                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
788                 /* fall through */
789         case DMSG_MSGQ_STATE_HEADER2:
790                 /*
791                  * Fill out the extended header.
792                  */
793                 assert(msg != NULL);
794                 if (bytes < ioq->hbytes) {
795                         n = read(iocom->sock_fd,
796                                  ioq->buf + ioq->fifo_end,
797                                  nmax);
798                         if (n <= 0) {
799                                 if (n == 0) {
800                                         ioq->error = DMSG_IOQ_ERROR_EOF;
801                                         break;
802                                 }
803                                 if (errno != EINTR &&
804                                     errno != EINPROGRESS &&
805                                     errno != EAGAIN) {
806                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
807                                         break;
808                                 }
809                                 n = 0;
810                                 /* fall through */
811                         }
812                         ioq->fifo_end += (size_t)n;
813                         nmax -= (size_t)n;
814                 }
815
816                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
817                         dmsg_crypto_decrypt(iocom, ioq);
818                 } else {
819                         ioq->fifo_cdx = ioq->fifo_end;
820                         ioq->fifo_cdn = ioq->fifo_end;
821                 }
822                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
823
824                 /*
825                  * Insufficient data accumulated (set msg NULL so caller will
826                  * retry on event).
827                  */
828                 if (bytes < ioq->hbytes) {
829                         msg = NULL;
830                         break;
831                 }
832
833                 /*
834                  * Calculate the extended header, decrypt data received
835                  * so far.  Handle endian-conversion for the entire extended
836                  * header.
837                  */
838                 head = (void *)(ioq->buf + ioq->fifo_beg);
839
840                 /*
841                  * Check the CRC.
842                  */
843                 if (head->magic == DMSG_HDR_MAGIC_REV)
844                         xcrc32 = bswap32(head->hdr_crc);
845                 else
846                         xcrc32 = head->hdr_crc;
847                 head->hdr_crc = 0;
848                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
849                         ioq->error = DMSG_IOQ_ERROR_XCRC;
850                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
851                                 xcrc32, dmsg_icrc32(head, ioq->hbytes),
852                                 dmsg_msg_str(msg));
853                         assert(0);
854                         break;
855                 }
856                 head->hdr_crc = xcrc32;
857
858                 if (head->magic == DMSG_HDR_MAGIC_REV) {
859                         dmsg_bswap_head(head);
860                 }
861
862                 /*
863                  * Copy the extended header into the msg and adjust the
864                  * FIFO.
865                  */
866                 bcopy(head, &msg->any, ioq->hbytes);
867
868                 /*
869                  * We are either done or we fall-through.
870                  */
871                 if (ioq->abytes == 0) {
872                         ioq->fifo_beg += ioq->hbytes;
873                         break;
874                 }
875
876                 /*
877                  * Must adjust bytes (and the state) when falling through.
878                  * nmax doesn't change.
879                  */
880                 ioq->fifo_beg += ioq->hbytes;
881                 bytes -= ioq->hbytes;
882                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
883                 /* fall through */
884         case DMSG_MSGQ_STATE_AUXDATA1:
885                 /*
886                  * Copy the partial or complete [decrypted] payload from
887                  * remaining bytes in the FIFO in order to optimize the
888                  * makeroom call in the AUXDATA2 state.  We have to
889                  * fall-through either way so we can check the crc.
890                  *
891                  * msg->aux_size tracks our aux data.
892                  *
893                  * (Let's not complicate matters if the data is encrypted,
894                  *  since the data in-stream is not the same size as the
895                  *  data decrypted).
896                  */
897                 if (bytes >= ioq->abytes) {
898                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
899                               ioq->abytes);
900                         msg->aux_size = ioq->abytes;
901                         ioq->fifo_beg += ioq->abytes;
902                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
903                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
904                         bytes -= ioq->abytes;
905                 } else if (bytes) {
906                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
907                               bytes);
908                         msg->aux_size = bytes;
909                         ioq->fifo_beg += bytes;
910                         if (ioq->fifo_cdx < ioq->fifo_beg)
911                                 ioq->fifo_cdx = ioq->fifo_beg;
912                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
913                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
914                         bytes = 0;
915                 } else {
916                         msg->aux_size = 0;
917                 }
918                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
919                 /* fall through */
920         case DMSG_MSGQ_STATE_AUXDATA2:
921                 /*
922                  * Make sure there is enough room for more data.
923                  */
924                 assert(msg);
925                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
926
927                 /*
928                  * Read and decrypt more of the payload.
929                  */
930                 if (msg->aux_size < ioq->abytes) {
931                         assert(bytes == 0);
932                         n = read(iocom->sock_fd,
933                                  ioq->buf + ioq->fifo_end,
934                                  nmax);
935                         if (n <= 0) {
936                                 if (n == 0) {
937                                         ioq->error = DMSG_IOQ_ERROR_EOF;
938                                         break;
939                                 }
940                                 if (errno != EINTR &&
941                                     errno != EINPROGRESS &&
942                                     errno != EAGAIN) {
943                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
944                                         break;
945                                 }
946                                 n = 0;
947                                 /* fall through */
948                         }
949                         ioq->fifo_end += (size_t)n;
950                         nmax -= (size_t)n;
951                 }
952
953                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
954                         dmsg_crypto_decrypt(iocom, ioq);
955                 } else {
956                         ioq->fifo_cdx = ioq->fifo_end;
957                         ioq->fifo_cdn = ioq->fifo_end;
958                 }
959                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
960
961                 if (bytes > ioq->abytes - msg->aux_size)
962                         bytes = ioq->abytes - msg->aux_size;
963
964                 if (bytes) {
965                         bcopy(ioq->buf + ioq->fifo_beg,
966                               msg->aux_data + msg->aux_size,
967                               bytes);
968                         msg->aux_size += bytes;
969                         ioq->fifo_beg += bytes;
970                 }
971
972                 /*
973                  * Insufficient data accumulated (set msg NULL so caller will
974                  * retry on event).
975                  *
976                  * Assert the auxiliary data size is correct, then record the
977                  * original unaligned size from the message header.
978                  */
979                 if (msg->aux_size < ioq->abytes) {
980                         msg = NULL;
981                         break;
982                 }
983                 assert(msg->aux_size == ioq->abytes);
984                 msg->aux_size = ioq->unaligned_aux_size;
985
986                 /*
987                  * Check aux_crc, then we are done.  Note that the crc
988                  * is calculated over the aligned size, not the actual
989                  * size.
990                  */
991                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
992                 if (xcrc32 != msg->any.head.aux_crc) {
993                         ioq->error = DMSG_IOQ_ERROR_ACRC;
994                         fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
995                                 xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
996                         break;
997                 }
998                 break;
999         case DMSG_MSGQ_STATE_ERROR:
1000                 /*
1001                  * Continued calls to drain recorded transactions (returning
1002                  * a LNK_ERROR for each one), before we return the final
1003                  * LNK_ERROR.
1004                  */
1005                 assert(msg == NULL);
1006                 break;
1007         default:
1008                 /*
1009                  * We don't double-return errors, the caller should not
1010                  * have called us again after getting an error msg.
1011                  */
1012                 assert(0);
1013                 break;
1014         }
1015
1016         /*
1017          * Check the message sequence.  The iv[] should prevent any
1018          * possibility of a replay but we add this check anyway.
1019          */
1020         if (msg && ioq->error == 0) {
1021                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1022                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1023                 } else {
1024                         ++ioq->seq;
1025                 }
1026         }
1027
1028         /*
1029          * Handle error, RREQ, or completion
1030          *
1031          * NOTE: nmax and bytes are invalid at this point, we don't bother
1032          *       to update them when breaking out.
1033          */
1034         if (ioq->error) {
1035 skip:
1036                 fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1037                 /*
1038                  * An unrecoverable error causes all active receive
1039                  * transactions to be terminated with a LNK_ERROR message.
1040                  *
1041                  * Once all active transactions are exhausted we set the
1042                  * iocom ERROR flag and return a non-transactional LNK_ERROR
1043                  * message, which should cause master processing loops to
1044                  * terminate.
1045                  */
1046                 assert(ioq->msg == msg);
1047                 if (msg) {
1048                         dmsg_msg_free(msg);
1049                         ioq->msg = NULL;
1050                 }
1051
1052                 /*
1053                  * No more I/O read processing
1054                  */
1055                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1056
1057                 /*
1058                  * Simulate a remote LNK_ERROR DELETE msg for any open
1059                  * transactions, ending with a final non-transactional
1060                  * LNK_ERROR (that the session can detect) when no
1061                  * transactions remain.
1062                  *
1063                  * We only need to scan transactions on circuit0 as these
1064                  * will contain all circuit forges, and terminating circuit
1065                  * forges will automatically terminate the transactions on
1066                  * any other circuits as well as those circuits.
1067                  */
1068                 circuit0 = &iocom->circuit0;
1069                 msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
1070                 msg->any.head.error = ioq->error;
1071
1072                 pthread_mutex_lock(&iocom->mtx);
1073                 dmsg_iocom_drain(iocom);
1074
1075                 if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
1076                         /*
1077                          * Active remote transactions are still present.
1078                          * Simulate the other end sending us a DELETE.
1079                          */
1080                         if (state->rxcmd & DMSGF_DELETE) {
1081                                 dmsg_msg_free(msg);
1082                                 fprintf(stderr,
1083                                         "iocom: ioq error %d sleeping\n",
1084                                         ioq->error);
1085                                 sleep(1);       /* XXX */
1086                                 atomic_set_int(&iocom->flags,
1087                                                DMSG_IOCOMF_RWORK);
1088                                 msg = NULL;
1089                         } else {
1090                                 /*state->txcmd |= DMSGF_DELETE;*/
1091                                 msg->state = state;
1092                                 msg->iocom = iocom;
1093                                 msg->any.head.msgid = state->msgid;
1094                                 msg->any.head.cmd |= DMSGF_ABORT |
1095                                                      DMSGF_DELETE;
1096                         }
1097                 } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
1098                         /*
1099                          * Active local transactions are still present.
1100                          * Simulate the other end sending us a DELETE.
1101                          */
1102                         if (state->rxcmd & DMSGF_DELETE) {
1103                                 dmsg_msg_free(msg);
1104                                 fprintf(stderr,
1105                                         "iocom: ioq error %d sleeping\n",
1106                                         ioq->error);
1107                                 sleep(1);       /* XXX */
1108                                 atomic_set_int(&iocom->flags,
1109                                                DMSG_IOCOMF_RWORK);
1110                                 msg = NULL;
1111                         } else {
1112                                 msg->state = state;
1113                                 msg->iocom = iocom;
1114                                 msg->any.head.msgid = state->msgid;
1115                                 msg->any.head.cmd |= DMSGF_ABORT |
1116                                                      DMSGF_DELETE |
1117                                                      DMSGF_REPLY;
1118                                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1119                                         msg->any.head.cmd |=
1120                                                      DMSGF_CREATE;
1121                                 }
1122                         }
1123                 } else {
1124                         /*
1125                          * No active local or remote transactions remain.
1126                          * Generate a final LNK_ERROR and flag EOF.
1127                          */
1128                         msg->state = NULL;
1129                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1130                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1131                 }
1132                 pthread_mutex_unlock(&iocom->mtx);
1133
1134                 /*
1135                  * For the iocom error case we want to set RWORK to indicate
1136                  * that more messages might be pending.
1137                  *
1138                  * It is possible to return NULL when there is more work to
1139                  * do because each message has to be DELETEd in both
1140                  * directions before we continue on with the next (though
1141                  * this could be optimized).  The transmit direction will
1142                  * re-set RWORK.
1143                  */
1144                 if (msg)
1145                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1146         } else if (msg == NULL) {
1147                 /*
1148                  * Insufficient data received to finish building the message,
1149                  * set RREQ and return NULL.
1150                  *
1151                  * Leave ioq->msg intact.
1152                  * Leave the FIFO intact.
1153                  */
1154                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1155         } else {
1156                 /*
1157                  * Continue processing msg.
1158                  *
1159                  * The fifo has already been advanced past the message.
1160                  * Trivially reset the FIFO indices if possible.
1161                  *
1162                  * clear the FIFO if it is now empty and set RREQ to wait
1163                  * for more from the socket.  If the FIFO is not empty set
1164                  * TWORK to bypass the poll so we loop immediately.
1165                  */
1166                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1167                     ioq->fifo_cdn == ioq->fifo_end) {
1168                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1169                         ioq->fifo_cdx = 0;
1170                         ioq->fifo_cdn = 0;
1171                         ioq->fifo_beg = 0;
1172                         ioq->fifo_end = 0;
1173                 } else {
1174                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1175                 }
1176                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1177                 ioq->msg = NULL;
1178
1179                 /*
1180                  * Handle message routing.  Validates non-zero sources
1181                  * and routes message.  Error will be 0 if the message is
1182                  * destined for us.
1183                  *
1184                  * State processing only occurs for messages destined for us.
1185                  */
1186                 if (DMsgDebugOpt >= 5) {
1187                         fprintf(stderr,
1188                                 "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1189                                 msg->any.head.cmd,
1190                                 (intmax_t)msg->any.head.msgid,
1191                                 (intmax_t)msg->any.head.circuit);
1192                 }
1193                 if (msg->any.head.circuit)
1194                         error = dmsg_circuit_relay(msg);
1195                 else
1196                         error = dmsg_state_msgrx(msg);
1197
1198                 if (error) {
1199                         /*
1200                          * Abort-after-closure, throw message away and
1201                          * start reading another.
1202                          */
1203                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1204                                 dmsg_msg_free(msg);
1205                                 goto again;
1206                         }
1207
1208                         /*
1209                          * msg routed, msg pointer no longer owned by us.
1210                          * Go to the top and start reading another.
1211                          */
1212                         if (error == DMSG_IOQ_ERROR_ROUTED)
1213                                 goto again;
1214
1215                         /*
1216                          * Process real error and throw away message.
1217                          */
1218                         ioq->error = error;
1219                         goto skip;
1220                 }
1221                 /* no error, not routed.  Fall through and return msg */
1222         }
1223         return (msg);
1224 }
1225
1226 /*
1227  * Calculate the header and data crc's and write a low-level message to
1228  * the connection.  If aux_crc is non-zero the aux_data crc is already
1229  * assumed to have been set.
1230  *
1231  * A non-NULL msg is added to the queue but not necessarily flushed.
1232  * Calling this function with msg == NULL will get a flush going.
1233  *
1234  * (called from iocom_core only)
1235  */
1236 void
1237 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1238 {
1239         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1240         dmsg_msg_t *msg;
1241         uint32_t xcrc32;
1242         size_t hbytes;
1243         size_t abytes;
1244         dmsg_msg_queue_t tmpq;
1245
1246         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1247         TAILQ_INIT(&tmpq);
1248         pthread_mutex_lock(&iocom->mtx);
1249         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1250                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1251                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1252         }
1253         pthread_mutex_unlock(&iocom->mtx);
1254
1255         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1256                 /*
1257                  * Process terminal connection errors.
1258                  */
1259                 TAILQ_REMOVE(&tmpq, msg, qentry);
1260                 if (ioq->error) {
1261                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1262                         ++ioq->msgcount;
1263                         continue;
1264                 }
1265
1266                 /*
1267                  * Finish populating the msg fields.  The salt ensures that
1268                  * the iv[] array is ridiculously randomized and we also
1269                  * re-seed our PRNG every 32768 messages just to be sure.
1270                  */
1271                 msg->any.head.magic = DMSG_HDR_MAGIC;
1272                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1273                 ++ioq->seq;
1274                 if ((ioq->seq & 32767) == 0)
1275                         srandomdev();
1276
1277                 /*
1278                  * Calculate aux_crc if 0, then calculate hdr_crc.
1279                  */
1280                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1281                         abytes = DMSG_DOALIGN(msg->aux_size);
1282                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1283                         msg->any.head.aux_crc = xcrc32;
1284                 }
1285                 msg->any.head.aux_bytes = msg->aux_size;
1286
1287                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1288                          DMSG_ALIGN;
1289                 msg->any.head.hdr_crc = 0;
1290                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1291
1292                 /*
1293                  * Enqueue the message (the flush codes handles stream
1294                  * encryption).
1295                  */
1296                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1297                 ++ioq->msgcount;
1298         }
1299         dmsg_iocom_flush2(iocom);
1300 }
1301
1302 /*
1303  * Thread localized, iocom->mtx not held by caller.
1304  *
1305  * (called from iocom_core via iocom_flush1 only)
1306  */
1307 void
1308 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1309 {
1310         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1311         dmsg_msg_t *msg;
1312         ssize_t n;
1313         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1314         size_t nact;
1315         size_t hbytes;
1316         size_t abytes;
1317         size_t hoff;
1318         size_t aoff;
1319         int iovcnt;
1320
1321         if (ioq->error) {
1322                 dmsg_iocom_drain(iocom);
1323                 return;
1324         }
1325
1326         /*
1327          * Pump messages out the connection by building an iovec.
1328          *
1329          * ioq->hbytes/ioq->abytes tracks how much of the first message
1330          * in the queue has been successfully written out, so we can
1331          * resume writing.
1332          */
1333         iovcnt = 0;
1334         nact = 0;
1335         hoff = ioq->hbytes;
1336         aoff = ioq->abytes;
1337
1338         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1339                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1340                          DMSG_ALIGN;
1341                 abytes = DMSG_DOALIGN(msg->aux_size);
1342                 assert(hoff <= hbytes && aoff <= abytes);
1343
1344                 if (hoff < hbytes) {
1345                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1346                         iov[iovcnt].iov_len = hbytes - hoff;
1347                         nact += hbytes - hoff;
1348                         ++iovcnt;
1349                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1350                                 break;
1351                 }
1352                 if (aoff < abytes) {
1353                         assert(msg->aux_data != NULL);
1354                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1355                         iov[iovcnt].iov_len = abytes - aoff;
1356                         nact += abytes - aoff;
1357                         ++iovcnt;
1358                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1359                                 break;
1360                 }
1361                 hoff = 0;
1362                 aoff = 0;
1363         }
1364         if (iovcnt == 0)
1365                 return;
1366
1367         /*
1368          * Encrypt and write the data.  The crypto code will move the
1369          * data into the fifo and adjust the iov as necessary.  If
1370          * encryption is disabled the iov is left alone.
1371          *
1372          * May return a smaller iov (thus a smaller n), with aggregated
1373          * chunks.  May reduce nmax to what fits in the FIFO.
1374          *
1375          * This function sets nact to the number of original bytes now
1376          * encrypted, adding to the FIFO some number of bytes that might
1377          * be greater depending on the crypto mechanic.  iov[] is adjusted
1378          * to point at the FIFO if necessary.
1379          *
1380          * NOTE: The return value from the writev() is the post-encrypted
1381          *       byte count, not the plaintext count.
1382          */
1383         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1384                 /*
1385                  * Make sure the FIFO has a reasonable amount of space
1386                  * left (if not completely full).
1387                  *
1388                  * In this situation we are staging the encrypted message
1389                  * data in the FIFO.  (nact) represents how much plaintext
1390                  * has been staged, (n) represents how much encrypted data
1391                  * has been flushed.  The two are independent of each other.
1392                  */
1393                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1394                     sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1395                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1396                               ioq->fifo_end - ioq->fifo_beg);
1397                         ioq->fifo_cdx -= ioq->fifo_beg;
1398                         ioq->fifo_cdn -= ioq->fifo_beg;
1399                         ioq->fifo_end -= ioq->fifo_beg;
1400                         ioq->fifo_beg = 0;
1401                 }
1402
1403                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1404                 n = writev(iocom->sock_fd, iov, iovcnt);
1405                 if (n > 0) {
1406                         ioq->fifo_beg += n;
1407                         ioq->fifo_cdn += n;
1408                         ioq->fifo_cdx += n;
1409                         if (ioq->fifo_beg == ioq->fifo_end) {
1410                                 ioq->fifo_beg = 0;
1411                                 ioq->fifo_cdn = 0;
1412                                 ioq->fifo_cdx = 0;
1413                                 ioq->fifo_end = 0;
1414                         }
1415                 }
1416                 /*
1417                  * We don't mess with the nact returned by the crypto_encrypt
1418                  * call, which represents the filling of the FIFO.  (n) tells
1419                  * us how much we were able to write from the FIFO.  The two
1420                  * are different beasts when encrypting.
1421                  */
1422         } else {
1423                 /*
1424                  * In this situation we are not staging the messages to the
1425                  * FIFO but instead writing them directly from the msg
1426                  * structure(s), so (nact) is basically (n).
1427                  */
1428                 n = writev(iocom->sock_fd, iov, iovcnt);
1429                 if (n > 0)
1430                         nact = n;
1431                 else
1432                         nact = 0;
1433         }
1434
1435         /*
1436          * Clean out the transmit queue based on what we successfully
1437          * sent (nact is the plaintext count).  ioq->hbytes/abytes
1438          * represents the portion of the first message previously sent.
1439          */
1440         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1441                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1442                          DMSG_ALIGN;
1443                 abytes = DMSG_DOALIGN(msg->aux_size);
1444
1445                 if ((size_t)nact < hbytes - ioq->hbytes) {
1446                         ioq->hbytes += nact;
1447                         nact = 0;
1448                         break;
1449                 }
1450                 nact -= hbytes - ioq->hbytes;
1451                 ioq->hbytes = hbytes;
1452                 if ((size_t)nact < abytes - ioq->abytes) {
1453                         ioq->abytes += nact;
1454                         nact = 0;
1455                         break;
1456                 }
1457                 nact -= abytes - ioq->abytes;
1458                 /* ioq->abytes = abytes; optimized out */
1459
1460                 if (DMsgDebugOpt >= 5) {
1461                         fprintf(stderr,
1462                                 "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1463                                 msg->any.head.cmd,
1464                                 (intmax_t)msg->any.head.msgid,
1465                                 (intmax_t)msg->any.head.circuit);
1466                 }
1467
1468                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1469                 --ioq->msgcount;
1470                 ioq->hbytes = 0;
1471                 ioq->abytes = 0;
1472
1473                 dmsg_state_cleanuptx(msg);
1474         }
1475         assert(nact == 0);
1476
1477         /*
1478          * Process the return value from the write w/regards to blocking.
1479          */
1480         if (n < 0) {
1481                 if (errno != EINTR &&
1482                     errno != EINPROGRESS &&
1483                     errno != EAGAIN) {
1484                         /*
1485                          * Fatal write error
1486                          */
1487                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1488                         dmsg_iocom_drain(iocom);
1489                 } else {
1490                         /*
1491                          * Wait for socket buffer space
1492                          */
1493                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1494                 }
1495         } else {
1496                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1497         }
1498         if (ioq->error) {
1499                 dmsg_iocom_drain(iocom);
1500         }
1501 }
1502
1503 /*
1504  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1505  * write events will occur.  We don't kill read msgs because we want
1506  * the caller to pull off our contrived terminal error msg to detect
1507  * the connection failure.
1508  *
1509  * Localized to iocom_core thread, iocom->mtx not held by caller.
1510  */
1511 void
1512 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1513 {
1514         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1515         dmsg_msg_t *msg;
1516
1517         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1518         ioq->hbytes = 0;
1519         ioq->abytes = 0;
1520
1521         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1522                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1523                 --ioq->msgcount;
1524                 dmsg_state_cleanuptx(msg);
1525         }
1526 }
1527
1528 /*
1529  * Write a message to an iocom, with additional state processing.
1530  */
1531 void
1532 dmsg_msg_write(dmsg_msg_t *msg)
1533 {
1534         dmsg_iocom_t *iocom = msg->iocom;
1535         dmsg_state_t *state;
1536         char dummy;
1537
1538         /*
1539          * Handle state processing, create state if necessary.
1540          */
1541         pthread_mutex_lock(&iocom->mtx);
1542         if ((state = msg->state) != NULL) {
1543                 /*
1544                  * Existing transaction (could be reply).  It is also
1545                  * possible for this to be the first reply (CREATE is set),
1546                  * in which case we populate state->txcmd.
1547                  *
1548                  * state->txcmd is adjusted to hold the final message cmd,
1549                  * and we also be sure to set the CREATE bit here.  We did
1550                  * not set it in dmsg_msg_alloc() because that would have
1551                  * not been serialized (state could have gotten ripped out
1552                  * from under the message prior to it being transmitted).
1553                  */
1554                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1555                     DMSGF_CREATE) {
1556                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1557                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1558                 }
1559                 msg->any.head.msgid = state->msgid;
1560                 assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
1561                 if (msg->any.head.cmd & DMSGF_CREATE) {
1562                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1563                 }
1564         }
1565
1566         /*
1567          * Queue it for output, wake up the I/O pthread.  Note that the
1568          * I/O thread is responsible for generating the CRCs and encryption.
1569          */
1570         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1571         dummy = 0;
1572         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1573         pthread_mutex_unlock(&iocom->mtx);
1574 }
1575
1576 /*
1577  * This is a shortcut to formulate a reply to msg with a simple error code,
1578  * It can reply to and terminate a transaction, or it can reply to a one-way
1579  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1580  * the error code (which can be 0).  Not all transactions are terminated
1581  * with DMSG_LNK_ERROR status (the low level only cares about the
1582  * MSGF_DELETE flag), but most are.
1583  *
1584  * Replies to one-way messages are a bit of an oxymoron but the feature
1585  * is used by the debug (DBG) protocol.
1586  *
1587  * The reply contains no extended data.
1588  */
1589 void
1590 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1591 {
1592         dmsg_state_t *state = msg->state;
1593         dmsg_msg_t *nmsg;
1594         uint32_t cmd;
1595
1596
1597         /*
1598          * Reply with a simple error code and terminate the transaction.
1599          */
1600         cmd = DMSG_LNK_ERROR;
1601
1602         /*
1603          * Check if our direction has even been initiated yet, set CREATE.
1604          *
1605          * Check what direction this is (command or reply direction).  Note
1606          * that txcmd might not have been initiated yet.
1607          *
1608          * If our direction has already been closed we just return without
1609          * doing anything.
1610          */
1611         if (state) {
1612                 if (state->txcmd & DMSGF_DELETE)
1613                         return;
1614                 if (state->txcmd & DMSGF_REPLY)
1615                         cmd |= DMSGF_REPLY;
1616                 cmd |= DMSGF_DELETE;
1617         } else {
1618                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1619                         cmd |= DMSGF_REPLY;
1620         }
1621
1622         /*
1623          * Allocate the message and associate it with the existing state.
1624          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1625          * allocate new state.  We have our state already.
1626          */
1627         nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1628         if (state) {
1629                 if ((state->txcmd & DMSGF_CREATE) == 0)
1630                         nmsg->any.head.cmd |= DMSGF_CREATE;
1631         }
1632         nmsg->any.head.error = error;
1633         nmsg->any.head.msgid = msg->any.head.msgid;
1634         nmsg->any.head.circuit = msg->any.head.circuit;
1635         nmsg->state = state;
1636         dmsg_msg_write(nmsg);
1637 }
1638
1639 /*
1640  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1641  * we are generating a streaming reply or an intermediate acknowledgement
1642  * of some sort as part of the higher level protocol, with more to come
1643  * later.
1644  */
1645 void
1646 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1647 {
1648         dmsg_state_t *state = msg->state;
1649         dmsg_msg_t *nmsg;
1650         uint32_t cmd;
1651
1652
1653         /*
1654          * Reply with a simple error code and terminate the transaction.
1655          */
1656         cmd = DMSG_LNK_ERROR;
1657
1658         /*
1659          * Check if our direction has even been initiated yet, set CREATE.
1660          *
1661          * Check what direction this is (command or reply direction).  Note
1662          * that txcmd might not have been initiated yet.
1663          *
1664          * If our direction has already been closed we just return without
1665          * doing anything.
1666          */
1667         if (state) {
1668                 if (state->txcmd & DMSGF_DELETE)
1669                         return;
1670                 if (state->txcmd & DMSGF_REPLY)
1671                         cmd |= DMSGF_REPLY;
1672                 /* continuing transaction, do not set MSGF_DELETE */
1673         } else {
1674                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1675                         cmd |= DMSGF_REPLY;
1676         }
1677
1678         nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1679         if (state) {
1680                 if ((state->txcmd & DMSGF_CREATE) == 0)
1681                         nmsg->any.head.cmd |= DMSGF_CREATE;
1682         }
1683         nmsg->any.head.error = error;
1684         nmsg->any.head.msgid = msg->any.head.msgid;
1685         nmsg->any.head.circuit = msg->any.head.circuit;
1686         nmsg->state = state;
1687         dmsg_msg_write(nmsg);
1688 }
1689
1690 /*
1691  * Terminate a transaction given a state structure by issuing a DELETE.
1692  */
1693 void
1694 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1695 {
1696         dmsg_msg_t *nmsg;
1697         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1698
1699         /*
1700          * Nothing to do if we already transmitted a delete
1701          */
1702         if (state->txcmd & DMSGF_DELETE)
1703                 return;
1704
1705         /*
1706          * Set REPLY if the other end initiated the command.  Otherwise
1707          * we are the command direction.
1708          */
1709         if (state->txcmd & DMSGF_REPLY)
1710                 cmd |= DMSGF_REPLY;
1711
1712         nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1713         if (state) {
1714                 if ((state->txcmd & DMSGF_CREATE) == 0)
1715                         nmsg->any.head.cmd |= DMSGF_CREATE;
1716         }
1717         nmsg->any.head.error = error;
1718         nmsg->any.head.msgid = state->msgid;
1719         nmsg->any.head.circuit = state->msg->any.head.circuit;
1720         nmsg->state = state;
1721         dmsg_msg_write(nmsg);
1722 }
1723
1724 /*
1725  * Terminate a transaction given a state structure by issuing a DELETE.
1726  */
1727 void
1728 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1729 {
1730         dmsg_msg_t *nmsg;
1731         uint32_t cmd = DMSG_LNK_ERROR;
1732
1733         /*
1734          * Nothing to do if we already transmitted a delete
1735          */
1736         if (state->txcmd & DMSGF_DELETE)
1737                 return;
1738
1739         /*
1740          * Set REPLY if the other end initiated the command.  Otherwise
1741          * we are the command direction.
1742          */
1743         if (state->txcmd & DMSGF_REPLY)
1744                 cmd |= DMSGF_REPLY;
1745
1746         nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1747         if (state) {
1748                 if ((state->txcmd & DMSGF_CREATE) == 0)
1749                         nmsg->any.head.cmd |= DMSGF_CREATE;
1750         }
1751         nmsg->any.head.error = error;
1752         nmsg->any.head.msgid = state->msgid;
1753         nmsg->any.head.circuit = state->msg->any.head.circuit;
1754         nmsg->state = state;
1755         dmsg_msg_write(nmsg);
1756 }
1757
1758 /************************************************************************
1759  *                      TRANSACTION STATE HANDLING                      *
1760  ************************************************************************
1761  *
1762  */
1763
1764 /*
1765  * Process circuit and state tracking for a message after reception, prior
1766  * to execution.
1767  *
1768  * Called with msglk held and the msg dequeued.
1769  *
1770  * All messages are called with dummy state and return actual state.
1771  * (One-off messages often just return the same dummy state).
1772  *
1773  * May request that caller discard the message by setting *discardp to 1.
1774  * The returned state is not used in this case and is allowed to be NULL.
1775  *
1776  * --
1777  *
1778  * These routines handle persistent and command/reply message state via the
1779  * CREATE and DELETE flags.  The first message in a command or reply sequence
1780  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1781  *
1782  * There can be any number of intermediate messages belonging to the same
1783  * sequence sent inbetween the CREATE message and the DELETE message,
1784  * which set neither flag.  This represents a streaming command or reply.
1785  *
1786  * Any command message received with CREATE set expects a reply sequence to
1787  * be returned.  Reply sequences work the same as command sequences except the
1788  * REPLY bit is also sent.  Both the command side and reply side can
1789  * degenerate into a single message with both CREATE and DELETE set.  Note
1790  * that one side can be streaming and the other side not, or neither, or both.
1791  *
1792  * The msgid is unique for the initiator.  That is, two sides sending a new
1793  * message can use the same msgid without colliding.
1794  *
1795  * --
1796  *
1797  * ABORT sequences work by setting the ABORT flag along with normal message
1798  * state.  However, ABORTs can also be sent on half-closed messages, that is
1799  * even if the command or reply side has already sent a DELETE, as long as
1800  * the message has not been fully closed it can still send an ABORT+DELETE
1801  * to terminate the half-closed message state.
1802  *
1803  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1804  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1805  * also race, and in this situation the other side might have already
1806  * initiated a new unrelated command with the same message id.  Since
1807  * the abort has not set the CREATE flag the situation can be detected
1808  * and the message will also be discarded.
1809  *
1810  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1811  * The ABORT request is essentially integrated into the command instead
1812  * of being sent later on.  In this situation the command implementation
1813  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1814  * special-case non-blocking operation for the command.
1815  *
1816  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1817  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1818  *        one-way messages are not supported.
1819  *
1820  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1821  *        simply ignored.
1822  *
1823  * --
1824  *
1825  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1826  * set.  One-off messages cannot be aborted and typically aren't processed
1827  * by these routines.  The REPLY bit can be used to distinguish whether a
1828  * one-off message is a command or reply.  For example, one-off replies
1829  * will typically just contain status updates.
1830  */
1831 static int
1832 dmsg_state_msgrx(dmsg_msg_t *msg)
1833 {
1834         dmsg_iocom_t *iocom = msg->iocom;
1835         dmsg_circuit_t *circuit;
1836         dmsg_circuit_t *ocircuit;
1837         dmsg_state_t *state;
1838         dmsg_state_t sdummy;
1839         dmsg_circuit_t cdummy;
1840         int error;
1841
1842         pthread_mutex_lock(&iocom->mtx);
1843
1844         /*
1845          * Locate existing persistent circuit and state, if any.
1846          */
1847         if (msg->any.head.circuit == 0) {
1848                 circuit = &iocom->circuit0;
1849         } else {
1850                 cdummy.msgid = msg->any.head.circuit;
1851                 circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
1852                                   &cdummy);
1853                 if (circuit == NULL) {
1854                         pthread_mutex_unlock(&iocom->mtx);
1855                         return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
1856                 }
1857         }
1858
1859         /*
1860          * Replace circuit0 with actual
1861          */
1862         dmsg_circuit_hold(circuit);
1863         ocircuit = msg->circuit;
1864         msg->circuit = circuit;
1865
1866         /*
1867          * If received msg is a command state is on staterd_tree.
1868          * If received msg is a reply state is on statewr_tree.
1869          */
1870         sdummy.msgid = msg->any.head.msgid;
1871         if (msg->any.head.cmd & DMSGF_REPLY) {
1872                 state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
1873                                 &sdummy);
1874         } else {
1875                 state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
1876                                 &sdummy);
1877         }
1878         msg->state = state;
1879
1880         pthread_mutex_unlock(&iocom->mtx);
1881         if (ocircuit)
1882                 dmsg_circuit_drop(ocircuit);
1883
1884         /*
1885          * Short-cut one-off or mid-stream messages (state may be NULL).
1886          */
1887         if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1888                                   DMSGF_ABORT)) == 0) {
1889                 return(0);
1890         }
1891
1892         /*
1893          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1894          * inside the case statements.
1895          */
1896         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1897                                     DMSGF_REPLY)) {
1898         case DMSGF_CREATE:
1899         case DMSGF_CREATE | DMSGF_DELETE:
1900                 /*
1901                  * New persistant command received.
1902                  */
1903                 if (state) {
1904                         fprintf(stderr, "duplicate-trans %s\n",
1905                                 dmsg_msg_str(msg));
1906                         error = DMSG_IOQ_ERROR_TRANS;
1907                         assert(0);
1908                         break;
1909                 }
1910                 state = malloc(sizeof(*state));
1911                 bzero(state, sizeof(*state));
1912                 state->iocom = iocom;
1913                 state->circuit = circuit;
1914                 state->flags = DMSG_STATE_DYNAMIC;
1915                 state->msg = msg;
1916                 state->txcmd = DMSGF_REPLY;
1917                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1918                 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
1919                 state->flags |= DMSG_STATE_INSERTED;
1920                 state->msgid = msg->any.head.msgid;
1921                 msg->state = state;
1922                 pthread_mutex_lock(&iocom->mtx);
1923                 RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
1924                 pthread_mutex_unlock(&iocom->mtx);
1925                 error = 0;
1926                 if (DMsgDebugOpt) {
1927                         fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1928                                 state, (uint32_t)state->msgid, iocom);
1929                 }
1930                 break;
1931         case DMSGF_DELETE:
1932                 /*
1933                  * Persistent state is expected but might not exist if an
1934                  * ABORT+DELETE races the close.
1935                  */
1936                 if (state == NULL) {
1937                         if (msg->any.head.cmd & DMSGF_ABORT) {
1938                                 error = DMSG_IOQ_ERROR_EALREADY;
1939                         } else {
1940                                 fprintf(stderr, "missing-state %s\n",
1941                                         dmsg_msg_str(msg));
1942                                 error = DMSG_IOQ_ERROR_TRANS;
1943                         assert(0);
1944                         }
1945                         break;
1946                 }
1947
1948                 /*
1949                  * Handle another ABORT+DELETE case if the msgid has already
1950                  * been reused.
1951                  */
1952                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1953                         if (msg->any.head.cmd & DMSGF_ABORT) {
1954                                 error = DMSG_IOQ_ERROR_EALREADY;
1955                         } else {
1956                                 fprintf(stderr, "reused-state %s\n",
1957                                         dmsg_msg_str(msg));
1958                                 error = DMSG_IOQ_ERROR_TRANS;
1959                         assert(0);
1960                         }
1961                         break;
1962                 }
1963                 error = 0;
1964                 break;
1965         default:
1966                 /*
1967                  * Check for mid-stream ABORT command received, otherwise
1968                  * allow.
1969                  */
1970                 if (msg->any.head.cmd & DMSGF_ABORT) {
1971                         if (state == NULL ||
1972                             (state->rxcmd & DMSGF_CREATE) == 0) {
1973                                 error = DMSG_IOQ_ERROR_EALREADY;
1974                                 break;
1975                         }
1976                 }
1977                 error = 0;
1978                 break;
1979         case DMSGF_REPLY | DMSGF_CREATE:
1980         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
1981                 /*
1982                  * When receiving a reply with CREATE set the original
1983                  * persistent state message should already exist.
1984                  */
1985                 if (state == NULL) {
1986                         fprintf(stderr, "no-state(r) %s\n",
1987                                 dmsg_msg_str(msg));
1988                         error = DMSG_IOQ_ERROR_TRANS;
1989                         assert(0);
1990                         break;
1991                 }
1992                 assert(((state->rxcmd ^ msg->any.head.cmd) &
1993                         DMSGF_REPLY) == 0);
1994                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1995                 error = 0;
1996                 break;
1997         case DMSGF_REPLY | DMSGF_DELETE:
1998                 /*
1999                  * Received REPLY+ABORT+DELETE in case where msgid has
2000                  * already been fully closed, ignore the message.
2001                  */
2002                 if (state == NULL) {
2003                         if (msg->any.head.cmd & DMSGF_ABORT) {
2004                                 error = DMSG_IOQ_ERROR_EALREADY;
2005                         } else {
2006                                 fprintf(stderr, "no-state(r,d) %s\n",
2007                                         dmsg_msg_str(msg));
2008                                 error = DMSG_IOQ_ERROR_TRANS;
2009                         assert(0);
2010                         }
2011                         break;
2012                 }
2013
2014                 /*
2015                  * Received REPLY+ABORT+DELETE in case where msgid has
2016                  * already been reused for an unrelated message,
2017                  * ignore the message.
2018                  */
2019                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2020                         if (msg->any.head.cmd & DMSGF_ABORT) {
2021                                 error = DMSG_IOQ_ERROR_EALREADY;
2022                         } else {
2023                                 fprintf(stderr, "reused-state(r,d) %s\n",
2024                                         dmsg_msg_str(msg));
2025                                 error = DMSG_IOQ_ERROR_TRANS;
2026                         assert(0);
2027                         }
2028                         break;
2029                 }
2030                 error = 0;
2031                 break;
2032         case DMSGF_REPLY:
2033                 /*
2034                  * Check for mid-stream ABORT reply received to sent command.
2035                  */
2036                 if (msg->any.head.cmd & DMSGF_ABORT) {
2037                         if (state == NULL ||
2038                             (state->rxcmd & DMSGF_CREATE) == 0) {
2039                                 error = DMSG_IOQ_ERROR_EALREADY;
2040                                 break;
2041                         }
2042                 }
2043                 error = 0;
2044                 break;
2045         }
2046         return (error);
2047 }
2048
2049 void
2050 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2051 {
2052         dmsg_state_t *state;
2053
2054         if ((state = msg->state) == NULL) {
2055                 /*
2056                  * Free a non-transactional message, there is no state
2057                  * to worry about.
2058                  */
2059                 dmsg_msg_free(msg);
2060         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2061                 /*
2062                  * Message terminating transaction, destroy the related
2063                  * state, the original message, and this message (if it
2064                  * isn't the original message due to a CREATE|DELETE).
2065                  */
2066                 pthread_mutex_lock(&iocom->mtx);
2067                 state->rxcmd |= DMSGF_DELETE;
2068                 if (state->txcmd & DMSGF_DELETE) {
2069                         if (state->msg == msg)
2070                                 state->msg = NULL;
2071                         assert(state->flags & DMSG_STATE_INSERTED);
2072                         if (state->rxcmd & DMSGF_REPLY) {
2073                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2074                                 RB_REMOVE(dmsg_state_tree,
2075                                           &msg->circuit->statewr_tree, state);
2076                         } else {
2077                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2078                                 RB_REMOVE(dmsg_state_tree,
2079                                           &msg->circuit->staterd_tree, state);
2080                         }
2081                         state->flags &= ~DMSG_STATE_INSERTED;
2082                         dmsg_state_free(state);
2083                 } else {
2084                         ;
2085                 }
2086                 pthread_mutex_unlock(&iocom->mtx);
2087                 dmsg_msg_free(msg);
2088         } else if (state->msg != msg) {
2089                 /*
2090                  * Message not terminating transaction, leave state intact
2091                  * and free message if it isn't the CREATE message.
2092                  */
2093                 dmsg_msg_free(msg);
2094         }
2095 }
2096
2097 static void
2098 dmsg_state_cleanuptx(dmsg_msg_t *msg)
2099 {
2100         dmsg_iocom_t *iocom = msg->iocom;
2101         dmsg_state_t *state;
2102
2103         if ((state = msg->state) == NULL) {
2104                 dmsg_msg_free(msg);
2105         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2106                 pthread_mutex_lock(&iocom->mtx);
2107                 assert((state->txcmd & DMSGF_DELETE) == 0);
2108                 state->txcmd |= DMSGF_DELETE;
2109                 if (state->rxcmd & DMSGF_DELETE) {
2110                         if (state->msg == msg)
2111                                 state->msg = NULL;
2112                         assert(state->flags & DMSG_STATE_INSERTED);
2113                         if (state->txcmd & DMSGF_REPLY) {
2114                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2115                                 RB_REMOVE(dmsg_state_tree,
2116                                           &msg->circuit->staterd_tree, state);
2117                         } else {
2118                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2119                                 RB_REMOVE(dmsg_state_tree,
2120                                           &msg->circuit->statewr_tree, state);
2121                         }
2122                         state->flags &= ~DMSG_STATE_INSERTED;
2123                         dmsg_state_free(state);
2124                 } else {
2125                         ;
2126                 }
2127                 pthread_mutex_unlock(&iocom->mtx);
2128                 dmsg_msg_free(msg);
2129         } else if (state->msg != msg) {
2130                 dmsg_msg_free(msg);
2131         }
2132 }
2133
2134 /*
2135  * Called with iocom locked
2136  */
2137 void
2138 dmsg_state_free(dmsg_state_t *state)
2139 {
2140         dmsg_msg_t *msg;
2141
2142         if (DMsgDebugOpt) {
2143                 fprintf(stderr, "terminate state %p id=%08x\n",
2144                         state, (uint32_t)state->msgid);
2145         }
2146         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2147                 closefrom(3);
2148         assert(state->any.any == NULL);
2149         msg = state->msg;
2150         state->msg = NULL;
2151         if (msg)
2152                 dmsg_msg_free_locked(msg);
2153         free(state);
2154 }
2155
2156 /*
2157  * Called with iocom locked
2158  */
2159 void
2160 dmsg_circuit_hold(dmsg_circuit_t *circuit)
2161 {
2162         assert(circuit->refs > 0);              /* caller must hold ref */
2163         atomic_add_int(&circuit->refs, 1);      /* to safely add more */
2164 }
2165
2166 /*
2167  * Called with iocom locked
2168  */
2169 void
2170 dmsg_circuit_drop(dmsg_circuit_t *circuit)
2171 {
2172         dmsg_iocom_t *iocom = circuit->iocom;
2173         char dummy;
2174
2175         assert(circuit->refs > 0);
2176         assert(iocom);
2177
2178         /*
2179          * Decrement circuit refs, destroy circuit when refs drops to 0.
2180          */
2181         if (atomic_fetchadd_int(&circuit->refs, -1) != 1)
2182                 return;
2183         assert(circuit != &iocom->circuit0);
2184
2185         assert(RB_EMPTY(&circuit->staterd_tree));
2186         assert(RB_EMPTY(&circuit->statewr_tree));
2187         pthread_mutex_lock(&iocom->mtx);
2188         RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2189         circuit->iocom = NULL;
2190         pthread_mutex_unlock(&iocom->mtx);
2191         dmsg_free(circuit);
2192
2193         /*
2194          * When an iocom error is present the rx code will terminate the
2195          * receive side for all transactions and (indirectly) all circuits
2196          * by simulating DELETE messages.  The state and related circuits
2197          * don't disappear until the related states are closed in both
2198          * directions
2199          *
2200          * Detect the case where the last circuit is now gone (and thus all
2201          * states for all circuits are gone), and wakeup the rx thread to
2202          * complete the termination.
2203          */
2204         if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2205                 dummy = 0;
2206                 write(iocom->wakeupfds[1], &dummy, 1);
2207         }
2208 }
2209
2210 void
2211 dmsg_circuit_drop_locked(dmsg_circuit_t *circuit)
2212 {
2213         dmsg_iocom_t *iocom;
2214
2215         iocom = circuit->iocom;
2216         assert(circuit->refs > 0);
2217         assert(iocom);
2218
2219         if (atomic_fetchadd_int(&circuit->refs, -1) == 1) {
2220                 assert(circuit != &iocom->circuit0);
2221                 assert(RB_EMPTY(&circuit->staterd_tree));
2222                 assert(RB_EMPTY(&circuit->statewr_tree));
2223                 RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2224                 circuit->iocom = NULL;
2225                 dmsg_free(circuit);
2226                 if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2227                         char dummy = 0;
2228                         write(iocom->wakeupfds[1], &dummy, 1);
2229                 }
2230         }
2231 }
2232
2233 /*
2234  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2235  * header is not adjusted, just the core header.
2236  */
2237 void
2238 dmsg_bswap_head(dmsg_hdr_t *head)
2239 {
2240         head->magic     = bswap16(head->magic);
2241         head->reserved02 = bswap16(head->reserved02);
2242         head->salt      = bswap32(head->salt);
2243
2244         head->msgid     = bswap64(head->msgid);
2245         head->circuit   = bswap64(head->circuit);
2246         head->reserved18= bswap64(head->reserved18);
2247
2248         head->cmd       = bswap32(head->cmd);
2249         head->aux_crc   = bswap32(head->aux_crc);
2250         head->aux_bytes = bswap32(head->aux_bytes);
2251         head->error     = bswap32(head->error);
2252         head->aux_descr = bswap64(head->aux_descr);
2253         head->reserved38= bswap32(head->reserved38);
2254         head->hdr_crc   = bswap32(head->hdr_crc);
2255 }