Merge branch 'vendor/DIFFUTILS'
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
/*
 * Debug switch.  When non-zero, debug output is produced, e.g.
 * dmsg_iocom_core() prints every received message to stderr.
 */
int DMsgDebugOpt;

/*
 * Forward declarations for file-local helpers.
 */
static int dmsg_state_msgrx(dmsg_msg_t *msg);
static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
static void dmsg_msg_free_locked(dmsg_msg_t *msg);

/*
 * Instantiate the red-black tree code for the state and circuit trees,
 * which are ordered by dmsg_state_cmp() and dmsg_circuit_cmp().
 */
RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
46
47 /*
48  * STATE TREE - Represents open transactions which are indexed by their
49  *              { msgid } relative to the governing iocom.
50  */
51 int
52 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
53 {
54         if (state1->msgid < state2->msgid)
55                 return(-1);
56         if (state1->msgid > state2->msgid)
57                 return(1);
58         return(0);
59 }
60
61 /*
62  * CIRCUIT TREE - Represents open circuits which are indexed by their
63  *                { msgid } relative to the governing iocom.
64  */
65 int
66 dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
67 {
68         if (circuit1->msgid < circuit2->msgid)
69                 return(-1);
70         if (circuit1->msgid > circuit2->msgid)
71                 return(1);
72         return(0);
73 }
74
75 /*
76  * Initialize a low-level ioq
77  */
78 void
79 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
80 {
81         bzero(ioq, sizeof(*ioq));
82         ioq->state = DMSG_MSGQ_STATE_HEADER1;
83         TAILQ_INIT(&ioq->msgq);
84 }
85
86 /*
87  * Cleanup queue.
88  *
89  * caller holds iocom->mtx.
90  */
91 void
92 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
93 {
94         dmsg_msg_t *msg;
95
96         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
97                 assert(0);      /* shouldn't happen */
98                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
99                 dmsg_msg_free(msg);
100         }
101         if ((msg = ioq->msg) != NULL) {
102                 ioq->msg = NULL;
103                 dmsg_msg_free(msg);
104         }
105 }
106
107 /*
108  * Initialize a low-level communications channel.
109  *
110  * NOTE: The signal_func() is called at least once from the loop and can be
111  *       re-armed via dmsg_iocom_restate().
112  */
113 void
114 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
115                    void (*signal_func)(dmsg_iocom_t *),
116                    void (*rcvmsg_func)(dmsg_msg_t *),
117                    void (*dbgmsg_func)(dmsg_msg_t *),
118                    void (*altmsg_func)(dmsg_iocom_t *))
119 {
120         struct stat st;
121
122         bzero(iocom, sizeof(*iocom));
123
124         asprintf(&iocom->label, "iocom-%p", iocom);
125         iocom->signal_callback = signal_func;
126         iocom->rcvmsg_callback = rcvmsg_func;
127         iocom->altmsg_callback = altmsg_func;
128         iocom->dbgmsg_callback = dbgmsg_func;
129
130         pthread_mutex_init(&iocom->mtx, NULL);
131         RB_INIT(&iocom->circuit_tree);
132         TAILQ_INIT(&iocom->freeq);
133         TAILQ_INIT(&iocom->freeq_aux);
134         TAILQ_INIT(&iocom->txmsgq);
135         iocom->sock_fd = sock_fd;
136         iocom->alt_fd = alt_fd;
137         iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
138         if (signal_func)
139                 iocom->flags |= DMSG_IOCOMF_SWORK;
140         dmsg_ioq_init(iocom, &iocom->ioq_rx);
141         dmsg_ioq_init(iocom, &iocom->ioq_tx);
142         if (pipe(iocom->wakeupfds) < 0)
143                 assert(0);
144         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
145         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
146
147         dmsg_circuit_init(iocom, &iocom->circuit0);
148
149         /*
150          * Negotiate session crypto synchronously.  This will mark the
151          * connection as error'd if it fails.  If this is a pipe it's
152          * a linkage that we set up ourselves to the filesystem and there
153          * is no crypto.
154          */
155         if (fstat(sock_fd, &st) < 0)
156                 assert(0);
157         if (S_ISSOCK(st.st_mode))
158                 dmsg_crypto_negotiate(iocom);
159
160         /*
161          * Make sure our fds are set to non-blocking for the iocom core.
162          */
163         if (sock_fd >= 0)
164                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
165 #if 0
166         /* if line buffered our single fgets() should be fine */
167         if (alt_fd >= 0)
168                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
169 #endif
170 }
171
172 void
173 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
174 {
175         va_list va;
176         char *optr;
177
178         va_start(va, ctl);
179         optr = iocom->label;
180         vasprintf(&iocom->label, ctl, va);
181         va_end(va);
182         if (optr)
183                 free(optr);
184 }
185
186 /*
187  * May only be called from a callback from iocom_core.
188  *
189  * Adjust state machine functions, set flags to guarantee that both
190  * the recevmsg_func and the sendmsg_func is called at least once.
191  */
192 void
193 dmsg_iocom_restate(dmsg_iocom_t *iocom,
194                    void (*signal_func)(dmsg_iocom_t *),
195                    void (*rcvmsg_func)(dmsg_msg_t *msg),
196                    void (*altmsg_func)(dmsg_iocom_t *))
197 {
198         pthread_mutex_lock(&iocom->mtx);
199         iocom->signal_callback = signal_func;
200         iocom->rcvmsg_callback = rcvmsg_func;
201         iocom->altmsg_callback = altmsg_func;
202         if (signal_func)
203                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
204         else
205                 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
206         pthread_mutex_unlock(&iocom->mtx);
207 }
208
209 void
210 dmsg_iocom_signal(dmsg_iocom_t *iocom)
211 {
212         pthread_mutex_lock(&iocom->mtx);
213         if (iocom->signal_callback)
214                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
215         pthread_mutex_unlock(&iocom->mtx);
216 }
217
218 /*
219  * Cleanup a terminating iocom.
220  *
221  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
222  * from all possible references to it.
223  */
224 void
225 dmsg_iocom_done(dmsg_iocom_t *iocom)
226 {
227         dmsg_msg_t *msg;
228
229         if (iocom->sock_fd >= 0) {
230                 close(iocom->sock_fd);
231                 iocom->sock_fd = -1;
232         }
233         if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
234                 close(iocom->alt_fd);
235                 iocom->alt_fd = -1;
236         }
237         dmsg_ioq_done(iocom, &iocom->ioq_rx);
238         dmsg_ioq_done(iocom, &iocom->ioq_tx);
239         while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
240                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
241                 free(msg);
242         }
243         while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
244                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
245                 free(msg->aux_data);
246                 msg->aux_data = NULL;
247                 free(msg);
248         }
249         if (iocom->wakeupfds[0] >= 0) {
250                 close(iocom->wakeupfds[0]);
251                 iocom->wakeupfds[0] = -1;
252         }
253         if (iocom->wakeupfds[1] >= 0) {
254                 close(iocom->wakeupfds[1]);
255                 iocom->wakeupfds[1] = -1;
256         }
257         pthread_mutex_destroy(&iocom->mtx);
258 }
259
/*
 * Basic initialization of a circuit structure.
 *
 * Binds the circuit to its iocom, empties both of its state trees
 * (staterd_tree and statewr_tree) and hands it out with a single
 * reference.  Other fields are not modified.
 */
void
dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
{
	RB_INIT(&circuit->staterd_tree);
	RB_INIT(&circuit->statewr_tree);
	circuit->iocom = iocom;
	circuit->refs = 1;
}
273
/*
 * Allocate a new message on the given circuit.
 *
 * circuit	- circuit the message belongs to; a reference is taken via
 *		  dmsg_circuit_hold().
 * aux_size	- size of the auxiliary data buffer, 0 for none.
 * cmd		- command word.  Its DMSGF_SIZE bits select the header
 *		  size (in DMSG_ALIGN units).
 * func, data	- stored in the transaction state when one is created.
 *
 * When cmd has DMSGF_CREATE set without DMSGF_REPLY a new transaction
 * state is allocated, inserted into circuit->statewr_tree under
 * iocom->mtx and linked to the message; otherwise the message carries
 * msgid 0 and no state.
 *
 * The message is allocated truncated to the header size actually used,
 * followed by a 4 byte guard word (0x71B2C3D4) that is verified when the
 * message is freed.
 *
 * NOTE(review): none of the malloc() results are checked.
 */
