dmsg - Stabilization work
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2015 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
38 int DMsgDebugOpt;
39 int dmsg_state_count;
40 #ifdef DMSG_BLOCK_DEBUG
41 static int biocount;
42 #endif
43
44 static int dmsg_state_msgrx(dmsg_msg_t *msg);
45 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
46 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
47 static void dmsg_state_free(dmsg_state_t *state);
48 static void dmsg_simulate_failure(dmsg_state_t *state, int error);
49
50 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
51
52 /*
53  * STATE TREE - Represents open transactions which are indexed by their
54  *              { msgid } relative to the governing iocom.
55  */
56 int
57 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
58 {
59         if (state1->msgid < state2->msgid)
60                 return(-1);
61         if (state1->msgid > state2->msgid)
62                 return(1);
63         return(0);
64 }
65
66 /*
67  * Initialize a low-level ioq
68  */
69 void
70 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
71 {
72         bzero(ioq, sizeof(*ioq));
73         ioq->state = DMSG_MSGQ_STATE_HEADER1;
74         TAILQ_INIT(&ioq->msgq);
75 }
76
77 /*
78  * Cleanup queue.
79  *
80  * caller holds iocom->mtx.
81  */
82 void
83 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
84 {
85         dmsg_msg_t *msg;
86
87         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
88                 assert(0);      /* shouldn't happen */
89                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
90                 dmsg_msg_free(msg);
91         }
92         if ((msg = ioq->msg) != NULL) {
93                 ioq->msg = NULL;
94                 dmsg_msg_free(msg);
95         }
96 }
97
98 /*
99  * Initialize a low-level communications channel.
100  *
101  * NOTE: The signal_func() is called at least once from the loop and can be
102  *       re-armed via dmsg_iocom_restate().
103  */
104 void
105 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
106                    void (*signal_func)(dmsg_iocom_t *iocom),
107                    void (*rcvmsg_func)(dmsg_msg_t *msg),
108                    void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
109                    void (*altmsg_func)(dmsg_iocom_t *iocom))
110 {
111         struct stat st;
112
113         bzero(iocom, sizeof(*iocom));
114
115         asprintf(&iocom->label, "iocom-%p", iocom);
116         iocom->signal_callback = signal_func;
117         iocom->rcvmsg_callback = rcvmsg_func;
118         iocom->altmsg_callback = altmsg_func;
119         iocom->usrmsg_callback = usrmsg_func;
120
121         pthread_mutex_init(&iocom->mtx, NULL);
122         RB_INIT(&iocom->staterd_tree);
123         RB_INIT(&iocom->statewr_tree);
124         TAILQ_INIT(&iocom->freeq);
125         TAILQ_INIT(&iocom->freeq_aux);
126         TAILQ_INIT(&iocom->txmsgq);
127         iocom->sock_fd = sock_fd;
128         iocom->alt_fd = alt_fd;
129         iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
130         if (signal_func)
131                 iocom->flags |= DMSG_IOCOMF_SWORK;
132         dmsg_ioq_init(iocom, &iocom->ioq_rx);
133         dmsg_ioq_init(iocom, &iocom->ioq_tx);
134         iocom->state0.refs = 1;         /* should never trigger a free */
135         iocom->state0.iocom = iocom;
136         iocom->state0.parent = &iocom->state0;
137         iocom->state0.flags = DMSG_STATE_ROOT;
138         TAILQ_INIT(&iocom->state0.subq);
139
140         if (pipe(iocom->wakeupfds) < 0)
141                 assert(0);
142         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
143         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
144
145         /*
146          * Negotiate session crypto synchronously.  This will mark the
147          * connection as error'd if it fails.  If this is a pipe it's
148          * a linkage that we set up ourselves to the filesystem and there
149          * is no crypto.
150          */
151         if (fstat(sock_fd, &st) < 0)
152                 assert(0);
153         if (S_ISSOCK(st.st_mode))
154                 dmsg_crypto_negotiate(iocom);
155
156         /*
157          * Make sure our fds are set to non-blocking for the iocom core.
158          */
159         if (sock_fd >= 0)
160                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
161 #if 0
162         /* if line buffered our single fgets() should be fine */
163         if (alt_fd >= 0)
164                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
165 #endif
166 }
167
168 void
169 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
170 {
171         va_list va;
172         char *optr;
173
174         va_start(va, ctl);
175         optr = iocom->label;
176         vasprintf(&iocom->label, ctl, va);
177         va_end(va);
178         if (optr)
179                 free(optr);
180 }
181
182 /*
183  * May only be called from a callback from iocom_core.
184  *
185  * Adjust state machine functions, set flags to guarantee that both
186  * the recevmsg_func and the sendmsg_func is called at least once.
187  */
188 void
189 dmsg_iocom_restate(dmsg_iocom_t *iocom,
190                    void (*signal_func)(dmsg_iocom_t *),
191                    void (*rcvmsg_func)(dmsg_msg_t *msg))
192 {
193         pthread_mutex_lock(&iocom->mtx);
194         iocom->signal_callback = signal_func;
195         iocom->rcvmsg_callback = rcvmsg_func;
196         if (signal_func)
197                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
198         else
199                 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
200         pthread_mutex_unlock(&iocom->mtx);
201 }
202
203 void
204 dmsg_iocom_signal(dmsg_iocom_t *iocom)
205 {
206         pthread_mutex_lock(&iocom->mtx);
207         if (iocom->signal_callback)
208                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
209         pthread_mutex_unlock(&iocom->mtx);
210 }
211
212 /*
213  * Cleanup a terminating iocom.
214  *
215  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
216  * from all possible references to it.
217  */
218 void
219 dmsg_iocom_done(dmsg_iocom_t *iocom)
220 {
221         dmsg_msg_t *msg;
222
223         if (iocom->sock_fd >= 0) {
224                 close(iocom->sock_fd);
225                 iocom->sock_fd = -1;
226         }
227         if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
228                 close(iocom->alt_fd);
229                 iocom->alt_fd = -1;
230         }
231         dmsg_ioq_done(iocom, &iocom->ioq_rx);
232         dmsg_ioq_done(iocom, &iocom->ioq_tx);
233         while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
234                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
235                 free(msg);
236         }
237         while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
238                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
239                 free(msg->aux_data);
240                 msg->aux_data = NULL;
241                 free(msg);
242         }
243         if (iocom->wakeupfds[0] >= 0) {
244                 close(iocom->wakeupfds[0]);
245                 iocom->wakeupfds[0] = -1;
246         }
247         if (iocom->wakeupfds[1] >= 0) {
248                 close(iocom->wakeupfds[1]);
249                 iocom->wakeupfds[1] = -1;
250         }
251         pthread_mutex_destroy(&iocom->mtx);
252 }
253
254 /*
255  * Allocate a new message using the specified transaction state.
256  *
257  * If CREATE is set a new transaction is allocated relative to the passed-in
258  * transaction (the 'state' argument becomes pstate).
259  *
260  * If CREATE is not set the message is associated with the passed-in
261  * transaction.
262  */
263 dmsg_msg_t *
264 dmsg_msg_alloc(dmsg_state_t *state,
265                size_t aux_size, uint32_t cmd,
266                void (*func)(dmsg_msg_t *), void *data)
267 {
268         dmsg_iocom_t *iocom = state->iocom;
269         dmsg_msg_t *msg;
270
271         pthread_mutex_lock(&iocom->mtx);
272         msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
273         pthread_mutex_unlock(&iocom->mtx);
274
275         return msg;
276 }
277
278 dmsg_msg_t *
279 dmsg_msg_alloc_locked(dmsg_state_t *state,
280                size_t aux_size, uint32_t cmd,
281                void (*func)(dmsg_msg_t *), void *data)
282 {
283         dmsg_iocom_t *iocom = state->iocom;
284         dmsg_state_t *pstate;
285         dmsg_msg_t *msg;
286         int hbytes;
287         size_t aligned_size;
288
289 #if 0
290         if (aux_size) {
291                 aligned_size = DMSG_DOALIGN(aux_size);
292                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
293                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
294         } else {
295                 aligned_size = 0;
296                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
297                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
298         }
299 #endif
300         aligned_size = DMSG_DOALIGN(aux_size);
301         msg = NULL;
302         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
303                 /*
304                  * When CREATE is set without REPLY the caller is
305                  * initiating a new transaction stacked under the specified
306                  * circuit.
307                  *
308                  * NOTE: CREATE in txcmd handled by dmsg_msg_write()
309                  * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
310                  */
311                 pstate = state;
312                 state = malloc(sizeof(*state));
313                 atomic_add_int(&dmsg_state_count, 1);
314                 bzero(state, sizeof(*state));
315                 TAILQ_INIT(&state->subq);
316                 dmsg_state_hold(pstate);
317                 state->refs = 1;
318                 state->parent = pstate;
319                 state->iocom = iocom;
320                 state->flags = DMSG_STATE_DYNAMIC;
321                 state->msgid = (uint64_t)(uintptr_t)state;
322                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
323                 state->rxcmd = DMSGF_REPLY;
324                 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
325                 state->func = func;
326                 state->any.any = data;
327
328                 RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
329                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
330                 state->flags |= DMSG_STATE_SUBINSERTED |
331                                 DMSG_STATE_RBINSERTED;
332
333                 if (DMsgDebugOpt) {
334                         fprintf(stderr,
335                                 "create state %p id=%08x on iocom statewr %p\n",
336                                 state, (uint32_t)state->msgid, iocom);
337                 }
338         } else {
339                 /*
340                  * Otherwise the message is transmitted over the existing
341                  * open transaction.
342                  */
343                 pstate = state->parent;
344         }
345
346         /* XXX SMP race for state */
347         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
348         if (msg == NULL) {
349                 msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
350                 bzero(msg, offsetof(struct dmsg_msg, any.head));
351                 *(int *)((char *)msg +
352                          offsetof(struct dmsg_msg, any.head) + hbytes) =
353                                  0x71B2C3D4;
354                 if (DMsgDebugOpt) {
355                         fprintf(stderr,
356                                 "allo msg %p id=%08x on iocom %p\n",
357                                 msg, (int)msg->any.head.msgid, iocom);
358                 }
359 #if 0
360                 msg = malloc(sizeof(*msg));
361                 bzero(msg, sizeof(*msg));
362 #endif
363         }
364
365         /*
366          * [re]allocate the auxillary data buffer.  The caller knows that
367          * a size-aligned buffer will be allocated but we do not want to
368          * force the caller to zero any tail piece, so we do that ourself.
369          */
370         if (msg->aux_size != aux_size) {
371                 if (msg->aux_data) {
372                         free(msg->aux_data);
373                         msg->aux_data = NULL;
374                         msg->aux_size = 0;
375                 }
376                 if (aux_size) {
377                         msg->aux_data = malloc(aligned_size);
378                         msg->aux_size = aux_size;
379                         if (aux_size != aligned_size) {
380                                 bzero(msg->aux_data + aux_size,
381                                       aligned_size - aux_size);
382                         }
383                 }
384         }
385
386         /*
387          * Set REVTRANS if the transaction was remotely initiated
388          * Set REVCIRC if the circuit was remotely initiated
389          */
390         if (state->flags & DMSG_STATE_OPPOSITE)
391                 cmd |= DMSGF_REVTRANS;
392         if (pstate->flags & DMSG_STATE_OPPOSITE)
393                 cmd |= DMSGF_REVCIRC;
394
395         /*
396          * Finish filling out the header.
397          */
398         if (hbytes)
399                 bzero(&msg->any.head, hbytes);
400         msg->hdr_size = hbytes;
401         msg->any.head.magic = DMSG_HDR_MAGIC;
402         msg->any.head.cmd = cmd;
403         msg->any.head.aux_descr = 0;
404         msg->any.head.aux_crc = 0;
405         msg->any.head.msgid = state->msgid;
406         msg->any.head.circuit = pstate->msgid;
407         msg->state = state;
408
409         return (msg);
410 }
411
412 /*
413  * Free a message so it can be reused afresh.
414  *
415  * NOTE: aux_size can be 0 with a non-NULL aux_data.
416  */
417 static
418 void
419 dmsg_msg_free_locked(dmsg_msg_t *msg)
420 {
421         /*dmsg_iocom_t *iocom = msg->iocom;*/
422
423         if (DMsgDebugOpt) {
424                 fprintf(stderr,
425                         "free msg %p id=%08x on (aux %p)\n",
426                         msg, (int)msg->any.head.msgid, msg->aux_data);
427         }
428 #if 1
429         int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
430         if (*(int *)((char *)msg +
431                      offsetof(struct  dmsg_msg, any.head) + hbytes) !=
432              0x71B2C3D4) {
433                 fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
434                 assert(0);
435         }
436 #endif
437         msg->state = NULL;      /* safety */
438         if (msg->aux_data) {
439                 free(msg->aux_data);
440                 msg->aux_data = NULL;
441         }
442         msg->aux_size = 0;
443         free (msg);
444 #if 0
445         if (msg->aux_data)
446                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
447         else
448                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
449 #endif
450 }
451
452 void
453 dmsg_msg_free(dmsg_msg_t *msg)
454 {
455         dmsg_iocom_t *iocom = msg->state->iocom;
456
457         pthread_mutex_lock(&iocom->mtx);
458         dmsg_msg_free_locked(msg);
459         pthread_mutex_unlock(&iocom->mtx);
460 }
461
462 /*
463  * I/O core loop for an iocom.
464  *
465  * Thread localized, iocom->mtx not held.
466  */
467 void
468 dmsg_iocom_core(dmsg_iocom_t *iocom)
469 {
470         struct pollfd fds[3];
471         char dummybuf[256];
472         dmsg_msg_t *msg;
473         int timeout;
474         int count;
475         int wi; /* wakeup pipe */
476         int si; /* socket */
477         int ai; /* alt bulk path socket */
478
479         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
480                 /*
481                  * These iocom->flags are only manipulated within the
482                  * context of the current thread.  However, modifications
483                  * still require atomic ops.
484                  */
485 #if 0
486                 fprintf(stderr, "iocom %p %08x\n", iocom, iocom->flags);
487 #endif
488                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
489                                      DMSG_IOCOMF_WWORK |
490                                      DMSG_IOCOMF_PWORK |
491                                      DMSG_IOCOMF_SWORK |
492                                      DMSG_IOCOMF_ARWORK |
493                                      DMSG_IOCOMF_AWWORK)) == 0) {
494                         /*
495                          * Only poll if no immediate work is pending.
496                          * Otherwise we are just wasting our time calling
497                          * poll.
498                          */
499                         timeout = 5000;
500
501                         count = 0;
502                         wi = -1;
503                         si = -1;
504                         ai = -1;
505
506                         /*
507                          * Always check the inter-thread pipe, e.g.
508                          * for iocom->txmsgq work.
509                          */
510                         wi = count++;
511                         fds[wi].fd = iocom->wakeupfds[0];
512                         fds[wi].events = POLLIN;
513                         fds[wi].revents = 0;
514
515                         /*
516                          * Check the socket input/output direction as
517                          * requested
518                          */
519                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
520                                             DMSG_IOCOMF_WREQ)) {
521                                 si = count++;
522                                 fds[si].fd = iocom->sock_fd;
523                                 fds[si].events = 0;
524                                 fds[si].revents = 0;
525
526                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
527                                         fds[si].events |= POLLIN;
528                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
529                                         fds[si].events |= POLLOUT;
530                         }
531
532                         /*
533                          * Check the alternative fd for work.
534                          */
535                         if (iocom->alt_fd >= 0) {
536                                 ai = count++;
537                                 fds[ai].fd = iocom->alt_fd;
538                                 fds[ai].events = POLLIN;
539                                 fds[ai].revents = 0;
540                         }
541                         poll(fds, count, timeout);
542
543                         if (wi >= 0 && (fds[wi].revents & POLLIN))
544                                 atomic_set_int(&iocom->flags,
545                                                DMSG_IOCOMF_PWORK);
546                         if (si >= 0 && (fds[si].revents & POLLIN))
547                                 atomic_set_int(&iocom->flags,
548                                                DMSG_IOCOMF_RWORK);
549                         if (si >= 0 && (fds[si].revents & POLLOUT))
550                                 atomic_set_int(&iocom->flags,
551                                                DMSG_IOCOMF_WWORK);
552                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
553                                 atomic_set_int(&iocom->flags,
554                                                DMSG_IOCOMF_WWORK);
555                         if (ai >= 0 && (fds[ai].revents & POLLIN))
556                                 atomic_set_int(&iocom->flags,
557                                                DMSG_IOCOMF_ARWORK);
558                 } else {
559                         /*
560                          * Always check the pipe
561                          */
562                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
563                 }
564
565                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
566                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
567                         iocom->signal_callback(iocom);
568                 }
569
570                 /*
571                  * Pending message queues from other threads wake us up
572                  * with a write to the wakeupfds[] pipe.  We have to clear
573                  * the pipe with a dummy read.
574                  */
575                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
576                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
577                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
578                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
579                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
580                 }
581
582                 /*
583                  * Message write sequencing
584                  */
585                 if (iocom->flags & DMSG_IOCOMF_WWORK)
586                         dmsg_iocom_flush1(iocom);
587
588                 /*
589                  * Message read sequencing.  Run this after the write
590                  * sequencing in case the write sequencing allowed another
591                  * auto-DELETE to occur on the read side.
592                  */
593                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
594                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
595                                (msg = dmsg_ioq_read(iocom)) != NULL) {
596                                 if (DMsgDebugOpt) {
597                                         fprintf(stderr, "receive %s\n",
598                                                 dmsg_msg_str(msg));
599                                 }
600                                 iocom->rcvmsg_callback(msg);
601                                 dmsg_state_cleanuprx(iocom, msg);
602                         }
603                 }
604
605                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
606                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
607                         iocom->altmsg_callback(iocom);
608                 }
609         }
610 }
611
612 /*
613  * Make sure there's enough room in the FIFO to hold the
614  * needed data.
615  *
616  * Assume worst case encrypted form is 2x the size of the
617  * plaintext equivalent.
618  */
619 static
620 size_t
621 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
622 {
623         size_t bytes;
624         size_t nmax;
625
626         bytes = ioq->fifo_cdx - ioq->fifo_beg;
627         nmax = sizeof(ioq->buf) - ioq->fifo_end;
628         if (bytes + nmax / 2 < needed) {
629                 if (bytes) {
630                         bcopy(ioq->buf + ioq->fifo_beg,
631                               ioq->buf,
632                               bytes);
633                 }
634                 ioq->fifo_cdx -= ioq->fifo_beg;
635                 ioq->fifo_beg = 0;
636                 if (ioq->fifo_cdn < ioq->fifo_end) {
637                         bcopy(ioq->buf + ioq->fifo_cdn,
638                               ioq->buf + ioq->fifo_cdx,
639                               ioq->fifo_end - ioq->fifo_cdn);
640                 }
641                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
642                 ioq->fifo_cdn = ioq->fifo_cdx;
643                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
644         }
645         return(nmax);
646 }
647
648 /*
649  * Read the next ready message from the ioq, issuing I/O if needed.
650  * Caller should retry on a read-event when NULL is returned.
651  *
652  * If an error occurs during reception a DMSG_LNK_ERROR msg will
653  * be returned for each open transaction, then the ioq and iocom
654  * will be errored out and a non-transactional DMSG_LNK_ERROR
655  * msg will be returned as the final message.  The caller should not call
656  * us again after the final message is returned.
657  *
658  * Thread localized, iocom->mtx not held.
659  */
660 dmsg_msg_t *
661 dmsg_ioq_read(dmsg_iocom_t *iocom)
662 {
663         dmsg_ioq_t *ioq = &iocom->ioq_rx;
664         dmsg_msg_t *msg;
665         dmsg_hdr_t *head;
666         ssize_t n;
667         size_t bytes;
668         size_t nmax;
669         uint32_t aux_size;
670         uint32_t xcrc32;
671         int error;
672
673 again:
674         /*
675          * If a message is already pending we can just remove and
676          * return it.  Message state has already been processed.
677          * (currently not implemented)
678          */
679         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
680                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
681
682                 if (msg->state == &iocom->state0) {
683                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
684                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
685                 }
686                 return (msg);
687         }
688         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
689
690         /*
691          * If the stream is errored out we stop processing it.
692          */
693         if (ioq->error)
694                 goto skip;
695
696         /*
697          * Message read in-progress (msg is NULL at the moment).  We don't
698          * allocate a msg until we have its core header.
699          */
700         nmax = sizeof(ioq->buf) - ioq->fifo_end;
701         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
702         msg = ioq->msg;
703
704         switch(ioq->state) {
705         case DMSG_MSGQ_STATE_HEADER1:
706                 /*
707                  * Load the primary header, fail on any non-trivial read
708                  * error or on EOF.  Since the primary header is the same
709                  * size is the message alignment it will never straddle
710                  * the end of the buffer.
711                  */
712                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
713                 if (bytes < sizeof(msg->any.head)) {
714                         n = read(iocom->sock_fd,
715                                  ioq->buf + ioq->fifo_end,
716                                  nmax);
717                         if (n <= 0) {
718                                 if (n == 0) {
719                                         ioq->error = DMSG_IOQ_ERROR_EOF;
720                                         break;
721                                 }
722                                 if (errno != EINTR &&
723                                     errno != EINPROGRESS &&
724                                     errno != EAGAIN) {
725                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
726                                         break;
727                                 }
728                                 n = 0;
729                                 /* fall through */
730                         }
731                         ioq->fifo_end += (size_t)n;
732                         nmax -= (size_t)n;
733                 }
734
735                 /*
736                  * Decrypt data received so far.  Data will be decrypted
737                  * in-place but might create gaps in the FIFO.  Partial
738                  * blocks are not immediately decrypted.
739                  *
740                  * WARNING!  The header might be in the wrong endian, we
741                  *           do not fix it up until we get the entire
742                  *           extended header.
743                  */
744                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
745                         dmsg_crypto_decrypt(iocom, ioq);
746                 } else {
747                         ioq->fifo_cdx = ioq->fifo_end;
748                         ioq->fifo_cdn = ioq->fifo_end;
749                 }
750                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
751
752                 /*
753                  * Insufficient data accumulated (msg is NULL, caller will
754                  * retry on event).
755                  */
756                 assert(msg == NULL);
757                 if (bytes < sizeof(msg->any.head))
758                         break;
759
760                 /*
761                  * Check and fixup the core header.  Note that the icrc
762                  * has to be calculated before any fixups, but the crc
763                  * fields in the msg may have to be swapped like everything
764                  * else.
765                  */
766                 head = (void *)(ioq->buf + ioq->fifo_beg);
767                 if (head->magic != DMSG_HDR_MAGIC &&
768                     head->magic != DMSG_HDR_MAGIC_REV) {
769                         fprintf(stderr, "%s: head->magic is bad %02x\n",
770                                 iocom->label, head->magic);
771                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
772                                 fprintf(stderr, "(on encrypted link)\n");
773                         ioq->error = DMSG_IOQ_ERROR_SYNC;
774                         break;
775                 }
776
777                 /*
778                  * Calculate the full header size and aux data size
779                  */
780                 if (head->magic == DMSG_HDR_MAGIC_REV) {
781                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
782                                       DMSG_ALIGN;
783                         aux_size = bswap32(head->aux_bytes);
784                 } else {
785                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
786                                       DMSG_ALIGN;
787                         aux_size = head->aux_bytes;
788                 }
789                 ioq->abytes = DMSG_DOALIGN(aux_size);
790                 ioq->unaligned_aux_size = aux_size;
791                 if (ioq->hbytes < sizeof(msg->any.head) ||
792                     ioq->hbytes > sizeof(msg->any) ||
793                     ioq->abytes > DMSG_AUX_MAX) {
794                         ioq->error = DMSG_IOQ_ERROR_FIELD;
795                         break;
796                 }
797
798                 /*
799                  * Allocate the message, the next state will fill it in.
800                  *
801                  * NOTE: The aux_data buffer will be sized to an aligned
802                  *       value and the aligned remainder zero'd for
803                  *       convenience.
804                  *
805                  * NOTE: Supply dummy state and a degenerate cmd without
806                  *       CREATE set.  The message will temporarily be
807                  *       associated with state0 until later post-processing.
808                  */
809                 msg = dmsg_msg_alloc(&iocom->state0, aux_size,
810                                      ioq->hbytes / DMSG_ALIGN,
811                                      NULL, NULL);
812                 ioq->msg = msg;
813
814                 /*
815                  * Fall through to the next state.  Make sure that the
816                  * extended header does not straddle the end of the buffer.
817                  * We still want to issue larger reads into our buffer,
818                  * book-keeping is easier if we don't bcopy() yet.
819                  *
820                  * Make sure there is enough room for bloated encrypt data.
821                  */
822                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
823                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
824                 /* fall through */
825         case DMSG_MSGQ_STATE_HEADER2:
826                 /*
827                  * Fill out the extended header.
828                  */
829                 assert(msg != NULL);
830                 if (bytes < ioq->hbytes) {
831                         n = read(iocom->sock_fd,
832                                  ioq->buf + ioq->fifo_end,
833                                  nmax);
834                         if (n <= 0) {
835                                 if (n == 0) {
836                                         ioq->error = DMSG_IOQ_ERROR_EOF;
837                                         break;
838                                 }
839                                 if (errno != EINTR &&
840                                     errno != EINPROGRESS &&
841                                     errno != EAGAIN) {
842                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
843                                         break;
844                                 }
845                                 n = 0;
846                                 /* fall through */
847                         }
848                         ioq->fifo_end += (size_t)n;
849                         nmax -= (size_t)n;
850                 }
851
852                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
853                         dmsg_crypto_decrypt(iocom, ioq);
854                 } else {
855                         ioq->fifo_cdx = ioq->fifo_end;
856                         ioq->fifo_cdn = ioq->fifo_end;
857                 }
858                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
859
860                 /*
861                  * Insufficient data accumulated (set msg NULL so caller will
862                  * retry on event).
863                  */
864                 if (bytes < ioq->hbytes) {
865                         msg = NULL;
866                         break;
867                 }
868
869                 /*
870                  * Calculate the extended header, decrypt data received
871                  * so far.  Handle endian-conversion for the entire extended
872                  * header.
873                  */
874                 head = (void *)(ioq->buf + ioq->fifo_beg);
875
876                 /*
877                  * Check the CRC.
878                  */
879                 if (head->magic == DMSG_HDR_MAGIC_REV)
880                         xcrc32 = bswap32(head->hdr_crc);
881                 else
882                         xcrc32 = head->hdr_crc;
883                 head->hdr_crc = 0;
884                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
885                         ioq->error = DMSG_IOQ_ERROR_XCRC;
886                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
887                                 xcrc32, dmsg_icrc32(head, ioq->hbytes),
888                                 dmsg_msg_str(msg));
889                         assert(0);
890                         break;
891                 }
892                 head->hdr_crc = xcrc32;
893
894                 if (head->magic == DMSG_HDR_MAGIC_REV) {
895                         dmsg_bswap_head(head);
896                 }
897
898                 /*
899                  * Copy the extended header into the msg and adjust the
900                  * FIFO.
901                  */
902                 bcopy(head, &msg->any, ioq->hbytes);
903
904                 /*
905                  * We are either done or we fall-through.
906                  */
907                 if (ioq->abytes == 0) {
908                         ioq->fifo_beg += ioq->hbytes;
909                         break;
910                 }
911
912                 /*
913                  * Must adjust bytes (and the state) when falling through.
914                  * nmax doesn't change.
915                  */
916                 ioq->fifo_beg += ioq->hbytes;
917                 bytes -= ioq->hbytes;
918                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
919                 /* fall through */
920         case DMSG_MSGQ_STATE_AUXDATA1:
921                 /*
922                  * Copy the partial or complete [decrypted] payload from
923                  * remaining bytes in the FIFO in order to optimize the
924                  * makeroom call in the AUXDATA2 state.  We have to
925                  * fall-through either way so we can check the crc.
926                  *
927                  * msg->aux_size tracks our aux data.
928                  *
929                  * (Lets not complicate matters if the data is encrypted,
930                  *  since the data in-stream is not the same size as the
931                  *  data decrypted).
932                  */
933                 if (bytes >= ioq->abytes) {
934                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
935                               ioq->abytes);
936                         msg->aux_size = ioq->abytes;
937                         ioq->fifo_beg += ioq->abytes;
938                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
939                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
940                         bytes -= ioq->abytes;
941                 } else if (bytes) {
942                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
943                               bytes);
944                         msg->aux_size = bytes;
945                         ioq->fifo_beg += bytes;
946                         if (ioq->fifo_cdx < ioq->fifo_beg)
947                                 ioq->fifo_cdx = ioq->fifo_beg;
948                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
949                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
950                         bytes = 0;
951                 } else {
952                         msg->aux_size = 0;
953                 }
954                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
955                 /* fall through */
956         case DMSG_MSGQ_STATE_AUXDATA2:
957                 /*
958                  * Make sure there is enough room for more data.
959                  */
960                 assert(msg);
961                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
962
963                 /*
964                  * Read and decrypt more of the payload.
965                  */
966                 if (msg->aux_size < ioq->abytes) {
967                         assert(bytes == 0);
968                         n = read(iocom->sock_fd,
969                                  ioq->buf + ioq->fifo_end,
970                                  nmax);
971                         if (n <= 0) {
972                                 if (n == 0) {
973                                         ioq->error = DMSG_IOQ_ERROR_EOF;
974                                         break;
975                                 }
976                                 if (errno != EINTR &&
977                                     errno != EINPROGRESS &&
978                                     errno != EAGAIN) {
979                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
980                                         break;
981                                 }
982                                 n = 0;
983                                 /* fall through */
984                         }
985                         ioq->fifo_end += (size_t)n;
986                         nmax -= (size_t)n;
987                 }
988
989                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
990                         dmsg_crypto_decrypt(iocom, ioq);
991                 } else {
992                         ioq->fifo_cdx = ioq->fifo_end;
993                         ioq->fifo_cdn = ioq->fifo_end;
994                 }
995                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
996
997                 if (bytes > ioq->abytes - msg->aux_size)
998                         bytes = ioq->abytes - msg->aux_size;
999
1000                 if (bytes) {
1001                         bcopy(ioq->buf + ioq->fifo_beg,
1002                               msg->aux_data + msg->aux_size,
1003                               bytes);
1004                         msg->aux_size += bytes;
1005                         ioq->fifo_beg += bytes;
1006                 }
1007
1008                 /*
1009                  * Insufficient data accumulated (set msg NULL so caller will
1010                  * retry on event).
1011                  *
1012                  * Assert the auxillary data size is correct, then record the
1013                  * original unaligned size from the message header.
1014                  */
1015                 if (msg->aux_size < ioq->abytes) {
1016                         msg = NULL;
1017                         break;
1018                 }
1019                 assert(msg->aux_size == ioq->abytes);
1020                 msg->aux_size = ioq->unaligned_aux_size;
1021
1022                 /*
1023                  * Check aux_crc, then we are done.  Note that the crc
1024                  * is calculated over the aligned size, not the actual
1025                  * size.
1026                  */
1027                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
1028                 if (xcrc32 != msg->any.head.aux_crc) {
1029                         ioq->error = DMSG_IOQ_ERROR_ACRC;
1030                         fprintf(stderr,
1031                                 "iocom: ACRC error %08x vs %08x "
1032                                 "msgid %016jx msgcmd %08x auxsize %d\n",
1033                                 xcrc32,
1034                                 msg->any.head.aux_crc,
1035                                 (intmax_t)msg->any.head.msgid,
1036                                 msg->any.head.cmd,
1037                                 msg->any.head.aux_bytes);
1038                         break;
1039                 }
1040                 break;
1041         case DMSG_MSGQ_STATE_ERROR:
1042                 /*
1043                  * Continued calls to drain recorded transactions (returning
1044                  * a LNK_ERROR for each one), before we return the final
1045                  * LNK_ERROR.
1046                  */
1047                 assert(msg == NULL);
1048                 break;
1049         default:
1050                 /*
1051                  * We don't double-return errors, the caller should not
1052                  * have called us again after getting an error msg.
1053                  */
1054                 assert(0);
1055                 break;
1056         }
1057
1058         /*
1059          * Check the message sequence.  The iv[] should prevent any
1060          * possibility of a replay but we add this check anyway.
1061          */
1062         if (msg && ioq->error == 0) {
1063                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1064                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1065                 } else {
1066                         ++ioq->seq;
1067                 }
1068         }
1069
1070         /*
1071          * Handle error, RREQ, or completion
1072          *
1073          * NOTE: nmax and bytes are invalid at this point, we don't bother
1074          *       to update them when breaking out.
1075          */
1076         if (ioq->error) {
1077 skip:
1078                 /*
1079                  * An unrecoverable error causes all active receive
1080                  * transactions to be terminated with a LNK_ERROR message.
1081                  *
1082                  * Once all active transactions are exhausted we set the
1083                  * iocom ERROR flag and return a non-transactional LNK_ERROR
1084                  * message, which should cause master processing loops to
1085                  * terminate.
1086                  */
1087                 fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1088                 assert(ioq->msg == msg);
1089                 if (msg) {
1090                         dmsg_msg_free(msg);
1091                         ioq->msg = NULL;
1092                         msg = NULL;
1093                 }
1094
1095                 /*
1096                  * No more I/O read processing
1097                  */
1098                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1099
1100                 /*
1101                  * Simulate a remote LNK_ERROR DELETE msg for any open
1102                  * transactions, ending with a final non-transactional
1103                  * LNK_ERROR (that the session can detect) when no
1104                  * transactions remain.
1105                  *
1106                  * NOTE: Temporarily supply state0 and a degenerate cmd
1107                  *       without CREATE set.  The real state will be
1108                  *       assigned in the loop.
1109                  *
1110                  * NOTE: We are simulating a received message using our
1111                  *       side of the state, so the DMSGF_REV* bits have
1112                  *       to be reversed.
1113                  */
1114                 pthread_mutex_lock(&iocom->mtx);
1115                 dmsg_iocom_drain(iocom);
1116                 dmsg_simulate_failure(&iocom->state0, ioq->error);
1117                 pthread_mutex_unlock(&iocom->mtx);
1118                 if (TAILQ_FIRST(&ioq->msgq))
1119                         goto again;
1120
1121 #if 0
1122                 /*
1123                  * For the iocom error case we want to set RWORK to indicate
1124                  * that more messages might be pending.
1125                  *
1126                  * It is possible to return NULL when there is more work to
1127                  * do because each message has to be DELETEd in both
1128                  * directions before we continue on with the next (though
1129                  * this could be optimized).  The transmit direction will
1130                  * re-set RWORK.
1131                  */
1132                 if (msg)
1133                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1134 #endif
1135         } else if (msg == NULL) {
1136                 /*
1137                  * Insufficient data received to finish building the message,
1138                  * set RREQ and return NULL.
1139                  *
1140                  * Leave ioq->msg intact.
1141                  * Leave the FIFO intact.
1142                  */
1143                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1144         } else {
1145                 /*
1146                  * Continue processing msg.
1147                  *
1148                  * The fifo has already been advanced past the message.
1149                  * Trivially reset the FIFO indices if possible.
1150                  *
1151                  * clear the FIFO if it is now empty and set RREQ to wait
1152                  * for more from the socket.  If the FIFO is not empty set
1153                  * TWORK to bypass the poll so we loop immediately.
1154                  */
1155                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1156                     ioq->fifo_cdn == ioq->fifo_end) {
1157                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1158                         ioq->fifo_cdx = 0;
1159                         ioq->fifo_cdn = 0;
1160                         ioq->fifo_beg = 0;
1161                         ioq->fifo_end = 0;
1162                 } else {
1163                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1164                 }
1165                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1166                 ioq->msg = NULL;
1167
1168                 /*
1169                  * Handle message routing.  Validates non-zero sources
1170                  * and routes message.  Error will be 0 if the message is
1171                  * destined for us.
1172                  *
1173                  * State processing only occurs for messages destined for us.
1174                  */
1175                 if (DMsgDebugOpt >= 5) {
1176                         fprintf(stderr,
1177                                 "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1178                                 msg->any.head.cmd,
1179                                 (intmax_t)msg->any.head.msgid,
1180                                 (intmax_t)msg->any.head.circuit);
1181                 }
1182
1183                 error = dmsg_state_msgrx(msg);
1184
1185                 if (error) {
1186                         /*
1187                          * Abort-after-closure, throw message away and
1188                          * start reading another.
1189                          */
1190                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1191                                 dmsg_msg_free(msg);
1192                                 goto again;
1193                         }
1194
1195                         /*
1196                          * Process real error and throw away message.
1197                          */
1198                         ioq->error = error;
1199                         goto skip;
1200                 }
1201
1202                 /*
1203                  * No error and not routed
1204                  */
1205                 /* no error, not routed.  Fall through and return msg */
1206         }
1207         return (msg);
1208 }
1209
1210 /*
1211  * Calculate the header and data crc's and write a low-level message to
1212  * the connection.  If aux_crc is non-zero the aux_data crc is already
1213  * assumed to have been set.
1214  *
1215  * A non-NULL msg is added to the queue but not necessarily flushed.
1216  * Calling this function with msg == NULL will get a flush going.
1217  *
1218  * (called from iocom_core only)
1219  */
1220 void
1221 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1222 {
1223         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1224         dmsg_msg_t *msg;
1225         uint32_t xcrc32;
1226         size_t hbytes;
1227         size_t abytes;
1228         dmsg_msg_queue_t tmpq;
1229
1230         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1231         TAILQ_INIT(&tmpq);
1232         pthread_mutex_lock(&iocom->mtx);
1233         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1234                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1235                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1236         }
1237         pthread_mutex_unlock(&iocom->mtx);
1238
1239         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1240                 /*
1241                  * Process terminal connection errors.
1242                  */
1243                 TAILQ_REMOVE(&tmpq, msg, qentry);
1244                 if (ioq->error) {
1245                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1246                         ++ioq->msgcount;
1247                         continue;
1248                 }
1249
1250                 /*
1251                  * Finish populating the msg fields.  The salt ensures that
1252                  * the iv[] array is ridiculously randomized and we also
1253                  * re-seed our PRNG every 32768 messages just to be sure.
1254                  */
1255                 msg->any.head.magic = DMSG_HDR_MAGIC;
1256                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1257                 ++ioq->seq;
1258                 if ((ioq->seq & 32767) == 0)
1259                         srandomdev();
1260
1261                 /*
1262                  * Calculate aux_crc if 0, then calculate hdr_crc.
1263                  */
1264                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1265                         abytes = DMSG_DOALIGN(msg->aux_size);
1266                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1267                         msg->any.head.aux_crc = xcrc32;
1268                 }
1269                 msg->any.head.aux_bytes = msg->aux_size;
1270
1271                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1272                          DMSG_ALIGN;
1273                 msg->any.head.hdr_crc = 0;
1274                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1275
1276                 /*
1277                  * Enqueue the message (the flush codes handles stream
1278                  * encryption).
1279                  */
1280                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1281                 ++ioq->msgcount;
1282         }
1283         dmsg_iocom_flush2(iocom);
1284 }
1285
1286 /*
1287  * Thread localized, iocom->mtx not held by caller.
1288  *
1289  * (called from iocom_core via iocom_flush1 only)
1290  */
1291 void
1292 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1293 {
1294         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1295         dmsg_msg_t *msg;
1296         ssize_t n;
1297         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1298         size_t nact;
1299         size_t hbytes;
1300         size_t abytes;
1301         size_t hoff;
1302         size_t aoff;
1303         int iovcnt;
1304         int save_errno;
1305
1306         if (ioq->error) {
1307                 dmsg_iocom_drain(iocom);
1308                 return;
1309         }
1310
1311         /*
1312          * Pump messages out the connection by building an iovec.
1313          *
1314          * ioq->hbytes/ioq->abytes tracks how much of the first message
1315          * in the queue has been successfully written out, so we can
1316          * resume writing.
1317          */
1318         iovcnt = 0;
1319         nact = 0;
1320         hoff = ioq->hbytes;
1321         aoff = ioq->abytes;
1322
1323         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1324                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1325                          DMSG_ALIGN;
1326                 abytes = DMSG_DOALIGN(msg->aux_size);
1327                 assert(hoff <= hbytes && aoff <= abytes);
1328
1329                 if (hoff < hbytes) {
1330                         size_t maxlen = hbytes - hoff;
1331                         if (maxlen > sizeof(ioq->buf) / 2)
1332                                 maxlen = sizeof(ioq->buf) / 2;
1333                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1334                         iov[iovcnt].iov_len = maxlen;
1335                         nact += maxlen;
1336                         ++iovcnt;
1337                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1338                             maxlen != hbytes - hoff) {
1339                                 break;
1340                         }
1341                 }
1342                 if (aoff < abytes) {
1343                         size_t maxlen = abytes - aoff;
1344                         if (maxlen > sizeof(ioq->buf) / 2)
1345                                 maxlen = sizeof(ioq->buf) / 2;
1346
1347                         assert(msg->aux_data != NULL);
1348                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1349                         iov[iovcnt].iov_len = maxlen;
1350                         nact += maxlen;
1351                         ++iovcnt;
1352                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1353                             maxlen != abytes - aoff) {
1354                                 break;
1355                         }
1356                 }
1357                 hoff = 0;
1358                 aoff = 0;
1359         }
1360
1361         /*
1362          * Shortcut if no work to do.  Be sure to check for old work still
1363          * pending in the FIFO.
1364          */
1365         if (iovcnt == 0 && ioq->fifo_beg == ioq->fifo_cdx)
1366                 return;
1367
1368         /*
1369          * Encrypt and write the data.  The crypto code will move the
1370          * data into the fifo and adjust the iov as necessary.  If
1371          * encryption is disabled the iov is left alone.
1372          *
1373          * May return a smaller iov (thus a smaller n), with aggregated
1374          * chunks.  May reduce nmax to what fits in the FIFO.
1375          *
1376          * This function sets nact to the number of original bytes now
1377          * encrypted, adding to the FIFO some number of bytes that might
1378          * be greater depending on the crypto mechanic.  iov[] is adjusted
1379          * to point at the FIFO if necessary.
1380          *
1381          * NOTE: nact is the number of bytes eaten from the message.  For
1382          *       encrypted data this is the number of bytes processed for
1383          *       encryption and not necessarily the number of bytes writable.
1384          *       The return value from the writev() is the post-encrypted
1385          *       byte count which might be larger.
1386          *
1387          * NOTE: For direct writes, nact is the return value from the writev().
1388          */
1389         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1390                 /*
1391                  * Make sure the FIFO has a reasonable amount of space
1392                  * left (if not completely full).
1393                  *
1394                  * In this situation we are staging the encrypted message
1395                  * data in the FIFO.  (nact) represents how much plaintext
1396                  * has been staged, (n) represents how much encrypted data
1397                  * has been flushed.  The two are independent of each other.
1398                  */
1399                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1400                     sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1401                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1402                               ioq->fifo_end - ioq->fifo_beg);
1403                         ioq->fifo_cdx -= ioq->fifo_beg;
1404                         ioq->fifo_cdn -= ioq->fifo_beg;
1405                         ioq->fifo_end -= ioq->fifo_beg;
1406                         ioq->fifo_beg = 0;
1407                 }
1408
1409                 /* 
1410                  * beg .... cdx ............ cdn ............. end
1411                  * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
1412                  *
1413                  * Advance fifo_beg on a successful write.
1414                  */
1415                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1416                 n = writev(iocom->sock_fd, iov, iovcnt);
1417                 save_errno = errno;
1418                 if (n > 0) {
1419                         ioq->fifo_beg += n;
1420                         if (ioq->fifo_beg == ioq->fifo_end) {
1421                                 ioq->fifo_beg = 0;
1422                                 ioq->fifo_cdn = 0;
1423                                 ioq->fifo_cdx = 0;
1424                                 ioq->fifo_end = 0;
1425                         }
1426                 }
1427
1428                 /*
1429                  * We don't mess with the nact returned by the crypto_encrypt
1430                  * call, which represents the filling of the FIFO.  (n) tells
1431                  * us how much we were able to write from the FIFO.  The two
1432                  * are different beasts when encrypting.
1433                  */
1434         } else {
1435                 /*
1436                  * In this situation we are not staging the messages to the
1437                  * FIFO but instead writing them directly from the msg
1438                  * structure(s) unencrypted, so (nact) is basically (n).
1439                  */
1440                 n = writev(iocom->sock_fd, iov, iovcnt);
1441                 save_errno = errno;
1442                 if (n > 0)
1443                         nact = n;
1444                 else
1445                         nact = 0;
1446         }
1447
1448         /*
1449          * Clean out the transmit queue based on what we successfully
1450          * encrypted (nact is the plaintext count) and is now in the FIFO.
1451          * ioq->hbytes/abytes represents the portion of the first message
1452          * previously sent.
1453          */
1454         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1455                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1456                          DMSG_ALIGN;
1457                 abytes = DMSG_DOALIGN(msg->aux_size);
1458
1459                 if ((size_t)nact < hbytes - ioq->hbytes) {
1460                         ioq->hbytes += nact;
1461                         nact = 0;
1462                         break;
1463                 }
1464                 nact -= hbytes - ioq->hbytes;
1465                 ioq->hbytes = hbytes;
1466                 if ((size_t)nact < abytes - ioq->abytes) {
1467                         ioq->abytes += nact;
1468                         nact = 0;
1469                         break;
1470                 }
1471                 nact -= abytes - ioq->abytes;
1472                 /* ioq->abytes = abytes; optimized out */
1473
1474 #if 0
1475                 fprintf(stderr,
1476                         "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1477                         msg->any.head.cmd,
1478                         (intmax_t)msg->any.head.msgid,
1479                         (intmax_t)msg->any.head.circuit);
1480 #endif
1481
1482 #ifdef DMSG_BLOCK_DEBUG
1483                 uint32_t tcmd;
1484
1485                 if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
1486                         if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
1487                                 tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
1488                                             (msg->any.head.cmd & (DMSGF_CREATE |
1489                                                                   DMSGF_DELETE |
1490                                                                   DMSGF_REPLY));
1491                         } else {
1492                                 tcmd = 0;
1493                         }
1494                 } else {
1495                         tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
1496                 }
1497
1498                 switch (tcmd) {
1499                 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
1500                 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
1501                         fprintf(stderr, "write BIO %-3d %016jx %d@%016jx\n",
1502                                 biocount, msg->any.head.msgid,
1503                                 msg->any.blk_read.bytes,
1504                                 msg->any.blk_read.offset);
1505                         break;
1506                 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1507                 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1508                         fprintf(stderr, "wretr BIO %-3d %016jx %d@%016jx\n",
1509                                 biocount, msg->any.head.msgid,
1510                                 msg->any.blk_read.bytes,
1511                                 msg->any.blk_read.offset);
1512                         break;
1513                 default:
1514                         break;
1515                 }
1516 #endif
1517
1518                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1519                 --ioq->msgcount;
1520                 ioq->hbytes = 0;
1521                 ioq->abytes = 0;
1522                 dmsg_msg_free(msg);
1523         }
1524         assert(nact == 0);
1525
1526         /*
1527          * Process the return value from the write w/regards to blocking.
1528          */
1529         if (n < 0) {
1530                 if (save_errno != EINTR &&
1531                     save_errno != EINPROGRESS &&
1532                     save_errno != EAGAIN) {
1533                         /*
1534                          * Fatal write error
1535                          */
1536                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1537                         dmsg_iocom_drain(iocom);
1538                 } else {
1539                         /*
1540                          * Wait for socket buffer space, do not try to
1541                          * process more packets for transmit until space
1542                          * is available.
1543                          */
1544                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1545                 }
1546         } else if (TAILQ_FIRST(&ioq->msgq) ||
1547                    TAILQ_FIRST(&iocom->txmsgq) ||
1548                    ioq->fifo_beg != ioq->fifo_cdx) {
1549                 /*
1550                  * If the write succeeded and more messages are pending
1551                  * in either msgq, or the FIFO WWORK must remain set.
1552                  */
1553                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
1554         }
1555         /* else no transmit-side work remains */
1556
1557         if (ioq->error) {
1558                 dmsg_iocom_drain(iocom);
1559         }
1560 }
1561
1562 /*
1563  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1564  * write events will occur.  We don't kill read msgs because we want
1565  * the caller to pull off our contrived terminal error msg to detect
1566  * the connection failure.
1567  *
1568  * Localized to iocom_core thread, iocom->mtx not held by caller.
1569  */
1570 void
1571 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1572 {
1573         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1574         dmsg_msg_t *msg;
1575
1576         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1577         ioq->hbytes = 0;
1578         ioq->abytes = 0;
1579
1580         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1581                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1582                 --ioq->msgcount;
1583                 dmsg_msg_free(msg);
1584         }
1585 }
1586
1587 /*
1588  * Write a message to an iocom, with additional state processing.
1589  */
1590 void
1591 dmsg_msg_write(dmsg_msg_t *msg)
1592 {
1593         dmsg_iocom_t *iocom = msg->state->iocom;
1594         dmsg_state_t *state;
1595         char dummy;
1596
1597         pthread_mutex_lock(&iocom->mtx);
1598         state = msg->state;
1599
1600 #if 0
1601         /*
1602          * Make sure the parent transaction is still open in the transmit
1603          * direction.  If it isn't the message is dead and we have to
1604          * potentially simulate a rxmsg terminating the transaction.
1605          */
1606         if ((state->parent->txcmd & DMSGF_DELETE) ||
1607             (state->parent->rxcmd & DMSGF_DELETE)) {
1608                 fprintf(stderr, "dmsg_msg_write: EARLY TERMINATION\n");
1609                 dmsg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1610                 dmsg_state_cleanuptx(iocom, msg);
1611                 dmsg_msg_free(msg);
1612                 pthread_mutex_unlock(&iocom->mtx);
1613                 return;
1614         }
1615 #endif
1616
1617         /*
1618          * Process state data into the message as needed, then update the
1619          * state based on the message.
1620          */
1621         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1622                 /*
1623                  * Existing transaction (could be reply).  It is also
1624                  * possible for this to be the first reply (CREATE is set),
1625                  * in which case we populate state->txcmd.
1626                  *
1627                  * state->txcmd is adjusted to hold the final message cmd,
1628                  * and we also be sure to set the CREATE bit here.  We did
1629                  * not set it in dmsg_msg_alloc() because that would have
1630                  * not been serialized (state could have gotten ripped out
1631                  * from under the message prior to it being transmitted).
1632                  */
1633                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1634                     DMSGF_CREATE) {
1635                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1636                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1637                 }
1638                 msg->any.head.msgid = state->msgid;
1639
1640                 if (msg->any.head.cmd & DMSGF_CREATE) {
1641                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1642                 }
1643         }
1644         dmsg_state_cleanuptx(iocom, msg);
1645
1646 #if 0
1647         fprintf(stderr,
1648                 "MSGWRITE %016jx %08x\n",
1649                 msg->any.head.msgid, msg->any.head.cmd);
1650 #endif
1651
1652         /*
1653          * Queue it for output, wake up the I/O pthread.  Note that the
1654          * I/O thread is responsible for generating the CRCs and encryption.
1655          */
1656         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1657         dummy = 0;
1658         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1659         pthread_mutex_unlock(&iocom->mtx);
1660 }
1661
1662 /*
1663  * Simulate reception of a transaction DELETE message when the link goes
1664  * bad.  This routine must recurse through state->subq and generate messages
1665  * and callbacks bottom-up.
1666  *
1667  * iocom->mtx must be held by caller.
1668  */
1669 static
1670 void
1671 dmsg_simulate_failure(dmsg_state_t *state, int error)
1672 {
1673         dmsg_state_t *substate;
1674         dmsg_iocom_t *iocom;
1675         dmsg_msg_t *msg;
1676
1677         while ((substate = TAILQ_FIRST(&state->subq)) != NULL) {
1678                 dmsg_simulate_failure(substate, error);
1679         }
1680
1681         iocom = state->iocom;
1682         if (state == &iocom->state0) {
1683                 /*
1684                  * No active local or remote transactions remain.
1685                  * Generate a final LNK_ERROR.  EOF will be flagged
1686                  * when the message is returned by dmsg_ioq_read().
1687                  */
1688                 msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1689                                             DMSG_LNK_ERROR,
1690                                             NULL, NULL);
1691                 msg->any.head.error = error;
1692         } else if (state->flags & DMSG_STATE_OPPOSITE) {
1693                 /*
1694                  * Active remote transactions are still present.
1695                  * Simulate the other end sending us a DELETE.
1696                  */
1697                 if (state->flags & DMSG_STATE_SUBINSERTED) {
1698                         TAILQ_REMOVE(&state->parent->subq, state, entry);
1699                         state->flags &= ~DMSG_STATE_SUBINSERTED;
1700                 }
1701                 if ((state->rxcmd & DMSGF_DELETE) == 0) {
1702                         fprintf(stderr, "SIMULATE ERROR1\n");
1703                         msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1704                                              DMSG_LNK_ERROR,
1705                                              NULL, NULL);
1706                         /*state->txcmd |= DMSGF_DELETE;*/
1707                         msg->state = state;
1708                         msg->any.head.error = error;
1709                         msg->any.head.msgid = state->msgid;
1710                         msg->any.head.circuit = state->parent->msgid;
1711                         msg->any.head.cmd |= DMSGF_ABORT |
1712                                              DMSGF_DELETE;
1713                         if ((state->parent->flags &
1714                              DMSG_STATE_OPPOSITE) == 0) {
1715                                 msg->any.head.cmd |= DMSGF_REVCIRC;
1716                         }
1717                 } else {
1718                         msg = NULL;
1719                 }
1720         } else {
1721                 /*
1722                  * Active local transactions are still present.
1723                  * Simulate the other end sending us a DELETE.
1724                  */
1725                 if (state->flags & DMSG_STATE_SUBINSERTED) {
1726                         TAILQ_REMOVE(&state->parent->subq, state, entry);
1727                         state->flags &= ~DMSG_STATE_SUBINSERTED;
1728                 }
1729                 if ((state->rxcmd & DMSGF_DELETE) == 0) {
1730                         fprintf(stderr, "SIMULATE ERROR1\n");
1731                         msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1732                                              DMSG_LNK_ERROR,
1733                                              NULL, NULL);
1734                         msg->state = state;
1735                         msg->any.head.error = error;
1736                         msg->any.head.msgid = state->msgid;
1737                         msg->any.head.circuit = state->parent->msgid;
1738                         msg->any.head.cmd |= DMSGF_ABORT |
1739                                              DMSGF_DELETE |
1740                                              DMSGF_REVTRANS |
1741                                              DMSGF_REPLY;
1742                         if ((state->parent->flags &
1743                              DMSG_STATE_OPPOSITE) == 0) {
1744                                 msg->any.head.cmd |= DMSGF_REVCIRC;
1745                         }
1746                         if ((state->rxcmd & DMSGF_CREATE) == 0)
1747                                 msg->any.head.cmd |= DMSGF_CREATE;
1748                 } else {
1749                         msg = NULL;
1750                 }
1751         }
1752         if (msg) {
1753                 TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1754                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1755         }
1756 }
1757
1758 /*
1759  * This is a shortcut to formulate a reply to msg with a simple error code,
1760  * It can reply to and terminate a transaction, or it can reply to a one-way
1761  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1762  * the error code (which can be 0).  Not all transactions are terminated
1763  * with DMSG_LNK_ERROR status (the low level only cares about the
1764  * MSGF_DELETE flag), but most are.
1765  *
1766  * Replies to one-way messages are a bit of an oxymoron but the feature
1767  * is used by the debug (DBG) protocol.
1768  *
1769  * The reply contains no extended data.
1770  */
1771 void
1772 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1773 {
1774         dmsg_state_t *state = msg->state;
1775         dmsg_msg_t *nmsg;
1776         uint32_t cmd;
1777
1778         /*
1779          * Reply with a simple error code and terminate the transaction.
1780          */
1781         cmd = DMSG_LNK_ERROR;
1782
1783         /*
1784          * Check if our direction has even been initiated yet, set CREATE.
1785          *
1786          * Check what direction this is (command or reply direction).  Note
1787          * that txcmd might not have been initiated yet.
1788          *
1789          * If our direction has already been closed we just return without
1790          * doing anything.
1791          */
1792         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1793                 if (state->txcmd & DMSGF_DELETE)
1794                         return;
1795                 if (state->txcmd & DMSGF_REPLY)
1796                         cmd |= DMSGF_REPLY;
1797                 cmd |= DMSGF_DELETE;
1798         } else {
1799                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1800                         cmd |= DMSGF_REPLY;
1801         }
1802
1803         /*
1804          * Allocate the message and associate it with the existing state.
1805          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1806          * allocate new state.  We have our state already.
1807          */
1808         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1809         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1810                 if ((state->txcmd & DMSGF_CREATE) == 0)
1811                         nmsg->any.head.cmd |= DMSGF_CREATE;
1812         }
1813         nmsg->any.head.error = error;
1814
1815         dmsg_msg_write(nmsg);
1816 }
1817
1818 /*
1819  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1820  * we are generating a streaming reply or an intermediate acknowledgement
1821  * of some sort as part of the higher level protocol, with more to come
1822  * later.
1823  */
1824 void
1825 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1826 {
1827         dmsg_state_t *state = msg->state;
1828         dmsg_msg_t *nmsg;
1829         uint32_t cmd;
1830
1831
1832         /*
1833          * Reply with a simple error code and terminate the transaction.
1834          */
1835         cmd = DMSG_LNK_ERROR;
1836
1837         /*
1838          * Check if our direction has even been initiated yet, set CREATE.
1839          *
1840          * Check what direction this is (command or reply direction).  Note
1841          * that txcmd might not have been initiated yet.
1842          *
1843          * If our direction has already been closed we just return without
1844          * doing anything.
1845          */
1846         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1847                 if (state->txcmd & DMSGF_DELETE)
1848                         return;
1849                 if (state->txcmd & DMSGF_REPLY)
1850                         cmd |= DMSGF_REPLY;
1851                 /* continuing transaction, do not set MSGF_DELETE */
1852         } else {
1853                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1854                         cmd |= DMSGF_REPLY;
1855         }
1856         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1857         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1858                 if ((state->txcmd & DMSGF_CREATE) == 0)
1859                         nmsg->any.head.cmd |= DMSGF_CREATE;
1860         }
1861         nmsg->any.head.error = error;
1862
1863         dmsg_msg_write(nmsg);
1864 }
1865
1866 /*
1867  * Terminate a transaction given a state structure by issuing a DELETE.
1868  * (the state structure must not be &iocom->state0)
1869  */
1870 void
1871 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1872 {
1873         dmsg_msg_t *nmsg;
1874         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1875
1876         /*
1877          * Nothing to do if we already transmitted a delete
1878          */
1879         if (state->txcmd & DMSGF_DELETE)
1880                 return;
1881
1882         /*
1883          * Set REPLY if the other end initiated the command.  Otherwise
1884          * we are the command direction.
1885          */
1886         if (state->txcmd & DMSGF_REPLY)
1887                 cmd |= DMSGF_REPLY;
1888
1889         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1890         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1891                 if ((state->txcmd & DMSGF_CREATE) == 0)
1892                         nmsg->any.head.cmd |= DMSGF_CREATE;
1893         }
1894         nmsg->any.head.error = error;
1895         dmsg_msg_write(nmsg);
1896 }
1897
1898 /*
1899  * Terminate a transaction given a state structure by issuing a DELETE.
1900  * (the state structure must not be &iocom->state0)
1901  */
1902 void
1903 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1904 {
1905         dmsg_msg_t *nmsg;
1906         uint32_t cmd = DMSG_LNK_ERROR;
1907
1908         /*
1909          * Nothing to do if we already transmitted a delete
1910          */
1911         if (state->txcmd & DMSGF_DELETE)
1912                 return;
1913
1914         /*
1915          * Set REPLY if the other end initiated the command.  Otherwise
1916          * we are the command direction.
1917          */
1918         if (state->txcmd & DMSGF_REPLY)
1919                 cmd |= DMSGF_REPLY;
1920
1921         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1922         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1923                 if ((state->txcmd & DMSGF_CREATE) == 0)
1924                         nmsg->any.head.cmd |= DMSGF_CREATE;
1925         }
1926         nmsg->any.head.error = error;
1927         dmsg_msg_write(nmsg);
1928 }
1929
1930 /************************************************************************
1931  *                      TRANSACTION STATE HANDLING                      *
1932  ************************************************************************
1933  *
1934  */
1935
1936 /*
1937  * Process state tracking for a message after reception, prior to execution.
1938  * Possibly route the message (consuming it).
1939  *
1940  * Called with msglk held and the msg dequeued.
1941  *
1942  * All messages are called with dummy state and return actual state.
1943  * (One-off messages often just return the same dummy state).
1944  *
1945  * May request that caller discard the message by setting *discardp to 1.
1946  * The returned state is not used in this case and is allowed to be NULL.
1947  *
1948  * --
1949  *
1950  * These routines handle persistent and command/reply message state via the
1951  * CREATE and DELETE flags.  The first message in a command or reply sequence
1952  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1953  *
1954  * There can be any number of intermediate messages belonging to the same
1955  * sequence sent in between the CREATE message and the DELETE message,
1956  * which set neither flag.  This represents a streaming command or reply.
1957  *
1958  * Any command message received with CREATE set expects a reply sequence to
1959  * be returned.  Reply sequences work the same as command sequences except the
1960  * REPLY bit is also set.  Both the command side and reply side can
1961  * degenerate into a single message with both CREATE and DELETE set.  Note
1962  * that one side can be streaming and the other side not, or neither, or both.
1963  *
1964  * The msgid is unique for the initiator.  That is, two sides sending a new
1965  * message can use the same msgid without colliding.
1966  *
1967  * --
1968  *
1969  * The message may be running over a circuit.  If the circuit is half-deleted,
1970  * the message is typically racing against a link failure and must be thrown
1971  * out.  As the circuit deletion propagates the library will automatically
1972  * generate terminations for sub states.
1973  *
1974  * --
1975  *
1976  * ABORT sequences work by setting the ABORT flag along with normal message
1977  * state.  However, ABORTs can also be sent on half-closed messages, that is
1978  * even if the command or reply side has already sent a DELETE, as long as
1979  * the message has not been fully closed it can still send an ABORT+DELETE
1980  * to terminate the half-closed message state.
1981  *
1982  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1983  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1984  * also race, and in this situation the other side might have already
1985  * initiated a new unrelated command with the same message id.  Since
1986  * the abort has not set the CREATE flag the situation can be detected
1987  * and the message will also be discarded.
1988  *
1989  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1990  * The ABORT request is essentially integrated into the command instead
1991  * of being sent later on.  In this situation the command implementation
1992  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1993  * special-case non-blocking operation for the command.
1994  *
1995  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1996  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1997  *        one-way messages are not supported.
1998  *
1999  * NOTE!  If a command sequence does not support aborts the ABORT flag is
2000  *        simply ignored.
2001  *
2002  * --
2003  *
2004  * One-off messages (no reply expected) are sent without an established
2005  * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
2006  * For one-off messages sent over circuits msgid generally MUST be 0.
2007  *
2008  * One-off messages cannot be aborted and typically aren't processed
2009  * by these routines.  Order is still guaranteed for messages sent over
2010  * the same circuit.  The REPLY bit can be used to distinguish whether
2011  * a one-off message is a command or reply.  For example, one-off replies
2012  * will typically just contain status updates.
2013  */
2014 static int
2015 dmsg_state_msgrx(dmsg_msg_t *msg)
2016 {
2017         dmsg_iocom_t *iocom = msg->state->iocom;
2018         dmsg_state_t *state;
2019         dmsg_state_t *pstate;
2020         dmsg_state_t sdummy;
2021         int error;
2022
2023         pthread_mutex_lock(&iocom->mtx);
2024
2025         /*
2026          * Lookup the circuit (pstate).  The circuit will be an open
2027          * transaction.  The REVCIRC bit in the message tells us which side
2028          * initiated it.
2029          */
2030         if (msg->any.head.circuit) {
2031                 sdummy.msgid = msg->any.head.circuit;
2032
2033                 if (msg->any.head.cmd & DMSGF_REVCIRC) {
2034                         pstate = RB_FIND(dmsg_state_tree,
2035                                          &iocom->statewr_tree,
2036                                          &sdummy);
2037                 } else {
2038                         pstate = RB_FIND(dmsg_state_tree,
2039                                          &iocom->staterd_tree,
2040                                          &sdummy);
2041                 }
2042                 if (pstate == NULL) {
2043                         fprintf(stderr,
2044                                 "missing parent in stacked trans %s\n",
2045                                 dmsg_msg_str(msg));
2046                         error = DMSG_IOQ_ERROR_TRANS;
2047                         pthread_mutex_unlock(&iocom->mtx);
2048                         assert(0);
2049                 }
2050         } else {
2051                 pstate = &iocom->state0;
2052         }
2053
2054         /*
2055          * Lookup the msgid.
2056          *
2057          * If received msg is a command state is on staterd_tree.
2058          * If received msg is a reply state is on statewr_tree.
2059          * Otherwise there is no state (retain &iocom->state0)
2060          */
2061         sdummy.msgid = msg->any.head.msgid;
2062         if (msg->any.head.cmd & DMSGF_REVTRANS)
2063                 state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
2064         else
2065                 state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);
2066
2067         if (state) {
2068                 /*
2069                  * Message over an existing transaction (CREATE should not
2070                  * be set).
2071                  */
2072                 msg->state = state;
2073                 assert(pstate == state->parent);
2074         } else {
2075                 /*
2076                  * Either a new transaction (if CREATE set) or a one-off.
2077                  */
2078                 state = pstate;
2079         }
2080
2081         pthread_mutex_unlock(&iocom->mtx);
2082
2083         /*
2084          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
2085          * inside the case statements.
2086          *
2087          * Construct new state as necessary.
2088          */
2089         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
2090                                     DMSGF_REPLY)) {
2091         case DMSGF_CREATE:
2092         case DMSGF_CREATE | DMSGF_DELETE:
2093                 /*
2094                  * Create new sub-transaction under pstate.
2095                  * (any DELETE is handled in post-processing of msg).
2096                  *
2097                  * (During routing the msgid was made unique for this
2098                  * direction over the comlink, so our RB trees can be
2099                  * iocom-based instead of state-based).
2100                  */
2101                 if (state != pstate) {
2102                         fprintf(stderr,
2103                                 "duplicate transaction %s\n",
2104                                 dmsg_msg_str(msg));
2105                         error = DMSG_IOQ_ERROR_TRANS;
2106                         assert(0);
2107                         break;
2108                 }
2109
2110                 /*
2111                  * Allocate the new state.
2112                  */
2113                 state = malloc(sizeof(*state));
2114                 atomic_add_int(&dmsg_state_count, 1);
2115                 bzero(state, sizeof(*state));
2116                 TAILQ_INIT(&state->subq);
2117                 dmsg_state_hold(pstate);
2118                 state->refs = 1;
2119                 state->parent = pstate;
2120                 state->iocom = iocom;
2121                 state->flags = DMSG_STATE_DYNAMIC |
2122                                DMSG_STATE_OPPOSITE;
2123                 state->msgid = msg->any.head.msgid;
2124                 state->txcmd = DMSGF_REPLY;
2125                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2126                 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
2127                 msg->state = state;
2128                 pthread_mutex_lock(&iocom->mtx);
2129                 RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
2130                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
2131                 state->flags |= DMSG_STATE_SUBINSERTED |
2132                                 DMSG_STATE_RBINSERTED;
2133
2134                 /*
2135                  * If the parent is a relay set up the state handler to
2136                  * automatically route the message.  Local processing will
2137                  * not occur if set.
2138                  *
2139                  * (state relays are seeded by SPAN processing)
2140                  */
2141                 if (pstate->relay)
2142                         state->func = dmsg_state_relay;
2143                 pthread_mutex_unlock(&iocom->mtx);
2144                 error = 0;
2145
2146                 if (DMsgDebugOpt) {
2147                         fprintf(stderr,
2148                                 "create state %p id=%08x on iocom staterd %p\n",
2149                                 state, (uint32_t)state->msgid, iocom);
2150                 }
2151                 break;
2152         case DMSGF_DELETE:
2153                 /*
2154                  * Persistent state is expected but might not exist if an
2155                  * ABORT+DELETE races the close.
2156                  *
2157                  * (any DELETE is handled in post-processing of msg).
2158                  */
2159                 if (state == pstate) {
2160                         if (msg->any.head.cmd & DMSGF_ABORT) {
2161                                 error = DMSG_IOQ_ERROR_EALREADY;
2162                         } else {
2163                                 fprintf(stderr, "missing-state %s\n",
2164                                         dmsg_msg_str(msg));
2165                                 error = DMSG_IOQ_ERROR_TRANS;
2166                                 assert(0);
2167                         }
2168                         break;
2169                 }
2170
2171                 /*
2172                  * Handle another ABORT+DELETE case if the msgid has already
2173                  * been reused.
2174                  */
2175                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2176                         if (msg->any.head.cmd & DMSGF_ABORT) {
2177                                 error = DMSG_IOQ_ERROR_EALREADY;
2178                         } else {
2179                                 fprintf(stderr, "reused-state %s\n",
2180                                         dmsg_msg_str(msg));
2181                                 error = DMSG_IOQ_ERROR_TRANS;
2182                                 assert(0);
2183                         }
2184                         break;
2185                 }
2186                 error = 0;
2187                 break;
2188         default:
2189                 /*
2190                  * Check for mid-stream ABORT command received, otherwise
2191                  * allow.
2192                  */
2193                 if (msg->any.head.cmd & DMSGF_ABORT) {
2194                         if ((state == pstate) ||
2195                             (state->rxcmd & DMSGF_CREATE) == 0) {
2196                                 error = DMSG_IOQ_ERROR_EALREADY;
2197                                 break;
2198                         }
2199                 }
2200                 error = 0;
2201                 break;
2202         case DMSGF_REPLY | DMSGF_CREATE:
2203         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
2204                 /*
2205                  * When receiving a reply with CREATE set the original
2206                  * persistent state message should already exist.
2207                  */
2208                 if (state == pstate) {
2209                         fprintf(stderr, "no-state(r) %s\n",
2210                                 dmsg_msg_str(msg));
2211                         error = DMSG_IOQ_ERROR_TRANS;
2212                         assert(0);
2213                         break;
2214                 }
2215                 assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
2216                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2217                 error = 0;
2218                 break;
2219         case DMSGF_REPLY | DMSGF_DELETE:
2220                 /*
2221                  * Received REPLY+ABORT+DELETE in case where msgid has
2222                  * already been fully closed, ignore the message.
2223                  */
2224                 if (state == pstate) {
2225                         if (msg->any.head.cmd & DMSGF_ABORT) {
2226                                 error = DMSG_IOQ_ERROR_EALREADY;
2227                         } else {
2228                                 fprintf(stderr, "no-state(r,d) %s\n",
2229                                         dmsg_msg_str(msg));
2230                                 error = DMSG_IOQ_ERROR_TRANS;
2231                                 assert(0);
2232                         }
2233                         break;
2234                 }
2235
2236                 /*
2237                  * Received REPLY+ABORT+DELETE in case where msgid has
2238                  * already been reused for an unrelated message,
2239                  * ignore the message.
2240                  */
2241                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2242                         if (msg->any.head.cmd & DMSGF_ABORT) {
2243                                 error = DMSG_IOQ_ERROR_EALREADY;
2244                         } else {
2245                                 fprintf(stderr, "reused-state(r,d) %s\n",
2246                                         dmsg_msg_str(msg));
2247                                 error = DMSG_IOQ_ERROR_TRANS;
2248                                 assert(0);
2249                         }
2250                         break;
2251                 }
2252                 error = 0;
2253                 break;
2254         case DMSGF_REPLY:
2255                 /*
2256                  * Check for mid-stream ABORT reply received to sent command.
2257                  */
2258                 if (msg->any.head.cmd & DMSGF_ABORT) {
2259                         if (state == pstate ||
2260                             (state->rxcmd & DMSGF_CREATE) == 0) {
2261                                 error = DMSG_IOQ_ERROR_EALREADY;
2262                                 break;
2263                         }
2264                 }
2265                 error = 0;
2266                 break;
2267         }
2268
2269         /*
2270          * Calculate the easy-switch() transactional command.  Represents
2271          * the outer-transaction command for any transaction-create or
2272          * transaction-delete, and the inner message command for any
2273          * non-transaction or inside-transaction command.  tcmd will be
2274          * set to 0 for any messaging error condition.
2275          *
2276          * The two can be told apart because outer-transaction commands
2277          * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2278          */
2279         if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2280                 if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
2281                         msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
2282                                     (msg->any.head.cmd & (DMSGF_CREATE |
2283                                                           DMSGF_DELETE |
2284                                                           DMSGF_REPLY));
2285                 } else {
2286                         msg->tcmd = 0;
2287                 }
2288         } else {
2289                 msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
2290         }
2291
2292 #ifdef DMSG_BLOCK_DEBUG
2293         switch (msg->tcmd) {
2294         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2295         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2296                 fprintf(stderr, "read  BIO %-3d %016jx %d@%016jx\n",
2297                         biocount, msg->any.head.msgid,
2298                         msg->any.blk_read.bytes,
2299                         msg->any.blk_read.offset);
2300                 break;
2301         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2302         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2303                 fprintf(stderr, "rread BIO %-3d %016jx %d@%016jx\n",
2304                         biocount, msg->any.head.msgid,
2305                         msg->any.blk_read.bytes,
2306                         msg->any.blk_read.offset);
2307                 break;
2308         default:
2309                 break;
2310         }
2311 #endif
2312
2313         return (error);
2314 }
2315
2316 /*
2317  * Route the message and handle pair-state processing.
2318  */
2319 void
2320 dmsg_state_relay(dmsg_msg_t *lmsg)
2321 {
2322         dmsg_state_t *lpstate;
2323         dmsg_state_t *rpstate;
2324         dmsg_state_t *lstate;
2325         dmsg_state_t *rstate;
2326         dmsg_msg_t *rmsg;
2327
2328 #ifdef DMSG_BLOCK_DEBUG
2329         switch (lmsg->tcmd) {
2330         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2331         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2332                 atomic_add_int(&biocount, 1);
2333                 fprintf(stderr, "relay BIO %-3d %016jx %d@%016jx\n",
2334                         biocount, lmsg->any.head.msgid,
2335                         lmsg->any.blk_read.bytes,
2336                         lmsg->any.blk_read.offset);
2337                 break;
2338         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2339         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2340                 fprintf(stderr, "retrn BIO %-3d %016jx %d@%016jx\n",
2341                         biocount, lmsg->any.head.msgid,
2342                         lmsg->any.blk_read.bytes,
2343                         lmsg->any.blk_read.offset);
2344                 atomic_add_int(&biocount, -1);
2345                 break;
2346         default:
2347                 break;
2348         }
2349 #endif
2350
2351         if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2352             DMSGF_CREATE) {
2353                 /*
2354                  * New sub-transaction, establish new state and relay.
2355                  */
2356                 lstate = lmsg->state;
2357                 lpstate = lstate->parent;
2358                 rpstate = lpstate->relay;
2359                 assert(lstate->relay == NULL);
2360                 assert(rpstate != NULL);
2361
2362                 rmsg = dmsg_msg_alloc(rpstate, 0,
2363                                       lmsg->any.head.cmd,
2364                                       dmsg_state_relay, NULL);
2365                 rstate = rmsg->state;
2366                 rstate->relay = lstate;
2367                 lstate->relay = rstate;
2368                 dmsg_state_hold(lstate);
2369                 dmsg_state_hold(rstate);
2370         } else {
2371                 /*
2372                  * State & relay already established
2373                  */
2374                 lstate = lmsg->state;
2375                 rstate = lstate->relay;
2376                 assert(rstate != NULL);
2377
2378                 rmsg = dmsg_msg_alloc(rstate, 0,
2379                                       lmsg->any.head.cmd,
2380                                       dmsg_state_relay, NULL);
2381         }
2382         if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2383                 bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2384                       lmsg->hdr_size - sizeof(lmsg->any.head));
2385         }
2386         rmsg->any.head.error = lmsg->any.head.error;
2387         rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2388         rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
2389         rmsg->aux_size = lmsg->aux_size;
2390         rmsg->aux_data = lmsg->aux_data;
2391         lmsg->aux_data = NULL;
2392         /*
2393         fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
2394         */
2395         dmsg_msg_write(rmsg);
2396 }
2397
2398 /*
2399  * Cleanup and retire msg after processing
2400  */
2401 void
2402 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2403 {
2404         dmsg_state_t *state;
2405         dmsg_state_t *pstate;
2406
2407         assert(msg->state->iocom == iocom);
2408         state = msg->state;
2409         if (state->flags & DMSG_STATE_ROOT) {
2410                 /*
2411                  * Free a non-transactional message, there is no state
2412                  * to worry about.
2413                  */
2414                 dmsg_msg_free(msg);
2415         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2416                 /*
2417                  * Message terminating transaction, destroy the related
2418                  * state, the original message, and this message (if it
2419                  * isn't the original message due to a CREATE|DELETE).
2420                  *
2421                  * It's possible for governing state to terminate while
2422                  * sub-transactions still exist.  This is allowed but
2423                  * will cause sub-transactions to recursively fail.
2424                  * Further reception of sub-transaction messages will be
2425                  * impossible because the circuit will no longer exist.
2426                  * (XXX need code to make sure that happens properly).
2427                  */
2428                 pthread_mutex_lock(&iocom->mtx);
2429                 state->rxcmd |= DMSGF_DELETE;
2430
2431                 if (state->txcmd & DMSGF_DELETE) {
2432                         assert(state->flags & DMSG_STATE_RBINSERTED);
2433                         if (state->rxcmd & DMSGF_REPLY) {
2434                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2435                                 RB_REMOVE(dmsg_state_tree,
2436                                           &iocom->statewr_tree, state);
2437                         } else {
2438                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2439                                 RB_REMOVE(dmsg_state_tree,
2440                                           &iocom->staterd_tree, state);
2441                         }
2442                         state->flags &= ~DMSG_STATE_RBINSERTED;
2443                         if (state->flags & DMSG_STATE_SUBINSERTED) {
2444                                 pstate = state->parent;
2445                                 TAILQ_REMOVE(&pstate->subq, state, entry);
2446                                 state->flags &= ~DMSG_STATE_SUBINSERTED;
2447                                 dmsg_state_drop(pstate);
2448                         }
2449                         state->parent = NULL;
2450
2451                         if (state->relay) {
2452                                 dmsg_state_drop(state->relay);
2453                                 state->relay = NULL;
2454                         }
2455                         dmsg_msg_free(msg);
2456                         dmsg_state_drop(state);
2457                 } else {
2458                         dmsg_msg_free(msg);
2459                 }
2460                 pthread_mutex_unlock(&iocom->mtx);
2461         } else {
2462                 /*
2463                  * Message not terminating transaction, leave state intact
2464                  * and free message if it isn't the CREATE message.
2465                  */
2466                 dmsg_msg_free(msg);
2467         }
2468 }
2469
2470 /*
2471  * Clean up the state after pulling out needed fields and queueing the
2472  * message for transmission.   This occurs in dmsg_msg_write().
2473  */
2474 static void
2475 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2476 {
2477         dmsg_state_t *state;
2478         dmsg_state_t *pstate;
2479
2480         assert(iocom == msg->state->iocom);
2481         state = msg->state;
2482         if (state->flags & DMSG_STATE_ROOT) {
2483                 ;
2484         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2485                 /*
2486                  * Message terminating transaction, destroy the related
2487                  * state, the original message, and this message (if it
2488                  * isn't the original message due to a CREATE|DELETE).
2489                  *
2490                  * It's possible for governing state to terminate while
2491                  * sub-transactions still exist.  This is allowed but
2492                  * will cause sub-transactions to recursively fail.
2493                  * Further reception of sub-transaction messages will be
2494                  * impossible because the circuit will no longer exist.
2495                  * (XXX need code to make sure that happens properly).
2496                  */
2497                 pthread_mutex_lock(&iocom->mtx);
2498                 assert((state->txcmd & DMSGF_DELETE) == 0);
2499                 state->txcmd |= DMSGF_DELETE;
2500                 if (state->rxcmd & DMSGF_DELETE) {
2501                         assert(state->flags & DMSG_STATE_RBINSERTED);
2502                         if (state->txcmd & DMSGF_REPLY) {
2503                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2504                                 RB_REMOVE(dmsg_state_tree,
2505                                           &iocom->staterd_tree, state);
2506                         } else {
2507                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2508                                 RB_REMOVE(dmsg_state_tree,
2509                                           &iocom->statewr_tree, state);
2510                         }
2511                         state->flags &= ~DMSG_STATE_RBINSERTED;
2512                         pstate = state->parent;
2513                         if (state->flags & DMSG_STATE_SUBINSERTED) {
2514                                 TAILQ_REMOVE(&pstate->subq, state, entry);
2515                                 state->flags &= ~DMSG_STATE_SUBINSERTED;
2516                         }
2517                         state->parent = NULL;
2518                         dmsg_state_drop(pstate);
2519
2520                         if (state->relay) {
2521                                 dmsg_state_drop(state->relay);
2522                                 state->relay = NULL;
2523                         }
2524                         dmsg_state_drop(state); /* usually the last drop */
2525                 }
2526                 pthread_mutex_unlock(&iocom->mtx);
2527         }
2528 }
2529
2530 /*
2531  * Called with or without locks
2532  */
2533 void
2534 dmsg_state_hold(dmsg_state_t *state)
2535 {
2536         atomic_add_int(&state->refs, 1);
2537 }
2538
2539 void
2540 dmsg_state_drop(dmsg_state_t *state)
2541 {
2542         if (atomic_fetchadd_int(&state->refs, -1) == 1)
2543                 dmsg_state_free(state);
2544 }
2545
2546 /*
2547  * Called with iocom locked
2548  */
2549 static void
2550 dmsg_state_free(dmsg_state_t *state)
2551 {
2552         atomic_add_int(&dmsg_state_count, -1);
2553         if (DMsgDebugOpt) {
2554                 fprintf(stderr, "terminate state %p id=%08x\n",
2555                         state, (uint32_t)state->msgid);
2556         }
2557         assert((state->flags & (DMSG_STATE_ROOT |
2558                                 DMSG_STATE_SUBINSERTED |
2559                                 DMSG_STATE_RBINSERTED)) == 0);
2560         assert(TAILQ_EMPTY(&state->subq));
2561         assert(state->refs == 0);
2562         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2563                 closefrom(3);
2564         assert(state->any.any == NULL);
2565         free(state);
2566 }
2567
2568 /*
2569  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2570  * header is not adjusted, just the core header.
2571  */
2572 void
2573 dmsg_bswap_head(dmsg_hdr_t *head)
2574 {
2575         head->magic     = bswap16(head->magic);
2576         head->reserved02 = bswap16(head->reserved02);
2577         head->salt      = bswap32(head->salt);
2578
2579         head->msgid     = bswap64(head->msgid);
2580         head->circuit   = bswap64(head->circuit);
2581         head->reserved18= bswap64(head->reserved18);
2582
2583         head->cmd       = bswap32(head->cmd);
2584         head->aux_crc   = bswap32(head->aux_crc);
2585         head->aux_bytes = bswap32(head->aux_bytes);
2586         head->error     = bswap32(head->error);
2587         head->aux_descr = bswap64(head->aux_descr);
2588         head->reserved38= bswap32(head->reserved38);
2589         head->hdr_crc   = bswap32(head->hdr_crc);
2590 }