419ff816ad555ebfcd4b9f075297b2e57ccb449d
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
38 int DMsgDebugOpt;
39 int dmsg_state_count;
40
41 static int dmsg_state_msgrx(dmsg_msg_t *msg);
42 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
43 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
44 static void dmsg_state_free(dmsg_state_t *state);
45 static void dmsg_msg_simulate_failure(dmsg_state_t *state, int error);
46
47 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
48
49 /*
50  * STATE TREE - Represents open transactions which are indexed by their
51  *              { msgid } relative to the governing iocom.
52  */
53 int
54 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
55 {
56         if (state1->msgid < state2->msgid)
57                 return(-1);
58         if (state1->msgid > state2->msgid)
59                 return(1);
60         return(0);
61 }
62
63 /*
64  * Initialize a low-level ioq
65  */
66 void
67 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
68 {
69         bzero(ioq, sizeof(*ioq));
70         ioq->state = DMSG_MSGQ_STATE_HEADER1;
71         TAILQ_INIT(&ioq->msgq);
72 }
73
74 /*
75  * Cleanup queue.
76  *
77  * caller holds iocom->mtx.
78  */
79 void
80 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
81 {
82         dmsg_msg_t *msg;
83
84         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
85                 assert(0);      /* shouldn't happen */
86                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
87                 dmsg_msg_free(msg);
88         }
89         if ((msg = ioq->msg) != NULL) {
90                 ioq->msg = NULL;
91                 dmsg_msg_free(msg);
92         }
93 }
94
95 /*
96  * Initialize a low-level communications channel.
97  *
98  * NOTE: The signal_func() is called at least once from the loop and can be
99  *       re-armed via dmsg_iocom_restate().
100  */
101 void
102 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
103                    void (*signal_func)(dmsg_iocom_t *iocom),
104                    void (*rcvmsg_func)(dmsg_msg_t *msg),
105                    void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
106                    void (*altmsg_func)(dmsg_iocom_t *iocom))
107 {
108         struct stat st;
109
110         bzero(iocom, sizeof(*iocom));
111
112         asprintf(&iocom->label, "iocom-%p", iocom);
113         iocom->signal_callback = signal_func;
114         iocom->rcvmsg_callback = rcvmsg_func;
115         iocom->altmsg_callback = altmsg_func;
116         iocom->usrmsg_callback = usrmsg_func;
117
118         pthread_mutex_init(&iocom->mtx, NULL);
119         RB_INIT(&iocom->staterd_tree);
120         RB_INIT(&iocom->statewr_tree);
121         TAILQ_INIT(&iocom->freeq);
122         TAILQ_INIT(&iocom->freeq_aux);
123         TAILQ_INIT(&iocom->txmsgq);
124         iocom->sock_fd = sock_fd;
125         iocom->alt_fd = alt_fd;
126         iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
127         if (signal_func)
128                 iocom->flags |= DMSG_IOCOMF_SWORK;
129         dmsg_ioq_init(iocom, &iocom->ioq_rx);
130         dmsg_ioq_init(iocom, &iocom->ioq_tx);
131         iocom->state0.refs = 1;         /* should never trigger a free */
132         iocom->state0.iocom = iocom;
133         iocom->state0.parent = &iocom->state0;
134         iocom->state0.flags = DMSG_STATE_ROOT;
135         TAILQ_INIT(&iocom->state0.subq);
136
137         if (pipe(iocom->wakeupfds) < 0)
138                 assert(0);
139         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
140         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
141
142         /*
143          * Negotiate session crypto synchronously.  This will mark the
144          * connection as error'd if it fails.  If this is a pipe it's
145          * a linkage that we set up ourselves to the filesystem and there
146          * is no crypto.
147          */
148         if (fstat(sock_fd, &st) < 0)
149                 assert(0);
150         if (S_ISSOCK(st.st_mode))
151                 dmsg_crypto_negotiate(iocom);
152
153         /*
154          * Make sure our fds are set to non-blocking for the iocom core.
155          */
156         if (sock_fd >= 0)
157                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
158 #if 0
159         /* if line buffered our single fgets() should be fine */
160         if (alt_fd >= 0)
161                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
162 #endif
163 }
164
165 void
166 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
167 {
168         va_list va;
169         char *optr;
170
171         va_start(va, ctl);
172         optr = iocom->label;
173         vasprintf(&iocom->label, ctl, va);
174         va_end(va);
175         if (optr)
176                 free(optr);
177 }
178
179 /*
180  * May only be called from a callback from iocom_core.
181  *
182  * Adjust state machine functions, set flags to guarantee that both
183  * the recevmsg_func and the sendmsg_func is called at least once.
184  */
185 void
186 dmsg_iocom_restate(dmsg_iocom_t *iocom,
187                    void (*signal_func)(dmsg_iocom_t *),
188                    void (*rcvmsg_func)(dmsg_msg_t *msg))
189 {
190         pthread_mutex_lock(&iocom->mtx);
191         iocom->signal_callback = signal_func;
192         iocom->rcvmsg_callback = rcvmsg_func;
193         if (signal_func)
194                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
195         else
196                 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
197         pthread_mutex_unlock(&iocom->mtx);
198 }
199
200 void
201 dmsg_iocom_signal(dmsg_iocom_t *iocom)
202 {
203         pthread_mutex_lock(&iocom->mtx);
204         if (iocom->signal_callback)
205                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
206         pthread_mutex_unlock(&iocom->mtx);
207 }
208
209 /*
210  * Cleanup a terminating iocom.
211  *
212  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
213  * from all possible references to it.
214  */
215 void
216 dmsg_iocom_done(dmsg_iocom_t *iocom)
217 {
218         dmsg_msg_t *msg;
219
220         if (iocom->sock_fd >= 0) {
221                 close(iocom->sock_fd);
222                 iocom->sock_fd = -1;
223         }
224         if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
225                 close(iocom->alt_fd);
226                 iocom->alt_fd = -1;
227         }
228         dmsg_ioq_done(iocom, &iocom->ioq_rx);
229         dmsg_ioq_done(iocom, &iocom->ioq_tx);
230         while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
231                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
232                 free(msg);
233         }
234         while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
235                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
236                 free(msg->aux_data);
237                 msg->aux_data = NULL;
238                 free(msg);
239         }
240         if (iocom->wakeupfds[0] >= 0) {
241                 close(iocom->wakeupfds[0]);
242                 iocom->wakeupfds[0] = -1;
243         }
244         if (iocom->wakeupfds[1] >= 0) {
245                 close(iocom->wakeupfds[1]);
246                 iocom->wakeupfds[1] = -1;
247         }
248         pthread_mutex_destroy(&iocom->mtx);
249 }
250
251 /*
252  * Allocate a new message using the specified transaction state.
253  *
254  * If CREATE is set a new transaction is allocated relative to the passed-in
255  * transaction (the 'state' argument becomes pstate).
256  *
257  * If CREATE is not set the message is associated with the passed-in
258  * transaction.
259  */
260 dmsg_msg_t *
261 dmsg_msg_alloc(dmsg_state_t *state,
262                size_t aux_size, uint32_t cmd,
263                void (*func)(dmsg_msg_t *), void *data)
264 {
265         dmsg_iocom_t *iocom = state->iocom;
266         dmsg_msg_t *msg;
267
268         pthread_mutex_lock(&iocom->mtx);
269         msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
270         pthread_mutex_unlock(&iocom->mtx);
271
272         return msg;
273 }
274
275 dmsg_msg_t *
276 dmsg_msg_alloc_locked(dmsg_state_t *state,
277                size_t aux_size, uint32_t cmd,
278                void (*func)(dmsg_msg_t *), void *data)
279 {
280         dmsg_iocom_t *iocom = state->iocom;
281         dmsg_state_t *pstate;
282         dmsg_msg_t *msg;
283         int hbytes;
284         size_t aligned_size;
285
286 #if 0
287         if (aux_size) {
288                 aligned_size = DMSG_DOALIGN(aux_size);
289                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
290                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
291         } else {
292                 aligned_size = 0;
293                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
294                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
295         }
296 #endif
297         aligned_size = DMSG_DOALIGN(aux_size);
298         msg = NULL;
299         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
300                 /*
301                  * When CREATE is set without REPLY the caller is
302                  * initiating a new transaction stacked under the specified
303                  * circuit.
304                  *
305                  * NOTE: CREATE in txcmd handled by dmsg_msg_write()
306                  * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
307                  */
308                 pstate = state;
309                 state = malloc(sizeof(*state));
310                 atomic_add_int(&dmsg_state_count, 1);
311                 bzero(state, sizeof(*state));
312                 TAILQ_INIT(&state->subq);
313                 dmsg_state_hold(pstate);
314                 state->refs = 1;
315                 state->parent = pstate;
316                 state->iocom = iocom;
317                 state->flags = DMSG_STATE_DYNAMIC;
318                 state->msgid = (uint64_t)(uintptr_t)state;
319                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
320                 state->rxcmd = DMSGF_REPLY;
321                 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
322                 state->func = func;
323                 state->any.any = data;
324
325                 RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
326                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
327                 state->flags |= DMSG_STATE_INSERTED;
328         } else {
329                 /*
330                  * Otherwise the message is transmitted over the existing
331                  * open transaction.
332                  */
333                 pstate = state->parent;
334         }
335
336         /* XXX SMP race for state */
337         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
338         if (msg == NULL) {
339                 msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
340                 bzero(msg, offsetof(struct dmsg_msg, any.head));
341                 *(int *)((char *)msg +
342                          offsetof(struct dmsg_msg, any.head) + hbytes) =
343                                  0x71B2C3D4;
344 #if 0
345                 msg = malloc(sizeof(*msg));
346                 bzero(msg, sizeof(*msg));
347 #endif
348         }
349
350         /*
351          * [re]allocate the auxillary data buffer.  The caller knows that
352          * a size-aligned buffer will be allocated but we do not want to
353          * force the caller to zero any tail piece, so we do that ourself.
354          */
355         if (msg->aux_size != aux_size) {
356                 if (msg->aux_data) {
357                         free(msg->aux_data);
358                         msg->aux_data = NULL;
359                         msg->aux_size = 0;
360                 }
361                 if (aux_size) {
362                         msg->aux_data = malloc(aligned_size);
363                         msg->aux_size = aux_size;
364                         if (aux_size != aligned_size) {
365                                 bzero(msg->aux_data + aux_size,
366                                       aligned_size - aux_size);
367                         }
368                 }
369         }
370
371         /*
372          * Set REVTRANS if the transaction was remotely initiated
373          * Set REVCIRC if the circuit was remotely initiated
374          */
375         if (state->flags & DMSG_STATE_OPPOSITE)
376                 cmd |= DMSGF_REVTRANS;
377         if (pstate->flags & DMSG_STATE_OPPOSITE)
378                 cmd |= DMSGF_REVCIRC;
379
380         /*
381          * Finish filling out the header.
382          */
383         if (hbytes)
384                 bzero(&msg->any.head, hbytes);
385         msg->hdr_size = hbytes;
386         msg->any.head.magic = DMSG_HDR_MAGIC;
387         msg->any.head.cmd = cmd;
388         msg->any.head.aux_descr = 0;
389         msg->any.head.aux_crc = 0;
390         msg->any.head.msgid = state->msgid;
391         msg->any.head.circuit = pstate->msgid;
392         msg->state = state;
393
394         return (msg);
395 }
396
397 /*
398  * Free a message so it can be reused afresh.
399  *
400  * NOTE: aux_size can be 0 with a non-NULL aux_data.
401  */
402 static
403 void
404 dmsg_msg_free_locked(dmsg_msg_t *msg)
405 {
406         /*dmsg_iocom_t *iocom = msg->iocom;*/
407 #if 1
408         int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
409         if (*(int *)((char *)msg +
410                      offsetof(struct  dmsg_msg, any.head) + hbytes) !=
411              0x71B2C3D4) {
412                 fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
413                 assert(0);
414         }
415 #endif
416         msg->state = NULL;      /* safety */
417         if (msg->aux_data) {
418                 free(msg->aux_data);
419                 msg->aux_data = NULL;
420         }
421         msg->aux_size = 0;
422         free (msg);
423 #if 0
424         if (msg->aux_data)
425                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
426         else
427                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
428 #endif
429 }
430
431 void
432 dmsg_msg_free(dmsg_msg_t *msg)
433 {
434         dmsg_iocom_t *iocom = msg->state->iocom;
435
436         pthread_mutex_lock(&iocom->mtx);
437         dmsg_msg_free_locked(msg);
438         pthread_mutex_unlock(&iocom->mtx);
439 }
440
441 /*
442  * I/O core loop for an iocom.
443  *
444  * Thread localized, iocom->mtx not held.
445  */
446 void
447 dmsg_iocom_core(dmsg_iocom_t *iocom)
448 {
449         struct pollfd fds[3];
450         char dummybuf[256];
451         dmsg_msg_t *msg;
452         int timeout;
453         int count;
454         int wi; /* wakeup pipe */
455         int si; /* socket */
456         int ai; /* alt bulk path socket */
457
458         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
459                 /*
460                  * These iocom->flags are only manipulated within the
461                  * context of the current thread.  However, modifications
462                  * still require atomic ops.
463                  */
464                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
465                                      DMSG_IOCOMF_WWORK |
466                                      DMSG_IOCOMF_PWORK |
467                                      DMSG_IOCOMF_SWORK |
468                                      DMSG_IOCOMF_ARWORK |
469                                      DMSG_IOCOMF_AWWORK)) == 0) {
470                         /*
471                          * Only poll if no immediate work is pending.
472                          * Otherwise we are just wasting our time calling
473                          * poll.
474                          */
475                         timeout = 5000;
476
477                         count = 0;
478                         wi = -1;
479                         si = -1;
480                         ai = -1;
481
482                         /*
483                          * Always check the inter-thread pipe, e.g.
484                          * for iocom->txmsgq work.
485                          */
486                         wi = count++;
487                         fds[wi].fd = iocom->wakeupfds[0];
488                         fds[wi].events = POLLIN;
489                         fds[wi].revents = 0;
490
491                         /*
492                          * Check the socket input/output direction as
493                          * requested
494                          */
495                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
496                                             DMSG_IOCOMF_WREQ)) {
497                                 si = count++;
498                                 fds[si].fd = iocom->sock_fd;
499                                 fds[si].events = 0;
500                                 fds[si].revents = 0;
501
502                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
503                                         fds[si].events |= POLLIN;
504                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
505                                         fds[si].events |= POLLOUT;
506                         }
507
508                         /*
509                          * Check the alternative fd for work.
510                          */
511                         if (iocom->alt_fd >= 0) {
512                                 ai = count++;
513                                 fds[ai].fd = iocom->alt_fd;
514                                 fds[ai].events = POLLIN;
515                                 fds[ai].revents = 0;
516                         }
517                         poll(fds, count, timeout);
518
519                         if (wi >= 0 && (fds[wi].revents & POLLIN))
520                                 atomic_set_int(&iocom->flags,
521                                                DMSG_IOCOMF_PWORK);
522                         if (si >= 0 && (fds[si].revents & POLLIN))
523                                 atomic_set_int(&iocom->flags,
524                                                DMSG_IOCOMF_RWORK);
525                         if (si >= 0 && (fds[si].revents & POLLOUT))
526                                 atomic_set_int(&iocom->flags,
527                                                DMSG_IOCOMF_WWORK);
528                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
529                                 atomic_set_int(&iocom->flags,
530                                                DMSG_IOCOMF_WWORK);
531                         if (ai >= 0 && (fds[ai].revents & POLLIN))
532                                 atomic_set_int(&iocom->flags,
533                                                DMSG_IOCOMF_ARWORK);
534                 } else {
535                         /*
536                          * Always check the pipe
537                          */
538                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
539                 }
540
541                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
542                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
543                         iocom->signal_callback(iocom);
544                 }
545
546                 /*
547                  * Pending message queues from other threads wake us up
548                  * with a write to the wakeupfds[] pipe.  We have to clear
549                  * the pipe with a dummy read.
550                  */
551                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
552                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
553                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
554                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
555                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
556                         if (TAILQ_FIRST(&iocom->txmsgq))
557                                 dmsg_iocom_flush1(iocom);
558                 }
559
560                 /*
561                  * Message write sequencing
562                  */
563                 if (iocom->flags & DMSG_IOCOMF_WWORK)
564                         dmsg_iocom_flush1(iocom);
565
566                 /*
567                  * Message read sequencing.  Run this after the write
568                  * sequencing in case the write sequencing allowed another
569                  * auto-DELETE to occur on the read side.
570                  */
571                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
572                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
573                                (msg = dmsg_ioq_read(iocom)) != NULL) {
574                                 if (DMsgDebugOpt) {
575                                         fprintf(stderr, "receive %s\n",
576                                                 dmsg_msg_str(msg));
577                                 }
578                                 iocom->rcvmsg_callback(msg);
579                                 dmsg_state_cleanuprx(iocom, msg);
580                         }
581                 }
582
583                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
584                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
585                         iocom->altmsg_callback(iocom);
586                 }
587         }
588 }
589
590 /*
591  * Make sure there's enough room in the FIFO to hold the
592  * needed data.
593  *
594  * Assume worst case encrypted form is 2x the size of the
595  * plaintext equivalent.
596  */
597 static
598 size_t
599 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
600 {
601         size_t bytes;
602         size_t nmax;
603
604         bytes = ioq->fifo_cdx - ioq->fifo_beg;
605         nmax = sizeof(ioq->buf) - ioq->fifo_end;
606         if (bytes + nmax / 2 < needed) {
607                 if (bytes) {
608                         bcopy(ioq->buf + ioq->fifo_beg,
609                               ioq->buf,
610                               bytes);
611                 }
612                 ioq->fifo_cdx -= ioq->fifo_beg;
613                 ioq->fifo_beg = 0;
614                 if (ioq->fifo_cdn < ioq->fifo_end) {
615                         bcopy(ioq->buf + ioq->fifo_cdn,
616                               ioq->buf + ioq->fifo_cdx,
617                               ioq->fifo_end - ioq->fifo_cdn);
618                 }
619                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
620                 ioq->fifo_cdn = ioq->fifo_cdx;
621                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
622         }
623         return(nmax);
624 }
625
626 /*
627  * Read the next ready message from the ioq, issuing I/O if needed.
628  * Caller should retry on a read-event when NULL is returned.
629  *
630  * If an error occurs during reception a DMSG_LNK_ERROR msg will
631  * be returned for each open transaction, then the ioq and iocom
632  * will be errored out and a non-transactional DMSG_LNK_ERROR
633  * msg will be returned as the final message.  The caller should not call
634  * us again after the final message is returned.
635  *
636  * Thread localized, iocom->mtx not held.
637  */
638 dmsg_msg_t *
639 dmsg_ioq_read(dmsg_iocom_t *iocom)
640 {
641         dmsg_ioq_t *ioq = &iocom->ioq_rx;
642         dmsg_msg_t *msg;
643         dmsg_state_t *state;
644         dmsg_hdr_t *head;
645         ssize_t n;
646         size_t bytes;
647         size_t nmax;
648         uint32_t aux_size;
649         uint32_t xcrc32;
650         int error;
651
652 again:
653         /*
654          * If a message is already pending we can just remove and
655          * return it.  Message state has already been processed.
656          * (currently not implemented)
657          */
658         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
659                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
660                 return (msg);
661         }
662         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
663
664         /*
665          * If the stream is errored out we stop processing it.
666          */
667         if (ioq->error)
668                 goto skip;
669
670         /*
671          * Message read in-progress (msg is NULL at the moment).  We don't
672          * allocate a msg until we have its core header.
673          */
674         nmax = sizeof(ioq->buf) - ioq->fifo_end;
675         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
676         msg = ioq->msg;
677
678         switch(ioq->state) {
679         case DMSG_MSGQ_STATE_HEADER1:
680                 /*
681                  * Load the primary header, fail on any non-trivial read
682                  * error or on EOF.  Since the primary header is the same
683                  * size is the message alignment it will never straddle
684                  * the end of the buffer.
685                  */
686                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
687                 if (bytes < sizeof(msg->any.head)) {
688                         n = read(iocom->sock_fd,
689                                  ioq->buf + ioq->fifo_end,
690                                  nmax);
691                         if (n <= 0) {
692                                 if (n == 0) {
693                                         ioq->error = DMSG_IOQ_ERROR_EOF;
694                                         break;
695                                 }
696                                 if (errno != EINTR &&
697                                     errno != EINPROGRESS &&
698                                     errno != EAGAIN) {
699                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
700                                         break;
701                                 }
702                                 n = 0;
703                                 /* fall through */
704                         }
705                         ioq->fifo_end += (size_t)n;
706                         nmax -= (size_t)n;
707                 }
708
709                 /*
710                  * Decrypt data received so far.  Data will be decrypted
711                  * in-place but might create gaps in the FIFO.  Partial
712                  * blocks are not immediately decrypted.
713                  *
714                  * WARNING!  The header might be in the wrong endian, we
715                  *           do not fix it up until we get the entire
716                  *           extended header.
717                  */
718                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
719                         dmsg_crypto_decrypt(iocom, ioq);
720                 } else {
721                         ioq->fifo_cdx = ioq->fifo_end;
722                         ioq->fifo_cdn = ioq->fifo_end;
723                 }
724                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
725
726                 /*
727                  * Insufficient data accumulated (msg is NULL, caller will
728                  * retry on event).
729                  */
730                 assert(msg == NULL);
731                 if (bytes < sizeof(msg->any.head))
732                         break;
733
734                 /*
735                  * Check and fixup the core header.  Note that the icrc
736                  * has to be calculated before any fixups, but the crc
737                  * fields in the msg may have to be swapped like everything
738                  * else.
739                  */
740                 head = (void *)(ioq->buf + ioq->fifo_beg);
741                 if (head->magic != DMSG_HDR_MAGIC &&
742                     head->magic != DMSG_HDR_MAGIC_REV) {
743                         fprintf(stderr, "%s: head->magic is bad %02x\n",
744                                 iocom->label, head->magic);
745                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
746                                 fprintf(stderr, "(on encrypted link)\n");
747                         ioq->error = DMSG_IOQ_ERROR_SYNC;
748                         break;
749                 }
750
751                 /*
752                  * Calculate the full header size and aux data size
753                  */
754                 if (head->magic == DMSG_HDR_MAGIC_REV) {
755                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
756                                       DMSG_ALIGN;
757                         aux_size = bswap32(head->aux_bytes);
758                 } else {
759                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
760                                       DMSG_ALIGN;
761                         aux_size = head->aux_bytes;
762                 }
763                 ioq->abytes = DMSG_DOALIGN(aux_size);
764                 ioq->unaligned_aux_size = aux_size;
765                 if (ioq->hbytes < sizeof(msg->any.head) ||
766                     ioq->hbytes > sizeof(msg->any) ||
767                     ioq->abytes > DMSG_AUX_MAX) {
768                         ioq->error = DMSG_IOQ_ERROR_FIELD;
769                         break;
770                 }
771
772                 /*
773                  * Allocate the message, the next state will fill it in.
774                  *
775                  * NOTE: The aux_data buffer will be sized to an aligned
776                  *       value and the aligned remainder zero'd for
777                  *       convenience.
778                  *
779                  * NOTE: Supply dummy state and a degenerate cmd without
780                  *       CREATE set.  The message will temporarily be
781                  *       associated with state0 until later post-processing.
782                  */
783                 msg = dmsg_msg_alloc(&iocom->state0, aux_size,
784                                      ioq->hbytes / DMSG_ALIGN,
785                                      NULL, NULL);
786                 ioq->msg = msg;
787
788                 /*
789                  * Fall through to the next state.  Make sure that the
790                  * extended header does not straddle the end of the buffer.
791                  * We still want to issue larger reads into our buffer,
792                  * book-keeping is easier if we don't bcopy() yet.
793                  *
794                  * Make sure there is enough room for bloated encrypt data.
795                  */
796                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
797                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
798                 /* fall through */
799         case DMSG_MSGQ_STATE_HEADER2:
800                 /*
801                  * Fill out the extended header.
802                  */
803                 assert(msg != NULL);
804                 if (bytes < ioq->hbytes) {
805                         n = read(iocom->sock_fd,
806                                  ioq->buf + ioq->fifo_end,
807                                  nmax);
808                         if (n <= 0) {
809                                 if (n == 0) {
810                                         ioq->error = DMSG_IOQ_ERROR_EOF;
811                                         break;
812                                 }
813                                 if (errno != EINTR &&
814                                     errno != EINPROGRESS &&
815                                     errno != EAGAIN) {
816                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
817                                         break;
818                                 }
819                                 n = 0;
820                                 /* fall through */
821                         }
822                         ioq->fifo_end += (size_t)n;
823                         nmax -= (size_t)n;
824                 }
825
826                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
827                         dmsg_crypto_decrypt(iocom, ioq);
828                 } else {
829                         ioq->fifo_cdx = ioq->fifo_end;
830                         ioq->fifo_cdn = ioq->fifo_end;
831                 }
832                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
833
834                 /*
835                  * Insufficient data accumulated (set msg NULL so caller will
836                  * retry on event).
837                  */
838                 if (bytes < ioq->hbytes) {
839                         msg = NULL;
840                         break;
841                 }
842
843                 /*
844                  * Calculate the extended header, decrypt data received
845                  * so far.  Handle endian-conversion for the entire extended
846                  * header.
847                  */
848                 head = (void *)(ioq->buf + ioq->fifo_beg);
849
850                 /*
851                  * Check the CRC.
852                  */
853                 if (head->magic == DMSG_HDR_MAGIC_REV)
854                         xcrc32 = bswap32(head->hdr_crc);
855                 else
856                         xcrc32 = head->hdr_crc;
857                 head->hdr_crc = 0;
858                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
859                         ioq->error = DMSG_IOQ_ERROR_XCRC;
860                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
861                                 xcrc32, dmsg_icrc32(head, ioq->hbytes),
862                                 dmsg_msg_str(msg));
863                         assert(0);
864                         break;
865                 }
866                 head->hdr_crc = xcrc32;
867
868                 if (head->magic == DMSG_HDR_MAGIC_REV) {
869                         dmsg_bswap_head(head);
870                 }
871
872                 /*
873                  * Copy the extended header into the msg and adjust the
874                  * FIFO.
875                  */
876                 bcopy(head, &msg->any, ioq->hbytes);
877
878                 /*
879                  * We are either done or we fall-through.
880                  */
881                 if (ioq->abytes == 0) {
882                         ioq->fifo_beg += ioq->hbytes;
883                         break;
884                 }
885
886                 /*
887                  * Must adjust bytes (and the state) when falling through.
888                  * nmax doesn't change.
889                  */
890                 ioq->fifo_beg += ioq->hbytes;
891                 bytes -= ioq->hbytes;
892                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
893                 /* fall through */
894         case DMSG_MSGQ_STATE_AUXDATA1:
895                 /*
896                  * Copy the partial or complete [decrypted] payload from
897                  * remaining bytes in the FIFO in order to optimize the
898                  * makeroom call in the AUXDATA2 state.  We have to
899                  * fall-through either way so we can check the crc.
900                  *
901                  * msg->aux_size tracks our aux data.
902                  *
903                  * (Lets not complicate matters if the data is encrypted,
904                  *  since the data in-stream is not the same size as the
905                  *  data decrypted).
906                  */
907                 if (bytes >= ioq->abytes) {
908                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
909                               ioq->abytes);
910                         msg->aux_size = ioq->abytes;
911                         ioq->fifo_beg += ioq->abytes;
912                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
913                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
914                         bytes -= ioq->abytes;
915                 } else if (bytes) {
916                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
917                               bytes);
918                         msg->aux_size = bytes;
919                         ioq->fifo_beg += bytes;
920                         if (ioq->fifo_cdx < ioq->fifo_beg)
921                                 ioq->fifo_cdx = ioq->fifo_beg;
922                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
923                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
924                         bytes = 0;
925                 } else {
926                         msg->aux_size = 0;
927                 }
928                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
929                 /* fall through */
930         case DMSG_MSGQ_STATE_AUXDATA2:
931                 /*
932                  * Make sure there is enough room for more data.
933                  */
934                 assert(msg);
935                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
936
937                 /*
938                  * Read and decrypt more of the payload.
939                  */
940                 if (msg->aux_size < ioq->abytes) {
941                         assert(bytes == 0);
942                         n = read(iocom->sock_fd,
943                                  ioq->buf + ioq->fifo_end,
944                                  nmax);
945                         if (n <= 0) {
946                                 if (n == 0) {
947                                         ioq->error = DMSG_IOQ_ERROR_EOF;
948                                         break;
949                                 }
950                                 if (errno != EINTR &&
951                                     errno != EINPROGRESS &&
952                                     errno != EAGAIN) {
953                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
954                                         break;
955                                 }
956                                 n = 0;
957                                 /* fall through */
958                         }
959                         ioq->fifo_end += (size_t)n;
960                         nmax -= (size_t)n;
961                 }
962
963                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
964                         dmsg_crypto_decrypt(iocom, ioq);
965                 } else {
966                         ioq->fifo_cdx = ioq->fifo_end;
967                         ioq->fifo_cdn = ioq->fifo_end;
968                 }
969                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
970
971                 if (bytes > ioq->abytes - msg->aux_size)
972                         bytes = ioq->abytes - msg->aux_size;
973
974                 if (bytes) {
975                         bcopy(ioq->buf + ioq->fifo_beg,
976                               msg->aux_data + msg->aux_size,
977                               bytes);
978                         msg->aux_size += bytes;
979                         ioq->fifo_beg += bytes;
980                 }
981
982                 /*
983                  * Insufficient data accumulated (set msg NULL so caller will
984                  * retry on event).
985                  *
986                  * Assert the auxillary data size is correct, then record the
987                  * original unaligned size from the message header.
988                  */
989                 if (msg->aux_size < ioq->abytes) {
990                         msg = NULL;
991                         break;
992                 }
993                 assert(msg->aux_size == ioq->abytes);
994                 msg->aux_size = ioq->unaligned_aux_size;
995
996                 /*
997                  * Check aux_crc, then we are done.  Note that the crc
998                  * is calculated over the aligned size, not the actual
999                  * size.
1000                  */
1001                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
1002                 if (xcrc32 != msg->any.head.aux_crc) {
1003                         ioq->error = DMSG_IOQ_ERROR_ACRC;
1004                         fprintf(stderr,
1005                                 "iocom: ACRC error %08x vs %08x "
1006                                 "msgid %016jx msgcmd %08x auxsize %d\n",
1007                                 xcrc32,
1008                                 msg->any.head.aux_crc,
1009                                 (intmax_t)msg->any.head.msgid,
1010                                 msg->any.head.cmd,
1011                                 msg->any.head.aux_bytes);
1012                         break;
1013                 }
1014                 break;
1015         case DMSG_MSGQ_STATE_ERROR:
1016                 /*
1017                  * Continued calls to drain recorded transactions (returning
1018                  * a LNK_ERROR for each one), before we return the final
1019                  * LNK_ERROR.
1020                  */
1021                 assert(msg == NULL);
1022                 break;
1023         default:
1024                 /*
1025                  * We don't double-return errors, the caller should not
1026                  * have called us again after getting an error msg.
1027                  */
1028                 assert(0);
1029                 break;
1030         }
1031
1032         /*
1033          * Check the message sequence.  The iv[] should prevent any
1034          * possibility of a replay but we add this check anyway.
1035          */
1036         if (msg && ioq->error == 0) {
1037                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1038                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1039                 } else {
1040                         ++ioq->seq;
1041                 }
1042         }
1043
1044         /*
1045          * Handle error, RREQ, or completion
1046          *
1047          * NOTE: nmax and bytes are invalid at this point, we don't bother
1048          *       to update them when breaking out.
1049          */
1050         if (ioq->error) {
1051                 dmsg_state_t *tmp_state;
1052 skip:
1053                 fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1054                 /*
1055                  * An unrecoverable error causes all active receive
1056                  * transactions to be terminated with a LNK_ERROR message.
1057                  *
1058                  * Once all active transactions are exhausted we set the
1059                  * iocom ERROR flag and return a non-transactional LNK_ERROR
1060                  * message, which should cause master processing loops to
1061                  * terminate.
1062                  */
1063                 assert(ioq->msg == msg);
1064                 if (msg) {
1065                         dmsg_msg_free(msg);
1066                         ioq->msg = NULL;
1067                         msg = NULL;
1068                 }
1069
1070                 /*
1071                  * No more I/O read processing
1072                  */
1073                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1074
1075                 /*
1076                  * Simulate a remote LNK_ERROR DELETE msg for any open
1077                  * transactions, ending with a final non-transactional
1078                  * LNK_ERROR (that the session can detect) when no
1079                  * transactions remain.
1080                  *
1081                  * NOTE: Temporarily supply state0 and a degenerate cmd
1082                  *       without CREATE set.  The real state will be
1083                  *       assigned in the loop.
1084                  *
1085                  * NOTE: We are simulating a received message using our
1086                  *       side of the state, so the DMSGF_REV* bits have
1087                  *       to be reversed.
1088                  */
1089                 pthread_mutex_lock(&iocom->mtx);
1090                 dmsg_iocom_drain(iocom);
1091
1092                 tmp_state = NULL;
1093                 RB_FOREACH(state, dmsg_state_tree, &iocom->staterd_tree) {
1094                         atomic_set_int(&state->flags, DMSG_STATE_DYING);
1095                         if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1096                                 tmp_state = state;
1097                 }
1098                 RB_FOREACH(state, dmsg_state_tree, &iocom->statewr_tree) {
1099                         atomic_set_int(&state->flags, DMSG_STATE_DYING);
1100                         if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1101                                 tmp_state = state;
1102                 }
1103
1104                 if (tmp_state) {
1105                         dmsg_msg_simulate_failure(tmp_state, ioq->error);
1106                 } else {
1107                         dmsg_msg_simulate_failure(&iocom->state0, ioq->error);
1108                 }
1109                 pthread_mutex_unlock(&iocom->mtx);
1110                 if (TAILQ_FIRST(&ioq->msgq))
1111                         goto again;
1112
1113 #if 0
1114                 /*
1115                  * For the iocom error case we want to set RWORK to indicate
1116                  * that more messages might be pending.
1117                  *
1118                  * It is possible to return NULL when there is more work to
1119                  * do because each message has to be DELETEd in both
1120                  * directions before we continue on with the next (though
1121                  * this could be optimized).  The transmit direction will
1122                  * re-set RWORK.
1123                  */
1124                 if (msg)
1125                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1126 #endif
1127         } else if (msg == NULL) {
1128                 /*
1129                  * Insufficient data received to finish building the message,
1130                  * set RREQ and return NULL.
1131                  *
1132                  * Leave ioq->msg intact.
1133                  * Leave the FIFO intact.
1134                  */
1135                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1136         } else {
1137                 /*
1138                  * Continue processing msg.
1139                  *
1140                  * The fifo has already been advanced past the message.
1141                  * Trivially reset the FIFO indices if possible.
1142                  *
1143                  * clear the FIFO if it is now empty and set RREQ to wait
1144                  * for more from the socket.  If the FIFO is not empty set
1145                  * TWORK to bypass the poll so we loop immediately.
1146                  */
1147                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1148                     ioq->fifo_cdn == ioq->fifo_end) {
1149                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1150                         ioq->fifo_cdx = 0;
1151                         ioq->fifo_cdn = 0;
1152                         ioq->fifo_beg = 0;
1153                         ioq->fifo_end = 0;
1154                 } else {
1155                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1156                 }
1157                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1158                 ioq->msg = NULL;
1159
1160                 /*
1161                  * Handle message routing.  Validates non-zero sources
1162                  * and routes message.  Error will be 0 if the message is
1163                  * destined for us.
1164                  *
1165                  * State processing only occurs for messages destined for us.
1166                  */
1167                 if (DMsgDebugOpt >= 5) {
1168                         fprintf(stderr,
1169                                 "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1170                                 msg->any.head.cmd,
1171                                 (intmax_t)msg->any.head.msgid,
1172                                 (intmax_t)msg->any.head.circuit);
1173                 }
1174                 error = dmsg_state_msgrx(msg);
1175
1176                 if (error) {
1177                         /*
1178                          * Abort-after-closure, throw message away and
1179                          * start reading another.
1180                          */
1181                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1182                                 dmsg_msg_free(msg);
1183                                 goto again;
1184                         }
1185
1186                         /*
1187                          * Process real error and throw away message.
1188                          */
1189                         ioq->error = error;
1190                         goto skip;
1191                 }
1192                 /* no error, not routed.  Fall through and return msg */
1193         }
1194         return (msg);
1195 }
1196
1197 /*
1198  * Calculate the header and data crc's and write a low-level message to
1199  * the connection.  If aux_crc is non-zero the aux_data crc is already
1200  * assumed to have been set.
1201  *
1202  * A non-NULL msg is added to the queue but not necessarily flushed.
1203  * Calling this function with msg == NULL will get a flush going.
1204  *
1205  * (called from iocom_core only)
1206  */
1207 void
1208 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1209 {
1210         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1211         dmsg_msg_t *msg;
1212         uint32_t xcrc32;
1213         size_t hbytes;
1214         size_t abytes;
1215         dmsg_msg_queue_t tmpq;
1216
1217         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1218         TAILQ_INIT(&tmpq);
1219         pthread_mutex_lock(&iocom->mtx);
1220         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1221                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1222                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1223         }
1224         pthread_mutex_unlock(&iocom->mtx);
1225
1226         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1227                 /*
1228                  * Process terminal connection errors.
1229                  */
1230                 TAILQ_REMOVE(&tmpq, msg, qentry);
1231                 if (ioq->error) {
1232                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1233                         ++ioq->msgcount;
1234                         continue;
1235                 }
1236
1237                 /*
1238                  * Finish populating the msg fields.  The salt ensures that
1239                  * the iv[] array is ridiculously randomized and we also
1240                  * re-seed our PRNG every 32768 messages just to be sure.
1241                  */
1242                 msg->any.head.magic = DMSG_HDR_MAGIC;
1243                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1244                 ++ioq->seq;
1245                 if ((ioq->seq & 32767) == 0)
1246                         srandomdev();
1247
1248                 /*
1249                  * Calculate aux_crc if 0, then calculate hdr_crc.
1250                  */
1251                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1252                         abytes = DMSG_DOALIGN(msg->aux_size);
1253                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1254                         msg->any.head.aux_crc = xcrc32;
1255                 }
1256                 msg->any.head.aux_bytes = msg->aux_size;
1257
1258                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1259                          DMSG_ALIGN;
1260                 msg->any.head.hdr_crc = 0;
1261                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1262
1263                 /*
1264                  * Enqueue the message (the flush codes handles stream
1265                  * encryption).
1266                  */
1267                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1268                 ++ioq->msgcount;
1269         }
1270         dmsg_iocom_flush2(iocom);
1271 }
1272
1273 /*
1274  * Thread localized, iocom->mtx not held by caller.
1275  *
1276  * (called from iocom_core via iocom_flush1 only)
1277  */
1278 void
1279 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1280 {
1281         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1282         dmsg_msg_t *msg;
1283         ssize_t n;
1284         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1285         size_t nact;
1286         size_t hbytes;
1287         size_t abytes;
1288         size_t hoff;
1289         size_t aoff;
1290         int iovcnt;
1291
1292         if (ioq->error) {
1293                 dmsg_iocom_drain(iocom);
1294                 return;
1295         }
1296
1297         /*
1298          * Pump messages out the connection by building an iovec.
1299          *
1300          * ioq->hbytes/ioq->abytes tracks how much of the first message
1301          * in the queue has been successfully written out, so we can
1302          * resume writing.
1303          */
1304         iovcnt = 0;
1305         nact = 0;
1306         hoff = ioq->hbytes;
1307         aoff = ioq->abytes;
1308
1309         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1310                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1311                          DMSG_ALIGN;
1312                 abytes = DMSG_DOALIGN(msg->aux_size);
1313                 assert(hoff <= hbytes && aoff <= abytes);
1314
1315                 if (hoff < hbytes) {
1316                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1317                         iov[iovcnt].iov_len = hbytes - hoff;
1318                         nact += hbytes - hoff;
1319                         ++iovcnt;
1320                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1321                                 break;
1322                 }
1323                 if (aoff < abytes) {
1324                         assert(msg->aux_data != NULL);
1325                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1326                         iov[iovcnt].iov_len = abytes - aoff;
1327                         nact += abytes - aoff;
1328                         ++iovcnt;
1329                         if (iovcnt == DMSG_IOQ_MAXIOVEC)
1330                                 break;
1331                 }
1332                 hoff = 0;
1333                 aoff = 0;
1334         }
1335         if (iovcnt == 0)
1336                 return;
1337
1338         /*
1339          * Encrypt and write the data.  The crypto code will move the
1340          * data into the fifo and adjust the iov as necessary.  If
1341          * encryption is disabled the iov is left alone.
1342          *
1343          * May return a smaller iov (thus a smaller n), with aggregated
1344          * chunks.  May reduce nmax to what fits in the FIFO.
1345          *
1346          * This function sets nact to the number of original bytes now
1347          * encrypted, adding to the FIFO some number of bytes that might
1348          * be greater depending on the crypto mechanic.  iov[] is adjusted
1349          * to point at the FIFO if necessary.
1350          *
1351          * NOTE: The return value from the writev() is the post-encrypted
1352          *       byte count, not the plaintext count.
1353          */
1354         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1355                 /*
1356                  * Make sure the FIFO has a reasonable amount of space
1357                  * left (if not completely full).
1358                  *
1359                  * In this situation we are staging the encrypted message
1360                  * data in the FIFO.  (nact) represents how much plaintext
1361                  * has been staged, (n) represents how much encrypted data
1362                  * has been flushed.  The two are independent of each other.
1363                  */
1364                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1365                     sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1366                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1367                               ioq->fifo_end - ioq->fifo_beg);
1368                         ioq->fifo_cdx -= ioq->fifo_beg;
1369                         ioq->fifo_cdn -= ioq->fifo_beg;
1370                         ioq->fifo_end -= ioq->fifo_beg;
1371                         ioq->fifo_beg = 0;
1372                 }
1373
1374                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1375                 n = writev(iocom->sock_fd, iov, iovcnt);
1376                 if (n > 0) {
1377                         ioq->fifo_beg += n;
1378                         ioq->fifo_cdn += n;
1379                         ioq->fifo_cdx += n;
1380                         if (ioq->fifo_beg == ioq->fifo_end) {
1381                                 ioq->fifo_beg = 0;
1382                                 ioq->fifo_cdn = 0;
1383                                 ioq->fifo_cdx = 0;
1384                                 ioq->fifo_end = 0;
1385                         }
1386                 }
1387                 /*
1388                  * We don't mess with the nact returned by the crypto_encrypt
1389                  * call, which represents the filling of the FIFO.  (n) tells
1390                  * us how much we were able to write from the FIFO.  The two
1391                  * are different beasts when encrypting.
1392                  */
1393         } else {
1394                 /*
1395                  * In this situation we are not staging the messages to the
1396                  * FIFO but instead writing them directly from the msg
1397                  * structure(s), so (nact) is basically (n).
1398                  */
1399                 n = writev(iocom->sock_fd, iov, iovcnt);
1400                 if (n > 0)
1401                         nact = n;
1402                 else
1403                         nact = 0;
1404         }
1405
1406         /*
1407          * Clean out the transmit queue based on what we successfully
1408          * sent (nact is the plaintext count).  ioq->hbytes/abytes
1409          * represents the portion of the first message previously sent.
1410          */
1411         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1412                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1413                          DMSG_ALIGN;
1414                 abytes = DMSG_DOALIGN(msg->aux_size);
1415
1416                 if ((size_t)nact < hbytes - ioq->hbytes) {
1417                         ioq->hbytes += nact;
1418                         nact = 0;
1419                         break;
1420                 }
1421                 nact -= hbytes - ioq->hbytes;
1422                 ioq->hbytes = hbytes;
1423                 if ((size_t)nact < abytes - ioq->abytes) {
1424                         ioq->abytes += nact;
1425                         nact = 0;
1426                         break;
1427                 }
1428                 nact -= abytes - ioq->abytes;
1429                 /* ioq->abytes = abytes; optimized out */
1430
1431 #if 0
1432                 fprintf(stderr,
1433                         "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1434                         msg->any.head.cmd,
1435                         (intmax_t)msg->any.head.msgid,
1436                         (intmax_t)msg->any.head.circuit);
1437 #endif
1438
1439                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1440                 --ioq->msgcount;
1441                 ioq->hbytes = 0;
1442                 ioq->abytes = 0;
1443                 dmsg_msg_free(msg);
1444         }
1445         assert(nact == 0);
1446
1447         /*
1448          * Process the return value from the write w/regards to blocking.
1449          */
1450         if (n < 0) {
1451                 if (errno != EINTR &&
1452                     errno != EINPROGRESS &&
1453                     errno != EAGAIN) {
1454                         /*
1455                          * Fatal write error
1456                          */
1457                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1458                         dmsg_iocom_drain(iocom);
1459                 } else {
1460                         /*
1461                          * Wait for socket buffer space
1462                          */
1463                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1464                 }
1465         } else {
1466                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1467         }
1468         if (ioq->error) {
1469                 dmsg_iocom_drain(iocom);
1470         }
1471 }
1472
1473 /*
1474  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1475  * write events will occur.  We don't kill read msgs because we want
1476  * the caller to pull off our contrived terminal error msg to detect
1477  * the connection failure.
1478  *
1479  * Localized to iocom_core thread, iocom->mtx not held by caller.
1480  */
1481 void
1482 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1483 {
1484         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1485         dmsg_msg_t *msg;
1486
1487         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1488         ioq->hbytes = 0;
1489         ioq->abytes = 0;
1490
1491         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1492                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1493                 --ioq->msgcount;
1494                 dmsg_msg_free(msg);
1495         }
1496 }
1497
1498 /*
1499  * Write a message to an iocom, with additional state processing.
1500  */
1501 void
1502 dmsg_msg_write(dmsg_msg_t *msg)
1503 {
1504         dmsg_iocom_t *iocom = msg->state->iocom;
1505         dmsg_state_t *state;
1506         char dummy;
1507
1508         pthread_mutex_lock(&iocom->mtx);
1509         state = msg->state;
1510
1511         /*
1512          * Make sure the parent transaction is still open in the transmit
1513          * direction.  If it isn't the message is dead and we have to
1514          * potentially simulate a rxmsg terminating the transaction.
1515          */
1516         if (state->parent->txcmd & DMSGF_DELETE) {
1517                 fprintf(stderr, "dmsg_msg_write: EARLY TERMINATION\n");
1518                 dmsg_msg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1519                 dmsg_state_cleanuptx(iocom, msg);
1520                 dmsg_msg_free(msg);
1521                 pthread_mutex_unlock(&iocom->mtx);
1522                 return;
1523         }
1524
1525         /*
1526          * Process state data into the message as needed, then update the
1527          * state based on the message.
1528          */
1529         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1530                 /*
1531                  * Existing transaction (could be reply).  It is also
1532                  * possible for this to be the first reply (CREATE is set),
1533                  * in which case we populate state->txcmd.
1534                  *
1535                  * state->txcmd is adjusted to hold the final message cmd,
1536                  * and we also be sure to set the CREATE bit here.  We did
1537                  * not set it in dmsg_msg_alloc() because that would have
1538                  * not been serialized (state could have gotten ripped out
1539                  * from under the message prior to it being transmitted).
1540                  */
1541                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1542                     DMSGF_CREATE) {
1543                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1544                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1545                 }
1546                 msg->any.head.msgid = state->msgid;
1547
1548                 if (msg->any.head.cmd & DMSGF_CREATE) {
1549                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1550                 }
1551         }
1552         dmsg_state_cleanuptx(iocom, msg);
1553
1554 #if 0
1555         fprintf(stderr,
1556                 "MSGWRITE %016jx %08x\n",
1557                 msg->any.head.msgid, msg->any.head.cmd);
1558 #endif
1559
1560         /*
1561          * Queue it for output, wake up the I/O pthread.  Note that the
1562          * I/O thread is responsible for generating the CRCs and encryption.
1563          */
1564         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1565         dummy = 0;
1566         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1567         pthread_mutex_unlock(&iocom->mtx);
1568 }
1569
1570 /*
1571  * iocom->mtx must be held by caller.
1572  */
1573 static
1574 void
1575 dmsg_msg_simulate_failure(dmsg_state_t *state, int error)
1576 {
1577         dmsg_iocom_t *iocom = state->iocom;
1578         dmsg_msg_t *msg;
1579
1580         msg = NULL;
1581
1582         if (state == &iocom->state0) {
1583                 /*
1584                  * No active local or remote transactions remain.
1585                  * Generate a final LNK_ERROR and flag EOF.
1586                  */
1587                 msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1588                                             DMSG_LNK_ERROR,
1589                                             NULL, NULL);
1590                 msg->any.head.error = error;
1591                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1592                 fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1593         } else if (state->flags & DMSG_STATE_OPPOSITE) {
1594                 /*
1595                  * Active remote transactions are still present.
1596                  * Simulate the other end sending us a DELETE.
1597                  */
1598                 if (state->rxcmd & DMSGF_DELETE) {
1599                         fprintf(stderr,
1600                                 "iocom: ioq error(rd) %d sleeping "
1601                                 "state %p rxcmd %08x txcmd %08x "
1602                                 "func %p\n",
1603                                 error, state, state->rxcmd,
1604                                 state->txcmd, state->func);
1605                         usleep(100000); /* XXX */
1606                         atomic_set_int(&iocom->flags,
1607                                        DMSG_IOCOMF_RWORK);
1608                 } else {
1609                         fprintf(stderr, "SIMULATE ERROR1\n");
1610                         msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1611                                              DMSG_LNK_ERROR,
1612                                              NULL, NULL);
1613                         /*state->txcmd |= DMSGF_DELETE;*/
1614                         msg->state = state;
1615                         msg->any.head.error = error;
1616                         msg->any.head.msgid = state->msgid;
1617                         msg->any.head.circuit = state->parent->msgid;
1618                         msg->any.head.cmd |= DMSGF_ABORT |
1619                                              DMSGF_DELETE;
1620                         if ((state->parent->flags &
1621                              DMSG_STATE_OPPOSITE) == 0) {
1622                                 msg->any.head.cmd |= DMSGF_REVCIRC;
1623                         }
1624                 }
1625         } else {
1626                 /*
1627                  * Active local transactions are still present.
1628                  * Simulate the other end sending us a DELETE.
1629                  */
1630                 if (state->rxcmd & DMSGF_DELETE) {
1631                         fprintf(stderr,
1632                                 "iocom: ioq error(wr) %d sleeping "
1633                                 "state %p rxcmd %08x txcmd %08x "
1634                                 "func %p\n",
1635                                 error, state, state->rxcmd,
1636                                 state->txcmd, state->func);
1637                         usleep(100000); /* XXX */
1638                         atomic_set_int(&iocom->flags,
1639                                        DMSG_IOCOMF_RWORK);
1640                 } else {
1641                         fprintf(stderr, "SIMULATE ERROR1\n");
1642                         msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1643                                              DMSG_LNK_ERROR,
1644                                              NULL, NULL);
1645                         msg->state = state;
1646                         msg->any.head.error = error;
1647                         msg->any.head.msgid = state->msgid;
1648                         msg->any.head.circuit = state->parent->msgid;
1649                         msg->any.head.cmd |= DMSGF_ABORT |
1650                                              DMSGF_DELETE |
1651                                              DMSGF_REVTRANS |
1652                                              DMSGF_REPLY;
1653                         if ((state->parent->flags &
1654                              DMSG_STATE_OPPOSITE) == 0) {
1655                                 msg->any.head.cmd |= DMSGF_REVCIRC;
1656                         }
1657                         if ((state->rxcmd & DMSGF_CREATE) == 0)
1658                                 msg->any.head.cmd |= DMSGF_CREATE;
1659                 }
1660         }
1661         if (msg) {
1662                 TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1663                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1664         }
1665 }
1666
1667 /*
1668  * This is a shortcut to formulate a reply to msg with a simple error code,
1669  * It can reply to and terminate a transaction, or it can reply to a one-way
1670  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1671  * the error code (which can be 0).  Not all transactions are terminated
1672  * with DMSG_LNK_ERROR status (the low level only cares about the
1673  * MSGF_DELETE flag), but most are.
1674  *
1675  * Replies to one-way messages are a bit of an oxymoron but the feature
1676  * is used by the debug (DBG) protocol.
1677  *
1678  * The reply contains no extended data.
1679  */
1680 void
1681 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1682 {
1683         dmsg_state_t *state = msg->state;
1684         dmsg_msg_t *nmsg;
1685         uint32_t cmd;
1686
1687         /*
1688          * Reply with a simple error code and terminate the transaction.
1689          */
1690         cmd = DMSG_LNK_ERROR;
1691
1692         /*
1693          * Check if our direction has even been initiated yet, set CREATE.
1694          *
1695          * Check what direction this is (command or reply direction).  Note
1696          * that txcmd might not have been initiated yet.
1697          *
1698          * If our direction has already been closed we just return without
1699          * doing anything.
1700          */
1701         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1702                 if (state->txcmd & DMSGF_DELETE)
1703                         return;
1704                 if (state->txcmd & DMSGF_REPLY)
1705                         cmd |= DMSGF_REPLY;
1706                 cmd |= DMSGF_DELETE;
1707         } else {
1708                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1709                         cmd |= DMSGF_REPLY;
1710         }
1711
1712         /*
1713          * Allocate the message and associate it with the existing state.
1714          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1715          * allocate new state.  We have our state already.
1716          */
1717         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1718         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1719                 if ((state->txcmd & DMSGF_CREATE) == 0)
1720                         nmsg->any.head.cmd |= DMSGF_CREATE;
1721         }
1722         nmsg->any.head.error = error;
1723
1724         dmsg_msg_write(nmsg);
1725 }
1726
1727 /*
1728  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1729  * we are generating a streaming reply or an intermediate acknowledgement
1730  * of some sort as part of the higher level protocol, with more to come
1731  * later.
1732  */
1733 void
1734 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1735 {
1736         dmsg_state_t *state = msg->state;
1737         dmsg_msg_t *nmsg;
1738         uint32_t cmd;
1739
1740
1741         /*
1742          * Reply with a simple error code and terminate the transaction.
1743          */
1744         cmd = DMSG_LNK_ERROR;
1745
1746         /*
1747          * Check if our direction has even been initiated yet, set CREATE.
1748          *
1749          * Check what direction this is (command or reply direction).  Note
1750          * that txcmd might not have been initiated yet.
1751          *
1752          * If our direction has already been closed we just return without
1753          * doing anything.
1754          */
1755         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1756                 if (state->txcmd & DMSGF_DELETE)
1757                         return;
1758                 if (state->txcmd & DMSGF_REPLY)
1759                         cmd |= DMSGF_REPLY;
1760                 /* continuing transaction, do not set MSGF_DELETE */
1761         } else {
1762                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1763                         cmd |= DMSGF_REPLY;
1764         }
1765         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1766         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1767                 if ((state->txcmd & DMSGF_CREATE) == 0)
1768                         nmsg->any.head.cmd |= DMSGF_CREATE;
1769         }
1770         nmsg->any.head.error = error;
1771
1772         dmsg_msg_write(nmsg);
1773 }
1774
1775 /*
1776  * Terminate a transaction given a state structure by issuing a DELETE.
1777  * (the state structure must not be &iocom->state0)
1778  */
1779 void
1780 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1781 {
1782         dmsg_msg_t *nmsg;
1783         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1784
1785         /*
1786          * Nothing to do if we already transmitted a delete
1787          */
1788         if (state->txcmd & DMSGF_DELETE)
1789                 return;
1790
1791         /*
1792          * Set REPLY if the other end initiated the command.  Otherwise
1793          * we are the command direction.
1794          */
1795         if (state->txcmd & DMSGF_REPLY)
1796                 cmd |= DMSGF_REPLY;
1797
1798         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1799         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1800                 if ((state->txcmd & DMSGF_CREATE) == 0)
1801                         nmsg->any.head.cmd |= DMSGF_CREATE;
1802         }
1803         nmsg->any.head.error = error;
1804         dmsg_msg_write(nmsg);
1805 }
1806
1807 /*
1808  * Terminate a transaction given a state structure by issuing a DELETE.
1809  * (the state structure must not be &iocom->state0)
1810  */
1811 void
1812 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1813 {
1814         dmsg_msg_t *nmsg;
1815         uint32_t cmd = DMSG_LNK_ERROR;
1816
1817         /*
1818          * Nothing to do if we already transmitted a delete
1819          */
1820         if (state->txcmd & DMSGF_DELETE)
1821                 return;
1822
1823         /*
1824          * Set REPLY if the other end initiated the command.  Otherwise
1825          * we are the command direction.
1826          */
1827         if (state->txcmd & DMSGF_REPLY)
1828                 cmd |= DMSGF_REPLY;
1829
1830         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1831         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1832                 if ((state->txcmd & DMSGF_CREATE) == 0)
1833                         nmsg->any.head.cmd |= DMSGF_CREATE;
1834         }
1835         nmsg->any.head.error = error;
1836         dmsg_msg_write(nmsg);
1837 }
1838
1839 /************************************************************************
1840  *                      TRANSACTION STATE HANDLING                      *
1841  ************************************************************************
1842  *
1843  */
1844
/*
 * Process state tracking for a message after reception, prior to execution.
 * Possibly route the message (consuming it).
 *
 * Called with msglk held and the msg dequeued.  This function acquires and
 * releases iocom->mtx itself around the RB-tree lookups and around the
 * insertion of new state.
 *
 * All messages arrive with dummy state.  On return msg->state points at the
 * actual transaction state when one was found or created; one-off messages
 * keep the state they arrived with.
 *
 * Returns 0 if the message should be processed, DMSG_IOQ_ERROR_EALREADY for
 * a raced ABORT whose transaction is already closed or whose msgid has been
 * reused (the caller may discard the message), or DMSG_IOQ_ERROR_TRANS on a
 * protocol violation.  msg->tcmd is computed on every return path.
 *
 * --
 *
 * These routines handle persistent and command/reply message state via the
 * CREATE and DELETE flags.  The first message in a command or reply sequence
 * sets CREATE, the last message in a command or reply sequence sets DELETE.
 *
 * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message,
 * which set neither flag.  This represents a streaming command or reply.
 *
 * Any command message received with CREATE set expects a reply sequence to
 * be returned.  Reply sequences work the same as command sequences except the
 * REPLY bit is also sent.  Both the command side and reply side can
 * degenerate into a single message with both CREATE and DELETE set.  Note
 * that one side can be streaming and the other side not, or neither, or both.
 *
 * The msgid is unique for the initiator.  That is, two sides sending a new
 * message can use the same msgid without colliding.
 *
 * --
 *
 * ABORT sequences work by setting the ABORT flag along with normal message
 * state.  However, ABORTs can also be sent on half-closed messages, that is
 * even if the command or reply side has already sent a DELETE, as long as
 * the message has not been fully closed it can still send an ABORT+DELETE
 * to terminate the half-closed message state.
 *
 * Since ABORT+DELETEs can race we silently discard ABORT's for message
 * state which has already been fully closed.  REPLY+ABORT+DELETEs can
 * also race, and in this situation the other side might have already
 * initiated a new unrelated command with the same message id.  Since
 * the abort has not set the CREATE flag the situation can be detected
 * and the message will also be discarded.
 *
 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
 * The ABORT request is essentially integrated into the command instead
 * of being sent later on.  In this situation the command implementation
 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
 * special-case non-blocking operation for the command.
 *
 * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
 *        to be mid-stream aborts for command/reply sequences.  ABORTs on
 *        one-way messages are not supported.
 *
 * NOTE!  If a command sequence does not support aborts the ABORT flag is
 *        simply ignored.
 *
 * --
 *
 * One-off messages (no reply expected) are sent without an established
 * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
 * For one-off messages sent over circuits msgid generally MUST be 0.
 *
 * One-off messages cannot be aborted and typically aren't processed
 * by these routines.  Order is still guaranteed for messages sent over
 * the same circuit.  The REPLY bit can be used to distinguish whether
 * a one-off message is a command or reply.  For example, one-off replies
 * will typically just contain status updates.
 */
