4cb2aafaa1d3cceac0eaae0ecd651f107690db82
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
/*
 * Library-wide debug knob.  When non-zero, dmsg_iocom_core() prints every
 * received message to stderr before dispatching it.
 */
int DMsgDebugOpt;

/*
 * File-local helpers; their definitions appear later in this file.
 */
static int dmsg_state_msgrx(dmsg_msg_t *msg);
static void dmsg_state_cleanuptx(dmsg_msg_t *msg);

/*
 * Instantiate the red-black tree functions for the transaction state trees
 * (ordered by dmsg_state_cmp) and for the per-iocom circuit tree (ordered
 * by dmsg_circuit_cmp).
 */
RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
RB_GENERATE(dmsg_circuit_tree, dmsg_circuit, rbnode, dmsg_circuit_cmp);
45
46 /*
47  * STATE TREE - Represents open transactions which are indexed by their
48  *              { msgid } relative to the governing iocom.
49  */
50 int
51 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
52 {
53         if (state1->msgid < state2->msgid)
54                 return(-1);
55         if (state1->msgid > state2->msgid)
56                 return(1);
57         return(0);
58 }
59
60 /*
61  * CIRCUIT TREE - Represents open circuits which are indexed by their
62  *                { msgid } relative to the governing iocom.
63  */
64 int
65 dmsg_circuit_cmp(dmsg_circuit_t *circuit1, dmsg_circuit_t *circuit2)
66 {
67         if (circuit1->msgid < circuit2->msgid)
68                 return(-1);
69         if (circuit1->msgid > circuit2->msgid)
70                 return(1);
71         return(0);
72 }
73
74 /*
75  * Initialize a low-level ioq
76  */
77 void
78 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
79 {
80         bzero(ioq, sizeof(*ioq));
81         ioq->state = DMSG_MSGQ_STATE_HEADER1;
82         TAILQ_INIT(&ioq->msgq);
83 }
84
85 /*
86  * Cleanup queue.
87  *
88  * caller holds iocom->mtx.
89  */
90 void
91 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
92 {
93         dmsg_msg_t *msg;
94
95         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
96                 assert(0);      /* shouldn't happen */
97                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
98                 dmsg_msg_free(msg);
99         }
100         if ((msg = ioq->msg) != NULL) {
101                 ioq->msg = NULL;
102                 dmsg_msg_free(msg);
103         }
104 }
105
106 /*
107  * Initialize a low-level communications channel.
108  *
109  * NOTE: The signal_func() is called at least once from the loop and can be
110  *       re-armed via dmsg_iocom_restate().
111  */
112 void
113 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
114                    void (*signal_func)(dmsg_iocom_t *),
115                    void (*rcvmsg_func)(dmsg_msg_t *),
116                    void (*dbgmsg_func)(dmsg_msg_t *),
117                    void (*altmsg_func)(dmsg_iocom_t *))
118 {
119         struct stat st;
120
121         bzero(iocom, sizeof(*iocom));
122
123         asprintf(&iocom->label, "iocom-%p", iocom);
124         iocom->signal_callback = signal_func;
125         iocom->rcvmsg_callback = rcvmsg_func;
126         iocom->altmsg_callback = altmsg_func;
127         iocom->dbgmsg_callback = dbgmsg_func;
128
129         pthread_mutex_init(&iocom->mtx, NULL);
130         RB_INIT(&iocom->circuit_tree);
131         TAILQ_INIT(&iocom->freeq);
132         TAILQ_INIT(&iocom->freeq_aux);
133         TAILQ_INIT(&iocom->txmsgq);
134         iocom->sock_fd = sock_fd;
135         iocom->alt_fd = alt_fd;
136         iocom->flags = DMSG_IOCOMF_RREQ;
137         if (signal_func)
138                 iocom->flags |= DMSG_IOCOMF_SWORK;
139         dmsg_ioq_init(iocom, &iocom->ioq_rx);
140         dmsg_ioq_init(iocom, &iocom->ioq_tx);
141         if (pipe(iocom->wakeupfds) < 0)
142                 assert(0);
143         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
144         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
145
146         dmsg_circuit_init(iocom, &iocom->circuit0);
147
148         /*
149          * Negotiate session crypto synchronously.  This will mark the
150          * connection as error'd if it fails.  If this is a pipe it's
151          * a linkage that we set up ourselves to the filesystem and there
152          * is no crypto.
153          */
154         if (fstat(sock_fd, &st) < 0)
155                 assert(0);
156         if (S_ISSOCK(st.st_mode))
157                 dmsg_crypto_negotiate(iocom);
158
159         /*
160          * Make sure our fds are set to non-blocking for the iocom core.
161          */
162         if (sock_fd >= 0)
163                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
164 #if 0
165         /* if line buffered our single fgets() should be fine */
166         if (alt_fd >= 0)
167                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
168 #endif
169 }
170
171 void
172 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
173 {
174         va_list va;
175         char *optr;
176
177         va_start(va, ctl);
178         optr = iocom->label;
179         vasprintf(&iocom->label, ctl, va);
180         va_end(va);
181         if (optr)
182                 free(optr);
183 }
184
185 /*
186  * May only be called from a callback from iocom_core.
187  *
188  * Adjust state machine functions, set flags to guarantee that both
189  * the recevmsg_func and the sendmsg_func is called at least once.
190  */
191 void
192 dmsg_iocom_restate(dmsg_iocom_t *iocom,
193                    void (*signal_func)(dmsg_iocom_t *),
194                    void (*rcvmsg_func)(dmsg_msg_t *msg),
195                    void (*altmsg_func)(dmsg_iocom_t *))
196 {
197         iocom->signal_callback = signal_func;
198         iocom->rcvmsg_callback = rcvmsg_func;
199         iocom->altmsg_callback = altmsg_func;
200         if (signal_func)
201                 iocom->flags |= DMSG_IOCOMF_SWORK;
202         else
203                 iocom->flags &= ~DMSG_IOCOMF_SWORK;
204 }
205
206 void
207 dmsg_iocom_signal(dmsg_iocom_t *iocom)
208 {
209         if (iocom->signal_callback)
210                 iocom->flags |= DMSG_IOCOMF_SWORK;
211 }
212
213 /*
214  * Cleanup a terminating iocom.
215  *
216  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
217  * from all possible references to it.
218  */
219 void
220 dmsg_iocom_done(dmsg_iocom_t *iocom)
221 {
222         dmsg_msg_t *msg;
223
224         if (iocom->sock_fd >= 0) {
225                 close(iocom->sock_fd);
226                 iocom->sock_fd = -1;
227         }
228         if (iocom->alt_fd >= 0) {
229                 close(iocom->alt_fd);
230                 iocom->alt_fd = -1;
231         }
232         dmsg_ioq_done(iocom, &iocom->ioq_rx);
233         dmsg_ioq_done(iocom, &iocom->ioq_tx);
234         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
235                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
236                 free(msg);
237         }
238         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
239                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
240                 free(msg->aux_data);
241                 msg->aux_data = NULL;
242                 free(msg);
243         }
244         if (iocom->wakeupfds[0] >= 0) {
245                 close(iocom->wakeupfds[0]);
246                 iocom->wakeupfds[0] = -1;
247         }
248         if (iocom->wakeupfds[1] >= 0) {
249                 close(iocom->wakeupfds[1]);
250                 iocom->wakeupfds[1] = -1;
251         }
252         pthread_mutex_destroy(&iocom->mtx);
253 }
254
255 /*
256  * Initialize a circuit structure and add it to the iocom's circuit_tree.
257  * circuit0 is left out and will not be added to the tree.
258  */
259 void
260 dmsg_circuit_init(dmsg_iocom_t *iocom, dmsg_circuit_t *circuit)
261 {
262         circuit->iocom = iocom;
263         RB_INIT(&circuit->staterd_tree);
264         RB_INIT(&circuit->statewr_tree);
265         if (circuit->msgid)
266                 RB_INSERT(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
267 }
268
269 /*
270  * Allocate a new one-way message.
271  */
272 dmsg_msg_t *
273 dmsg_msg_alloc(dmsg_circuit_t *circuit,
274                size_t aux_size, uint32_t cmd,
275                void (*func)(dmsg_msg_t *), void *data)
276 {
277         dmsg_iocom_t *iocom = circuit->iocom;
278         dmsg_state_t *state = NULL;
279         dmsg_msg_t *msg;
280         int hbytes;
281         size_t aligned_size;
282
283         pthread_mutex_lock(&iocom->mtx);
284         if (aux_size) {
285                 aligned_size = DMSG_DOALIGN(aux_size);
286                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
287                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
288         } else {
289                 aligned_size = 0;
290                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
291                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
292         }
293         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
294                 /*
295                  * Create state when CREATE is set without REPLY.
296                  * Assign a unique msgid, in this case simply using
297                  * the pointer value for 'state'.
298                  *
299                  * NOTE: CREATE in txcmd handled by dmsg_msg_write()
300                  * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
301                  *
302                  * NOTE: state initiated by us and state initiated by
303                  *       a remote create are placed in different RB trees.
304                  *       The msgid for SPAN state is used in source/target
305                  *       for message routing as appropriate.
306                  */
307                 state = malloc(sizeof(*state));
308                 bzero(state, sizeof(*state));
309                 state->iocom = iocom;
310                 state->circuit = circuit;
311                 state->flags = DMSG_STATE_DYNAMIC;
312                 state->msgid = (uint64_t)(uintptr_t)state;
313                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
314                 state->rxcmd = DMSGF_REPLY;
315                 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
316                 state->func = func;
317                 state->any.any = data;
318                 pthread_mutex_lock(&iocom->mtx);
319                 RB_INSERT(dmsg_state_tree, &circuit->statewr_tree, state);
320                 pthread_mutex_unlock(&iocom->mtx);
321                 state->flags |= DMSG_STATE_INSERTED;
322         }
323         pthread_mutex_unlock(&iocom->mtx);
324         if (msg == NULL) {
325                 msg = malloc(sizeof(*msg));
326                 bzero(msg, sizeof(*msg));
327                 msg->aux_data = NULL;
328                 msg->aux_size = 0;
329         }
330
331         /*
332          * [re]allocate the auxillary data buffer.  The caller knows that
333          * a size-aligned buffer will be allocated but we do not want to
334          * force the caller to zero any tail piece, so we do that ourself.
335          */
336         if (msg->aux_size != aux_size) {
337                 if (msg->aux_data) {
338                         free(msg->aux_data);
339                         msg->aux_data = NULL;
340                         msg->aux_size = 0;
341                 }
342                 if (aux_size) {
343                         msg->aux_data = malloc(aligned_size);
344                         msg->aux_size = aux_size;
345                         if (aux_size != aligned_size) {
346                                 bzero(msg->aux_data + aux_size,
347                                       aligned_size - aux_size);
348                         }
349                 }
350         }
351         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
352         if (hbytes)
353                 bzero(&msg->any.head, hbytes);
354         msg->hdr_size = hbytes;
355         msg->any.head.magic = DMSG_HDR_MAGIC;
356         msg->any.head.cmd = cmd;
357         msg->any.head.aux_descr = 0;
358         msg->any.head.aux_crc = 0;
359         msg->any.head.circuit = 0;
360         msg->circuit = circuit;
361         msg->iocom = iocom;
362         if (state) {
363                 msg->state = state;
364                 state->msg = msg;
365                 msg->any.head.msgid = state->msgid;
366         } else {
367                 msg->any.head.msgid = 0;
368         }
369         return (msg);
370 }
371
372 /*
373  * Free a message so it can be reused afresh.
374  *
375  * NOTE: aux_size can be 0 with a non-NULL aux_data.
376  */
377 static
378 void
379 dmsg_msg_free_locked(dmsg_msg_t *msg)
380 {
381         dmsg_iocom_t *iocom = msg->iocom;
382
383         msg->state = NULL;
384         if (msg->aux_data)
385                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
386         else
387                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
388 }
389
390 void
391 dmsg_msg_free(dmsg_msg_t *msg)
392 {
393         dmsg_iocom_t *iocom = msg->iocom;
394
395         pthread_mutex_lock(&iocom->mtx);
396         dmsg_msg_free_locked(msg);
397         pthread_mutex_unlock(&iocom->mtx);
398 }
399
400 /*
401  * I/O core loop for an iocom.
402  *
403  * Thread localized, iocom->mtx not held.
404  */
405 void
406 dmsg_iocom_core(dmsg_iocom_t *iocom)
407 {
408         struct pollfd fds[3];
409         char dummybuf[256];
410         dmsg_msg_t *msg;
411         int timeout;
412         int count;
413         int wi; /* wakeup pipe */
414         int si; /* socket */
415         int ai; /* alt bulk path socket */
416
417         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
418                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
419                                      DMSG_IOCOMF_WWORK |
420                                      DMSG_IOCOMF_PWORK |
421                                      DMSG_IOCOMF_SWORK |
422                                      DMSG_IOCOMF_ARWORK |
423                                      DMSG_IOCOMF_AWWORK)) == 0) {
424                         /*
425                          * Only poll if no immediate work is pending.
426                          * Otherwise we are just wasting our time calling
427                          * poll.
428                          */
429                         timeout = 5000;
430
431                         count = 0;
432                         wi = -1;
433                         si = -1;
434                         ai = -1;
435
436                         /*
437                          * Always check the inter-thread pipe, e.g.
438                          * for iocom->txmsgq work.
439                          */
440                         wi = count++;
441                         fds[wi].fd = iocom->wakeupfds[0];
442                         fds[wi].events = POLLIN;
443                         fds[wi].revents = 0;
444
445                         /*
446                          * Check the socket input/output direction as
447                          * requested
448                          */
449                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
450                                             DMSG_IOCOMF_WREQ)) {
451                                 si = count++;
452                                 fds[si].fd = iocom->sock_fd;
453                                 fds[si].events = 0;
454                                 fds[si].revents = 0;
455
456                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
457                                         fds[si].events |= POLLIN;
458                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
459                                         fds[si].events |= POLLOUT;
460                         }
461
462                         /*
463                          * Check the alternative fd for work.
464                          */
465                         if (iocom->alt_fd >= 0) {
466                                 ai = count++;
467                                 fds[ai].fd = iocom->alt_fd;
468                                 fds[ai].events = POLLIN;
469                                 fds[ai].revents = 0;
470                         }
471                         poll(fds, count, timeout);
472
473                         if (wi >= 0 && (fds[wi].revents & POLLIN))
474                                 iocom->flags |= DMSG_IOCOMF_PWORK;
475                         if (si >= 0 && (fds[si].revents & POLLIN))
476                                 iocom->flags |= DMSG_IOCOMF_RWORK;
477                         if (si >= 0 && (fds[si].revents & POLLOUT))
478                                 iocom->flags |= DMSG_IOCOMF_WWORK;
479                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
480                                 iocom->flags |= DMSG_IOCOMF_WWORK;
481                         if (ai >= 0 && (fds[ai].revents & POLLIN))
482                                 iocom->flags |= DMSG_IOCOMF_ARWORK;
483                 } else {
484                         /*
485                          * Always check the pipe
486                          */
487                         iocom->flags |= DMSG_IOCOMF_PWORK;
488                 }
489
490                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
491                         iocom->flags &= ~DMSG_IOCOMF_SWORK;
492                         iocom->signal_callback(iocom);
493                 }
494
495                 /*
496                  * Pending message queues from other threads wake us up
497                  * with a write to the wakeupfds[] pipe.  We have to clear
498                  * the pipe with a dummy read.
499                  */
500                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
501                         iocom->flags &= ~DMSG_IOCOMF_PWORK;
502                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
503                         iocom->flags |= DMSG_IOCOMF_RWORK;
504                         iocom->flags |= DMSG_IOCOMF_WWORK;
505                         if (TAILQ_FIRST(&iocom->txmsgq))
506                                 dmsg_iocom_flush1(iocom);
507                 }
508
509                 /*
510                  * Message write sequencing
511                  */
512                 if (iocom->flags & DMSG_IOCOMF_WWORK)
513                         dmsg_iocom_flush1(iocom);
514
515                 /*
516                  * Message read sequencing.  Run this after the write
517                  * sequencing in case the write sequencing allowed another
518                  * auto-DELETE to occur on the read side.
519                  */
520                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
521                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
522                                (msg = dmsg_ioq_read(iocom)) != NULL) {
523                                 if (DMsgDebugOpt) {
524                                         fprintf(stderr, "receive %s\n",
525                                                 dmsg_msg_str(msg));
526                                 }
527                                 iocom->rcvmsg_callback(msg);
528                                 dmsg_state_cleanuprx(iocom, msg);
529                         }
530                 }
531
532                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
533                         iocom->flags &= ~DMSG_IOCOMF_ARWORK;
534                         iocom->altmsg_callback(iocom);
535                 }
536         }
537 }
538
539 /*
540  * Make sure there's enough room in the FIFO to hold the
541  * needed data.
542  *
543  * Assume worst case encrypted form is 2x the size of the
544  * plaintext equivalent.
545  */
546 static
547 size_t
548 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
549 {
550         size_t bytes;
551         size_t nmax;
552
553         bytes = ioq->fifo_cdx - ioq->fifo_beg;
554         nmax = sizeof(ioq->buf) - ioq->fifo_end;
555         if (bytes + nmax / 2 < needed) {
556                 if (bytes) {
557                         bcopy(ioq->buf + ioq->fifo_beg,
558                               ioq->buf,
559                               bytes);
560                 }
561                 ioq->fifo_cdx -= ioq->fifo_beg;
562                 ioq->fifo_beg = 0;
563                 if (ioq->fifo_cdn < ioq->fifo_end) {
564                         bcopy(ioq->buf + ioq->fifo_cdn,
565                               ioq->buf + ioq->fifo_cdx,
566                               ioq->fifo_end - ioq->fifo_cdn);
567                 }
568                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
569                 ioq->fifo_cdn = ioq->fifo_cdx;
570                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
571         }
572         return(nmax);
573 }
574
575 /*
576  * Read the next ready message from the ioq, issuing I/O if needed.
577  * Caller should retry on a read-event when NULL is returned.
578  *
579  * If an error occurs during reception a DMSG_LNK_ERROR msg will
580  * be returned for each open transaction, then the ioq and iocom
581  * will be errored out and a non-transactional DMSG_LNK_ERROR
582  * msg will be returned as the final message.  The caller should not call
583  * us again after the final message is returned.
584  *
585  * Thread localized, iocom->mtx not held.
586  */
587 dmsg_msg_t *
588 dmsg_ioq_read(dmsg_iocom_t *iocom)
589 {
590         dmsg_ioq_t *ioq = &iocom->ioq_rx;
591         dmsg_msg_t *msg;
592         dmsg_state_t *state;
593         dmsg_circuit_t *circuit0;
594         dmsg_hdr_t *head;
595         ssize_t n;
596         size_t bytes;
597         size_t nmax;
598         uint32_t aux_size;
599         uint32_t xcrc32;
600         int error;
601
602 again:
603         iocom->flags &= ~(DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
604
605         /*
606          * If a message is already pending we can just remove and
607          * return it.  Message state has already been processed.
608          * (currently not implemented)
609          */
610         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
611                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
612                 return (msg);
613         }
614
615         /*
616          * If the stream is errored out we stop processing it.
617          */
618         if (ioq->error)
619                 goto skip;
620
621         /*
622          * Message read in-progress (msg is NULL at the moment).  We don't
623          * allocate a msg until we have its core header.
624          */
625         nmax = sizeof(ioq->buf) - ioq->fifo_end;
626         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
627         msg = ioq->msg;
628
629         switch(ioq->state) {
630         case DMSG_MSGQ_STATE_HEADER1:
631                 /*
632                  * Load the primary header, fail on any non-trivial read
633                  * error or on EOF.  Since the primary header is the same
634                  * size is the message alignment it will never straddle
635                  * the end of the buffer.
636                  */
637                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
638                 if (bytes < sizeof(msg->any.head)) {
639                         n = read(iocom->sock_fd,
640                                  ioq->buf + ioq->fifo_end,
641                                  nmax);
642                         if (n <= 0) {
643                                 if (n == 0) {
644                                         ioq->error = DMSG_IOQ_ERROR_EOF;
645                                         break;
646                                 }
647                                 if (errno != EINTR &&
648                                     errno != EINPROGRESS &&
649                                     errno != EAGAIN) {
650                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
651                                         break;
652                                 }
653                                 n = 0;
654                                 /* fall through */
655                         }
656                         ioq->fifo_end += (size_t)n;
657                         nmax -= (size_t)n;
658                 }
659
660                 /*
661                  * Decrypt data received so far.  Data will be decrypted
662                  * in-place but might create gaps in the FIFO.  Partial
663                  * blocks are not immediately decrypted.
664                  *
665                  * WARNING!  The header might be in the wrong endian, we
666                  *           do not fix it up until we get the entire
667                  *           extended header.
668                  */
669                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
670                         dmsg_crypto_decrypt(iocom, ioq);
671                 } else {
672                         ioq->fifo_cdx = ioq->fifo_end;
673                         ioq->fifo_cdn = ioq->fifo_end;
674                 }
675                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
676
677                 /*
678                  * Insufficient data accumulated (msg is NULL, caller will
679                  * retry on event).
680                  */
681                 assert(msg == NULL);
682                 if (bytes < sizeof(msg->any.head))
683                         break;
684
685                 /*
686                  * Check and fixup the core header.  Note that the icrc
687                  * has to be calculated before any fixups, but the crc
688                  * fields in the msg may have to be swapped like everything
689                  * else.
690                  */
691                 head = (void *)(ioq->buf + ioq->fifo_beg);
692                 if (head->magic != DMSG_HDR_MAGIC &&
693                     head->magic != DMSG_HDR_MAGIC_REV) {
694                         fprintf(stderr, "%s: head->magic is bad %02x\n",
695                                 iocom->label, head->magic);
696                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
697                                 fprintf(stderr, "(on encrypted link)\n");
698                         ioq->error = DMSG_IOQ_ERROR_SYNC;
699                         break;
700                 }
701
702                 /*
703                  * Calculate the full header size and aux data size
704                  */
705                 if (head->magic == DMSG_HDR_MAGIC_REV) {
706                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
707                                       DMSG_ALIGN;
708                         aux_size = bswap32(head->aux_bytes);
709                 } else {
710                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
711                                       DMSG_ALIGN;
712                         aux_size = head->aux_bytes;
713                 }
714                 ioq->abytes = DMSG_DOALIGN(aux_size);
715                 ioq->unaligned_aux_size = aux_size;
716                 if (ioq->hbytes < sizeof(msg->any.head) ||
717                     ioq->hbytes > sizeof(msg->any) ||
718                     ioq->abytes > DMSG_AUX_MAX) {
719                         ioq->error = DMSG_IOQ_ERROR_FIELD;
720                         break;
721                 }
722
723                 /*
724                  * Allocate the message, the next state will fill it in.
725                  * Note that the actual buffer will be sized to an aligned
726                  * value and the aligned remainder zero'd for convenience.
727                  */
728                 msg = dmsg_msg_alloc(&iocom->circuit0, aux_size, 0,
729                                      NULL, NULL);
730                 ioq->msg = msg;
731
732                 /*
733                  * Fall through to the next state.  Make sure that the
734                  * extended header does not straddle the end of the buffer.
735                  * We still want to issue larger reads into our buffer,
736                  * book-keeping is easier if we don't bcopy() yet.
737                  *
738                  * Make sure there is enough room for bloated encrypt data.
739                  */
740                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
741                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
742                 /* fall through */
743         case DMSG_MSGQ_STATE_HEADER2:
744                 /*
745                  * Fill out the extended header.
746                  */
747                 assert(msg != NULL);
748                 if (bytes < ioq->hbytes) {
749                         n = read(iocom->sock_fd,
750                                  ioq->buf + ioq->fifo_end,
751                                  nmax);
752                         if (n <= 0) {
753                                 if (n == 0) {
754                                         ioq->error = DMSG_IOQ_ERROR_EOF;
755                                         break;
756                                 }
757                                 if (errno != EINTR &&
758                                     errno != EINPROGRESS &&
759                                     errno != EAGAIN) {
760                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
761                                         break;
762                                 }
763                                 n = 0;
764                                 /* fall through */
765                         }
766                         ioq->fifo_end += (size_t)n;
767                         nmax -= (size_t)n;
768                 }
769
770                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
771                         dmsg_crypto_decrypt(iocom, ioq);
772                 } else {
773                         ioq->fifo_cdx = ioq->fifo_end;
774                         ioq->fifo_cdn = ioq->fifo_end;
775                 }
776                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
777
778                 /*
779                  * Insufficient data accumulated (set msg NULL so caller will
780                  * retry on event).
781                  */
782                 if (bytes < ioq->hbytes) {
783                         msg = NULL;
784                         break;
785                 }
786
787                 /*
788                  * Calculate the extended header, decrypt data received
789                  * so far.  Handle endian-conversion for the entire extended
790                  * header.
791                  */
792                 head = (void *)(ioq->buf + ioq->fifo_beg);
793
794                 /*
795                  * Check the CRC.
796                  */
797                 if (head->magic == DMSG_HDR_MAGIC_REV)
798                         xcrc32 = bswap32(head->hdr_crc);
799                 else
800                         xcrc32 = head->hdr_crc;
801                 head->hdr_crc = 0;
802                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
803                         ioq->error = DMSG_IOQ_ERROR_XCRC;
804                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
805                                 xcrc32, dmsg_icrc32(head, ioq->hbytes),
806                                 dmsg_msg_str(msg));
807                         assert(0);
808                         break;
809                 }
810                 head->hdr_crc = xcrc32;
811
812                 if (head->magic == DMSG_HDR_MAGIC_REV) {
813                         dmsg_bswap_head(head);
814                 }
815
816                 /*
817                  * Copy the extended header into the msg and adjust the
818                  * FIFO.
819                  */
820                 bcopy(head, &msg->any, ioq->hbytes);
821
822                 /*
823                  * We are either done or we fall-through.
824                  */
825                 if (ioq->abytes == 0) {
826                         ioq->fifo_beg += ioq->hbytes;
827                         break;
828                 }
829
830                 /*
831                  * Must adjust bytes (and the state) when falling through.
832                  * nmax doesn't change.
833                  */
834                 ioq->fifo_beg += ioq->hbytes;
835                 bytes -= ioq->hbytes;
836                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
837                 /* fall through */
838         case DMSG_MSGQ_STATE_AUXDATA1:
839                 /*
840                  * Copy the partial or complete payload from remaining
841                  * bytes in the FIFO in order to optimize the makeroom call
842                  * in the AUXDATA2 state.  We have to fall-through either
843                  * way so we can check the crc.
844                  *
845                  * msg->aux_size tracks our aux data.
846                  */
847                 if (bytes >= ioq->abytes) {
848                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
849                               ioq->abytes);
850                         msg->aux_size = ioq->abytes;
851                         ioq->fifo_beg += ioq->abytes;
852                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
853                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
854                         bytes -= ioq->abytes;
855                 } else if (bytes) {
856                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
857                               bytes);
858                         msg->aux_size = bytes;
859                         ioq->fifo_beg += bytes;
860                         if (ioq->fifo_cdx < ioq->fifo_beg)
861                                 ioq->fifo_cdx = ioq->fifo_beg;
862                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
863                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
864                         bytes = 0;
865                 } else {
866                         msg->aux_size = 0;
867                 }
868                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
869                 /* fall through */
870         case DMSG_MSGQ_STATE_AUXDATA2:
871                 /*
872                  * Make sure there is enough room for more data.
873                  */
874                 assert(msg);
875                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
876
877                 /*
878                  * Read and decrypt more of the payload.
879                  */
880                 if (msg->aux_size < ioq->abytes) {
881                         assert(bytes == 0);
882                         n = read(iocom->sock_fd,
883                                  ioq->buf + ioq->fifo_end,
884                                  nmax);
885                         if (n <= 0) {
886                                 if (n == 0) {
887                                         ioq->error = DMSG_IOQ_ERROR_EOF;
888                                         break;
889                                 }
890                                 if (errno != EINTR &&
891                                     errno != EINPROGRESS &&
892                                     errno != EAGAIN) {
893                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
894                                         break;
895                                 }
896                                 n = 0;
897                                 /* fall through */
898                         }
899                         ioq->fifo_end += (size_t)n;
900                         nmax -= (size_t)n;
901                 }
902
903                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
904                         dmsg_crypto_decrypt(iocom, ioq);
905                 } else {
906                         ioq->fifo_cdx = ioq->fifo_end;
907                         ioq->fifo_cdn = ioq->fifo_end;
908                 }
909                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
910
911                 if (bytes > ioq->abytes - msg->aux_size)
912                         bytes = ioq->abytes - msg->aux_size;
913
914                 if (bytes) {
915                         bcopy(ioq->buf + ioq->fifo_beg,
916                               msg->aux_data + msg->aux_size,
917                               bytes);
918                         msg->aux_size += bytes;
919                         ioq->fifo_beg += bytes;
920                 }
921
922                 /*
923                  * Insufficient data accumulated (set msg NULL so caller will
924                  * retry on event).
925                  *
926                  * Assert the auxillary data size is correct, then record the
927                  * original unaligned size from the message header.
928                  */
929                 if (msg->aux_size < ioq->abytes) {
930                         msg = NULL;
931                         break;
932                 }
933                 assert(msg->aux_size == ioq->abytes);
934                 msg->aux_size = ioq->unaligned_aux_size;
935
936                 /*
937                  * Check aux_crc, then we are done.  Note that the crc
938                  * is calculated over the aligned size, not the actual
939                  * size.
940                  */
941                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
942                 if (xcrc32 != msg->any.head.aux_crc) {
943                         ioq->error = DMSG_IOQ_ERROR_ACRC;
944                         break;
945                 }
946                 break;
947         case DMSG_MSGQ_STATE_ERROR:
948                 /*
949                  * Continued calls to drain recorded transactions (returning
950                  * a LNK_ERROR for each one), before we return the final
951                  * LNK_ERROR.
952                  */
953                 assert(msg == NULL);
954                 break;
955         default:
956                 /*
957                  * We don't double-return errors, the caller should not
958                  * have called us again after getting an error msg.
959                  */
960                 assert(0);
961                 break;
962         }
963
964         /*
965          * Check the message sequence.  The iv[] should prevent any
966          * possibility of a replay but we add this check anyway.
967          */
968         if (msg && ioq->error == 0) {
969                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
970                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
971                 } else {
972                         ++ioq->seq;
973                 }
974         }
975
976         /*
977          * Handle error, RREQ, or completion
978          *
979          * NOTE: nmax and bytes are invalid at this point, we don't bother
980          *       to update them when breaking out.
981          */
982         if (ioq->error) {
983 skip:
984                 /*
985                  * An unrecoverable error causes all active receive
986                  * transactions to be terminated with a LNK_ERROR message.
987                  *
988                  * Once all active transactions are exhausted we set the
989                  * iocom ERROR flag and return a non-transactional LNK_ERROR
990                  * message, which should cause master processing loops to
991                  * terminate.
992                  */
993                 assert(ioq->msg == msg);
994                 if (msg) {
995                         dmsg_msg_free(msg);
996                         ioq->msg = NULL;
997                 }
998
999                 /*
1000                  * No more I/O read processing
1001                  */
1002                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1003
1004                 /*
1005                  * Simulate a remote LNK_ERROR DELETE msg for any open
1006                  * transactions, ending with a final non-transactional
1007                  * LNK_ERROR (that the session can detect) when no
1008                  * transactions remain.
1009                  *
1010                  * We only need to scan transactions on circuit0 as these
1011                  * will contain all circuit forges, and terminating circuit
1012                  * forges will automatically terminate the transactions on
1013                  * any other circuits as well as those circuits.
1014                  */
1015                 circuit0 = &iocom->circuit0;
1016                 msg = dmsg_msg_alloc(circuit0, 0, DMSG_LNK_ERROR, NULL, NULL);
1017                 msg->any.head.error = ioq->error;
1018
1019                 pthread_mutex_lock(&iocom->mtx);
1020                 dmsg_iocom_drain(iocom);
1021
1022                 if ((state = RB_ROOT(&circuit0->staterd_tree)) != NULL) {
1023                         /*
1024                          * Active remote transactions are still present.
1025                          * Simulate the other end sending us a DELETE.
1026                          */
1027                         if (state->rxcmd & DMSGF_DELETE) {
1028                                 dmsg_msg_free(msg);
1029                                 msg = NULL;
1030                         } else {
1031                                 /*state->txcmd |= DMSGF_DELETE;*/
1032                                 msg->state = state;
1033                                 msg->iocom = iocom;
1034                                 msg->any.head.msgid = state->msgid;
1035                                 msg->any.head.cmd |= DMSGF_ABORT |
1036                                                      DMSGF_DELETE;
1037                         }
1038                 } else if ((state = RB_ROOT(&circuit0->statewr_tree)) != NULL) {
1039                         /*
1040                          * Active local transactions are still present.
1041                          * Simulate the other end sending us a DELETE.
1042                          */
1043                         if (state->rxcmd & DMSGF_DELETE) {
1044                                 dmsg_msg_free(msg);
1045                                 msg = NULL;
1046                         } else {
1047                                 msg->state = state;
1048                                 msg->iocom = iocom;
1049                                 msg->any.head.msgid = state->msgid;
1050                                 msg->any.head.cmd |= DMSGF_ABORT |
1051                                                      DMSGF_DELETE |
1052                                                      DMSGF_REPLY;
1053                                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1054                                         msg->any.head.cmd |=
1055                                                      DMSGF_CREATE;
1056                                 }
1057                         }
1058                 } else {
1059                         /*
1060                          * No active local or remote transactions remain.
1061                          * Generate a final LNK_ERROR and flag EOF.
1062                          */
1063                         msg->state = NULL;
1064                         iocom->flags |= DMSG_IOCOMF_EOF;
1065                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1066                 }
1067                 pthread_mutex_unlock(&iocom->mtx);
1068
1069                 /*
1070                  * For the iocom error case we want to set RWORK to indicate
1071                  * that more messages might be pending.
1072                  *
1073                  * It is possible to return NULL when there is more work to
1074                  * do because each message has to be DELETEd in both
1075                  * directions before we continue on with the next (though
1076                  * this could be optimized).  The transmit direction will
1077                  * re-set RWORK.
1078                  */
1079                 if (msg)
1080                         iocom->flags |= DMSG_IOCOMF_RWORK;
1081         } else if (msg == NULL) {
1082                 /*
1083                  * Insufficient data received to finish building the message,
1084                  * set RREQ and return NULL.
1085                  *
1086                  * Leave ioq->msg intact.
1087                  * Leave the FIFO intact.
1088                  */
1089                 iocom->flags |= DMSG_IOCOMF_RREQ;
1090         } else {
1091                 /*
1092                  * Continue processing msg.
1093                  *
1094                  * The fifo has already been advanced past the message.
1095                  * Trivially reset the FIFO indices if possible.
1096                  *
1097                  * clear the FIFO if it is now empty and set RREQ to wait
1098                  * for more from the socket.  If the FIFO is not empty set
1099                  * TWORK to bypass the poll so we loop immediately.
1100                  */
1101                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1102                     ioq->fifo_cdn == ioq->fifo_end) {
1103                         iocom->flags |= DMSG_IOCOMF_RREQ;
1104                         ioq->fifo_cdx = 0;
1105                         ioq->fifo_cdn = 0;
1106                         ioq->fifo_beg = 0;
1107                         ioq->fifo_end = 0;
1108                 } else {
1109                         iocom->flags |= DMSG_IOCOMF_RWORK;
1110                 }
1111                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1112                 ioq->msg = NULL;
1113
1114                 /*
1115                  * Handle message routing.  Validates non-zero sources
1116                  * and routes message.  Error will be 0 if the message is
1117                  * destined for us.
1118                  *
1119                  * State processing only occurs for messages destined for us.
1120                  */
1121                 if (msg->any.head.circuit)
1122                         error = dmsg_circuit_relay(msg);
1123                 else
1124                         error = dmsg_state_msgrx(msg);
1125
1126                 if (error) {
1127                         /*
1128                          * Abort-after-closure, throw message away and
1129                          * start reading another.
1130                          */
1131                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1132                                 dmsg_msg_free(msg);
1133                                 goto again;
1134                         }
1135
1136                         /*
1137                          * msg routed, msg pointer no longer owned by us.
1138                          * Go to the top and start reading another.
1139                          */
1140                         if (error == DMSG_IOQ_ERROR_ROUTED)
1141                                 goto again;
1142
1143                         /*
1144                          * Process real error and throw away message.
1145                          */
1146                         ioq->error = error;
1147                         goto skip;
1148                 }
1149                 /* no error, not routed.  Fall through and return msg */
1150         }
1151         return (msg);
1152 }
1153
1154 /*
1155  * Calculate the header and data crc's and write a low-level message to
1156  * the connection.  If aux_crc is non-zero the aux_data crc is already
1157  * assumed to have been set.
1158  *
1159  * A non-NULL msg is added to the queue but not necessarily flushed.
1160  * Calling this function with msg == NULL will get a flush going.
1161  *
1162  * Caller must hold iocom->mtx.
1163  */
1164 void
1165 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1166 {
1167         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1168         dmsg_msg_t *msg;
1169         uint32_t xcrc32;
1170         size_t hbytes;
1171         size_t abytes;
1172         dmsg_msg_queue_t tmpq;
1173
1174         iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1175         TAILQ_INIT(&tmpq);
1176         pthread_mutex_lock(&iocom->mtx);
1177         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1178                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1179                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1180         }
1181         pthread_mutex_unlock(&iocom->mtx);
1182
1183         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1184                 /*
1185                  * Process terminal connection errors.
1186                  */
1187                 TAILQ_REMOVE(&tmpq, msg, qentry);
1188                 if (ioq->error) {
1189                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1190                         ++ioq->msgcount;
1191                         continue;
1192                 }
1193
1194                 /*
1195                  * Finish populating the msg fields.  The salt ensures that
1196                  * the iv[] array is ridiculously randomized and we also
1197                  * re-seed our PRNG every 32768 messages just to be sure.
1198                  */
1199                 msg->any.head.magic = DMSG_HDR_MAGIC;
1200                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1201                 ++ioq->seq;
1202                 if ((ioq->seq & 32767) == 0)
1203                         srandomdev();
1204
1205                 /*
1206                  * Calculate aux_crc if 0, then calculate hdr_crc.
1207                  */
1208                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1209                         abytes = DMSG_DOALIGN(msg->aux_size);
1210                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1211                         msg->any.head.aux_crc = xcrc32;
1212                 }
1213                 msg->any.head.aux_bytes = msg->aux_size;
1214
1215                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1216                          DMSG_ALIGN;
1217                 msg->any.head.hdr_crc = 0;
1218                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1219
1220                 /*
1221                  * Enqueue the message (the flush codes handles stream
1222                  * encryption).
1223                  */
1224                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1225                 ++ioq->msgcount;
1226         }
1227         dmsg_iocom_flush2(iocom);
1228 }
1229
1230 /*
1231  * Thread localized, iocom->mtx not held by caller.
1232  */
1233 void
1234 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1235 {
1236         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1237         dmsg_msg_t *msg;
1238         ssize_t n;
1239         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1240         size_t nact;
1241         size_t hbytes;
1242         size_t abytes;
1243         size_t hoff;
1244         size_t aoff;
1245         int iovcnt;
1246
1247         if (ioq->error) {
1248                 dmsg_iocom_drain(iocom);
1249                 return;
1250         }
1251
1252         /*
1253          * Pump messages out the connection by building an iovec.
1254          *
1255          * ioq->hbytes/ioq->abytes tracks how much of the first message
1256          * in the queue has been successfully written out, so we can
1257          * resume writing.
1258          */
1259         iovcnt = 0;
1260         nact = 0;
1261         hoff = ioq->hbytes;
1262         aoff = ioq->abytes;
1263
1264         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1265                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1266                          DMSG_ALIGN;
1267                 abytes = DMSG_DOALIGN(msg->aux_size);
1268                 assert(hoff <= hbytes && aoff <= abytes);
1269
1270                 if (hoff < hbytes) {
1271                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1272                         iov[iovcnt].iov_len = hbytes - hoff;
1273                         nact += hbytes - hoff;
1274                         ++iovcnt;
1275                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1276                                 break;
1277                 }
1278                 if (aoff < abytes) {
1279                         assert(msg->aux_data != NULL);
1280                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1281                         iov[iovcnt].iov_len = abytes - aoff;
1282                         nact += abytes - aoff;
1283                         ++iovcnt;
1284                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1285                                 break;
1286                 }
1287                 hoff = 0;
1288                 aoff = 0;
1289         }
1290         if (iovcnt == 0)
1291                 return;
1292
1293         /*
1294          * Encrypt and write the data.  The crypto code will move the
1295          * data into the fifo and adjust the iov as necessary.  If
1296          * encryption is disabled the iov is left alone.
1297          *
1298          * May return a smaller iov (thus a smaller n), with aggregated
1299          * chunks.  May reduce nmax to what fits in the FIFO.
1300          *
1301          * This function sets nact to the number of original bytes now
1302          * encrypted, adding to the FIFO some number of bytes that might
1303          * be greater depending on the crypto mechanic.  iov[] is adjusted
1304          * to point at the FIFO if necessary.
1305          *
1306          * NOTE: The return value from the writev() is the post-encrypted
1307          *       byte count, not the plaintext count.
1308          */
1309         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1310                 /*
1311                  * Make sure the FIFO has a reasonable amount of space
1312                  * left (if not completely full).
1313                  */
1314                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1315                     sizeof(ioq->buf) - ioq->fifo_end >= DMSG_ALIGN * 2) {
1316                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1317                               ioq->fifo_end - ioq->fifo_beg);
1318                         ioq->fifo_cdx -= ioq->fifo_beg;
1319                         ioq->fifo_cdn -= ioq->fifo_beg;
1320                         ioq->fifo_end -= ioq->fifo_beg;
1321                         ioq->fifo_beg = 0;
1322                 }
1323
1324                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1325                 n = writev(iocom->sock_fd, iov, iovcnt);
1326                 if (n > 0) {
1327                         ioq->fifo_beg += n;
1328                         ioq->fifo_cdn += n;
1329                         ioq->fifo_cdx += n;
1330                         if (ioq->fifo_beg == ioq->fifo_end) {
1331                                 ioq->fifo_beg = 0;
1332                                 ioq->fifo_cdn = 0;
1333                                 ioq->fifo_cdx = 0;
1334                                 ioq->fifo_end = 0;
1335                         }
1336                         /* XXX what if interrupted mid-write? */
1337                 } else {
1338                         nact = 0;
1339                 }
1340         } else {
1341                 n = writev(iocom->sock_fd, iov, iovcnt);
1342                 if (n > 0)
1343                         nact = n;
1344                 else
1345                         nact = 0;
1346         }
1347
1348         /*
1349          * Clean out the transmit queue based on what we successfully
1350          * sent (nact is the plaintext count).  ioq->hbytes/abytes
1351          * represents the portion of the first message previously sent.
1352          */
1353         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1354                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1355                          DMSG_ALIGN;
1356                 abytes = DMSG_DOALIGN(msg->aux_size);
1357
1358                 if ((size_t)nact < hbytes - ioq->hbytes) {
1359                         ioq->hbytes += nact;
1360                         nact = 0;
1361                         break;
1362                 }
1363                 nact -= hbytes - ioq->hbytes;
1364                 ioq->hbytes = hbytes;
1365                 if ((size_t)nact < abytes - ioq->abytes) {
1366                         ioq->abytes += nact;
1367                         nact = 0;
1368                         break;
1369                 }
1370                 nact -= abytes - ioq->abytes;
1371
1372                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1373                 --ioq->msgcount;
1374                 ioq->hbytes = 0;
1375                 ioq->abytes = 0;
1376
1377                 dmsg_state_cleanuptx(msg);
1378         }
1379         assert(nact == 0);
1380
1381         /*
1382          * Process the return value from the write w/regards to blocking.
1383          */
1384         if (n < 0) {
1385                 if (errno != EINTR &&
1386                     errno != EINPROGRESS &&
1387                     errno != EAGAIN) {
1388                         /*
1389                          * Fatal write error
1390                          */
1391                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1392                         dmsg_iocom_drain(iocom);
1393                 } else {
1394                         /*
1395                          * Wait for socket buffer space
1396                          */
1397                         iocom->flags |= DMSG_IOCOMF_WREQ;
1398                 }
1399         } else {
1400                 iocom->flags |= DMSG_IOCOMF_WREQ;
1401         }
1402         if (ioq->error) {
1403                 dmsg_iocom_drain(iocom);
1404         }
1405 }
1406
1407 /*
1408  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1409  * write events will occur.  We don't kill read msgs because we want
1410  * the caller to pull off our contrived terminal error msg to detect
1411  * the connection failure.
1412  *
1413  * Thread localized, iocom->mtx not held by caller.
1414  */
1415 void
1416 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1417 {
1418         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1419         dmsg_msg_t *msg;
1420
1421         iocom->flags &= ~(DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1422         ioq->hbytes = 0;
1423         ioq->abytes = 0;
1424
1425         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1426                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1427                 --ioq->msgcount;
1428                 dmsg_state_cleanuptx(msg);
1429         }
1430 }
1431
1432 /*
1433  * Write a message to an iocom, with additional state processing.
1434  */
1435 void
1436 dmsg_msg_write(dmsg_msg_t *msg)
1437 {
1438         dmsg_iocom_t *iocom = msg->iocom;
1439         dmsg_state_t *state;
1440         char dummy;
1441
1442         /*
1443          * Handle state processing, create state if necessary.
1444          */
1445         pthread_mutex_lock(&iocom->mtx);
1446         if ((state = msg->state) != NULL) {
1447                 /*
1448                  * Existing transaction (could be reply).  It is also
1449                  * possible for this to be the first reply (CREATE is set),
1450                  * in which case we populate state->txcmd.
1451                  *
1452                  * state->txcmd is adjusted to hold the final message cmd,
1453                  * and we also be sure to set the CREATE bit here.  We did
1454                  * not set it in dmsg_msg_alloc() because that would have
1455                  * not been serialized (state could have gotten ripped out
1456                  * from under the message prior to it being transmitted).
1457                  */
1458                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1459                     DMSGF_CREATE) {
1460                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1461                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1462                 }
1463                 msg->any.head.msgid = state->msgid;
1464                 assert(((state->txcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
1465                 if (msg->any.head.cmd & DMSGF_CREATE) {
1466                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1467                 }
1468         }
1469
1470         /*
1471          * Queue it for output, wake up the I/O pthread.  Note that the
1472          * I/O thread is responsible for generating the CRCs and encryption.
1473          */
1474         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1475         dummy = 0;
1476         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1477         pthread_mutex_unlock(&iocom->mtx);
1478 }
1479
1480 /*
1481  * This is a shortcut to formulate a reply to msg with a simple error code,
1482  * It can reply to and terminate a transaction, or it can reply to a one-way
1483  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1484  * the error code (which can be 0).  Not all transactions are terminated
1485  * with DMSG_LNK_ERROR status (the low level only cares about the
1486  * MSGF_DELETE flag), but most are.
1487  *
1488  * Replies to one-way messages are a bit of an oxymoron but the feature
1489  * is used by the debug (DBG) protocol.
1490  *
1491  * The reply contains no extended data.
1492  */
1493 void
1494 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1495 {
1496         dmsg_state_t *state = msg->state;
1497         dmsg_msg_t *nmsg;
1498         uint32_t cmd;
1499
1500
1501         /*
1502          * Reply with a simple error code and terminate the transaction.
1503          */
1504         cmd = DMSG_LNK_ERROR;
1505
1506         /*
1507          * Check if our direction has even been initiated yet, set CREATE.
1508          *
1509          * Check what direction this is (command or reply direction).  Note
1510          * that txcmd might not have been initiated yet.
1511          *
1512          * If our direction has already been closed we just return without
1513          * doing anything.
1514          */
1515         if (state) {
1516                 if (state->txcmd & DMSGF_DELETE)
1517                         return;
1518                 if (state->txcmd & DMSGF_REPLY)
1519                         cmd |= DMSGF_REPLY;
1520                 cmd |= DMSGF_DELETE;
1521         } else {
1522                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1523                         cmd |= DMSGF_REPLY;
1524         }
1525
1526         /*
1527          * Allocate the message and associate it with the existing state.
1528          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1529          * allocate new state.  We have our state already.
1530          */
1531         nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1532         if (state) {
1533                 if ((state->txcmd & DMSGF_CREATE) == 0)
1534                         nmsg->any.head.cmd |= DMSGF_CREATE;
1535         }
1536         nmsg->any.head.error = error;
1537         nmsg->any.head.msgid = msg->any.head.msgid;
1538         nmsg->any.head.circuit = msg->any.head.circuit;
1539         nmsg->state = state;
1540         dmsg_msg_write(nmsg);
1541 }
1542
1543 /*
1544  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1545  * we are generating a streaming reply or an intermediate acknowledgement
1546  * of some sort as part of the higher level protocol, with more to come
1547  * later.
1548  */
1549 void
1550 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1551 {
1552         dmsg_state_t *state = msg->state;
1553         dmsg_msg_t *nmsg;
1554         uint32_t cmd;
1555
1556
1557         /*
1558          * Reply with a simple error code and terminate the transaction.
1559          */
1560         cmd = DMSG_LNK_ERROR;
1561
1562         /*
1563          * Check if our direction has even been initiated yet, set CREATE.
1564          *
1565          * Check what direction this is (command or reply direction).  Note
1566          * that txcmd might not have been initiated yet.
1567          *
1568          * If our direction has already been closed we just return without
1569          * doing anything.
1570          */
1571         if (state) {
1572                 if (state->txcmd & DMSGF_DELETE)
1573                         return;
1574                 if (state->txcmd & DMSGF_REPLY)
1575                         cmd |= DMSGF_REPLY;
1576                 /* continuing transaction, do not set MSGF_DELETE */
1577         } else {
1578                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1579                         cmd |= DMSGF_REPLY;
1580         }
1581
1582         nmsg = dmsg_msg_alloc(msg->circuit, 0, cmd, NULL, NULL);
1583         if (state) {
1584                 if ((state->txcmd & DMSGF_CREATE) == 0)
1585                         nmsg->any.head.cmd |= DMSGF_CREATE;
1586         }
1587         nmsg->any.head.error = error;
1588         nmsg->any.head.msgid = msg->any.head.msgid;
1589         nmsg->any.head.circuit = msg->any.head.circuit;
1590         nmsg->state = state;
1591         dmsg_msg_write(nmsg);
1592 }
1593
1594 /*
1595  * Terminate a transaction given a state structure by issuing a DELETE.
1596  */
1597 void
1598 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1599 {
1600         dmsg_msg_t *nmsg;
1601         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1602
1603         /*
1604          * Nothing to do if we already transmitted a delete
1605          */
1606         if (state->txcmd & DMSGF_DELETE)
1607                 return;
1608
1609         /*
1610          * Set REPLY if the other end initiated the command.  Otherwise
1611          * we are the command direction.
1612          */
1613         if (state->txcmd & DMSGF_REPLY)
1614                 cmd |= DMSGF_REPLY;
1615
1616         nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1617         if (state) {
1618                 if ((state->txcmd & DMSGF_CREATE) == 0)
1619                         nmsg->any.head.cmd |= DMSGF_CREATE;
1620         }
1621         nmsg->any.head.error = error;
1622         nmsg->any.head.msgid = state->msgid;
1623         nmsg->any.head.circuit = state->msg->any.head.circuit;
1624         nmsg->state = state;
1625         dmsg_msg_write(nmsg);
1626 }
1627
1628 /*
1629  * Terminate a transaction given a state structure by issuing a DELETE.
1630  */
1631 void
1632 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1633 {
1634         dmsg_msg_t *nmsg;
1635         uint32_t cmd = DMSG_LNK_ERROR;
1636
1637         /*
1638          * Nothing to do if we already transmitted a delete
1639          */
1640         if (state->txcmd & DMSGF_DELETE)
1641                 return;
1642
1643         /*
1644          * Set REPLY if the other end initiated the command.  Otherwise
1645          * we are the command direction.
1646          */
1647         if (state->txcmd & DMSGF_REPLY)
1648                 cmd |= DMSGF_REPLY;
1649
1650         nmsg = dmsg_msg_alloc(state->circuit, 0, cmd, NULL, NULL);
1651         if (state) {
1652                 if ((state->txcmd & DMSGF_CREATE) == 0)
1653                         nmsg->any.head.cmd |= DMSGF_CREATE;
1654         }
1655         nmsg->any.head.error = error;
1656         nmsg->any.head.msgid = state->msgid;
1657         nmsg->any.head.circuit = state->msg->any.head.circuit;
1658         nmsg->state = state;
1659         dmsg_msg_write(nmsg);
1660 }
1661
1662 /************************************************************************
1663  *                      TRANSACTION STATE HANDLING                      *
1664  ************************************************************************
1665  *
1666  */
1667
1668 /*
1669  * Process circuit and state tracking for a message after reception, prior
1670  * to execution.
1671  *
1672  * Called with msglk held and the msg dequeued.
1673  *
1674  * All messages are called with dummy state and return actual state.
1675  * (One-off messages often just return the same dummy state).
1676  *
1677  * May request that caller discard the message by setting *discardp to 1.
1678  * The returned state is not used in this case and is allowed to be NULL.
1679  *
1680  * --
1681  *
1682  * These routines handle persistent and command/reply message state via the
1683  * CREATE and DELETE flags.  The first message in a command or reply sequence
1684  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1685  *
1686  * There can be any number of intermediate messages belonging to the same
1687  * sequence sent inbetween the CREATE message and the DELETE message,
1688  * which set neither flag.  This represents a streaming command or reply.
1689  *
1690  * Any command message received with CREATE set expects a reply sequence to
1691  * be returned.  Reply sequences work the same as command sequences except the
1692  * REPLY bit is also sent.  Both the command side and reply side can
1693  * degenerate into a single message with both CREATE and DELETE set.  Note
1694  * that one side can be streaming and the other side not, or neither, or both.
1695  *
1696  * The msgid is unique for the initiator.  That is, two sides sending a new
1697  * message can use the same msgid without colliding.
1698  *
1699  * --
1700  *
1701  * ABORT sequences work by setting the ABORT flag along with normal message
1702  * state.  However, ABORTs can also be sent on half-closed messages, that is
1703  * even if the command or reply side has already sent a DELETE, as long as
1704  * the message has not been fully closed it can still send an ABORT+DELETE
1705  * to terminate the half-closed message state.
1706  *
1707  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1708  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1709  * also race, and in this situation the other side might have already
1710  * initiated a new unrelated command with the same message id.  Since
1711  * the abort has not set the CREATE flag the situation can be detected
1712  * and the message will also be discarded.
1713  *
1714  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1715  * The ABORT request is essentially integrated into the command instead
1716  * of being sent later on.  In this situation the command implementation
1717  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1718  * special-case non-blocking operation for the command.
1719  *
1720  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1721  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1722  *        one-way messages are not supported.
1723  *
1724  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1725  *        simply ignored.
1726  *
1727  * --
1728  *
1729  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1730  * set.  One-off messages cannot be aborted and typically aren't processed
1731  * by these routines.  The REPLY bit can be used to distinguish whether a
1732  * one-off message is a command or reply.  For example, one-off replies
1733  * will typically just contain status updates.
1734  */
1735 static int
1736 dmsg_state_msgrx(dmsg_msg_t *msg)
1737 {
1738         dmsg_iocom_t *iocom = msg->iocom;
1739         dmsg_circuit_t *circuit;
1740         dmsg_state_t *state;
1741         dmsg_state_t sdummy;
1742         dmsg_circuit_t cdummy;
1743         int error;
1744
1745         pthread_mutex_lock(&iocom->mtx);
1746
1747         /*
1748          * Locate existing persistent circuit and state, if any.
1749          */
1750         if (msg->any.head.circuit == 0) {
1751                 circuit = &iocom->circuit0;
1752         } else {
1753                 cdummy.msgid = msg->any.head.circuit;
1754                 circuit = RB_FIND(dmsg_circuit_tree, &iocom->circuit_tree,
1755                                   &cdummy);
1756                 if (circuit == NULL)
1757                         return (DMSG_IOQ_ERROR_BAD_CIRCUIT);
1758         }
1759         msg->circuit = circuit;
1760         ++circuit->refs;
1761
1762         /*
1763          * If received msg is a command state is on staterd_tree.
1764          * If received msg is a reply state is on statewr_tree.
1765          */
1766         sdummy.msgid = msg->any.head.msgid;
1767         if (msg->any.head.cmd & DMSGF_REPLY) {
1768                 state = RB_FIND(dmsg_state_tree, &circuit->statewr_tree,
1769                                 &sdummy);
1770         } else {
1771                 state = RB_FIND(dmsg_state_tree, &circuit->staterd_tree,
1772                                 &sdummy);
1773         }
1774         msg->state = state;
1775         pthread_mutex_unlock(&iocom->mtx);
1776
1777         /*
1778          * Short-cut one-off or mid-stream messages (state may be NULL).
1779          */
1780         if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1781                                   DMSGF_ABORT)) == 0) {
1782                 return(0);
1783         }
1784
1785         /*
1786          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1787          * inside the case statements.
1788          */
1789         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
1790                                     DMSGF_REPLY)) {
1791         case DMSGF_CREATE:
1792         case DMSGF_CREATE | DMSGF_DELETE:
1793                 /*
1794                  * New persistant command received.
1795                  */
1796                 if (state) {
1797                         fprintf(stderr, "duplicate-trans %s\n",
1798                                 dmsg_msg_str(msg));
1799                         error = DMSG_IOQ_ERROR_TRANS;
1800                         assert(0);
1801                         break;
1802                 }
1803                 state = malloc(sizeof(*state));
1804                 bzero(state, sizeof(*state));
1805                 state->iocom = iocom;
1806                 state->circuit = circuit;
1807                 state->flags = DMSG_STATE_DYNAMIC;
1808                 state->msg = msg;
1809                 state->txcmd = DMSGF_REPLY;
1810                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1811                 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
1812                 state->flags |= DMSG_STATE_INSERTED;
1813                 state->msgid = msg->any.head.msgid;
1814                 msg->state = state;
1815                 pthread_mutex_lock(&iocom->mtx);
1816                 RB_INSERT(dmsg_state_tree, &circuit->staterd_tree, state);
1817                 pthread_mutex_unlock(&iocom->mtx);
1818                 error = 0;
1819                 if (DMsgDebugOpt) {
1820                         fprintf(stderr, "create state %p id=%08x on iocom staterd %p\n",
1821                                 state, (uint32_t)state->msgid, iocom);
1822                 }
1823                 break;
1824         case DMSGF_DELETE:
1825                 /*
1826                  * Persistent state is expected but might not exist if an
1827                  * ABORT+DELETE races the close.
1828                  */
1829                 if (state == NULL) {
1830                         if (msg->any.head.cmd & DMSGF_ABORT) {
1831                                 error = DMSG_IOQ_ERROR_EALREADY;
1832                         } else {
1833                                 fprintf(stderr, "missing-state %s\n",
1834                                         dmsg_msg_str(msg));
1835                                 error = DMSG_IOQ_ERROR_TRANS;
1836                         assert(0);
1837                         }
1838                         break;
1839                 }
1840
1841                 /*
1842                  * Handle another ABORT+DELETE case if the msgid has already
1843                  * been reused.
1844                  */
1845                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1846                         if (msg->any.head.cmd & DMSGF_ABORT) {
1847                                 error = DMSG_IOQ_ERROR_EALREADY;
1848                         } else {
1849                                 fprintf(stderr, "reused-state %s\n",
1850                                         dmsg_msg_str(msg));
1851                                 error = DMSG_IOQ_ERROR_TRANS;
1852                         assert(0);
1853                         }
1854                         break;
1855                 }
1856                 error = 0;
1857                 break;
1858         default:
1859                 /*
1860                  * Check for mid-stream ABORT command received, otherwise
1861                  * allow.
1862                  */
1863                 if (msg->any.head.cmd & DMSGF_ABORT) {
1864                         if (state == NULL ||
1865                             (state->rxcmd & DMSGF_CREATE) == 0) {
1866                                 error = DMSG_IOQ_ERROR_EALREADY;
1867                                 break;
1868                         }
1869                 }
1870                 error = 0;
1871                 break;
1872         case DMSGF_REPLY | DMSGF_CREATE:
1873         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
1874                 /*
1875                  * When receiving a reply with CREATE set the original
1876                  * persistent state message should already exist.
1877                  */
1878                 if (state == NULL) {
1879                         fprintf(stderr, "no-state(r) %s\n",
1880                                 dmsg_msg_str(msg));
1881                         error = DMSG_IOQ_ERROR_TRANS;
1882                         assert(0);
1883                         break;
1884                 }
1885                 assert(((state->rxcmd ^ msg->any.head.cmd) &
1886                         DMSGF_REPLY) == 0);
1887                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1888                 error = 0;
1889                 break;
1890         case DMSGF_REPLY | DMSGF_DELETE:
1891                 /*
1892                  * Received REPLY+ABORT+DELETE in case where msgid has
1893                  * already been fully closed, ignore the message.
1894                  */
1895                 if (state == NULL) {
1896                         if (msg->any.head.cmd & DMSGF_ABORT) {
1897                                 error = DMSG_IOQ_ERROR_EALREADY;
1898                         } else {
1899                                 fprintf(stderr, "no-state(r,d) %s\n",
1900                                         dmsg_msg_str(msg));
1901                                 error = DMSG_IOQ_ERROR_TRANS;
1902                         assert(0);
1903                         }
1904                         break;
1905                 }
1906
1907                 /*
1908                  * Received REPLY+ABORT+DELETE in case where msgid has
1909                  * already been reused for an unrelated message,
1910                  * ignore the message.
1911                  */
1912                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
1913                         if (msg->any.head.cmd & DMSGF_ABORT) {
1914                                 error = DMSG_IOQ_ERROR_EALREADY;
1915                         } else {
1916                                 fprintf(stderr, "reused-state(r,d) %s\n",
1917                                         dmsg_msg_str(msg));
1918                                 error = DMSG_IOQ_ERROR_TRANS;
1919                         assert(0);
1920                         }
1921                         break;
1922                 }
1923                 error = 0;
1924                 break;
1925         case DMSGF_REPLY:
1926                 /*
1927                  * Check for mid-stream ABORT reply received to sent command.
1928                  */
1929                 if (msg->any.head.cmd & DMSGF_ABORT) {
1930                         if (state == NULL ||
1931                             (state->rxcmd & DMSGF_CREATE) == 0) {
1932                                 error = DMSG_IOQ_ERROR_EALREADY;
1933                                 break;
1934                         }
1935                 }
1936                 error = 0;
1937                 break;
1938         }
1939         return (error);
1940 }
1941
1942 void
1943 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
1944 {
1945         dmsg_state_t *state;
1946
1947         if ((state = msg->state) == NULL) {
1948                 /*
1949                  * Free a non-transactional message, there is no state
1950                  * to worry about.
1951                  */
1952                 dmsg_msg_free(msg);
1953         } else if (msg->any.head.cmd & DMSGF_DELETE) {
1954                 /*
1955                  * Message terminating transaction, destroy the related
1956                  * state, the original message, and this message (if it
1957                  * isn't the original message due to a CREATE|DELETE).
1958                  */
1959                 pthread_mutex_lock(&iocom->mtx);
1960                 state->rxcmd |= DMSGF_DELETE;
1961                 if (state->txcmd & DMSGF_DELETE) {
1962                         if (state->msg == msg)
1963                                 state->msg = NULL;
1964                         assert(state->flags & DMSG_STATE_INSERTED);
1965                         if (state->rxcmd & DMSGF_REPLY) {
1966                                 assert(msg->any.head.cmd & DMSGF_REPLY);
1967                                 RB_REMOVE(dmsg_state_tree,
1968                                           &msg->circuit->statewr_tree, state);
1969                         } else {
1970                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
1971                                 RB_REMOVE(dmsg_state_tree,
1972                                           &msg->circuit->staterd_tree, state);
1973                         }
1974                         state->flags &= ~DMSG_STATE_INSERTED;
1975                         dmsg_state_free(state);
1976                 } else {
1977                         ;
1978                 }
1979                 pthread_mutex_unlock(&iocom->mtx);
1980                 dmsg_msg_free(msg);
1981         } else if (state->msg != msg) {
1982                 /*
1983                  * Message not terminating transaction, leave state intact
1984                  * and free message if it isn't the CREATE message.
1985                  */
1986                 dmsg_msg_free(msg);
1987         }
1988 }
1989
1990 static void
1991 dmsg_state_cleanuptx(dmsg_msg_t *msg)
1992 {
1993         dmsg_iocom_t *iocom = msg->iocom;
1994         dmsg_state_t *state;
1995
1996         if ((state = msg->state) == NULL) {
1997                 dmsg_msg_free(msg);
1998         } else if (msg->any.head.cmd & DMSGF_DELETE) {
1999                 pthread_mutex_lock(&iocom->mtx);
2000                 assert((state->txcmd & DMSGF_DELETE) == 0);
2001                 state->txcmd |= DMSGF_DELETE;
2002                 if (state->rxcmd & DMSGF_DELETE) {
2003                         if (state->msg == msg)
2004                                 state->msg = NULL;
2005                         assert(state->flags & DMSG_STATE_INSERTED);
2006                         if (state->txcmd & DMSGF_REPLY) {
2007                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2008                                 RB_REMOVE(dmsg_state_tree,
2009                                           &msg->circuit->staterd_tree, state);
2010                         } else {
2011                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2012                                 RB_REMOVE(dmsg_state_tree,
2013                                           &msg->circuit->statewr_tree, state);
2014                         }
2015                         state->flags &= ~DMSG_STATE_INSERTED;
2016                         dmsg_state_free(state);
2017                 } else {
2018                         ;
2019                 }
2020                 pthread_mutex_unlock(&iocom->mtx);
2021                 dmsg_msg_free(msg);
2022         } else if (state->msg != msg) {
2023                 dmsg_msg_free(msg);
2024         }
2025 }
2026
2027 /*
2028  * Called with iocom locked
2029  */
2030 void
2031 dmsg_state_free(dmsg_state_t *state)
2032 {
2033         dmsg_msg_t *msg;
2034
2035         if (DMsgDebugOpt) {
2036                 fprintf(stderr, "terminate state %p id=%08x\n",
2037                         state, (uint32_t)state->msgid);
2038         }
2039         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2040                 closefrom(3);
2041         assert(state->any.any == NULL);
2042         msg = state->msg;
2043         state->msg = NULL;
2044         if (msg)
2045                 dmsg_msg_free_locked(msg);
2046         free(state);
2047 }
2048
2049 /*
2050  * Called with iocom locked
2051  */
2052 void
2053 dmsg_circuit_drop(dmsg_circuit_t *circuit)
2054 {
2055         dmsg_iocom_t *iocom = circuit->iocom;
2056         char dummy;
2057
2058         assert(circuit->refs > 0);
2059         assert(iocom);
2060
2061         /*
2062          * Decrement circuit refs, destroy circuit when refs drops to 0.
2063          */
2064         if (--circuit->refs > 0)
2065                 return;
2066
2067         assert(RB_EMPTY(&circuit->staterd_tree));
2068         assert(RB_EMPTY(&circuit->statewr_tree));
2069         RB_REMOVE(dmsg_circuit_tree, &iocom->circuit_tree, circuit);
2070         circuit->iocom = NULL;
2071         dmsg_free(circuit);
2072
2073         /*
2074          * When an iocom error is present the rx code will terminate the
2075          * receive side for all transactions and (indirectly) all circuits
2076          * by simulating DELETE messages.  The state and related circuits
2077          * don't disappear until the related states are closed in both
2078          * directions
2079          *
2080          * Detect the case where the last circuit is now gone (and thus all
2081          * states for all circuits are gone), and wakeup the rx thread to
2082          * complete the termination.
2083          */
2084         if (iocom->ioq_rx.error && RB_EMPTY(&iocom->circuit_tree)) {
2085                 dummy = 0;
2086                 write(iocom->wakeupfds[1], &dummy, 1);
2087         }
2088 }
2089
2090 /*
2091  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2092  * header is not adjusted, just the core header.
2093  */
2094 void
2095 dmsg_bswap_head(dmsg_hdr_t *head)
2096 {
2097         head->magic     = bswap16(head->magic);
2098         head->reserved02 = bswap16(head->reserved02);
2099         head->salt      = bswap32(head->salt);
2100
2101         head->msgid     = bswap64(head->msgid);
2102         head->circuit   = bswap64(head->circuit);
2103         head->reserved18= bswap64(head->reserved18);
2104
2105         head->cmd       = bswap32(head->cmd);
2106         head->aux_crc   = bswap32(head->aux_crc);
2107         head->aux_bytes = bswap32(head->aux_bytes);
2108         head->error     = bswap32(head->error);
2109         head->aux_descr = bswap64(head->aux_descr);
2110         head->reserved38= bswap32(head->reserved38);
2111         head->hdr_crc   = bswap32(head->hdr_crc);
2112 }