dmsg_msg_t *
dmsg_msg_alloc(dmsg_circuit_t *circuit,
	       size_t aux_size, uint32_t cmd,
	       void (*func)(dmsg_msg_t *), void *data)
{
	dmsg_iocom_t *iocom = circuit->iocom;
	dmsg_state_t *state = NULL;
	dmsg_msg_t *msg;
	size_t aligned_size = DMSG_DOALIGN(aux_size);
	int hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
	char *guard;

	pthread_mutex_lock(&iocom->mtx);
	if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
		/*
		 * Create state when CREATE is set without REPLY.
		 * Assign a unique msgid, in this case simply using
		 * the pointer value for 'state'.
		 *
		 * NOTE: CREATE in txcmd handled by dmsg_msg_write()
		 * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
		 *
		 * NOTE: state initiated by us and state initiated by
		 *       a remote create are placed in different RB trees.
		 *       The msgid for SPAN state is used in source/target
		 *       for message routing as appropriate.
		 */
		state = malloc(sizeof(*state));
		bzero(state, sizeof(*state));
		state->iocom = iocom;
		state->circuit = circuit;
		state->flags = DMSG_STATE_DYNAMIC;
		state->msgid = (uint64_t)(uintptr_t)state;
		state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
		state->rxcmd = DMSGF_REPLY;
		state->icmd = state->txcmd & DMSGF_BASECMDMASK;
		state->func = func;
		state->any.any = data;
		RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
		state->flags |= DMSG_STATE_INSERTED;
	}
	/* XXX SMP race for state */
	pthread_mutex_unlock(&iocom->mtx);

	/*
	 * Allocate the message itself: everything in front of the header
	 * is zeroed and the guard word is planted right behind the header.
	 */
	msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
	bzero(msg, offsetof(struct dmsg_msg, any.head));
	guard = (char *)msg + offsetof(struct dmsg_msg, any.head) + hbytes;
	*(int *)guard = 0x71B2C3D4;

	/*
	 * [re]allocate the auxiliary data buffer.  The caller knows that
	 * a size-aligned buffer will be allocated but we do not want to
	 * force the caller to zero any tail piece, so we do that ourself.
	 */
	if (aux_size != msg->aux_size) {
		if (msg->aux_data) {
			free(msg->aux_data);
			msg->aux_data = NULL;
			msg->aux_size = 0;
		}
		if (aux_size) {
			msg->aux_data = malloc(aligned_size);
			msg->aux_size = aux_size;
			if (aligned_size != aux_size) {
				bzero(msg->aux_data + aux_size,
				      aligned_size - aux_size);
			}
		}
	}

	/*
	 * Start from an all-zero header, then fill in the basics.
	 */
	if (hbytes)
		bzero(&msg->any.head, hbytes);
	msg->hdr_size = hbytes;
	msg->any.head.magic = DMSG_HDR_MAGIC;
	msg->any.head.cmd = cmd;
	msg->any.head.aux_descr = 0;
	msg->any.head.aux_crc = 0;
	msg->any.head.circuit = 0;

	msg->iocom = iocom;
	msg->circuit = circuit;
	dmsg_circuit_hold(circuit);

	/*
	 * Cross-link message and state when a transaction was opened.
	 */
	if (state == NULL) {
		msg->any.head.msgid = 0;
	} else {
		msg->state = state;
		state->msg = msg;
		msg->any.head.msgid = state->msgid;
	}
	return (msg);
}
385
386 /*
387  * Free a message so it can be reused afresh.
388  *
389  * NOTE: aux_size can be 0 with a non-NULL aux_data.
390  */
391 static
392 void
393 dmsg_msg_free_locked(dmsg_msg_t *msg)
394 {
395         /*dmsg_iocom_t *iocom = msg->iocom;*/
396 #if 1
397         int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
398         if (*(int *)((char *)msg +
399                      offsetof(struct  dmsg_msg, any.head) + hbytes) !=
400              0x71B2C3D4) {
401                 fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
402                 assert(0);
403         }
404 #endif
405         if (msg->circuit) {
406                 dmsg_circuit_drop_locked(msg->circuit);
407                 msg->circuit = NULL;
408         }
409         msg->state = NULL;
410         if (msg->aux_data) {
411                 free(msg->aux_data);
412                 msg->aux_data = NULL;
413         }
414         msg->aux_size = 0;
415         free (msg);
416 #if 0
417         if (msg->aux_data)
418                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
419         else
420                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
421 #endif
422 }
423
424 void
425 dmsg_msg_free(dmsg_msg_t *msg)
426 {
427         dmsg_iocom_t *iocom = msg->iocom;
428
429         pthread_mutex_lock(&iocom->mtx);
430         dmsg_msg_free_locked(msg);
431         pthread_mutex_unlock(&iocom->mtx);
432 }
433
434 /*
435  * I/O core loop for an iocom.
436  *
437  * Thread localized, iocom->mtx not held.
438  */
439 void
440 dmsg_iocom_core(dmsg_iocom_t *iocom)
441 {
442         struct pollfd fds[3];
443         char dummybuf[256];
444         dmsg_msg_t *msg;
445         int timeout;
446         int count;
447         int wi; /* wakeup pipe */
448         int si; /* socket */
449         int ai; /* alt bulk path socket */
450
451         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
452                 /*
453                  * These iocom->flags are only manipulated within the
454                  * context of the current thread.  However, modifications
455                  * still require atomic ops.
456                  */
457                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
458                                      DMSG_IOCOMF_WWORK |
459                                      DMSG_IOCOMF_PWORK |
460                                      DMSG_IOCOMF_SWORK |
461                                      DMSG_IOCOMF_ARWORK |
462                                      DMSG_IOCOMF_AWWORK)) == 0) {
463                         /*
464                          * Only poll if no immediate work is pending.
465                          * Otherwise we are just wasting our time calling
466                          * poll.
467                          */
468                         timeout = 5000;
469
470                         count = 0;
471                         wi = -1;
472                         si = -1;
473                         ai = -1;
474
475                         /*
476                          * Always check the inter-thread pipe, e.g.
477                          * for iocom->txmsgq work.
478                          */
479                         wi = count++;
480                         fds[wi].fd = iocom->wakeupfds[0];
481                         fds[wi].events = POLLIN;
482                         fds[wi].revents = 0;
483
484                         /*
485                          * Check the socket input/output direction as
486                          * requested
487                          */
488                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
489                                             DMSG_IOCOMF_WREQ)) {
490                                 si = count++;
491                                 fds[si].fd = iocom->sock_fd;
492                                 fds[si].events = 0;
493                                 fds[si].revents = 0;
494
495                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
496                                         fds[si].events |= POLLIN;
497                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
498                                         fds[si].events |= POLLOUT;
499                         }
500
501                         /*
502                          * Check the alternative fd for work.
503                          */
504                         if (iocom->alt_fd >= 0) {
505                                 ai = count++;
506                                 fds[ai].fd = iocom->alt_fd;
507                                 fds[ai].events = POLLIN;
508                                 fds[ai].revents = 0;
509                         }
510                         poll(fds, count, timeout);
511
512                         if (wi >= 0 && (fds[wi].revents & POLLIN))
513                                 atomic_set_int(&iocom->flags,
514                                                DMSG_IOCOMF_PWORK);
515                         if (si >= 0 && (fds[si].revents & POLLIN))
516                                 atomic_set_int(&iocom->flags,
517                                                DMSG_IOCOMF_RWORK);
518                         if (si >= 0 && (fds[si].revents & POLLOUT))
519                                 atomic_set_int(&iocom->flags,
520                                                DMSG_IOCOMF_WWORK);
521                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
522                                 atomic_set_int(&iocom->flags,
523                                                DMSG_IOCOMF_WWORK);
524                         if (ai >= 0 && (fds[ai].revents & POLLIN))
525                                 atomic_set_int(&iocom->flags,
526                                                DMSG_IOCOMF_ARWORK);
527                 } else {
528                         /*
529                          * Always check the pipe
530                          */
531                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
532                 }
533
534                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
535                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
536                         iocom->signal_callback(iocom);
537                 }
538
539                 /*
540                  * Pending message queues from other threads wake us up
541                  * with a write to the wakeupfds[] pipe.  We have to clear
542                  * the pipe with a dummy read.
543                  */
544                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
545                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
546                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
547                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
548                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
549                         if (TAILQ_FIRST(&iocom->txmsgq))
550                                 dmsg_iocom_flush1(iocom);
551                 }
552
553                 /*
554                  * Message write sequencing
555                  */
556                 if (iocom->flags & DMSG_IOCOMF_WWORK)
557                         dmsg_iocom_flush1(iocom);
558
559                 /*
560                  * Message read sequencing.  Run this after the write
561                  * sequencing in case the write sequencing allowed another
562                  * auto-DELETE to occur on the read side.
563                  */
564                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
565                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
566                                (msg = dmsg_ioq_read(iocom)) != NULL) {
567                                 if (DMsgDebugOpt) {
568                                         fprintf(stderr, "receive %s\n",
569                                                 dmsg_msg_str(msg));
570                                 }
571                                 iocom->rcvmsg_callback(msg);
572                                 dmsg_state_cleanuprx(iocom, msg);
573                         }
574                 }
575
576                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
577                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
578                         iocom->altmsg_callback(iocom);
579                 }
580         }
581 }
582
583 /*
584  * Make sure there's enough room in the FIFO to hold the
585  * needed data.
586  *
587  * Assume worst case encrypted form is 2x the size of the
588  * plaintext equivalent.
589  */
590 static
591 size_t
592 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
593 {
594         size_t bytes;
595         size_t nmax;
596
597         bytes = ioq->fifo_cdx - ioq->fifo_beg;
598         nmax = sizeof(ioq->buf) - ioq->fifo_end;
599         if (bytes + nmax / 2 < needed) {
600                 if (bytes) {
601                         bcopy(ioq->buf + ioq->fifo_beg,
602                               ioq->buf,
603                               bytes);
604                 }
605                 ioq->fifo_cdx -= ioq->fifo_beg;
606                 ioq->fifo_beg = 0;
607                 if (ioq->fifo_cdn < ioq->fifo_end) {
608                         bcopy(ioq->buf + ioq->fifo_cdn,
609                               ioq->buf + ioq->fifo_cdx,
610                               ioq->fifo_end - ioq->fifo_cdn);
611                 }
612                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
613                 ioq->fifo_cdn = ioq->fifo_cdx;
614                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
615         }
616         return(nmax);
617 }
618
619 /*
620  * Read the next ready message from the ioq, issuing I/O if needed.
621  * Caller should retry on a read-event when NULL is returned.
622  *
623  * If an error occurs during reception a DMSG_LNK_ERROR msg will
624  * be returned for each open transaction, then the ioq and iocom
625  * will be errored out and a non-transactional DMSG_LNK_ERROR
626  * msg will be returned as the final message.  The caller should not call
627  * us again after the final message is returned.
628  *
629  * Thread localized, iocom->mtx not held.
630  */
631 dmsg_msg_t *
632 dmsg_ioq_read(dmsg_iocom_t *iocom)
633 {
634         dmsg_ioq_t *ioq = &iocom->ioq_rx;
635         dmsg_msg_t *msg;
636         dmsg_state_t *state;
637         dmsg_circuit_t *circuit0;
638         dmsg_hdr_t *head;
639         ssize_t n;
640         size_t bytes;
641         size_t nmax;
642         uint32_t aux_size;
643         uint32_t xcrc32;
644         int error;
645
646 again:
647         /*
648          * If a message is already pending we can just remove and
649          * return it.  Message state has already been processed.
650          * (currently not implemented)
651          */
652         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
653                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
654                 return (msg);
655         }
656         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
657
658         /*
659          * If the stream is errored out we stop processing it.
660          */
661         if (ioq->error)
662                 goto skip;
663
664         /*
665          * Message read in-progress (msg is NULL at the moment).  We don't
666          * allocate a msg until we have its core header.
667          */
668         nmax = sizeof(ioq->buf) - ioq->fifo_end;
669         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
670         msg = ioq->msg;
671
672         switch(ioq->state) {
673         case DMSG_MSGQ_STATE_HEADER1:
674                 /*
675                  * Load the primary header, fail on any non-trivial read
676                  * error or on EOF.  Since the primary header is the same
677                  * size as the message alignment it will never straddle
678                  * the end of the buffer.
679                  */
680                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
681                 if (bytes < sizeof(msg->any.head)) {
682                         n = read(iocom->sock_fd,
683                                  ioq->buf + ioq->fifo_end,
684                                  nmax);
685                         if (n <= 0) {
686                                 if (n == 0) {
687                                         ioq->error = DMSG_IOQ_ERROR_EOF;
688                                         break;
689                                 }
690                                 if (errno != EINTR &&
691                                     errno != EINPROGRESS &&
692                                     errno != EAGAIN) {
693                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
694                                         break;
695                                 }
696                                 n = 0;
697                                 /* fall through */
698                         }
699                         ioq->fifo_end += (size_t)n;
700                         nmax -= (size_t)n;
701                 }
702
703                 /*
704                  * Decrypt data received so far.  Data will be decrypted
705                  * in-place but might create gaps in the FIFO.  Partial
706                  * blocks are not immediately decrypted.
707                  *
708                  * WARNING!  The header might be in the wrong endian, we
709                  *           do not fix it up until we get the entire
710                  *           extended header.
711                  */
712                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
713                         dmsg_crypto_decrypt(iocom, ioq);
714                 } else {
715                         ioq->fifo_cdx = ioq->fifo_end;
716                         ioq->fifo_cdn = ioq->fifo_end;
717                 }
718                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
719
720                 /*
721                  * Insufficient data accumulated (msg is NULL, caller will
722                  * retry on event).
723                  */
724                 assert(msg == NULL);
725                 if (bytes < sizeof(msg->any.head))
726                         break;
727
728                 /*
729                  * Check and fixup the core header.  Note that the icrc
730                  * has to be calculated before any fixups, but the crc
731                  * fields in the msg may have to be swapped like everything
732                  * else.
733                  */
734                 head = (void *)(ioq->buf + ioq->fifo_beg);
735                 if (head->magic != DMSG_HDR_MAGIC &&
736                     head->magic != DMSG_HDR_MAGIC_REV) {
737                         fprintf(stderr, "%s: head->magic is bad %02x\n",
738                                 iocom->label, head->magic);
739                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
740                                 fprintf(stderr, "(on encrypted link)\n");
741                         ioq->error = DMSG_IOQ_ERROR_SYNC;
742                         break;
743                 }
744
745                 /*
746                  * Calculate the full header size and aux data size
747                  */
748                 if (head->magic == DMSG_HDR_MAGIC_REV) {
749                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
750                                       DMSG_ALIGN;
751                         aux_size = bswap32(head->aux_bytes);
752                 } else {
753                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
754                                       DMSG_ALIGN;
755                         aux_size = head->aux_bytes;
756                 }
757                 ioq->abytes = DMSG_DOALIGN(aux_size);
758                 ioq->unaligned_aux_size = aux_size;
759                 if (ioq->hbytes < sizeof(msg->any.head) ||
760                     ioq->hbytes > sizeof(msg->any) ||
761                     ioq->abytes > DMSG_AUX_MAX) {
762                         ioq->error = DMSG_IOQ_ERROR_FIELD;
763                         break;
764                 }
765
766                 /*
767                  * Allocate the message, the next state will fill it in.
768                  * Note that the aux_data buffer will be sized to an aligned
769                  * value and the aligned remainder zero'd for convenience.
770                  */
771                 msg = dmsg_msg_alloc(&iocom->circuit0, aux_size,
772                                      ioq->hbytes / DMSG_ALIGN,
773                                      NULL, NULL);
774                 ioq->msg = msg;
775
776                 /*
777                  * Fall through to the next state.  Make sure that the
778                  * extended header does not straddle the end of the buffer.
779                  * We still want to issue larger reads into our buffer,
780                  * book-keeping is easier if we don't bcopy() yet.
781                  *
782                  * Make sure there is enough room for bloated encrypt data.
783                  */
784                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
785                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
786                 /* fall through */
787         case DMSG_MSGQ_STATE_HEADER2:
788                 /*
789                  * Fill out the extended header.
790                  */
791                 assert(msg != NULL);
792                 if (bytes < ioq->hbytes) {
793                         n = read(iocom->sock_fd,
794                                  ioq->buf + ioq->fifo_end,
795                                  nmax);
796                         if (n <= 0) {
797                                 if (n == 0) {
798                                         ioq->error = DMSG_IOQ_ERROR_EOF;
799                                         break;
800                                 }
801                                 if (errno != EINTR &&
802                                     errno != EINPROGRESS &&
803                                     errno != EAGAIN) {
804                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
805                                         break;
806                                 }
807                                 n = 0;
808                                 /* fall through */
809                         }
810                         ioq->fifo_end += (size_t)n;
811                         nmax -= (size_t)n;
812                 }
813
814                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
815                         dmsg_crypto_decrypt(iocom, ioq);
816                 } else {
817                         ioq->fifo_cdx = ioq->fifo_end;
818                         ioq->fifo_cdn = ioq->fifo_end;
819                 }
820                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
821
822                 /*
823                  * Insufficient data accumulated (set msg NULL so caller will
824                  * retry on event).
825                  */
826                 if (bytes < ioq->hbytes) {
827                         msg = NULL;
828                         break;
829                 }
830
831                 /*
832                  * Calculate the extended header, decrypt data received
833                  * so far.  Handle endian-conversion for the entire extended
834                  * header.
835                  */
836                 head = (void *)(ioq->buf + ioq->fifo_beg);
837
838                 /*
839                  * Check the CRC.
840                  */
841                 if (head->magic == DMSG_HDR_MAGIC_REV)
842                         xcrc32 = bswap32(head->hdr_crc);
843                 else
844                         xcrc32 = head->hdr_crc;
845                 head->hdr_crc = 0;
846                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
847                         ioq->error = DMSG_IOQ_ERROR_XCRC;
848                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
849                                 xcrc32, dmsg_icrc32(head, ioq->hbytes),
850                                 dmsg_msg_str(msg));
851                         assert(0);
852                         break;
853                 }
854                 head->hdr_crc = xcrc32;
855
856                 if (head->magic == DMSG_HDR_MAGIC_REV) {
857                         dmsg_bswap_head(head);
858                 }
859
860                 /*
861                  * Copy the extended header into the msg and adjust the
862                  * FIFO.
863                  */
864                 bcopy(head, &msg->any, ioq->hbytes);
865
866                 /*
867                  * We are either done or we fall-through.
868                  */
869                 if (ioq->abytes == 0) {
870                         ioq->fifo_beg += ioq->hbytes;
871                         break;
872                 }
873
874                 /*
875                  * Must adjust bytes (and the state) when falling through.
876                  * nmax doesn't change.
877                  */
878                 ioq->fifo_beg += ioq->hbytes;
879                 bytes -= ioq->hbytes;
880                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
881                 /* fall through */
882         case DMSG_MSGQ_STATE_AUXDATA1:
883                 /*
884                  * Copy the partial or complete [decrypted] payload from
885                  * remaining bytes in the FIFO in order to optimize the
886                  * makeroom call in the AUXDATA2 state.  We have to
887                  * fall-through either way so we can check the crc.
888                  *
889                  * msg->aux_size tracks our aux data.
890                  *
891                  * (Lets not complicate matters if the data is encrypted,
892                  *  since the data in-stream is not the same size as the
893                  *  data decrypted).
894                  */
895                 if (bytes >= ioq->abytes) {
896                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
897                               ioq->abytes);
898                         msg->aux_size = ioq->abytes;
899                         ioq->fifo_beg += ioq->abytes;
900                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
901                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
902                         bytes -= ioq->abytes;
903                 } else if (bytes) {
904                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
905                               bytes);
906                         msg->aux_size = bytes;
907                         ioq->fifo_beg += bytes;
908                         if (ioq->fifo_cdx < ioq->fifo_beg)
909                                 ioq->fifo_cdx = ioq->fifo_beg;
910                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
911                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
912                         bytes = 0;
913                 } else {
914                         msg->aux_size = 0;
915                 }
916                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
917                 /* fall through */
918         case DMSG_MSGQ_STATE_AUXDATA2:
919                 /*
920                  * Make sure there is enough room for more data.
921                  */
922                 assert(msg);
923                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
924
925                 /*
926                  * Read and decrypt more of the payload.
927                  */
928                 if (msg->aux_size < ioq->abytes) {
929                         assert(bytes == 0);
930                         n = read(iocom->sock_fd,
931                                  ioq->buf + ioq->fifo_end,
932                                  nmax);
933                         if (n <= 0) {
934                                 if (n == 0) {
935                                         ioq->error = DMSG_IOQ_ERROR_EOF;
936                                         break;
937                                 }
938                                 if (errno != EINTR &&
939                                     errno != EINPROGRESS &&
940                                     errno != EAGAIN) {
941                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
942                                         break;
943                                 }
944                                 n = 0;
945                                 /* fall through */
946                         }
947                         ioq->fifo_end += (size_t)n;
948                         nmax -= (size_t)n;
949                 }
950
951                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
952                         dmsg_crypto_decrypt(iocom, ioq);
953                 } else {
954                         ioq->fifo_cdx = ioq->fifo_end;
955                         ioq->fifo_cdn = ioq->fifo_end;
956                 }
957                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
958
959                 if (bytes > ioq->abytes - msg->aux_size)
960                         bytes = ioq->abytes - msg->aux_size;
961
962                 if (bytes) {
963                         bcopy(ioq->buf + ioq->fifo_beg,
964                               msg->aux_data + msg->aux_size,
965                               bytes);
966                         msg->aux_size += bytes;
967                         ioq->fifo_beg += bytes;
968                 }
969
970                 /*
971                  * Insufficient data accumulated (set msg NULL so caller will
972                  * retry on event).
973                  *
974                  * Assert the auxillary data size is correct, then record the
975                  * original unaligned size from the message header.
976                  */
977                 if (msg->aux_size < ioq->abytes) {
978                         msg = NULL;
979                         break;
980                 }
981                 assert(msg->aux_size == ioq->abytes);
982                 msg->aux_size = ioq->unaligned_aux_size;
983
984                 /*
985                  * Check aux_crc, then we are done.  Note that the crc
986                  * is calculated over the aligned size, not the actual
987                  * size.
988                  */
989                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
990                 if (xcrc32 != msg->any.head.aux_crc) {
991                         ioq->error = DMSG_IOQ_ERROR_ACRC;
992                         fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
993                                 xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
994                         break;
995                 }
996                 break;
997         case DMSG_MSGQ_STATE_ERROR:
998                 /*
999                  * Continued calls to drain recorded transactions (returning
1000                  * a LNK_ERROR for each one), before we return the final
1001                  * LNK_ERROR.
1002                  */
1003                 assert(msg == NULL);
1004                 break;
1005         default:
1006                 /*
1007                  * We don't double-return errors, the caller should not
1008                  * have called us again after getting an error msg.
1009                  */
1010                 assert(0);
1011                 break;
1012         }
1013
1014         /*
1015          * Check the message sequence.  The iv[] should prevent any
1016          * possibility of a replay but we add this check anyway.
1017          */
1018         if (msg && ioq->error == 0) {
1019                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1020                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1021                 } else {
1022                         ++ioq->seq;
1023                 }
1024         }
1025
1026         /*
1027          * Handle error, RREQ, or completion
1028          *
1029          * NOTE: nmax and bytes are invalid at this point, we don't bother
1030          *       to update them when breaking out.
1031          */
1032         if (ioq->error) {
1033 skip:
1034                 fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1035                 /*
1036                  * An unrecoverable error causes all active receive
1037                  * transactions to be terminated with a LNK_ERROR message.
1038                  *
1039                  * Once all active transactions are exhausted we set the
1040                  * iocom ERROR flag and return a non-transactional LNK_ERROR
1041                  * message, which should cause master processing loops to
1042                  * terminate.
1043                  */
1044                 assert(ioq->msg == msg);
1045                 if (msg) {
1046                         dmsg_msg_free(msg);
1047                         ioq->msg = NULL;
1048                 }
1049
1050                 /*
1051                  * No more I/O read processing
1052                  */
1053                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1054
1055                 /*
1056                  * Simulate a remote LNK_ERROR DELETE msg for any open
1057                  * transactions, ending with a final non-transactional
1058                  * LNK_ERROR (that the session can detect) when no
1059                  * transactions remain.
1060                  *
1061                  * We only need to scan transactions on circuit0 as these
1062                  * will contain all circuit forges, and terminating circuit
1063                  * forges will automatically terminate the transactions on
1064                  * any other circuits as well as those circuits.
1065                  */
1066                 circuit0 = &iocom->circuit0;
1067                 msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
1068                 msg->any.head.error = ioq->error;
1069
1070                 pthread_mutex_lock(&iocom->mtx);
1071                 dmsg_iocom_drain(iocom);
1072
1073                 if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
1074                         /*
1075                          * Active remote transactions are still present.
1076                          * Simulate the other end sending us a DELETE.
1077                          */
1078                         if (state->rxcmd & DMSGF_DELETE) {
1079                                 dmsg_msg_free(msg);
1080                                 fprintf(stderr,
1081                                         "iocom: ioq error(rd) %d sleeping "
1082                                         "state %p rxcmd %08x txcmd %08x "
1083                                         "func %p\n",
1084                                         ioq->error, state, state->rxcmd,
1085                                         state->txcmd, state->func);
1086                                 usleep(100000); /* XXX */
1087                                 atomic_set_int(&iocom->flags,
1088                                                DMSG_IOCOMF_RWORK);
1089                                 msg = NULL;
1090                         } else {
1091                                 /*state->txcmd |= DMSGF_DELETE;*/
1092                                 msg->state = state;
1093                                 msg->iocom = iocom;
1094                                 msg->any.head.msgid = state->msgid;
1095                                 msg->any.head.cmd |= DMSGF_ABORT |
1096                                                      DMSGF_DELETE;
1097                         }
1098                 } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
1099                         /*
1100                          * Active local transactions are still present.
1101                          * Simulate the other end sending us a DELETE.
1102                          */
1103                         if (state->rxcmd & DMSGF_DELETE) {
1104                                 dmsg_msg_free(msg);
1105                                 fprintf(stderr,
1106                                         "iocom: ioq error(wr) %d sleeping "
1107                                         "state %p rxcmd %08x txcmd %08x "
1108                                         "func %p\n",
1109                                         ioq->error, state, state->rxcmd,
1110                                         state->txcmd, state->func);
1111                                 usleep(100000); /* XXX */
1112                                 atomic_set_int(&iocom->flags,
1113                                                DMSG_IOCOMF_RWORK);
1114                                 msg = NULL;
1115                         } else {
1116                                 msg->state = state;
1117                                 msg->iocom = iocom;
1118                                 msg->any.head.msgid = state->msgid;
1119                                 msg->any.head.cmd |= DMSGF_ABORT |
1120                                                      DMSGF_DELETE |
1121                                                      DMSGF_REPLY;
1122                                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1123                                         msg->any.head.cmd |=
1124                                                      DMSGF_CREATE;
1125                                 }
1126                         }
1127                 } else {
1128                         /*
1129                          * No active local or remote transactions remain.
1130                          * Generate a final LNK_ERROR and flag EOF.
1131                          */
1132                         msg->state = NULL;
1133                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1134                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1135                 }
1136                 pthread_mutex_unlock(&iocom->mtx);
1137
1138                 /*
1139                  * For the iocom error case we want to set RWORK to indicate
1140                  * that more messages might be pending.
1141                  *
1142                  * It is possible to return NULL when there is more work to
1143                  * do because each message has to be DELETEd in both
1144                  * directions before we continue on with the next (though
1145                  * this could be optimized).  The transmit direction will
1146                  * re-set RWORK.
1147                  */
1148                 if (msg)
1149                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1150         } else if (msg == NULL) {
1151                 /*
1152                  * Insufficient data received to finish building the message,
1153                  * set RREQ and return NULL.
1154                  *
1155                  * Leave ioq->msg intact.
1156                  * Leave the FIFO intact.
1157                  */
1158                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1159         } else {
1160                 /*
1161                  * Continue processing msg.
1162                  *
1163                  * The fifo has already been advanced past the message.
1164                  * Trivially reset the FIFO indices if possible.
1165                  *
1166                  * clear the FIFO if it is now empty and set RREQ to wait
1167                  * for more from the socket.  If the FIFO is not empty set
1168                  * TWORK to bypass the poll so we loop immediately.
1169                  */
1170                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1171                     ioq->fifo_cdn == ioq->fifo_end) {
1172                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1173                         ioq->fifo_cdx = 0;
1174                         ioq->fifo_cdn = 0;
1175                         ioq->fifo_beg = 0;
1176                         ioq->fifo_end = 0;
1177                 } else {
1178                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1179                 }
1180                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1181                 ioq->msg = NULL;
1182
1183                 /*
1184                  * Handle message routing.  Validates non-zero sources
1185                  * and routes message.  Error will be 0 if the message is
1186                  * destined for us.
1187                  *
1188                  * State processing only occurs for messages destined for us.
1189                  */
1190                 if (DMsgDebugOpt >= 5) {
1191                         fprintf(stderr,
1192                                 "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1193                                 msg->any.head.cmd,
1194                                 (intmax_t)msg->any.head.msgid,
1195                                 (intmax_t)msg->any.head.circuit);
1196                 }
1197                 if (msg->any.head.circuit)
1198                         error = dmsg_circuit_route(msg);
1199                 else
1200                         error = dmsg_state_msgrx(msg);
1201
1202                 if (error) {
1203                         /*
1204                          * Abort-after-closure, throw message away and
1205                          * start reading another.
1206                          */
1207                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1208                                 dmsg_msg_free(msg);
1209                                 goto again;
1210                         }
1211
1212                         /*
1213                          * msg routed, msg pointer no longer owned by us.
1214                          * Go to the top and start reading another.
1215                          */
1216                         if (error == DMSG_IOQ_ERROR_ROUTED)
1217                                 goto again;
1218
1219                         /*
1220                          * Process real error and throw away message.
1221                          */
1222                         ioq->error = error;
1223                         goto skip;
1224                 }
1225                 /* no error, not routed.  Fall through and return msg */
1226         }
1227         return (msg);
1228 }
1229
1230 /*
1231  * Calculate the header and data crc's and write a low-level message to
1232  * the connection.  If aux_crc is non-zero the aux_data crc is already
1233  * assumed to have been set.
1234  *
1235  * A non-NULL msg is added to the queue but not necessarily flushed.
1236  * Calling this function with msg == NULL will get a flush going.
1237  *
1238  * (called from iocom_core only)
1239  */
1240 void
1241 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1242 {
1243         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1244         dmsg_msg_t *msg;
1245         uint32_t xcrc32;
1246         size_t hbytes;
1247         size_t abytes;
1248         dmsg_msg_queue_t tmpq;
1249
1250         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1251         TAILQ_INIT(&tmpq);
1252         pthread_mutex_lock(&iocom->mtx);
1253         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1254                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1255                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1256         }
1257         pthread_mutex_unlock(&iocom->mtx);
1258
1259         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1260                 /*
1261                  * Process terminal connection errors.
1262                  */
1263                 TAILQ_REMOVE(&tmpq, msg, qentry);
1264                 if (ioq->error) {
1265                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1266                         ++ioq->msgcount;
1267                         continue;
1268                 }
1269
1270                 /*
1271                  * Finish populating the msg fields.  The salt ensures that
1272                  * the iv[] array is ridiculously randomized and we also
1273                  * re-seed our PRNG every 32768 messages just to be sure.
1274                  */
1275                 msg->any.head.magic = DMSG_HDR_MAGIC;
1276                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1277                 ++ioq->seq;
1278                 if ((ioq->seq & 32767) == 0)
1279                         srandomdev();
1280
1281                 /*
1282                  * Calculate aux_crc if 0, then calculate hdr_crc.
1283                  */
1284                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1285                         abytes = DMSG_DOALIGN(msg->aux_size);
1286                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1287                         msg->any.head.aux_crc = xcrc32;
1288                 }
1289                 msg->any.head.aux_bytes = msg->aux_size;
1290
1291                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1292                          DMSG_ALIGN;
1293                 msg->any.head.hdr_crc = 0;
1294                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1295
1296                 /*
1297                  * Enqueue the message (the flush codes handles stream
1298                  * encryption).
1299                  */
1300                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1301                 ++ioq->msgcount;
1302         }
1303         dmsg_iocom_flush2(iocom);
1304 }
1305
1306 /*
1307  * Thread localized, iocom->mtx not held by caller.
1308  *
1309  * (called from iocom_core via iocom_flush1 only)
1310  */
1311 void
1312 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1313 {
1314         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1315         dmsg_msg_t *msg;
1316         ssize_t n;
1317         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1318         size_t nact;
1319         size_t hbytes;
1320         size_t abytes;
1321         size_t hoff;
1322         size_t aoff;
1323         int iovcnt;
1324
1325         if (ioq->error) {
1326                 dmsg_iocom_drain(iocom);
1327                 return;
1328         }
1329
1330         /*
1331          * Pump messages out the connection by building an iovec.
1332          *
1333          * ioq->hbytes/ioq->abytes tracks how much of the first message
1334          * in the queue has been successfully written out, so we can
1335          * resume writing.
1336          */
1337         iovcnt = 0;
1338         nact = 0;
1339         hoff = ioq->hbytes;
1340         aoff = ioq->abytes;
1341
1342         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1343                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1344                          DMSG_ALIGN;
1345                 abytes = DMSG_DOALIGN(msg->aux_size);
1346                 assert(hoff <= hbytes && aoff <= abytes);
1347
1348                 if (hoff < hbytes) {
1349                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1350                         iov[iovcnt].iov_len = hbytes - hoff;
1351                         nact += hbytes - hoff;
1352                         ++iovcnt;
1353                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1354                                 break;
1355                 }
1356                 if (aoff < abytes) {
1357                         assert(msg->aux_data != NULL);
1358                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1359                         iov[iovcnt].iov_len = abytes - aoff;
1360                         nact += abytes - aoff;
1361                         ++iovcnt;
1362                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1363                                 break;
1364                 }
1365                 hoff = 0;
1366                 aoff = 0;
1367         }
1368         if (iovcnt == 0)
1369                 return;
1370
1371         /*
1372          * Encrypt and write the data.  The crypto code will move the
1373          * data into the fifo and adjust the iov as necessary.  If
1374          * encryption is disabled the iov is left alone.
1375          *
1376          * May return a smaller iov (thus a smaller n), with aggregated
1377          * chunks.  May reduce nmax to what fits in the FIFO.
1378          *
1379          * This function sets nact to the number of original bytes now
1380          * encrypted, adding to the FIFO some number of bytes that might
1381          * be greater depending on the crypto mechanic.  iov[] is adjusted
1382          * to point at the FIFO if necessary.
1383          *
1384          * NOTE: The return value from the writev() is the post-encrypted
1385          *       byte count, not the plaintext count.
1386          */
1387         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1388                 /*
1389                  * Make sure the FIFO has a reasonable amount of space
1390                  * left (if not completely full).
1391                  *
1392                  * In this situation we are staging the encrypted message
1393                  * data in the FIFO.  (nact) represents how much plaintext
1394                  * has been staged, (n) represents how much encrypted data
1395                  * has been flushed.  The two are independent of each other.
1396                  */
1397                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1398                     sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1399                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1400                               ioq->fifo_end - ioq->fifo_beg);
1401                         ioq->fifo_cdx -= ioq->fifo_beg;
1402                         ioq->fifo_cdn -= ioq->fifo_beg;
1403                         ioq->fifo_end -= ioq->fifo_beg;
1404                         ioq->fifo_beg = 0;
1405                 }
1406
1407                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1408                 n = writev(iocom->sock_fd, iov, iovcnt);
1409                 if (n > 0) {
1410                         ioq->fifo_beg += n;
1411                         ioq->fifo_cdn += n;
1412                         ioq->fifo_cdx += n;
1413                         if (ioq->fifo_beg == ioq->fifo_end) {
1414                                 ioq->fifo_beg = 0;
1415                                 ioq->fifo_cdn = 0;
1416                                 ioq->fifo_cdx = 0;
1417                                 ioq->fifo_end = 0;
1418                         }
1419                 }
1420                 /*
1421                  * We don't mess with the nact returned by the crypto_encrypt
1422                  * call, which represents the filling of the FIFO.  (n) tells
1423                  * us how much we were able to write from the FIFO.  The two
1424                  * are different beasts when encrypting.
1425                  */
1426         } else {
1427                 /*
1428                  * In this situation we are not staging the messages to the
1429                  * FIFO but instead writing them directly from the msg
1430                  * structure(s), so (nact) is basically (n).
1431                  */
1432                 n = writev(iocom->sock_fd, iov, iovcnt);
1433                 if (n > 0)
1434                         nact = n;
1435                 else
1436                         nact = 0;
1437         }
1438
1439         /*
1440          * Clean out the transmit queue based on what we successfully
1441          * sent (nact is the plaintext count).  ioq->hbytes/abytes
1442          * represents the portion of the first message previously sent.
1443          */
1444         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1445                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1446                          DMSG_ALIGN;
1447                 abytes = DMSG_DOALIGN(msg->aux_size);
1448
1449                 if ((size_t)nact < hbytes - ioq->hbytes) {
1450                         ioq->hbytes += nact;
1451                         nact = 0;
1452                         break;
1453                 }
1454                 nact -= hbytes - ioq->hbytes;
1455                 ioq->hbytes = hbytes;
1456                 if ((size_t)nact < abytes - ioq->abytes) {
1457                         ioq->abytes += nact;
1458                         nact = 0;
1459                         break;
1460                 }
1461                 nact -= abytes - ioq->abytes;
1462                 /* ioq->abytes = abytes; optimized out */
1463
1464                 if (DMsgDebugOpt >= 5) {
1465                         fprintf(stderr,
1466                                 "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1467                                 msg->any.head.cmd,
1468                                 (intmax_t)msg->any.head.msgid,
1469                                 (intmax_t)msg->any.head.circuit);
1470                 }
1471
1472                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1473                 --ioq->msgcount;
1474                 ioq->hbytes = 0;
1475                 ioq->abytes = 0;
1476
1477                 dmsg_state_cleanuptx(msg);
1478         }
1479         assert(nact == 0);
1480
1481         /*
1482          * Process the return value from the write w/regards to blocking.
1483          */
1484         if (n < 0) {
1485                 if (errno != EINTR &&
1486                     errno != EINPROGRESS &&
1487                     errno != EAGAIN) {
1488                         /*
1489                          * Fatal write error
1490                          */
1491                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1492                         dmsg_iocom_drain(iocom);
1493                 } else {
1494                         /*
1495                          * Wait for socket buffer space
1496                          */
1497                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1498                 }
1499         } else {
1500                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1501         }
1502         if (ioq->error) {
1503                 dmsg_iocom_drain(iocom);
1504         }
1505 }
1506
1507 /*
1508  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1509  * write events will occur.  We don't kill read msgs because we want
1510  * the caller to pull off our contrived terminal error msg to detect
1511  * the connection failure.
1512  *
1513  * Localized to iocom_core thread, iocom->mtx not held by caller.
1514  */
1515 void
1516 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1517 {
1518         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1519         dmsg_msg_t *msg;
1520
1521         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1522         ioq->hbytes = 0;
1523         ioq->abytes = 0;
1524
1525         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1526                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1527                 --ioq->msgcount;
1528                 dmsg_state_cleanuptx(msg);
1529         }
1530 }
1531
1532 /*
1533  * Write a message to an iocom, with additional state processing.
1534  */
1535 void
1536 dmsg_msg_write(dmsg_msg_t *msg)
1537 {
1538         dmsg_iocom_t *iocom = msg->iocom;
1539         dmsg_state_t *state;
1540         char dummy;
1541
1542         /*
1543          * Handle state processing, create state if necessary.
1544          */
1545         pthread_mutex_lock(&iocom->mtx);
1546         if ((state = msg->state) != NULL) {
1547                 /*
1548                  * Existing transaction (could be reply).  It is also
1549                  * possible for this to be the first reply (CREATE is set),
1550                  * in which case we populate state->txcmd.
1551                  *
1552                  * state->txcmd is adjusted to hold the final message cmd,
1553                  * and we also be sure to set the CREATE bit here.  We did
1554                  * not set it in dmsg_msg_alloc() because that would have
1555                  * not been serialized (state could have gotten ripped out
1556                  * from under the message prior to it being transmitted).
1557                  */
1558                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1559                     DMSGF_CREATE) {
1560                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1561                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1562                 }
1563                 msg->any.head.msgid = state->msgid;
1564                 assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
1565                 if (msg->any.head.cmd & DMSGF_CREATE) {
1566                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1567                 }
1568         }
1569
1570         /*
1571          * Queue it for output, wake up the I/O pthread.  Note that the
1572          * I/O thread is responsible for generating the CRCs and encryption.
1573          */
1574         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1575         dummy = 0;
1576         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1577         pthread_mutex_unlock(&iocom->mtx);
1578 }
1579
1580 /*
1581  * This is a shortcut to formulate a reply to msg with a simple error code,
1582  * It can reply to and terminate a transaction, or it can reply to a one-way
1583  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1584  * the error code (which can be 0).  Not all transactions are terminated
1585  * with DMSG_LNK_ERROR status (the low level only cares about the
1586  * MSGF_DELETE flag), but most are.
1587  *
1588  * Replies to one-way messages are a bit of an oxymoron but the feature
1589  * is used by the debug (DBG) protocol.
1590  *
1591  * The reply contains no extended data.
1592  */
1593 void
1594 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1595 {
1596         dmsg_state_t *state = msg->state;
1597         dmsg_msg_t *nmsg;
1598         uint32_t cmd;
1599
1600
1601         /*
1602          * Reply with a simple error code and terminate the transaction.
1603          */
1604         cmd = DMSG_LNK_ERROR;
1605
1606         /*
1607          * Check if our direction has even been initiated yet, set CREATE.
1608          *
1609          * Check what direction this is (command or reply direction).  Note
1610          * that txcmd might not have been initiated yet.
1611          *
1612          * If our direction has already been closed we just return without
1613          * doing anything.
1614          */
1615         if (state) {
1616                 if (state->txcmd & DMSGF_DELETE)
1617                         return;
1618                 if (state->txcmd & DMSGF_REPLY)
1619                         cmd |= DMSGF_REPLY;
1620                 cmd |= DMSGF_DELETE;
1621         } else {
1622                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1623                         cmd |= DMSGF_REPLY;
1624         }
1625
1626         /*
1627          * Allocate the message and associate it with the existing state.
1628          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1629          * allocate new state.  We have our state already.
1630          */
1631         nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1632         if (state) {
1633                 if ((state->txcmd & DMSGF_CREATE) == 0)
1634                         nmsg->any.head.cmd |= DMSGF_CREATE;
1635         }
1636         nmsg->any.head.error = error;
1637         nmsg->any.head.msgid = msg->any.head.msgid;
1638         nmsg->any.head.circuit = msg->any.head.circuit;
1639         nmsg->state = state;
1640         dmsg_msg_write(nmsg);
1641 }
1642
1643 /*
1644  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1645  * we are generating a streaming reply or an intermediate acknowledgement
1646  * of some sort as part of the higher level protocol, with more to come
1647  * later.
1648  */
1649 void
1650 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1651 {
1652         dmsg_state_t *state = msg->state;
1653         dmsg_msg_t *nmsg;
1654         uint32_t cmd;
1655
1656
1657         /*
1658          * Reply with a simple error code and terminate the transaction.
1659          */
1660         cmd = DMSG_LNK_ERROR;
1661
1662         /*
1663          * Check if our direction has even been initiated yet, set CREATE.
1664          *
1665          * Check what direction this is (command or reply direction).  Note
1666          * that txcmd might not have been initiated yet.
1667          *
1668          * If our direction has already been closed we just return without
1669          * doing anything.
1670          */
1671         if (state) {
1672                 if (state->txcmd & DMSGF_DELETE)
1673                         return;
1674                 if (state->txcmd & DMSGF_REPLY)
1675                         cmd |= DMSGF_REPLY;
1676                 /* continuing transaction, do not set MSGF_DELETE */
1677         } else {
1678                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1679                         cmd |= DMSGF_REPLY;
1680         }
1681
1682         nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1683         if (state) {
1684                 if ((state->txcmd & DMSGF_CREATE) == 0)
1685                         nmsg->any.head.cmd |= DMSGF_CREATE;
1686         }
1687         nmsg->any.head.error = error;
1688         nmsg->any.head.msgid = msg->any.head.msgid;
1689         nmsg->any.head.circuit = msg->any.head.circuit;
1690         nmsg->state = state;
1691         dmsg_msg_write(nmsg);
1692 }
1693
1694 /*
1695  * Terminate a transaction given a state structure by issuing a DELETE.
1696  */
1697 void
1698 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1699 {
1700         dmsg_msg_t *nmsg;
1701         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1702
1703         /*
1704          * Nothing to do if we already transmitted a delete
1705          */
1706         if (state->txcmd & DMSGF_DELETE)
1707                 return;
1708
1709         /*
1710          * Set REPLY if the other end initiated the command.  Otherwise
1711          * we are the command direction.
1712          */
1713         if (state->txcmd & DMSGF_REPLY)
1714                 cmd |= DMSGF_REPLY;
1715
1716         nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1717         if (state) {
1718                 if ((state->txcmd & DMSGF_CREATE) == 0)
1719                         nmsg->any.head.cmd |= DMSGF_CREATE;
1720         }
1721         nmsg->any.head.error = error;
1722         nmsg->any.head.msgid = state->msgid;
1723         nmsg->any.head.circuit = state->msg->any.head.circuit;
1724         nmsg->state = state;
1725         dmsg_msg_write(nmsg);
1726 }
1727
1728 /*
1729  * Terminate a transaction given a state structure by issuing a DELETE.
1730  */
1731 void
1732 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1733 {
1734         dmsg_msg_t *nmsg;
1735         uint32_t cmd = DMSG_LNK_ERROR;
1736
1737         /*
1738          * Nothing to do if we already transmitted a delete
1739          */
1740         if (state->txcmd & DMSGF_DELETE)
1741                 return;
1742
1743         /*
1744          * Set REPLY if the other end initiated the command.  Otherwise
1745          * we are the command direction.
1746          */
1747         if (state->txcmd & DMSGF_REPLY)
1748                 cmd |= DMSGF_REPLY;
1749
1750         nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1751         if (state) {
1752                 if ((state->txcmd & DMSGF_CREATE) == 0)
1753                         nmsg->any.head.cmd |= DMSGF_CREATE;
1754         }
1755         nmsg->any.head.error = error;
1756         nmsg->any.head.msgid = state->msgid;
1757         nmsg->any.head.circuit = state->msg->any.head.circuit;
1758         nmsg->state = state;
1759         dmsg_msg_write(nmsg);
1760 }
1761
1762 /************************************************************************
1763  *                      TRANSACTION STATE HANDLING                      *
1764  ************************************************************************
1765  *
1766  */
1767
1768 /*
1769  * Process circuit and state tracking for a message after reception, prior
1770  * to execution.
1771  *
1772  * Called with msglk held and the msg dequeued.
1773  *
1774  * All messages are called with dummy state and return actual state.
1775  * (One-off messages often just return the same dummy state).
1776  *
1777  * May request that caller discard the message by setting *discardp to 1.
1778  * The returned state is not used in this case and is allowed to be NULL.
1779  *
1780  * --
1781  *
1782  * These routines handle persistent and command/reply message state via the
1783  * CREATE and DELETE flags.  The first message in a command or reply sequence
1784  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1785  *
1786  * There can be any number of intermediate messages belonging to the same
1787  * sequence sent in between the CREATE message and the DELETE message,
1788  * which set neither flag.  This represents a streaming command or reply.
1789  *
1790  * Any command message received with CREATE set expects a reply sequence to
1791  * be returned.  Reply sequences work the same as command sequences except the
1792  * REPLY bit is also set.  Both the command side and reply side can
1793  * degenerate into a single message with both CREATE and DELETE set.  Note
1794  * that one side can be streaming and the other side not, or neither, or both.
1795  *
1796  * The msgid is unique for the initiator.  That is, two sides sending a new
1797  * message can use the same msgid without colliding.
1798  *
1799  * --
1800  *
1801  * ABORT sequences work by setting the ABORT flag along with normal message
1802  * state.  However, ABORTs can also be sent on half-closed messages, that is
1803  * even if the command or reply side has already sent a DELETE, as long as
1804  * the message has not been fully closed it can still send an ABORT+DELETE
1805  * to terminate the half-closed message state.
1806  *
1807  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1808  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1809  * also race, and in this situation the other side might have already
1810  * initiated a new unrelated command with the same message id.  Since
1811  * the abort has not set the CREATE flag the situation can be detected
1812  * and the message will also be discarded.
1813  *
1814  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1815  * The ABORT request is essentially integrated into the command instead
1816  * of being sent later on.  In this situation the command implementation
1817  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1818  * special-case non-blocking operation for the command.
1819  *
1820  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1821  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1822  *        one-way messages are not supported.
1823  *
1824  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1825  *        simply ignored.
1826  *
1827  * --
1828  *
1829  * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
1830  * set.  One-off messages cannot be aborted and typically aren't processed
1831  * by these routines.  The REPLY bit can be used to distinguish whether a
1832  * one-off message is a command or reply.  For example, one-off replies
1833  * will typically just contain status updates.
1834  */
1835 static int
1836 dmsg_state_msgrx(dmsg_msg_t *msg)
1837 {
1838         dmsg_iocom_t *iocom = msg->iocom;
1839         dmsg_circuit_t *circuit;
1840         dmsg_circuit_t *ocircuit;
1841         dmsg_state_t *state;
1842         dmsg_state_t sdummy;
1843         dmsg_circuit_t cdummy;
1844         int error;
1845
1846         pthread_mutex_lock(&iocom->mtx);
1847
1848         /*
1849          * Locate existing persistent circuit and state, if any.
1850          */
1851         if (msg->any.head.circuit == 0) {
1852                 circuit = &iocom->circuit0;
1853         } else {
1854                 cdummy.msgid = msg->any.head.circuit;
1855                 circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
1856                                   &cdummy);
1857                 if (circuit == NULL) {
1858                         pthread_mutex_unlock(&iocom->mtx);
1859                         return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
1860                 }
1861         }
1862
1863         /*
1864          * Replace circuit0 with actual
1865          */
1866         dmsg_circuit_hold(circuit);
1867         ocircuit = msg->circuit;
1868         msg->circuit = circuit;
1869
1870         /*
1871          * If received msg is a command state is on staterd_tree.
1872          * If received msg is a reply state is on statewr_tree.
1873          */
1874         sdummy.msgid = msg->any.head.msgid;
1875         if (msg->any.head.cmd & DMSGF_REPLY) {
1876                 state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
1877                                 &sdummy);
1878         } else {
1879                 state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
1880                                 &sdummy);
1881         }
1882         msg->state = state;
1883
1884         pthread_mutex_unlock(&iocom->mtx);
1885         if (ocircuit)
1886                 dmsg_circuit_drop(ocircuit);
1887
1888         /*
1889          * Short-cut one-off or mid-stream messages (state may be NULL).
1890          */
1891         if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1892                                   DMSGF_ABORT)) == 0) {
1893                 return(0);
1894         }
1895
1896         /*
1897          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1898          * inside the case statements.
1899          */
1900         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1901                                     DMSGF_REPLY)) {
1902         case DMSGF_CREATE:
1903         case DMSGF_CREATE | DMSGF_DELETE:
1904                 /*
1905                  * New persistant command received.
1906                  */
1907                 if (state) {
1908                         fprintf(stderr, "duplicate-trans %s\n",
1909                                 dmsg_msg_str(msg));
1910                         error = DMSG_IOQ_ERROR_TRANS;
1911                         assert(0);
1912                         break;
1913                 }
1914                 state = malloc(sizeof(*state));
1915                 bzero(state, sizeof(*state));
1916                 state->iocom = iocom;
1917                 state->circuit = circuit;
1918                 state->flags = DMSG_STATE_DYNAMIC;
1919                 state->msg = msg;
1920                 state->txcmd = DMSGF_REPLY;
1921                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1922                 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
1923                 state->flags |= DMSG_STATE_INSERTED;
1924                 state->msgid = msg->any.head.msgid;
1925                 msg->state = state;
1926                 pthread_mutex_lock(&iocom->mtx);
1927                 RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
1928                 pthread_mutex_unlock(&iocom->mtx);
1929                 error = 0;
1930                 if (DMsgDebugOpt) {
1931                         fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1932                                 state, (uint32_t)state->msgid, iocom);
1933                 }
1934                 break;
1935         case DMSGF_DELETE:
1936                 /*
1937                  * Persistent state is expected but might not exist if an
1938                  * ABORT+DELETE races the close.
1939                  */
1940                 if (state == NULL) {
1941                         if (msg->any.head.cmd & DMSGF_ABORT) {
1942                                 error = DMSG_IOQ_ERROR_EALREADY;
1943                         } else {
1944                                 fprintf(stderr, "missing-state %s\n",
1945                                         dmsg_msg_str(msg));
1946                                 error = DMSG_IOQ_ERROR_TRANS;
1947                         assert(0);
1948                         }
1949                         break;
1950                 }
1951
1952                 /*
1953                  * Handle another ABORT+DELETE case if the msgid has already
1954                  * been reused.
1955                  */
1956                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1957                         if (msg->any.head.cmd & DMSGF_ABORT) {
1958                                 error = DMSG_IOQ_ERROR_EALREADY;
1959                         } else {
1960                                 fprintf(stderr, "reused-state %s\n",
1961                                         dmsg_msg_str(msg));
1962                                 error = DMSG_IOQ_ERROR_TRANS;
1963                         assert(0);
1964                         }
1965                         break;
1966                 }
1967                 error = 0;
1968                 break;
1969         default:
1970                 /*
1971                  * Check for mid-stream ABORT command received, otherwise
1972                  * allow.
1973                  */
1974                 if (msg->any.head.cmd & DMSGF_ABORT) {
1975                         if (state == NULL ||
1976                             (state->rxcmd & DMSGF_CREATE) == 0) {
1977                                 error = DMSG_IOQ_ERROR_EALREADY;
1978                                 break;
1979                         }
1980                 }
1981                 error = 0;
1982                 break;
1983         case DMSGF_REPLY | DMSGF_CREATE:
1984         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
1985                 /*
1986                  * When receiving a reply with CREATE set the original
1987                  * persistent state message should already exist.
1988                  */
1989                 if (state == NULL) {
1990                         fprintf(stderr, "no-state(r) %s\n",
1991                                 dmsg_msg_str(msg));
1992                         error = DMSG_IOQ_ERROR_TRANS;
1993                         assert(0);
1994                         break;
1995                 }
1996                 assert(((state->rxcmd ^ msg->any.head.cmd) &
1997                         DMSGF_REPLY) == 0);
1998                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1999                 error = 0;
2000                 break;
2001         case DMSGF_REPLY | DMSGF_DELETE:
2002                 /*
2003                  * Received REPLY+ABORT+DELETE in case where msgid has
2004                  * already been fully closed, ignore the message.
2005                  */
2006                 if (state == NULL) {
2007                         if (msg->any.head.cmd & DMSGF_ABORT) {
2008                                 error = DMSG_IOQ_ERROR_EALREADY;
2009                         } else {
2010                                 fprintf(stderr, "no-state(r,d) %s\n",
2011                                         dmsg_msg_str(msg));
2012                                 error = DMSG_IOQ_ERROR_TRANS;
2013                         assert(0);
2014                         }
2015                         break;
2016                 }
2017
2018                 /*
2019                  * Received REPLY+ABORT+DELETE in case where msgid has
2020                  * already been reused for an unrelated message,
2021                  * ignore the message.
2022                  */
2023                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2024                         if (msg->any.head.cmd & DMSGF_ABORT) {
2025                                 error = DMSG_IOQ_ERROR_EALREADY;
2026                         } else {
2027                                 fprintf(stderr, "reused-state(r,d) %s\n",
2028                                         dmsg_msg_str(msg));
2029                                 error = DMSG_IOQ_ERROR_TRANS;
2030                         assert(0);
2031                         }
2032                         break;
2033                 }
2034                 error = 0;
2035                 break;
2036         case DMSGF_REPLY:
2037                 /*
2038                  * Check for mid-stream ABORT reply received to sent command.
2039                  */
2040                 if (msg->any.head.cmd & DMSGF_ABORT) {
2041                         if (state == NULL ||
2042                             (state->rxcmd & DMSGF_CREATE) == 0) {
2043                                 error = DMSG_IOQ_ERROR_EALREADY;
2044                                 break;
2045                         }
2046                 }
2047                 error = 0;
2048                 break;
2049         }
2050         return (error);
2051 }
2052
2053 void
2054 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2055 {
2056         dmsg_state_t *state;
2057
2058         if ((state = msg->state) == NULL) {
2059                 /*
2060                  * Free a non-transactional message, there is no state
2061                  * to worry about.
2062                  */
2063                 dmsg_msg_free(msg);
2064         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2065                 /*
2066                  * Message terminating transaction, destroy the related
2067                  * state, the original message, and this message (if it
2068                  * isn't the original message due to a CREATE|DELETE).
2069                  */
2070                 pthread_mutex_lock(&iocom->mtx);
2071                 state->rxcmd |= DMSGF_DELETE;
2072                 if (state->txcmd & DMSGF_DELETE) {
2073                         if (state->msg == msg)
2074                                 state->msg = NULL;
2075                         assert(state->flags & DMSG_STATE_INSERTED);
2076                         if (state->rxcmd & DMSGF_REPLY) {
2077                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2078                                 RB_REMOVE(dmsg_state_tree,
2079                                           &msg->circuit->statewr_tree, state);
2080                         } else {
2081                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2082                                 RB_REMOVE(dmsg_state_tree,
2083                                           &msg->circuit->staterd_tree, state);
2084                         }
2085                         state->flags &= ~DMSG_STATE_INSERTED;
2086                         dmsg_state_free(state);
2087                 } else {
2088                         ;
2089                 }
2090                 pthread_mutex_unlock(&iocom->mtx);
2091                 dmsg_msg_free(msg);
2092         } else if (state->msg != msg) {
2093                 /*
2094                  * Message not terminating transaction, leave state intact
2095                  * and free message if it isn't the CREATE message.
2096                  */
2097                 dmsg_msg_free(msg);
2098         }
2099 }
2100
2101 static void
2102 dmsg_state_cleanuptx(dmsg_msg_t *msg)
2103 {
2104         dmsg_iocom_t *iocom = msg->iocom;
2105         dmsg_state_t *state;
2106
2107         if ((state = msg->state) == NULL) {
2108                 dmsg_msg_free(msg);
2109         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2110                 pthread_mutex_lock(&iocom->mtx);
2111                 assert((state->txcmd & DMSGF_DELETE) == 0);
2112                 state->txcmd |= DMSGF_DELETE;
2113                 if (state->rxcmd & DMSGF_DELETE) {
2114                         if (state->msg == msg)
2115                                 state->msg = NULL;
2116                         assert(state->flags & DMSG_STATE_INSERTED);
2117                         if (state->txcmd & DMSGF_REPLY) {
2118                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2119                                 RB_REMOVE(dmsg_state_tree,
2120                                           &msg->circuit->staterd_tree, state);
2121                         } else {
2122                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2123                                 RB_REMOVE(dmsg_state_tree,
2124                                           &msg->circuit->statewr_tree, state);
2125                         }
2126                         state->flags &= ~DMSG_STATE_INSERTED;
2127                         dmsg_state_free(state);
2128                 } else {
2129                         ;
2130                 }
2131                 pthread_mutex_unlock(&iocom->mtx);
2132                 dmsg_msg_free(msg);
2133         } else if (state->msg != msg) {
2134                 dmsg_msg_free(msg);
2135         }
2136 }
2137
2138 /*
2139  * Called with iocom locked
2140  */
2141 void
2142 dmsg_state_free(dmsg_state_t *state)
2143 {
2144         dmsg_msg_t *msg;
2145
2146         if (DMsgDebugOpt) {
2147                 fprintf(stderr, "terminate state %p id=%08x\n",
2148                         state, (uint32_t)state->msgid);
2149         }
2150         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2151                 closefrom(3);
2152         assert(state->any.any == NULL);
2153         msg = state->msg;
2154         state->msg = NULL;
2155         if (msg)
2156                 dmsg_msg_free_locked(msg);
2157         free(state);
2158 }
2159
2160 /*
2161  * Called with iocom locked
2162  */
2163 void
2164 dmsg_circuit_hold(dmsg_circuit_t *circuit)
2165 {
2166         assert(circuit->refs > 0);              /* caller must hold ref */
2167         atomic_add_int(&circuit->refs, 1);      /* to safely add more */
2168 }
2169
2170 /*
2171  * Called with iocom locked
2172  */
2173 void
2174 dmsg_circuit_drop(dmsg_circuit_t *circuit)
2175 {
2176         dmsg_iocom_t *iocom = circuit->iocom;
2177         char dummy;
2178
2179         assert(circuit->refs > 0);
2180         assert(iocom);
2181
2182         /*
2183          * Decrement circuit refs, destroy circuit when refs drops to 0.
2184          */
2185         if (atomic_fetchadd_int(&circuit->refs, -1) != 1)
2186                 return;
2187         assert(circuit != &iocom->circuit0);
2188
2189         assert(RB_EMPTY(&circuit->staterd_tree));
2190         assert(RB_EMPTY(&circuit->statewr_tree));
2191         pthread_mutex_lock(&iocom->mtx);
2192         RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2193         circuit->iocom = NULL;
2194         pthread_mutex_unlock(&iocom->mtx);
2195         dmsg_free(circuit);
2196
2197         /*
2198          * When an iocom error is present the rx code will terminate the
2199          * receive side for all transactions and (indirectly) all circuits
2200          * by simulating DELETE messages.  The state and related circuits
2201          * don't disappear until the related states are closed in both
2202          * directions
2203          *
2204          * Detect the case where the last circuit is now gone (and thus all
2205          * states for all circuits are gone), and wakeup the rx thread to
2206          * complete the termination.
2207          */
2208         if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2209                 dummy = 0;
2210                 write(iocom->wakeupfds[1], &dummy, 1);
2211         }
2212 }
2213
2214 void
2215 dmsg_circuit_drop_locked(dmsg_circuit_t *circuit)
2216 {
2217         dmsg_iocom_t *iocom;
2218
2219         iocom = circuit->iocom;
2220         assert(circuit->refs > 0);
2221         assert(iocom);
2222
2223         if (atomic_fetchadd_int(&circuit->refs, -1) == 1) {
2224                 assert(circuit != &iocom->circuit0);
2225                 assert(RB_EMPTY(&circuit->staterd_tree));
2226                 assert(RB_EMPTY(&circuit->statewr_tree));
2227                 RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2228                 circuit->iocom = NULL;
2229                 dmsg_free(circuit);
2230                 if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2231                         char dummy = 0;
2232                         write(iocom->wakeupfds[1], &dummy, 1);
2233                 }
2234         }
2235 }
2236
2237 /*
2238  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2239  * header is not adjusted, just the core header.
2240  */
2241 void
2242 dmsg_bswap_head(dmsg_hdr_t *head)
2243 {
2244         head->magic     = bswap16(head->magic);
2245         head->reserved02 = bswap16(head->reserved02);
2246         head->salt      = bswap32(head->salt);
2247
2248         head->msgid     = bswap64(head->msgid);
2249         head->circuit   = bswap64(head->circuit);
2250         head->reserved18= bswap64(head->reserved18);
2251
2252         head->cmd       = bswap32(head->cmd);
2253         head->aux_crc   = bswap32(head->aux_crc);
2254         head->aux_bytes = bswap32(head->aux_bytes);
2255         head->error     = bswap32(head->error);
2256         head->aux_descr = bswap64(head->aux_descr);
2257         head->reserved38= bswap32(head->reserved38);
2258         head->hdr_crc   = bswap32(head->hdr_crc);
2259 }