dmsg - Formalize most of the debugging output, cleanup, fix uninit bug
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2015 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
38 #define DMSG_BLOCK_DEBUG
39
40 int DMsgDebugOpt;
41 int dmsg_state_count;
42 #ifdef DMSG_BLOCK_DEBUG
43 static int biocount;
44 #endif
45
46 static int dmsg_state_msgrx(dmsg_msg_t *msg, int mstate);
47 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
48 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
49 static void dmsg_state_free(dmsg_state_t *state);
50 static void dmsg_subq_delete(dmsg_state_t *state);
51 static void dmsg_simulate_failure(dmsg_state_t *state, int meto, int error);
52 static void dmsg_state_abort(dmsg_state_t *state);
53 static void dmsg_state_dying(dmsg_state_t *state);
54
55 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
56
57 /*
58  * STATE TREE - Represents open transactions which are indexed by their
59  *              { msgid } relative to the governing iocom.
60  */
61 int
62 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
63 {
64         if (state1->msgid < state2->msgid)
65                 return(-1);
66         if (state1->msgid > state2->msgid)
67                 return(1);
68         return(0);
69 }
70
71 /*
72  * Initialize a low-level ioq
73  */
74 void
75 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
76 {
77         bzero(ioq, sizeof(*ioq));
78         ioq->state = DMSG_MSGQ_STATE_HEADER1;
79         TAILQ_INIT(&ioq->msgq);
80 }
81
82 /*
83  * Cleanup queue.
84  *
85  * caller holds iocom->mtx.
86  */
87 void
88 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
89 {
90         dmsg_msg_t *msg;
91
92         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
93                 assert(0);      /* shouldn't happen */
94                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
95                 dmsg_msg_free(msg);
96         }
97         if ((msg = ioq->msg) != NULL) {
98                 ioq->msg = NULL;
99                 dmsg_msg_free(msg);
100         }
101 }
102
103 /*
104  * Initialize a low-level communications channel.
105  *
106  * NOTE: The signal_func() is called at least once from the loop and can be
107  *       re-armed via dmsg_iocom_restate().
108  */
109 void
110 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
111                    void (*signal_func)(dmsg_iocom_t *iocom),
112                    void (*rcvmsg_func)(dmsg_msg_t *msg),
113                    void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
114                    void (*altmsg_func)(dmsg_iocom_t *iocom))
115 {
116         struct stat st;
117
118         bzero(iocom, sizeof(*iocom));
119
120         asprintf(&iocom->label, "iocom-%p", iocom);
121         iocom->signal_callback = signal_func;
122         iocom->rcvmsg_callback = rcvmsg_func;
123         iocom->altmsg_callback = altmsg_func;
124         iocom->usrmsg_callback = usrmsg_func;
125
126         pthread_mutex_init(&iocom->mtx, NULL);
127         RB_INIT(&iocom->staterd_tree);
128         RB_INIT(&iocom->statewr_tree);
129         TAILQ_INIT(&iocom->txmsgq);
130         iocom->sock_fd = sock_fd;
131         iocom->alt_fd = alt_fd;
132         iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
133         if (signal_func)
134                 iocom->flags |= DMSG_IOCOMF_SWORK;
135         dmsg_ioq_init(iocom, &iocom->ioq_rx);
136         dmsg_ioq_init(iocom, &iocom->ioq_tx);
137         iocom->state0.refs = 1;         /* should never trigger a free */
138         iocom->state0.iocom = iocom;
139         iocom->state0.parent = &iocom->state0;
140         iocom->state0.flags = DMSG_STATE_ROOT;
141         TAILQ_INIT(&iocom->state0.subq);
142
143         if (pipe(iocom->wakeupfds) < 0)
144                 assert(0);
145         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
146         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
147
148         /*
149          * Negotiate session crypto synchronously.  This will mark the
150          * connection as error'd if it fails.  If this is a pipe it's
151          * a linkage that we set up ourselves to the filesystem and there
152          * is no crypto.
153          */
154         if (fstat(sock_fd, &st) < 0)
155                 assert(0);
156         if (S_ISSOCK(st.st_mode))
157                 dmsg_crypto_negotiate(iocom);
158
159         /*
160          * Make sure our fds are set to non-blocking for the iocom core.
161          */
162         if (sock_fd >= 0)
163                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
164 #if 0
165         /* if line buffered our single fgets() should be fine */
166         if (alt_fd >= 0)
167                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
168 #endif
169 }
170
171 void
172 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
173 {
174         va_list va;
175         char *optr;
176
177         va_start(va, ctl);
178         optr = iocom->label;
179         vasprintf(&iocom->label, ctl, va);
180         va_end(va);
181         if (optr)
182                 free(optr);
183 }
184
185 /*
186  * May only be called from a callback from iocom_core.
187  *
188  * Adjust state machine functions, set flags to guarantee that both
189  * the recevmsg_func and the sendmsg_func is called at least once.
190  */
191 void
192 dmsg_iocom_restate(dmsg_iocom_t *iocom,
193                    void (*signal_func)(dmsg_iocom_t *),
194                    void (*rcvmsg_func)(dmsg_msg_t *msg))
195 {
196         pthread_mutex_lock(&iocom->mtx);
197         iocom->signal_callback = signal_func;
198         iocom->rcvmsg_callback = rcvmsg_func;
199         if (signal_func)
200                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
201         else
202                 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
203         pthread_mutex_unlock(&iocom->mtx);
204 }
205
206 void
207 dmsg_iocom_signal(dmsg_iocom_t *iocom)
208 {
209         pthread_mutex_lock(&iocom->mtx);
210         if (iocom->signal_callback)
211                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
212         pthread_mutex_unlock(&iocom->mtx);
213 }
214
215 /*
216  * Cleanup a terminating iocom.
217  *
218  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
219  * from all possible references to it.
220  */
221 void
222 dmsg_iocom_done(dmsg_iocom_t *iocom)
223 {
224         if (iocom->sock_fd >= 0) {
225                 close(iocom->sock_fd);
226                 iocom->sock_fd = -1;
227         }
228         if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
229                 close(iocom->alt_fd);
230                 iocom->alt_fd = -1;
231         }
232         dmsg_ioq_done(iocom, &iocom->ioq_rx);
233         dmsg_ioq_done(iocom, &iocom->ioq_tx);
234         if (iocom->wakeupfds[0] >= 0) {
235                 close(iocom->wakeupfds[0]);
236                 iocom->wakeupfds[0] = -1;
237         }
238         if (iocom->wakeupfds[1] >= 0) {
239                 close(iocom->wakeupfds[1]);
240                 iocom->wakeupfds[1] = -1;
241         }
242         pthread_mutex_destroy(&iocom->mtx);
243 }
244
245 /*
246  * Allocate a new message using the specified transaction state.
247  *
248  * If CREATE is set a new transaction is allocated relative to the passed-in
249  * transaction (the 'state' argument becomes pstate).
250  *
251  * If CREATE is not set the message is associated with the passed-in
252  * transaction.
253  */
254 dmsg_msg_t *
255 dmsg_msg_alloc(dmsg_state_t *state,
256                size_t aux_size, uint32_t cmd,
257                void (*func)(dmsg_msg_t *), void *data)
258 {
259         dmsg_iocom_t *iocom = state->iocom;
260         dmsg_msg_t *msg;
261
262         pthread_mutex_lock(&iocom->mtx);
263         msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
264         pthread_mutex_unlock(&iocom->mtx);
265
266         return msg;
267 }
268
269 dmsg_msg_t *
270 dmsg_msg_alloc_locked(dmsg_state_t *state,
271                size_t aux_size, uint32_t cmd,
272                void (*func)(dmsg_msg_t *), void *data)
273 {
274         dmsg_iocom_t *iocom = state->iocom;
275         dmsg_state_t *pstate;
276         dmsg_msg_t *msg;
277         int hbytes;
278         size_t aligned_size;
279
280         aligned_size = DMSG_DOALIGN(aux_size);
281         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
282                 /*
283                  * When CREATE is set without REPLY the caller is
284                  * initiating a new transaction stacked under the specified
285                  * circuit.
286                  *
287                  * It is possible to race a circuit failure, inherit the
288                  * parent's STATE_DYING flag to trigger an abort sequence
289                  * in the transmit path.  By not inheriting ABORTING the
290                  * abort sequence can recurse.
291                  *
292                  * NOTE: CREATE in txcmd handled by dmsg_msg_write()
293                  * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
294                  */
295                 pstate = state;
296                 state = malloc(sizeof(*state));
297                 bzero(state, sizeof(*state));
298                 atomic_add_int(&dmsg_state_count, 1);
299
300                 TAILQ_INIT(&state->subq);
301                 state->parent = pstate;
302                 state->iocom = iocom;
303                 state->flags = DMSG_STATE_DYNAMIC;
304                 state->msgid = (uint64_t)(uintptr_t)state;
305                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
306                 state->rxcmd = DMSGF_REPLY;
307                 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
308                 state->func = func;
309                 state->any.any = data;
310
311                 state->flags |= DMSG_STATE_SUBINSERTED |
312                                 DMSG_STATE_RBINSERTED;
313                 state->flags |= pstate->flags & DMSG_STATE_DYING;
314                 if (TAILQ_EMPTY(&pstate->subq))
315                         dmsg_state_hold(pstate);
316                 RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
317                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
318                 dmsg_state_hold(state);         /* state on pstate->subq */
319                 dmsg_state_hold(state);         /* state on rbtree */
320                 dmsg_state_hold(state);         /* msg->state */
321         } else {
322                 /*
323                  * Otherwise the message is transmitted over the existing
324                  * open transaction.
325                  */
326                 pstate = state->parent;
327                 dmsg_state_hold(state);         /* msg->state */
328         }
329
330         /* XXX SMP race for state */
331         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
332         assert((size_t)hbytes >= sizeof(struct dmsg_hdr));
333         msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes);
334         bzero(msg, offsetof(struct dmsg_msg, any.head));
335
336         /*
337          * [re]allocate the auxillary data buffer.  The caller knows that
338          * a size-aligned buffer will be allocated but we do not want to
339          * force the caller to zero any tail piece, so we do that ourself.
340          */
341         if (msg->aux_size != aux_size) {
342                 if (msg->aux_data) {
343                         free(msg->aux_data);
344                         msg->aux_data = NULL;
345                         msg->aux_size = 0;
346                 }
347                 if (aux_size) {
348                         msg->aux_data = malloc(aligned_size);
349                         msg->aux_size = aux_size;
350                         if (aux_size != aligned_size) {
351                                 bzero(msg->aux_data + aux_size,
352                                       aligned_size - aux_size);
353                         }
354                 }
355         }
356
357         /*
358          * Set REVTRANS if the transaction was remotely initiated
359          * Set REVCIRC if the circuit was remotely initiated
360          */
361         if (state->flags & DMSG_STATE_OPPOSITE)
362                 cmd |= DMSGF_REVTRANS;
363         if (pstate->flags & DMSG_STATE_OPPOSITE)
364                 cmd |= DMSGF_REVCIRC;
365
366         /*
367          * Finish filling out the header.
368          */
369         bzero(&msg->any.head, hbytes);
370         msg->hdr_size = hbytes;
371         msg->any.head.magic = DMSG_HDR_MAGIC;
372         msg->any.head.cmd = cmd;
373         msg->any.head.aux_descr = 0;
374         msg->any.head.aux_crc = 0;
375         msg->any.head.msgid = state->msgid;
376         msg->any.head.circuit = pstate->msgid;
377         msg->state = state;
378
379         return (msg);
380 }
381
382 /*
383  * Free a message so it can be reused afresh.
384  *
385  * NOTE: aux_size can be 0 with a non-NULL aux_data.
386  */
387 static
388 void
389 dmsg_msg_free_locked(dmsg_msg_t *msg)
390 {
391         dmsg_state_t *state;
392
393         if ((state = msg->state) != NULL) {
394                 dmsg_state_drop(state);
395                 msg->state = NULL;      /* safety */
396         }
397         if (msg->aux_data) {
398                 free(msg->aux_data);
399                 msg->aux_data = NULL;   /* safety */
400         }
401         msg->aux_size = 0;
402         free (msg);
403 }
404
405 void
406 dmsg_msg_free(dmsg_msg_t *msg)
407 {
408         dmsg_iocom_t *iocom = msg->state->iocom;
409
410         pthread_mutex_lock(&iocom->mtx);
411         dmsg_msg_free_locked(msg);
412         pthread_mutex_unlock(&iocom->mtx);
413 }
414
415 /*
416  * I/O core loop for an iocom.
417  *
418  * Thread localized, iocom->mtx not held.
419  */
420 void
421 dmsg_iocom_core(dmsg_iocom_t *iocom)
422 {
423         struct pollfd fds[3];
424         char dummybuf[256];
425         dmsg_msg_t *msg;
426         int timeout;
427         int count;
428         int wi; /* wakeup pipe */
429         int si; /* socket */
430         int ai; /* alt bulk path socket */
431
432         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
433                 /*
434                  * These iocom->flags are only manipulated within the
435                  * context of the current thread.  However, modifications
436                  * still require atomic ops.
437                  */
438                 dmio_printf(iocom, 5, "iocom %p %08x\n",
439                             iocom, iocom->flags);
440                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
441                                      DMSG_IOCOMF_WWORK |
442                                      DMSG_IOCOMF_PWORK |
443                                      DMSG_IOCOMF_SWORK |
444                                      DMSG_IOCOMF_ARWORK |
445                                      DMSG_IOCOMF_AWWORK)) == 0) {
446                         /*
447                          * Only poll if no immediate work is pending.
448                          * Otherwise we are just wasting our time calling
449                          * poll.
450                          */
451                         timeout = 5000;
452
453                         count = 0;
454                         wi = -1;
455                         si = -1;
456                         ai = -1;
457
458                         /*
459                          * Always check the inter-thread pipe, e.g.
460                          * for iocom->txmsgq work.
461                          */
462                         wi = count++;
463                         fds[wi].fd = iocom->wakeupfds[0];
464                         fds[wi].events = POLLIN;
465                         fds[wi].revents = 0;
466
467                         /*
468                          * Check the socket input/output direction as
469                          * requested
470                          */
471                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
472                                             DMSG_IOCOMF_WREQ)) {
473                                 si = count++;
474                                 fds[si].fd = iocom->sock_fd;
475                                 fds[si].events = 0;
476                                 fds[si].revents = 0;
477
478                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
479                                         fds[si].events |= POLLIN;
480                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
481                                         fds[si].events |= POLLOUT;
482                         }
483
484                         /*
485                          * Check the alternative fd for work.
486                          */
487                         if (iocom->alt_fd >= 0) {
488                                 ai = count++;
489                                 fds[ai].fd = iocom->alt_fd;
490                                 fds[ai].events = POLLIN;
491                                 fds[ai].revents = 0;
492                         }
493                         poll(fds, count, timeout);
494
495                         if (wi >= 0 && (fds[wi].revents & POLLIN))
496                                 atomic_set_int(&iocom->flags,
497                                                DMSG_IOCOMF_PWORK);
498                         if (si >= 0 && (fds[si].revents & POLLIN))
499                                 atomic_set_int(&iocom->flags,
500                                                DMSG_IOCOMF_RWORK);
501                         if (si >= 0 && (fds[si].revents & POLLOUT))
502                                 atomic_set_int(&iocom->flags,
503                                                DMSG_IOCOMF_WWORK);
504                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
505                                 atomic_set_int(&iocom->flags,
506                                                DMSG_IOCOMF_WWORK);
507                         if (ai >= 0 && (fds[ai].revents & POLLIN))
508                                 atomic_set_int(&iocom->flags,
509                                                DMSG_IOCOMF_ARWORK);
510                 } else {
511                         /*
512                          * Always check the pipe
513                          */
514                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
515                 }
516
517                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
518                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
519                         iocom->signal_callback(iocom);
520                 }
521
522                 /*
523                  * Pending message queues from other threads wake us up
524                  * with a write to the wakeupfds[] pipe.  We have to clear
525                  * the pipe with a dummy read.
526                  */
527                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
528                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
529                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
530                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
531                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
532                 }
533
534                 /*
535                  * Message write sequencing
536                  */
537                 if (iocom->flags & DMSG_IOCOMF_WWORK)
538                         dmsg_iocom_flush1(iocom);
539
540                 /*
541                  * Message read sequencing.  Run this after the write
542                  * sequencing in case the write sequencing allowed another
543                  * auto-DELETE to occur on the read side.
544                  */
545                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
546                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
547                                (msg = dmsg_ioq_read(iocom)) != NULL) {
548                                 dmio_printf(iocom, 4, "receive %s\n",
549                                             dmsg_msg_str(msg));
550                                 iocom->rcvmsg_callback(msg);
551                                 pthread_mutex_lock(&iocom->mtx);
552                                 dmsg_state_cleanuprx(iocom, msg);
553                                 pthread_mutex_unlock(&iocom->mtx);
554                         }
555                 }
556
557                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
558                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
559                         iocom->altmsg_callback(iocom);
560                 }
561         }
562 }
563
564 /*
565  * Make sure there's enough room in the FIFO to hold the
566  * needed data.
567  *
568  * Assume worst case encrypted form is 2x the size of the
569  * plaintext equivalent.
570  */
571 static
572 size_t
573 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
574 {
575         size_t bytes;
576         size_t nmax;
577
578         bytes = ioq->fifo_cdx - ioq->fifo_beg;
579         nmax = sizeof(ioq->buf) - ioq->fifo_end;
580         if (bytes + nmax / 2 < needed) {
581                 if (bytes) {
582                         bcopy(ioq->buf + ioq->fifo_beg,
583                               ioq->buf,
584                               bytes);
585                 }
586                 ioq->fifo_cdx -= ioq->fifo_beg;
587                 ioq->fifo_beg = 0;
588                 if (ioq->fifo_cdn < ioq->fifo_end) {
589                         bcopy(ioq->buf + ioq->fifo_cdn,
590                               ioq->buf + ioq->fifo_cdx,
591                               ioq->fifo_end - ioq->fifo_cdn);
592                 }
593                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
594                 ioq->fifo_cdn = ioq->fifo_cdx;
595                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
596         }
597         return(nmax);
598 }
599
600 /*
601  * Read the next ready message from the ioq, issuing I/O if needed.
602  * Caller should retry on a read-event when NULL is returned.
603  *
604  * If an error occurs during reception a DMSG_LNK_ERROR msg will
605  * be returned for each open transaction, then the ioq and iocom
606  * will be errored out and a non-transactional DMSG_LNK_ERROR
607  * msg will be returned as the final message.  The caller should not call
608  * us again after the final message is returned.
609  *
610  * Thread localized, iocom->mtx not held.
611  */
612 dmsg_msg_t *
613 dmsg_ioq_read(dmsg_iocom_t *iocom)
614 {
615         dmsg_ioq_t *ioq = &iocom->ioq_rx;
616         dmsg_msg_t *msg;
617         dmsg_hdr_t *head;
618         ssize_t n;
619         size_t bytes;
620         size_t nmax;
621         uint32_t aux_size;
622         uint32_t xcrc32;
623         int error;
624
625 again:
626         /*
627          * If a message is already pending we can just remove and
628          * return it.  Message state has already been processed.
629          * (currently not implemented)
630          */
631         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
632                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
633
634                 if (msg->state == &iocom->state0) {
635                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
636                         dmio_printf(iocom, 1,
637                                     "EOF ON SOCKET %d\n",
638                                     iocom->sock_fd);
639                 }
640                 return (msg);
641         }
642         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
643
644         /*
645          * If the stream is errored out we stop processing it.
646          */
647         if (ioq->error)
648                 goto skip;
649
650         /*
651          * Message read in-progress (msg is NULL at the moment).  We don't
652          * allocate a msg until we have its core header.
653          */
654         nmax = sizeof(ioq->buf) - ioq->fifo_end;
655         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
656         msg = ioq->msg;
657
658         switch(ioq->state) {
659         case DMSG_MSGQ_STATE_HEADER1:
660                 /*
661                  * Load the primary header, fail on any non-trivial read
662                  * error or on EOF.  Since the primary header is the same
663                  * size as the message alignment it will never straddle
664                  * the end of the buffer.
665                  */
666                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
667                 if (bytes < sizeof(msg->any.head)) {
668                         n = read(iocom->sock_fd,
669                                  ioq->buf + ioq->fifo_end,
670                                  nmax);
671                         if (n <= 0) {
672                                 if (n == 0) {
673                                         ioq->error = DMSG_IOQ_ERROR_EOF;
674                                         break;
675                                 }
676                                 if (errno != EINTR &&
677                                     errno != EINPROGRESS &&
678                                     errno != EAGAIN) {
679                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
680                                         break;
681                                 }
682                                 n = 0;
683                                 /* fall through */
684                         }
685                         ioq->fifo_end += (size_t)n;
686                         nmax -= (size_t)n;
687                 }
688
689                 /*
690                  * Decrypt data received so far.  Data will be decrypted
691                  * in-place but might create gaps in the FIFO.  Partial
692                  * blocks are not immediately decrypted.
693                  *
694                  * WARNING!  The header might be in the wrong endian, we
695                  *           do not fix it up until we get the entire
696                  *           extended header.
697                  */
698                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
699                         dmsg_crypto_decrypt(iocom, ioq);
700                 } else {
701                         ioq->fifo_cdx = ioq->fifo_end;
702                         ioq->fifo_cdn = ioq->fifo_end;
703                 }
704                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
705
706                 /*
707                  * Insufficient data accumulated (msg is NULL, caller will
708                  * retry on event).
709                  */
710                 assert(msg == NULL);
711                 if (bytes < sizeof(msg->any.head))
712                         break;
713
714                 /*
715                  * Check and fixup the core header.  Note that the icrc
716                  * has to be calculated before any fixups, but the crc
717                  * fields in the msg may have to be swapped like everything
718                  * else.
719                  */
720                 head = (void *)(ioq->buf + ioq->fifo_beg);
721                 if (head->magic != DMSG_HDR_MAGIC &&
722                     head->magic != DMSG_HDR_MAGIC_REV) {
723                         dmio_printf(iocom, 1,
724                                     "%s: head->magic is bad %02x\n",
725                                     iocom->label, head->magic);
726                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
727                                 dmio_printf(iocom, 1, "%s\n",
728                                             "(on encrypted link)");
729                         ioq->error = DMSG_IOQ_ERROR_SYNC;
730                         break;
731                 }
732
733                 /*
734                  * Calculate the full header size and aux data size
735                  */
736                 if (head->magic == DMSG_HDR_MAGIC_REV) {
737                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
738                                       DMSG_ALIGN;
739                         aux_size = bswap32(head->aux_bytes);
740                 } else {
741                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
742                                       DMSG_ALIGN;
743                         aux_size = head->aux_bytes;
744                 }
745                 ioq->abytes = DMSG_DOALIGN(aux_size);
746                 ioq->unaligned_aux_size = aux_size;
747                 if (ioq->hbytes < sizeof(msg->any.head) ||
748                     ioq->hbytes > sizeof(msg->any) ||
749                     ioq->abytes > DMSG_AUX_MAX) {
750                         ioq->error = DMSG_IOQ_ERROR_FIELD;
751                         break;
752                 }
753
754                 /*
755                  * Allocate the message, the next state will fill it in.
756                  *
757                  * NOTE: The aux_data buffer will be sized to an aligned
758                  *       value and the aligned remainder zero'd for
759                  *       convenience.
760                  *
761                  * NOTE: Supply dummy state and a degenerate cmd without
762                  *       CREATE set.  The message will temporarily be
763                  *       associated with state0 until later post-processing.
764                  */
765                 msg = dmsg_msg_alloc(&iocom->state0, aux_size,
766                                      ioq->hbytes / DMSG_ALIGN,
767                                      NULL, NULL);
768                 ioq->msg = msg;
769
770                 /*
771                  * Fall through to the next state.  Make sure that the
772                  * extended header does not straddle the end of the buffer.
773                  * We still want to issue larger reads into our buffer,
774                  * book-keeping is easier if we don't bcopy() yet.
775                  *
776                  * Make sure there is enough room for bloated encrypt data.
777                  */
778                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
779                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
780                 /* fall through */
781         case DMSG_MSGQ_STATE_HEADER2:
782                 /*
783                  * Fill out the extended header.
784                  */
785                 assert(msg != NULL);
786                 if (bytes < ioq->hbytes) {
787                         assert(nmax > 0);
788                         n = read(iocom->sock_fd,
789                                  ioq->buf + ioq->fifo_end,
790                                  nmax);
791                         if (n <= 0) {
792                                 if (n == 0) {
793                                         ioq->error = DMSG_IOQ_ERROR_EOF;
794                                         break;
795                                 }
796                                 if (errno != EINTR &&
797                                     errno != EINPROGRESS &&
798                                     errno != EAGAIN) {
799                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
800                                         break;
801                                 }
802                                 n = 0;
803                                 /* fall through */
804                         }
805                         ioq->fifo_end += (size_t)n;
806                         nmax -= (size_t)n;
807                 }
808
809                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
810                         dmsg_crypto_decrypt(iocom, ioq);
811                 } else {
812                         ioq->fifo_cdx = ioq->fifo_end;
813                         ioq->fifo_cdn = ioq->fifo_end;
814                 }
815                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
816
817                 /*
818                  * Insufficient data accumulated (set msg NULL so caller will
819                  * retry on event).
820                  */
821                 if (bytes < ioq->hbytes) {
822                         msg = NULL;
823                         break;
824                 }
825
826                 /*
827                  * Calculate the extended header, decrypt data received
828                  * so far.  Handle endian-conversion for the entire extended
829                  * header.
830                  */
831                 head = (void *)(ioq->buf + ioq->fifo_beg);
832
833                 /*
834                  * Check the CRC.
835                  */
836                 if (head->magic == DMSG_HDR_MAGIC_REV)
837                         xcrc32 = bswap32(head->hdr_crc);
838                 else
839                         xcrc32 = head->hdr_crc;
840                 head->hdr_crc = 0;
841                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
842                         ioq->error = DMSG_IOQ_ERROR_XCRC;
843                         dmio_printf(iocom, 1, "BAD-XCRC(%08x,%08x) %s\n",
844                                     xcrc32, dmsg_icrc32(head, ioq->hbytes),
845                                     dmsg_msg_str(msg));
846                         assert(0);
847                         break;
848                 }
849                 head->hdr_crc = xcrc32;
850
851                 if (head->magic == DMSG_HDR_MAGIC_REV) {
852                         dmsg_bswap_head(head);
853                 }
854
855                 /*
856                  * Copy the extended header into the msg and adjust the
857                  * FIFO.
858                  */
859                 bcopy(head, &msg->any, ioq->hbytes);
860
861                 /*
862                  * We are either done or we fall-through.
863                  */
864                 if (ioq->abytes == 0) {
865                         ioq->fifo_beg += ioq->hbytes;
866                         break;
867                 }
868
869                 /*
870                  * Must adjust bytes (and the state) when falling through.
871                  * nmax doesn't change.
872                  */
873                 ioq->fifo_beg += ioq->hbytes;
874                 bytes -= ioq->hbytes;
875                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
876                 /* fall through */
877         case DMSG_MSGQ_STATE_AUXDATA1:
878                 /*
879                  * Copy the partial or complete [decrypted] payload from
880                  * remaining bytes in the FIFO in order to optimize the
881                  * makeroom call in the AUXDATA2 state.  We have to
882                  * fall-through either way so we can check the crc.
883                  *
884                  * msg->aux_size tracks our aux data.
885                  *
886                  * (Lets not complicate matters if the data is encrypted,
887                  *  since the data in-stream is not the same size as the
888                  *  data decrypted).
889                  */
890                 if (bytes >= ioq->abytes) {
891                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
892                               ioq->abytes);
893                         msg->aux_size = ioq->abytes;
894                         ioq->fifo_beg += ioq->abytes;
895                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
896                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
897                         bytes -= ioq->abytes;
898                 } else if (bytes) {
899                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
900                               bytes);
901                         msg->aux_size = bytes;
902                         ioq->fifo_beg += bytes;
903                         if (ioq->fifo_cdx < ioq->fifo_beg)
904                                 ioq->fifo_cdx = ioq->fifo_beg;
905                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
906                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
907                         bytes = 0;
908                 } else {
909                         msg->aux_size = 0;
910                 }
911                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
912                 /* fall through */
913         case DMSG_MSGQ_STATE_AUXDATA2:
914                 /*
915                  * Make sure there is enough room for more data.
916                  */
917                 assert(msg);
918                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
919
920                 /*
921                  * Read and decrypt more of the payload.
922                  */
923                 if (msg->aux_size < ioq->abytes) {
924                         assert(nmax > 0);
925                         assert(bytes == 0);
926                         n = read(iocom->sock_fd,
927                                  ioq->buf + ioq->fifo_end,
928                                  nmax);
929                         if (n <= 0) {
930                                 if (n == 0) {
931                                         ioq->error = DMSG_IOQ_ERROR_EOF;
932                                         break;
933                                 }
934                                 if (errno != EINTR &&
935                                     errno != EINPROGRESS &&
936                                     errno != EAGAIN) {
937                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
938                                         break;
939                                 }
940                                 n = 0;
941                                 /* fall through */
942                         }
943                         ioq->fifo_end += (size_t)n;
944                         nmax -= (size_t)n;
945                 }
946
947                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
948                         dmsg_crypto_decrypt(iocom, ioq);
949                 } else {
950                         ioq->fifo_cdx = ioq->fifo_end;
951                         ioq->fifo_cdn = ioq->fifo_end;
952                 }
953                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
954
955                 if (bytes > ioq->abytes - msg->aux_size)
956                         bytes = ioq->abytes - msg->aux_size;
957
958                 if (bytes) {
959                         bcopy(ioq->buf + ioq->fifo_beg,
960                               msg->aux_data + msg->aux_size,
961                               bytes);
962                         msg->aux_size += bytes;
963                         ioq->fifo_beg += bytes;
964                 }
965
966                 /*
967                  * Insufficient data accumulated (set msg NULL so caller will
968                  * retry on event).
969                  *
970                  * Assert the auxillary data size is correct, then record the
971                  * original unaligned size from the message header.
972                  */
973                 if (msg->aux_size < ioq->abytes) {
974                         msg = NULL;
975                         break;
976                 }
977                 assert(msg->aux_size == ioq->abytes);
978                 msg->aux_size = ioq->unaligned_aux_size;
979
980                 /*
981                  * Check aux_crc, then we are done.  Note that the crc
982                  * is calculated over the aligned size, not the actual
983                  * size.
984                  */
985                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
986                 if (xcrc32 != msg->any.head.aux_crc) {
987                         ioq->error = DMSG_IOQ_ERROR_ACRC;
988                         dmio_printf(iocom, 1,
989                                     "iocom: ACRC error %08x vs %08x "
990                                     "msgid %016jx msgcmd %08x auxsize %d\n",
991                                     xcrc32,
992                                     msg->any.head.aux_crc,
993                                     (intmax_t)msg->any.head.msgid,
994                                     msg->any.head.cmd,
995                                     msg->any.head.aux_bytes);
996                         break;
997                 }
998                 break;
999         case DMSG_MSGQ_STATE_ERROR:
1000                 /*
1001                  * Continued calls to drain recorded transactions (returning
1002                  * a LNK_ERROR for each one), before we return the final
1003                  * LNK_ERROR.
1004                  */
1005                 assert(msg == NULL);
1006                 break;
1007         default:
1008                 /*
1009                  * We don't double-return errors, the caller should not
1010                  * have called us again after getting an error msg.
1011                  */
1012                 assert(0);
1013                 break;
1014         }
1015
1016         /*
1017          * Check the message sequence.  The iv[] should prevent any
1018          * possibility of a replay but we add this check anyway.
1019          */
1020         if (msg && ioq->error == 0) {
1021                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1022                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1023                 } else {
1024                         ++ioq->seq;
1025                 }
1026         }
1027
1028         /*
1029          * Handle error, RREQ, or completion
1030          *
1031          * NOTE: nmax and bytes are invalid at this point, we don't bother
1032          *       to update them when breaking out.
1033          */
1034         if (ioq->error) {
1035 skip:
1036                 /*
1037                  * An unrecoverable error causes all active receive
1038                  * transactions to be terminated with a LNK_ERROR message.
1039                  *
1040                  * Once all active transactions are exhausted we set the
1041                  * iocom ERROR flag and return a non-transactional LNK_ERROR
1042                  * message, which should cause master processing loops to
1043                  * terminate.
1044                  */
1045                 dmio_printf(iocom, 1, "IOQ ERROR %d\n", ioq->error);
1046                 assert(ioq->msg == msg);
1047                 if (msg) {
1048                         dmsg_msg_free(msg);
1049                         ioq->msg = NULL;
1050                         msg = NULL;
1051                 }
1052
1053                 /*
1054                  * No more I/O read processing
1055                  */
1056                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1057
1058                 /*
1059                  * Simulate a remote LNK_ERROR DELETE msg for any open
1060                  * transactions, ending with a final non-transactional
1061                  * LNK_ERROR (that the session can detect) when no
1062                  * transactions remain.
1063                  *
1064                  * NOTE: Temporarily supply state0 and a degenerate cmd
1065                  *       without CREATE set.  The real state will be
1066                  *       assigned in the loop.
1067                  *
1068                  * NOTE: We are simulating a received message using our
1069                  *       side of the state, so the DMSGF_REV* bits have
1070                  *       to be reversed.
1071                  */
1072                 pthread_mutex_lock(&iocom->mtx);
1073                 dmsg_iocom_drain(iocom);
1074                 dmsg_simulate_failure(&iocom->state0, 0, ioq->error);
1075                 pthread_mutex_unlock(&iocom->mtx);
1076                 if (TAILQ_FIRST(&ioq->msgq))
1077                         goto again;
1078
1079 #if 0
1080                 /*
1081                  * For the iocom error case we want to set RWORK to indicate
1082                  * that more messages might be pending.
1083                  *
1084                  * It is possible to return NULL when there is more work to
1085                  * do because each message has to be DELETEd in both
1086                  * directions before we continue on with the next (though
1087                  * this could be optimized).  The transmit direction will
1088                  * re-set RWORK.
1089                  */
1090                 if (msg)
1091                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1092 #endif
1093         } else if (msg == NULL) {
1094                 /*
1095                  * Insufficient data received to finish building the message,
1096                  * set RREQ and return NULL.
1097                  *
1098                  * Leave ioq->msg intact.
1099                  * Leave the FIFO intact.
1100                  */
1101                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1102         } else {
1103                 /*
1104                  * Continue processing msg.
1105                  *
1106                  * The fifo has already been advanced past the message.
1107                  * Trivially reset the FIFO indices if possible.
1108                  *
1109                  * clear the FIFO if it is now empty and set RREQ to wait
1110                  * for more from the socket.  If the FIFO is not empty set
1111                  * TWORK to bypass the poll so we loop immediately.
1112                  */
1113                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1114                     ioq->fifo_cdn == ioq->fifo_end) {
1115                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1116                         ioq->fifo_cdx = 0;
1117                         ioq->fifo_cdn = 0;
1118                         ioq->fifo_beg = 0;
1119                         ioq->fifo_end = 0;
1120                 } else {
1121                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1122                 }
1123                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1124                 ioq->msg = NULL;
1125
1126                 /*
1127                  * Handle message routing.  Validates non-zero sources
1128                  * and routes message.  Error will be 0 if the message is
1129                  * destined for us.
1130                  *
1131                  * State processing only occurs for messages destined for us.
1132                  */
1133                 dmio_printf(iocom, 5,
1134                             "rxmsg cmd=%08x circ=%016jx\n",
1135                             msg->any.head.cmd,
1136                             (intmax_t)msg->any.head.circuit);
1137
1138                 error = dmsg_state_msgrx(msg, 0);
1139
1140                 if (error) {
1141                         /*
1142                          * Abort-after-closure, throw message away and
1143                          * start reading another.
1144                          */
1145                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1146                                 dmsg_msg_free(msg);
1147                                 goto again;
1148                         }
1149
1150                         /*
1151                          * Process real error and throw away message.
1152                          */
1153                         ioq->error = error;
1154                         goto skip;
1155                 }
1156
1157                 /*
1158                  * No error and not routed
1159                  */
1160                 /* no error, not routed.  Fall through and return msg */
1161         }
1162         return (msg);
1163 }
1164
1165 /*
1166  * Calculate the header and data crc's and write a low-level message to
1167  * the connection.  If aux_crc is non-zero the aux_data crc is already
1168  * assumed to have been set.
1169  *
1170  * A non-NULL msg is added to the queue but not necessarily flushed.
1171  * Calling this function with msg == NULL will get a flush going.
1172  *
1173  * (called from iocom_core only)
1174  */
1175 void
1176 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1177 {
1178         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1179         dmsg_msg_t *msg;
1180         uint32_t xcrc32;
1181         size_t hbytes;
1182         size_t abytes;
1183         dmsg_msg_queue_t tmpq;
1184
1185         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1186         TAILQ_INIT(&tmpq);
1187         pthread_mutex_lock(&iocom->mtx);
1188         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1189                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1190                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1191         }
1192         pthread_mutex_unlock(&iocom->mtx);
1193
1194         /*
1195          * Flush queue, doing all required encryption and CRC generation,
1196          * with the mutex unlocked.
1197          */
1198         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1199                 /*
1200                  * Process terminal connection errors.
1201                  */
1202                 TAILQ_REMOVE(&tmpq, msg, qentry);
1203                 if (ioq->error) {
1204                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1205                         ++ioq->msgcount;
1206                         continue;
1207                 }
1208
1209                 /*
1210                  * Finish populating the msg fields.  The salt ensures that
1211                  * the iv[] array is ridiculously randomized and we also
1212                  * re-seed our PRNG every 32768 messages just to be sure.
1213                  */
1214                 msg->any.head.magic = DMSG_HDR_MAGIC;
1215                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1216                 ++ioq->seq;
1217                 if ((ioq->seq & 32767) == 0) {
1218                         pthread_mutex_lock(&iocom->mtx);
1219                         srandomdev();
1220                         pthread_mutex_unlock(&iocom->mtx);
1221                 }
1222
1223                 /*
1224                  * Calculate aux_crc if 0, then calculate hdr_crc.
1225                  */
1226                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1227                         abytes = DMSG_DOALIGN(msg->aux_size);
1228                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1229                         msg->any.head.aux_crc = xcrc32;
1230                 }
1231                 msg->any.head.aux_bytes = msg->aux_size;
1232
1233                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1234                          DMSG_ALIGN;
1235                 msg->any.head.hdr_crc = 0;
1236                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1237
1238                 /*
1239                  * Enqueue the message (the flush codes handles stream
1240                  * encryption).
1241                  */
1242                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1243                 ++ioq->msgcount;
1244         }
1245         dmsg_iocom_flush2(iocom);
1246 }
1247
1248 /*
1249  * Thread localized, iocom->mtx not held by caller.
1250  *
1251  * (called from iocom_core via iocom_flush1 only)
1252  */
1253 void
1254 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1255 {
1256         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1257         dmsg_msg_t *msg;
1258         ssize_t n;
1259         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1260         size_t nact;
1261         size_t hbytes;
1262         size_t abytes;
1263         size_t hoff;
1264         size_t aoff;
1265         int iovcnt;
1266         int save_errno;
1267
1268         if (ioq->error) {
1269                 dmsg_iocom_drain(iocom);
1270                 return;
1271         }
1272
1273         /*
1274          * Pump messages out the connection by building an iovec.
1275          *
1276          * ioq->hbytes/ioq->abytes tracks how much of the first message
1277          * in the queue has been successfully written out, so we can
1278          * resume writing.
1279          */
1280         iovcnt = 0;
1281         nact = 0;
1282         hoff = ioq->hbytes;
1283         aoff = ioq->abytes;
1284
1285         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1286                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1287                          DMSG_ALIGN;
1288                 abytes = DMSG_DOALIGN(msg->aux_size);
1289                 assert(hoff <= hbytes && aoff <= abytes);
1290
1291                 if (hoff < hbytes) {
1292                         size_t maxlen = hbytes - hoff;
1293                         if (maxlen > sizeof(ioq->buf) / 2)
1294                                 maxlen = sizeof(ioq->buf) / 2;
1295                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1296                         iov[iovcnt].iov_len = maxlen;
1297                         nact += maxlen;
1298                         ++iovcnt;
1299                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1300                             maxlen != hbytes - hoff) {
1301                                 break;
1302                         }
1303                 }
1304                 if (aoff < abytes) {
1305                         size_t maxlen = abytes - aoff;
1306                         if (maxlen > sizeof(ioq->buf) / 2)
1307                                 maxlen = sizeof(ioq->buf) / 2;
1308
1309                         assert(msg->aux_data != NULL);
1310                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1311                         iov[iovcnt].iov_len = maxlen;
1312                         nact += maxlen;
1313                         ++iovcnt;
1314                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1315                             maxlen != abytes - aoff) {
1316                                 break;
1317                         }
1318                 }
1319                 hoff = 0;
1320                 aoff = 0;
1321         }
1322
1323         /*
1324          * Shortcut if no work to do.  Be sure to check for old work still
1325          * pending in the FIFO.
1326          */
1327         if (iovcnt == 0 && ioq->fifo_beg == ioq->fifo_cdx)
1328                 return;
1329
1330         /*
1331          * Encrypt and write the data.  The crypto code will move the
1332          * data into the fifo and adjust the iov as necessary.  If
1333          * encryption is disabled the iov is left alone.
1334          *
1335          * May return a smaller iov (thus a smaller n), with aggregated
1336          * chunks.  May reduce nmax to what fits in the FIFO.
1337          *
1338          * This function sets nact to the number of original bytes now
1339          * encrypted, adding to the FIFO some number of bytes that might
1340          * be greater depending on the crypto mechanic.  iov[] is adjusted
1341          * to point at the FIFO if necessary.
1342          *
1343          * NOTE: nact is the number of bytes eaten from the message.  For
1344          *       encrypted data this is the number of bytes processed for
1345          *       encryption and not necessarily the number of bytes writable.
1346          *       The return value from the writev() is the post-encrypted
1347          *       byte count which might be larger.
1348          *
1349          * NOTE: For direct writes, nact is the return value from the writev().
1350          */
1351         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1352                 /*
1353                  * Make sure the FIFO has a reasonable amount of space
1354                  * left (if not completely full).
1355                  *
1356                  * In this situation we are staging the encrypted message
1357                  * data in the FIFO.  (nact) represents how much plaintext
1358                  * has been staged, (n) represents how much encrypted data
1359                  * has been flushed.  The two are independent of each other.
1360                  */
1361                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1362                     sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1363                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1364                               ioq->fifo_end - ioq->fifo_beg);
1365                         ioq->fifo_cdx -= ioq->fifo_beg;
1366                         ioq->fifo_cdn -= ioq->fifo_beg;
1367                         ioq->fifo_end -= ioq->fifo_beg;
1368                         ioq->fifo_beg = 0;
1369                 }
1370
1371                 /* 
1372                  * beg .... cdx ............ cdn ............. end
1373                  * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
1374                  *
1375                  * Advance fifo_beg on a successful write.
1376                  */
1377                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1378                 n = writev(iocom->sock_fd, iov, iovcnt);
1379                 save_errno = errno;
1380                 if (n > 0) {
1381                         ioq->fifo_beg += n;
1382                         if (ioq->fifo_beg == ioq->fifo_end) {
1383                                 ioq->fifo_beg = 0;
1384                                 ioq->fifo_cdn = 0;
1385                                 ioq->fifo_cdx = 0;
1386                                 ioq->fifo_end = 0;
1387                         }
1388                 }
1389
1390                 /*
1391                  * We don't mess with the nact returned by the crypto_encrypt
1392                  * call, which represents the filling of the FIFO.  (n) tells
1393                  * us how much we were able to write from the FIFO.  The two
1394                  * are different beasts when encrypting.
1395                  */
1396         } else {
1397                 /*
1398                  * In this situation we are not staging the messages to the
1399                  * FIFO but instead writing them directly from the msg
1400                  * structure(s) unencrypted, so (nact) is basically (n).
1401                  */
1402                 n = writev(iocom->sock_fd, iov, iovcnt);
1403                 save_errno = errno;
1404                 if (n > 0)
1405                         nact = n;
1406                 else
1407                         nact = 0;
1408         }
1409
1410         /*
1411          * Clean out the transmit queue based on what we successfully
1412          * encrypted (nact is the plaintext count) and is now in the FIFO.
1413          * ioq->hbytes/abytes represents the portion of the first message
1414          * previously sent.
1415          */
1416         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1417                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1418                          DMSG_ALIGN;
1419                 abytes = DMSG_DOALIGN(msg->aux_size);
1420
1421                 if ((size_t)nact < hbytes - ioq->hbytes) {
1422                         ioq->hbytes += nact;
1423                         nact = 0;
1424                         break;
1425                 }
1426                 nact -= hbytes - ioq->hbytes;
1427                 ioq->hbytes = hbytes;
1428                 if ((size_t)nact < abytes - ioq->abytes) {
1429                         ioq->abytes += nact;
1430                         nact = 0;
1431                         break;
1432                 }
1433                 nact -= abytes - ioq->abytes;
1434                 /* ioq->abytes = abytes; optimized out */
1435
1436                 dmio_printf(iocom, 5,
1437                             "txmsg cmd=%08x circ=%016jx\n",
1438                             msg->any.head.cmd,
1439                             (intmax_t)msg->any.head.circuit);
1440
1441 #ifdef DMSG_BLOCK_DEBUG
1442                 uint32_t tcmd;
1443
1444                 if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
1445                         if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
1446                                 tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
1447                                             (msg->any.head.cmd & (DMSGF_CREATE |
1448                                                                   DMSGF_DELETE |
1449                                                                   DMSGF_REPLY));
1450                         } else {
1451                                 tcmd = 0;
1452                         }
1453                 } else {
1454                         tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
1455                 }
1456
1457                 switch (tcmd) {
1458                 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
1459                 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
1460                         dmio_printf(iocom, 4,
1461                                     "write BIO %-3d %016jx %d@%016jx\n",
1462                                     biocount, msg->any.head.msgid,
1463                                     msg->any.blk_read.bytes,
1464                                     msg->any.blk_read.offset);
1465                         break;
1466                 case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1467                 case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
1468                         dmio_printf(iocom, 4,
1469                                     "wretr BIO %-3d %016jx %d@%016jx\n",
1470                                     biocount, msg->any.head.msgid,
1471                                     msg->any.blk_read.bytes,
1472                                     msg->any.blk_read.offset);
1473                         break;
1474                 default:
1475                         break;
1476                 }
1477 #endif
1478
1479                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1480                 --ioq->msgcount;
1481                 ioq->hbytes = 0;
1482                 ioq->abytes = 0;
1483                 dmsg_msg_free(msg);
1484         }
1485         assert(nact == 0);
1486
1487         /*
1488          * Process the return value from the write w/regards to blocking.
1489          */
1490         if (n < 0) {
1491                 if (save_errno != EINTR &&
1492                     save_errno != EINPROGRESS &&
1493                     save_errno != EAGAIN) {
1494                         /*
1495                          * Fatal write error
1496                          */
1497                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1498                         dmsg_iocom_drain(iocom);
1499                 } else {
1500                         /*
1501                          * Wait for socket buffer space, do not try to
1502                          * process more packets for transmit until space
1503                          * is available.
1504                          */
1505                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1506                 }
1507         } else if (TAILQ_FIRST(&ioq->msgq) ||
1508                    TAILQ_FIRST(&iocom->txmsgq) ||
1509                    ioq->fifo_beg != ioq->fifo_cdx) {
1510                 /*
1511                  * If the write succeeded and more messages are pending
1512                  * in either msgq, or the FIFO WWORK must remain set.
1513                  */
1514                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
1515         }
1516         /* else no transmit-side work remains */
1517
1518         if (ioq->error) {
1519                 dmsg_iocom_drain(iocom);
1520         }
1521 }
1522
1523 /*
1524  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1525  * write events will occur.  We don't kill read msgs because we want
1526  * the caller to pull off our contrived terminal error msg to detect
1527  * the connection failure.
1528  *
1529  * Localized to iocom_core thread, iocom->mtx not held by caller.
1530  */
1531 void
1532 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1533 {
1534         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1535         dmsg_msg_t *msg;
1536
1537         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1538         ioq->hbytes = 0;
1539         ioq->abytes = 0;
1540
1541         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1542                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1543                 --ioq->msgcount;
1544                 dmsg_msg_free(msg);
1545         }
1546 }
1547
1548 /*
1549  * Write a message to an iocom, with additional state processing.
1550  */
1551 void
1552 dmsg_msg_write(dmsg_msg_t *msg)
1553 {
1554         dmsg_iocom_t *iocom = msg->state->iocom;
1555         dmsg_state_t *state;
1556         char dummy;
1557
1558         pthread_mutex_lock(&iocom->mtx);
1559         state = msg->state;
1560
1561         dmio_printf(iocom, 5,
1562                     "msgtx: cmd=%08x msgid=%016jx "
1563                     "state %p(%08x) error=%d\n",
1564                     msg->any.head.cmd, msg->any.head.msgid,
1565                     state, (state ? state->icmd : 0),
1566                     msg->any.head.error);
1567
1568
1569 #if 0
1570         /*
1571          * Make sure the parent transaction is still open in the transmit
1572          * direction.  If it isn't the message is dead and we have to
1573          * potentially simulate a rxmsg terminating the transaction.
1574          */
1575         if ((state->parent->txcmd & DMSGF_DELETE) ||
1576             (state->parent->rxcmd & DMSGF_DELETE)) {
1577                 dmio_printf(iocom, 4, "dmsg_msg_write: EARLY TERMINATION\n");
1578                 dmsg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1579                 dmsg_state_cleanuptx(iocom, msg);
1580                 dmsg_msg_free(msg);
1581                 pthread_mutex_unlock(&iocom->mtx);
1582                 return;
1583         }
1584 #endif
1585         /*
1586          * Process state data into the message as needed, then update the
1587          * state based on the message.
1588          */
1589         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1590                 /*
1591                  * Existing transaction (could be reply).  It is also
1592                  * possible for this to be the first reply (CREATE is set),
1593                  * in which case we populate state->txcmd.
1594                  *
1595                  * state->txcmd is adjusted to hold the final message cmd,
1596                  * and we also be sure to set the CREATE bit here.  We did
1597                  * not set it in dmsg_msg_alloc() because that would have
1598                  * not been serialized (state could have gotten ripped out
1599                  * from under the message prior to it being transmitted).
1600                  */
1601                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1602                     DMSGF_CREATE) {
1603                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1604                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1605                         state->flags &= ~DMSG_STATE_NEW;
1606                 }
1607                 msg->any.head.msgid = state->msgid;
1608
1609                 if (msg->any.head.cmd & DMSGF_CREATE) {
1610                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1611                 }
1612         }
1613
1614         /*
1615          * Discard messages sent to transactions which are already dead.
1616          */
1617         if (state && (state->txcmd & DMSGF_DELETE)) {
1618                 dmio_printf(iocom, 4,
1619                             "dmsg_msg_write: drop msg %08x to dead "
1620                             "circuit state=%p\n",
1621                             msg->any.head.cmd, state);
1622                 dmsg_msg_free(msg);
1623                 return;
1624         }
1625
1626         /*
1627          * Normally we queue the msg for output.  However, if the circuit is
1628          * dead or dying we must simulate a failure in the return direction
1629          * and throw the message away.  The other end is not expecting any
1630          * further messages from us on this state.
1631          *
1632          * Note that the I/O thread is responsible for generating the CRCs
1633          * and encryption.
1634          */
1635         if (state->flags & DMSG_STATE_DYING) {
1636 #if 0
1637         if ((state->parent->txcmd & DMSGF_DELETE) ||
1638             (state->parent->flags & DMSG_STATE_DYING) ||
1639             (state->flags & DMSG_STATE_DYING)) {
1640 #endif
1641                 /* 
1642                  * Illegal message, kill state and related sub-state.
1643                  * Cannot transmit if state is already dying.
1644                  */
1645                 dmio_printf(iocom, 4,
1646                             "dmsg_msg_write: Write to dying circuit "
1647                             "ptxcmd=%08x prxcmd=%08x flags=%08x\n",
1648                             state->parent->rxcmd,
1649                             state->parent->txcmd,
1650                             state->parent->flags);
1651                 dmsg_state_hold(state);
1652                 dmsg_state_cleanuptx(iocom, msg);
1653                 if ((state->flags & DMSG_STATE_ABORTING) == 0) {
1654                         dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK);
1655                 }
1656                 dmsg_state_drop(state);
1657                 dmsg_msg_free(msg);
1658         } else {
1659                 /*
1660                  * Queue the message, clean up transmit state prior to queueing
1661                  * to avoid SMP races.
1662                  */
1663                 dmio_printf(iocom, 5,
1664                             "dmsg_msg_write: commit msg state=%p to txkmsgq\n",
1665                             state);
1666                 dmsg_state_cleanuptx(iocom, msg);
1667                 TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1668                 dummy = 0;
1669                 write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1670         }
1671         pthread_mutex_unlock(&iocom->mtx);
1672 }
1673
1674 /*
1675  * Remove state from its parent's subq.  This can wind up recursively
1676  * dropping the parent upward.
1677  *
1678  * NOTE: iocom must be locked.
1679  *
1680  * NOTE: Once we drop the parent, our pstate pointer may become invalid.
1681  */
1682 static
1683 void
1684 dmsg_subq_delete(dmsg_state_t *state)
1685 {
1686         dmsg_state_t *pstate;
1687
1688         if (state->flags & DMSG_STATE_SUBINSERTED) {
1689                 pstate = state->parent;
1690                 assert(pstate);
1691                 if (pstate->scan == state)
1692                         pstate->scan = NULL;
1693                 TAILQ_REMOVE(&pstate->subq, state, entry);
1694                 state->flags &= ~DMSG_STATE_SUBINSERTED;
1695                 state->parent = NULL;
1696                 if (TAILQ_EMPTY(&pstate->subq))
1697                         dmsg_state_drop(pstate);/* pstate->subq */
1698                 pstate = NULL;                  /* safety */
1699                 dmsg_state_drop(state);         /* pstate->subq */
1700         } else {
1701                 assert(state->parent == NULL);
1702         }
1703 }
1704
1705 /*
1706  * Simulate reception of a transaction DELETE message when the link goes
1707  * bad.  This routine must recurse through state->subq and generate messages
1708  * and callbacks bottom-up.
1709  *
1710  * iocom->mtx must be held by caller.
1711  */
1712 static
1713 void
1714 dmsg_simulate_failure(dmsg_state_t *state, int meto, int error)
1715 {
1716         dmsg_state_t *substate;
1717
1718         dmsg_state_hold(state);
1719         if (meto)
1720                 dmsg_state_abort(state);
1721
1722         /*
1723          * Recurse through sub-states.
1724          */
1725 again:
1726         TAILQ_FOREACH(substate, &state->subq, entry) {
1727                 if (substate->flags & DMSG_STATE_ABORTING)
1728                         continue;
1729                 state->scan = substate;
1730                 dmsg_simulate_failure(substate, 1, error);
1731                 if (state->scan != substate)
1732                         goto again;
1733         }
1734
1735         dmsg_state_drop(state);
1736 }
1737
1738 static
1739 void
1740 dmsg_state_abort(dmsg_state_t *state)
1741 {
1742         dmsg_iocom_t *iocom;
1743         dmsg_msg_t *msg;
1744
1745         /*
1746          * Set ABORTING and DYING, return if already set.  If the state was
1747          * just allocated we defer the abort operation until the related
1748          * message is processed.
1749          */
1750         if (state->flags & DMSG_STATE_ABORTING)
1751                 return;
1752         state->flags |= DMSG_STATE_ABORTING;
1753         dmsg_state_dying(state);
1754         if (state->flags & DMSG_STATE_NEW) {
1755                 dmio_printf(iocom, 4,
1756                             "dmsg_state_abort(0): state %p rxcmd %08x "
1757                             "txcmd %08x flags %08x - in NEW state\n",
1758                             state, state->rxcmd,
1759                             state->txcmd, state->flags);
1760                 return;
1761         }
1762
1763         /*
1764          * Simulate parent state failure before child states.  Device
1765          * drivers need to understand this and flag the situation but might
1766          * have asynchronous operations in progress that they cannot stop.
1767          * To make things easier, parent states will not actually disappear
1768          * until the children are all gone.
1769          */
1770         if ((state->rxcmd & DMSGF_DELETE) == 0) {
1771                 dmio_printf(iocom, 5,
1772                             "dmsg_state_abort() on state %p\n",
1773                             state);
1774                 msg = dmsg_msg_alloc_locked(state, 0, DMSG_LNK_ERROR,
1775                                             NULL, NULL);
1776                 if ((state->rxcmd & DMSGF_CREATE) == 0)
1777                         msg->any.head.cmd |= DMSGF_CREATE;
1778                 msg->any.head.cmd |= DMSGF_DELETE |
1779                                      (state->rxcmd & DMSGF_REPLY);
1780                 msg->any.head.cmd ^= (DMSGF_REVTRANS | DMSGF_REVCIRC);
1781                 msg->any.head.error = DMSG_ERR_LOSTLINK;
1782                 msg->any.head.cmd |= DMSGF_ABORT;
1783
1784                 /*
1785                  * Issue callback synchronously even though this isn't
1786                  * the receiver thread.  We need to issue the callback
1787                  * before removing state from the subq in order to allow
1788                  * the callback to reply.
1789                  */
1790                 iocom = state->iocom;
1791                 dmsg_state_msgrx(msg, 1);
1792                 pthread_mutex_unlock(&iocom->mtx);
1793                 iocom->rcvmsg_callback(msg);
1794                 pthread_mutex_lock(&iocom->mtx);
1795                 dmsg_state_cleanuprx(iocom, msg);
1796 #if 0
1797                 TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1798                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1799 #endif
1800         }
1801 }
1802
1803
1804 /*
1805  * Recursively sets DMSG_STATE_DYING on state and all sub-states, preventing
1806  * the transmission of any new messages on these states.  This is done
1807  * atomically when parent state is terminating, whereas setting ABORTING is
1808  * not atomic and can leak races.
1809  */
1810 static
1811 void
1812 dmsg_state_dying(dmsg_state_t *state)
1813 {
1814         dmsg_state_t *scan;
1815
1816         if ((state->flags & DMSG_STATE_DYING) == 0) {
1817                 state->flags |= DMSG_STATE_DYING;
1818                 TAILQ_FOREACH(scan, &state->subq, entry)
1819                         dmsg_state_dying(scan);
1820         }
1821 }
1822
1823 /*
1824  * This is a shortcut to formulate a reply to msg with a simple error code,
1825  * It can reply to and terminate a transaction, or it can reply to a one-way
1826  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1827  * the error code (which can be 0).  Not all transactions are terminated
1828  * with DMSG_LNK_ERROR status (the low level only cares about the
1829  * MSGF_DELETE flag), but most are.
1830  *
1831  * Replies to one-way messages are a bit of an oxymoron but the feature
1832  * is used by the debug (DBG) protocol.
1833  *
1834  * The reply contains no extended data.
1835  */
1836 void
1837 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1838 {
1839         dmsg_state_t *state = msg->state;
1840         dmsg_msg_t *nmsg;
1841         uint32_t cmd;
1842
1843         /*
1844          * Reply with a simple error code and terminate the transaction.
1845          */
1846         cmd = DMSG_LNK_ERROR;
1847
1848         /*
1849          * Check if our direction has even been initiated yet, set CREATE.
1850          *
1851          * Check what direction this is (command or reply direction).  Note
1852          * that txcmd might not have been initiated yet.
1853          *
1854          * If our direction has already been closed we just return without
1855          * doing anything.
1856          */
1857         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1858                 if (state->txcmd & DMSGF_DELETE)
1859                         return;
1860                 if (state->txcmd & DMSGF_REPLY)
1861                         cmd |= DMSGF_REPLY;
1862                 cmd |= DMSGF_DELETE;
1863         } else {
1864                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1865                         cmd |= DMSGF_REPLY;
1866         }
1867
1868         /*
1869          * Allocate the message and associate it with the existing state.
1870          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1871          * allocate new state.  We have our state already.
1872          */
1873         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1874         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1875                 if ((state->txcmd & DMSGF_CREATE) == 0)
1876                         nmsg->any.head.cmd |= DMSGF_CREATE;
1877         }
1878         nmsg->any.head.error = error;
1879
1880         dmsg_msg_write(nmsg);
1881 }
1882
1883 /*
1884  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1885  * we are generating a streaming reply or an intermediate acknowledgement
1886  * of some sort as part of the higher level protocol, with more to come
1887  * later.
1888  */
1889 void
1890 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1891 {
1892         dmsg_state_t *state = msg->state;
1893         dmsg_msg_t *nmsg;
1894         uint32_t cmd;
1895
1896
1897         /*
1898          * Reply with a simple error code and terminate the transaction.
1899          */
1900         cmd = DMSG_LNK_ERROR;
1901
1902         /*
1903          * Check if our direction has even been initiated yet, set CREATE.
1904          *
1905          * Check what direction this is (command or reply direction).  Note
1906          * that txcmd might not have been initiated yet.
1907          *
1908          * If our direction has already been closed we just return without
1909          * doing anything.
1910          */
1911         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1912                 if (state->txcmd & DMSGF_DELETE)
1913                         return;
1914                 if (state->txcmd & DMSGF_REPLY)
1915                         cmd |= DMSGF_REPLY;
1916                 /* continuing transaction, do not set MSGF_DELETE */
1917         } else {
1918                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1919                         cmd |= DMSGF_REPLY;
1920         }
1921         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1922         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1923                 if ((state->txcmd & DMSGF_CREATE) == 0)
1924                         nmsg->any.head.cmd |= DMSGF_CREATE;
1925         }
1926         nmsg->any.head.error = error;
1927
1928         dmsg_msg_write(nmsg);
1929 }
1930
1931 /*
1932  * Terminate a transaction given a state structure by issuing a DELETE.
1933  * (the state structure must not be &iocom->state0)
1934  */
1935 void
1936 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1937 {
1938         dmsg_msg_t *nmsg;
1939         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1940
1941         /*
1942          * Nothing to do if we already transmitted a delete
1943          */
1944         if (state->txcmd & DMSGF_DELETE)
1945                 return;
1946
1947         /*
1948          * Set REPLY if the other end initiated the command.  Otherwise
1949          * we are the command direction.
1950          */
1951         if (state->txcmd & DMSGF_REPLY)
1952                 cmd |= DMSGF_REPLY;
1953
1954         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1955         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1956                 if ((state->txcmd & DMSGF_CREATE) == 0)
1957                         nmsg->any.head.cmd |= DMSGF_CREATE;
1958         }
1959         nmsg->any.head.error = error;
1960         dmsg_msg_write(nmsg);
1961 }
1962
1963 /*
1964  * Terminate a transaction given a state structure by issuing a DELETE.
1965  * (the state structure must not be &iocom->state0)
1966  */
1967 void
1968 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1969 {
1970         dmsg_msg_t *nmsg;
1971         uint32_t cmd = DMSG_LNK_ERROR;
1972
1973         /*
1974          * Nothing to do if we already transmitted a delete
1975          */
1976         if (state->txcmd & DMSGF_DELETE)
1977                 return;
1978
1979         /*
1980          * Set REPLY if the other end initiated the command.  Otherwise
1981          * we are the command direction.
1982          */
1983         if (state->txcmd & DMSGF_REPLY)
1984                 cmd |= DMSGF_REPLY;
1985
1986         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1987         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1988                 if ((state->txcmd & DMSGF_CREATE) == 0)
1989                         nmsg->any.head.cmd |= DMSGF_CREATE;
1990         }
1991         nmsg->any.head.error = error;
1992         dmsg_msg_write(nmsg);
1993 }
1994
1995 /************************************************************************
1996  *                      TRANSACTION STATE HANDLING                      *
1997  ************************************************************************
1998  *
1999  */
2000
2001 /*
2002  * Process state tracking for a message after reception, prior to execution.
2003  * Possibly route the message (consuming it).
2004  *
2005  * Called with msglk held and the msg dequeued.
2006  *
2007  * All messages are called with dummy state and return actual state.
2008  * (One-off messages often just return the same dummy state).
2009  *
2010  * May request that caller discard the message by setting *discardp to 1.
2011  * The returned state is not used in this case and is allowed to be NULL.
2012  *
2013  * --
2014  *
2015  * These routines handle persistent and command/reply message state via the
2016  * CREATE and DELETE flags.  The first message in a command or reply sequence
2017  * sets CREATE, the last message in a command or reply sequence sets DELETE.
2018  *
2019  * There can be any number of intermediate messages belonging to the same
2020  * sequence sent in between the CREATE message and the DELETE message,
2021  * which set neither flag.  This represents a streaming command or reply.
2022  *
2023  * Any command message received with CREATE set expects a reply sequence to
2024  * be returned.  Reply sequences work the same as command sequences except the
2025  * REPLY bit is also sent.  Both the command side and reply side can
2026  * degenerate into a single message with both CREATE and DELETE set.  Note
2027  * that one side can be streaming and the other side not, or neither, or both.
2028  *
2029  * The msgid is unique for the initiator.  That is, two sides sending a new
2030  * message can use the same msgid without colliding.
2031  *
2032  * --
2033  *
2034  * The message may be running over a circuit.  If the circuit is half-deleted,
2035  * the message is typically racing against a link failure and must be thrown
2036  * out.  As the circuit deletion propagates the library will automatically
2037  * generate terminations for sub states.
2038  *
2039  * --
2040  *
2041  * ABORT sequences work by setting the ABORT flag along with normal message
2042  * state.  However, ABORTs can also be sent on half-closed messages, that is
2043  * even if the command or reply side has already sent a DELETE, as long as
2044  * the message has not been fully closed it can still send an ABORT+DELETE
2045  * to terminate the half-closed message state.
2046  *
2047  * Since ABORT+DELETEs can race we silently discard ABORT's for message
2048  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
2049  * also race, and in this situation the other side might have already
2050  * initiated a new unrelated command with the same message id.  Since
2051  * the abort has not set the CREATE flag the situation can be detected
2052  * and the message will also be discarded.
2053  *
2054  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
2055  * The ABORT request is essentially integrated into the command instead
2056  * of being sent later on.  In this situation the command implementation
2057  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
2058  * special-case non-blocking operation for the command.
2059  *
2060  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
2061  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
2062  *        one-way messages are not supported.
2063  *
2064  * NOTE!  If a command sequence does not support aborts the ABORT flag is
2065  *        simply ignored.
2066  *
2067  * --
2068  *
2069  * One-off messages (no reply expected) are sent without an established
2070  * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
2071  * For one-off messages sent over circuits msgid generally MUST be 0.
2072  *
2073  * One-off messages cannot be aborted and typically aren't processed
2074  * by these routines.  Order is still guaranteed for messages sent over
2075  * the same circuit.  The REPLY bit can be used to distinguish whether
2076  * a one-off message is a command or reply.  For example, one-off replies
2077  * will typically just contain status updates.
2078  */
2079 static int
2080 dmsg_state_msgrx(dmsg_msg_t *msg, int mstate)
2081 {
2082         dmsg_iocom_t *iocom = msg->state->iocom;
2083         dmsg_state_t *state;
2084         dmsg_state_t *pstate;
2085         dmsg_state_t sdummy;
2086         int error;
2087
2088         pthread_mutex_lock(&iocom->mtx);
2089
2090         if (DMsgDebugOpt) {
2091                 dmio_printf(iocom, 5,
2092                             "msgrx: cmd=%08x msgid=%016jx "
2093                             "circuit=%016jx error=%d\n",
2094                             msg->any.head.cmd,
2095                             msg->any.head.msgid,
2096                             msg->any.head.circuit,
2097                             msg->any.head.error);
2098         }
2099
2100         /*
2101          * Lookup the circuit (pstate).  The circuit will be an open
2102          * transaction.  The REVCIRC bit in the message tells us which side
2103          * initiated it.
2104          *
2105          * If mstate is non-zero the state has already been incorporated
2106          * into the message as part of a simulated abort.  Note that in this
2107          * situation the parent state may have already been removed from
2108          * the RBTREE.
2109          */
2110         if (mstate) {
2111                 pstate = msg->state->parent;
2112         } else if (msg->any.head.circuit) {
2113                 sdummy.msgid = msg->any.head.circuit;
2114
2115                 if (msg->any.head.cmd & DMSGF_REVCIRC) {
2116                         pstate = RB_FIND(dmsg_state_tree,
2117                                          &iocom->statewr_tree,
2118                                          &sdummy);
2119                 } else {
2120                         pstate = RB_FIND(dmsg_state_tree,
2121                                          &iocom->staterd_tree,
2122                                          &sdummy);
2123                 }
2124
2125                 /*
2126                  * If we cannot find the circuit throw the message away.
2127                  * The state will have already been taken care of by
2128                  * the simulated failure code.  This case can occur due
2129                  * to a failure propagating in one direction crossing a
2130                  * request on the failed circuit propagating in the other
2131                  * direction.
2132                  */
2133                 if (pstate == NULL) {
2134                         dmio_printf(iocom, 4,
2135                                     "missing parent in stacked trans %s\n",
2136                                     dmsg_msg_str(msg));
2137                         pthread_mutex_unlock(&iocom->mtx);
2138                         error = DMSG_IOQ_ERROR_EALREADY;
2139
2140                         return error;
2141                 }
2142         } else {
2143                 pstate = &iocom->state0;
2144         }
2145         /* WARNING: pstate not (yet) refd */
2146
2147         /*
2148          * Lookup the msgid.
2149          *
2150          * If mstate is non-zero the state has already been incorporated
2151          * into the message as part of a simulated abort.  Note that in this
2152          * situation the state may have already been removed from the RBTREE.
2153          *
2154          * If received msg is a command state is on staterd_tree.
2155          * If received msg is a reply state is on statewr_tree.
2156          * Otherwise there is no state (retain &iocom->state0)
2157          */
2158         if (mstate) {
2159                 state = msg->state;
2160         } else {
2161                 sdummy.msgid = msg->any.head.msgid;
2162                 if (msg->any.head.cmd & DMSGF_REVTRANS) {
2163                         state = RB_FIND(dmsg_state_tree,
2164                                         &iocom->statewr_tree, &sdummy);
2165                 } else {
2166                         state = RB_FIND(dmsg_state_tree,
2167                                         &iocom->staterd_tree, &sdummy);
2168                 }
2169         }
2170
2171         if (DMsgDebugOpt) {
2172                 dmio_printf(iocom, 5, "msgrx:\tstate %p(%08x)",
2173                             state, (state ? state->icmd : 0));
2174                 if (pstate != &iocom->state0) {
2175                         dmio_printf(iocom, 5,
2176                                     " pstate %p(%08x)",
2177                                     pstate, pstate->icmd);
2178                 }
2179                 dmio_printf(iocom, 5, "%s\n", "");
2180         }
2181
2182         if (mstate) {
2183                 /* state already assigned to msg */
2184         } else if (state) {
2185                 /*
2186                  * Message over an existing transaction (CREATE should not
2187                  * be set).
2188                  */
2189                 dmsg_state_drop(msg->state);
2190                 dmsg_state_hold(state);
2191                 msg->state = state;
2192                 assert(pstate == state->parent);
2193         } else {
2194                 /*
2195                  * Either a new transaction (if CREATE set) or a one-off.
2196                  */
2197                 state = pstate;
2198         }
2199
2200         /*
2201          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
2202          * inside the case statements.
2203          *
2204          * Construct new state as necessary.
2205          */
2206         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
2207                                     DMSGF_REPLY)) {
2208         case DMSGF_CREATE:
2209         case DMSGF_CREATE | DMSGF_DELETE:
2210                 /*
2211                  * Create new sub-transaction under pstate.
2212                  * (any DELETE is handled in post-processing of msg).
2213                  *
2214                  * (During routing the msgid was made unique for this
2215                  * direction over the comlink, so our RB trees can be
2216                  * iocom-based instead of state-based).
2217                  */
2218                 if (state != pstate) {
2219                         dmio_printf(iocom, 2,
2220                                     "duplicate transaction %s\n",
2221                                     dmsg_msg_str(msg));
2222                         error = DMSG_IOQ_ERROR_TRANS;
2223                         assert(0);
2224                         break;
2225                 }
2226
2227                 /*
2228                  * Allocate the new state.
2229                  */
2230                 state = malloc(sizeof(*state));
2231                 bzero(state, sizeof(*state));
2232                 atomic_add_int(&dmsg_state_count, 1);
2233
2234                 TAILQ_INIT(&state->subq);
2235                 dmsg_state_hold(pstate);
2236                 state->parent = pstate;
2237                 state->iocom = iocom;
2238                 state->flags = DMSG_STATE_DYNAMIC |
2239                                DMSG_STATE_OPPOSITE;
2240                 state->msgid = msg->any.head.msgid;
2241                 state->txcmd = DMSGF_REPLY;
2242                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2243                 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
2244                 state->flags &= ~DMSG_STATE_NEW;
2245                 msg->state = state;
2246
2247                 RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
2248                 if (TAILQ_EMPTY(&pstate->subq))
2249                         dmsg_state_hold(pstate);/* pstate->subq */
2250                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
2251                 state->flags |= DMSG_STATE_SUBINSERTED |
2252                                 DMSG_STATE_RBINSERTED;
2253                 dmsg_state_hold(state);         /* pstate->subq */
2254                 dmsg_state_hold(state);         /* state on rbtree */
2255                 dmsg_state_hold(state);         /* msg->state */
2256
2257                 /*
2258                  * If the parent is a relay set up the state handler to
2259                  * automatically route the message.  Local processing will
2260                  * not occur if set.
2261                  *
2262                  * (state relays are seeded by SPAN processing)
2263                  */
2264                 if (pstate->relay)
2265                         state->func = dmsg_state_relay;
2266                 error = 0;
2267                 break;
2268         case DMSGF_DELETE:
2269                 /*
2270                  * Persistent state is expected but might not exist if an
2271                  * ABORT+DELETE races the close.
2272                  *
2273                  * (any DELETE is handled in post-processing of msg).
2274                  */
2275                 if (state == pstate) {
2276                         if (msg->any.head.cmd & DMSGF_ABORT) {
2277                                 error = DMSG_IOQ_ERROR_EALREADY;
2278                         } else {
2279                                 dmio_printf(iocom, 2,
2280                                             "missing-state %s\n",
2281                                             dmsg_msg_str(msg));
2282                                 error = DMSG_IOQ_ERROR_TRANS;
2283                                 assert(0);
2284                         }
2285                         break;
2286                 }
2287
2288                 /*
2289                  * Handle another ABORT+DELETE case if the msgid has already
2290                  * been reused.
2291                  */
2292                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2293                         if (msg->any.head.cmd & DMSGF_ABORT) {
2294                                 error = DMSG_IOQ_ERROR_EALREADY;
2295                         } else {
2296                                 dmio_printf(iocom, 2,
2297                                             "reused-state %s\n",
2298                                             dmsg_msg_str(msg));
2299                                 error = DMSG_IOQ_ERROR_TRANS;
2300                                 assert(0);
2301                         }
2302                         break;
2303                 }
2304                 error = 0;
2305                 break;
2306         default:
2307                 /*
2308                  * Check for mid-stream ABORT command received, otherwise
2309                  * allow.
2310                  */
2311                 if (msg->any.head.cmd & DMSGF_ABORT) {
2312                         if ((state == pstate) ||
2313                             (state->rxcmd & DMSGF_CREATE) == 0) {
2314                                 error = DMSG_IOQ_ERROR_EALREADY;
2315                                 break;
2316                         }
2317                 }
2318                 error = 0;
2319                 break;
2320         case DMSGF_REPLY | DMSGF_CREATE:
2321         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
2322                 /*
2323                  * When receiving a reply with CREATE set the original
2324                  * persistent state message should already exist.
2325                  */
2326                 if (state == pstate) {
2327                         dmio_printf(iocom, 2, "no-state(r) %s\n",
2328                                     dmsg_msg_str(msg));
2329                         error = DMSG_IOQ_ERROR_TRANS;
2330                         assert(0);
2331                         break;
2332                 }
2333                 assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
2334                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2335                 error = 0;
2336                 break;
2337         case DMSGF_REPLY | DMSGF_DELETE:
2338                 /*
2339                  * Received REPLY+ABORT+DELETE in case where msgid has
2340                  * already been fully closed, ignore the message.
2341                  */
2342                 if (state == pstate) {
2343                         if (msg->any.head.cmd & DMSGF_ABORT) {
2344                                 error = DMSG_IOQ_ERROR_EALREADY;
2345                         } else {
2346                                 dmio_printf(iocom, 2,
2347                                             "no-state(r,d) %s\n",
2348                                             dmsg_msg_str(msg));
2349                                 error = DMSG_IOQ_ERROR_TRANS;
2350                                 assert(0);
2351                         }
2352                         break;
2353                 }
2354
2355                 /*
2356                  * Received REPLY+ABORT+DELETE in case where msgid has
2357                  * already been reused for an unrelated message,
2358                  * ignore the message.
2359                  */
2360                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2361                         if (msg->any.head.cmd & DMSGF_ABORT) {
2362                                 error = DMSG_IOQ_ERROR_EALREADY;
2363                         } else {
2364                                 dmio_printf(iocom, 2,
2365                                             "reused-state(r,d) %s\n",
2366                                             dmsg_msg_str(msg));
2367                                 error = DMSG_IOQ_ERROR_TRANS;
2368                                 assert(0);
2369                         }
2370                         break;
2371                 }
2372                 error = 0;
2373                 break;
2374         case DMSGF_REPLY:
2375                 /*
2376                  * Check for mid-stream ABORT reply received to sent command.
2377                  */
2378                 if (msg->any.head.cmd & DMSGF_ABORT) {
2379                         if (state == pstate ||
2380                             (state->rxcmd & DMSGF_CREATE) == 0) {
2381                                 error = DMSG_IOQ_ERROR_EALREADY;
2382                                 break;
2383                         }
2384                 }
2385                 error = 0;
2386                 break;
2387         }
2388
2389         /*
2390          * Calculate the easy-switch() transactional command.  Represents
2391          * the outer-transaction command for any transaction-create or
2392          * transaction-delete, and the inner message command for any
2393          * non-transaction or inside-transaction command.  tcmd will be
2394          * set to 0 for any messaging error condition.
2395          *
2396          * The two can be told apart because outer-transaction commands
2397          * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2398          */
2399         if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2400                 if ((msg->state->flags & DMSG_STATE_ROOT) == 0) {
2401                         msg->tcmd = (state->icmd & DMSGF_BASECMDMASK) |
2402                                     (msg->any.head.cmd & (DMSGF_CREATE |
2403                                                           DMSGF_DELETE |
2404                                                           DMSGF_REPLY));
2405                 } else {
2406                         msg->tcmd = 0;
2407                 }
2408         } else {
2409                 msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
2410         }
2411
2412 #ifdef DMSG_BLOCK_DEBUG
2413         switch (msg->tcmd) {
2414         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2415         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2416                 dmio_printf(iocom, 4,
2417                             "read  BIO %-3d %016jx %d@%016jx\n",
2418                             biocount, msg->any.head.msgid,
2419                             msg->any.blk_read.bytes,
2420                             msg->any.blk_read.offset);
2421                 break;
2422         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2423         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2424                 dmio_printf(iocom, 4,
2425                             "rread BIO %-3d %016jx %d@%016jx\n",
2426                             biocount, msg->any.head.msgid,
2427                             msg->any.blk_read.bytes,
2428                             msg->any.blk_read.offset);
2429                 break;
2430         default:
2431                 break;
2432         }
2433 #endif
2434
2435         /*
2436          * Adjust state, mark receive side as DELETED if appropriate and
2437          * adjust RB tree if both sides are DELETED.  cleanuprx handles
2438          * the rest after the state callback returns.
2439          */
2440         assert(msg->state->iocom == iocom);
2441         assert(msg->state == state);
2442
2443         if (state->flags & DMSG_STATE_ROOT) {
2444                 /*
2445                  * Nothing to do for non-transactional messages.
2446                  */
2447         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2448                 /*
2449                  * Message terminating transaction, remove the state from
2450                  * the RB tree if the full transaction is now complete.
2451                  * The related state, subq, and parent link is retained
2452                  * until after the state callback is complete.
2453                  */
2454                 assert((state->rxcmd & DMSGF_DELETE) == 0);
2455                 state->rxcmd |= DMSGF_DELETE;
2456                 if (state->txcmd & DMSGF_DELETE) {
2457                         assert(state->flags & DMSG_STATE_RBINSERTED);
2458                         if (state->rxcmd & DMSGF_REPLY) {
2459                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2460                                 RB_REMOVE(dmsg_state_tree,
2461                                           &iocom->statewr_tree, state);
2462                         } else {
2463                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2464                                 RB_REMOVE(dmsg_state_tree,
2465                                           &iocom->staterd_tree, state);
2466                         }
2467                         state->flags &= ~DMSG_STATE_RBINSERTED;
2468                         dmsg_state_drop(state);
2469                 }
2470         }
2471
2472         pthread_mutex_unlock(&iocom->mtx);
2473
2474         if (DMsgDebugOpt && error)
2475                 dmio_printf(iocom, 1, "msgrx: error %d\n", error);
2476
2477         return (error);
2478 }
2479
2480 /*
2481  * Route the message and handle pair-state processing.
2482  */
2483 void
2484 dmsg_state_relay(dmsg_msg_t *lmsg)
2485 {
2486         dmsg_state_t *lpstate;
2487         dmsg_state_t *rpstate;
2488         dmsg_state_t *lstate;
2489         dmsg_state_t *rstate;
2490         dmsg_msg_t *rmsg;
2491
2492 #ifdef DMSG_BLOCK_DEBUG
2493         switch (lmsg->tcmd) {
2494         case DMSG_BLK_OPEN | DMSGF_CREATE:
2495                 dmio_printf(iocom, 4, "%s\n",
2496                             "relay BIO_OPEN (CREATE)");
2497                 break;
2498         case DMSG_BLK_OPEN | DMSGF_DELETE:
2499                 dmio_printf(iocom, 4, "%s\n",
2500                             "relay BIO_OPEN (DELETE)");
2501                 break;
2502         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE:
2503         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE:
2504                 atomic_add_int(&biocount, 1);
2505                 dmio_printf(iocom, 4,
2506                             "relay BIO %-3d %016jx %d@%016jx\n",
2507                             biocount, lmsg->any.head.msgid,
2508                             lmsg->any.blk_read.bytes,
2509                             lmsg->any.blk_read.offset);
2510                 break;
2511         case DMSG_BLK_READ | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2512         case DMSG_BLK_WRITE | DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY:
2513                 dmio_printf(iocom, 4,
2514                             "retrn BIO %-3d %016jx %d@%016jx\n",
2515                             biocount, lmsg->any.head.msgid,
2516                             lmsg->any.blk_read.bytes,
2517                             lmsg->any.blk_read.offset);
2518                 atomic_add_int(&biocount, -1);
2519                 break;
2520         default:
2521                 break;
2522         }
2523 #endif
2524
2525         if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2526             DMSGF_CREATE) {
2527                 /*
2528                  * New sub-transaction, establish new state and relay.
2529                  */
2530                 lstate = lmsg->state;
2531                 lpstate = lstate->parent;
2532                 rpstate = lpstate->relay;
2533                 assert(lstate->relay == NULL);
2534                 assert(rpstate != NULL);
2535
2536                 rmsg = dmsg_msg_alloc(rpstate, 0,
2537                                       lmsg->any.head.cmd,
2538                                       dmsg_state_relay, NULL);
2539                 rstate = rmsg->state;
2540                 rstate->relay = lstate;
2541                 lstate->relay = rstate;
2542                 dmsg_state_hold(lstate);
2543                 dmsg_state_hold(rstate);
2544         } else {
2545                 /*
2546                  * State & relay already established
2547                  */
2548                 lstate = lmsg->state;
2549                 rstate = lstate->relay;
2550                 assert(rstate != NULL);
2551
2552                 assert((rstate->txcmd & DMSGF_DELETE) == 0);
2553
2554 #if 0
2555                 if (lstate->flags & DMSG_STATE_ABORTING) {
2556                         dmio_printf(iocom, 4,
2557                                     "relay: relay lost link l=%p r=%p\n",
2558                                     lstate, rstate);
2559                         dmsg_simulate_failure(rstate, 0, DMSG_ERR_LOSTLINK);
2560                 }
2561 #endif
2562
2563                 rmsg = dmsg_msg_alloc(rstate, 0,
2564                                       lmsg->any.head.cmd,
2565                                       dmsg_state_relay, NULL);
2566         }
2567         if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2568                 bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2569                       lmsg->hdr_size - sizeof(lmsg->any.head));
2570         }
2571         rmsg->any.head.error = lmsg->any.head.error;
2572         rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2573         rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
2574         rmsg->aux_size = lmsg->aux_size;
2575         rmsg->aux_data = lmsg->aux_data;
2576         lmsg->aux_data = NULL;
2577
2578         dmsg_msg_write(rmsg);
2579 }
2580
2581 /*
2582  * Cleanup and retire msg after issuing the state callback.  The state
2583  * has already been removed from the RB tree.  The subq and msg must be
2584  * cleaned up.
2585  *
2586  * Called with the iocom mutex held (to handle subq disconnection).
2587  */
2588 void
2589 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2590 {
2591         dmsg_state_t *state;
2592
2593         assert(msg->state->iocom == iocom);
2594         state = msg->state;
2595         if (state->flags & DMSG_STATE_ROOT) {
2596                 /*
2597                  * Free a non-transactional message, there is no state
2598                  * to worry about.
2599                  */
2600                 dmsg_msg_free(msg);
2601         } else if ((state->flags & DMSG_STATE_SUBINSERTED) &&
2602                    (state->rxcmd & DMSGF_DELETE) &&
2603                    (state->txcmd & DMSGF_DELETE)) {
2604                 /*
2605                  * Must disconnect from parent and drop relay.
2606                  */
2607                 dmsg_subq_delete(state);
2608                 if (state->relay) {
2609                         dmsg_state_drop(state->relay);
2610                         state->relay = NULL;
2611                 }
2612                 dmsg_msg_free(msg);
2613         } else {
2614                 /*
2615                  * Message not terminating transaction, leave state intact
2616                  * and free message if it isn't the CREATE message.
2617                  */
2618                 dmsg_msg_free(msg);
2619         }
2620 }
2621
2622 /*
2623  * Clean up the state after pulling out needed fields and queueing the
2624  * message for transmission.   This occurs in dmsg_msg_write().
2625  *
2626  * Called with the mutex locked.
2627  */
2628 static void
2629 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2630 {
2631         dmsg_state_t *state;
2632
2633         assert(iocom == msg->state->iocom);
2634         state = msg->state;
2635
2636         dmsg_state_hold(state);
2637
2638         if (state->flags & DMSG_STATE_ROOT) {
2639                 ;
2640         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2641                 /*
2642                  * Message terminating transaction, destroy the related
2643                  * state, the original message, and this message (if it
2644                  * isn't the original message due to a CREATE|DELETE).
2645                  *
2646                  * It's possible for governing state to terminate while
2647                  * sub-transactions still exist.  This is allowed but
2648                  * will cause sub-transactions to recursively fail.
2649                  * Further reception of sub-transaction messages will be
2650                  * impossible because the circuit will no longer exist.
2651                  * (XXX need code to make sure that happens properly).
2652                  *
2653                  * NOTE: It is possible for a fafilure to terminate the
2654                  *       state after we have written the message but before
2655                  *       we are able to call cleanuptx, so txcmd might already
2656                  *       have DMSGF_DELETE set.
2657                  */
2658                 if ((state->txcmd & DMSGF_DELETE) == 0 &&
2659                     (state->rxcmd & DMSGF_DELETE)) {
2660                         state->txcmd |= DMSGF_DELETE;
2661                         assert(state->flags & DMSG_STATE_RBINSERTED);
2662                         if (state->txcmd & DMSGF_REPLY) {
2663                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2664                                 RB_REMOVE(dmsg_state_tree,
2665                                           &iocom->staterd_tree, state);
2666                         } else {
2667                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2668                                 RB_REMOVE(dmsg_state_tree,
2669                                           &iocom->statewr_tree, state);
2670                         }
2671                         state->flags &= ~DMSG_STATE_RBINSERTED;
2672                         dmsg_subq_delete(state);
2673
2674                         if (state->relay) {
2675                                 dmsg_state_drop(state->relay);
2676                                 state->relay = NULL;
2677                         }
2678                         dmsg_state_drop(state); /* state->rbtree */
2679                 } else if ((state->txcmd & DMSGF_DELETE) == 0) {
2680                         state->txcmd |= DMSGF_DELETE;
2681                 }
2682         }
2683
2684         /*
2685          * Deferred abort after transmission.
2686          */
2687         if ((state->flags & (DMSG_STATE_ABORTING | DMSG_STATE_DYING)) &&
2688             (state->rxcmd & DMSGF_DELETE) == 0) {
2689                 dmio_printf(iocom, 4,
2690                             "cleanuptx: state=%p "
2691                             "executing deferred abort\n",
2692                             state);
2693                 state->flags &= ~DMSG_STATE_ABORTING;
2694                 dmsg_simulate_failure(state, 1, DMSG_ERR_LOSTLINK);
2695         }
2696
2697         dmsg_state_drop(state);
2698 }
2699
2700 /*
2701  * Called with or without locks
2702  */
2703 void
2704 dmsg_state_hold(dmsg_state_t *state)
2705 {
2706         atomic_add_int(&state->refs, 1);
2707 }
2708
2709 void
2710 dmsg_state_drop(dmsg_state_t *state)
2711 {
2712         assert(state->refs > 0);
2713         if (atomic_fetchadd_int(&state->refs, -1) == 1)
2714                 dmsg_state_free(state);
2715 }
2716
2717 /*
2718  * Called with iocom locked
2719  */
2720 static void
2721 dmsg_state_free(dmsg_state_t *state)
2722 {
2723         atomic_add_int(&dmsg_state_count, -1);
2724         dmio_printf(state->iocom, 5, "terminate state %p\n", state);
2725         assert((state->flags & (DMSG_STATE_ROOT |
2726                                 DMSG_STATE_SUBINSERTED |
2727                                 DMSG_STATE_RBINSERTED)) == 0);
2728         assert(TAILQ_EMPTY(&state->subq));
2729         assert(state->refs == 0);
2730         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2731                 closefrom(3);
2732         assert(state->any.any == NULL);
2733         free(state);
2734 }
2735
2736 /*
2737  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2738  * header is not adjusted, just the core header.
2739  */
2740 void
2741 dmsg_bswap_head(dmsg_hdr_t *head)
2742 {
2743         head->magic     = bswap16(head->magic);
2744         head->reserved02 = bswap16(head->reserved02);
2745         head->salt      = bswap32(head->salt);
2746
2747         head->msgid     = bswap64(head->msgid);
2748         head->circuit   = bswap64(head->circuit);
2749         head->reserved18= bswap64(head->reserved18);
2750
2751         head->cmd       = bswap32(head->cmd);
2752         head->aux_crc   = bswap32(head->aux_crc);
2753         head->aux_bytes = bswap32(head->aux_bytes);
2754         head->error     = bswap32(head->error);
2755         head->aux_descr = bswap64(head->aux_descr);
2756         head->reserved38= bswap32(head->reserved38);
2757         head->hdr_crc   = bswap32(head->hdr_crc);
2758 }