cluster - misc cleanup
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
38 int DMsgDebugOpt;
39
40 static int dmsg_state_msgrx(dmsg_msg_t *msg);
41 static void dmsg_state_cleanuptx(dmsg_msg_t *msg);
42 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
43
44 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
45 RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
46
47 /*
48  * STATE TREE - Represents open transactions which are indexed by their
49  *              { msgid } relative to the governing iocom.
50  */
51 int
52 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
53 {
54         if (state1->msgid < state2->msgid)
55                 return(-1);
56         if (state1->msgid > state2->msgid)
57                 return(1);
58         return(0);
59 }
60
61 /*
62  * CIRCUIT TREE - Represents open circuits which are indexed by their
63  *                { msgid } relative to the governing iocom.
64  */
65 int
66 dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
67 {
68         if (circuit1->msgid < circuit2->msgid)
69                 return(-1);
70         if (circuit1->msgid > circuit2->msgid)
71                 return(1);
72         return(0);
73 }
74
75 /*
76  * Initialize a low-level ioq
77  */
78 void
79 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
80 {
81         bzero(ioq, sizeof(*ioq));
82         ioq->state = DMSG_MSGQ_STATE_HEADER1;
83         TAILQ_INIT(&ioq->msgq);
84 }
85
86 /*
87  * Cleanup queue.
88  *
89  * caller holds iocom->mtx.
90  */
91 void
92 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
93 {
94         dmsg_msg_t *msg;
95
96         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
97                 assert(0);      /* shouldn't happen */
98                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
99                 dmsg_msg_free(msg);
100         }
101         if ((msg = ioq->msg) != NULL) {
102                 ioq->msg = NULL;
103                 dmsg_msg_free(msg);
104         }
105 }
106
107 /*
108  * Initialize a low-level communications channel.
109  *
110  * NOTE: The signal_func() is called at least once from the loop and can be
111  *       re-armed via dmsg_iocom_restate().
112  */
113 void
114 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
115                    void (*signal_func)(dmsg_iocom_t *),
116                    void (*rcvmsg_func)(dmsg_msg_t *),
117                    void (*dbgmsg_func)(dmsg_msg_t *),
118                    void (*altmsg_func)(dmsg_iocom_t *))
119 {
120         struct stat st;
121
122         bzero(iocom, sizeof(*iocom));
123
124         asprintf(&iocom->label, "iocom-%p", iocom);
125         iocom->signal_callback = signal_func;
126         iocom->rcvmsg_callback = rcvmsg_func;
127         iocom->altmsg_callback = altmsg_func;
128         iocom->dbgmsg_callback = dbgmsg_func;
129
130         pthread_mutex_init(&iocom->mtx, NULL);
131         RB_INIT(&iocom->circuit_tree);
132         TAILQ_INIT(&iocom->freeq);
133         TAILQ_INIT(&iocom->freeq_aux);
134         TAILQ_INIT(&iocom->txmsgq);
135         iocom->sock_fd = sock_fd;
136         iocom->alt_fd = alt_fd;
137         iocom->flags = DMSG_IOCOMF_RREQ;
138         if (signal_func)
139                 iocom->flags |= DMSG_IOCOMF_SWORK;
140         dmsg_ioq_init(iocom, &iocom->ioq_rx);
141         dmsg_ioq_init(iocom, &iocom->ioq_tx);
142         if (pipe(iocom->wakeupfds) < 0)
143                 assert(0);
144         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
145         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
146
147         dmsg_circuit_init(iocom, &iocom->circuit0);
148
149         /*
150          * Negotiate session crypto synchronously.  This will mark the
151          * connection as error'd if it fails.  If this is a pipe it's
152          * a linkage that we set up ourselves to the filesystem and there
153          * is no crypto.
154          */
155         if (fstat(sock_fd, &st) < 0)
156                 assert(0);
157         if (S_ISSOCK(st.st_mode))
158                 dmsg_crypto_negotiate(iocom);
159
160         /*
161          * Make sure our fds are set to non-blocking for the iocom core.
162          */
163         if (sock_fd >= 0)
164                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
165 #if 0
166         /* if line buffered our single fgets() should be fine */
167         if (alt_fd >= 0)
168                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
169 #endif
170 }
171
172 void
173 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
174 {
175         va_list va;
176         char *optr;
177
178         va_start(va, ctl);
179         optr = iocom->label;
180         vasprintf(&iocom->label, ctl, va);
181         va_end(va);
182         if (optr)
183                 free(optr);
184 }
185
186 /*
187  * May only be called from a callback from iocom_core.
188  *
189  * Adjust state machine functions, set flags to guarantee that both
190  * the recevmsg_func and the sendmsg_func is called at least once.
191  */
192 void
193 dmsg_iocom_restate(dmsg_iocom_t *iocom,
194                    void (*signal_func)(dmsg_iocom_t *),
195                    void (*rcvmsg_func)(dmsg_msg_t *msg),
196                    void (*altmsg_func)(dmsg_iocom_t *))
197 {
198         pthread_mutex_lock(&iocom->mtx);
199         iocom->signal_callback = signal_func;
200         iocom->rcvmsg_callback = rcvmsg_func;
201         iocom->altmsg_callback = altmsg_func;
202         if (signal_func)
203                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
204         else
205                 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
206         pthread_mutex_unlock(&iocom->mtx);
207 }
208
209 void
210 dmsg_iocom_signal(dmsg_iocom_t *iocom)
211 {
212         pthread_mutex_lock(&iocom->mtx);
213         if (iocom->signal_callback)
214                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
215         pthread_mutex_unlock(&iocom->mtx);
216 }
217
218 /*
219  * Cleanup a terminating iocom.
220  *
221  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
222  * from all possible references to it.
223  */
224 void
225 dmsg_iocom_done(dmsg_iocom_t *iocom)
226 {
227         dmsg_msg_t *msg;
228
229         if (iocom->sock_fd >= 0) {
230                 close(iocom->sock_fd);
231                 iocom->sock_fd = -1;
232         }
233         if (iocom->alt_fd >= 0) {
234                 close(iocom->alt_fd);
235                 iocom->alt_fd = -1;
236         }
237         dmsg_ioq_done(iocom, &iocom->ioq_rx);
238         dmsg_ioq_done(iocom, &iocom->ioq_tx);
239         while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
240                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
241                 free(msg);
242         }
243         while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
244                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
245                 free(msg->aux_data);
246                 msg->aux_data = NULL;
247                 free(msg);
248         }
249         if (iocom->wakeupfds[0] >= 0) {
250                 close(iocom->wakeupfds[0]);
251                 iocom->wakeupfds[0] = -1;
252         }
253         if (iocom->wakeupfds[1] >= 0) {
254                 close(iocom->wakeupfds[1]);
255                 iocom->wakeupfds[1] = -1;
256         }
257         pthread_mutex_destroy(&iocom->mtx);
258 }
259
260 /*
261  * Basic initialization of a circuit structure.
262  *
263  * The circuit structure is initialized with one ref.
264  */
265 void
266 dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
267 {
268         circuit->refs = 1;
269         circuit->iocom = iocom;
270         RB_INIT(&circuit->staterd_tree);
271         RB_INIT(&circuit->statewr_tree);
272 }
273
274 /*
275  * Allocate a new one-way message.
276  */
277 dmsg_msg_t *
278 dmsg_msg_alloc(dmsg_circuit_t *circuit,
279                size_t aux_size, uint32_t cmd,
280                void (*func)(dmsg_msg_t *), void *data)
281 {
282         dmsg_iocom_t *iocom = circuit->iocom;
283         dmsg_state_t *state = NULL;
284         dmsg_msg_t *msg;
285         int hbytes;
286         size_t aligned_size;
287
288         pthread_mutex_lock(&iocom->mtx);
289 #if 0
290         if (aux_size) {
291                 aligned_size = DMSG_DOALIGN(aux_size);
292                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
293                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
294         } else {
295                 aligned_size = 0;
296                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
297                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
298         }
299 #endif
300         aligned_size = DMSG_DOALIGN(aux_size);
301         msg = NULL;
302         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
303                 /*
304                  * Create state when CREATE is set without REPLY.
305                  * Assign a unique msgid, in this case simply using
306                  * the pointer value for 'state'.
307                  *
308                  * NOTE: CREATE in txcmd handled by dmsg_msg_write()
309                  * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
310                  *
311                  * NOTE: state initiated by us and state initiated by
312                  *       a remote create are placed in different RB trees.
313                  *       The msgid for SPAN state is used in source/target
314                  *       for message routing as appropriate.
315                  */
316                 state = malloc(sizeof(*state));
317                 bzero(state, sizeof(*state));
318                 state->iocom = iocom;
319                 state->circuit = circuit;
320                 state->flags = DMSG_STATE_DYNAMIC;
321                 state->msgid = (uint64_t)(uintptr_t)state;
322                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
323                 state->rxcmd = DMSGF_REPLY;
324                 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
325                 state->func = func;
326                 state->any.any = data;
327                 pthread_mutex_lock(&iocom->mtx);
328                 RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
329                 pthread_mutex_unlock(&iocom->mtx);
330                 state->flags |= DMSG_STATE_INSERTED;
331         }
332         /* XXX SMP race for state */
333         pthread_mutex_unlock(&iocom->mtx);
334         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
335         if (msg == NULL) {
336                 msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
337                 bzero(msg, offsetof(struct dmsg_msg, any.head));
338                 *(int *)((char *)msg +
339                          offsetof(struct dmsg_msg, any.head) + hbytes) =
340                                  0x71B2C3D4;
341 #if 0
342                 msg = malloc(sizeof(*msg));
343                 bzero(msg, sizeof(*msg));
344 #endif
345         }
346
347         /*
348          * [re]allocate the auxillary data buffer.  The caller knows that
349          * a size-aligned buffer will be allocated but we do not want to
350          * force the caller to zero any tail piece, so we do that ourself.
351          */
352         if (msg->aux_size != aux_size) {
353                 if (msg->aux_data) {
354                         free(msg->aux_data);
355                         msg->aux_data = NULL;
356                         msg->aux_size = 0;
357                 }
358                 if (aux_size) {
359                         msg->aux_data = malloc(aligned_size);
360                         msg->aux_size = aux_size;
361                         if (aux_size != aligned_size) {
362                                 bzero(msg->aux_data + aux_size,
363                                       aligned_size - aux_size);
364                         }
365                 }
366         }
367         if (hbytes)
368                 bzero(&msg->any.head, hbytes);
369         msg->hdr_size = hbytes;
370         msg->any.head.magic = DMSG_HDR_MAGIC;
371         msg->any.head.cmd = cmd;
372         msg->any.head.aux_descr = 0;
373         msg->any.head.aux_crc = 0;
374         msg->any.head.circuit = 0;
375         msg->circuit = circuit;
376         msg->iocom = iocom;
377         dmsg_circuit_hold(circuit);
378         if (state) {
379                 msg->state = state;
380                 state->msg = msg;
381                 msg->any.head.msgid = state->msgid;
382         } else {
383                 msg->any.head.msgid = 0;
384         }
385         return (msg);
386 }
387
388 /*
389  * Free a message so it can be reused afresh.
390  *
391  * NOTE: aux_size can be 0 with a non-NULL aux_data.
392  */
393 static
394 void
395 dmsg_msg_free_locked(dmsg_msg_t *msg)
396 {
397         /*dmsg_iocom_t *iocom = msg->iocom;*/
398 #if 1
399         int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
400         if (*(int *)((char *)msg +
401                      offsetof(struct  dmsg_msg, any.head) + hbytes) !=
402              0x71B2C3D4) {
403                 fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
404                 assert(0);
405         }
406 #endif
407         if (msg->circuit) {
408                 dmsg_circuit_drop_locked(msg->circuit);
409                 msg->circuit = NULL;
410         }
411         msg->state = NULL;
412         if (msg->aux_data) {
413                 free(msg->aux_data);
414                 msg->aux_data = NULL;
415         }
416         msg->aux_size = 0;
417         free (msg);
418 #if 0
419         if (msg->aux_data)
420                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
421         else
422                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
423 #endif
424 }
425
426 void
427 dmsg_msg_free(dmsg_msg_t *msg)
428 {
429         dmsg_iocom_t *iocom = msg->iocom;
430
431         pthread_mutex_lock(&iocom->mtx);
432         dmsg_msg_free_locked(msg);
433         pthread_mutex_unlock(&iocom->mtx);
434 }
435
436 /*
437  * I/O core loop for an iocom.
438  *
439  * Thread localized, iocom->mtx not held.
440  */
441 void
442 dmsg_iocom_core(dmsg_iocom_t *iocom)
443 {
444         struct pollfd fds[3];
445         char dummybuf[256];
446         dmsg_msg_t *msg;
447         int timeout;
448         int count;
449         int wi; /* wakeup pipe */
450         int si; /* socket */
451         int ai; /* alt bulk path socket */
452
453         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
454                 /*
455                  * These iocom->flags are only manipulated within the
456                  * context of the current thread.  However, modifications
457                  * still require atomic ops.
458                  */
459                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
460                                      DMSG_IOCOMF_WWORK |
461                                      DMSG_IOCOMF_PWORK |
462                                      DMSG_IOCOMF_SWORK |
463                                      DMSG_IOCOMF_ARWORK |
464                                      DMSG_IOCOMF_AWWORK)) == 0) {
465                         /*
466                          * Only poll if no immediate work is pending.
467                          * Otherwise we are just wasting our time calling
468                          * poll.
469                          */
470                         timeout = 5000;
471
472                         count = 0;
473                         wi = -1;
474                         si = -1;
475                         ai = -1;
476
477                         /*
478                          * Always check the inter-thread pipe, e.g.
479                          * for iocom->txmsgq work.
480                          */
481                         wi = count++;
482                         fds[wi].fd = iocom->wakeupfds[0];
483                         fds[wi].events = POLLIN;
484                         fds[wi].revents = 0;
485
486                         /*
487                          * Check the socket input/output direction as
488                          * requested
489                          */
490                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
491                                             DMSG_IOCOMF_WREQ)) {
492                                 si = count++;
493                                 fds[si].fd = iocom->sock_fd;
494                                 fds[si].events = 0;
495                                 fds[si].revents = 0;
496
497                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
498                                         fds[si].events |= POLLIN;
499                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
500                                         fds[si].events |= POLLOUT;
501                         }
502
503                         /*
504                          * Check the alternative fd for work.
505                          */
506                         if (iocom->alt_fd >= 0) {
507                                 ai = count++;
508                                 fds[ai].fd = iocom->alt_fd;
509                                 fds[ai].events = POLLIN;
510                                 fds[ai].revents = 0;
511                         }
512                         poll(fds, count, timeout);
513
514                         if (wi >= 0 && (fds[wi].revents & POLLIN))
515                                 atomic_set_int(&iocom->flags,
516                                                DMSG_IOCOMF_PWORK);
517                         if (si >= 0 && (fds[si].revents & POLLIN))
518                                 atomic_set_int(&iocom->flags,
519                                                DMSG_IOCOMF_RWORK);
520                         if (si >= 0 && (fds[si].revents & POLLOUT))
521                                 atomic_set_int(&iocom->flags,
522                                                DMSG_IOCOMF_WWORK);
523                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
524                                 atomic_set_int(&iocom->flags,
525                                                DMSG_IOCOMF_WWORK);
526                         if (ai >= 0 && (fds[ai].revents & POLLIN))
527                                 atomic_set_int(&iocom->flags,
528                                                DMSG_IOCOMF_ARWORK);
529                 } else {
530                         /*
531                          * Always check the pipe
532                          */
533                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
534                 }
535
536                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
537                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
538                         iocom->signal_callback(iocom);
539                 }
540
541                 /*
542                  * Pending message queues from other threads wake us up
543                  * with a write to the wakeupfds[] pipe.  We have to clear
544                  * the pipe with a dummy read.
545                  */
546                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
547                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
548                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
549                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
550                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
551                         if (TAILQ_FIRST(&iocom->txmsgq))
552                                 dmsg_iocom_flush1(iocom);
553                 }
554
555                 /*
556                  * Message write sequencing
557                  */
558                 if (iocom->flags & DMSG_IOCOMF_WWORK)
559                         dmsg_iocom_flush1(iocom);
560
561                 /*
562                  * Message read sequencing.  Run this after the write
563                  * sequencing in case the write sequencing allowed another
564                  * auto-DELETE to occur on the read side.
565                  */
566                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
567                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
568                                (msg = dmsg_ioq_read(iocom)) != NULL) {
569                                 if (DMsgDebugOpt) {
570                                         fprintf(stderr, "receive %s\n",
571                                                 dmsg_msg_str(msg));
572                                 }
573                                 iocom->rcvmsg_callback(msg);
574                                 dmsg_state_cleanuprx(iocom, msg);
575                         }
576                 }
577
578                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
579                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
580                         iocom->altmsg_callback(iocom);
581                 }
582         }
583 }
584
585 /*
586  * Make sure there's enough room in the FIFO to hold the
587  * needed data.
588  *
589  * Assume worst case encrypted form is 2x the size of the
590  * plaintext equivalent.
591  */
592 static
593 size_t
594 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
595 {
596         size_t bytes;
597         size_t nmax;
598
599         bytes = ioq->fifo_cdx - ioq->fifo_beg;
600         nmax = sizeof(ioq->buf) - ioq->fifo_end;
601         if (bytes + nmax / 2 < needed) {
602                 if (bytes) {
603                         bcopy(ioq->buf + ioq->fifo_beg,
604                               ioq->buf,
605                               bytes);
606                 }
607                 ioq->fifo_cdx -= ioq->fifo_beg;
608                 ioq->fifo_beg = 0;
609                 if (ioq->fifo_cdn < ioq->fifo_end) {
610                         bcopy(ioq->buf + ioq->fifo_cdn,
611                               ioq->buf + ioq->fifo_cdx,
612                               ioq->fifo_end - ioq->fifo_cdn);
613                 }
614                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
615                 ioq->fifo_cdn = ioq->fifo_cdx;
616                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
617         }
618         return(nmax);
619 }
620
621 /*
622  * Read the next ready message from the ioq, issuing I/O if needed.
623  * Caller should retry on a read-event when NULL is returned.
624  *
625  * If an error occurs during reception a DMSG_LNK_ERROR msg will
626  * be returned for each open transaction, then the ioq and iocom
627  * will be errored out and a non-transactional DMSG_LNK_ERROR
628  * msg will be returned as the final message.  The caller should not call
629  * us again after the final message is returned.
630  *
631  * Thread localized, iocom->mtx not held.
632  */
633 dmsg_msg_t *
634 dmsg_ioq_read(dmsg_iocom_t *iocom)
635 {
636         dmsg_ioq_t *ioq = &iocom->ioq_rx;
637         dmsg_msg_t *msg;
638         dmsg_state_t *state;
639         dmsg_circuit_t *circuit0;
640         dmsg_hdr_t *head;
641         ssize_t n;
642         size_t bytes;
643         size_t nmax;
644         uint32_t aux_size;
645         uint32_t xcrc32;
646         int error;
647
648 again:
649         /*
650          * If a message is already pending we can just remove and
651          * return it.  Message state has already been processed.
652          * (currently not implemented)
653          */
654         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
655                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
656                 return (msg);
657         }
658         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
659
660         /*
661          * If the stream is errored out we stop processing it.
662          */
663         if (ioq->error)
664                 goto skip;
665
666         /*
667          * Message read in-progress (msg is NULL at the moment).  We don't
668          * allocate a msg until we have its core header.
669          */
670         nmax = sizeof(ioq->buf) - ioq->fifo_end;
671         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
672         msg = ioq->msg;
673
674         switch(ioq->state) {
675         case DMSG_MSGQ_STATE_HEADER1:
676                 /*
677                  * Load the primary header, fail on any non-trivial read
678                  * error or on EOF.  Since the primary header is the same
679                  * size is the message alignment it will never straddle
680                  * the end of the buffer.
681                  */
682                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
683                 if (bytes < sizeof(msg->any.head)) {
684                         n = read(iocom->sock_fd,
685                                  ioq->buf + ioq->fifo_end,
686                                  nmax);
687                         if (n <= 0) {
688                                 if (n == 0) {
689                                         ioq->error = DMSG_IOQ_ERROR_EOF;
690                                         break;
691                                 }
692                                 if (errno != EINTR &&
693                                     errno != EINPROGRESS &&
694                                     errno != EAGAIN) {
695                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
696                                         break;
697                                 }
698                                 n = 0;
699                                 /* fall through */
700                         }
701                         ioq->fifo_end += (size_t)n;
702                         nmax -= (size_t)n;
703                 }
704
705                 /*
706                  * Decrypt data received so far.  Data will be decrypted
707                  * in-place but might create gaps in the FIFO.  Partial
708                  * blocks are not immediately decrypted.
709                  *
710                  * WARNING!  The header might be in the wrong endian, we
711                  *           do not fix it up until we get the entire
712                  *           extended header.
713                  */
714                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
715                         dmsg_crypto_decrypt(iocom, ioq);
716                 } else {
717                         ioq->fifo_cdx = ioq->fifo_end;
718                         ioq->fifo_cdn = ioq->fifo_end;
719                 }
720                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
721
722                 /*
723                  * Insufficient data accumulated (msg is NULL, caller will
724                  * retry on event).
725                  */
726                 assert(msg == NULL);
727                 if (bytes < sizeof(msg->any.head))
728                         break;
729
730                 /*
731                  * Check and fixup the core header.  Note that the icrc
732                  * has to be calculated before any fixups, but the crc
733                  * fields in the msg may have to be swapped like everything
734                  * else.
735                  */
736                 head = (void *)(ioq->buf + ioq->fifo_beg);
737                 if (head->magic != DMSG_HDR_MAGIC &&
738                     head->magic != DMSG_HDR_MAGIC_REV) {
739                         fprintf(stderr, "%s: head->magic is bad %02x\n",
740                                 iocom->label, head->magic);
741                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
742                                 fprintf(stderr, "(on encrypted link)\n");
743                         ioq->error = DMSG_IOQ_ERROR_SYNC;
744                         break;
745                 }
746
747                 /*
748                  * Calculate the full header size and aux data size
749                  */
750                 if (head->magic == DMSG_HDR_MAGIC_REV) {
751                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
752                                       DMSG_ALIGN;
753                         aux_size = bswap32(head->aux_bytes);
754                 } else {
755                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
756                                       DMSG_ALIGN;
757                         aux_size = head->aux_bytes;
758                 }
759                 ioq->abytes = DMSG_DOALIGN(aux_size);
760                 ioq->unaligned_aux_size = aux_size;
761                 if (ioq->hbytes < sizeof(msg->any.head) ||
762                     ioq->hbytes > sizeof(msg->any) ||
763                     ioq->abytes > DMSG_AUX_MAX) {
764                         ioq->error = DMSG_IOQ_ERROR_FIELD;
765                         break;
766                 }
767
768                 /*
769                  * Allocate the message, the next state will fill it in.
770                  * Note that the aux_data buffer will be sized to an aligned
771                  * value and the aligned remainder zero'd for convenience.
772                  */
773                 msg = dmsg_msg_alloc(&iocom->circuit0, aux_size,
774                                      ioq->hbytes / DMSG_ALIGN,
775                                      NULL, NULL);
776                 ioq->msg = msg;
777
778                 /*
779                  * Fall through to the next state.  Make sure that the
780                  * extended header does not straddle the end of the buffer.
781                  * We still want to issue larger reads into our buffer,
782                  * book-keeping is easier if we don't bcopy() yet.
783                  *
784                  * Make sure there is enough room for bloated encrypt data.
785                  */
786                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
787                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
788                 /* fall through */
789         case DMSG_MSGQ_STATE_HEADER2:
790                 /*
791                  * Fill out the extended header.
792                  */
793                 assert(msg != NULL);
794                 if (bytes < ioq->hbytes) {
795                         n = read(iocom->sock_fd,
796                                  ioq->buf + ioq->fifo_end,
797                                  nmax);
798                         if (n <= 0) {
799                                 if (n == 0) {
800                                         ioq->error = DMSG_IOQ_ERROR_EOF;
801                                         break;
802                                 }
803                                 if (errno != EINTR &&
804                                     errno != EINPROGRESS &&
805                                     errno != EAGAIN) {
806                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
807                                         break;
808                                 }
809                                 n = 0;
810                                 /* fall through */
811                         }
812                         ioq->fifo_end += (size_t)n;
813                         nmax -= (size_t)n;
814                 }
815
816                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
817                         dmsg_crypto_decrypt(iocom, ioq);
818                 } else {
819                         ioq->fifo_cdx = ioq->fifo_end;
820                         ioq->fifo_cdn = ioq->fifo_end;
821                 }
822                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
823
824                 /*
825                  * Insufficient data accumulated (set msg NULL so caller will
826                  * retry on event).
827                  */
828                 if (bytes < ioq->hbytes) {
829                         msg = NULL;
830                         break;
831                 }
832
833                 /*
834                  * Calculate the extended header, decrypt data received
835                  * so far.  Handle endian-conversion for the entire extended
836                  * header.
837                  */
838                 head = (void *)(ioq->buf + ioq->fifo_beg);
839
840                 /*
841                  * Check the CRC.
842                  */
843                 if (head->magic == DMSG_HDR_MAGIC_REV)
844                         xcrc32 = bswap32(head->hdr_crc);
845                 else
846                         xcrc32 = head->hdr_crc;
847                 head->hdr_crc = 0;
848                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
849                         ioq->error = DMSG_IOQ_ERROR_XCRC;
850                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
851                                 xcrc32, dmsg_icrc32(head, ioq->hbytes),
852                                 dmsg_msg_str(msg));
853                         assert(0);
854                         break;
855                 }
856                 head->hdr_crc = xcrc32;
857
858                 if (head->magic == DMSG_HDR_MAGIC_REV) {
859                         dmsg_bswap_head(head);
860                 }
861
862                 /*
863                  * Copy the extended header into the msg and adjust the
864                  * FIFO.
865                  */
866                 bcopy(head, &msg->any, ioq->hbytes);
867
868                 /*
869                  * We are either done or we fall-through.
870                  */
871                 if (ioq->abytes == 0) {
872                         ioq->fifo_beg += ioq->hbytes;
873                         break;
874                 }
875
876                 /*
877                  * Must adjust bytes (and the state) when falling through.
878                  * nmax doesn't change.
879                  */
880                 ioq->fifo_beg += ioq->hbytes;
881                 bytes -= ioq->hbytes;
882                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
883                 /* fall through */
884         case DMSG_MSGQ_STATE_AUXDATA1:
885                 /*
886                  * Copy the partial or complete [decrypted] payload from
887                  * remaining bytes in the FIFO in order to optimize the
888                  * makeroom call in the AUXDATA2 state.  We have to
889                  * fall-through either way so we can check the crc.
890                  *
891                  * msg->aux_size tracks our aux data.
892                  *
893                  * (Lets not complicate matters if the data is encrypted,
894                  *  since the data in-stream is not the same size as the
895                  *  data decrypted).
896                  */
897                 if (bytes >= ioq->abytes) {
898                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
899                               ioq->abytes);
900                         msg->aux_size = ioq->abytes;
901                         ioq->fifo_beg += ioq->abytes;
902                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
903                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
904                         bytes -= ioq->abytes;
905                 } else if (bytes) {
906                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
907                               bytes);
908                         msg->aux_size = bytes;
909                         ioq->fifo_beg += bytes;
910                         if (ioq->fifo_cdx < ioq->fifo_beg)
911                                 ioq->fifo_cdx = ioq->fifo_beg;
912                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
913                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
914                         bytes = 0;
915                 } else {
916                         msg->aux_size = 0;
917                 }
918                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
919                 /* fall through */
920         case DMSG_MSGQ_STATE_AUXDATA2:
921                 /*
922                  * Make sure there is enough room for more data.
923                  */
924                 assert(msg);
925                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
926
927                 /*
928                  * Read and decrypt more of the payload.
929                  */
930                 if (msg->aux_size < ioq->abytes) {
931                         assert(bytes == 0);
932                         n = read(iocom->sock_fd,
933                                  ioq->buf + ioq->fifo_end,
934                                  nmax);
935                         if (n <= 0) {
936                                 if (n == 0) {
937                                         ioq->error = DMSG_IOQ_ERROR_EOF;
938                                         break;
939                                 }
940                                 if (errno != EINTR &&
941                                     errno != EINPROGRESS &&
942                                     errno != EAGAIN) {
943                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
944                                         break;
945                                 }
946                                 n = 0;
947                                 /* fall through */
948                         }
949                         ioq->fifo_end += (size_t)n;
950                         nmax -= (size_t)n;
951                 }
952
953                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
954                         dmsg_crypto_decrypt(iocom, ioq);
955                 } else {
956                         ioq->fifo_cdx = ioq->fifo_end;
957                         ioq->fifo_cdn = ioq->fifo_end;
958                 }
959                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
960
961                 if (bytes > ioq->abytes - msg->aux_size)
962                         bytes = ioq->abytes - msg->aux_size;
963
964                 if (bytes) {
965                         bcopy(ioq->buf + ioq->fifo_beg,
966                               msg->aux_data + msg->aux_size,
967                               bytes);
968                         msg->aux_size += bytes;
969                         ioq->fifo_beg += bytes;
970                 }
971
972                 /*
973                  * Insufficient data accumulated (set msg NULL so caller will
974                  * retry on event).
975                  *
976                  * Assert the auxillary data size is correct, then record the
977                  * original unaligned size from the message header.
978                  */
979                 if (msg->aux_size < ioq->abytes) {
980                         msg = NULL;
981                         break;
982                 }
983                 assert(msg->aux_size == ioq->abytes);
984                 msg->aux_size = ioq->unaligned_aux_size;
985
986                 /*
987                  * Check aux_crc, then we are done.  Note that the crc
988                  * is calculated over the aligned size, not the actual
989                  * size.
990                  */
991                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
992                 if (xcrc32 != msg->any.head.aux_crc) {
993                         ioq->error = DMSG_IOQ_ERROR_ACRC;
994                         fprintf(stderr, "iocom: ACRC error %08x vs %08x msgid %016jx msgcmd %08x auxsize %d\n",
995                                 xcrc32, msg->any.head.aux_crc, (intmax_t)msg->any.head.msgid, msg->any.head.cmd, msg->any.head.aux_bytes);
996                         break;
997                 }
998                 break;
999         case DMSG_MSGQ_STATE_ERROR:
1000                 /*
1001                  * Continued calls to drain recorded transactions (returning
1002                  * a LNK_ERROR for each one), before we return the final
1003                  * LNK_ERROR.
1004                  */
1005                 assert(msg == NULL);
1006                 break;
1007         default:
1008                 /*
1009                  * We don't double-return errors, the caller should not
1010                  * have called us again after getting an error msg.
1011                  */
1012                 assert(0);
1013                 break;
1014         }
1015
1016         /*
1017          * Check the message sequence.  The iv[] should prevent any
1018          * possibility of a replay but we add this check anyway.
1019          */
1020         if (msg && ioq->error == 0) {
1021                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1022                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1023                 } else {
1024                         ++ioq->seq;
1025                 }
1026         }
1027
1028         /*
1029          * Handle error, RREQ, or completion
1030          *
1031          * NOTE: nmax and bytes are invalid at this point, we don't bother
1032          *       to update them when breaking out.
1033          */
1034         if (ioq->error) {
1035 skip:
1036                 fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1037                 /*
1038                  * An unrecoverable error causes all active receive
1039                  * transactions to be terminated with a LNK_ERROR message.
1040                  *
1041                  * Once all active transactions are exhausted we set the
1042                  * iocom ERROR flag and return a non-transactional LNK_ERROR
1043                  * message, which should cause master processing loops to
1044                  * terminate.
1045                  */
1046                 assert(ioq->msg == msg);
1047                 if (msg) {
1048                         dmsg_msg_free(msg);
1049                         ioq->msg = NULL;
1050                 }
1051
1052                 /*
1053                  * No more I/O read processing
1054                  */
1055                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1056
1057                 /*
1058                  * Simulate a remote LNK_ERROR DELETE msg for any open
1059                  * transactions, ending with a final non-transactional
1060                  * LNK_ERROR (that the session can detect) when no
1061                  * transactions remain.
1062                  *
1063                  * We only need to scan transactions on circuit0 as these
1064                  * will contain all circuit forges, and terminating circuit
1065                  * forges will automatically terminate the transactions on
1066                  * any other circuits as well as those circuits.
1067                  */
1068                 circuit0 = &iocom->circuit0;
1069                 msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
1070                 msg->any.head.error = ioq->error;
1071
1072                 pthread_mutex_lock(&iocom->mtx);
1073                 dmsg_iocom_drain(iocom);
1074
1075                 if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
1076                         /*
1077                          * Active remote transactions are still present.
1078                          * Simulate the other end sending us a DELETE.
1079                          */
1080                         if (state->rxcmd & DMSGF_DELETE) {
1081                                 dmsg_msg_free(msg);
1082                                 fprintf(stderr,
1083                                         "iocom: ioq error(rd) %d sleeping "
1084                                         "state %p rxcmd %08x txcmd %08x "
1085                                         "func %p\n",
1086                                         ioq->error, state, state->rxcmd,
1087                                         state->txcmd, state->func);
1088                                 usleep(100000); /* XXX */
1089                                 atomic_set_int(&iocom->flags,
1090                                                DMSG_IOCOMF_RWORK);
1091                                 msg = NULL;
1092                         } else {
1093                                 /*state->txcmd |= DMSGF_DELETE;*/
1094                                 msg->state = state;
1095                                 msg->iocom = iocom;
1096                                 msg->any.head.msgid = state->msgid;
1097                                 msg->any.head.cmd |= DMSGF_ABORT |
1098                                                      DMSGF_DELETE;
1099                         }
1100                 } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
1101                         /*
1102                          * Active local transactions are still present.
1103                          * Simulate the other end sending us a DELETE.
1104                          */
1105                         if (state->rxcmd & DMSGF_DELETE) {
1106                                 dmsg_msg_free(msg);
1107                                 fprintf(stderr,
1108                                         "iocom: ioq error(wr) %d sleeping "
1109                                         "state %p rxcmd %08x txcmd %08x "
1110                                         "func %p\n",
1111                                         ioq->error, state, state->rxcmd,
1112                                         state->txcmd, state->func);
1113                                 usleep(100000); /* XXX */
1114                                 atomic_set_int(&iocom->flags,
1115                                                DMSG_IOCOMF_RWORK);
1116                                 msg = NULL;
1117                         } else {
1118                                 msg->state = state;
1119                                 msg->iocom = iocom;
1120                                 msg->any.head.msgid = state->msgid;
1121                                 msg->any.head.cmd |= DMSGF_ABORT |
1122                                                      DMSGF_DELETE |
1123                                                      DMSGF_REPLY;
1124                                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1125                                         msg->any.head.cmd |=
1126                                                      DMSGF_CREATE;
1127                                 }
1128                         }
1129                 } else {
1130                         /*
1131                          * No active local or remote transactions remain.
1132                          * Generate a final LNK_ERROR and flag EOF.
1133                          */
1134                         msg->state = NULL;
1135                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1136                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1137                 }
1138                 pthread_mutex_unlock(&iocom->mtx);
1139
1140                 /*
1141                  * For the iocom error case we want to set RWORK to indicate
1142                  * that more messages might be pending.
1143                  *
1144                  * It is possible to return NULL when there is more work to
1145                  * do because each message has to be DELETEd in both
1146                  * directions before we continue on with the next (though
1147                  * this could be optimized).  The transmit direction will
1148                  * re-set RWORK.
1149                  */
1150                 if (msg)
1151                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1152         } else if (msg == NULL) {
1153                 /*
1154                  * Insufficient data received to finish building the message,
1155                  * set RREQ and return NULL.
1156                  *
1157                  * Leave ioq->msg intact.
1158                  * Leave the FIFO intact.
1159                  */
1160                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1161         } else {
1162                 /*
1163                  * Continue processing msg.
1164                  *
1165                  * The fifo has already been advanced past the message.
1166                  * Trivially reset the FIFO indices if possible.
1167                  *
1168                  * clear the FIFO if it is now empty and set RREQ to wait
1169                  * for more from the socket.  If the FIFO is not empty set
1170                  * TWORK to bypass the poll so we loop immediately.
1171                  */
1172                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1173                     ioq->fifo_cdn == ioq->fifo_end) {
1174                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1175                         ioq->fifo_cdx = 0;
1176                         ioq->fifo_cdn = 0;
1177                         ioq->fifo_beg = 0;
1178                         ioq->fifo_end = 0;
1179                 } else {
1180                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1181                 }
1182                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1183                 ioq->msg = NULL;
1184
1185                 /*
1186                  * Handle message routing.  Validates non-zero sources
1187                  * and routes message.  Error will be 0 if the message is
1188                  * destined for us.
1189                  *
1190                  * State processing only occurs for messages destined for us.
1191                  */
1192                 if (DMsgDebugOpt >= 5) {
1193                         fprintf(stderr,
1194                                 "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1195                                 msg->any.head.cmd,
1196                                 (intmax_t)msg->any.head.msgid,
1197                                 (intmax_t)msg->any.head.circuit);
1198                 }
1199                 if (msg->any.head.circuit)
1200                         error = dmsg_circuit_route(msg);
1201                 else
1202                         error = dmsg_state_msgrx(msg);
1203
1204                 if (error) {
1205                         /*
1206                          * Abort-after-closure, throw message away and
1207                          * start reading another.
1208                          */
1209                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1210                                 dmsg_msg_free(msg);
1211                                 goto again;
1212                         }
1213
1214                         /*
1215                          * msg routed, msg pointer no longer owned by us.
1216                          * Go to the top and start reading another.
1217                          */
1218                         if (error == DMSG_IOQ_ERROR_ROUTED)
1219                                 goto again;
1220
1221                         /*
1222                          * Process real error and throw away message.
1223                          */
1224                         ioq->error = error;
1225                         goto skip;
1226                 }
1227                 /* no error, not routed.  Fall through and return msg */
1228         }
1229         return (msg);
1230 }
1231
1232 /*
1233  * Calculate the header and data crc's and write a low-level message to
1234  * the connection.  If aux_crc is non-zero the aux_data crc is already
1235  * assumed to have been set.
1236  *
1237  * A non-NULL msg is added to the queue but not necessarily flushed.
1238  * Calling this function with msg == NULL will get a flush going.
1239  *
1240  * (called from iocom_core only)
1241  */
1242 void
1243 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1244 {
1245         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1246         dmsg_msg_t *msg;
1247         uint32_t xcrc32;
1248         size_t hbytes;
1249         size_t abytes;
1250         dmsg_msg_queue_t tmpq;
1251
1252         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1253         TAILQ_INIT(&tmpq);
1254         pthread_mutex_lock(&iocom->mtx);
1255         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1256                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1257                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1258         }
1259         pthread_mutex_unlock(&iocom->mtx);
1260
1261         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1262                 /*
1263                  * Process terminal connection errors.
1264                  */
1265                 TAILQ_REMOVE(&tmpq, msg, qentry);
1266                 if (ioq->error) {
1267                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1268                         ++ioq->msgcount;
1269                         continue;
1270                 }
1271
1272                 /*
1273                  * Finish populating the msg fields.  The salt ensures that
1274                  * the iv[] array is ridiculously randomized and we also
1275                  * re-seed our PRNG every 32768 messages just to be sure.
1276                  */
1277                 msg->any.head.magic = DMSG_HDR_MAGIC;
1278                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1279                 ++ioq->seq;
1280                 if ((ioq->seq & 32767) == 0)
1281                         srandomdev();
1282
1283                 /*
1284                  * Calculate aux_crc if 0, then calculate hdr_crc.
1285                  */
1286                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1287                         abytes = DMSG_DOALIGN(msg->aux_size);
1288                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1289                         msg->any.head.aux_crc = xcrc32;
1290                 }
1291                 msg->any.head.aux_bytes = msg->aux_size;
1292
1293                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1294                          DMSG_ALIGN;
1295                 msg->any.head.hdr_crc = 0;
1296                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1297
1298                 /*
1299                  * Enqueue the message (the flush codes handles stream
1300                  * encryption).
1301                  */
1302                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1303                 ++ioq->msgcount;
1304         }
1305         dmsg_iocom_flush2(iocom);
1306 }
1307
1308 /*
1309  * Thread localized, iocom->mtx not held by caller.
1310  *
1311  * (called from iocom_core via iocom_flush1 only)
1312  */
1313 void
1314 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1315 {
1316         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1317         dmsg_msg_t *msg;
1318         ssize_t n;
1319         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1320         size_t nact;
1321         size_t hbytes;
1322         size_t abytes;
1323         size_t hoff;
1324         size_t aoff;
1325         int iovcnt;
1326
1327         if (ioq->error) {
1328                 dmsg_iocom_drain(iocom);
1329                 return;
1330         }
1331
1332         /*
1333          * Pump messages out the connection by building an iovec.
1334          *
1335          * ioq->hbytes/ioq->abytes tracks how much of the first message
1336          * in the queue has been successfully written out, so we can
1337          * resume writing.
1338          */
1339         iovcnt = 0;
1340         nact = 0;
1341         hoff = ioq->hbytes;
1342         aoff = ioq->abytes;
1343
1344         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1345                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1346                          DMSG_ALIGN;
1347                 abytes = DMSG_DOALIGN(msg->aux_size);
1348                 assert(hoff <= hbytes && aoff <= abytes);
1349
1350                 if (hoff < hbytes) {
1351                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1352                         iov[iovcnt].iov_len = hbytes - hoff;
1353                         nact += hbytes - hoff;
1354                         ++iovcnt;
1355                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1356                                 break;
1357                 }
1358                 if (aoff < abytes) {
1359                         assert(msg->aux_data != NULL);
1360                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1361                         iov[iovcnt].iov_len = abytes - aoff;
1362                         nact += abytes - aoff;
1363                         ++iovcnt;
1364                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1365                                 break;
1366                 }
1367                 hoff = 0;
1368                 aoff = 0;
1369         }
1370         if (iovcnt == 0)
1371                 return;
1372
1373         /*
1374          * Encrypt and write the data.  The crypto code will move the
1375          * data into the fifo and adjust the iov as necessary.  If
1376          * encryption is disabled the iov is left alone.
1377          *
1378          * May return a smaller iov (thus a smaller n), with aggregated
1379          * chunks.  May reduce nmax to what fits in the FIFO.
1380          *
1381          * This function sets nact to the number of original bytes now
1382          * encrypted, adding to the FIFO some number of bytes that might
1383          * be greater depending on the crypto mechanic.  iov[] is adjusted
1384          * to point at the FIFO if necessary.
1385          *
1386          * NOTE: The return value from the writev() is the post-encrypted
1387          *       byte count, not the plaintext count.
1388          */
1389         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1390                 /*
1391                  * Make sure the FIFO has a reasonable amount of space
1392                  * left (if not completely full).
1393                  *
1394                  * In this situation we are staging the encrypted message
1395                  * data in the FIFO.  (nact) represents how much plaintext
1396                  * has been staged, (n) represents how much encrypted data
1397                  * has been flushed.  The two are independent of each other.
1398                  */
1399                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1400                     sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1401                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1402                               ioq->fifo_end - ioq->fifo_beg);
1403                         ioq->fifo_cdx -= ioq->fifo_beg;
1404                         ioq->fifo_cdn -= ioq->fifo_beg;
1405                         ioq->fifo_end -= ioq->fifo_beg;
1406                         ioq->fifo_beg = 0;
1407                 }
1408
1409                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1410                 n = writev(iocom->sock_fd, iov, iovcnt);
1411                 if (n > 0) {
1412                         ioq->fifo_beg += n;
1413                         ioq->fifo_cdn += n;
1414                         ioq->fifo_cdx += n;
1415                         if (ioq->fifo_beg == ioq->fifo_end) {
1416                                 ioq->fifo_beg = 0;
1417                                 ioq->fifo_cdn = 0;
1418                                 ioq->fifo_cdx = 0;
1419                                 ioq->fifo_end = 0;
1420                         }
1421                 }
1422                 /*
1423                  * We don't mess with the nact returned by the crypto_encrypt
1424                  * call, which represents the filling of the FIFO.  (n) tells
1425                  * us how much we were able to write from the FIFO.  The two
1426                  * are different beasts when encrypting.
1427                  */
1428         } else {
1429                 /*
1430                  * In this situation we are not staging the messages to the
1431                  * FIFO but instead writing them directly from the msg
1432                  * structure(s), so (nact) is basically (n).
1433                  */
1434                 n = writev(iocom->sock_fd, iov, iovcnt);
1435                 if (n > 0)
1436                         nact = n;
1437                 else
1438                         nact = 0;
1439         }
1440
1441         /*
1442          * Clean out the transmit queue based on what we successfully
1443          * sent (nact is the plaintext count).  ioq->hbytes/abytes
1444          * represents the portion of the first message previously sent.
1445          */
1446         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1447                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1448                          DMSG_ALIGN;
1449                 abytes = DMSG_DOALIGN(msg->aux_size);
1450
1451                 if ((size_t)nact < hbytes - ioq->hbytes) {
1452                         ioq->hbytes += nact;
1453                         nact = 0;
1454                         break;
1455                 }
1456                 nact -= hbytes - ioq->hbytes;
1457                 ioq->hbytes = hbytes;
1458                 if ((size_t)nact < abytes - ioq->abytes) {
1459                         ioq->abytes += nact;
1460                         nact = 0;
1461                         break;
1462                 }
1463                 nact -= abytes - ioq->abytes;
1464                 /* ioq->abytes = abytes; optimized out */
1465
1466                 if (DMsgDebugOpt >= 5) {
1467                         fprintf(stderr,
1468                                 "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1469                                 msg->any.head.cmd,
1470                                 (intmax_t)msg->any.head.msgid,
1471                                 (intmax_t)msg->any.head.circuit);
1472                 }
1473
1474                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1475                 --ioq->msgcount;
1476                 ioq->hbytes = 0;
1477                 ioq->abytes = 0;
1478
1479                 dmsg_state_cleanuptx(msg);
1480         }
1481         assert(nact == 0);
1482
1483         /*
1484          * Process the return value from the write w/regards to blocking.
1485          */
1486         if (n < 0) {
1487                 if (errno != EINTR &&
1488                     errno != EINPROGRESS &&
1489                     errno != EAGAIN) {
1490                         /*
1491                          * Fatal write error
1492                          */
1493                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1494                         dmsg_iocom_drain(iocom);
1495                 } else {
1496                         /*
1497                          * Wait for socket buffer space
1498                          */
1499                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1500                 }
1501         } else {
1502                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1503         }
1504         if (ioq->error) {
1505                 dmsg_iocom_drain(iocom);
1506         }
1507 }
1508
1509 /*
1510  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1511  * write events will occur.  We don't kill read msgs because we want
1512  * the caller to pull off our contrived terminal error msg to detect
1513  * the connection failure.
1514  *
1515  * Localized to iocom_core thread, iocom->mtx not held by caller.
1516  */
1517 void
1518 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1519 {
1520         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1521         dmsg_msg_t *msg;
1522
1523         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1524         ioq->hbytes = 0;
1525         ioq->abytes = 0;
1526
1527         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1528                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1529                 --ioq->msgcount;
1530                 dmsg_state_cleanuptx(msg);
1531         }
1532 }
1533
1534 /*
1535  * Write a message to an iocom, with additional state processing.
1536  */
1537 void
1538 dmsg_msg_write(dmsg_msg_t *msg)
1539 {
1540         dmsg_iocom_t *iocom = msg->iocom;
1541         dmsg_state_t *state;
1542         char dummy;
1543
1544         /*
1545          * Handle state processing, create state if necessary.
1546          */
1547         pthread_mutex_lock(&iocom->mtx);
1548         if ((state = msg->state) != NULL) {
1549                 /*
1550                  * Existing transaction (could be reply).  It is also
1551                  * possible for this to be the first reply (CREATE is set),
1552                  * in which case we populate state->txcmd.
1553                  *
1554                  * state->txcmd is adjusted to hold the final message cmd,
1555                  * and we also be sure to set the CREATE bit here.  We did
1556                  * not set it in dmsg_msg_alloc() because that would have
1557                  * not been serialized (state could have gotten ripped out
1558                  * from under the message prior to it being transmitted).
1559                  */
1560                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1561                     DMSGF_CREATE) {
1562                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1563                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1564                 }
1565                 msg->any.head.msgid = state->msgid;
1566                 assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
1567                 if (msg->any.head.cmd & DMSGF_CREATE) {
1568                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1569                 }
1570         }
1571
1572         /*
1573          * Queue it for output, wake up the I/O pthread.  Note that the
1574          * I/O thread is responsible for generating the CRCs and encryption.
1575          */
1576         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1577         dummy = 0;
1578         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1579         pthread_mutex_unlock(&iocom->mtx);
1580 }
1581
1582 /*
1583  * This is a shortcut to formulate a reply to msg with a simple error code,
1584  * It can reply to and terminate a transaction, or it can reply to a one-way
1585  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1586  * the error code (which can be 0).  Not all transactions are terminated
1587  * with DMSG_LNK_ERROR status (the low level only cares about the
1588  * MSGF_DELETE flag), but most are.
1589  *
1590  * Replies to one-way messages are a bit of an oxymoron but the feature
1591  * is used by the debug (DBG) protocol.
1592  *
1593  * The reply contains no extended data.
1594  */
1595 void
1596 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1597 {
1598         dmsg_state_t *state = msg->state;
1599         dmsg_msg_t *nmsg;
1600         uint32_t cmd;
1601
1602
1603         /*
1604          * Reply with a simple error code and terminate the transaction.
1605          */
1606         cmd = DMSG_LNK_ERROR;
1607
1608         /*
1609          * Check if our direction has even been initiated yet, set CREATE.
1610          *
1611          * Check what direction this is (command or reply direction).  Note
1612          * that txcmd might not have been initiated yet.
1613          *
1614          * If our direction has already been closed we just return without
1615          * doing anything.
1616          */
1617         if (state) {
1618                 if (state->txcmd & DMSGF_DELETE)
1619                         return;
1620                 if (state->txcmd & DMSGF_REPLY)
1621                         cmd |= DMSGF_REPLY;
1622                 cmd |= DMSGF_DELETE;
1623         } else {
1624                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1625                         cmd |= DMSGF_REPLY;
1626         }
1627
1628         /*
1629          * Allocate the message and associate it with the existing state.
1630          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1631          * allocate new state.  We have our state already.
1632          */
1633         nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1634         if (state) {
1635                 if ((state->txcmd & DMSGF_CREATE) == 0)
1636                         nmsg->any.head.cmd |= DMSGF_CREATE;
1637         }
1638         nmsg->any.head.error = error;
1639         nmsg->any.head.msgid = msg->any.head.msgid;
1640         nmsg->any.head.circuit = msg->any.head.circuit;
1641         nmsg->state = state;
1642         dmsg_msg_write(nmsg);
1643 }
1644
1645 /*
1646  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1647  * we are generating a streaming reply or an intermediate acknowledgement
1648  * of some sort as part of the higher level protocol, with more to come
1649  * later.
1650  */
1651 void
1652 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1653 {
1654         dmsg_state_t *state = msg->state;
1655         dmsg_msg_t *nmsg;
1656         uint32_t cmd;
1657
1658
1659         /*
1660          * Reply with a simple error code and terminate the transaction.
1661          */
1662         cmd = DMSG_LNK_ERROR;
1663
1664         /*
1665          * Check if our direction has even been initiated yet, set CREATE.
1666          *
1667          * Check what direction this is (command or reply direction).  Note
1668          * that txcmd might not have been initiated yet.
1669          *
1670          * If our direction has already been closed we just return without
1671          * doing anything.
1672          */
1673         if (state) {
1674                 if (state->txcmd & DMSGF_DELETE)
1675                         return;
1676                 if (state->txcmd & DMSGF_REPLY)
1677                         cmd |= DMSGF_REPLY;
1678                 /* continuing transaction, do not set MSGF_DELETE */
1679         } else {
1680                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1681                         cmd |= DMSGF_REPLY;
1682         }
1683
1684         nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1685         if (state) {
1686                 if ((state->txcmd & DMSGF_CREATE) == 0)
1687                         nmsg->any.head.cmd |= DMSGF_CREATE;
1688         }
1689         nmsg->any.head.error = error;
1690         nmsg->any.head.msgid = msg->any.head.msgid;
1691         nmsg->any.head.circuit = msg->any.head.circuit;
1692         nmsg->state = state;
1693         dmsg_msg_write(nmsg);
1694 }
1695
1696 /*
1697  * Terminate a transaction given a state structure by issuing a DELETE.
1698  */
1699 void
1700 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1701 {
1702         dmsg_msg_t *nmsg;
1703         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1704
1705         /*
1706          * Nothing to do if we already transmitted a delete
1707          */
1708         if (state->txcmd & DMSGF_DELETE)
1709                 return;
1710
1711         /*
1712          * Set REPLY if the other end initiated the command.  Otherwise
1713          * we are the command direction.
1714          */
1715         if (state->txcmd & DMSGF_REPLY)
1716                 cmd |= DMSGF_REPLY;
1717
1718         nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1719         if (state) {
1720                 if ((state->txcmd & DMSGF_CREATE) == 0)
1721                         nmsg->any.head.cmd |= DMSGF_CREATE;
1722         }
1723         nmsg->any.head.error = error;
1724         nmsg->any.head.msgid = state->msgid;
1725         nmsg->any.head.circuit = state->msg->any.head.circuit;
1726         nmsg->state = state;
1727         dmsg_msg_write(nmsg);
1728 }
1729
1730 /*
1731  * Terminate a transaction given a state structure by issuing a DELETE.
1732  */
1733 void
1734 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1735 {
1736         dmsg_msg_t *nmsg;
1737         uint32_t cmd = DMSG_LNK_ERROR;
1738
1739         /*
1740          * Nothing to do if we already transmitted a delete
1741          */
1742         if (state->txcmd & DMSGF_DELETE)
1743                 return;
1744
1745         /*
1746          * Set REPLY if the other end initiated the command.  Otherwise
1747          * we are the command direction.
1748          */
1749         if (state->txcmd & DMSGF_REPLY)
1750                 cmd |= DMSGF_REPLY;
1751
1752         nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1753         if (state) {
1754                 if ((state->txcmd & DMSGF_CREATE) == 0)
1755                         nmsg->any.head.cmd |= DMSGF_CREATE;
1756         }
1757         nmsg->any.head.error = error;
1758         nmsg->any.head.msgid = state->msgid;
1759         nmsg->any.head.circuit = state->msg->any.head.circuit;
1760         nmsg->state = state;
1761         dmsg_msg_write(nmsg);
1762 }
1763
1764 /************************************************************************
1765  *                      TRANSACTION STATE HANDLING                      *
1766  ************************************************************************
1767  *
1768  */
1769
/*
 * Process circuit and state tracking for a message after reception, prior
 * to execution.
 *
 * iocom->mtx is acquired internally while the circuit and state trees are
 * searched and updated, so the caller must not already hold it.  The
 * message is expected to have been dequeued by the caller.
 *
 * On success msg->circuit references the circuit named by
 * msg->any.head.circuit (circuit id 0 maps to iocom->circuit0), and the
 * circuit reference previously stored in msg->circuit, if any, is
 * dropped.  msg->state is set to the matching transaction state, or to a
 * newly created state for a received command carrying CREATE.  msg->state
 * may be NULL, e.g. for one-off messages or for an ABORT that raced the
 * close of its transaction.
 *
 * Returns:
 *	0				message accepted
 *	DMSG_IOQ_ERROR_BAD_CIRCUIT	unknown circuit id; msg->circuit and
 *					msg->state are left untouched
 *	DMSG_IOQ_ERROR_EALREADY		ABORT for a transaction that is
 *					already closed or whose msgid was
 *					reused; the message should be ignored
 *	DMSG_IOQ_ERROR_TRANS		protocol violation (duplicate, missing
 *					or reused state); also asserts
 *
 * --
 *
 * These routines handle persistent and command/reply message state via the
 * CREATE and DELETE flags.  The first message in a command or reply sequence
 * sets CREATE, the last message in a command or reply sequence sets DELETE.
 *
 * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message,
 * which set neither flag.  This represents a streaming command or reply.
 *
 * Any command message received with CREATE set expects a reply sequence to
 * be returned.  Reply sequences work the same as command sequences except the
 * REPLY bit is also sent.  Both the command side and reply side can
 * degenerate into a single message with both CREATE and DELETE set.  Note
 * that one side can be streaming and the other side not, or neither, or both.
 *
 * The msgid is unique for the initiator.  That is, two sides sending a new
 * message can use the same msgid without colliding.
 *
 * --
 *
 * ABORT sequences work by setting the ABORT flag along with normal message
 * state.  However, ABORTs can also be sent on half-closed messages, that is
 * even if the command or reply side has already sent a DELETE, as long as
 * the message has not been fully closed it can still send an ABORT+DELETE
 * to terminate the half-closed message state.
 *
 * Since ABORT+DELETEs can race we silently discard ABORTs for message
 * state which has already been fully closed.  REPLY+ABORT+DELETEs can
 * also race, and in this situation the other side might have already
 * initiated a new unrelated command with the same message id.  Since
 * the abort has not set the CREATE flag the situation can be detected
 * and the message will also be discarded.
 *
 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
 * The ABORT request is essentially integrated into the command instead
 * of being sent later on.  In this situation the command implementation
 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
 * special-case non-blocking operation for the command.
 *
 * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
 *        to be mid-stream aborts for command/reply sequences.  ABORTs on
 *        one-way messages are not supported.
 *
 * NOTE!  If a command sequence does not support aborts the ABORT flag is
 *        simply ignored.
 *
 * --
 *
 * One-off messages (no reply expected) are sent with neither CREATE nor
 * DELETE set.  One-off messages cannot be aborted and typically aren't
 * processed by these routines.  The REPLY bit can be used to distinguish
 * whether a one-off message is a command or reply.  For example, one-off
 * replies will typically just contain status updates.
 */