static int
dmsg_state_msgrx(dmsg_msg_t *msg)
{
	dmsg_iocom_t *iocom = msg->state->iocom;
	dmsg_state_t *state;
	dmsg_state_t *pstate;
	dmsg_state_t sdummy;
	int error;

	pthread_mutex_lock(&iocom->mtx);

	/*
	 * Lookup the circuit (pstate).  The circuit will be an open
	 * transaction.  The REVCIRC bit in the message tells us which side
	 * initiated it.
	 */
	if (msg->any.head.circuit) {
		sdummy.msgid = msg->any.head.circuit;

		if (msg->any.head.cmd & DMSGF_REVCIRC) {
			pstate = RB_FIND(dmsg_state_tree,
					 &iocom->statewr_tree,
					 &sdummy);
		} else {
			pstate = RB_FIND(dmsg_state_tree,
					 &iocom->staterd_tree,
					 &sdummy);
		}
		if (pstate == NULL) {
			/*
			 * Protocol violation.  Bail out here: when assertions
			 * are compiled out (NDEBUG), falling through would
			 * unlock iocom->mtx a second time, could dereference
			 * the NULL parent, and would let the switch below
			 * overwrite the error code.
			 */
			fprintf(stderr,
				"missing parent in stacked trans %s\n",
				dmsg_msg_str(msg));
			pthread_mutex_unlock(&iocom->mtx);
			assert(0);
			msg->tcmd = 0;
			return (DMSG_IOQ_ERROR_TRANS);
		}
	} else {
		pstate = &iocom->state0;
	}

	/*
	 * Lookup the msgid.
	 *
	 * If received msg is a command state is on staterd_tree.
	 * If received msg is a reply state is on statewr_tree.
	 * Otherwise there is no state (retain &iocom->state0)
	 */
	sdummy.msgid = msg->any.head.msgid;
	if (msg->any.head.cmd & DMSGF_REVTRANS)
		state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
	else
		state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);

	if (state) {
		/*
		 * Message over an existing transaction (CREATE should not
		 * be set).
		 */
		msg->state = state;
		assert(pstate == state->parent);
	} else {
		/*
		 * Either a new transaction (if CREATE set) or a one-off.
		 */
		state = pstate;
	}

	pthread_mutex_unlock(&iocom->mtx);

	/*
	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
	 * inside the case statements.
	 *
	 * Construct new state as necessary.
	 */
	switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
				    DMSGF_REPLY)) {
	case DMSGF_CREATE:
	case DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * Create new sub-transaction under pstate.
		 * (any DELETE is handled in post-processing of msg).
		 *
		 * (During routing the msgid was made unique for this
		 * direction over the comlink, so our RB trees can be
		 * iocom-based instead of state-based).
		 */
		if (state != pstate) {
			fprintf(stderr,
				"duplicate transaction %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}

		/*
		 * Allocate the new state.
		 */
		state = malloc(sizeof(*state));
		atomic_add_int(&dmsg_state_count, 1);
		bzero(state, sizeof(*state));
		TAILQ_INIT(&state->subq);
		dmsg_state_hold(pstate);
		state->refs = 1;
		state->parent = pstate;
		state->iocom = iocom;
		state->flags = DMSG_STATE_DYNAMIC |
			       DMSG_STATE_OPPOSITE;
		state->msgid = msg->any.head.msgid;
		state->txcmd = DMSGF_REPLY;
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
		msg->state = state;
		pthread_mutex_lock(&iocom->mtx);
		RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
		TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
		state->flags |= DMSG_STATE_INSERTED;

		/*
		 * If the parent is a relay set up the state handler to
		 * automatically route the message.  Local processing will
		 * not occur if set.
		 *
		 * (state relays are seeded by SPAN processing)
		 */
		if (pstate->relay)
			state->func = dmsg_state_relay;
		pthread_mutex_unlock(&iocom->mtx);
		error = 0;

		if (DMsgDebugOpt) {
			fprintf(stderr,
				"create state %p id=%08x on iocom staterd %p\n",
				state, (uint32_t)state->msgid, iocom);
		}
		break;
	case DMSGF_DELETE:
		/*
		 * Persistent state is expected but might not exist if an
		 * ABORT+DELETE races the close.
		 *
		 * (any DELETE is handled in post-processing of msg).
		 */
		if (state == pstate) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "missing-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Handle another ABORT+DELETE case if the msgid has already
		 * been reused.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_CREATE:
	case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
		/*
		 * When receiving a reply with CREATE set the original
		 * persistent state message should already exist.
		 */
		if (state == pstate) {
			fprintf(stderr, "no-state(r) %s\n",
				dmsg_msg_str(msg));
			error = DMSG_IOQ_ERROR_TRANS;
			assert(0);
			break;
		}
		assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
		state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
		error = 0;
		break;
	case DMSGF_REPLY | DMSGF_DELETE:
		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been fully closed, ignore the message.
		 */
		if (state == pstate) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "no-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}

		/*
		 * Received REPLY+ABORT+DELETE in case where msgid has
		 * already been reused for an unrelated message,
		 * ignore the message.
		 */
		if ((state->rxcmd & DMSGF_CREATE) == 0) {
			if (msg->any.head.cmd & DMSGF_ABORT) {
				error = DMSG_IOQ_ERROR_EALREADY;
			} else {
				fprintf(stderr, "reused-state(r,d) %s\n",
					dmsg_msg_str(msg));
				error = DMSG_IOQ_ERROR_TRANS;
				assert(0);
			}
			break;
		}
		error = 0;
		break;
	case DMSGF_REPLY:
		/*
		 * Check for mid-stream ABORT reply received to sent command.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if (state == pstate ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	default:
		/*
		 * No flags (mid-stream command or one-off).  Check for
		 * mid-stream ABORT command received, otherwise allow.
		 */
		if (msg->any.head.cmd & DMSGF_ABORT) {
			if ((state == pstate) ||
			    (state->rxcmd & DMSGF_CREATE) == 0) {
				error = DMSG_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	}

	/*
	 * Calculate the easy-switch() transactional command.  Represents
	 * the outer-transaction command for any transaction-create or
	 * transaction-delete, and the inner message command for any
	 * non-transaction or inside-transaction command.  tcmd is set to 0
	 * when a CREATE/DELETE message resolved to root (non-transactional)
	 * state.
	 *
	 * The two can be told apart because outer-transaction commands
	 * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
	 */
	if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
		if ((state->flags & DMSG_STATE_ROOT) == 0) {
			msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
				    (msg->any.head.cmd & (DMSGF_CREATE |
							  DMSGF_DELETE |
							  DMSGF_REPLY));
		} else {
			msg->tcmd = 0;
		}
	} else {
		msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
	}
	return (error);
}
2194
2195 /*
2196  * Route the message and handle pair-state processing.
2197  */
2198 void
2199 dmsg_state_relay(dmsg_msg_t *lmsg)
2200 {
2201         dmsg_state_t *lpstate;
2202         dmsg_state_t *rpstate;
2203         dmsg_state_t *lstate;
2204         dmsg_state_t *rstate;
2205         dmsg_msg_t *rmsg;
2206
2207         if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2208             DMSGF_CREATE) {
2209                 /*
2210                  * New sub-transaction, establish new state and relay.
2211                  */
2212                 lstate = lmsg->state;
2213                 lpstate = lstate->parent;
2214                 rpstate = lpstate->relay;
2215                 assert(lstate->relay == NULL);
2216                 assert(rpstate != NULL);
2217
2218                 rmsg = dmsg_msg_alloc(rpstate,
2219                                       lmsg->aux_size,
2220                                       lmsg->any.head.cmd,
2221                                       dmsg_state_relay, NULL);
2222                 rstate = rmsg->state;
2223                 rstate->relay = lstate;
2224                 lstate->relay = rstate;
2225                 dmsg_state_hold(lstate);
2226                 dmsg_state_hold(rstate);
2227         } else {
2228                 /*
2229                  * State & relay already established
2230                  */
2231                 lstate = lmsg->state;
2232                 rstate = lstate->relay;
2233                 assert(rstate != NULL);
2234
2235                 rmsg = dmsg_msg_alloc(rstate,
2236                                       lmsg->aux_size,
2237                                       lmsg->any.head.cmd,
2238                                       dmsg_state_relay, NULL);
2239         }
2240         if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2241                 bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2242                       lmsg->hdr_size - sizeof(lmsg->any.head));
2243         }
2244         rmsg->any.head.error = lmsg->any.head.error;
2245         rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2246         rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
2247         rmsg->aux_data = lmsg->aux_data;
2248         lmsg->aux_data = NULL;
2249         /*
2250         fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
2251         */
2252         dmsg_msg_write(rmsg);
2253 }
2254
2255 /*
2256  * Cleanup and retire msg after processing
2257  */
2258 void
2259 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2260 {
2261         dmsg_state_t *state;
2262         dmsg_state_t *pstate;
2263
2264         assert(msg->state->iocom == iocom);
2265         state = msg->state;
2266         if (state->flags & DMSG_STATE_ROOT) {
2267                 /*
2268                  * Free a non-transactional message, there is no state
2269                  * to worry about.
2270                  */
2271                 dmsg_msg_free(msg);
2272         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2273                 /*
2274                  * Message terminating transaction, destroy the related
2275                  * state, the original message, and this message (if it
2276                  * isn't the original message due to a CREATE|DELETE).
2277                  *
2278                  * It's possible for governing state to terminate while
2279                  * sub-transactions still exist.  This is allowed but
2280                  * will cause sub-transactions to recursively fail.
2281                  * Further reception of sub-transaction messages will be
2282                  * impossible because the circuit will no longer exist.
2283                  * (XXX need code to make sure that happens properly).
2284                  */
2285                 pthread_mutex_lock(&iocom->mtx);
2286                 state->rxcmd |= DMSGF_DELETE;
2287
2288                 if (state->txcmd & DMSGF_DELETE) {
2289                         assert(state->flags & DMSG_STATE_INSERTED);
2290                         if (state->rxcmd & DMSGF_REPLY) {
2291                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2292                                 RB_REMOVE(dmsg_state_tree,
2293                                           &iocom->statewr_tree, state);
2294                         } else {
2295                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2296                                 RB_REMOVE(dmsg_state_tree,
2297                                           &iocom->staterd_tree, state);
2298                         }
2299                         pstate = state->parent;
2300                         TAILQ_REMOVE(&pstate->subq, state, entry);
2301                         state->flags &= ~DMSG_STATE_INSERTED;
2302                         state->parent = NULL;
2303                         dmsg_state_drop(pstate);
2304
2305                         if (state->relay) {
2306                                 dmsg_state_drop(state->relay);
2307                                 state->relay = NULL;
2308                         }
2309                         dmsg_msg_free(msg);
2310                         dmsg_state_drop(state);
2311                 } else {
2312                         dmsg_msg_free(msg);
2313                 }
2314                 pthread_mutex_unlock(&iocom->mtx);
2315         } else {
2316                 /*
2317                  * Message not terminating transaction, leave state intact
2318                  * and free message if it isn't the CREATE message.
2319                  */
2320                 dmsg_msg_free(msg);
2321         }
2322 }
2323
2324 /*
2325  * Clean up the state after pulling out needed fields and queueing the
2326  * message for transmission.   This occurs in dmsg_msg_write().
2327  */
2328 static void
2329 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2330 {
2331         dmsg_state_t *state;
2332         dmsg_state_t *pstate;
2333
2334         assert(iocom == msg->state->iocom);
2335         state = msg->state;
2336         if (state->flags & DMSG_STATE_ROOT) {
2337                 ;
2338         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2339                 /*
2340                  * Message terminating transaction, destroy the related
2341                  * state, the original message, and this message (if it
2342                  * isn't the original message due to a CREATE|DELETE).
2343                  *
2344                  * It's possible for governing state to terminate while
2345                  * sub-transactions still exist.  This is allowed but
2346                  * will cause sub-transactions to recursively fail.
2347                  * Further reception of sub-transaction messages will be
2348                  * impossible because the circuit will no longer exist.
2349                  * (XXX need code to make sure that happens properly).
2350                  */
2351                 pthread_mutex_lock(&iocom->mtx);
2352                 assert((state->txcmd & DMSGF_DELETE) == 0);
2353                 state->txcmd |= DMSGF_DELETE;
2354                 if (state->rxcmd & DMSGF_DELETE) {
2355                         assert(state->flags & DMSG_STATE_INSERTED);
2356                         if (state->txcmd & DMSGF_REPLY) {
2357                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2358                                 RB_REMOVE(dmsg_state_tree,
2359                                           &iocom->staterd_tree, state);
2360                         } else {
2361                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2362                                 RB_REMOVE(dmsg_state_tree,
2363                                           &iocom->statewr_tree, state);
2364                         }
2365                         pstate = state->parent;
2366                         TAILQ_REMOVE(&pstate->subq, state, entry);
2367                         state->flags &= ~DMSG_STATE_INSERTED;
2368                         state->parent = NULL;
2369                         dmsg_state_drop(pstate);
2370
2371                         if (state->relay) {
2372                                 dmsg_state_drop(state->relay);
2373                                 state->relay = NULL;
2374                         }
2375                         dmsg_state_drop(state); /* usually the last drop */
2376                 }
2377                 pthread_mutex_unlock(&iocom->mtx);
2378         }
2379 }
2380
2381 /*
2382  * Called with or without locks
2383  */
2384 void
2385 dmsg_state_hold(dmsg_state_t *state)
2386 {
2387         atomic_add_int(&state->refs, 1);
2388 }
2389
2390 void
2391 dmsg_state_drop(dmsg_state_t *state)
2392 {
2393         if (atomic_fetchadd_int(&state->refs, -1) == 1)
2394                 dmsg_state_free(state);
2395 }
2396
2397 /*
2398  * Called with iocom locked
2399  */
2400 static void
2401 dmsg_state_free(dmsg_state_t *state)
2402 {
2403         atomic_add_int(&dmsg_state_count, -1);
2404         if (DMsgDebugOpt) {
2405                 fprintf(stderr, "terminate state %p id=%08x\n",
2406                         state, (uint32_t)state->msgid);
2407         }
2408         assert((state->flags & (DMSG_STATE_ROOT | DMSG_STATE_INSERTED)) == 0);
2409         assert(TAILQ_EMPTY(&state->subq));
2410         assert(state->refs == 0);
2411         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2412                 closefrom(3);
2413         assert(state->any.any == NULL);
2414         free(state);
2415 }
2416
2417 /*
2418  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2419  * header is not adjusted, just the core header.
2420  */
2421 void
2422 dmsg_bswap_head(dmsg_hdr_t *head)
2423 {
2424         head->magic     = bswap16(head->magic);
2425         head->reserved02 = bswap16(head->reserved02);
2426         head->salt      = bswap32(head->salt);
2427
2428         head->msgid     = bswap64(head->msgid);
2429         head->circuit   = bswap64(head->circuit);
2430         head->reserved18= bswap64(head->reserved18);
2431
2432         head->cmd       = bswap32(head->cmd);
2433         head->aux_crc   = bswap32(head->aux_crc);
2434         head->aux_bytes = bswap32(head->aux_bytes);
2435         head->error     = bswap32(head->error);
2436         head->aux_descr = bswap64(head->aux_descr);
2437         head->reserved38= bswap32(head->reserved38);
2438         head->hdr_crc   = bswap32(head->hdr_crc);
2439 }