libdmsg - Fix memory leak
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2015 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
/*
 * Debug verbosity switch.  When non-zero, the message allocate/free paths
 * and the core receive loop in this file print trace lines to stderr.
 */
int DMsgDebugOpt;

/*
 * Count of dynamically allocated transaction states; incremented
 * atomically by dmsg_msg_alloc_locked() whenever it creates a new state.
 */
int dmsg_state_count;

/* Forward declarations for file-local helpers. */
static int dmsg_state_msgrx(dmsg_msg_t *msg);
static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
static void dmsg_msg_free_locked(dmsg_msg_t *msg);
static void dmsg_state_free(dmsg_state_t *state);
static void dmsg_msg_simulate_failure(dmsg_state_t *state, int error);

/*
 * Emit the red-black tree implementation for transaction states, ordered
 * by dmsg_state_cmp() (i.e. by msgid).
 */
RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
48
49 /*
50  * STATE TREE - Represents open transactions which are indexed by their
51  *              { msgid } relative to the governing iocom.
52  */
53 int
54 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
55 {
56         if (state1->msgid < state2->msgid)
57                 return(-1);
58         if (state1->msgid > state2->msgid)
59                 return(1);
60         return(0);
61 }
62
63 /*
64  * Initialize a low-level ioq
65  */
66 void
67 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
68 {
69         bzero(ioq, sizeof(*ioq));
70         ioq->state = DMSG_MSGQ_STATE_HEADER1;
71         TAILQ_INIT(&ioq->msgq);
72 }
73
74 /*
75  * Cleanup queue.
76  *
77  * caller holds iocom->mtx.
78  */
79 void
80 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
81 {
82         dmsg_msg_t *msg;
83
84         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
85                 assert(0);      /* shouldn't happen */
86                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
87                 dmsg_msg_free(msg);
88         }
89         if ((msg = ioq->msg) != NULL) {
90                 ioq->msg = NULL;
91                 dmsg_msg_free(msg);
92         }
93 }
94
95 /*
96  * Initialize a low-level communications channel.
97  *
98  * NOTE: The signal_func() is called at least once from the loop and can be
99  *       re-armed via dmsg_iocom_restate().
100  */
101 void
102 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
103                    void (*signal_func)(dmsg_iocom_t *iocom),
104                    void (*rcvmsg_func)(dmsg_msg_t *msg),
105                    void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
106                    void (*altmsg_func)(dmsg_iocom_t *iocom))
107 {
108         struct stat st;
109
110         bzero(iocom, sizeof(*iocom));
111
112         asprintf(&iocom->label, "iocom-%p", iocom);
113         iocom->signal_callback = signal_func;
114         iocom->rcvmsg_callback = rcvmsg_func;
115         iocom->altmsg_callback = altmsg_func;
116         iocom->usrmsg_callback = usrmsg_func;
117
118         pthread_mutex_init(&iocom->mtx, NULL);
119         RB_INIT(&iocom->staterd_tree);
120         RB_INIT(&iocom->statewr_tree);
121         TAILQ_INIT(&iocom->freeq);
122         TAILQ_INIT(&iocom->freeq_aux);
123         TAILQ_INIT(&iocom->txmsgq);
124         iocom->sock_fd = sock_fd;
125         iocom->alt_fd = alt_fd;
126         iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
127         if (signal_func)
128                 iocom->flags |= DMSG_IOCOMF_SWORK;
129         dmsg_ioq_init(iocom, &iocom->ioq_rx);
130         dmsg_ioq_init(iocom, &iocom->ioq_tx);
131         iocom->state0.refs = 1;         /* should never trigger a free */
132         iocom->state0.iocom = iocom;
133         iocom->state0.parent = &iocom->state0;
134         iocom->state0.flags = DMSG_STATE_ROOT;
135         TAILQ_INIT(&iocom->state0.subq);
136
137         if (pipe(iocom->wakeupfds) < 0)
138                 assert(0);
139         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
140         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
141
142         /*
143          * Negotiate session crypto synchronously.  This will mark the
144          * connection as error'd if it fails.  If this is a pipe it's
145          * a linkage that we set up ourselves to the filesystem and there
146          * is no crypto.
147          */
148         if (fstat(sock_fd, &st) < 0)
149                 assert(0);
150         if (S_ISSOCK(st.st_mode))
151                 dmsg_crypto_negotiate(iocom);
152
153         /*
154          * Make sure our fds are set to non-blocking for the iocom core.
155          */
156         if (sock_fd >= 0)
157                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
158 #if 0
159         /* if line buffered our single fgets() should be fine */
160         if (alt_fd >= 0)
161                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
162 #endif
163 }
164
165 void
166 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
167 {
168         va_list va;
169         char *optr;
170
171         va_start(va, ctl);
172         optr = iocom->label;
173         vasprintf(&iocom->label, ctl, va);
174         va_end(va);
175         if (optr)
176                 free(optr);
177 }
178
179 /*
180  * May only be called from a callback from iocom_core.
181  *
182  * Adjust state machine functions, set flags to guarantee that both
183  * the recevmsg_func and the sendmsg_func is called at least once.
184  */
185 void
186 dmsg_iocom_restate(dmsg_iocom_t *iocom,
187                    void (*signal_func)(dmsg_iocom_t *),
188                    void (*rcvmsg_func)(dmsg_msg_t *msg))
189 {
190         pthread_mutex_lock(&iocom->mtx);
191         iocom->signal_callback = signal_func;
192         iocom->rcvmsg_callback = rcvmsg_func;
193         if (signal_func)
194                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
195         else
196                 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
197         pthread_mutex_unlock(&iocom->mtx);
198 }
199
200 void
201 dmsg_iocom_signal(dmsg_iocom_t *iocom)
202 {
203         pthread_mutex_lock(&iocom->mtx);
204         if (iocom->signal_callback)
205                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
206         pthread_mutex_unlock(&iocom->mtx);
207 }
208
209 /*
210  * Cleanup a terminating iocom.
211  *
212  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
213  * from all possible references to it.
214  */
215 void
216 dmsg_iocom_done(dmsg_iocom_t *iocom)
217 {
218         dmsg_msg_t *msg;
219
220         if (iocom->sock_fd >= 0) {
221                 close(iocom->sock_fd);
222                 iocom->sock_fd = -1;
223         }
224         if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
225                 close(iocom->alt_fd);
226                 iocom->alt_fd = -1;
227         }
228         dmsg_ioq_done(iocom, &iocom->ioq_rx);
229         dmsg_ioq_done(iocom, &iocom->ioq_tx);
230         while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
231                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
232                 free(msg);
233         }
234         while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
235                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
236                 free(msg->aux_data);
237                 msg->aux_data = NULL;
238                 free(msg);
239         }
240         if (iocom->wakeupfds[0] >= 0) {
241                 close(iocom->wakeupfds[0]);
242                 iocom->wakeupfds[0] = -1;
243         }
244         if (iocom->wakeupfds[1] >= 0) {
245                 close(iocom->wakeupfds[1]);
246                 iocom->wakeupfds[1] = -1;
247         }
248         pthread_mutex_destroy(&iocom->mtx);
249 }
250
251 /*
252  * Allocate a new message using the specified transaction state.
253  *
254  * If CREATE is set a new transaction is allocated relative to the passed-in
255  * transaction (the 'state' argument becomes pstate).
256  *
257  * If CREATE is not set the message is associated with the passed-in
258  * transaction.
259  */
260 dmsg_msg_t *
261 dmsg_msg_alloc(dmsg_state_t *state,
262                size_t aux_size, uint32_t cmd,
263                void (*func)(dmsg_msg_t *), void *data)
264 {
265         dmsg_iocom_t *iocom = state->iocom;
266         dmsg_msg_t *msg;
267
268         pthread_mutex_lock(&iocom->mtx);
269         msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
270         pthread_mutex_unlock(&iocom->mtx);
271
272         return msg;
273 }
274
275 dmsg_msg_t *
276 dmsg_msg_alloc_locked(dmsg_state_t *state,
277                size_t aux_size, uint32_t cmd,
278                void (*func)(dmsg_msg_t *), void *data)
279 {
280         dmsg_iocom_t *iocom = state->iocom;
281         dmsg_state_t *pstate;
282         dmsg_msg_t *msg;
283         int hbytes;
284         size_t aligned_size;
285
286 #if 0
287         if (aux_size) {
288                 aligned_size = DMSG_DOALIGN(aux_size);
289                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
290                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
291         } else {
292                 aligned_size = 0;
293                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
294                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
295         }
296 #endif
297         aligned_size = DMSG_DOALIGN(aux_size);
298         msg = NULL;
299         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
300                 /*
301                  * When CREATE is set without REPLY the caller is
302                  * initiating a new transaction stacked under the specified
303                  * circuit.
304                  *
305                  * NOTE: CREATE in txcmd handled by dmsg_msg_write()
306                  * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
307                  */
308                 pstate = state;
309                 state = malloc(sizeof(*state));
310                 atomic_add_int(&dmsg_state_count, 1);
311                 bzero(state, sizeof(*state));
312                 TAILQ_INIT(&state->subq);
313                 dmsg_state_hold(pstate);
314                 state->refs = 1;
315                 state->parent = pstate;
316                 state->iocom = iocom;
317                 state->flags = DMSG_STATE_DYNAMIC;
318                 state->msgid = (uint64_t)(uintptr_t)state;
319                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
320                 state->rxcmd = DMSGF_REPLY;
321                 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
322                 state->func = func;
323                 state->any.any = data;
324
325                 RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
326                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
327                 state->flags |= DMSG_STATE_INSERTED;
328
329                 if (DMsgDebugOpt) {
330                         fprintf(stderr,
331                                 "create state %p id=%08x on iocom statewr %p\n",
332                                 state, (uint32_t)state->msgid, iocom);
333                 }
334         } else {
335                 /*
336                  * Otherwise the message is transmitted over the existing
337                  * open transaction.
338                  */
339                 pstate = state->parent;
340         }
341
342         /* XXX SMP race for state */
343         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
344         if (msg == NULL) {
345                 msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
346                 bzero(msg, offsetof(struct dmsg_msg, any.head));
347                 *(int *)((char *)msg +
348                          offsetof(struct dmsg_msg, any.head) + hbytes) =
349                                  0x71B2C3D4;
350                 if (DMsgDebugOpt) {
351                         fprintf(stderr,
352                                 "allo msg %p id=%08x on iocom %p\n",
353                                 msg, (int)msg->any.head.msgid, iocom);
354                 }
355 #if 0
356                 msg = malloc(sizeof(*msg));
357                 bzero(msg, sizeof(*msg));
358 #endif
359         }
360
361         /*
362          * [re]allocate the auxillary data buffer.  The caller knows that
363          * a size-aligned buffer will be allocated but we do not want to
364          * force the caller to zero any tail piece, so we do that ourself.
365          */
366         if (msg->aux_size != aux_size) {
367                 if (msg->aux_data) {
368                         free(msg->aux_data);
369                         msg->aux_data = NULL;
370                         msg->aux_size = 0;
371                 }
372                 if (aux_size) {
373                         msg->aux_data = malloc(aligned_size);
374                         msg->aux_size = aux_size;
375                         if (aux_size != aligned_size) {
376                                 bzero(msg->aux_data + aux_size,
377                                       aligned_size - aux_size);
378                         }
379                 }
380         }
381
382         /*
383          * Set REVTRANS if the transaction was remotely initiated
384          * Set REVCIRC if the circuit was remotely initiated
385          */
386         if (state->flags & DMSG_STATE_OPPOSITE)
387                 cmd |= DMSGF_REVTRANS;
388         if (pstate->flags & DMSG_STATE_OPPOSITE)
389                 cmd |= DMSGF_REVCIRC;
390
391         /*
392          * Finish filling out the header.
393          */
394         if (hbytes)
395                 bzero(&msg->any.head, hbytes);
396         msg->hdr_size = hbytes;
397         msg->any.head.magic = DMSG_HDR_MAGIC;
398         msg->any.head.cmd = cmd;
399         msg->any.head.aux_descr = 0;
400         msg->any.head.aux_crc = 0;
401         msg->any.head.msgid = state->msgid;
402         msg->any.head.circuit = pstate->msgid;
403         msg->state = state;
404
405         return (msg);
406 }
407
408 /*
409  * Free a message so it can be reused afresh.
410  *
411  * NOTE: aux_size can be 0 with a non-NULL aux_data.
412  */
413 static
414 void
415 dmsg_msg_free_locked(dmsg_msg_t *msg)
416 {
417         /*dmsg_iocom_t *iocom = msg->iocom;*/
418
419         if (DMsgDebugOpt) {
420                 fprintf(stderr,
421                         "free msg %p id=%08x on (aux %p)\n",
422                         msg, (int)msg->any.head.msgid, msg->aux_data);
423         }
424 #if 1
425         int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
426         if (*(int *)((char *)msg +
427                      offsetof(struct  dmsg_msg, any.head) + hbytes) !=
428              0x71B2C3D4) {
429                 fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
430                 assert(0);
431         }
432 #endif
433         msg->state = NULL;      /* safety */
434         if (msg->aux_data) {
435                 free(msg->aux_data);
436                 msg->aux_data = NULL;
437         }
438         msg->aux_size = 0;
439         free (msg);
440 #if 0
441         if (msg->aux_data)
442                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
443         else
444                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
445 #endif
446 }
447
448 void
449 dmsg_msg_free(dmsg_msg_t *msg)
450 {
451         dmsg_iocom_t *iocom = msg->state->iocom;
452
453         pthread_mutex_lock(&iocom->mtx);
454         dmsg_msg_free_locked(msg);
455         pthread_mutex_unlock(&iocom->mtx);
456 }
457
458 /*
459  * I/O core loop for an iocom.
460  *
461  * Thread localized, iocom->mtx not held.
462  */
463 void
464 dmsg_iocom_core(dmsg_iocom_t *iocom)
465 {
466         struct pollfd fds[3];
467         char dummybuf[256];
468         dmsg_msg_t *msg;
469         int timeout;
470         int count;
471         int wi; /* wakeup pipe */
472         int si; /* socket */
473         int ai; /* alt bulk path socket */
474
475         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
476                 /*
477                  * These iocom->flags are only manipulated within the
478                  * context of the current thread.  However, modifications
479                  * still require atomic ops.
480                  */
481                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
482                                      DMSG_IOCOMF_WWORK |
483                                      DMSG_IOCOMF_PWORK |
484                                      DMSG_IOCOMF_SWORK |
485                                      DMSG_IOCOMF_ARWORK |
486                                      DMSG_IOCOMF_AWWORK)) == 0) {
487                         /*
488                          * Only poll if no immediate work is pending.
489                          * Otherwise we are just wasting our time calling
490                          * poll.
491                          */
492                         timeout = 5000;
493
494                         count = 0;
495                         wi = -1;
496                         si = -1;
497                         ai = -1;
498
499                         /*
500                          * Always check the inter-thread pipe, e.g.
501                          * for iocom->txmsgq work.
502                          */
503                         wi = count++;
504                         fds[wi].fd = iocom->wakeupfds[0];
505                         fds[wi].events = POLLIN;
506                         fds[wi].revents = 0;
507
508                         /*
509                          * Check the socket input/output direction as
510                          * requested
511                          */
512                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
513                                             DMSG_IOCOMF_WREQ)) {
514                                 si = count++;
515                                 fds[si].fd = iocom->sock_fd;
516                                 fds[si].events = 0;
517                                 fds[si].revents = 0;
518
519                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
520                                         fds[si].events |= POLLIN;
521                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
522                                         fds[si].events |= POLLOUT;
523                         }
524
525                         /*
526                          * Check the alternative fd for work.
527                          */
528                         if (iocom->alt_fd >= 0) {
529                                 ai = count++;
530                                 fds[ai].fd = iocom->alt_fd;
531                                 fds[ai].events = POLLIN;
532                                 fds[ai].revents = 0;
533                         }
534                         poll(fds, count, timeout);
535
536                         if (wi >= 0 && (fds[wi].revents & POLLIN))
537                                 atomic_set_int(&iocom->flags,
538                                                DMSG_IOCOMF_PWORK);
539                         if (si >= 0 && (fds[si].revents & POLLIN))
540                                 atomic_set_int(&iocom->flags,
541                                                DMSG_IOCOMF_RWORK);
542                         if (si >= 0 && (fds[si].revents & POLLOUT))
543                                 atomic_set_int(&iocom->flags,
544                                                DMSG_IOCOMF_WWORK);
545                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
546                                 atomic_set_int(&iocom->flags,
547                                                DMSG_IOCOMF_WWORK);
548                         if (ai >= 0 && (fds[ai].revents & POLLIN))
549                                 atomic_set_int(&iocom->flags,
550                                                DMSG_IOCOMF_ARWORK);
551                 } else {
552                         /*
553                          * Always check the pipe
554                          */
555                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
556                 }
557
558                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
559                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
560                         iocom->signal_callback(iocom);
561                 }
562
563                 /*
564                  * Pending message queues from other threads wake us up
565                  * with a write to the wakeupfds[] pipe.  We have to clear
566                  * the pipe with a dummy read.
567                  */
568                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
569                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
570                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
571                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
572                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
573                         if (TAILQ_FIRST(&iocom->txmsgq))
574                                 dmsg_iocom_flush1(iocom);
575                 }
576
577                 /*
578                  * Message write sequencing
579                  */
580                 if (iocom->flags & DMSG_IOCOMF_WWORK)
581                         dmsg_iocom_flush1(iocom);
582
583                 /*
584                  * Message read sequencing.  Run this after the write
585                  * sequencing in case the write sequencing allowed another
586                  * auto-DELETE to occur on the read side.
587                  */
588                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
589                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
590                                (msg = dmsg_ioq_read(iocom)) != NULL) {
591                                 if (DMsgDebugOpt) {
592                                         fprintf(stderr, "receive %s\n",
593                                                 dmsg_msg_str(msg));
594                                 }
595                                 iocom->rcvmsg_callback(msg);
596                                 dmsg_state_cleanuprx(iocom, msg);
597                         }
598                 }
599
600                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
601                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
602                         iocom->altmsg_callback(iocom);
603                 }
604         }
605 }
606
607 /*
608  * Make sure there's enough room in the FIFO to hold the
609  * needed data.
610  *
611  * Assume worst case encrypted form is 2x the size of the
612  * plaintext equivalent.
613  */
614 static
615 size_t
616 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
617 {
618         size_t bytes;
619         size_t nmax;
620
621         bytes = ioq->fifo_cdx - ioq->fifo_beg;
622         nmax = sizeof(ioq->buf) - ioq->fifo_end;
623         if (bytes + nmax / 2 < needed) {
624                 if (bytes) {
625                         bcopy(ioq->buf + ioq->fifo_beg,
626                               ioq->buf,
627                               bytes);
628                 }
629                 ioq->fifo_cdx -= ioq->fifo_beg;
630                 ioq->fifo_beg = 0;
631                 if (ioq->fifo_cdn < ioq->fifo_end) {
632                         bcopy(ioq->buf + ioq->fifo_cdn,
633                               ioq->buf + ioq->fifo_cdx,
634                               ioq->fifo_end - ioq->fifo_cdn);
635                 }
636                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
637                 ioq->fifo_cdn = ioq->fifo_cdx;
638                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
639         }
640         return(nmax);
641 }
642
643 /*
644  * Read the next ready message from the ioq, issuing I/O if needed.
645  * Caller should retry on a read-event when NULL is returned.
646  *
647  * If an error occurs during reception a DMSG_LNK_ERROR msg will
648  * be returned for each open transaction, then the ioq and iocom
649  * will be errored out and a non-transactional DMSG_LNK_ERROR
650  * msg will be returned as the final message.  The caller should not call
651  * us again after the final message is returned.
652  *
653  * Thread localized, iocom->mtx not held.
654  */
655 dmsg_msg_t *
656 dmsg_ioq_read(dmsg_iocom_t *iocom)
657 {
658         dmsg_ioq_t *ioq = &iocom->ioq_rx;
659         dmsg_msg_t *msg;
660         dmsg_state_t *state;
661         dmsg_hdr_t *head;
662         ssize_t n;
663         size_t bytes;
664         size_t nmax;
665         uint32_t aux_size;
666         uint32_t xcrc32;
667         int error;
668
669 again:
670         /*
671          * If a message is already pending we can just remove and
672          * return it.  Message state has already been processed.
673          * (currently not implemented)
674          */
675         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
676                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
677                 return (msg);
678         }
679         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
680
681         /*
682          * If the stream is errored out we stop processing it.
683          */
684         if (ioq->error)
685                 goto skip;
686
687         /*
688          * Message read in-progress (msg is NULL at the moment).  We don't
689          * allocate a msg until we have its core header.
690          */
691         nmax = sizeof(ioq->buf) - ioq->fifo_end;
692         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
693         msg = ioq->msg;
694
695         switch(ioq->state) {
696         case DMSG_MSGQ_STATE_HEADER1:
697                 /*
698                  * Load the primary header, fail on any non-trivial read
699                  * error or on EOF.  Since the primary header is the same
700                  * size is the message alignment it will never straddle
701                  * the end of the buffer.
702                  */
703                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
704                 if (bytes < sizeof(msg->any.head)) {
705                         n = read(iocom->sock_fd,
706                                  ioq->buf + ioq->fifo_end,
707                                  nmax);
708                         if (n <= 0) {
709                                 if (n == 0) {
710                                         ioq->error = DMSG_IOQ_ERROR_EOF;
711                                         break;
712                                 }
713                                 if (errno != EINTR &&
714                                     errno != EINPROGRESS &&
715                                     errno != EAGAIN) {
716                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
717                                         break;
718                                 }
719                                 n = 0;
720                                 /* fall through */
721                         }
722                         ioq->fifo_end += (size_t)n;
723                         nmax -= (size_t)n;
724                 }
725
726                 /*
727                  * Decrypt data received so far.  Data will be decrypted
728                  * in-place but might create gaps in the FIFO.  Partial
729                  * blocks are not immediately decrypted.
730                  *
731                  * WARNING!  The header might be in the wrong endian, we
732                  *           do not fix it up until we get the entire
733                  *           extended header.
734                  */
735                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
736                         dmsg_crypto_decrypt(iocom, ioq);
737                 } else {
738                         ioq->fifo_cdx = ioq->fifo_end;
739                         ioq->fifo_cdn = ioq->fifo_end;
740                 }
741                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
742
743                 /*
744                  * Insufficient data accumulated (msg is NULL, caller will
745                  * retry on event).
746                  */
747                 assert(msg == NULL);
748                 if (bytes < sizeof(msg->any.head))
749                         break;
750
751                 /*
752                  * Check and fixup the core header.  Note that the icrc
753                  * has to be calculated before any fixups, but the crc
754                  * fields in the msg may have to be swapped like everything
755                  * else.
756                  */
757                 head = (void *)(ioq->buf + ioq->fifo_beg);
758                 if (head->magic != DMSG_HDR_MAGIC &&
759                     head->magic != DMSG_HDR_MAGIC_REV) {
760                         fprintf(stderr, "%s: head->magic is bad %02x\n",
761                                 iocom->label, head->magic);
762                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
763                                 fprintf(stderr, "(on encrypted link)\n");
764                         ioq->error = DMSG_IOQ_ERROR_SYNC;
765                         break;
766                 }
767
768                 /*
769                  * Calculate the full header size and aux data size
770                  */
771                 if (head->magic == DMSG_HDR_MAGIC_REV) {
772                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
773                                       DMSG_ALIGN;
774                         aux_size = bswap32(head->aux_bytes);
775                 } else {
776                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
777                                       DMSG_ALIGN;
778                         aux_size = head->aux_bytes;
779                 }
780                 ioq->abytes = DMSG_DOALIGN(aux_size);
781                 ioq->unaligned_aux_size = aux_size;
782                 if (ioq->hbytes < sizeof(msg->any.head) ||
783                     ioq->hbytes > sizeof(msg->any) ||
784                     ioq->abytes > DMSG_AUX_MAX) {
785                         ioq->error = DMSG_IOQ_ERROR_FIELD;
786                         break;
787                 }
788
789                 /*
790                  * Allocate the message, the next state will fill it in.
791                  *
792                  * NOTE: The aux_data buffer will be sized to an aligned
793                  *       value and the aligned remainder zero'd for
794                  *       convenience.
795                  *
796                  * NOTE: Supply dummy state and a degenerate cmd without
797                  *       CREATE set.  The message will temporarily be
798                  *       associated with state0 until later post-processing.
799                  */
800                 msg = dmsg_msg_alloc(&iocom->state0, aux_size,
801                                      ioq->hbytes / DMSG_ALIGN,
802                                      NULL, NULL);
803                 ioq->msg = msg;
804
805                 /*
806                  * Fall through to the next state.  Make sure that the
807                  * extended header does not straddle the end of the buffer.
808                  * We still want to issue larger reads into our buffer,
809                  * book-keeping is easier if we don't bcopy() yet.
810                  *
811                  * Make sure there is enough room for bloated encrypt data.
812                  */
813                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
814                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
815                 /* fall through */
816         case DMSG_MSGQ_STATE_HEADER2:
817                 /*
818                  * Fill out the extended header.
819                  */
820                 assert(msg != NULL);
821                 if (bytes < ioq->hbytes) {
822                         n = read(iocom->sock_fd,
823                                  ioq->buf + ioq->fifo_end,
824                                  nmax);
825                         if (n <= 0) {
826                                 if (n == 0) {
827                                         ioq->error = DMSG_IOQ_ERROR_EOF;
828                                         break;
829                                 }
830                                 if (errno != EINTR &&
831                                     errno != EINPROGRESS &&
832                                     errno != EAGAIN) {
833                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
834                                         break;
835                                 }
836                                 n = 0;
837                                 /* fall through */
838                         }
839                         ioq->fifo_end += (size_t)n;
840                         nmax -= (size_t)n;
841                 }
842
843                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
844                         dmsg_crypto_decrypt(iocom, ioq);
845                 } else {
846                         ioq->fifo_cdx = ioq->fifo_end;
847                         ioq->fifo_cdn = ioq->fifo_end;
848                 }
849                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
850
851                 /*
852                  * Insufficient data accumulated (set msg NULL so caller will
853                  * retry on event).
854                  */
855                 if (bytes < ioq->hbytes) {
856                         msg = NULL;
857                         break;
858                 }
859
860                 /*
861                  * Calculate the extended header, decrypt data received
862                  * so far.  Handle endian-conversion for the entire extended
863                  * header.
864                  */
865                 head = (void *)(ioq->buf + ioq->fifo_beg);
866
867                 /*
868                  * Check the CRC.
869                  */
870                 if (head->magic == DMSG_HDR_MAGIC_REV)
871                         xcrc32 = bswap32(head->hdr_crc);
872                 else
873                         xcrc32 = head->hdr_crc;
874                 head->hdr_crc = 0;
875                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
876                         ioq->error = DMSG_IOQ_ERROR_XCRC;
877                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
878                                 xcrc32, dmsg_icrc32(head, ioq->hbytes),
879                                 dmsg_msg_str(msg));
880                         assert(0);
881                         break;
882                 }
883                 head->hdr_crc = xcrc32;
884
885                 if (head->magic == DMSG_HDR_MAGIC_REV) {
886                         dmsg_bswap_head(head);
887                 }
888
889                 /*
890                  * Copy the extended header into the msg and adjust the
891                  * FIFO.
892                  */
893                 bcopy(head, &msg->any, ioq->hbytes);
894
895                 /*
896                  * We are either done or we fall-through.
897                  */
898                 if (ioq->abytes == 0) {
899                         ioq->fifo_beg += ioq->hbytes;
900                         break;
901                 }
902
903                 /*
904                  * Must adjust bytes (and the state) when falling through.
905                  * nmax doesn't change.
906                  */
907                 ioq->fifo_beg += ioq->hbytes;
908                 bytes -= ioq->hbytes;
909                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
910                 /* fall through */
911         case DMSG_MSGQ_STATE_AUXDATA1:
912                 /*
913                  * Copy the partial or complete [decrypted] payload from
914                  * remaining bytes in the FIFO in order to optimize the
915                  * makeroom call in the AUXDATA2 state.  We have to
916                  * fall-through either way so we can check the crc.
917                  *
918                  * msg->aux_size tracks our aux data.
919                  *
920                  * (Lets not complicate matters if the data is encrypted,
921                  *  since the data in-stream is not the same size as the
922                  *  data decrypted).
923                  */
924                 if (bytes >= ioq->abytes) {
925                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
926                               ioq->abytes);
927                         msg->aux_size = ioq->abytes;
928                         ioq->fifo_beg += ioq->abytes;
929                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
930                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
931                         bytes -= ioq->abytes;
932                 } else if (bytes) {
933                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
934                               bytes);
935                         msg->aux_size = bytes;
936                         ioq->fifo_beg += bytes;
937                         if (ioq->fifo_cdx < ioq->fifo_beg)
938                                 ioq->fifo_cdx = ioq->fifo_beg;
939                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
940                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
941                         bytes = 0;
942                 } else {
943                         msg->aux_size = 0;
944                 }
945                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
946                 /* fall through */
947         case DMSG_MSGQ_STATE_AUXDATA2:
948                 /*
949                  * Make sure there is enough room for more data.
950                  */
951                 assert(msg);
952                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
953
954                 /*
955                  * Read and decrypt more of the payload.
956                  */
957                 if (msg->aux_size < ioq->abytes) {
958                         assert(bytes == 0);
959                         n = read(iocom->sock_fd,
960                                  ioq->buf + ioq->fifo_end,
961                                  nmax);
962                         if (n <= 0) {
963                                 if (n == 0) {
964                                         ioq->error = DMSG_IOQ_ERROR_EOF;
965                                         break;
966                                 }
967                                 if (errno != EINTR &&
968                                     errno != EINPROGRESS &&
969                                     errno != EAGAIN) {
970                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
971                                         break;
972                                 }
973                                 n = 0;
974                                 /* fall through */
975                         }
976                         ioq->fifo_end += (size_t)n;
977                         nmax -= (size_t)n;
978                 }
979
980                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
981                         dmsg_crypto_decrypt(iocom, ioq);
982                 } else {
983                         ioq->fifo_cdx = ioq->fifo_end;
984                         ioq->fifo_cdn = ioq->fifo_end;
985                 }
986                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
987
988                 if (bytes > ioq->abytes - msg->aux_size)
989                         bytes = ioq->abytes - msg->aux_size;
990
991                 if (bytes) {
992                         bcopy(ioq->buf + ioq->fifo_beg,
993                               msg->aux_data + msg->aux_size,
994                               bytes);
995                         msg->aux_size += bytes;
996                         ioq->fifo_beg += bytes;
997                 }
998
999                 /*
1000                  * Insufficient data accumulated (set msg NULL so caller will
1001                  * retry on event).
1002                  *
1003                  * Assert the auxillary data size is correct, then record the
1004                  * original unaligned size from the message header.
1005                  */
1006                 if (msg->aux_size < ioq->abytes) {
1007                         msg = NULL;
1008                         break;
1009                 }
1010                 assert(msg->aux_size == ioq->abytes);
1011                 msg->aux_size = ioq->unaligned_aux_size;
1012
1013                 /*
1014                  * Check aux_crc, then we are done.  Note that the crc
1015                  * is calculated over the aligned size, not the actual
1016                  * size.
1017                  */
1018                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
1019                 if (xcrc32 != msg->any.head.aux_crc) {
1020                         ioq->error = DMSG_IOQ_ERROR_ACRC;
1021                         fprintf(stderr,
1022                                 "iocom: ACRC error %08x vs %08x "
1023                                 "msgid %016jx msgcmd %08x auxsize %d\n",
1024                                 xcrc32,
1025                                 msg->any.head.aux_crc,
1026                                 (intmax_t)msg->any.head.msgid,
1027                                 msg->any.head.cmd,
1028                                 msg->any.head.aux_bytes);
1029                         break;
1030                 }
1031                 break;
1032         case DMSG_MSGQ_STATE_ERROR:
1033                 /*
1034                  * Continued calls to drain recorded transactions (returning
1035                  * a LNK_ERROR for each one), before we return the final
1036                  * LNK_ERROR.
1037                  */
1038                 assert(msg == NULL);
1039                 break;
1040         default:
1041                 /*
1042                  * We don't double-return errors, the caller should not
1043                  * have called us again after getting an error msg.
1044                  */
1045                 assert(0);
1046                 break;
1047         }
1048
1049         /*
1050          * Check the message sequence.  The iv[] should prevent any
1051          * possibility of a replay but we add this check anyway.
1052          */
1053         if (msg && ioq->error == 0) {
1054                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1055                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1056                 } else {
1057                         ++ioq->seq;
1058                 }
1059         }
1060
1061         /*
1062          * Handle error, RREQ, or completion
1063          *
1064          * NOTE: nmax and bytes are invalid at this point, we don't bother
1065          *       to update them when breaking out.
1066          */
1067         if (ioq->error) {
1068                 dmsg_state_t *tmp_state;
1069 skip:
1070                 fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1071                 /*
1072                  * An unrecoverable error causes all active receive
1073                  * transactions to be terminated with a LNK_ERROR message.
1074                  *
1075                  * Once all active transactions are exhausted we set the
1076                  * iocom ERROR flag and return a non-transactional LNK_ERROR
1077                  * message, which should cause master processing loops to
1078                  * terminate.
1079                  */
1080                 assert(ioq->msg == msg);
1081                 if (msg) {
1082                         dmsg_msg_free(msg);
1083                         ioq->msg = NULL;
1084                         msg = NULL;
1085                 }
1086
1087                 /*
1088                  * No more I/O read processing
1089                  */
1090                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1091
1092                 /*
1093                  * Simulate a remote LNK_ERROR DELETE msg for any open
1094                  * transactions, ending with a final non-transactional
1095                  * LNK_ERROR (that the session can detect) when no
1096                  * transactions remain.
1097                  *
1098                  * NOTE: Temporarily supply state0 and a degenerate cmd
1099                  *       without CREATE set.  The real state will be
1100                  *       assigned in the loop.
1101                  *
1102                  * NOTE: We are simulating a received message using our
1103                  *       side of the state, so the DMSGF_REV* bits have
1104                  *       to be reversed.
1105                  */
1106                 pthread_mutex_lock(&iocom->mtx);
1107                 dmsg_iocom_drain(iocom);
1108
1109                 tmp_state = NULL;
1110                 RB_FOREACH(state, dmsg_state_tree, &iocom->staterd_tree) {
1111                         atomic_set_int(&state->flags, DMSG_STATE_DYING);
1112                         if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1113                                 tmp_state = state;
1114                 }
1115                 RB_FOREACH(state, dmsg_state_tree, &iocom->statewr_tree) {
1116                         atomic_set_int(&state->flags, DMSG_STATE_DYING);
1117                         if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1118                                 tmp_state = state;
1119                 }
1120
1121                 if (tmp_state) {
1122                         dmsg_msg_simulate_failure(tmp_state, ioq->error);
1123                 } else {
1124                         dmsg_msg_simulate_failure(&iocom->state0, ioq->error);
1125                 }
1126                 pthread_mutex_unlock(&iocom->mtx);
1127                 if (TAILQ_FIRST(&ioq->msgq))
1128                         goto again;
1129
1130 #if 0
1131                 /*
1132                  * For the iocom error case we want to set RWORK to indicate
1133                  * that more messages might be pending.
1134                  *
1135                  * It is possible to return NULL when there is more work to
1136                  * do because each message has to be DELETEd in both
1137                  * directions before we continue on with the next (though
1138                  * this could be optimized).  The transmit direction will
1139                  * re-set RWORK.
1140                  */
1141                 if (msg)
1142                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1143 #endif
1144         } else if (msg == NULL) {
1145                 /*
1146                  * Insufficient data received to finish building the message,
1147                  * set RREQ and return NULL.
1148                  *
1149                  * Leave ioq->msg intact.
1150                  * Leave the FIFO intact.
1151                  */
1152                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1153         } else {
1154                 /*
1155                  * Continue processing msg.
1156                  *
1157                  * The fifo has already been advanced past the message.
1158                  * Trivially reset the FIFO indices if possible.
1159                  *
1160                  * clear the FIFO if it is now empty and set RREQ to wait
1161                  * for more from the socket.  If the FIFO is not empty set
1162                  * TWORK to bypass the poll so we loop immediately.
1163                  */
1164                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1165                     ioq->fifo_cdn == ioq->fifo_end) {
1166                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1167                         ioq->fifo_cdx = 0;
1168                         ioq->fifo_cdn = 0;
1169                         ioq->fifo_beg = 0;
1170                         ioq->fifo_end = 0;
1171                 } else {
1172                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1173                 }
1174                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1175                 ioq->msg = NULL;
1176
1177                 /*
1178                  * Handle message routing.  Validates non-zero sources
1179                  * and routes message.  Error will be 0 if the message is
1180                  * destined for us.
1181                  *
1182                  * State processing only occurs for messages destined for us.
1183                  */
1184                 if (DMsgDebugOpt >= 5) {
1185                         fprintf(stderr,
1186                                 "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1187                                 msg->any.head.cmd,
1188                                 (intmax_t)msg->any.head.msgid,
1189                                 (intmax_t)msg->any.head.circuit);
1190                 }
1191                 error = dmsg_state_msgrx(msg);
1192
1193                 if (error) {
1194                         /*
1195                          * Abort-after-closure, throw message away and
1196                          * start reading another.
1197                          */
1198                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1199                                 dmsg_msg_free(msg);
1200                                 goto again;
1201                         }
1202
1203                         /*
1204                          * Process real error and throw away message.
1205                          */
1206                         ioq->error = error;
1207                         goto skip;
1208                 }
1209                 /* no error, not routed.  Fall through and return msg */
1210         }
1211         return (msg);
1212 }
1213
1214 /*
1215  * Calculate the header and data crc's and write a low-level message to
1216  * the connection.  If aux_crc is non-zero the aux_data crc is already
1217  * assumed to have been set.
1218  *
1219  * A non-NULL msg is added to the queue but not necessarily flushed.
1220  * Calling this function with msg == NULL will get a flush going.
1221  *
1222  * (called from iocom_core only)
1223  */
1224 void
1225 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1226 {
1227         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1228         dmsg_msg_t *msg;
1229         uint32_t xcrc32;
1230         size_t hbytes;
1231         size_t abytes;
1232         dmsg_msg_queue_t tmpq;
1233
1234         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1235         TAILQ_INIT(&tmpq);
1236         pthread_mutex_lock(&iocom->mtx);
1237         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1238                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1239                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1240         }
1241         pthread_mutex_unlock(&iocom->mtx);
1242
1243         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1244                 /*
1245                  * Process terminal connection errors.
1246                  */
1247                 TAILQ_REMOVE(&tmpq, msg, qentry);
1248                 if (ioq->error) {
1249                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1250                         ++ioq->msgcount;
1251                         continue;
1252                 }
1253
1254                 /*
1255                  * Finish populating the msg fields.  The salt ensures that
1256                  * the iv[] array is ridiculously randomized and we also
1257                  * re-seed our PRNG every 32768 messages just to be sure.
1258                  */
1259                 msg->any.head.magic = DMSG_HDR_MAGIC;
1260                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1261                 ++ioq->seq;
1262                 if ((ioq->seq & 32767) == 0)
1263                         srandomdev();
1264
1265                 /*
1266                  * Calculate aux_crc if 0, then calculate hdr_crc.
1267                  */
1268                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1269                         abytes = DMSG_DOALIGN(msg->aux_size);
1270                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1271                         msg->any.head.aux_crc = xcrc32;
1272                 }
1273                 msg->any.head.aux_bytes = msg->aux_size;
1274
1275                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1276                          DMSG_ALIGN;
1277                 msg->any.head.hdr_crc = 0;
1278                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1279
1280                 /*
1281                  * Enqueue the message (the flush codes handles stream
1282                  * encryption).
1283                  */
1284                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1285                 ++ioq->msgcount;
1286         }
1287         dmsg_iocom_flush2(iocom);
1288 }
1289
1290 /*
1291  * Thread localized, iocom->mtx not held by caller.
1292  *
1293  * (called from iocom_core via iocom_flush1 only)
1294  */
1295 void
1296 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1297 {
1298         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1299         dmsg_msg_t *msg;
1300         ssize_t n;
1301         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1302         size_t nact;
1303         size_t hbytes;
1304         size_t abytes;
1305         size_t hoff;
1306         size_t aoff;
1307         int iovcnt;
1308
1309         if (ioq->error) {
1310                 dmsg_iocom_drain(iocom);
1311                 return;
1312         }
1313
1314         /*
1315          * Pump messages out the connection by building an iovec.
1316          *
1317          * ioq->hbytes/ioq->abytes tracks how much of the first message
1318          * in the queue has been successfully written out, so we can
1319          * resume writing.
1320          */
1321         iovcnt = 0;
1322         nact = 0;
1323         hoff = ioq->hbytes;
1324         aoff = ioq->abytes;
1325
1326         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1327                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1328                          DMSG_ALIGN;
1329                 abytes = DMSG_DOALIGN(msg->aux_size);
1330                 assert(hoff <= hbytes && aoff <= abytes);
1331
1332                 if (hoff < hbytes) {
1333                         size_t maxlen = hbytes - hoff;
1334                         if (maxlen > sizeof(ioq->buf) / 2)
1335                                 maxlen = sizeof(ioq->buf) / 2;
1336                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1337                         iov[iovcnt].iov_len = maxlen;
1338                         nact += maxlen;
1339                         ++iovcnt;
1340                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1341                             maxlen != hbytes - hoff) {
1342                                 break;
1343                         }
1344                 }
1345                 if (aoff < abytes) {
1346                         size_t maxlen = abytes - aoff;
1347                         if (maxlen > sizeof(ioq->buf) / 2)
1348                                 maxlen = sizeof(ioq->buf) / 2;
1349
1350                         assert(msg->aux_data != NULL);
1351                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1352                         iov[iovcnt].iov_len = maxlen;
1353                         nact += maxlen;
1354                         ++iovcnt;
1355                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1356                             maxlen != abytes - aoff) {
1357                                 break;
1358                         }
1359                 }
1360                 hoff = 0;
1361                 aoff = 0;
1362         }
1363         if (iovcnt == 0)
1364                 return;
1365
1366         /*
1367          * Encrypt and write the data.  The crypto code will move the
1368          * data into the fifo and adjust the iov as necessary.  If
1369          * encryption is disabled the iov is left alone.
1370          *
1371          * May return a smaller iov (thus a smaller n), with aggregated
1372          * chunks.  May reduce nmax to what fits in the FIFO.
1373          *
1374          * This function sets nact to the number of original bytes now
1375          * encrypted, adding to the FIFO some number of bytes that might
1376          * be greater depending on the crypto mechanic.  iov[] is adjusted
1377          * to point at the FIFO if necessary.
1378          *
1379          * NOTE: nact is the number of bytes eaten from the message.  For
1380          *       encrypted data this is the number of bytes processed for
1381          *       encryption and not necessarily the number of bytes writable.
1382          *       The return value from the writev() is the post-encrypted
1383          *       byte count which might be larger.
1384          *
1385          * NOTE: For direct writes, nact is the return value from the writev().
1386          */
1387         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1388                 /*
1389                  * Make sure the FIFO has a reasonable amount of space
1390                  * left (if not completely full).
1391                  *
1392                  * In this situation we are staging the encrypted message
1393                  * data in the FIFO.  (nact) represents how much plaintext
1394                  * has been staged, (n) represents how much encrypted data
1395                  * has been flushed.  The two are independent of each other.
1396                  */
1397                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1398                     sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1399                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1400                               ioq->fifo_end - ioq->fifo_beg);
1401                         ioq->fifo_cdx -= ioq->fifo_beg;
1402                         ioq->fifo_cdn -= ioq->fifo_beg;
1403                         ioq->fifo_end -= ioq->fifo_beg;
1404                         ioq->fifo_beg = 0;
1405                 }
1406
1407                 /* 
1408                  * beg .... cdx ............ cdn ............. end
1409                  * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
1410                  *
1411                  * Advance fifo_beg on a successful write.
1412                  */
1413                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1414                 n = writev(iocom->sock_fd, iov, iovcnt);
1415                 if (n > 0) {
1416                         ioq->fifo_beg += n;
1417                         if (ioq->fifo_beg == ioq->fifo_end) {
1418                                 ioq->fifo_beg = 0;
1419                                 ioq->fifo_cdn = 0;
1420                                 ioq->fifo_cdx = 0;
1421                                 ioq->fifo_end = 0;
1422                         }
1423                 }
1424                 /*
1425                  * We don't mess with the nact returned by the crypto_encrypt
1426                  * call, which represents the filling of the FIFO.  (n) tells
1427                  * us how much we were able to write from the FIFO.  The two
1428                  * are different beasts when encrypting.
1429                  */
1430         } else {
1431                 /*
1432                  * In this situation we are not staging the messages to the
1433                  * FIFO but instead writing them directly from the msg
1434                  * structure(s) unencrypted, so (nact) is basically (n).
1435                  */
1436                 n = writev(iocom->sock_fd, iov, iovcnt);
1437                 if (n > 0)
1438                         nact = n;
1439                 else
1440                         nact = 0;
1441         }
1442
1443         /*
1444          * Clean out the transmit queue based on what we successfully
1445          * sent (nact is the plaintext count).  ioq->hbytes/abytes
1446          * represents the portion of the first message previously sent.
1447          */
1448         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1449                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1450                          DMSG_ALIGN;
1451                 abytes = DMSG_DOALIGN(msg->aux_size);
1452
1453                 if ((size_t)nact < hbytes - ioq->hbytes) {
1454                         ioq->hbytes += nact;
1455                         nact = 0;
1456                         break;
1457                 }
1458                 nact -= hbytes - ioq->hbytes;
1459                 ioq->hbytes = hbytes;
1460                 if ((size_t)nact < abytes - ioq->abytes) {
1461                         ioq->abytes += nact;
1462                         nact = 0;
1463                         break;
1464                 }
1465                 nact -= abytes - ioq->abytes;
1466                 /* ioq->abytes = abytes; optimized out */
1467
1468 #if 0
1469                 fprintf(stderr,
1470                         "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1471                         msg->any.head.cmd,
1472                         (intmax_t)msg->any.head.msgid,
1473                         (intmax_t)msg->any.head.circuit);
1474 #endif
1475
1476                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1477                 --ioq->msgcount;
1478                 ioq->hbytes = 0;
1479                 ioq->abytes = 0;
1480                 dmsg_msg_free(msg);
1481         }
1482         assert(nact == 0);
1483
1484         /*
1485          * Process the return value from the write w/regards to blocking.
1486          */
1487         if (n < 0) {
1488                 if (errno != EINTR &&
1489                     errno != EINPROGRESS &&
1490                     errno != EAGAIN) {
1491                         /*
1492                          * Fatal write error
1493                          */
1494                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1495                         dmsg_iocom_drain(iocom);
1496                 } else {
1497                         /*
1498                          * Wait for socket buffer space
1499                          */
1500                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1501                 }
1502         } else {
1503                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1504         }
1505         if (ioq->error) {
1506                 dmsg_iocom_drain(iocom);
1507         }
1508 }
1509
1510 /*
1511  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1512  * write events will occur.  We don't kill read msgs because we want
1513  * the caller to pull off our contrived terminal error msg to detect
1514  * the connection failure.
1515  *
1516  * Localized to iocom_core thread, iocom->mtx not held by caller.
1517  */
1518 void
1519 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1520 {
1521         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1522         dmsg_msg_t *msg;
1523
1524         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1525         ioq->hbytes = 0;
1526         ioq->abytes = 0;
1527
1528         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1529                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1530                 --ioq->msgcount;
1531                 dmsg_msg_free(msg);
1532         }
1533 }
1534
1535 /*
1536  * Write a message to an iocom, with additional state processing.
1537  */
1538 void
1539 dmsg_msg_write(dmsg_msg_t *msg)
1540 {
1541         dmsg_iocom_t *iocom = msg->state->iocom;
1542         dmsg_state_t *state;
1543         char dummy;
1544
1545         pthread_mutex_lock(&iocom->mtx);
1546         state = msg->state;
1547
1548         /*
1549          * Make sure the parent transaction is still open in the transmit
1550          * direction.  If it isn't the message is dead and we have to
1551          * potentially simulate a rxmsg terminating the transaction.
1552          */
1553         if (state->parent->txcmd & DMSGF_DELETE) {
1554                 fprintf(stderr, "dmsg_msg_write: EARLY TERMINATION\n");
1555                 dmsg_msg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1556                 dmsg_state_cleanuptx(iocom, msg);
1557                 dmsg_msg_free(msg);
1558                 pthread_mutex_unlock(&iocom->mtx);
1559                 return;
1560         }
1561
1562         /*
1563          * Process state data into the message as needed, then update the
1564          * state based on the message.
1565          */
1566         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1567                 /*
1568                  * Existing transaction (could be reply).  It is also
1569                  * possible for this to be the first reply (CREATE is set),
1570                  * in which case we populate state->txcmd.
1571                  *
1572                  * state->txcmd is adjusted to hold the final message cmd,
1573                  * and we also be sure to set the CREATE bit here.  We did
1574                  * not set it in dmsg_msg_alloc() because that would have
1575                  * not been serialized (state could have gotten ripped out
1576                  * from under the message prior to it being transmitted).
1577                  */
1578                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1579                     DMSGF_CREATE) {
1580                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1581                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1582                 }
1583                 msg->any.head.msgid = state->msgid;
1584
1585                 if (msg->any.head.cmd & DMSGF_CREATE) {
1586                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1587                 }
1588         }
1589         dmsg_state_cleanuptx(iocom, msg);
1590
1591 #if 0
1592         fprintf(stderr,
1593                 "MSGWRITE %016jx %08x\n",
1594                 msg->any.head.msgid, msg->any.head.cmd);
1595 #endif
1596
1597         /*
1598          * Queue it for output, wake up the I/O pthread.  Note that the
1599          * I/O thread is responsible for generating the CRCs and encryption.
1600          */
1601         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1602         dummy = 0;
1603         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1604         pthread_mutex_unlock(&iocom->mtx);
1605 }
1606
1607 /*
1608  * iocom->mtx must be held by caller.
1609  */
1610 static
1611 void
1612 dmsg_msg_simulate_failure(dmsg_state_t *state, int error)
1613 {
1614         dmsg_iocom_t *iocom = state->iocom;
1615         dmsg_msg_t *msg;
1616
1617         msg = NULL;
1618
1619         if (state == &iocom->state0) {
1620                 /*
1621                  * No active local or remote transactions remain.
1622                  * Generate a final LNK_ERROR and flag EOF.
1623                  */
1624                 msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1625                                             DMSG_LNK_ERROR,
1626                                             NULL, NULL);
1627                 msg->any.head.error = error;
1628                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1629                 fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1630         } else if (state->flags & DMSG_STATE_OPPOSITE) {
1631                 /*
1632                  * Active remote transactions are still present.
1633                  * Simulate the other end sending us a DELETE.
1634                  */
1635                 if (state->rxcmd & DMSGF_DELETE) {
1636                         fprintf(stderr,
1637                                 "iocom: ioq error(rd) %d sleeping "
1638                                 "state %p rxcmd %08x txcmd %08x "
1639                                 "func %p\n",
1640                                 error, state, state->rxcmd,
1641                                 state->txcmd, state->func);
1642                         usleep(100000); /* XXX */
1643                         atomic_set_int(&iocom->flags,
1644                                        DMSG_IOCOMF_RWORK);
1645                 } else {
1646                         fprintf(stderr, "SIMULATE ERROR1\n");
1647                         msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1648                                              DMSG_LNK_ERROR,
1649                                              NULL, NULL);
1650                         /*state->txcmd |= DMSGF_DELETE;*/
1651                         msg->state = state;
1652                         msg->any.head.error = error;
1653                         msg->any.head.msgid = state->msgid;
1654                         msg->any.head.circuit = state->parent->msgid;
1655                         msg->any.head.cmd |= DMSGF_ABORT |
1656                                              DMSGF_DELETE;
1657                         if ((state->parent->flags &
1658                              DMSG_STATE_OPPOSITE) == 0) {
1659                                 msg->any.head.cmd |= DMSGF_REVCIRC;
1660                         }
1661                 }
1662         } else {
1663                 /*
1664                  * Active local transactions are still present.
1665                  * Simulate the other end sending us a DELETE.
1666                  */
1667                 if (state->rxcmd & DMSGF_DELETE) {
1668                         fprintf(stderr,
1669                                 "iocom: ioq error(wr) %d sleeping "
1670                                 "state %p rxcmd %08x txcmd %08x "
1671                                 "func %p\n",
1672                                 error, state, state->rxcmd,
1673                                 state->txcmd, state->func);
1674                         usleep(100000); /* XXX */
1675                         atomic_set_int(&iocom->flags,
1676                                        DMSG_IOCOMF_RWORK);
1677                 } else {
1678                         fprintf(stderr, "SIMULATE ERROR1\n");
1679                         msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1680                                              DMSG_LNK_ERROR,
1681                                              NULL, NULL);
1682                         msg->state = state;
1683                         msg->any.head.error = error;
1684                         msg->any.head.msgid = state->msgid;
1685                         msg->any.head.circuit = state->parent->msgid;
1686                         msg->any.head.cmd |= DMSGF_ABORT |
1687                                              DMSGF_DELETE |
1688                                              DMSGF_REVTRANS |
1689                                              DMSGF_REPLY;
1690                         if ((state->parent->flags &
1691                              DMSG_STATE_OPPOSITE) == 0) {
1692                                 msg->any.head.cmd |= DMSGF_REVCIRC;
1693                         }
1694                         if ((state->rxcmd & DMSGF_CREATE) == 0)
1695                                 msg->any.head.cmd |= DMSGF_CREATE;
1696                 }
1697         }
1698         if (msg) {
1699                 TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1700                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1701         }
1702 }
1703
1704 /*
1705  * This is a shortcut to formulate a reply to msg with a simple error code,
1706  * It can reply to and terminate a transaction, or it can reply to a one-way
1707  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1708  * the error code (which can be 0).  Not all transactions are terminated
1709  * with DMSG_LNK_ERROR status (the low level only cares about the
1710  * MSGF_DELETE flag), but most are.
1711  *
1712  * Replies to one-way messages are a bit of an oxymoron but the feature
1713  * is used by the debug (DBG) protocol.
1714  *
1715  * The reply contains no extended data.
1716  */
1717 void
1718 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1719 {
1720         dmsg_state_t *state = msg->state;
1721         dmsg_msg_t *nmsg;
1722         uint32_t cmd;
1723
1724         /*
1725          * Reply with a simple error code and terminate the transaction.
1726          */
1727         cmd = DMSG_LNK_ERROR;
1728
1729         /*
1730          * Check if our direction has even been initiated yet, set CREATE.
1731          *
1732          * Check what direction this is (command or reply direction).  Note
1733          * that txcmd might not have been initiated yet.
1734          *
1735          * If our direction has already been closed we just return without
1736          * doing anything.
1737          */
1738         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1739                 if (state->txcmd & DMSGF_DELETE)
1740                         return;
1741                 if (state->txcmd & DMSGF_REPLY)
1742                         cmd |= DMSGF_REPLY;
1743                 cmd |= DMSGF_DELETE;
1744         } else {
1745                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1746                         cmd |= DMSGF_REPLY;
1747         }
1748
1749         /*
1750          * Allocate the message and associate it with the existing state.
1751          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1752          * allocate new state.  We have our state already.
1753          */
1754         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1755         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1756                 if ((state->txcmd & DMSGF_CREATE) == 0)
1757                         nmsg->any.head.cmd |= DMSGF_CREATE;
1758         }
1759         nmsg->any.head.error = error;
1760
1761         dmsg_msg_write(nmsg);
1762 }
1763
1764 /*
1765  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1766  * we are generating a streaming reply or an intermediate acknowledgement
1767  * of some sort as part of the higher level protocol, with more to come
1768  * later.
1769  */
1770 void
1771 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1772 {
1773         dmsg_state_t *state = msg->state;
1774         dmsg_msg_t *nmsg;
1775         uint32_t cmd;
1776
1777
1778         /*
1779          * Reply with a simple error code and terminate the transaction.
1780          */
1781         cmd = DMSG_LNK_ERROR;
1782
1783         /*
1784          * Check if our direction has even been initiated yet, set CREATE.
1785          *
1786          * Check what direction this is (command or reply direction).  Note
1787          * that txcmd might not have been initiated yet.
1788          *
1789          * If our direction has already been closed we just return without
1790          * doing anything.
1791          */
1792         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1793                 if (state->txcmd & DMSGF_DELETE)
1794                         return;
1795                 if (state->txcmd & DMSGF_REPLY)
1796                         cmd |= DMSGF_REPLY;
1797                 /* continuing transaction, do not set MSGF_DELETE */
1798         } else {
1799                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1800                         cmd |= DMSGF_REPLY;
1801         }
1802         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1803         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1804                 if ((state->txcmd & DMSGF_CREATE) == 0)
1805                         nmsg->any.head.cmd |= DMSGF_CREATE;
1806         }
1807         nmsg->any.head.error = error;
1808
1809         dmsg_msg_write(nmsg);
1810 }
1811
1812 /*
1813  * Terminate a transaction given a state structure by issuing a DELETE.
1814  * (the state structure must not be &iocom->state0)
1815  */
1816 void
1817 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1818 {
1819         dmsg_msg_t *nmsg;
1820         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1821
1822         /*
1823          * Nothing to do if we already transmitted a delete
1824          */
1825         if (state->txcmd & DMSGF_DELETE)
1826                 return;
1827
1828         /*
1829          * Set REPLY if the other end initiated the command.  Otherwise
1830          * we are the command direction.
1831          */
1832         if (state->txcmd & DMSGF_REPLY)
1833                 cmd |= DMSGF_REPLY;
1834
1835         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1836         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1837                 if ((state->txcmd & DMSGF_CREATE) == 0)
1838                         nmsg->any.head.cmd |= DMSGF_CREATE;
1839         }
1840         nmsg->any.head.error = error;
1841         dmsg_msg_write(nmsg);
1842 }
1843
1844 /*
1845  * Terminate a transaction given a state structure by issuing a DELETE.
1846  * (the state structure must not be &iocom->state0)
1847  */
1848 void
1849 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1850 {
1851         dmsg_msg_t *nmsg;
1852         uint32_t cmd = DMSG_LNK_ERROR;
1853
1854         /*
1855          * Nothing to do if we already transmitted a delete
1856          */
1857         if (state->txcmd & DMSGF_DELETE)
1858                 return;
1859
1860         /*
1861          * Set REPLY if the other end initiated the command.  Otherwise
1862          * we are the command direction.
1863          */
1864         if (state->txcmd & DMSGF_REPLY)
1865                 cmd |= DMSGF_REPLY;
1866
1867         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1868         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1869                 if ((state->txcmd & DMSGF_CREATE) == 0)
1870                         nmsg->any.head.cmd |= DMSGF_CREATE;
1871         }
1872         nmsg->any.head.error = error;
1873         dmsg_msg_write(nmsg);
1874 }
1875
1876 /************************************************************************
1877  *                      TRANSACTION STATE HANDLING                      *
1878  ************************************************************************
1879  *
1880  */
1881
1882 /*
1883  * Process state tracking for a message after reception, prior to execution.
1884  * Possibly route the message (consuming it).
1885  *
1886  * Called with msglk held and the msg dequeued.
1887  *
1888  * All messages are called with dummy state and return actual state.
1889  * (One-off messages often just return the same dummy state).
1890  *
1891  * May request that caller discard the message by setting *discardp to 1.
1892  * The returned state is not used in this case and is allowed to be NULL.
1893  *
1894  * --
1895  *
1896  * These routines handle persistent and command/reply message state via the
1897  * CREATE and DELETE flags.  The first message in a command or reply sequence
1898  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1899  *
1900  * There can be any number of intermediate messages belonging to the same
1901  * sequence sent inbetween the CREATE message and the DELETE message,
1902  * which set neither flag.  This represents a streaming command or reply.
1903  *
1904  * Any command message received with CREATE set expects a reply sequence to
1905  * be returned.  Reply sequences work the same as command sequences except the
1906  * REPLY bit is also sent.  Both the command side and reply side can
1907  * degenerate into a single message with both CREATE and DELETE set.  Note
1908  * that one side can be streaming and the other side not, or neither, or both.
1909  *
1910  * The msgid is unique for the initiator.  That is, two sides sending a new
1911  * message can use the same msgid without colliding.
1912  *
1913  * --
1914  *
1915  * ABORT sequences work by setting the ABORT flag along with normal message
1916  * state.  However, ABORTs can also be sent on half-closed messages, that is
1917  * even if the command or reply side has already sent a DELETE, as long as
1918  * the message has not been fully closed it can still send an ABORT+DELETE
1919  * to terminate the half-closed message state.
1920  *
1921  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1922  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1923  * also race, and in this situation the other side might have already
1924  * initiated a new unrelated command with the same message id.  Since
1925  * the abort has not set the CREATE flag the situation can be detected
1926  * and the message will also be discarded.
1927  *
1928  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1929  * The ABORT request is essentially integrated into the command instead
1930  * of being sent later on.  In this situation the command implementation
1931  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1932  * special-case non-blocking operation for the command.
1933  *
1934  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1935  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1936  *        one-way messages are not supported.
1937  *
1938  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1939  *        simply ignored.
1940  *
1941  * --
1942  *
1943  * One-off messages (no reply expected) are sent without an established
1944  * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
1945  * For one-off messages sent over circuits msgid generally MUST be 0.
1946  *
1947  * One-off messages cannot be aborted and typically aren't processed
1948  * by these routines.  Order is still guaranteed for messages sent over
1949  * the same circuit.  The REPLY bit can be used to distinguish whether
1950  * a one-off message is a command or reply.  For example, one-off replies
1951  * will typically just contain status updates.
1952  */
1953 static int
1954 dmsg_state_msgrx(dmsg_msg_t *msg)
1955 {
1956         dmsg_iocom_t *iocom = msg->state->iocom;
1957         dmsg_state_t *state;
1958         dmsg_state_t *pstate;
1959         dmsg_state_t sdummy;
1960         int error;
1961
1962         pthread_mutex_lock(&iocom->mtx);
1963
1964         /*
1965          * Lookup the circuit (pstate).  The circuit will be an open
1966          * transaction.  The REVCIRC bit in the message tells us which side
1967          * initiated it.
1968          */
1969         if (msg->any.head.circuit) {
1970                 sdummy.msgid = msg->any.head.circuit;
1971
1972                 if (msg->any.head.cmd & DMSGF_REVCIRC) {
1973                         pstate = RB_FIND(dmsg_state_tree,
1974                                          &iocom->statewr_tree,
1975                                          &sdummy);
1976                 } else {
1977                         pstate = RB_FIND(dmsg_state_tree,
1978                                          &iocom->staterd_tree,
1979                                          &sdummy);
1980                 }
1981                 if (pstate == NULL) {
1982                         fprintf(stderr,
1983                                 "missing parent in stacked trans %s\n",
1984                                 dmsg_msg_str(msg));
1985                         error = DMSG_IOQ_ERROR_TRANS;
1986                         pthread_mutex_unlock(&iocom->mtx);
1987                         assert(0);
1988                 }
1989         } else {
1990                 pstate = &iocom->state0;
1991         }
1992
1993         /*
1994          * Lookup the msgid.
1995          *
1996          * If received msg is a command state is on staterd_tree.
1997          * If received msg is a reply state is on statewr_tree.
1998          * Otherwise there is no state (retain &iocom->state0)
1999          */
2000         sdummy.msgid = msg->any.head.msgid;
2001         if (msg->any.head.cmd & DMSGF_REVTRANS)
2002                 state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
2003         else
2004                 state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);
2005
2006         if (state) {
2007                 /*
2008                  * Message over an existing transaction (CREATE should not
2009                  * be set).
2010                  */
2011                 msg->state = state;
2012                 assert(pstate == state->parent);
2013         } else {
2014                 /*
2015                  * Either a new transaction (if CREATE set) or a one-off.
2016                  */
2017                 state = pstate;
2018         }
2019
2020         pthread_mutex_unlock(&iocom->mtx);
2021
2022         /*
2023          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
2024          * inside the case statements.
2025          *
2026          * Construct new state as necessary.
2027          */
2028         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
2029                                     DMSGF_REPLY)) {
2030         case DMSGF_CREATE:
2031         case DMSGF_CREATE | DMSGF_DELETE:
2032                 /*
2033                  * Create new sub-transaction under pstate.
2034                  * (any DELETE is handled in post-processing of msg).
2035                  *
2036                  * (During routing the msgid was made unique for this
2037                  * direction over the comlink, so our RB trees can be
2038                  * iocom-based instead of state-based).
2039                  */
2040                 if (state != pstate) {
2041                         fprintf(stderr,
2042                                 "duplicate transaction %s\n",
2043                                 dmsg_msg_str(msg));
2044                         error = DMSG_IOQ_ERROR_TRANS;
2045                         assert(0);
2046                         break;
2047                 }
2048
2049                 /*
2050                  * Allocate the new state.
2051                  */
2052                 state = malloc(sizeof(*state));
2053                 atomic_add_int(&dmsg_state_count, 1);
2054                 bzero(state, sizeof(*state));
2055                 TAILQ_INIT(&state->subq);
2056                 dmsg_state_hold(pstate);
2057                 state->refs = 1;
2058                 state->parent = pstate;
2059                 state->iocom = iocom;
2060                 state->flags = DMSG_STATE_DYNAMIC |
2061                                DMSG_STATE_OPPOSITE;
2062                 state->msgid = msg->any.head.msgid;
2063                 state->txcmd = DMSGF_REPLY;
2064                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2065                 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
2066                 msg->state = state;
2067                 pthread_mutex_lock(&iocom->mtx);
2068                 RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
2069                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
2070                 state->flags |= DMSG_STATE_INSERTED;
2071
2072                 /*
2073                  * If the parent is a relay set up the state handler to
2074                  * automatically route the message.  Local processing will
2075                  * not occur if set.
2076                  *
2077                  * (state relays are seeded by SPAN processing)
2078                  */
2079                 if (pstate->relay)
2080                         state->func = dmsg_state_relay;
2081                 pthread_mutex_unlock(&iocom->mtx);
2082                 error = 0;
2083
2084                 if (DMsgDebugOpt) {
2085                         fprintf(stderr,
2086                                 "create state %p id=%08x on iocom staterd %p\n",
2087                                 state, (uint32_t)state->msgid, iocom);
2088                 }
2089                 break;
2090         case DMSGF_DELETE:
2091                 /*
2092                  * Persistent state is expected but might not exist if an
2093                  * ABORT+DELETE races the close.
2094                  *
2095                  * (any DELETE is handled in post-processing of msg).
2096                  */
2097                 if (state == pstate) {
2098                         if (msg->any.head.cmd & DMSGF_ABORT) {
2099                                 error = DMSG_IOQ_ERROR_EALREADY;
2100                         } else {
2101                                 fprintf(stderr, "missing-state %s\n",
2102                                         dmsg_msg_str(msg));
2103                                 error = DMSG_IOQ_ERROR_TRANS;
2104                                 assert(0);
2105                         }
2106                         break;
2107                 }
2108
2109                 /*
2110                  * Handle another ABORT+DELETE case if the msgid has already
2111                  * been reused.
2112                  */
2113                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2114                         if (msg->any.head.cmd & DMSGF_ABORT) {
2115                                 error = DMSG_IOQ_ERROR_EALREADY;
2116                         } else {
2117                                 fprintf(stderr, "reused-state %s\n",
2118                                         dmsg_msg_str(msg));
2119                                 error = DMSG_IOQ_ERROR_TRANS;
2120                                 assert(0);
2121                         }
2122                         break;
2123                 }
2124                 error = 0;
2125                 break;
2126         default:
2127                 /*
2128                  * Check for mid-stream ABORT command received, otherwise
2129                  * allow.
2130                  */
2131                 if (msg->any.head.cmd & DMSGF_ABORT) {
2132                         if ((state == pstate) ||
2133                             (state->rxcmd & DMSGF_CREATE) == 0) {
2134                                 error = DMSG_IOQ_ERROR_EALREADY;
2135                                 break;
2136                         }
2137                 }
2138                 error = 0;
2139                 break;
2140         case DMSGF_REPLY | DMSGF_CREATE:
2141         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
2142                 /*
2143                  * When receiving a reply with CREATE set the original
2144                  * persistent state message should already exist.
2145                  */
2146                 if (state == pstate) {
2147                         fprintf(stderr, "no-state(r) %s\n",
2148                                 dmsg_msg_str(msg));
2149                         error = DMSG_IOQ_ERROR_TRANS;
2150                         assert(0);
2151                         break;
2152                 }
2153                 assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
2154                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2155                 error = 0;
2156                 break;
2157         case DMSGF_REPLY | DMSGF_DELETE:
2158                 /*
2159                  * Received REPLY+ABORT+DELETE in case where msgid has
2160                  * already been fully closed, ignore the message.
2161                  */
2162                 if (state == pstate) {
2163                         if (msg->any.head.cmd & DMSGF_ABORT) {
2164                                 error = DMSG_IOQ_ERROR_EALREADY;
2165                         } else {
2166                                 fprintf(stderr, "no-state(r,d) %s\n",
2167                                         dmsg_msg_str(msg));
2168                                 error = DMSG_IOQ_ERROR_TRANS;
2169                                 assert(0);
2170                         }
2171                         break;
2172                 }
2173
2174                 /*
2175                  * Received REPLY+ABORT+DELETE in case where msgid has
2176                  * already been reused for an unrelated message,
2177                  * ignore the message.
2178                  */
2179                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2180                         if (msg->any.head.cmd & DMSGF_ABORT) {
2181                                 error = DMSG_IOQ_ERROR_EALREADY;
2182                         } else {
2183                                 fprintf(stderr, "reused-state(r,d) %s\n",
2184                                         dmsg_msg_str(msg));
2185                                 error = DMSG_IOQ_ERROR_TRANS;
2186                                 assert(0);
2187                         }
2188                         break;
2189                 }
2190                 error = 0;
2191                 break;
2192         case DMSGF_REPLY:
2193                 /*
2194                  * Check for mid-stream ABORT reply received to sent command.
2195                  */
2196                 if (msg->any.head.cmd & DMSGF_ABORT) {
2197                         if (state == pstate ||
2198                             (state->rxcmd & DMSGF_CREATE) == 0) {
2199                                 error = DMSG_IOQ_ERROR_EALREADY;
2200                                 break;
2201                         }
2202                 }
2203                 error = 0;
2204                 break;
2205         }
2206
2207         /*
2208          * Calculate the easy-switch() transactional command.  Represents
2209          * the outer-transaction command for any transaction-create or
2210          * transaction-delete, and the inner message command for any
2211          * non-transaction or inside-transaction command.  tcmd will be
2212          * set to 0 for any messaging error condition.
2213          *
2214          * The two can be told apart because outer-transaction commands
2215          * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2216          */
2217         if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2218                 if ((state->flags & DMSG_STATE_ROOT) == 0) {
2219                         msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
2220                                     (msg->any.head.cmd & (DMSGF_CREATE |
2221                                                           DMSGF_DELETE |
2222                                                           DMSGF_REPLY));
2223                 } else {
2224                         msg->tcmd = 0;
2225                 }
2226         } else {
2227                 msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
2228         }
2229         return (error);
2230 }
2231
2232 /*
2233  * Route the message and handle pair-state processing.
2234  */
2235 void
2236 dmsg_state_relay(dmsg_msg_t *lmsg)
2237 {
2238         dmsg_state_t *lpstate;
2239         dmsg_state_t *rpstate;
2240         dmsg_state_t *lstate;
2241         dmsg_state_t *rstate;
2242         dmsg_msg_t *rmsg;
2243
2244         if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2245             DMSGF_CREATE) {
2246                 /*
2247                  * New sub-transaction, establish new state and relay.
2248                  */
2249                 lstate = lmsg->state;
2250                 lpstate = lstate->parent;
2251                 rpstate = lpstate->relay;
2252                 assert(lstate->relay == NULL);
2253                 assert(rpstate != NULL);
2254
2255                 rmsg = dmsg_msg_alloc(rpstate, 0,
2256                                       lmsg->any.head.cmd,
2257                                       dmsg_state_relay, NULL);
2258                 rstate = rmsg->state;
2259                 rstate->relay = lstate;
2260                 lstate->relay = rstate;
2261                 dmsg_state_hold(lstate);
2262                 dmsg_state_hold(rstate);
2263         } else {
2264                 /*
2265                  * State & relay already established
2266                  */
2267                 lstate = lmsg->state;
2268                 rstate = lstate->relay;
2269                 assert(rstate != NULL);
2270
2271                 rmsg = dmsg_msg_alloc(rstate, 0,
2272                                       lmsg->any.head.cmd,
2273                                       dmsg_state_relay, NULL);
2274         }
2275         if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2276                 bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2277                       lmsg->hdr_size - sizeof(lmsg->any.head));
2278         }
2279         rmsg->any.head.error = lmsg->any.head.error;
2280         rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2281         rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
2282         rmsg->aux_size = lmsg->aux_size;
2283         rmsg->aux_data = lmsg->aux_data;
2284         lmsg->aux_data = NULL;
2285         /*
2286         fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
2287         */
2288         dmsg_msg_write(rmsg);
2289 }
2290
2291 /*
2292  * Cleanup and retire msg after processing
2293  */
2294 void
2295 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2296 {
2297         dmsg_state_t *state;
2298         dmsg_state_t *pstate;
2299
2300         assert(msg->state->iocom == iocom);
2301         state = msg->state;
2302         if (state->flags & DMSG_STATE_ROOT) {
2303                 /*
2304                  * Free a non-transactional message, there is no state
2305                  * to worry about.
2306                  */
2307                 dmsg_msg_free(msg);
2308         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2309                 /*
2310                  * Message terminating transaction, destroy the related
2311                  * state, the original message, and this message (if it
2312                  * isn't the original message due to a CREATE|DELETE).
2313                  *
2314                  * It's possible for governing state to terminate while
2315                  * sub-transactions still exist.  This is allowed but
2316                  * will cause sub-transactions to recursively fail.
2317                  * Further reception of sub-transaction messages will be
2318                  * impossible because the circuit will no longer exist.
2319                  * (XXX need code to make sure that happens properly).
2320                  */
2321                 pthread_mutex_lock(&iocom->mtx);
2322                 state->rxcmd |= DMSGF_DELETE;
2323
2324                 if (state->txcmd & DMSGF_DELETE) {
2325                         assert(state->flags & DMSG_STATE_INSERTED);
2326                         if (state->rxcmd & DMSGF_REPLY) {
2327                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2328                                 RB_REMOVE(dmsg_state_tree,
2329                                           &iocom->statewr_tree, state);
2330                         } else {
2331                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2332                                 RB_REMOVE(dmsg_state_tree,
2333                                           &iocom->staterd_tree, state);
2334                         }
2335                         pstate = state->parent;
2336                         TAILQ_REMOVE(&pstate->subq, state, entry);
2337                         state->flags &= ~DMSG_STATE_INSERTED;
2338                         state->parent = NULL;
2339                         dmsg_state_drop(pstate);
2340
2341                         if (state->relay) {
2342                                 dmsg_state_drop(state->relay);
2343                                 state->relay = NULL;
2344                         }
2345                         dmsg_msg_free(msg);
2346                         dmsg_state_drop(state);
2347                 } else {
2348                         dmsg_msg_free(msg);
2349                 }
2350                 pthread_mutex_unlock(&iocom->mtx);
2351         } else {
2352                 /*
2353                  * Message not terminating transaction, leave state intact
2354                  * and free message if it isn't the CREATE message.
2355                  */
2356                 dmsg_msg_free(msg);
2357         }
2358 }
2359
2360 /*
2361  * Clean up the state after pulling out needed fields and queueing the
2362  * message for transmission.   This occurs in dmsg_msg_write().
2363  */
2364 static void
2365 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2366 {
2367         dmsg_state_t *state;
2368         dmsg_state_t *pstate;
2369
2370         assert(iocom == msg->state->iocom);
2371         state = msg->state;
2372         if (state->flags & DMSG_STATE_ROOT) {
2373                 ;
2374         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2375                 /*
2376                  * Message terminating transaction, destroy the related
2377                  * state, the original message, and this message (if it
2378                  * isn't the original message due to a CREATE|DELETE).
2379                  *
2380                  * It's possible for governing state to terminate while
2381                  * sub-transactions still exist.  This is allowed but
2382                  * will cause sub-transactions to recursively fail.
2383                  * Further reception of sub-transaction messages will be
2384                  * impossible because the circuit will no longer exist.
2385                  * (XXX need code to make sure that happens properly).
2386                  */
2387                 pthread_mutex_lock(&iocom->mtx);
2388                 assert((state->txcmd & DMSGF_DELETE) == 0);
2389                 state->txcmd |= DMSGF_DELETE;
2390                 if (state->rxcmd & DMSGF_DELETE) {
2391                         assert(state->flags & DMSG_STATE_INSERTED);
2392                         if (state->txcmd & DMSGF_REPLY) {
2393                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2394                                 RB_REMOVE(dmsg_state_tree,
2395                                           &iocom->staterd_tree, state);
2396                         } else {
2397                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2398                                 RB_REMOVE(dmsg_state_tree,
2399                                           &iocom->statewr_tree, state);
2400                         }
2401                         pstate = state->parent;
2402                         TAILQ_REMOVE(&pstate->subq, state, entry);
2403                         state->flags &= ~DMSG_STATE_INSERTED;
2404                         state->parent = NULL;
2405                         dmsg_state_drop(pstate);
2406
2407                         if (state->relay) {
2408                                 dmsg_state_drop(state->relay);
2409                                 state->relay = NULL;
2410                         }
2411                         dmsg_state_drop(state); /* usually the last drop */
2412                 }
2413                 pthread_mutex_unlock(&iocom->mtx);
2414         }
2415 }
2416
2417 /*
2418  * Called with or without locks
2419  */
2420 void
2421 dmsg_state_hold(dmsg_state_t *state)
2422 {
2423         atomic_add_int(&state->refs, 1);
2424 }
2425
2426 void
2427 dmsg_state_drop(dmsg_state_t *state)
2428 {
2429         if (atomic_fetchadd_int(&state->refs, -1) == 1)
2430                 dmsg_state_free(state);
2431 }
2432
2433 /*
2434  * Called with iocom locked
2435  */
2436 static void
2437 dmsg_state_free(dmsg_state_t *state)
2438 {
2439         atomic_add_int(&dmsg_state_count, -1);
2440         if (DMsgDebugOpt) {
2441                 fprintf(stderr, "terminate state %p id=%08x\n",
2442                         state, (uint32_t)state->msgid);
2443         }
2444         assert((state->flags & (DMSG_STATE_ROOT | DMSG_STATE_INSERTED)) == 0);
2445         assert(TAILQ_EMPTY(&state->subq));
2446         assert(state->refs == 0);
2447         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2448                 closefrom(3);
2449         assert(state->any.any == NULL);
2450         free(state);
2451 }
2452
2453 /*
2454  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2455  * header is not adjusted, just the core header.
2456  */
2457 void
2458 dmsg_bswap_head(dmsg_hdr_t *head)
2459 {
2460         head->magic     = bswap16(head->magic);
2461         head->reserved02 = bswap16(head->reserved02);
2462         head->salt      = bswap32(head->salt);
2463
2464         head->msgid     = bswap64(head->msgid);
2465         head->circuit   = bswap64(head->circuit);
2466         head->reserved18= bswap64(head->reserved18);
2467
2468         head->cmd       = bswap32(head->cmd);
2469         head->aux_crc   = bswap32(head->aux_crc);
2470         head->aux_bytes = bswap32(head->aux_bytes);
2471         head->error     = bswap32(head->error);
2472         head->aux_descr = bswap64(head->aux_descr);
2473         head->reserved38= bswap32(head->reserved38);
2474         head->hdr_crc   = bswap32(head->hdr_crc);
2475 }