static int
dmsg_state_msgrx(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->iocom;
	dmsg_circuit_t *circuit;
	dmsg_circuit_t *ocircuit;	/* circuit msg referenced on entry */
	dmsg_state_t *state;
	dmsg_state_t sdummy;		/* RB_FIND search key (msgid only) */
	dmsg_circuit_t cdummy;		/* RB_FIND search key (msgid only) */
	int error;

	pthread_mutex_lock(&iocom->mtx);

	/*
	 * Locate existing persistent circuit and state, if any.
	 */
	if (msg->any.head.circuit == 0) {
		circuit = &iocom->circuit0;
	} else {
		cdummy.msgid = msg->any.head.circuit;
		circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
				  &cdummy);
		if (circuit == NULL) {
			pthread_mutex_unlock(&iocom->mtx);
			return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
		}
	}

	/*
	 * Replace circuit0 with actual.  A reference is taken on the
	 * located circuit (under iocom->mtx, so it cannot go away) and the
	 * message's previous circuit reference is dropped below, after the
	 * mutex has been released.
	 */
	dmsg_circuit_hold(circuit);
	ocircuit = msg->circuit;
	msg->circuit = circuit;

	/*
	 * If received msg is a command state is on staterd_tree.
	 * If received msg is a reply state is on statewr_tree.
	 */
	sdummy.msgid = msg->any.head.msgid;
	if (msg->any.head.cmd & DMSGF_REPLY) {
		state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
				&sdummy);
	} else {
		state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
				&sdummy);
	}
	msg->state = state;

	/*
	 * NOTE(review): (state) is used below after iocom->mtx has been
	 * released.  Presumably this relies on states only being destroyed
	 * from the same thread (dmsg_state_cleanuprx/cleanuptx) -- verify.
	 */
	pthread_mutex_unlock(&iocom->mtx);
	if (ocircuit)
		dmsg_circuit_drop(ocircuit);

	/*
	 * Short-cut one-off or mid-stream messages (state may be NULL).
	 */
	if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				  DMSGF_ABORT)) == 0) {
		return(0);
	}

	/*
	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
	 * inside the case statements.  All 8 combinations of the three
	 * bits are handled; the 'default' label catches the remaining one
	 * (none of the three set).
	 */
	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				    DMSGF_REPLY)) {
	case DMSGF_CREATE:
	case DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * New persistent command received.  Create the receive-side
		 * state and insert it into staterd_tree.  The CREATE message
		 * is retained as state->msg.  txcmd starts as REPLY because
		 * our side of this transaction is the reply direction.
		 * DELETE is masked out of rxcmd; it is recorded later by
		 * dmsg_state_cleanuprx().
		 */
		if (state) {
			fprintf(stderr, "duplicate-trans %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		/* NOTE(review): malloc() result is not checked. */
		state = malloc(sizeof(*state));
		bzero(state, sizeof(*state));
		state->iocom = iocom;
		state->circuit = circuit;
		state->flags = DMSG_STATE_DYNAMIC;
		state->msg = msg;
		state->txcmd = DMSGF_REPLY;
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
		state->flags |= DMSG_STATE_INSERTED;
		state->msgid = msg->any.head.msgid;
		msg->state = state;
		pthread_mutex_lock(&iocom->mtx);
		RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
		pthread_mutex_unlock(&iocom->mtx);
		error = 0;
		if (DMsgDebugOpt) {
			fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
				state, (uint32_t)state->msgid, iocom);
		}
		break;
	case DMSGF_DELETE:
		/*
		 * Persistent state is expected but might not exist if an
		 * ABORT+DELETE races the close.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "missing-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Handle another ABORT+DELETE case if the msgid has already
		 * been reused.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	default:
		/*
		 * Command with none of CREATE/DELETE/REPLY.  Because of the
		 * short-cut above this is only reached with ABORT set, i.e.
		 * a mid-stream ABORT command.  Ignore it if the transaction
		 * is gone or its msgid was reused, otherwise allow.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_CREATE:
	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * When receiving a reply with CREATE set the original
		 * persistent state message should already exist.  Record
		 * the opening of the reply direction in rxcmd (DELETE
		 * masked out; dmsg_state_cleanuprx() records it).
		 */
		if (state == NULL) {
			fprintf(stderr, "no-state(r) %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		assert(((state->rxcmd ^ msg->any.head.cmd) &
			DMSGF_REPLY) == 0);
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_DELETE:
		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been fully closed, ignore the message.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "no-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been reused for an unrelated message,
		 * ignore the message.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	case DMSGF_REPLY:
		/*
		 * Check for mid-stream ABORT reply received to sent command.
		 * (Because of the short-cut above, ABORT is always set here.)
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == NULL ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	}
	return (error);
}
2054
2055 void
2056 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2057 {
2058         dmsg_state_t *state;
2059
2060         if ((state = msg->state) == NULL) {
2061                 /*
2062                  * Free a non-transactional message, there is no state
2063                  * to worry about.
2064                  */
2065                 dmsg_msg_free(msg);
2066         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2067                 /*
2068                  * Message terminating transaction, destroy the related
2069                  * state, the original message, and this message (if it
2070                  * isn't the original message due to a CREATE|DELETE).
2071                  */
2072                 pthread_mutex_lock(&iocom->mtx);
2073                 state->rxcmd |= DMSGF_DELETE;
2074                 if (state->txcmd & DMSGF_DELETE) {
2075                         if (state->msg == msg)
2076                                 state->msg = NULL;
2077                         assert(state->flags & DMSG_STATE_INSERTED);
2078                         if (state->rxcmd & DMSGF_REPLY) {
2079                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2080                                 RB_REMOVE(dmsg_state_tree,
2081                                           &msg->circuit->statewr_tree, state);
2082                         } else {
2083                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2084                                 RB_REMOVE(dmsg_state_tree,
2085                                           &msg->circuit->staterd_tree, state);
2086                         }
2087                         state->flags &= ~DMSG_STATE_INSERTED;
2088                         dmsg_state_free(state);
2089                 } else {
2090                         ;
2091                 }
2092                 pthread_mutex_unlock(&iocom->mtx);
2093                 dmsg_msg_free(msg);
2094         } else if (state->msg != msg) {
2095                 /*
2096                  * Message not terminating transaction, leave state intact
2097                  * and free message if it isn't the CREATE message.
2098                  */
2099                 dmsg_msg_free(msg);
2100         }
2101 }
2102
2103 static void
2104 dmsg_state_cleanuptx(dmsg_msg_t *msg)
2105 {
2106         dmsg_iocom_t *iocom = msg->iocom;
2107         dmsg_state_t *state;
2108
2109         if ((state = msg->state) == NULL) {
2110                 dmsg_msg_free(msg);
2111         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2112                 pthread_mutex_lock(&iocom->mtx);
2113                 assert((state->txcmd & DMSGF_DELETE) == 0);
2114                 state->txcmd |= DMSGF_DELETE;
2115                 if (state->rxcmd & DMSGF_DELETE) {
2116                         if (state->msg == msg)
2117                                 state->msg = NULL;
2118                         assert(state->flags & DMSG_STATE_INSERTED);
2119                         if (state->txcmd & DMSGF_REPLY) {
2120                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2121                                 RB_REMOVE(dmsg_state_tree,
2122                                           &msg->circuit->staterd_tree, state);
2123                         } else {
2124                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2125                                 RB_REMOVE(dmsg_state_tree,
2126                                           &msg->circuit->statewr_tree, state);
2127                         }
2128                         state->flags &= ~DMSG_STATE_INSERTED;
2129                         dmsg_state_free(state);
2130                 } else {
2131                         ;
2132                 }
2133                 pthread_mutex_unlock(&iocom->mtx);
2134                 dmsg_msg_free(msg);
2135         } else if (state->msg != msg) {
2136                 dmsg_msg_free(msg);
2137         }
2138 }
2139
2140 /*
2141  * Called with iocom locked
2142  */
2143 void
2144 dmsg_state_free(dmsg_state_t *state)
2145 {
2146         dmsg_msg_t *msg;
2147
2148         if (DMsgDebugOpt) {
2149                 fprintf(stderr, "terminate state %p id=%08x\n",
2150                         state, (uint32_t)state->msgid);
2151         }
2152         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2153                 closefrom(3);
2154         assert(state->any.any == NULL);
2155         msg = state->msg;
2156         state->msg = NULL;
2157         if (msg)
2158                 dmsg_msg_free_locked(msg);
2159         free(state);
2160 }
2161
2162 /*
2163  * Called with iocom locked
2164  */
2165 void
2166 dmsg_circuit_hold(dmsg_circuit_t *circuit)
2167 {
2168         assert(circuit->refs > 0);              /* caller must hold ref */
2169         atomic_add_int(&circuit->refs, 1);      /* to safely add more */
2170 }
2171
2172 /*
2173  * Called with iocom locked
2174  */
2175 void
2176 dmsg_circuit_drop(dmsg_circuit_t *circuit)
2177 {
2178         dmsg_iocom_t *iocom = circuit->iocom;
2179         char dummy;
2180
2181         assert(circuit->refs > 0);
2182         assert(iocom);
2183
2184         /*
2185          * Decrement circuit refs, destroy circuit when refs drops to 0.
2186          */
2187         if (atomic_fetchadd_int(&circuit->refs, -1) != 1)
2188                 return;
2189         assert(circuit != &iocom->circuit0);
2190
2191         assert(RB_EMPTY(&circuit->staterd_tree));
2192         assert(RB_EMPTY(&circuit->statewr_tree));
2193         pthread_mutex_lock(&iocom->mtx);
2194         RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2195         circuit->iocom = NULL;
2196         pthread_mutex_unlock(&iocom->mtx);
2197         dmsg_free(circuit);
2198
2199         /*
2200          * When an iocom error is present the rx code will terminate the
2201          * receive side for all transactions and (indirectly) all circuits
2202          * by simulating DELETE messages.  The state and related circuits
2203          * don't disappear until the related states are closed in both
2204          * directions
2205          *
2206          * Detect the case where the last circuit is now gone (and thus all
2207          * states for all circuits are gone), and wakeup the rx thread to
2208          * complete the termination.
2209          */
2210         if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2211                 dummy = 0;
2212                 write(iocom->wakeupfds[1], &dummy, 1);
2213         }
2214 }
2215
2216 void
2217 dmsg_circuit_drop_locked(dmsg_circuit_t *circuit)
2218 {
2219         dmsg_iocom_t *iocom;
2220
2221         iocom = circuit->iocom;
2222         assert(circuit->refs > 0);
2223         assert(iocom);
2224
2225         if (atomic_fetchadd_int(&circuit->refs, -1) == 1) {
2226                 assert(circuit != &iocom->circuit0);
2227                 assert(RB_EMPTY(&circuit->staterd_tree));
2228                 assert(RB_EMPTY(&circuit->statewr_tree));
2229                 RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2230                 circuit->iocom = NULL;
2231                 dmsg_free(circuit);
2232                 if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2233                         char dummy = 0;
2234                         write(iocom->wakeupfds[1], &dummy, 1);
2235                 }
2236         }
2237 }
2238
2239 /*
2240  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2241  * header is not adjusted, just the core header.
2242  */
2243 void
2244 dmsg_bswap_head(dmsg_hdr_t *head)
2245 {
2246         head->magic     = bswap16(head->magic);
2247         head->reserved02 = bswap16(head->reserved02);
2248         head->salt      = bswap32(head->salt);
2249
2250         head->msgid     = bswap64(head->msgid);
2251         head->circuit   = bswap64(head->circuit);
2252         head->reserved18= bswap64(head->reserved18);
2253
2254         head->cmd       = bswap32(head->cmd);
2255         head->aux_crc   = bswap32(head->aux_crc);
2256         head->aux_bytes = bswap32(head->aux_bytes);
2257         head->error     = bswap32(head->error);
2258         head->aux_descr = bswap64(head->aux_descr);
2259         head->reserved38= bswap32(head->reserved38);
2260         head->hdr_crc   = bswap32(head->hdr_crc);
2261 }