dmsg - Stabilization work
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2015 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
38 #define DMSG_BLOCK_DEBUG	/* enables the biocount counter below */
39
40 int DMsgDebugOpt;	/* non-zero: dmsg_iocom_core() logs each received msg to stderr */
41 int dmsg_state_count;	/* bumped atomically per state allocated in dmsg_msg_alloc_locked() */
42 #ifdef DMSG_BLOCK_DEBUG
43 static int biocount;	/* only compiled when DMSG_BLOCK_DEBUG is defined */
44 #endif
45
46 static int dmsg_state_msgrx(dmsg_msg_t *msg, int mstate);
47 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
48 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
49 static void dmsg_state_free(dmsg_state_t *state);
50 static void dmsg_subq_delete(dmsg_state_t *state);
51 static void dmsg_simulate_failure(dmsg_state_t *state, int meto, int error);
52 static void dmsg_state_abort(dmsg_state_t *state);
53 static void dmsg_state_dying(dmsg_state_t *state);
54
55 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);	/* tree ordered by dmsg_state_cmp() */
56
57 /*
58  * STATE TREE - Represents open transactions which are indexed by their
59  *              { msgid } relative to the governing iocom.
60  */
61 int
62 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
63 {
64         if (state1->msgid < state2->msgid)
65                 return(-1);
66         if (state1->msgid > state2->msgid)
67                 return(1);
68         return(0);
69 }
70
71 /*
72  * Initialize a low-level ioq
73  */
74 void
75 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
76 {
77         bzero(ioq, sizeof(*ioq));
78         ioq->state = DMSG_MSGQ_STATE_HEADER1;
79         TAILQ_INIT(&ioq->msgq);
80 }
81
82 /*
83  * Cleanup queue.
84  *
85  * caller holds iocom->mtx.
86  */
87 void
88 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
89 {
90         dmsg_msg_t *msg;
91
92         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
93                 assert(0);      /* shouldn't happen */
94                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
95                 dmsg_msg_free(msg);
96         }
97         if ((msg = ioq->msg) != NULL) {
98                 ioq->msg = NULL;
99                 dmsg_msg_free(msg);
100         }
101 }
102
103 /*
104  * Initialize a low-level communications channel.
105  *
106  * NOTE: The signal_func() is called at least once from the loop and can be
107  *       re-armed via dmsg_iocom_restate().
108  */
109 void
110 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
111                    void (*signal_func)(dmsg_iocom_t *iocom),
112                    void (*rcvmsg_func)(dmsg_msg_t *msg),
113                    void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
114                    void (*altmsg_func)(dmsg_iocom_t *iocom))
115 {
116         struct stat st;
117
118         bzero(iocom, sizeof(*iocom));
119
120         asprintf(&iocom->label, "iocom-%p", iocom);
121         iocom->signal_callback = signal_func;
122         iocom->rcvmsg_callback = rcvmsg_func;
123         iocom->altmsg_callback = altmsg_func;
124         iocom->usrmsg_callback = usrmsg_func;
125
126         pthread_mutex_init(&iocom->mtx, NULL);
127         RB_INIT(&iocom->staterd_tree);
128         RB_INIT(&iocom->statewr_tree);
129         TAILQ_INIT(&iocom->txmsgq);
130         iocom->sock_fd = sock_fd;
131         iocom->alt_fd = alt_fd;
132         iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
133         if (signal_func)
134                 iocom->flags |= DMSG_IOCOMF_SWORK;
135         dmsg_ioq_init(iocom, &iocom->ioq_rx);
136         dmsg_ioq_init(iocom, &iocom->ioq_tx);
137         iocom->state0.refs = 1;         /* should never trigger a free */
138         iocom->state0.iocom = iocom;
139         iocom->state0.parent = &iocom->state0;
140         iocom->state0.flags = DMSG_STATE_ROOT;
141         TAILQ_INIT(&iocom->state0.subq);
142
143         if (pipe(iocom->wakeupfds) < 0)
144                 assert(0);
145         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
146         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
147
148         /*
149          * Negotiate session crypto synchronously.  This will mark the
150          * connection as error'd if it fails.  If this is a pipe it's
151          * a linkage that we set up ourselves to the filesystem and there
152          * is no crypto.
153          */
154         if (fstat(sock_fd, &st) < 0)
155                 assert(0);
156         if (S_ISSOCK(st.st_mode))
157                 dmsg_crypto_negotiate(iocom);
158
159         /*
160          * Make sure our fds are set to non-blocking for the iocom core.
161          */
162         if (sock_fd >= 0)
163                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
164 #if 0
165         /* if line buffered our single fgets() should be fine */
166         if (alt_fd >= 0)
167                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
168 #endif
169 }
170
171 void
172 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
173 {
174         va_list va;
175         char *optr;
176
177         va_start(va, ctl);
178         optr = iocom->label;
179         vasprintf(&iocom->label, ctl, va);
180         va_end(va);
181         if (optr)
182                 free(optr);
183 }
184
185 /*
186  * May only be called from a callback from iocom_core.
187  *
188  * Adjust state machine functions, set flags to guarantee that both
189  * the recevmsg_func and the sendmsg_func is called at least once.
190  */
191 void
192 dmsg_iocom_restate(dmsg_iocom_t *iocom,
193                    void (*signal_func)(dmsg_iocom_t *),
194                    void (*rcvmsg_func)(dmsg_msg_t *msg))
195 {
196         pthread_mutex_lock(&iocom->mtx);
197         iocom->signal_callback = signal_func;
198         iocom->rcvmsg_callback = rcvmsg_func;
199         if (signal_func)
200                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
201         else
202                 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
203         pthread_mutex_unlock(&iocom->mtx);
204 }
205
206 void
207 dmsg_iocom_signal(dmsg_iocom_t *iocom)
208 {
209         pthread_mutex_lock(&iocom->mtx);
210         if (iocom->signal_callback)
211                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
212         pthread_mutex_unlock(&iocom->mtx);
213 }
214
215 /*
216  * Cleanup a terminating iocom.
217  *
218  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
219  * from all possible references to it.
220  */
221 void
222 dmsg_iocom_done(dmsg_iocom_t *iocom)
223 {
224         if (iocom->sock_fd >= 0) {
225                 close(iocom->sock_fd);
226                 iocom->sock_fd = -1;
227         }
228         if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
229                 close(iocom->alt_fd);
230                 iocom->alt_fd = -1;
231         }
232         dmsg_ioq_done(iocom, &iocom->ioq_rx);
233         dmsg_ioq_done(iocom, &iocom->ioq_tx);
234         if (iocom->wakeupfds[0] >= 0) {
235                 close(iocom->wakeupfds[0]);
236                 iocom->wakeupfds[0] = -1;
237         }
238         if (iocom->wakeupfds[1] >= 0) {
239                 close(iocom->wakeupfds[1]);
240                 iocom->wakeupfds[1] = -1;
241         }
242         pthread_mutex_destroy(&iocom->mtx);
243 }
244
245 /*
246  * Allocate a new message using the specified transaction state.
247  *
248  * If CREATE is set a new transaction is allocated relative to the passed-in
249  * transaction (the 'state' argument becomes pstate).
250  *
251  * If CREATE is not set the message is associated with the passed-in
252  * transaction.
253  */
254 dmsg_msg_t *
255 dmsg_msg_alloc(dmsg_state_t *state,
256                size_t aux_size, uint32_t cmd,
257                void (*func)(dmsg_msg_t *), void *data)
258 {
259         dmsg_iocom_t *iocom = state->iocom;
260         dmsg_msg_t *msg;
261
262         pthread_mutex_lock(&iocom->mtx);
263         msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
264         pthread_mutex_unlock(&iocom->mtx);
265
266         return msg;
267 }
268
269 dmsg_msg_t *
270 dmsg_msg_alloc_locked(dmsg_state_t *state,
271                size_t aux_size, uint32_t cmd,
272                void (*func)(dmsg_msg_t *), void *data)
273 {
274         dmsg_iocom_t *iocom = state->iocom;
275         dmsg_state_t *pstate;
276         dmsg_msg_t *msg;
277         int hbytes;
278         size_t aligned_size;
279
280         aligned_size = DMSG_DOALIGN(aux_size);
281         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
282                 /*
283                  * When CREATE is set without REPLY the caller is
284                  * initiating a new transaction stacked under the specified
285                  * circuit.
286                  *
287                  * It is possible to race a circuit failure, inherit the
288                  * parent's STATE_DYING flag to trigger an abort sequence
289                  * in the transmit path.  By not inheriting ABORTING the
290                  * abort sequence can recurse.
291                  *
292                  * NOTE: CREATE in txcmd handled by dmsg_msg_write()
293                  * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
294                  */
295                 pstate = state;
296                 state = malloc(sizeof(*state));
297                 bzero(state, sizeof(*state));
298                 atomic_add_int(&dmsg_state_count, 1);
299
300                 TAILQ_INIT(&state->subq);
301                 state->parent = pstate;
302                 state->iocom = iocom;
303                 state->flags = DMSG_STATE_DYNAMIC;
304                 state->msgid = (uint64_t)(uintptr_t)state;
305                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
306                 state->rxcmd = DMSGF_REPLY;
307                 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
308                 state->func = func;
309                 state->any.any = data;
310
311                 state->flags |= DMSG_STATE_SUBINSERTED |
312                                 DMSG_STATE_RBINSERTED;
313                 state->flags |= pstate->flags & DMSG_STATE_DYING;
314                 if (TAILQ_EMPTY(&pstate->subq))
315                         dmsg_state_hold(pstate);
316                 RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
317                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
318                 dmsg_state_hold(state);         /* state on pstate->subq */
319                 dmsg_state_hold(state);         /* state on rbtree */
320                 dmsg_state_hold(state);         /* msg->state */
321         } else {
322                 /*
323                  * Otherwise the message is transmitted over the existing
324                  * open transaction.
325                  */
326                 pstate = state->parent;
327                 dmsg_state_hold(state);         /* msg->state */
328         }
329
330         /* XXX SMP race for state */
331         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
332         assert((size_t)hbytes >= sizeof(struct dmsg_hdr));
333         msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes);
334         bzero(msg, offsetof(struct dmsg_msg, any.head));
335
336         /*
337          * [re]allocate the auxillary data buffer.  The caller knows that
338          * a size-aligned buffer will be allocated but we do not want to
339          * force the caller to zero any tail piece, so we do that ourself.
340          */
341         if (msg->aux_size != aux_size) {
342                 if (msg->aux_data) {
343                         free(msg->aux_data);
344                         msg->aux_data = NULL;
345                         msg->aux_size = 0;
346                 }
347                 if (aux_size) {
348                         msg->aux_data = malloc(aligned_size);
349                         msg->aux_size = aux_size;
350                         if (aux_size != aligned_size) {
351                                 bzero(msg->aux_data + aux_size,
352                                       aligned_size - aux_size);
353                         }
354                 }
355         }
356
357         /*
358          * Set REVTRANS if the transaction was remotely initiated
359          * Set REVCIRC if the circuit was remotely initiated
360          */
361         if (state->flags & DMSG_STATE_OPPOSITE)
362                 cmd |= DMSGF_REVTRANS;
363         if (pstate->flags & DMSG_STATE_OPPOSITE)
364                 cmd |= DMSGF_REVCIRC;
365
366         /*
367          * Finish filling out the header.
368          */
369         bzero(&msg->any.head, hbytes);
370         msg->hdr_size = hbytes;
371         msg->any.head.magic = DMSG_HDR_MAGIC;
372         msg->any.head.cmd = cmd;
373         msg->any.head.aux_descr = 0;
374         msg->any.head.aux_crc = 0;
375         msg->any.head.msgid = state->msgid;
376         msg->any.head.circuit = pstate->msgid;
377         msg->state = state;
378
379         return (msg);
380 }
381
382 /*
383  * Free a message so it can be reused afresh.
384  *
385  * NOTE: aux_size can be 0 with a non-NULL aux_data.
386  */
387 static
388 void
389 dmsg_msg_free_locked(dmsg_msg_t *msg)
390 {
391         dmsg_state_t *state;
392
393         if ((state = msg->state) != NULL) {
394                 dmsg_state_drop(state);
395                 msg->state = NULL;      /* safety */
396         }
397         if (msg->aux_data) {
398                 free(msg->aux_data);
399                 msg->aux_data = NULL;   /* safety */
400         }
401         msg->aux_size = 0;
402         free (msg);
403 }
404
405 void
406 dmsg_msg_free(dmsg_msg_t *msg)
407 {
408         dmsg_iocom_t *iocom = msg->state->iocom;
409
410         pthread_mutex_lock(&iocom->mtx);
411         dmsg_msg_free_locked(msg);
412         pthread_mutex_unlock(&iocom->mtx);
413 }
414
415 /*
416  * I/O core loop for an iocom.
417  *
418  * Thread localized, iocom->mtx not held.
419  */
420 void
421 dmsg_iocom_core(dmsg_iocom_t *iocom)
422 {
423         struct pollfd fds[3];
424         char dummybuf[256];
425         dmsg_msg_t *msg;
426         int timeout;
427         int count;
428         int wi; /* wakeup pipe */
429         int si; /* socket */
430         int ai; /* alt bulk path socket */
431
432         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
433                 /*
434                  * These iocom->flags are only manipulated within the
435                  * context of the current thread.  However, modifications
436                  * still require atomic ops.
437                  */
438 #if 0
439                 fprintf(stderr, "iocom %p %08x\n", iocom, iocom->flags);
440 #endif
441                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
442                                      DMSG_IOCOMF_WWORK |
443                                      DMSG_IOCOMF_PWORK |
444                                      DMSG_IOCOMF_SWORK |
445                                      DMSG_IOCOMF_ARWORK |
446                                      DMSG_IOCOMF_AWWORK)) == 0) {
447                         /*
448                          * Only poll if no immediate work is pending.
449                          * Otherwise we are just wasting our time calling
450                          * poll.
451                          */
452                         timeout = 5000;
453
454                         count = 0;
455                         wi = -1;
456                         si = -1;
457                         ai = -1;
458
459                         /*
460                          * Always check the inter-thread pipe, e.g.
461                          * for iocom->txmsgq work.
462                          */
463                         wi = count++;
464                         fds[wi].fd = iocom->wakeupfds[0];
465                         fds[wi].events = POLLIN;
466                         fds[wi].revents = 0;
467
468                         /*
469                          * Check the socket input/output direction as
470                          * requested
471                          */
472                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
473                                             DMSG_IOCOMF_WREQ)) {
474                                 si = count++;
475                                 fds[si].fd = iocom->sock_fd;
476                                 fds[si].events = 0;
477                                 fds[si].revents = 0;
478
479                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
480                                         fds[si].events |= POLLIN;
481                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
482                                         fds[si].events |= POLLOUT;
483                         }
484
485                         /*
486                          * Check the alternative fd for work.
487                          */
488                         if (iocom->alt_fd >= 0) {
489                                 ai = count++;
490                                 fds[ai].fd = iocom->alt_fd;
491                                 fds[ai].events = POLLIN;
492                                 fds[ai].revents = 0;
493                         }
494                         poll(fds, count, timeout);
495
496                         if (wi >= 0 && (fds[wi].revents & POLLIN))
497                                 atomic_set_int(&iocom->flags,
498                                                DMSG_IOCOMF_PWORK);
499                         if (si >= 0 && (fds[si].revents & POLLIN))
500                                 atomic_set_int(&iocom->flags,
501                                                DMSG_IOCOMF_RWORK);
502                         if (si >= 0 && (fds[si].revents & POLLOUT))
503                                 atomic_set_int(&iocom->flags,
504                                                DMSG_IOCOMF_WWORK);
505                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
506                                 atomic_set_int(&iocom->flags,
507                                                DMSG_IOCOMF_WWORK);
508                         if (ai >= 0 && (fds[ai].revents & POLLIN))
509                                 atomic_set_int(&iocom->flags,
510                                                DMSG_IOCOMF_ARWORK);
511                 } else {
512                         /*
513                          * Always check the pipe
514                          */
515                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
516                 }
517
518                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
519                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
520                         iocom->signal_callback(iocom);
521                 }
522
523                 /*
524                  * Pending message queues from other threads wake us up
525                  * with a write to the wakeupfds[] pipe.  We have to clear
526                  * the pipe with a dummy read.
527                  */
528                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
529                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
530                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
531                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
532                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
533                 }
534
535                 /*
536                  * Message write sequencing
537                  */
538                 if (iocom->flags & DMSG_IOCOMF_WWORK)
539                         dmsg_iocom_flush1(iocom);
540
541                 /*
542                  * Message read sequencing.  Run this after the write
543                  * sequencing in case the write sequencing allowed another
544                  * auto-DELETE to occur on the read side.
545                  */
546                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
547                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
548                                (msg = dmsg_ioq_read(iocom)) != NULL) {
549                                 if (DMsgDebugOpt) {
550                                         fprintf(stderr, "receive %s\n",
551                                                 dmsg_msg_str(msg));
552                                 }
553                                 iocom->rcvmsg_callback(msg);
554                                 pthread_mutex_lock(&iocom->mtx);
555                                 dmsg_state_cleanuprx(iocom, msg);
556                                 pthread_mutex_unlock(&iocom->mtx);
557                         }
558                 }
559
560                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
561                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
562                         iocom->altmsg_callback(iocom);
563                 }
564         }
565 }
566
567 /*
568  * Make sure there's enough room in the FIFO to hold the
569  * needed data.
570  *
571  * Assume worst case encrypted form is 2x the size of the
572  * plaintext equivalent.
573  */
574 static
575 size_t
576 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
577 {
578         size_t bytes;
579         size_t nmax;
580
581         bytes = ioq->fifo_cdx - ioq->fifo_beg;
582         nmax = sizeof(ioq->buf) - ioq->fifo_end;
583         if (bytes + nmax / 2 < needed) {
584                 if (bytes) {
585                         bcopy(ioq->buf + ioq->fifo_beg,
586                               ioq->buf,
587                               bytes);
588                 }
589                 ioq->fifo_cdx -= ioq->fifo_beg;
590                 ioq->fifo_beg = 0;
591                 if (ioq->fifo_cdn < ioq->fifo_end) {
592                         bcopy(ioq->buf + ioq->fifo_cdn,
593                               ioq->buf + ioq->fifo_cdx,
594                               ioq->fifo_end - ioq->fifo_cdn);
595                 }
596                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
597                 ioq->fifo_cdn = ioq->fifo_cdx;
598                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
599         }
600         return(nmax);
601 }
602
603 /*
604  * Read the next ready message from the ioq, issuing I/O if needed.
605  * Caller should retry on a read-event when NULL is returned.
606  *
607  * If an error occurs during reception a DMSG_LNK_ERROR msg will
608  * be returned for each open transaction, then the ioq and iocom
609  * will be errored out and a non-transactional DMSG_LNK_ERROR
610  * msg will be returned as the final message.  The caller should not call
611  * us again after the final message is returned.
612  *
613  * Thread localized, iocom->mtx not held.
614  */
615 dmsg_msg_t *
616 dmsg_ioq_read(dmsg_iocom_t *iocom)
617 {
618         dmsg_ioq_t *ioq = &iocom->ioq_rx;
619         dmsg_msg_t *msg;
620         dmsg_hdr_t *head;
621         ssize_t n;
622         size_t bytes;
623         size_t nmax;
624         uint32_t aux_size;
625         uint32_t xcrc32;
626         int error;
627
628 again:
629         /*
630          * If a message is already pending we can just remove and
631          * return it.  Message state has already been processed.
632          * (currently not implemented)
633          */
634         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
635                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
636
637                 if (msg->state == &iocom->state0) {
638                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
639                         fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
640                 }
641                 return (msg);
642         }
643         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
644
645         /*
646          * If the stream is errored out we stop processing it.
647          */
648         if (ioq->error)
649                 goto skip;
650
651         /*
652          * Message read in-progress (msg is NULL at the moment).  We don't
653          * allocate a msg until we have its core header.
654          */
655         nmax = sizeof(ioq->buf) - ioq->fifo_end;
656         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
657         msg = ioq->msg;
658
659         switch(ioq->state) {
660         case DMSG_MSGQ_STATE_HEADER1:
661                 /*
662                  * Load the primary header, fail on any non-trivial read
663                  * error or on EOF.  Since the primary header is the same
664                  * size is the message alignment it will never straddle
665                  * the end of the buffer.
666                  */
667                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
668                 if (bytes < sizeof(msg->any.head)) {
669                         n = read(iocom->sock_fd,
670                                  ioq->buf + ioq->fifo_end,
671                                  nmax);
672                         if (n <= 0) {
673                                 if (n == 0) {
674                                         ioq->error = DMSG_IOQ_ERROR_EOF;
675                                         break;
676                                 }
677                                 if (errno != EINTR &&
678                                     errno != EINPROGRESS &&
679                                     errno != EAGAIN) {
680                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
681                                         break;
682                                 }
683                                 n = 0;
684                                 /* fall through */
685                         }
686                         ioq->fifo_end += (size_t)n;
687                         nmax -= (size_t)n;
688                 }
689
690                 /*
691                  * Decrypt data received so far.  Data will be decrypted
692                  * in-place but might create gaps in the FIFO.  Partial
693                  * blocks are not immediately decrypted.
694                  *
695                  * WARNING!  The header might be in the wrong endian, we
696                  *           do not fix it up until we get the entire
697                  *           extended header.
698                  */
699                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
700                         dmsg_crypto_decrypt(iocom, ioq);
701                 } else {
702                         ioq->fifo_cdx = ioq->fifo_end;
703                         ioq->fifo_cdn = ioq->fifo_end;
704                 }
705                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
706
707                 /*
708                  * Insufficient data accumulated (msg is NULL, caller will
709                  * retry on event).
710                  */
711                 assert(msg == NULL);
712                 if (bytes < sizeof(msg->any.head))
713                         break;
714
715                 /*
716                  * Check and fixup the core header.  Note that the icrc
717                  * has to be calculated before any fixups, but the crc
718                  * fields in the msg may have to be swapped like everything
719                  * else.
720                  */
721                 head = (void *)(ioq->buf + ioq->fifo_beg);
722                 if (head->magic != DMSG_HDR_MAGIC &&
723                     head->magic != DMSG_HDR_MAGIC_REV) {
724                         fprintf(stderr, "%s: head->magic is bad %02x\n",
725                                 iocom->label, head->magic);
726                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
727                                 fprintf(stderr, "(on encrypted link)\n");
728                         ioq->error = DMSG_IOQ_ERROR_SYNC;
729                         break;
730                 }
731
732                 /*
733                  * Calculate the full header size and aux data size
734                  */
735                 if (head->magic == DMSG_HDR_MAGIC_REV) {
736                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
737                                       DMSG_ALIGN;
738                         aux_size = bswap32(head->aux_bytes);
739                 } else {
740                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
741                                       DMSG_ALIGN;
742                         aux_size = head->aux_bytes;
743                 }
744                 ioq->abytes = DMSG_DOALIGN(aux_size);
745                 ioq->unaligned_aux_size = aux_size;
746                 if (ioq->hbytes < sizeof(msg->any.head) ||
747                     ioq->hbytes > sizeof(msg->any) ||
748                     ioq->abytes > DMSG_AUX_MAX) {
749                         ioq->error = DMSG_IOQ_ERROR_FIELD;
750                         break;
751                 }
752
753                 /*
754                  * Allocate the message, the next state will fill it in.
755                  *
756                  * NOTE: The aux_data buffer will be sized to an aligned
757                  *       value and the aligned remainder zero'd for
758                  *       convenience.
759                  *
760                  * NOTE: Supply dummy state and a degenerate cmd without
761                  *       CREATE set.  The message will temporarily be
762                  *       associated with state0 until later post-processing.
763                  */
764                 msg = dmsg_msg_alloc(&iocom->state0, aux_size,
765                                      ioq->hbytes / DMSG_ALIGN,
766                                      NULL, NULL);
767                 ioq->msg = msg;
768
769                 /*
770                  * Fall through to the next state.  Make sure that the
771                  * extended header does not straddle the end of the buffer.
772                  * We still want to issue larger reads into our buffer,
773                  * book-keeping is easier if we don't bcopy() yet.
774                  *
775                  * Make sure there is enough room for bloated encrypt data.
776                  */
777                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
778                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
779                 /* fall through */
780         case DMSG_MSGQ_STATE_HEADER2:
781                 /*
782                  * Fill out the extended header.
783                  */
784                 assert(msg != NULL);
785                 if (bytes < ioq->hbytes) {
786                         assert(nmax > 0);
787                         n = read(iocom->sock_fd,
788                                  ioq->buf + ioq->fifo_end,
789                                  nmax);
790                         if (n <= 0) {
791                                 if (n == 0) {
792                                         ioq->error = DMSG_IOQ_ERROR_EOF;
793                                         break;
794                                 }
795                                 if (errno != EINTR &&
796                                     errno != EINPROGRESS &&
797                                     errno != EAGAIN) {
798                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
799                                         break;
800                                 }
801                                 n = 0;
802                                 /* fall through */
803                         }
804                         ioq->fifo_end += (size_t)n;
805                         nmax -= (size_t)n;
806                 }
807
808                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
809                         dmsg_crypto_decrypt(iocom, ioq);
810                 } else {
811                         ioq->fifo_cdx = ioq->fifo_end;
812                         ioq->fifo_cdn = ioq->fifo_end;
813                 }
814                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
815
816                 /*
817                  * Insufficient data accumulated (set msg NULL so caller will
818                  * retry on event).
819                  */
820                 if (bytes < ioq->hbytes) {
821                         msg = NULL;
822                         break;
823                 }
824
825                 /*
826                  * Calculate the extended header, decrypt data received
827                  * so far.  Handle endian-conversion for the entire extended
828                  * header.
829                  */
830                 head = (void *)(ioq->buf + ioq->fifo_beg);
831
832                 /*
833                  * Check the CRC.
834                  */
835                 if (head->magic == DMSG_HDR_MAGIC_REV)
836                         xcrc32 = bswap32(head->hdr_crc);
837                 else
838                         xcrc32 = head->hdr_crc;
839                 head->hdr_crc = 0;
840                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
841                         ioq->error = DMSG_IOQ_ERROR_XCRC;
842                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
843                                 xcrc32, dmsg_icrc32(head, ioq->hbytes),
844                                 dmsg_msg_str(msg));
845                         assert(0);
846                         break;
847                 }
848                 head->hdr_crc = xcrc32;
849
850                 if (head->magic == DMSG_HDR_MAGIC_REV) {
851                         dmsg_bswap_head(head);
852                 }
853
854                 /*
855                  * Copy the extended header into the msg and adjust the
856                  * FIFO.
857                  */
858                 bcopy(head, &msg->any, ioq->hbytes);
859
860                 /*
861                  * We are either done or we fall-through.
862                  */
863                 if (ioq->abytes == 0) {
864                         ioq->fifo_beg += ioq->hbytes;
865                         break;
866                 }
867
868                 /*
869                  * Must adjust bytes (and the state) when falling through.
870                  * nmax doesn't change.
871                  */
872                 ioq->fifo_beg += ioq->hbytes;
873                 bytes -= ioq->hbytes;
874                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
875                 /* fall through */
876         case DMSG_MSGQ_STATE_AUXDATA1:
877                 /*
878                  * Copy the partial or complete [decrypted] payload from
879                  * remaining bytes in the FIFO in order to optimize the
880                  * makeroom call in the AUXDATA2 state.  We have to
881                  * fall-through either way so we can check the crc.
882                  *
883                  * msg->aux_size tracks our aux data.
884                  *
885                  * (Lets not complicate matters if the data is encrypted,
886                  *  since the data in-stream is not the same size as the
887                  *  data decrypted).
888                  */
889                 if (bytes >= ioq->abytes) {
890                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
891                               ioq->abytes);
892                         msg->aux_size = ioq->abytes;
893                         ioq->fifo_beg += ioq->abytes;
894                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
895                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
896                         bytes -= ioq->abytes;
897                 } else if (bytes) {
898                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
899                               bytes);
900                         msg->aux_size = bytes;
901                         ioq->fifo_beg += bytes;
902                         if (ioq->fifo_cdx < ioq->fifo_beg)
903                                 ioq->fifo_cdx = ioq->fifo_beg;
904                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
905                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
906                         bytes = 0;
907                 } else {
908                         msg->aux_size = 0;
909                 }
910                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
911                 /* fall through */
912         case DMSG_MSGQ_STATE_AUXDATA2:
913                 /*
914                  * Make sure there is enough room for more data.
915                  */
916                 assert(msg);
917                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
918
919                 /*
920                  * Read and decrypt more of the payload.
921                  */
922                 if (msg->aux_size < ioq->abytes) {
923                         assert(nmax > 0);
924                         assert(bytes == 0);
925                         n = read(iocom->sock_fd,
926                                  ioq->buf + ioq->fifo_end,
927                                  nmax);
928                         if (n <= 0) {
929                                 if (n == 0) {
930                                         ioq->error = DMSG_IOQ_ERROR_EOF;
931                                         break;
932                                 }
933                                 if (errno != EINTR &&
934                                     errno != EINPROGRESS &&
935                                     errno != EAGAIN) {
936                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
937                                         break;
938                                 }
939                                 n = 0;
940                                 /* fall through */
941                         }
942                         ioq->fifo_end += (size_t)n;
943                         nmax -= (size_t)n;
944                 }
945
946                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
947                         dmsg_crypto_decrypt(iocom, ioq);
948                 } else {
949                         ioq->fifo_cdx = ioq->fifo_end;
950                         ioq->fifo_cdn = ioq->fifo_end;
951                 }
952                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
953
954                 if (bytes > ioq->abytes - msg->aux_size)
955                         bytes = ioq->abytes - msg->aux_size;
956
957                 if (bytes) {
958                         bcopy(ioq->buf + ioq->fifo_beg,
959                               msg->aux_data + msg->aux_size,
960                               bytes);
961                         msg->aux_size += bytes;
962                         ioq->fifo_beg += bytes;
963                 }
964
965                 /*
966                  * Insufficient data accumulated (set msg NULL so caller will
967                  * retry on event).
968                  *
969                  * Assert the auxiliary data size is correct, then record the
970                  * original unaligned size from the message header.
971                  */
972                 if (msg->aux_size < ioq->abytes) {
973                         msg = NULL;
974                         break;
975                 }
976                 assert(msg->aux_size == ioq->abytes);
977                 msg->aux_size = ioq->unaligned_aux_size;
978
979                 /*
980                  * Check aux_crc, then we are done.  Note that the crc
981                  * is calculated over the aligned size, not the actual
982                  * size.
983                  */
984                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
985                 if (xcrc32 != msg->any.head.aux_crc) {
986                         ioq->error = DMSG_IOQ_ERROR_ACRC;
987                         fprintf(stderr,
988                                 "iocom: ACRC error %08x vs %08x "
989                                 "msgid %016jx msgcmd %08x auxsize %d\n",
990                                 xcrc32,
991                                 msg->any.head.aux_crc,
992                                 (intmax_t)msg->any.head.msgid,
993                                 msg->any.head.cmd,
994                                 msg->any.head.aux_bytes);
995                         break;
996                 }
997                 break;
998         case DMSG_MSGQ_STATE_ERROR:
999                 /*
1000                  * Continued calls to drain recorded transactions (returning
1001                  * a LNK_ERROR for each one), before we return the final
1002                  * LNK_ERROR.
1003                  */
1004                 assert(msg == NULL);
1005                 break;
1006         default:
1007                 /*
1008                  * We don't double-return errors, the caller should not
1009                  * have called us again after getting an error msg.
1010                  */
1011                 assert(0);
1012                 break;
1013         }
1014
1015         /*
1016          * Check the message sequence.  The iv[] should prevent any
1017          * possibility of a replay but we add this check anyway.
1018          */
1019         if (msg && ioq->error == 0) {
1020                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1021                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1022                 } else {
1023                         ++ioq->seq;
1024                 }
1025         }
1026
1027         /*
1028          * Handle error, RREQ, or completion
1029          *
1030          * NOTE: nmax and bytes are invalid at this point, we don't bother
1031          *       to update them when breaking out.
1032          */
1033         if (ioq->error) {
1034 skip:
1035                 /*
1036                  * An unrecoverable error causes all active receive
1037                  * transactions to be terminated with a LNK_ERROR message.
1038                  *
1039                  * Once all active transactions are exhausted we set the
1040                  * iocom ERROR flag and return a non-transactional LNK_ERROR
1041                  * message, which should cause master processing loops to
1042                  * terminate.
1043                  */
1044                 fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1045                 assert(ioq->msg == msg);
1046                 if (msg) {
1047                         dmsg_msg_free(msg);
1048                         ioq->msg = NULL;
1049                         msg = NULL;
1050                 }
1051
1052                 /*
1053                  * No more I/O read processing
1054                  */
1055                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1056
1057                 /*
1058                  * Simulate a remote LNK_ERROR DELETE msg for any open
1059                  * transactions, ending with a final non-transactional
1060                  * LNK_ERROR (that the session can detect) when no
1061                  * transactions remain.
1062                  *
1063                  * NOTE: Temporarily supply state0 and a degenerate cmd
1064                  *       without CREATE set.  The real state will be
1065                  *       assigned in the loop.
1066                  *
1067                  * NOTE: We are simulating a received message using our
1068                  *       side of the state, so the DMSGF_REV* bits have
1069                  *       to be reversed.
1070                  */
1071                 pthread_mutex_lock(&iocom->mtx);
1072                 dmsg_iocom_drain(iocom);
1073                 dmsg_simulate_failure(&iocom->state0, 0, ioq->error);
1074                 pthread_mutex_unlock(&iocom->mtx);
1075                 if (TAILQ_FIRST(&ioq->msgq))
1076                         goto again;
1077
1078 #if 0
1079                 /*
1080                  * For the iocom error case we want to set RWORK to indicate
1081                  * that more messages might be pending.
1082                  *
1083                  * It is possible to return NULL when there is more work to
1084                  * do because each message has to be DELETEd in both
1085                  * directions before we continue on with the next (though
1086                  * this could be optimized).  The transmit direction will
1087                  * re-set RWORK.
1088                  */
1089                 if (msg)
1090                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1091 #endif
1092         } else if (msg == NULL) {
1093                 /*
1094                  * Insufficient data received to finish building the message,
1095                  * set RREQ and return NULL.
1096                  *
1097                  * Leave ioq->msg intact.
1098                  * Leave the FIFO intact.
1099                  */
1100                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1101         } else {
1102                 /*
1103                  * Continue processing msg.
1104                  *
1105                  * The fifo has already been advanced past the message.
1106                  * Trivially reset the FIFO indices if possible.
1107                  *
1108                  * clear the FIFO if it is now empty and set RREQ to wait
1109                  * for more from the socket.  If the FIFO is not empty set
1110                  * RWORK to bypass the poll so we loop immediately.
1111                  */
1112                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1113                     ioq->fifo_cdn == ioq->fifo_end) {
1114                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1115                         ioq->fifo_cdx = 0;
1116                         ioq->fifo_cdn = 0;
1117                         ioq->fifo_beg = 0;
1118                         ioq->fifo_end = 0;
1119                 } else {
1120                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1121                 }
1122                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1123                 ioq->msg = NULL;
1124
1125                 /*
1126                  * Handle message routing.  Validates non-zero sources
1127                  * and routes message.  Error will be 0 if the message is
1128                  * destined for us.
1129                  *
1130                  * State processing only occurs for messages destined for us.
1131                  */
1132                 if (DMsgDebugOpt >= 5) {
1133                         fprintf(stderr,
1134                                 "rxmsg cmd=%08x circ=%016jx\n",
1135                                 msg->any.head.cmd,
1136                                 (intmax_t)msg->any.head.circuit);
1137                 }
1138
1139                 error = dmsg_state_msgrx(msg, 0);
1140
1141                 if (error) {
1142                         /*
1143                          * Abort-after-closure, throw message away and
1144                          * start reading another.
1145                          */
1146                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1147                                 dmsg_msg_free(msg);
1148                                 goto again;
1149                         }
1150
1151                         /*
1152                          * Process real error and throw away message.
1153                          */
1154                         ioq->error = error;
1155                         goto skip;
1156                 }
1157
1158                 /*
1159                  * No error and not routed
1160                  */
1161                 /* no error, not routed.  Fall through and return msg */
1162         }
1163         return (msg);
1164 }
1165
1166 /*
1167  * Calculate the header and data crc's and write a low-level message to
1168  * the connection.  If aux_crc is non-zero the aux_data crc is already
1169  * assumed to have been set.
1170  *
1171  * A non-NULL msg is added to the queue but not necessarily flushed.
1172  * Calling this function with msg == NULL will get a flush going.
1173  *
1174  * (called from iocom_core only)
1175  */
1176 void
1177 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1178 {
1179         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1180         dmsg_msg_t *msg;
1181         uint32_t xcrc32;
1182         size_t hbytes;
1183         size_t abytes;
1184         dmsg_msg_queue_t tmpq;
1185
1186         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1187         TAILQ_INIT(&tmpq);
1188         pthread_mutex_lock(&iocom->mtx);
1189         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1190                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1191                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1192         }
1193         pthread_mutex_unlock(&iocom->mtx);
1194
1195         /*
1196          * Flush queue, doing all required encryption and CRC generation,
1197          * with the mutex unlocked.
1198          */
1199         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1200                 /*
1201                  * Process terminal connection errors.
1202                  */
1203                 TAILQ_REMOVE(&tmpq, msg, qentry);
1204                 if (ioq->error) {
1205                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1206                         ++ioq->msgcount;
1207                         continue;
1208                 }
1209
1210                 /*
1211                  * Finish populating the msg fields.  The salt ensures that
1212                  * the iv[] array is ridiculously randomized and we also
1213                  * re-seed our PRNG every 32768 messages just to be sure.
1214                  */
1215                 msg->any.head.magic = DMSG_HDR_MAGIC;
1216                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1217                 ++ioq->seq;
1218                 if ((ioq->seq & 32767) == 0) {
1219                         pthread_mutex_lock(&iocom->mtx);
1220                         srandomdev();
1221                         pthread_mutex_unlock(&iocom->mtx);
1222                 }
1223
1224                 /*
1225                  * Calculate aux_crc if 0, then calculate hdr_crc.
1226                  */
1227                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1228                         abytes = DMSG_DOALIGN(msg->aux_size);
1229                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1230                         msg->any.head.aux_crc = xcrc32;
1231                 }
1232                 msg->any.head.aux_bytes = msg->aux_size;
1233
1234                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1235                          DMSG_ALIGN;
1236                 msg->any.head.hdr_crc = 0;
1237                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1238
1239                 /*
1240                  * Enqueue the message (the flush codes handles stream
1241                  * encryption).
1242                  */
1243                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1244                 ++ioq->msgcount;
1245         }
1246         dmsg_iocom_flush2(iocom);
1247 }
1248
1249 /*
1250  * Thread localized, iocom->mtx not held by caller.
1251  *
1252  * (called from iocom_core via iocom_flush1 only)
1253  */
1254 void
1255 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1256 {
1257         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1258         dmsg_msg_t *msg;
1259         ssize_t n;
1260         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1261         size_t nact;
1262         size_t hbytes;
1263         size_t abytes;
1264         size_t hoff;
1265         size_t aoff;
1266         int iovcnt;
1267         int save_errno;
1268
1269         if (ioq->error) {
1270                 dmsg_iocom_drain(iocom);
1271                 return;
1272         }
1273
1274         /*
1275          * Pump messages out the connection by building an iovec.
1276          *
1277          * ioq->hbytes/ioq->abytes tracks how much of the first message
1278          * in the queue has been successfully written out, so we can
1279          * resume writing.
1280          */
1281         iovcnt = 0;
1282         nact = 0;
1283         hoff = ioq->hbytes;
1284         aoff = ioq->abytes;
1285
1286         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1287                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1288                          DMSG_ALIGN;
1289                 abytes = DMSG_DOALIGN(msg->aux_size);
1290                 assert(hoff <= hbytes && aoff <= abytes);
1291
1292                 if (hoff < hbytes) {
1293                         size_t maxlen = hbytes - hoff;
1294                         if (maxlen > sizeof(ioq->buf) / 2)
1295                                 maxlen = sizeof(ioq->buf) / 2;
1296                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1297                         iov[iovcnt].iov_len = maxlen;
1298                         nact += maxlen;
1299                         ++iovcnt;
1300                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1301                             maxlen != hbytes - hoff) {
1302                                 break;
1303                         }
1304                 }
1305                 if (aoff < abytes) {
1306                         size_t maxlen = abytes - aoff;
1307                         if (maxlen > sizeof(ioq->buf) / 2)
1308                                 maxlen = sizeof(ioq->buf) / 2;
1309
1310                         assert(msg->aux_data != NULL);
1311                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1312                         iov[iovcnt].iov_len = maxlen;
1313                         nact += maxlen;
1314                         ++iovcnt;
1315                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1316                             maxlen != abytes - aoff) {
1317                                 break;
1318                         }
1319                 }
1320                 hoff = 0;
1321                 aoff = 0;
1322         }
1323
1324         /*
1325          * Shortcut if no work to do.  Be sure to check for old work still
1326          * pending in the FIFO.
1327          */
1328         if (iovcnt == 0 && ioq->fifo_beg == ioq->fifo_cdx)
1329                 return;
1330
1331         /*
1332          * Encrypt and write the data.  The crypto code will move the
1333          * data into the fifo and adjust the iov as necessary.  If
1334          * encryption is disabled the iov is left alone.
1335          *
1336          * May return a smaller iov (thus a smaller n), with aggregated
1337          * chunks.  May reduce nmax to what fits in the FIFO.
1338          *
1339          * This function sets nact to the number of original bytes now
1340          * encrypted, adding to the FIFO some number of bytes that might
1341          * be greater depending on the crypto mechanic.  iov[] is adjusted
1342          * to point at the FIFO if necessary.
1343          *
1344          * NOTE: nact is the number of bytes eaten from the message.  For
1345          *       encrypted data this is the number of bytes processed for
1346          *       encryption and not necessarily the number of bytes writable.
1347          *       The return value from the writev() is the post-encrypted
1348          *       byte count which might be larger.
1349          *
1350          * NOTE: For direct writes, nact is the return value from the writev().
1351          */
1352         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1353                 /*
1354                  * Make sure the FIFO has a reasonable amount of space
1355                  * left (if not completely full).
1356                  *
1357                  * In this situation we are staging the encrypted message
1358                  * data in the FIFO.  (nact) represents how much plaintext
1359                  * has been staged, (n) represents how much encrypted data
1360                  * has been flushed.  The two are independent of each other.
1361                  */
1362                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1363                     sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1364                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1365                               ioq->fifo_end - ioq->fifo_beg);
1366                         ioq->fifo_cdx -= ioq->fifo_beg;
1367                         ioq->fifo_cdn -= ioq->fifo_beg;
1368                         ioq->fifo_end -= ioq->fifo_beg;
1369                         ioq->fifo_beg = 0;
1370                 }
1371
1372                 /* 
1373                  * beg .... cdx ............ cdn ............. end
1374                  * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
1375                  *
1376                  * Advance fifo_beg on a successful write.
1377                  */
1378                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1379                 n = writev(iocom->sock_fd, iov, iovcnt);
1380                 save_errno = errno;
1381                 if (n > 0) {
1382                         ioq->fifo_beg += n;
1383                         if (ioq->fifo_beg == ioq->fifo_end) {
1384                                 ioq->fifo_beg = 0;
1385                                 ioq->fifo_cdn = 0;
1386                                 ioq->fifo_cdx = 0;
1387                                 ioq->fifo_end = 0;
1388                         }
1389                 }
1390
1391                 /*
1392                  * We don't mess with the nact returned by the crypto_encrypt
1393                  * call, which represents the filling of the FIFO.  (n) tells
1394                  * us how much we were able to write from the FIFO.  The two
1395                  * are different beasts when encrypting.
1396                  */
1397         } else {
1398                 /*
1399                  * In this situation we are not staging the messages to the
1400                  * FIFO but instead writing them directly from the msg
1401                  * structure(s) unencrypted, so (nact) is basically (n).
1402                  */
1403                 n = writev(iocom->sock_fd, iov, iovcnt);
1404                 save_errno = errno;
1405                 if (n > 0)
1406                         nact = n;
1407                 else
1408                         nact = 0;
1409         }
1410
1411         /*
1412          * Clean out the transmit queue based on what we successfully
1413          * encrypted (nact is the plaintext count) and is now in the FIFO.
1414          * ioq->hbytes/abytes represents the portion of the first message
1415          * previously sent.
1416          */
1417         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1418                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1419                          DMSG_ALIGN;
1420                 abytes = DMSG_DOALIGN(msg->aux_size);
1421
1422                 if ((size_t)nact < hbytes - ioq->hbytes) {
1423                         ioq->hbytes += nact;
1424                         nact = 0;
1425                         break;
1426                 }
1427                 nact -= hbytes - ioq->hbytes;
1428                 ioq->hbytes = hbytes;
1429                 if ((size_t)nact < abytes - ioq->abytes) {
1430                         ioq->abytes += nact;
1431                         nact = 0;
1432                         break;
1433                 }
1434                 nact -= abytes - ioq->abytes;
1435                 /* ioq->abytes = abytes; optimized out */
1436
1437 #if 0
1438                 fprintf(stderr,
1439                         "txmsg cmd=%08x circ=%016jx\n",
1440                         msg->any.head.cmd,
1441                         (intmax_t)msg->any.head.circuit);
1442 #endif
1443
1444 #ifdef DMSG_BLOCK_DEBUG
1445                 uint32_t tcmd;
1446
1447                 if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
1448                         if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
1449                                 tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
1450                                             (msg->any.head.cmd & (DMSGF_CREATE |
1451                                                                   DMSGF_DELETE |
1452                                                                   DMSGF_REPLY));
1453                         } else {
1454                                 tcmd = 0;
1455                         }
1456                 } else {
1457                         tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
1458                 }
1459
1460                 switch (tcmd) {
1461                 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
1462                 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
1463                         fprintf(stderr, "write BIO %-3d %016jx %d@%016jx\n",
1464                                 biocount, msg->any.head.msgid,
1465                                 msg->any.blk_read.bytes,
1466                                 msg->any.blk_read.offset);
1467                         break;
1468                 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1469                 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1470                         fprintf(stderr, "wretr BIO %-3d %016jx %d@%016jx\n",
1471                                 biocount, msg->any.head.msgid,
1472                                 msg->any.blk_read.bytes,
1473                                 msg->any.blk_read.offset);
1474                         break;
1475                 default:
1476                         break;
1477                 }
1478 #endif
1479
1480                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1481                 --ioq->msgcount;
1482                 ioq->hbytes = 0;
1483                 ioq->abytes = 0;
1484                 dmsg_msg_free(msg);
1485         }
1486         assert(nact == 0);
1487
1488         /*
1489          * Process the return value from the write w/regards to blocking.
1490          */
1491         if (n < 0) {
1492                 if (save_errno != EINTR &&
1493                     save_errno != EINPROGRESS &&
1494                     save_errno != EAGAIN) {
1495                         /*
1496                          * Fatal write error
1497                          */
1498                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1499                         dmsg_iocom_drain(iocom);
1500                 } else {
1501                         /*
1502                          * Wait for socket buffer space, do not try to
1503                          * process more packets for transmit until space
1504                          * is available.
1505                          */
1506                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1507                 }
1508         } else if (TAILQ_FIRST(&ioq->msgq) ||
1509                    TAILQ_FIRST(&iocom->txmsgq) ||
1510                    ioq->fifo_beg != ioq->fifo_cdx) {
1511                 /*
1512                  * If the write succeeded and more messages are pending
1513                  * in either msgq, or the FIFO WWORK must remain set.
1514                  */
1515                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
1516         }
1517         /* else no transmit-side work remains */
1518
1519         if (ioq->error) {
1520                 dmsg_iocom_drain(iocom);
1521         }
1522 }
1523
1524 /*
1525  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1526  * write events will occur.  We don't kill read msgs because we want
1527  * the caller to pull off our contrived terminal error msg to detect
1528  * the connection failure.
1529  *
1530  * Localized to iocom_core thread, iocom->mtx not held by caller.
1531  */
1532 void
1533 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1534 {
1535         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1536         dmsg_msg_t *msg;
1537
1538         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1539         ioq->hbytes = 0;
1540         ioq->abytes = 0;
1541
1542         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1543                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1544                 --ioq->msgcount;
1545                 dmsg_msg_free(msg);
1546         }
1547 }
1548
1549 /*
1550  * Write a message to an iocom, with additional state processing.
1551  */
1552 void
1553 dmsg_msg_write(dmsg_msg_t *msg)
1554 {
1555         dmsg_iocom_t *iocom = msg->state->iocom;
1556         dmsg_state_t *state;
1557         char dummy;
1558
1559         pthread_mutex_lock(&iocom->mtx);
1560         state = msg->state;
1561
1562         if (DMsgDebugOpt) {
1563                 fprintf(stderr,
1564                         "msgtx: cmd=%08x msgid=%016jx "
1565                         "state %p(%08x) error=%d\n",
1566                         msg->any.head.cmd, msg->any.head.msgid,
1567                         state, (state ? state->icmd : 0),
1568                         msg->any.head.error);
1569         }
1570
1571
1572 #if 0
1573         /*
1574          * Make sure the parent transaction is still open in the transmit
1575          * direction.  If it isn't the message is dead and we have to
1576          * potentially simulate a rxmsg terminating the transaction.
1577          */
1578         if ((state->parent->txcmd & DMSGF_DELETE) ||
1579             (state->parent->rxcmd & DMSGF_DELETE)) {
1580                 fprintf(stderr, "dmsg_msg_write: EARLY TERMINATION\n");
1581                 dmsg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1582                 dmsg_state_cleanuptx(iocom, msg);
1583                 dmsg_msg_free(msg);
1584                 pthread_mutex_unlock(&iocom->mtx);
1585                 return;
1586         }
1587 #endif
1588         /*
1589          * Process state data into the message as needed, then update the
1590          * state based on the message.
1591          */
1592         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1593                 /*
1594                  * Existing transaction (could be reply).  It is also
1595                  * possible for this to be the first reply (CREATE is set),
1596                  * in which case we populate state->txcmd.
1597                  *
1598                  * state->txcmd is adjusted to hold the final message cmd,
1599                  * and we also be sure to set the CREATE bit here.  We did
1600                  * not set it in dmsg_msg_alloc() because that would have
1601                  * not been serialized (state could have gotten ripped out
1602                  * from under the message prior to it being transmitted).
1603                  */
1604                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1605                     DMSGF_CREATE) {
1606                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1607                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1608                         state->flags &= ~DMSG_STATE_NEW;
1609                 }
1610                 msg->any.head.msgid = state->msgid;
1611
1612                 if (msg->any.head.cmd & DMSGF_CREATE) {
1613                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1614                 }
1615         }
1616
1617         /*
1618          * Discard messages sent to transactions which are already dead.
1619          */
1620         if (state && (state->txcmd & DMSGF_DELETE)) {
1621                 printf("dmsg_msg_write: drop msg %08x to dead "
1622                        "circuit state=%p\n",
1623                        msg->any.head.cmd, state);
1624                 dmsg_msg_free(msg);
1625                 return;
1626         }
1627
1628         /*
1629          * Normally we queue the msg for output.  However, if the circuit is
1630          * dead or dying we must simulate a failure in the return direction
1631          * and throw the message away.  The other end is not expecting any
1632          * further messages from us on this state.
1633          *
1634          * Note that the I/O thread is responsible for generating the CRCs
1635          * and encryption.
1636          */
1637         if (state->flags & DMSG_STATE_DYING) {
1638 #if 0
1639         if ((state->parent->txcmd & DMSGF_DELETE) ||
1640             (state->parent->flags & DMSG_STATE_DYING) ||
1641             (state->flags & DMSG_STATE_DYING)) {
1642 #endif
1643                 /* 
1644                  * Illegal message, kill state and related sub-state.
1645                  * Cannot transmit if state is already dying.
1646                  */
1647                 printf("dmsg_msg_write: Write to dying circuit "
1648                         "ptxcmd=%08x prxcmd=%08x flags=%08x\n",
1649                         state->parent->rxcmd,
1650                         state->parent->txcmd,
1651                         state->parent->flags);
1652                 dmsg_state_hold(state);
1653                 dmsg_state_cleanuptx(iocom, msg);
1654                 if ((state->flags & DMSG_STATE_ABORTING) == 0) {
1655                         dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK);
1656                 }
1657                 dmsg_state_drop(state);
1658                 dmsg_msg_free(msg);
1659         } else {
1660                 /*
1661                  * Queue the message, clean up transmit state prior to queueing
1662                  * to avoid SMP races.
1663                  */
1664                 if (DMsgDebugOpt)
1665                 printf("dmsg_msg_write: commit msg state=%p to txkmsgq\n", state);
1666                 dmsg_state_cleanuptx(iocom, msg);
1667                 TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1668                 dummy = 0;
1669                 write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1670         }
1671         pthread_mutex_unlock(&iocom->mtx);
1672 }
1673
1674 /*
1675  * Remove state from its parent's subq.  This can wind up recursively
1676  * dropping the parent upward.
1677  *
1678  * NOTE: iocom must be locked.
1679  *
1680  * NOTE: Once we drop the parent, our pstate pointer may become invalid.
1681  */
1682 static
1683 void
1684 dmsg_subq_delete(dmsg_state_t *state)
1685 {
1686         dmsg_state_t *pstate;
1687
1688         if (state->flags & DMSG_STATE_SUBINSERTED) {
1689                 pstate = state->parent;
1690                 assert(pstate);
1691                 if (pstate->scan == state)
1692                         pstate->scan = NULL;
1693                 TAILQ_REMOVE(&pstate->subq, state, entry);
1694                 state->flags &= ~DMSG_STATE_SUBINSERTED;
1695                 state->parent = NULL;
1696                 if (TAILQ_EMPTY(&pstate->subq))
1697                         dmsg_state_drop(pstate);/* pstate->subq */
1698                 pstate = NULL;                  /* safety */
1699                 dmsg_state_drop(state);         /* pstate->subq */
1700         } else {
1701                 assert(state->parent == NULL);
1702         }
1703 }
1704
1705 /*
1706  * Simulate reception of a transaction DELETE message when the link goes
1707  * bad.  This routine must recurse through state->subq and generate messages
1708  * and callbacks bottom-up.
1709  *
1710  * iocom->mtx must be held by caller.
1711  */
1712 static
1713 void
1714 dmsg_simulate_failure(dmsg_state_t *state, int meto, int error)
1715 {
1716         dmsg_state_t *substate;
1717
1718         dmsg_state_hold(state);
1719         if (meto)
1720                 dmsg_state_abort(state);
1721
1722         /*
1723          * Recurse through sub-states.
1724          */
1725 again:
1726         TAILQ_FOREACH(substate, &state->subq, entry) {
1727                 if (substate->flags & DMSG_STATE_ABORTING)
1728                         continue;
1729                 state->scan = substate;
1730                 dmsg_simulate_failure(substate, 1, error);
1731                 if (state->scan != substate)
1732                         goto again;
1733         }
1734
1735         dmsg_state_drop(state);
1736 }
1737
1738 static
1739 void
1740 dmsg_state_abort(dmsg_state_t *state)
1741 {
1742         dmsg_iocom_t *iocom;
1743         dmsg_msg_t *msg;
1744
1745         /*
1746          * Set ABORTING and DYING, return if already set.  If the state was
1747          * just allocated we defer the abort operation until the related
1748          * message is processed.
1749          */
1750         if (state->flags & DMSG_STATE_ABORTING)
1751                 return;
1752         state->flags |= DMSG_STATE_ABORTING;
1753         dmsg_state_dying(state);
1754         if (state->flags & DMSG_STATE_NEW) {
1755                 printf("dmsg_state_abort(0): state %p rxcmd %08x txcmd %08x "
1756                        "flags %08x - in NEW state\n",
1757                        state, state->rxcmd, state->txcmd, state->flags);
1758                 return;
1759         }
1760
1761         /*
1762          * Simulate parent state failure before child states.  Device
1763          * drivers need to understand this and flag the situation but might
1764          * have asynchronous operations in progress that they cannot stop.
1765          * To make things easier, parent states will not actually disappear
1766          * until the children are all gone.
1767          */
1768         if ((state->rxcmd & DMSGF_DELETE) == 0) {
1769                 fprintf(stderr, "SIMULATE ERROR\n");
1770                 msg = dmsg_msg_alloc_locked(state, 0, DMSG_LNK_ERROR,
1771                                             NULL, NULL);
1772                 if ((state->rxcmd & DMSGF_CREATE) == 0)
1773                         msg->any.head.cmd |= DMSGF_CREATE;
1774                 msg->any.head.cmd |= DMSGF_DELETE |
1775                                      (state->rxcmd & DMSGF_REPLY);
1776                 msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC);
1777                 msg->any.head.error = DMSG_ERR_LOSTLINK;
1778                 msg->any.head.cmd |= DMSGF_ABORT;
1779
1780                 /*
1781                  * Issue callback synchronously even though this isn't
1782                  * the receiver thread.  We need to issue the callback
1783                  * before removing state from the subq in order to allow
1784                  * the callback to reply.
1785                  */
1786                 iocom = state->iocom;
1787                 dmsg_state_msgrx(msg, 1);
1788                 pthread_mutex_unlock(&iocom->mtx);
1789                 iocom->rcvmsg_callback(msg);
1790                 pthread_mutex_lock(&iocom->mtx);
1791                 dmsg_state_cleanuprx(iocom, msg);
1792 #if 0
1793                 TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1794                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1795 #endif
1796         }
1797 }
1798
1799
1800 /*
1801  * Recursively sets DMSG_STATE_DYING on state and all sub-states, preventing
1802  * the transmission of any new messages on these states.  This is done
1803  * atomically when parent state is terminating, whereas setting ABORTING is
1804  * not atomic and can leak races.
1805  */
1806 static
1807 void
1808 dmsg_state_dying(dmsg_state_t *state)
1809 {
1810         dmsg_state_t *scan;
1811
1812         if ((state->flags & DMSG_STATE_DYING) == 0) {
1813                 state->flags |= DMSG_STATE_DYING;
1814                 TAILQ_FOREACH(scan, &state->subq, entry)
1815                         dmsg_state_dying(scan);
1816         }
1817 }
1818
1819 /*
1820  * This is a shortcut to formulate a reply to msg with a simple error code,
1821  * It can reply to and terminate a transaction, or it can reply to a one-way
1822  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1823  * the error code (which can be 0).  Not all transactions are terminated
1824  * with DMSG_LNK_ERROR status (the low level only cares about the
1825  * MSGF_DELETE flag), but most are.
1826  *
1827  * Replies to one-way messages are a bit of an oxymoron but the feature
1828  * is used by the debug (DBG) protocol.
1829  *
1830  * The reply contains no extended data.
1831  */
1832 void
1833 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1834 {
1835         dmsg_state_t *state = msg->state;
1836         dmsg_msg_t *nmsg;
1837         uint32_t cmd;
1838
1839         /*
1840          * Reply with a simple error code and terminate the transaction.
1841          */
1842         cmd = DMSG_LNK_ERROR;
1843
1844         /*
1845          * Check if our direction has even been initiated yet, set CREATE.
1846          *
1847          * Check what direction this is (command or reply direction).  Note
1848          * that txcmd might not have been initiated yet.
1849          *
1850          * If our direction has already been closed we just return without
1851          * doing anything.
1852          */
1853         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1854                 if (state->txcmd & DMSGF_DELETE)
1855                         return;
1856                 if (state->txcmd & DMSGF_REPLY)
1857                         cmd |= DMSGF_REPLY;
1858                 cmd |= DMSGF_DELETE;
1859         } else {
1860                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1861                         cmd |= DMSGF_REPLY;
1862         }
1863
1864         /*
1865          * Allocate the message and associate it with the existing state.
1866          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1867          * allocate new state.  We have our state already.
1868          */
1869         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1870         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1871                 if ((state->txcmd & DMSGF_CREATE) == 0)
1872                         nmsg->any.head.cmd |= DMSGF_CREATE;
1873         }
1874         nmsg->any.head.error = error;
1875
1876         dmsg_msg_write(nmsg);
1877 }
1878
1879 /*
1880  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1881  * we are generating a streaming reply or an intermediate acknowledgement
1882  * of some sort as part of the higher level protocol, with more to come
1883  * later.
1884  */
1885 void
1886 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1887 {
1888         dmsg_state_t *state = msg->state;
1889         dmsg_msg_t *nmsg;
1890         uint32_t cmd;
1891
1892
1893         /*
1894          * Reply with a simple error code and terminate the transaction.
1895          */
1896         cmd = DMSG_LNK_ERROR;
1897
1898         /*
1899          * Check if our direction has even been initiated yet, set CREATE.
1900          *
1901          * Check what direction this is (command or reply direction).  Note
1902          * that txcmd might not have been initiated yet.
1903          *
1904          * If our direction has already been closed we just return without
1905          * doing anything.
1906          */
1907         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1908                 if (state->txcmd & DMSGF_DELETE)
1909                         return;
1910                 if (state->txcmd & DMSGF_REPLY)
1911                         cmd |= DMSGF_REPLY;
1912                 /* continuing transaction, do not set MSGF_DELETE */
1913         } else {
1914                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1915                         cmd |= DMSGF_REPLY;
1916         }
1917         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1918         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1919                 if ((state->txcmd & DMSGF_CREATE) == 0)
1920                         nmsg->any.head.cmd |= DMSGF_CREATE;
1921         }
1922         nmsg->any.head.error = error;
1923
1924         dmsg_msg_write(nmsg);
1925 }
1926
1927 /*
1928  * Terminate a transaction given a state structure by issuing a DELETE.
1929  * (the state structure must not be &iocom->state0)
1930  */
1931 void
1932 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1933 {
1934         dmsg_msg_t *nmsg;
1935         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1936
1937         /*
1938          * Nothing to do if we already transmitted a delete
1939          */
1940         if (state->txcmd & DMSGF_DELETE)
1941                 return;
1942
1943         /*
1944          * Set REPLY if the other end initiated the command.  Otherwise
1945          * we are the command direction.
1946          */
1947         if (state->txcmd & DMSGF_REPLY)
1948                 cmd |= DMSGF_REPLY;
1949
1950         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1951         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1952                 if ((state->txcmd & DMSGF_CREATE) == 0)
1953                         nmsg->any.head.cmd |= DMSGF_CREATE;
1954         }
1955         nmsg->any.head.error = error;
1956         dmsg_msg_write(nmsg);
1957 }
1958
1959 /*
1960  * Terminate a transaction given a state structure by issuing a DELETE.
1961  * (the state structure must not be &iocom->state0)
1962  */
1963 void
1964 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1965 {
1966         dmsg_msg_t *nmsg;
1967         uint32_t cmd = DMSG_LNK_ERROR;
1968
1969         /*
1970          * Nothing to do if we already transmitted a delete
1971          */
1972         if (state->txcmd & DMSGF_DELETE)
1973                 return;
1974
1975         /*
1976          * Set REPLY if the other end initiated the command.  Otherwise
1977          * we are the command direction.
1978          */
1979         if (state->txcmd & DMSGF_REPLY)
1980                 cmd |= DMSGF_REPLY;
1981
1982         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1983         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1984                 if ((state->txcmd & DMSGF_CREATE) == 0)
1985                         nmsg->any.head.cmd |= DMSGF_CREATE;
1986         }
1987         nmsg->any.head.error = error;
1988         dmsg_msg_write(nmsg);
1989 }
1990
1991 /************************************************************************
1992  *                      TRANSACTION STATE HANDLING                      *
1993  ************************************************************************
1994  *
1995  */
1996
1997 /*
1998  * Process state tracking for a message after reception, prior to execution.
1999  * Possibly route the message (consuming it).
2000  *
2001  * Called with msglk held and the msg dequeued.
2002  *
2003  * All messages are called with dummy state and return actual state.
2004  * (One-off messages often just return the same dummy state).
2005  *
2006  * May request that caller discard the message by setting *discardp to 1.
2007  * The returned state is not used in this case and is allowed to be NULL.
2008  *
2009  * --
2010  *
2011  * These routines handle persistent and command/reply message state via the
2012  * CREATE and DELETE flags.  The first message in a command or reply sequence
2013  * sets CREATE, the last message in a command or reply sequence sets DELETE.
2014  *
2015  * There can be any number of intermediate messages belonging to the same
2016  * sequence sent inbetween the CREATE message and the DELETE message,
2017  * which set neither flag.  This represents a streaming command or reply.
2018  *
2019  * Any command message received with CREATE set expects a reply sequence to
2020  * be returned.  Reply sequences work the same as command sequences except the
2021  * REPLY bit is also sent.  Both the command side and reply side can
2022  * degenerate into a single message with both CREATE and DELETE set.  Note
2023  * that one side can be streaming and the other side not, or neither, or both.
2024  *
2025  * The msgid is unique for the initiator.  That is, two sides sending a new
2026  * message can use the same msgid without colliding.
2027  *
2028  * --
2029  *
2030  * The message may be running over a circuit.  If the circuit is half-deleted
2031  * The message is typically racing against a link failure and must be thrown
2032  * out.  As the circuit deletion propagates the library will automatically
2033  * generate terminations for sub states.
2034  *
2035  * --
2036  *
2037  * ABORT sequences work by setting the ABORT flag along with normal message
2038  * state.  However, ABORTs can also be sent on half-closed messages, that is
2039  * even if the command or reply side has already sent a DELETE, as long as
2040  * the message has not been fully closed it can still send an ABORT+DELETE
2041  * to terminate the half-closed message state.
2042  *
2043  * Since ABORT+DELETEs can race we silently discard ABORT's for message
2044  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
2045  * also race, and in this situation the other side might have already
2046  * initiated a new unrelated command with the same message id.  Since
2047  * the abort has not set the CREATE flag the situation can be detected
2048  * and the message will also be discarded.
2049  *
2050  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
2051  * The ABORT request is essentially integrated into the command instead
2052  * of being sent later on.  In this situation the command implementation
2053  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
2054  * special-case non-blocking operation for the command.
2055  *
2056  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
2057  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
2058  *        one-way messages are not supported.
2059  *
2060  * NOTE!  If a command sequence does not support aborts the ABORT flag is
2061  *        simply ignored.
2062  *
2063  * --
2064  *
2065  * One-off messages (no reply expected) are sent without an established
2066  * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
2067  * For one-off messages sent over circuits msgid generally MUST be 0.
2068  *
2069  * One-off messages cannot be aborted and typically aren't processed
2070  * by these routines.  Order is still guaranteed for messages sent over
2071  * the same circuit.  The REPLY bit can be used to distinguish whether
2072  * a one-off message is a command or reply.  For example, one-off replies
2073  * will typically just contain status updates.
2074  */
2075 static int
2076 dmsg_state_msgrx(dmsg_msg_t *msg, int mstate)
2077 {
2078         dmsg_iocom_t *iocom = msg->state->iocom;
2079         dmsg_state_t *state;
2080         dmsg_state_t *pstate;
2081         dmsg_state_t sdummy;
2082         int error;
2083
2084         pthread_mutex_lock(&iocom->mtx);
2085
2086         if (DMsgDebugOpt) {
2087                 fprintf(stderr,
2088                         "msgrx: cmd=%08x msgid=%016jx "
2089                         "circuit=%016jx error=%d\n",
2090                         msg->any.head.cmd,
2091                         msg->any.head.msgid,
2092                         msg->any.head.circuit,
2093                         msg->any.head.error);
2094         }
2095
2096         /*
2097          * Lookup the circuit (pstate).  The circuit will be an open
2098          * transaction.  The REVCIRC bit in the message tells us which side
2099          * initiated it.
2100          *
2101          * If mstate is non-zero the state has already been incorporated
2102          * into the message as part of a simulated abort.  Note that in this
2103          * situation the parent state may have already been removed from
2104          * the RBTREE.
2105          */
2106         if (mstate) {
2107                 pstate = msg->state->parent;
2108         } else if (msg->any.head.circuit) {
2109                 sdummy.msgid = msg->any.head.circuit;
2110
2111                 if (msg->any.head.cmd & DMSGF_REVCIRC) {
2112                         pstate = RB_FIND(dmsg_state_tree,
2113                                          &iocom->statewr_tree,
2114                                          &sdummy);
2115                 } else {
2116                         pstate = RB_FIND(dmsg_state_tree,
2117                                          &iocom->staterd_tree,
2118                                          &sdummy);
2119                 }
2120
2121                 /*
2122                  * If we cannot find the circuit throw the message away.
2123                  * The state will have already been taken care of by
2124                  * the simulated failure code.  This case can occur due
2125                  * to a failure propagating in one direction crossing a
2126                  * request on the failed circuit propagating in the other
2127                  * direction.
2128                  */
2129                 if (pstate == NULL) {
2130                         fprintf(stderr,
2131                                 "missing parent in stacked trans %s\n",
2132                                 dmsg_msg_str(msg));
2133                         pthread_mutex_unlock(&iocom->mtx);
2134                         error = DMSG_IOQ_ERROR_EALREADY;
2135
2136                         return error;
2137                 }
2138         } else {
2139                 pstate = &iocom->state0;
2140         }
2141         /* WARNING: pstate not (yet) refd */
2142
2143         /*
2144          * Lookup the msgid.
2145          *
2146          * If mstate is non-zero the state has already been incorporated
2147          * into the message as part of a simulated abort.  Note that in this
2148          * situation the state may have already been removed from the RBTREE.
2149          *
2150          * If received msg is a command state is on staterd_tree.
2151          * If received msg is a reply state is on statewr_tree.
2152          * Otherwise there is no state (retain &iocom->state0)
2153          */
2154         if (mstate) {
2155                 state = msg->state;
2156         } else {
2157                 sdummy.msgid = msg->any.head.msgid;
2158                 if (msg->any.head.cmd & DMSGF_REVTRANS) {
2159                         state = RB_FIND(dmsg_state_tree,
2160                                         &iocom->statewr_tree, &sdummy);
2161                 } else {
2162                         state = RB_FIND(dmsg_state_tree,
2163                                         &iocom->staterd_tree, &sdummy);
2164                 }
2165         }
2166
2167         if (DMsgDebugOpt) {
2168                 fprintf(stderr,
2169                         "msgrx:\tstate %p(%08x)",
2170                         state, (state ? state->icmd : 0));
2171                 if (pstate != &iocom->state0) {
2172                         fprintf(stderr,
2173                                 " pstate %p(%08x)",
2174                                 pstate, pstate->icmd);
2175                 }
2176                 fprintf(stderr, "\n");
2177         }
2178
2179         if (mstate) {
2180                 /* state already assigned to msg */
2181         } else if (state) {
2182                 /*
2183                  * Message over an existing transaction (CREATE should not
2184                  * be set).
2185                  */
2186                 dmsg_state_drop(msg->state);
2187                 dmsg_state_hold(state);
2188                 msg->state = state;
2189                 assert(pstate == state->parent);
2190         } else {
2191                 /*
2192                  * Either a new transaction (if CREATE set) or a one-off.
2193                  */
2194                 state = pstate;
2195         }
2196
2197         /*
2198          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
2199          * inside the case statements.
2200          *
2201          * Construct new state as necessary.
2202          */
2203         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
2204                                     DMSGF_REPLY)) {
2205         case DMSGF_CREATE:
2206         case DMSGF_CREATE | DMSGF_DELETE:
2207                 /*
2208                  * Create new sub-transaction under pstate.
2209                  * (any DELETE is handled in post-processing of msg).
2210                  *
2211                  * (During routing the msgid was made unique for this
2212                  * direction over the comlink, so our RB trees can be
2213                  * iocom-based instead of state-based).
2214                  */
2215                 if (state != pstate) {
2216                         fprintf(stderr,
2217                                 "duplicate transaction %s\n",
2218                                 dmsg_msg_str(msg));
2219                         error = DMSG_IOQ_ERROR_TRANS;
2220                         assert(0);
2221                         break;
2222                 }
2223
2224                 /*
2225                  * Allocate the new state.
2226                  */
2227                 state = malloc(sizeof(*state));
2228                 bzero(state, sizeof(*state));
2229                 atomic_add_int(&dmsg_state_count, 1);
2230
2231                 TAILQ_INIT(&state->subq);
2232                 dmsg_state_hold(pstate);
2233                 state->parent = pstate;
2234                 state->iocom = iocom;
2235                 state->flags = DMSG_STATE_DYNAMIC |
2236                                DMSG_STATE_OPPOSITE;
2237                 state->msgid = msg->any.head.msgid;
2238                 state->txcmd = DMSGF_REPLY;
2239                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2240                 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
2241                 state->flags &= ~DMSG_STATE_NEW;
2242                 msg->state = state;
2243
2244                 RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
2245                 if (TAILQ_EMPTY(&pstate->subq))
2246                         dmsg_state_hold(pstate);/* pstate->subq */
2247                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
2248                 state->flags |= DMSG_STATE_SUBINSERTED |
2249                                 DMSG_STATE_RBINSERTED;
2250                 dmsg_state_hold(state);         /* pstate->subq */
2251                 dmsg_state_hold(state);         /* state on rbtree */
2252                 dmsg_state_hold(state);         /* msg->state */
2253
2254                 /*
2255                  * If the parent is a relay set up the state handler to
2256                  * automatically route the message.  Local processing will
2257                  * not occur if set.
2258                  *
2259                  * (state relays are seeded by SPAN processing)
2260                  */
2261                 if (pstate->relay)
2262                         state->func = dmsg_state_relay;
2263                 error = 0;
2264                 break;
2265         case DMSGF_DELETE:
2266                 /*
2267                  * Persistent state is expected but might not exist if an
2268                  * ABORT+DELETE races the close.
2269                  *
2270                  * (any DELETE is handled in post-processing of msg).
2271                  */
2272                 if (state == pstate) {
2273                         if (msg->any.head.cmd & DMSGF_ABORT) {
2274                                 error = DMSG_IOQ_ERROR_EALREADY;
2275                         } else {
2276                                 fprintf(stderr, "missing-state %s\n",
2277                                         dmsg_msg_str(msg));
2278                                 error = DMSG_IOQ_ERROR_TRANS;
2279                                 assert(0);
2280                         }
2281                         break;
2282                 }
2283
2284                 /*
2285                  * Handle another ABORT+DELETE case if the msgid has already
2286                  * been reused.
2287                  */
2288                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2289                         if (msg->any.head.cmd & DMSGF_ABORT) {
2290                                 error = DMSG_IOQ_ERROR_EALREADY;
2291                         } else {
2292                                 fprintf(stderr, "reused-state %s\n",
2293                                         dmsg_msg_str(msg));
2294                                 error = DMSG_IOQ_ERROR_TRANS;
2295                                 assert(0);
2296                         }
2297                         break;
2298                 }
2299                 error = 0;
2300                 break;
2301         default:
2302                 /*
2303                  * Check for mid-stream ABORT command received, otherwise
2304                  * allow.
2305                  */
2306                 if (msg->any.head.cmd & DMSGF_ABORT) {
2307                         if ((state == pstate) ||
2308                             (state->rxcmd & DMSGF_CREATE) == 0) {
2309                                 error = DMSG_IOQ_ERROR_EALREADY;
2310                                 break;
2311                         }
2312                 }
2313                 error = 0;
2314                 break;
2315         case DMSGF_REPLY | DMSGF_CREATE:
2316         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
2317                 /*
2318                  * When receiving a reply with CREATE set the original
2319                  * persistent state message should already exist.
2320                  */
2321                 if (state == pstate) {
2322                         fprintf(stderr, "no-state(r) %s\n",
2323                                 dmsg_msg_str(msg));
2324                         error = DMSG_IOQ_ERROR_TRANS;
2325                         assert(0);
2326                         break;
2327                 }
2328                 assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
2329                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2330                 error = 0;
2331                 break;
2332         case DMSGF_REPLY | DMSGF_DELETE:
2333                 /*
2334                  * Received REPLY+ABORT+DELETE in case where msgid has
2335                  * already been fully closed, ignore the message.
2336                  */
2337                 if (state == pstate) {
2338                         if (msg->any.head.cmd & DMSGF_ABORT) {
2339                                 error = DMSG_IOQ_ERROR_EALREADY;
2340                         } else {
2341                                 fprintf(stderr, "no-state(r,d) %s\n",
2342                                         dmsg_msg_str(msg));
2343                                 error = DMSG_IOQ_ERROR_TRANS;
2344                                 assert(0);
2345                         }
2346                         break;
2347                 }
2348
2349                 /*
2350                  * Received REPLY+ABORT+DELETE in case where msgid has
2351                  * already been reused for an unrelated message,
2352                  * ignore the message.
2353                  */
2354                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2355                         if (msg->any.head.cmd & DMSGF_ABORT) {
2356                                 error = DMSG_IOQ_ERROR_EALREADY;
2357                         } else {
2358                                 fprintf(stderr, "reused-state(r,d) %s\n",
2359                                         dmsg_msg_str(msg));
2360                                 error = DMSG_IOQ_ERROR_TRANS;
2361                                 assert(0);
2362                         }
2363                         break;
2364                 }
2365                 error = 0;
2366                 break;
2367         case DMSGF_REPLY:
2368                 /*
2369                  * Check for mid-stream ABORT reply received to sent command.
2370                  */
2371                 if (msg->any.head.cmd & DMSGF_ABORT) {
2372                         if (state == pstate ||
2373                             (state->rxcmd & DMSGF_CREATE) == 0) {
2374                                 error = DMSG_IOQ_ERROR_EALREADY;
2375                                 break;
2376                         }
2377                 }
2378                 error = 0;
2379                 break;
2380         }
2381
2382         /*
2383          * Calculate the easy-switch() transactional command.  Represents
2384          * the outer-transaction command for any transaction-create or
2385          * transaction-delete, and the inner message command for any
2386          * non-transaction or inside-transaction command.  tcmd will be
2387          * set to 0 for any messaging error condition.
2388          *
2389          * The two can be told apart because outer-transaction commands
2390          * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2391          */
2392         if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2393                 if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
2394                         msg->tcmd = (state->icmd & DMSGF_BASECMDMASK) |
2395                                     (msg->any.head.cmd & (DMSGF_CREATE |
2396                                                           DMSGF_DELETE |
2397                                                           DMSGF_REPLY));
2398                 } else {
2399                         msg->tcmd = 0;
2400                 }
2401         } else {
2402                 msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
2403         }
2404
2405 #ifdef DMSG_BLOCK_DEBUG
2406         switch (msg->tcmd) {
2407         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2408         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2409                 fprintf(stderr, "read  BIO %-3d %016jx %d@%016jx\n",
2410                         biocount, msg->any.head.msgid,
2411                         msg->any.blk_read.bytes,
2412                         msg->any.blk_read.offset);
2413                 break;
2414         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2415         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2416                 fprintf(stderr, "rread BIO %-3d %016jx %d@%016jx\n",
2417                         biocount, msg->any.head.msgid,
2418                         msg->any.blk_read.bytes,
2419                         msg->any.blk_read.offset);
2420                 break;
2421         default:
2422                 break;
2423         }
2424 #endif
2425
2426         /*
2427          * Adjust state, mark receive side as DELETED if appropriate and
2428          * adjust RB tree if both sides are DELETED.  cleanuprx handles
2429          * the rest after the state callback returns.
2430          */
2431         assert(msg->state->iocom == iocom);
2432         assert(msg->state == state);
2433
2434         if (state->flags & DMSG_STATE_ROOT) {
2435                 /*
2436                  * Nothing to do for non-transactional messages.
2437                  */
2438         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2439                 /*
2440                  * Message terminating transaction, remove the state from
2441                  * the RB tree if the full transaction is now complete.
2442                  * The related state, subq, and parent link is retained
2443                  * until after the state callback is complete.
2444                  */
2445                 assert((state->rxcmd & DMSGF_DELETE) == 0);
2446                 state->rxcmd |= DMSGF_DELETE;
2447                 if (state->txcmd & DMSGF_DELETE) {
2448                         assert(state->flags & DMSG_STATE_RBINSERTED);
2449                         if (state->rxcmd & DMSGF_REPLY) {
2450                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2451                                 RB_REMOVE(dmsg_state_tree,
2452                                           &iocom->statewr_tree, state);
2453                         } else {
2454                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2455                                 RB_REMOVE(dmsg_state_tree,
2456                                           &iocom->staterd_tree, state);
2457                         }
2458                         state->flags &= ~DMSG_STATE_RBINSERTED;
2459                         dmsg_state_drop(state);
2460                 }
2461         }
2462
2463         pthread_mutex_unlock(&iocom->mtx);
2464
2465         if (DMsgDebugOpt && error)
2466                 fprintf(stderr, "msgrx: error %d\n", error);
2467
2468         return (error);
2469 }
2470
2471 /*
2472  * Route the message and handle pair-state processing.
2473  */
2474 void
2475 dmsg_state_relay(dmsg_msg_t *lmsg)
2476 {
2477         dmsg_state_t *lpstate;
2478         dmsg_state_t *rpstate;
2479         dmsg_state_t *lstate;
2480         dmsg_state_t *rstate;
2481         dmsg_msg_t *rmsg;
2482
2483 #ifdef DMSG_BLOCK_DEBUG
2484         switch (lmsg->tcmd) {
2485         case DMSG_BLK_OPEN | DMSGF_CREATE:
2486                 fprintf(stderr, "relay BIO_OPEN (CREATE)\n");
2487                 break;
2488         case DMSG_BLK_OPEN | DMSGF_DELETE:
2489                 fprintf(stderr, "relay BIO_OPEN (DELETE)\n");
2490                 break;
2491         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2492         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2493                 atomic_add_int(&biocount, 1);
2494                 fprintf(stderr, "relay BIO %-3d %016jx %d@%016jx\n",
2495                         biocount, lmsg->any.head.msgid,
2496                         lmsg->any.blk_read.bytes,
2497                         lmsg->any.blk_read.offset);
2498                 break;
2499         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2500         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2501                 fprintf(stderr, "retrn BIO %-3d %016jx %d@%016jx\n",
2502                         biocount, lmsg->any.head.msgid,
2503                         lmsg->any.blk_read.bytes,
2504                         lmsg->any.blk_read.offset);
2505                 atomic_add_int(&biocount, -1);
2506                 break;
2507         default:
2508                 break;
2509         }
2510 #endif
2511
2512         if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2513             DMSGF_CREATE) {
2514                 /*
2515                  * New sub-transaction, establish new state and relay.
2516                  */
2517                 lstate = lmsg->state;
2518                 lpstate = lstate->parent;
2519                 rpstate = lpstate->relay;
2520                 assert(lstate->relay == NULL);
2521                 assert(rpstate != NULL);
2522
2523                 rmsg = dmsg_msg_alloc(rpstate, 0,
2524                                       lmsg->any.head.cmd,
2525                                       dmsg_state_relay, NULL);
2526                 rstate = rmsg->state;
2527                 rstate->relay = lstate;
2528                 lstate->relay = rstate;
2529                 dmsg_state_hold(lstate);
2530                 dmsg_state_hold(rstate);
2531         } else {
2532                 /*
2533                  * State & relay already established
2534                  */
2535                 lstate = lmsg->state;
2536                 rstate = lstate->relay;
2537                 assert(rstate != NULL);
2538
2539                 assert((rstate->txcmd & DMSGF_DELETE) == 0);
2540
2541 #if 0
2542                 if (lstate->flags & DMSG_STATE_ABORTING) {
2543                         fprintf(stderr,
2544                                 "relay: relay lost link l=%p r=%p\n",
2545                                 lstate, rstate);
2546                         dmsg_simulate_failure(rstate, 0, DMSG_ERR_LOSTLINK);
2547                 }
2548 #endif
2549
2550                 rmsg = dmsg_msg_alloc(rstate, 0,
2551                                       lmsg->any.head.cmd,
2552                                       dmsg_state_relay, NULL);
2553         }
2554         if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2555                 bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2556                       lmsg->hdr_size - sizeof(lmsg->any.head));
2557         }
2558         rmsg->any.head.error = lmsg->any.head.error;
2559         rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2560         rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
2561         rmsg->aux_size = lmsg->aux_size;
2562         rmsg->aux_data = lmsg->aux_data;
2563         lmsg->aux_data = NULL;
2564
2565         /*
2566         fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
2567         */
2568         dmsg_msg_write(rmsg);
2569 }
2570
2571 /*
2572  * Cleanup and retire msg after issuing the state callback.  The state
2573  * has already been removed from the RB tree.  The subq and msg must be
2574  * cleaned up.
2575  *
2576  * Called with the iocom mutex held (to handle subq disconnection).
2577  */
2578 void
2579 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2580 {
2581         dmsg_state_t *state;
2582
2583         assert(msg->state->iocom == iocom);
2584         state = msg->state;
2585         if (state->flags & DMSG_STATE_ROOT) {
2586                 /*
2587                  * Free a non-transactional message, there is no state
2588                  * to worry about.
2589                  */
2590                 dmsg_msg_free(msg);
2591         } else if ((state->flags & DMSG_STATE_SUBINSERTED) &&
2592                    (state->rxcmd & DMSGF_DELETE) &&
2593                    (state->txcmd & DMSGF_DELETE)) {
2594                 /*
2595                  * Must disconnect from parent and drop relay.
2596                  */
2597                 dmsg_subq_delete(state);
2598                 if (state->relay) {
2599                         dmsg_state_drop(state->relay);
2600                         state->relay = NULL;
2601                 }
2602                 dmsg_msg_free(msg);
2603         } else {
2604                 /*
2605                  * Message not terminating transaction, leave state intact
2606                  * and free message if it isn't the CREATE message.
2607                  */
2608                 dmsg_msg_free(msg);
2609         }
2610 }
2611
2612 /*
2613  * Clean up the state after pulling out needed fields and queueing the
2614  * message for transmission.   This occurs in dmsg_msg_write().
2615  *
2616  * Called with the mutex locked.
2617  */
2618 static void
2619 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2620 {
2621         dmsg_state_t *state;
2622
2623         assert(iocom == msg->state->iocom);
2624         state = msg->state;
2625
2626         dmsg_state_hold(state);
2627
2628         if (state->flags & DMSG_STATE_ROOT) {
2629                 ;
2630         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2631                 /*
2632                  * Message terminating transaction, destroy the related
2633                  * state, the original message, and this message (if it
2634                  * isn't the original message due to a CREATE|DELETE).
2635                  *
2636                  * It's possible for governing state to terminate while
2637                  * sub-transactions still exist.  This is allowed but
2638                  * will cause sub-transactions to recursively fail.
2639                  * Further reception of sub-transaction messages will be
2640                  * impossible because the circuit will no longer exist.
2641                  * (XXX need code to make sure that happens properly).
2642                  *
2643                  * NOTE: It is possible for a fafilure to terminate the
2644                  *       state after we have written the message but before
2645                  *       we are able to call cleanuptx, so txcmd might already
2646                  *       have DMSGF_DELETE set.
2647                  */
2648                 if ((state->txcmd & DMSGF_DELETE) == 0 &&
2649                     (state->rxcmd & DMSGF_DELETE)) {
2650                         state->txcmd |= DMSGF_DELETE;
2651                         assert(state->flags & DMSG_STATE_RBINSERTED);
2652                         if (state->txcmd & DMSGF_REPLY) {
2653                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2654                                 RB_REMOVE(dmsg_state_tree,
2655                                           &iocom->staterd_tree, state);
2656                         } else {
2657                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2658                                 RB_REMOVE(dmsg_state_tree,
2659                                           &iocom->statewr_tree, state);
2660                         }
2661                         state->flags &= ~DMSG_STATE_RBINSERTED;
2662                         dmsg_subq_delete(state);
2663
2664                         if (state->relay) {
2665                                 dmsg_state_drop(state->relay);
2666                                 state->relay = NULL;
2667                         }
2668                         dmsg_state_drop(state); /* state->rbtree */
2669                 } else if ((state->txcmd & DMSGF_DELETE) == 0) {
2670                         state->txcmd |= DMSGF_DELETE;
2671                 }
2672         }
2673
2674         /*
2675          * Deferred abort after transmission.
2676          */
2677         if ((state->flags & (DMSG_STATE_ABORTING | DMSG_STATE_DYING)) &&
2678             (state->rxcmd & DMSGF_DELETE) == 0) {
2679                 printf("kdmsg_state_cleanuptx: state=%p "
2680                         "executing deferred abort\n",
2681                         state);
2682                 state->flags &= ~DMSG_STATE_ABORTING;
2683                 dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK);
2684         }
2685
2686         dmsg_state_drop(state);
2687 }
2688
2689 /*
2690  * Called with or without locks
2691  */
2692 void
2693 dmsg_state_hold(dmsg_state_t *state)
2694 {
2695         atomic_add_int(&state->refs, 1);
2696 }
2697
2698 void
2699 dmsg_state_drop(dmsg_state_t *state)
2700 {
2701         assert(state->refs > 0);
2702         if (atomic_fetchadd_int(&state->refs, -1) == 1)
2703                 dmsg_state_free(state);
2704 }
2705
2706 /*
2707  * Called with iocom locked
2708  */
2709 static void
2710 dmsg_state_free(dmsg_state_t *state)
2711 {
2712         atomic_add_int(&dmsg_state_count, -1);
2713         if (DMsgDebugOpt) {
2714                 fprintf(stderr, "terminate state %p\n", state);
2715         }
2716         assert((state->flags & (DMSG_STATE_ROOT |
2717                                 DMSG_STATE_SUBINSERTED |
2718                                 DMSG_STATE_RBINSERTED)) == 0);
2719         assert(TAILQ_EMPTY(&state->subq));
2720         assert(state->refs == 0);
2721         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2722                 closefrom(3);
2723         assert(state->any.any == NULL);
2724         free(state);
2725 }
2726
2727 /*
2728  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2729  * header is not adjusted, just the core header.
2730  */
2731 void
2732 dmsg_bswap_head(dmsg_hdr_t *head)
2733 {
2734         head->magic     = bswap16(head->magic);
2735         head->reserved02 = bswap16(head->reserved02);
2736         head->salt      = bswap32(head->salt);
2737
2738         head->msgid     = bswap64(head->msgid);
2739         head->circuit   = bswap64(head->circuit);
2740         head->reserved18= bswap64(head->reserved18);
2741
2742         head->cmd       = bswap32(head->cmd);
2743         head->aux_crc   = bswap32(head->aux_crc);
2744         head->aux_bytes = bswap32(head->aux_bytes);
2745         head->error     = bswap32(head->error);
2746         head->aux_descr = bswap64(head->aux_descr);
2747         head->reserved38= bswap32(head->reserved38);
2748         head->hdr_crc   = bswap32(head->hdr_crc);
2749 }