8e74c391c2cb9bbc5561efab1bf00c69a9786b1f
[dragonfly.git] / lib / libdmsg / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "dmsg_local.h"
37
38 int DMsgDebugOpt;
39 int dmsg_state_count;
40
41 static int dmsg_state_msgrx(dmsg_msg_t *msg);
42 static void dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg);
43 static void dmsg_msg_free_locked(dmsg_msg_t *msg);
44 static void dmsg_state_free(dmsg_state_t *state);
45 static void dmsg_msg_simulate_failure(dmsg_state_t *state, int error);
46
47 RB_GENERATE(dmsg_state_tree, dmsg_state, rbnode, dmsg_state_cmp);
48
49 /*
50  * STATE TREE - Represents open transactions which are indexed by their
51  *              { msgid } relative to the governing iocom.
52  */
53 int
54 dmsg_state_cmp(dmsg_state_t *state1, dmsg_state_t *state2)
55 {
56         if (state1->msgid < state2->msgid)
57                 return(-1);
58         if (state1->msgid > state2->msgid)
59                 return(1);
60         return(0);
61 }
62
63 /*
64  * Initialize a low-level ioq
65  */
66 void
67 dmsg_ioq_init(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
68 {
69         bzero(ioq, sizeof(*ioq));
70         ioq->state = DMSG_MSGQ_STATE_HEADER1;
71         TAILQ_INIT(&ioq->msgq);
72 }
73
74 /*
75  * Cleanup queue.
76  *
77  * caller holds iocom->mtx.
78  */
79 void
80 dmsg_ioq_done(dmsg_iocom_t *iocom __unused, dmsg_ioq_t *ioq)
81 {
82         dmsg_msg_t *msg;
83
84         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
85                 assert(0);      /* shouldn't happen */
86                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
87                 dmsg_msg_free(msg);
88         }
89         if ((msg = ioq->msg) != NULL) {
90                 ioq->msg = NULL;
91                 dmsg_msg_free(msg);
92         }
93 }
94
95 /*
96  * Initialize a low-level communications channel.
97  *
98  * NOTE: The signal_func() is called at least once from the loop and can be
99  *       re-armed via dmsg_iocom_restate().
100  */
101 void
102 dmsg_iocom_init(dmsg_iocom_t *iocom, int sock_fd, int alt_fd,
103                    void (*signal_func)(dmsg_iocom_t *iocom),
104                    void (*rcvmsg_func)(dmsg_msg_t *msg),
105                    void (*usrmsg_func)(dmsg_msg_t *msg, int unmanaged),
106                    void (*altmsg_func)(dmsg_iocom_t *iocom))
107 {
108         struct stat st;
109
110         bzero(iocom, sizeof(*iocom));
111
112         asprintf(&iocom->label, "iocom-%p", iocom);
113         iocom->signal_callback = signal_func;
114         iocom->rcvmsg_callback = rcvmsg_func;
115         iocom->altmsg_callback = altmsg_func;
116         iocom->usrmsg_callback = usrmsg_func;
117
118         pthread_mutex_init(&iocom->mtx, NULL);
119         RB_INIT(&iocom->staterd_tree);
120         RB_INIT(&iocom->statewr_tree);
121         TAILQ_INIT(&iocom->freeq);
122         TAILQ_INIT(&iocom->freeq_aux);
123         TAILQ_INIT(&iocom->txmsgq);
124         iocom->sock_fd = sock_fd;
125         iocom->alt_fd = alt_fd;
126         iocom->flags = DMSG_IOCOMF_RREQ | DMSG_IOCOMF_CLOSEALT;
127         if (signal_func)
128                 iocom->flags |= DMSG_IOCOMF_SWORK;
129         dmsg_ioq_init(iocom, &iocom->ioq_rx);
130         dmsg_ioq_init(iocom, &iocom->ioq_tx);
131         iocom->state0.refs = 1;         /* should never trigger a free */
132         iocom->state0.iocom = iocom;
133         iocom->state0.parent = &iocom->state0;
134         iocom->state0.flags = DMSG_STATE_ROOT;
135         TAILQ_INIT(&iocom->state0.subq);
136
137         if (pipe(iocom->wakeupfds) < 0)
138                 assert(0);
139         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
140         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
141
142         /*
143          * Negotiate session crypto synchronously.  This will mark the
144          * connection as error'd if it fails.  If this is a pipe it's
145          * a linkage that we set up ourselves to the filesystem and there
146          * is no crypto.
147          */
148         if (fstat(sock_fd, &st) < 0)
149                 assert(0);
150         if (S_ISSOCK(st.st_mode))
151                 dmsg_crypto_negotiate(iocom);
152
153         /*
154          * Make sure our fds are set to non-blocking for the iocom core.
155          */
156         if (sock_fd >= 0)
157                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
158 #if 0
159         /* if line buffered our single fgets() should be fine */
160         if (alt_fd >= 0)
161                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
162 #endif
163 }
164
165 void
166 dmsg_iocom_label(dmsg_iocom_t *iocom, const char *ctl, ...)
167 {
168         va_list va;
169         char *optr;
170
171         va_start(va, ctl);
172         optr = iocom->label;
173         vasprintf(&iocom->label, ctl, va);
174         va_end(va);
175         if (optr)
176                 free(optr);
177 }
178
179 /*
180  * May only be called from a callback from iocom_core.
181  *
182  * Adjust state machine functions, set flags to guarantee that both
183  * the recevmsg_func and the sendmsg_func is called at least once.
184  */
185 void
186 dmsg_iocom_restate(dmsg_iocom_t *iocom,
187                    void (*signal_func)(dmsg_iocom_t *),
188                    void (*rcvmsg_func)(dmsg_msg_t *msg))
189 {
190         pthread_mutex_lock(&iocom->mtx);
191         iocom->signal_callback = signal_func;
192         iocom->rcvmsg_callback = rcvmsg_func;
193         if (signal_func)
194                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
195         else
196                 atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
197         pthread_mutex_unlock(&iocom->mtx);
198 }
199
200 void
201 dmsg_iocom_signal(dmsg_iocom_t *iocom)
202 {
203         pthread_mutex_lock(&iocom->mtx);
204         if (iocom->signal_callback)
205                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_SWORK);
206         pthread_mutex_unlock(&iocom->mtx);
207 }
208
209 /*
210  * Cleanup a terminating iocom.
211  *
212  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
213  * from all possible references to it.
214  */
215 void
216 dmsg_iocom_done(dmsg_iocom_t *iocom)
217 {
218         dmsg_msg_t *msg;
219
220         if (iocom->sock_fd >= 0) {
221                 close(iocom->sock_fd);
222                 iocom->sock_fd = -1;
223         }
224         if (iocom->alt_fd >= 0 && (iocom->flags & DMSG_IOCOMF_CLOSEALT)) {
225                 close(iocom->alt_fd);
226                 iocom->alt_fd = -1;
227         }
228         dmsg_ioq_done(iocom, &iocom->ioq_rx);
229         dmsg_ioq_done(iocom, &iocom->ioq_tx);
230         while ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
231                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
232                 free(msg);
233         }
234         while ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
235                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
236                 free(msg->aux_data);
237                 msg->aux_data = NULL;
238                 free(msg);
239         }
240         if (iocom->wakeupfds[0] >= 0) {
241                 close(iocom->wakeupfds[0]);
242                 iocom->wakeupfds[0] = -1;
243         }
244         if (iocom->wakeupfds[1] >= 0) {
245                 close(iocom->wakeupfds[1]);
246                 iocom->wakeupfds[1] = -1;
247         }
248         pthread_mutex_destroy(&iocom->mtx);
249 }
250
251 /*
252  * Allocate a new message using the specified transaction state.
253  *
254  * If CREATE is set a new transaction is allocated relative to the passed-in
255  * transaction (the 'state' argument becomes pstate).
256  *
257  * If CREATE is not set the message is associated with the passed-in
258  * transaction.
259  */
260 dmsg_msg_t *
261 dmsg_msg_alloc(dmsg_state_t *state,
262                size_t aux_size, uint32_t cmd,
263                void (*func)(dmsg_msg_t *), void *data)
264 {
265         dmsg_iocom_t *iocom = state->iocom;
266         dmsg_msg_t *msg;
267
268         pthread_mutex_lock(&iocom->mtx);
269         msg = dmsg_msg_alloc_locked(state, aux_size, cmd, func, data);
270         pthread_mutex_unlock(&iocom->mtx);
271
272         return msg;
273 }
274
275 dmsg_msg_t *
276 dmsg_msg_alloc_locked(dmsg_state_t *state,
277                size_t aux_size, uint32_t cmd,
278                void (*func)(dmsg_msg_t *), void *data)
279 {
280         dmsg_iocom_t *iocom = state->iocom;
281         dmsg_state_t *pstate;
282         dmsg_msg_t *msg;
283         int hbytes;
284         size_t aligned_size;
285
286 #if 0
287         if (aux_size) {
288                 aligned_size = DMSG_DOALIGN(aux_size);
289                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
290                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
291         } else {
292                 aligned_size = 0;
293                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
294                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
295         }
296 #endif
297         aligned_size = DMSG_DOALIGN(aux_size);
298         msg = NULL;
299         if ((cmd & (DMSGF_CREATE | DMSGF_REPLY)) == DMSGF_CREATE) {
300                 /*
301                  * When CREATE is set without REPLY the caller is
302                  * initiating a new transaction stacked under the specified
303                  * circuit.
304                  *
305                  * NOTE: CREATE in txcmd handled by dmsg_msg_write()
306                  * NOTE: DELETE in txcmd handled by dmsg_state_cleanuptx()
307                  */
308                 pstate = state;
309                 state = malloc(sizeof(*state));
310                 atomic_add_int(&dmsg_state_count, 1);
311                 bzero(state, sizeof(*state));
312                 TAILQ_INIT(&state->subq);
313                 dmsg_state_hold(pstate);
314                 state->refs = 1;
315                 state->parent = pstate;
316                 state->iocom = iocom;
317                 state->flags = DMSG_STATE_DYNAMIC;
318                 state->msgid = (uint64_t)(uintptr_t)state;
319                 state->txcmd = cmd & ~(DMSGF_CREATE | DMSGF_DELETE);
320                 state->rxcmd = DMSGF_REPLY;
321                 state->icmd = state->txcmd & DMSGF_BASECMDMASK;
322                 state->func = func;
323                 state->any.any = data;
324
325                 RB_INSERT(dmsg_state_tree, &iocom->statewr_tree, state);
326                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
327                 state->flags |= DMSG_STATE_INSERTED;
328         } else {
329                 /*
330                  * Otherwise the message is transmitted over the existing
331                  * open transaction.
332                  */
333                 pstate = state->parent;
334         }
335
336         /* XXX SMP race for state */
337         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
338         if (msg == NULL) {
339                 msg = malloc(offsetof(struct dmsg_msg, any.head) + hbytes + 4);
340                 bzero(msg, offsetof(struct dmsg_msg, any.head));
341                 *(int *)((char *)msg +
342                          offsetof(struct dmsg_msg, any.head) + hbytes) =
343                                  0x71B2C3D4;
344 #if 0
345                 msg = malloc(sizeof(*msg));
346                 bzero(msg, sizeof(*msg));
347 #endif
348         }
349
350         /*
351          * [re]allocate the auxillary data buffer.  The caller knows that
352          * a size-aligned buffer will be allocated but we do not want to
353          * force the caller to zero any tail piece, so we do that ourself.
354          */
355         if (msg->aux_size != aux_size) {
356                 if (msg->aux_data) {
357                         free(msg->aux_data);
358                         msg->aux_data = NULL;
359                         msg->aux_size = 0;
360                 }
361                 if (aux_size) {
362                         msg->aux_data = malloc(aligned_size);
363                         msg->aux_size = aux_size;
364                         if (aux_size != aligned_size) {
365                                 bzero(msg->aux_data + aux_size,
366                                       aligned_size - aux_size);
367                         }
368                 }
369         }
370
371         /*
372          * Set REVTRANS if the transaction was remotely initiated
373          * Set REVCIRC if the circuit was remotely initiated
374          */
375         if (state->flags & DMSG_STATE_OPPOSITE)
376                 cmd |= DMSGF_REVTRANS;
377         if (pstate->flags & DMSG_STATE_OPPOSITE)
378                 cmd |= DMSGF_REVCIRC;
379
380         /*
381          * Finish filling out the header.
382          */
383         if (hbytes)
384                 bzero(&msg->any.head, hbytes);
385         msg->hdr_size = hbytes;
386         msg->any.head.magic = DMSG_HDR_MAGIC;
387         msg->any.head.cmd = cmd;
388         msg->any.head.aux_descr = 0;
389         msg->any.head.aux_crc = 0;
390         msg->any.head.msgid = state->msgid;
391         msg->any.head.circuit = pstate->msgid;
392         msg->state = state;
393
394         return (msg);
395 }
396
397 /*
398  * Free a message so it can be reused afresh.
399  *
400  * NOTE: aux_size can be 0 with a non-NULL aux_data.
401  */
402 static
403 void
404 dmsg_msg_free_locked(dmsg_msg_t *msg)
405 {
406         /*dmsg_iocom_t *iocom = msg->iocom;*/
407 #if 1
408         int hbytes = (msg->any.head.cmd & DMSGF_SIZE) * DMSG_ALIGN;
409         if (*(int *)((char *)msg +
410                      offsetof(struct  dmsg_msg, any.head) + hbytes) !=
411              0x71B2C3D4) {
412                 fprintf(stderr, "MSGFREE FAILED CMD %08x\n", msg->any.head.cmd);
413                 assert(0);
414         }
415 #endif
416         msg->state = NULL;      /* safety */
417         if (msg->aux_data) {
418                 free(msg->aux_data);
419                 msg->aux_data = NULL;
420         }
421         msg->aux_size = 0;
422         free (msg);
423 #if 0
424         if (msg->aux_data)
425                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
426         else
427                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
428 #endif
429 }
430
431 void
432 dmsg_msg_free(dmsg_msg_t *msg)
433 {
434         dmsg_iocom_t *iocom = msg->state->iocom;
435
436         pthread_mutex_lock(&iocom->mtx);
437         dmsg_msg_free_locked(msg);
438         pthread_mutex_unlock(&iocom->mtx);
439 }
440
441 /*
442  * I/O core loop for an iocom.
443  *
444  * Thread localized, iocom->mtx not held.
445  */
446 void
447 dmsg_iocom_core(dmsg_iocom_t *iocom)
448 {
449         struct pollfd fds[3];
450         char dummybuf[256];
451         dmsg_msg_t *msg;
452         int timeout;
453         int count;
454         int wi; /* wakeup pipe */
455         int si; /* socket */
456         int ai; /* alt bulk path socket */
457
458         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0) {
459                 /*
460                  * These iocom->flags are only manipulated within the
461                  * context of the current thread.  However, modifications
462                  * still require atomic ops.
463                  */
464                 if ((iocom->flags & (DMSG_IOCOMF_RWORK |
465                                      DMSG_IOCOMF_WWORK |
466                                      DMSG_IOCOMF_PWORK |
467                                      DMSG_IOCOMF_SWORK |
468                                      DMSG_IOCOMF_ARWORK |
469                                      DMSG_IOCOMF_AWWORK)) == 0) {
470                         /*
471                          * Only poll if no immediate work is pending.
472                          * Otherwise we are just wasting our time calling
473                          * poll.
474                          */
475                         timeout = 5000;
476
477                         count = 0;
478                         wi = -1;
479                         si = -1;
480                         ai = -1;
481
482                         /*
483                          * Always check the inter-thread pipe, e.g.
484                          * for iocom->txmsgq work.
485                          */
486                         wi = count++;
487                         fds[wi].fd = iocom->wakeupfds[0];
488                         fds[wi].events = POLLIN;
489                         fds[wi].revents = 0;
490
491                         /*
492                          * Check the socket input/output direction as
493                          * requested
494                          */
495                         if (iocom->flags & (DMSG_IOCOMF_RREQ |
496                                             DMSG_IOCOMF_WREQ)) {
497                                 si = count++;
498                                 fds[si].fd = iocom->sock_fd;
499                                 fds[si].events = 0;
500                                 fds[si].revents = 0;
501
502                                 if (iocom->flags & DMSG_IOCOMF_RREQ)
503                                         fds[si].events |= POLLIN;
504                                 if (iocom->flags & DMSG_IOCOMF_WREQ)
505                                         fds[si].events |= POLLOUT;
506                         }
507
508                         /*
509                          * Check the alternative fd for work.
510                          */
511                         if (iocom->alt_fd >= 0) {
512                                 ai = count++;
513                                 fds[ai].fd = iocom->alt_fd;
514                                 fds[ai].events = POLLIN;
515                                 fds[ai].revents = 0;
516                         }
517                         poll(fds, count, timeout);
518
519                         if (wi >= 0 && (fds[wi].revents & POLLIN))
520                                 atomic_set_int(&iocom->flags,
521                                                DMSG_IOCOMF_PWORK);
522                         if (si >= 0 && (fds[si].revents & POLLIN))
523                                 atomic_set_int(&iocom->flags,
524                                                DMSG_IOCOMF_RWORK);
525                         if (si >= 0 && (fds[si].revents & POLLOUT))
526                                 atomic_set_int(&iocom->flags,
527                                                DMSG_IOCOMF_WWORK);
528                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
529                                 atomic_set_int(&iocom->flags,
530                                                DMSG_IOCOMF_WWORK);
531                         if (ai >= 0 && (fds[ai].revents & POLLIN))
532                                 atomic_set_int(&iocom->flags,
533                                                DMSG_IOCOMF_ARWORK);
534                 } else {
535                         /*
536                          * Always check the pipe
537                          */
538                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_PWORK);
539                 }
540
541                 if (iocom->flags & DMSG_IOCOMF_SWORK) {
542                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_SWORK);
543                         iocom->signal_callback(iocom);
544                 }
545
546                 /*
547                  * Pending message queues from other threads wake us up
548                  * with a write to the wakeupfds[] pipe.  We have to clear
549                  * the pipe with a dummy read.
550                  */
551                 if (iocom->flags & DMSG_IOCOMF_PWORK) {
552                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_PWORK);
553                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
554                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
555                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WWORK);
556                         if (TAILQ_FIRST(&iocom->txmsgq))
557                                 dmsg_iocom_flush1(iocom);
558                 }
559
560                 /*
561                  * Message write sequencing
562                  */
563                 if (iocom->flags & DMSG_IOCOMF_WWORK)
564                         dmsg_iocom_flush1(iocom);
565
566                 /*
567                  * Message read sequencing.  Run this after the write
568                  * sequencing in case the write sequencing allowed another
569                  * auto-DELETE to occur on the read side.
570                  */
571                 if (iocom->flags & DMSG_IOCOMF_RWORK) {
572                         while ((iocom->flags & DMSG_IOCOMF_EOF) == 0 &&
573                                (msg = dmsg_ioq_read(iocom)) != NULL) {
574                                 if (DMsgDebugOpt) {
575                                         fprintf(stderr, "receive %s\n",
576                                                 dmsg_msg_str(msg));
577                                 }
578                                 iocom->rcvmsg_callback(msg);
579                                 dmsg_state_cleanuprx(iocom, msg);
580                         }
581                 }
582
583                 if (iocom->flags & DMSG_IOCOMF_ARWORK) {
584                         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_ARWORK);
585                         iocom->altmsg_callback(iocom);
586                 }
587         }
588 }
589
590 /*
591  * Make sure there's enough room in the FIFO to hold the
592  * needed data.
593  *
594  * Assume worst case encrypted form is 2x the size of the
595  * plaintext equivalent.
596  */
597 static
598 size_t
599 dmsg_ioq_makeroom(dmsg_ioq_t *ioq, size_t needed)
600 {
601         size_t bytes;
602         size_t nmax;
603
604         bytes = ioq->fifo_cdx - ioq->fifo_beg;
605         nmax = sizeof(ioq->buf) - ioq->fifo_end;
606         if (bytes + nmax / 2 < needed) {
607                 if (bytes) {
608                         bcopy(ioq->buf + ioq->fifo_beg,
609                               ioq->buf,
610                               bytes);
611                 }
612                 ioq->fifo_cdx -= ioq->fifo_beg;
613                 ioq->fifo_beg = 0;
614                 if (ioq->fifo_cdn < ioq->fifo_end) {
615                         bcopy(ioq->buf + ioq->fifo_cdn,
616                               ioq->buf + ioq->fifo_cdx,
617                               ioq->fifo_end - ioq->fifo_cdn);
618                 }
619                 ioq->fifo_end -= ioq->fifo_cdn - ioq->fifo_cdx;
620                 ioq->fifo_cdn = ioq->fifo_cdx;
621                 nmax = sizeof(ioq->buf) - ioq->fifo_end;
622         }
623         return(nmax);
624 }
625
626 /*
627  * Read the next ready message from the ioq, issuing I/O if needed.
628  * Caller should retry on a read-event when NULL is returned.
629  *
630  * If an error occurs during reception a DMSG_LNK_ERROR msg will
631  * be returned for each open transaction, then the ioq and iocom
632  * will be errored out and a non-transactional DMSG_LNK_ERROR
633  * msg will be returned as the final message.  The caller should not call
634  * us again after the final message is returned.
635  *
636  * Thread localized, iocom->mtx not held.
637  */
638 dmsg_msg_t *
639 dmsg_ioq_read(dmsg_iocom_t *iocom)
640 {
641         dmsg_ioq_t *ioq = &iocom->ioq_rx;
642         dmsg_msg_t *msg;
643         dmsg_state_t *state;
644         dmsg_hdr_t *head;
645         ssize_t n;
646         size_t bytes;
647         size_t nmax;
648         uint32_t aux_size;
649         uint32_t xcrc32;
650         int error;
651
652 again:
653         /*
654          * If a message is already pending we can just remove and
655          * return it.  Message state has already been processed.
656          * (currently not implemented)
657          */
658         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
659                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
660                 return (msg);
661         }
662         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_RREQ | DMSG_IOCOMF_RWORK);
663
664         /*
665          * If the stream is errored out we stop processing it.
666          */
667         if (ioq->error)
668                 goto skip;
669
670         /*
671          * Message read in-progress (msg is NULL at the moment).  We don't
672          * allocate a msg until we have its core header.
673          */
674         nmax = sizeof(ioq->buf) - ioq->fifo_end;
675         bytes = ioq->fifo_cdx - ioq->fifo_beg;          /* already decrypted */
676         msg = ioq->msg;
677
678         switch(ioq->state) {
679         case DMSG_MSGQ_STATE_HEADER1:
680                 /*
681                  * Load the primary header, fail on any non-trivial read
682                  * error or on EOF.  Since the primary header is the same
683                  * size is the message alignment it will never straddle
684                  * the end of the buffer.
685                  */
686                 nmax = dmsg_ioq_makeroom(ioq, sizeof(msg->any.head));
687                 if (bytes < sizeof(msg->any.head)) {
688                         n = read(iocom->sock_fd,
689                                  ioq->buf + ioq->fifo_end,
690                                  nmax);
691                         if (n <= 0) {
692                                 if (n == 0) {
693                                         ioq->error = DMSG_IOQ_ERROR_EOF;
694                                         break;
695                                 }
696                                 if (errno != EINTR &&
697                                     errno != EINPROGRESS &&
698                                     errno != EAGAIN) {
699                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
700                                         break;
701                                 }
702                                 n = 0;
703                                 /* fall through */
704                         }
705                         ioq->fifo_end += (size_t)n;
706                         nmax -= (size_t)n;
707                 }
708
709                 /*
710                  * Decrypt data received so far.  Data will be decrypted
711                  * in-place but might create gaps in the FIFO.  Partial
712                  * blocks are not immediately decrypted.
713                  *
714                  * WARNING!  The header might be in the wrong endian, we
715                  *           do not fix it up until we get the entire
716                  *           extended header.
717                  */
718                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
719                         dmsg_crypto_decrypt(iocom, ioq);
720                 } else {
721                         ioq->fifo_cdx = ioq->fifo_end;
722                         ioq->fifo_cdn = ioq->fifo_end;
723                 }
724                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
725
726                 /*
727                  * Insufficient data accumulated (msg is NULL, caller will
728                  * retry on event).
729                  */
730                 assert(msg == NULL);
731                 if (bytes < sizeof(msg->any.head))
732                         break;
733
734                 /*
735                  * Check and fixup the core header.  Note that the icrc
736                  * has to be calculated before any fixups, but the crc
737                  * fields in the msg may have to be swapped like everything
738                  * else.
739                  */
740                 head = (void *)(ioq->buf + ioq->fifo_beg);
741                 if (head->magic != DMSG_HDR_MAGIC &&
742                     head->magic != DMSG_HDR_MAGIC_REV) {
743                         fprintf(stderr, "%s: head->magic is bad %02x\n",
744                                 iocom->label, head->magic);
745                         if (iocom->flags & DMSG_IOCOMF_CRYPTED)
746                                 fprintf(stderr, "(on encrypted link)\n");
747                         ioq->error = DMSG_IOQ_ERROR_SYNC;
748                         break;
749                 }
750
751                 /*
752                  * Calculate the full header size and aux data size
753                  */
754                 if (head->magic == DMSG_HDR_MAGIC_REV) {
755                         ioq->hbytes = (bswap32(head->cmd) & DMSGF_SIZE) *
756                                       DMSG_ALIGN;
757                         aux_size = bswap32(head->aux_bytes);
758                 } else {
759                         ioq->hbytes = (head->cmd & DMSGF_SIZE) *
760                                       DMSG_ALIGN;
761                         aux_size = head->aux_bytes;
762                 }
763                 ioq->abytes = DMSG_DOALIGN(aux_size);
764                 ioq->unaligned_aux_size = aux_size;
765                 if (ioq->hbytes < sizeof(msg->any.head) ||
766                     ioq->hbytes > sizeof(msg->any) ||
767                     ioq->abytes > DMSG_AUX_MAX) {
768                         ioq->error = DMSG_IOQ_ERROR_FIELD;
769                         break;
770                 }
771
772                 /*
773                  * Allocate the message, the next state will fill it in.
774                  *
775                  * NOTE: The aux_data buffer will be sized to an aligned
776                  *       value and the aligned remainder zero'd for
777                  *       convenience.
778                  *
779                  * NOTE: Supply dummy state and a degenerate cmd without
780                  *       CREATE set.  The message will temporarily be
781                  *       associated with state0 until later post-processing.
782                  */
783                 msg = dmsg_msg_alloc(&iocom->state0, aux_size,
784                                      ioq->hbytes / DMSG_ALIGN,
785                                      NULL, NULL);
786                 ioq->msg = msg;
787
788                 /*
789                  * Fall through to the next state.  Make sure that the
790                  * extended header does not straddle the end of the buffer.
791                  * We still want to issue larger reads into our buffer,
792                  * book-keeping is easier if we don't bcopy() yet.
793                  *
794                  * Make sure there is enough room for bloated encrypt data.
795                  */
796                 nmax = dmsg_ioq_makeroom(ioq, ioq->hbytes);
797                 ioq->state = DMSG_MSGQ_STATE_HEADER2;
798                 /* fall through */
799         case DMSG_MSGQ_STATE_HEADER2:
800                 /*
801                  * Fill out the extended header.
802                  */
803                 assert(msg != NULL);
804                 if (bytes < ioq->hbytes) {
805                         n = read(iocom->sock_fd,
806                                  ioq->buf + ioq->fifo_end,
807                                  nmax);
808                         if (n <= 0) {
809                                 if (n == 0) {
810                                         ioq->error = DMSG_IOQ_ERROR_EOF;
811                                         break;
812                                 }
813                                 if (errno != EINTR &&
814                                     errno != EINPROGRESS &&
815                                     errno != EAGAIN) {
816                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
817                                         break;
818                                 }
819                                 n = 0;
820                                 /* fall through */
821                         }
822                         ioq->fifo_end += (size_t)n;
823                         nmax -= (size_t)n;
824                 }
825
826                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
827                         dmsg_crypto_decrypt(iocom, ioq);
828                 } else {
829                         ioq->fifo_cdx = ioq->fifo_end;
830                         ioq->fifo_cdn = ioq->fifo_end;
831                 }
832                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
833
834                 /*
835                  * Insufficient data accumulated (set msg NULL so caller will
836                  * retry on event).
837                  */
838                 if (bytes < ioq->hbytes) {
839                         msg = NULL;
840                         break;
841                 }
842
843                 /*
844                  * Calculate the extended header, decrypt data received
845                  * so far.  Handle endian-conversion for the entire extended
846                  * header.
847                  */
848                 head = (void *)(ioq->buf + ioq->fifo_beg);
849
850                 /*
851                  * Check the CRC.
852                  */
853                 if (head->magic == DMSG_HDR_MAGIC_REV)
854                         xcrc32 = bswap32(head->hdr_crc);
855                 else
856                         xcrc32 = head->hdr_crc;
857                 head->hdr_crc = 0;
858                 if (dmsg_icrc32(head, ioq->hbytes) != xcrc32) {
859                         ioq->error = DMSG_IOQ_ERROR_XCRC;
860                         fprintf(stderr, "BAD-XCRC(%08x,%08x) %s\n",
861                                 xcrc32, dmsg_icrc32(head, ioq->hbytes),
862                                 dmsg_msg_str(msg));
863                         assert(0);
864                         break;
865                 }
866                 head->hdr_crc = xcrc32;
867
868                 if (head->magic == DMSG_HDR_MAGIC_REV) {
869                         dmsg_bswap_head(head);
870                 }
871
872                 /*
873                  * Copy the extended header into the msg and adjust the
874                  * FIFO.
875                  */
876                 bcopy(head, &msg->any, ioq->hbytes);
877
878                 /*
879                  * We are either done or we fall-through.
880                  */
881                 if (ioq->abytes == 0) {
882                         ioq->fifo_beg += ioq->hbytes;
883                         break;
884                 }
885
886                 /*
887                  * Must adjust bytes (and the state) when falling through.
888                  * nmax doesn't change.
889                  */
890                 ioq->fifo_beg += ioq->hbytes;
891                 bytes -= ioq->hbytes;
892                 ioq->state = DMSG_MSGQ_STATE_AUXDATA1;
893                 /* fall through */
894         case DMSG_MSGQ_STATE_AUXDATA1:
895                 /*
896                  * Copy the partial or complete [decrypted] payload from
897                  * remaining bytes in the FIFO in order to optimize the
898                  * makeroom call in the AUXDATA2 state.  We have to
899                  * fall-through either way so we can check the crc.
900                  *
901                  * msg->aux_size tracks our aux data.
902                  *
903                  * (Lets not complicate matters if the data is encrypted,
904                  *  since the data in-stream is not the same size as the
905                  *  data decrypted).
906                  */
907                 if (bytes >= ioq->abytes) {
908                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
909                               ioq->abytes);
910                         msg->aux_size = ioq->abytes;
911                         ioq->fifo_beg += ioq->abytes;
912                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
913                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
914                         bytes -= ioq->abytes;
915                 } else if (bytes) {
916                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
917                               bytes);
918                         msg->aux_size = bytes;
919                         ioq->fifo_beg += bytes;
920                         if (ioq->fifo_cdx < ioq->fifo_beg)
921                                 ioq->fifo_cdx = ioq->fifo_beg;
922                         assert(ioq->fifo_beg <= ioq->fifo_cdx);
923                         assert(ioq->fifo_cdx <= ioq->fifo_cdn);
924                         bytes = 0;
925                 } else {
926                         msg->aux_size = 0;
927                 }
928                 ioq->state = DMSG_MSGQ_STATE_AUXDATA2;
929                 /* fall through */
930         case DMSG_MSGQ_STATE_AUXDATA2:
931                 /*
932                  * Make sure there is enough room for more data.
933                  */
934                 assert(msg);
935                 nmax = dmsg_ioq_makeroom(ioq, ioq->abytes - msg->aux_size);
936
937                 /*
938                  * Read and decrypt more of the payload.
939                  */
940                 if (msg->aux_size < ioq->abytes) {
941                         assert(bytes == 0);
942                         n = read(iocom->sock_fd,
943                                  ioq->buf + ioq->fifo_end,
944                                  nmax);
945                         if (n <= 0) {
946                                 if (n == 0) {
947                                         ioq->error = DMSG_IOQ_ERROR_EOF;
948                                         break;
949                                 }
950                                 if (errno != EINTR &&
951                                     errno != EINPROGRESS &&
952                                     errno != EAGAIN) {
953                                         ioq->error = DMSG_IOQ_ERROR_SOCK;
954                                         break;
955                                 }
956                                 n = 0;
957                                 /* fall through */
958                         }
959                         ioq->fifo_end += (size_t)n;
960                         nmax -= (size_t)n;
961                 }
962
963                 if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
964                         dmsg_crypto_decrypt(iocom, ioq);
965                 } else {
966                         ioq->fifo_cdx = ioq->fifo_end;
967                         ioq->fifo_cdn = ioq->fifo_end;
968                 }
969                 bytes = ioq->fifo_cdx - ioq->fifo_beg;
970
971                 if (bytes > ioq->abytes - msg->aux_size)
972                         bytes = ioq->abytes - msg->aux_size;
973
974                 if (bytes) {
975                         bcopy(ioq->buf + ioq->fifo_beg,
976                               msg->aux_data + msg->aux_size,
977                               bytes);
978                         msg->aux_size += bytes;
979                         ioq->fifo_beg += bytes;
980                 }
981
982                 /*
983                  * Insufficient data accumulated (set msg NULL so caller will
984                  * retry on event).
985                  *
986                  * Assert the auxillary data size is correct, then record the
987                  * original unaligned size from the message header.
988                  */
989                 if (msg->aux_size < ioq->abytes) {
990                         msg = NULL;
991                         break;
992                 }
993                 assert(msg->aux_size == ioq->abytes);
994                 msg->aux_size = ioq->unaligned_aux_size;
995
996                 /*
997                  * Check aux_crc, then we are done.  Note that the crc
998                  * is calculated over the aligned size, not the actual
999                  * size.
1000                  */
1001                 xcrc32 = dmsg_icrc32(msg->aux_data, ioq->abytes);
1002                 if (xcrc32 != msg->any.head.aux_crc) {
1003                         ioq->error = DMSG_IOQ_ERROR_ACRC;
1004                         fprintf(stderr,
1005                                 "iocom: ACRC error %08x vs %08x "
1006                                 "msgid %016jx msgcmd %08x auxsize %d\n",
1007                                 xcrc32,
1008                                 msg->any.head.aux_crc,
1009                                 (intmax_t)msg->any.head.msgid,
1010                                 msg->any.head.cmd,
1011                                 msg->any.head.aux_bytes);
1012                         break;
1013                 }
1014                 break;
1015         case DMSG_MSGQ_STATE_ERROR:
1016                 /*
1017                  * Continued calls to drain recorded transactions (returning
1018                  * a LNK_ERROR for each one), before we return the final
1019                  * LNK_ERROR.
1020                  */
1021                 assert(msg == NULL);
1022                 break;
1023         default:
1024                 /*
1025                  * We don't double-return errors, the caller should not
1026                  * have called us again after getting an error msg.
1027                  */
1028                 assert(0);
1029                 break;
1030         }
1031
1032         /*
1033          * Check the message sequence.  The iv[] should prevent any
1034          * possibility of a replay but we add this check anyway.
1035          */
1036         if (msg && ioq->error == 0) {
1037                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
1038                         ioq->error = DMSG_IOQ_ERROR_MSGSEQ;
1039                 } else {
1040                         ++ioq->seq;
1041                 }
1042         }
1043
1044         /*
1045          * Handle error, RREQ, or completion
1046          *
1047          * NOTE: nmax and bytes are invalid at this point, we don't bother
1048          *       to update them when breaking out.
1049          */
1050         if (ioq->error) {
1051                 dmsg_state_t *tmp_state;
1052 skip:
1053                 fprintf(stderr, "IOQ ERROR %d\n", ioq->error);
1054                 /*
1055                  * An unrecoverable error causes all active receive
1056                  * transactions to be terminated with a LNK_ERROR message.
1057                  *
1058                  * Once all active transactions are exhausted we set the
1059                  * iocom ERROR flag and return a non-transactional LNK_ERROR
1060                  * message, which should cause master processing loops to
1061                  * terminate.
1062                  */
1063                 assert(ioq->msg == msg);
1064                 if (msg) {
1065                         dmsg_msg_free(msg);
1066                         ioq->msg = NULL;
1067                         msg = NULL;
1068                 }
1069
1070                 /*
1071                  * No more I/O read processing
1072                  */
1073                 ioq->state = DMSG_MSGQ_STATE_ERROR;
1074
1075                 /*
1076                  * Simulate a remote LNK_ERROR DELETE msg for any open
1077                  * transactions, ending with a final non-transactional
1078                  * LNK_ERROR (that the session can detect) when no
1079                  * transactions remain.
1080                  *
1081                  * NOTE: Temporarily supply state0 and a degenerate cmd
1082                  *       without CREATE set.  The real state will be
1083                  *       assigned in the loop.
1084                  *
1085                  * NOTE: We are simulating a received message using our
1086                  *       side of the state, so the DMSGF_REV* bits have
1087                  *       to be reversed.
1088                  */
1089                 pthread_mutex_lock(&iocom->mtx);
1090                 dmsg_iocom_drain(iocom);
1091
1092                 tmp_state = NULL;
1093                 RB_FOREACH(state, dmsg_state_tree, &iocom->staterd_tree) {
1094                         atomic_set_int(&state->flags, DMSG_STATE_DYING);
1095                         if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1096                                 tmp_state = state;
1097                 }
1098                 RB_FOREACH(state, dmsg_state_tree, &iocom->statewr_tree) {
1099                         atomic_set_int(&state->flags, DMSG_STATE_DYING);
1100                         if (tmp_state == NULL && TAILQ_EMPTY(&state->subq))
1101                                 tmp_state = state;
1102                 }
1103
1104                 if (tmp_state) {
1105                         dmsg_msg_simulate_failure(tmp_state, ioq->error);
1106                 } else {
1107                         dmsg_msg_simulate_failure(&iocom->state0, ioq->error);
1108                 }
1109                 pthread_mutex_unlock(&iocom->mtx);
1110                 if (TAILQ_FIRST(&ioq->msgq))
1111                         goto again;
1112
1113 #if 0
1114                 /*
1115                  * For the iocom error case we want to set RWORK to indicate
1116                  * that more messages might be pending.
1117                  *
1118                  * It is possible to return NULL when there is more work to
1119                  * do because each message has to be DELETEd in both
1120                  * directions before we continue on with the next (though
1121                  * this could be optimized).  The transmit direction will
1122                  * re-set RWORK.
1123                  */
1124                 if (msg)
1125                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1126 #endif
1127         } else if (msg == NULL) {
1128                 /*
1129                  * Insufficient data received to finish building the message,
1130                  * set RREQ and return NULL.
1131                  *
1132                  * Leave ioq->msg intact.
1133                  * Leave the FIFO intact.
1134                  */
1135                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1136         } else {
1137                 /*
1138                  * Continue processing msg.
1139                  *
1140                  * The fifo has already been advanced past the message.
1141                  * Trivially reset the FIFO indices if possible.
1142                  *
1143                  * clear the FIFO if it is now empty and set RREQ to wait
1144                  * for more from the socket.  If the FIFO is not empty set
1145                  * TWORK to bypass the poll so we loop immediately.
1146                  */
1147                 if (ioq->fifo_beg == ioq->fifo_cdx &&
1148                     ioq->fifo_cdn == ioq->fifo_end) {
1149                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RREQ);
1150                         ioq->fifo_cdx = 0;
1151                         ioq->fifo_cdn = 0;
1152                         ioq->fifo_beg = 0;
1153                         ioq->fifo_end = 0;
1154                 } else {
1155                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1156                 }
1157                 ioq->state = DMSG_MSGQ_STATE_HEADER1;
1158                 ioq->msg = NULL;
1159
1160                 /*
1161                  * Handle message routing.  Validates non-zero sources
1162                  * and routes message.  Error will be 0 if the message is
1163                  * destined for us.
1164                  *
1165                  * State processing only occurs for messages destined for us.
1166                  */
1167                 if (DMsgDebugOpt >= 5) {
1168                         fprintf(stderr,
1169                                 "rxmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1170                                 msg->any.head.cmd,
1171                                 (intmax_t)msg->any.head.msgid,
1172                                 (intmax_t)msg->any.head.circuit);
1173                 }
1174                 error = dmsg_state_msgrx(msg);
1175
1176                 if (error) {
1177                         /*
1178                          * Abort-after-closure, throw message away and
1179                          * start reading another.
1180                          */
1181                         if (error == DMSG_IOQ_ERROR_EALREADY) {
1182                                 dmsg_msg_free(msg);
1183                                 goto again;
1184                         }
1185
1186                         /*
1187                          * Process real error and throw away message.
1188                          */
1189                         ioq->error = error;
1190                         goto skip;
1191                 }
1192                 /* no error, not routed.  Fall through and return msg */
1193         }
1194         return (msg);
1195 }
1196
1197 /*
1198  * Calculate the header and data crc's and write a low-level message to
1199  * the connection.  If aux_crc is non-zero the aux_data crc is already
1200  * assumed to have been set.
1201  *
1202  * A non-NULL msg is added to the queue but not necessarily flushed.
1203  * Calling this function with msg == NULL will get a flush going.
1204  *
1205  * (called from iocom_core only)
1206  */
1207 void
1208 dmsg_iocom_flush1(dmsg_iocom_t *iocom)
1209 {
1210         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1211         dmsg_msg_t *msg;
1212         uint32_t xcrc32;
1213         size_t hbytes;
1214         size_t abytes;
1215         dmsg_msg_queue_t tmpq;
1216
1217         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1218         TAILQ_INIT(&tmpq);
1219         pthread_mutex_lock(&iocom->mtx);
1220         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
1221                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
1222                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
1223         }
1224         pthread_mutex_unlock(&iocom->mtx);
1225
1226         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
1227                 /*
1228                  * Process terminal connection errors.
1229                  */
1230                 TAILQ_REMOVE(&tmpq, msg, qentry);
1231                 if (ioq->error) {
1232                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1233                         ++ioq->msgcount;
1234                         continue;
1235                 }
1236
1237                 /*
1238                  * Finish populating the msg fields.  The salt ensures that
1239                  * the iv[] array is ridiculously randomized and we also
1240                  * re-seed our PRNG every 32768 messages just to be sure.
1241                  */
1242                 msg->any.head.magic = DMSG_HDR_MAGIC;
1243                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
1244                 ++ioq->seq;
1245                 if ((ioq->seq & 32767) == 0)
1246                         srandomdev();
1247
1248                 /*
1249                  * Calculate aux_crc if 0, then calculate hdr_crc.
1250                  */
1251                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
1252                         abytes = DMSG_DOALIGN(msg->aux_size);
1253                         xcrc32 = dmsg_icrc32(msg->aux_data, abytes);
1254                         msg->any.head.aux_crc = xcrc32;
1255                 }
1256                 msg->any.head.aux_bytes = msg->aux_size;
1257
1258                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1259                          DMSG_ALIGN;
1260                 msg->any.head.hdr_crc = 0;
1261                 msg->any.head.hdr_crc = dmsg_icrc32(&msg->any.head, hbytes);
1262
1263                 /*
1264                  * Enqueue the message (the flush codes handles stream
1265                  * encryption).
1266                  */
1267                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
1268                 ++ioq->msgcount;
1269         }
1270         dmsg_iocom_flush2(iocom);
1271 }
1272
1273 /*
1274  * Thread localized, iocom->mtx not held by caller.
1275  *
1276  * (called from iocom_core via iocom_flush1 only)
1277  */
1278 void
1279 dmsg_iocom_flush2(dmsg_iocom_t *iocom)
1280 {
1281         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1282         dmsg_msg_t *msg;
1283         ssize_t n;
1284         struct iovec iov[DMSG_IOQ_MAXIOVEC];
1285         size_t nact;
1286         size_t hbytes;
1287         size_t abytes;
1288         size_t hoff;
1289         size_t aoff;
1290         int iovcnt;
1291
1292         if (ioq->error) {
1293                 dmsg_iocom_drain(iocom);
1294                 return;
1295         }
1296
1297         /*
1298          * Pump messages out the connection by building an iovec.
1299          *
1300          * ioq->hbytes/ioq->abytes tracks how much of the first message
1301          * in the queue has been successfully written out, so we can
1302          * resume writing.
1303          */
1304         iovcnt = 0;
1305         nact = 0;
1306         hoff = ioq->hbytes;
1307         aoff = ioq->abytes;
1308
1309         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1310                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1311                          DMSG_ALIGN;
1312                 abytes = DMSG_DOALIGN(msg->aux_size);
1313                 assert(hoff <= hbytes && aoff <= abytes);
1314
1315                 if (hoff < hbytes) {
1316                         size_t maxlen = hbytes - hoff;
1317                         if (maxlen > sizeof(ioq->buf) / 2)
1318                                 maxlen = sizeof(ioq->buf) / 2;
1319                         iov[iovcnt].iov_base = (char *)&msg->any.head + hoff;
1320                         iov[iovcnt].iov_len = maxlen;
1321                         nact += maxlen;
1322                         ++iovcnt;
1323                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1324                             maxlen != hbytes - hoff) {
1325                                 break;
1326                         }
1327                 }
1328                 if (aoff < abytes) {
1329                         size_t maxlen = abytes - aoff;
1330                         if (maxlen > sizeof(ioq->buf) / 2)
1331                                 maxlen = sizeof(ioq->buf) / 2;
1332
1333                         assert(msg->aux_data != NULL);
1334                         iov[iovcnt].iov_base = (char *)msg->aux_data + aoff;
1335                         iov[iovcnt].iov_len = maxlen;
1336                         nact += maxlen;
1337                         ++iovcnt;
1338                         if (iovcnt == DMSG_IOQ_MAXIOVEC ||
1339                             maxlen != abytes - aoff) {
1340                                 break;
1341                         }
1342                 }
1343                 hoff = 0;
1344                 aoff = 0;
1345         }
1346         if (iovcnt == 0)
1347                 return;
1348
1349         /*
1350          * Encrypt and write the data.  The crypto code will move the
1351          * data into the fifo and adjust the iov as necessary.  If
1352          * encryption is disabled the iov is left alone.
1353          *
1354          * May return a smaller iov (thus a smaller n), with aggregated
1355          * chunks.  May reduce nmax to what fits in the FIFO.
1356          *
1357          * This function sets nact to the number of original bytes now
1358          * encrypted, adding to the FIFO some number of bytes that might
1359          * be greater depending on the crypto mechanic.  iov[] is adjusted
1360          * to point at the FIFO if necessary.
1361          *
1362          * NOTE: nact is the number of bytes eaten from the message.  For
1363          *       encrypted data this is the number of bytes processed for
1364          *       encryption and not necessarily the number of bytes writable.
1365          *       The return value from the writev() is the post-encrypted
1366          *       byte count which might be larger.
1367          *
1368          * NOTE: For direct writes, nact is the return value from the writev().
1369          */
1370         if (iocom->flags & DMSG_IOCOMF_CRYPTED) {
1371                 /*
1372                  * Make sure the FIFO has a reasonable amount of space
1373                  * left (if not completely full).
1374                  *
1375                  * In this situation we are staging the encrypted message
1376                  * data in the FIFO.  (nact) represents how much plaintext
1377                  * has been staged, (n) represents how much encrypted data
1378                  * has been flushed.  The two are independent of each other.
1379                  */
1380                 if (ioq->fifo_beg > sizeof(ioq->buf) / 2 &&
1381                     sizeof(ioq->buf) - ioq->fifo_end < DMSG_ALIGN * 2) {
1382                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf,
1383                               ioq->fifo_end - ioq->fifo_beg);
1384                         ioq->fifo_cdx -= ioq->fifo_beg;
1385                         ioq->fifo_cdn -= ioq->fifo_beg;
1386                         ioq->fifo_end -= ioq->fifo_beg;
1387                         ioq->fifo_beg = 0;
1388                 }
1389
1390                 /* 
1391                  * beg .... cdx ............ cdn ............. end
1392                  * [WRITABLE] [PARTIALENCRYPT] [NOTYETENCRYPTED]
1393                  *
1394                  * Advance beg on a successful write.
1395                  */
1396                 iovcnt = dmsg_crypto_encrypt(iocom, ioq, iov, iovcnt, &nact);
1397                 n = writev(iocom->sock_fd, iov, iovcnt);
1398                 if (n > 0) {
1399                         ioq->fifo_beg += n;
1400                         if (ioq->fifo_beg == ioq->fifo_end) {
1401                                 ioq->fifo_beg = 0;
1402                                 ioq->fifo_cdn = 0;
1403                                 ioq->fifo_cdx = 0;
1404                                 ioq->fifo_end = 0;
1405                         }
1406                 }
1407                 /*
1408                  * We don't mess with the nact returned by the crypto_encrypt
1409                  * call, which represents the filling of the FIFO.  (n) tells
1410                  * us how much we were able to write from the FIFO.  The two
1411                  * are different beasts when encrypting.
1412                  */
1413         } else {
1414                 /*
1415                  * In this situation we are not staging the messages to the
1416                  * FIFO but instead writing them directly from the msg
1417                  * structure(s) unencrypted, so (nact) is basically (n).
1418                  */
1419                 n = writev(iocom->sock_fd, iov, iovcnt);
1420                 if (n > 0)
1421                         nact = n;
1422                 else
1423                         nact = 0;
1424         }
1425
1426         /*
1427          * Clean out the transmit queue based on what we successfully
1428          * sent (nact is the plaintext count).  ioq->hbytes/abytes
1429          * represents the portion of the first message previously sent.
1430          */
1431         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1432                 hbytes = (msg->any.head.cmd & DMSGF_SIZE) *
1433                          DMSG_ALIGN;
1434                 abytes = DMSG_DOALIGN(msg->aux_size);
1435
1436                 if ((size_t)nact < hbytes - ioq->hbytes) {
1437                         ioq->hbytes += nact;
1438                         nact = 0;
1439                         break;
1440                 }
1441                 nact -= hbytes - ioq->hbytes;
1442                 ioq->hbytes = hbytes;
1443                 if ((size_t)nact < abytes - ioq->abytes) {
1444                         ioq->abytes += nact;
1445                         nact = 0;
1446                         break;
1447                 }
1448                 nact -= abytes - ioq->abytes;
1449                 /* ioq->abytes = abytes; optimized out */
1450
1451 #if 0
1452                 fprintf(stderr,
1453                         "txmsg cmd=%08x msgid=%016jx circ=%016jx\n",
1454                         msg->any.head.cmd,
1455                         (intmax_t)msg->any.head.msgid,
1456                         (intmax_t)msg->any.head.circuit);
1457 #endif
1458
1459                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1460                 --ioq->msgcount;
1461                 ioq->hbytes = 0;
1462                 ioq->abytes = 0;
1463                 dmsg_msg_free(msg);
1464         }
1465         assert(nact == 0);
1466
1467         /*
1468          * Process the return value from the write w/regards to blocking.
1469          */
1470         if (n < 0) {
1471                 if (errno != EINTR &&
1472                     errno != EINPROGRESS &&
1473                     errno != EAGAIN) {
1474                         /*
1475                          * Fatal write error
1476                          */
1477                         ioq->error = DMSG_IOQ_ERROR_SOCK;
1478                         dmsg_iocom_drain(iocom);
1479                 } else {
1480                         /*
1481                          * Wait for socket buffer space
1482                          */
1483                         atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1484                 }
1485         } else {
1486                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_WREQ);
1487         }
1488         if (ioq->error) {
1489                 dmsg_iocom_drain(iocom);
1490         }
1491 }
1492
1493 /*
1494  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1495  * write events will occur.  We don't kill read msgs because we want
1496  * the caller to pull off our contrived terminal error msg to detect
1497  * the connection failure.
1498  *
1499  * Localized to iocom_core thread, iocom->mtx not held by caller.
1500  */
1501 void
1502 dmsg_iocom_drain(dmsg_iocom_t *iocom)
1503 {
1504         dmsg_ioq_t *ioq = &iocom->ioq_tx;
1505         dmsg_msg_t *msg;
1506
1507         atomic_clear_int(&iocom->flags, DMSG_IOCOMF_WREQ | DMSG_IOCOMF_WWORK);
1508         ioq->hbytes = 0;
1509         ioq->abytes = 0;
1510
1511         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1512                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1513                 --ioq->msgcount;
1514                 dmsg_msg_free(msg);
1515         }
1516 }
1517
1518 /*
1519  * Write a message to an iocom, with additional state processing.
1520  */
1521 void
1522 dmsg_msg_write(dmsg_msg_t *msg)
1523 {
1524         dmsg_iocom_t *iocom = msg->state->iocom;
1525         dmsg_state_t *state;
1526         char dummy;
1527
1528         pthread_mutex_lock(&iocom->mtx);
1529         state = msg->state;
1530
1531         /*
1532          * Make sure the parent transaction is still open in the transmit
1533          * direction.  If it isn't the message is dead and we have to
1534          * potentially simulate a rxmsg terminating the transaction.
1535          */
1536         if (state->parent->txcmd & DMSGF_DELETE) {
1537                 fprintf(stderr, "dmsg_msg_write: EARLY TERMINATION\n");
1538                 dmsg_msg_simulate_failure(state, DMSG_ERR_LOSTLINK);
1539                 dmsg_state_cleanuptx(iocom, msg);
1540                 dmsg_msg_free(msg);
1541                 pthread_mutex_unlock(&iocom->mtx);
1542                 return;
1543         }
1544
1545         /*
1546          * Process state data into the message as needed, then update the
1547          * state based on the message.
1548          */
1549         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1550                 /*
1551                  * Existing transaction (could be reply).  It is also
1552                  * possible for this to be the first reply (CREATE is set),
1553                  * in which case we populate state->txcmd.
1554                  *
1555                  * state->txcmd is adjusted to hold the final message cmd,
1556                  * and we also be sure to set the CREATE bit here.  We did
1557                  * not set it in dmsg_msg_alloc() because that would have
1558                  * not been serialized (state could have gotten ripped out
1559                  * from under the message prior to it being transmitted).
1560                  */
1561                 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
1562                     DMSGF_CREATE) {
1563                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1564                         state->icmd = state->txcmd & DMSGF_BASECMDMASK;
1565                 }
1566                 msg->any.head.msgid = state->msgid;
1567
1568                 if (msg->any.head.cmd & DMSGF_CREATE) {
1569                         state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
1570                 }
1571         }
1572         dmsg_state_cleanuptx(iocom, msg);
1573
1574 #if 0
1575         fprintf(stderr,
1576                 "MSGWRITE %016jx %08x\n",
1577                 msg->any.head.msgid, msg->any.head.cmd);
1578 #endif
1579
1580         /*
1581          * Queue it for output, wake up the I/O pthread.  Note that the
1582          * I/O thread is responsible for generating the CRCs and encryption.
1583          */
1584         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1585         dummy = 0;
1586         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1587         pthread_mutex_unlock(&iocom->mtx);
1588 }
1589
1590 /*
1591  * iocom->mtx must be held by caller.
1592  */
1593 static
1594 void
1595 dmsg_msg_simulate_failure(dmsg_state_t *state, int error)
1596 {
1597         dmsg_iocom_t *iocom = state->iocom;
1598         dmsg_msg_t *msg;
1599
1600         msg = NULL;
1601
1602         if (state == &iocom->state0) {
1603                 /*
1604                  * No active local or remote transactions remain.
1605                  * Generate a final LNK_ERROR and flag EOF.
1606                  */
1607                 msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1608                                             DMSG_LNK_ERROR,
1609                                             NULL, NULL);
1610                 msg->any.head.error = error;
1611                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_EOF);
1612                 fprintf(stderr, "EOF ON SOCKET %d\n", iocom->sock_fd);
1613         } else if (state->flags & DMSG_STATE_OPPOSITE) {
1614                 /*
1615                  * Active remote transactions are still present.
1616                  * Simulate the other end sending us a DELETE.
1617                  */
1618                 if (state->rxcmd & DMSGF_DELETE) {
1619                         fprintf(stderr,
1620                                 "iocom: ioq error(rd) %d sleeping "
1621                                 "state %p rxcmd %08x txcmd %08x "
1622                                 "func %p\n",
1623                                 error, state, state->rxcmd,
1624                                 state->txcmd, state->func);
1625                         usleep(100000); /* XXX */
1626                         atomic_set_int(&iocom->flags,
1627                                        DMSG_IOCOMF_RWORK);
1628                 } else {
1629                         fprintf(stderr, "SIMULATE ERROR1\n");
1630                         msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1631                                              DMSG_LNK_ERROR,
1632                                              NULL, NULL);
1633                         /*state->txcmd |= DMSGF_DELETE;*/
1634                         msg->state = state;
1635                         msg->any.head.error = error;
1636                         msg->any.head.msgid = state->msgid;
1637                         msg->any.head.circuit = state->parent->msgid;
1638                         msg->any.head.cmd |= DMSGF_ABORT |
1639                                              DMSGF_DELETE;
1640                         if ((state->parent->flags &
1641                              DMSG_STATE_OPPOSITE) == 0) {
1642                                 msg->any.head.cmd |= DMSGF_REVCIRC;
1643                         }
1644                 }
1645         } else {
1646                 /*
1647                  * Active local transactions are still present.
1648                  * Simulate the other end sending us a DELETE.
1649                  */
1650                 if (state->rxcmd & DMSGF_DELETE) {
1651                         fprintf(stderr,
1652                                 "iocom: ioq error(wr) %d sleeping "
1653                                 "state %p rxcmd %08x txcmd %08x "
1654                                 "func %p\n",
1655                                 error, state, state->rxcmd,
1656                                 state->txcmd, state->func);
1657                         usleep(100000); /* XXX */
1658                         atomic_set_int(&iocom->flags,
1659                                        DMSG_IOCOMF_RWORK);
1660                 } else {
1661                         fprintf(stderr, "SIMULATE ERROR1\n");
1662                         msg = dmsg_msg_alloc_locked(&iocom->state0, 0,
1663                                              DMSG_LNK_ERROR,
1664                                              NULL, NULL);
1665                         msg->state = state;
1666                         msg->any.head.error = error;
1667                         msg->any.head.msgid = state->msgid;
1668                         msg->any.head.circuit = state->parent->msgid;
1669                         msg->any.head.cmd |= DMSGF_ABORT |
1670                                              DMSGF_DELETE |
1671                                              DMSGF_REVTRANS |
1672                                              DMSGF_REPLY;
1673                         if ((state->parent->flags &
1674                              DMSG_STATE_OPPOSITE) == 0) {
1675                                 msg->any.head.cmd |= DMSGF_REVCIRC;
1676                         }
1677                         if ((state->rxcmd & DMSGF_CREATE) == 0)
1678                                 msg->any.head.cmd |= DMSGF_CREATE;
1679                 }
1680         }
1681         if (msg) {
1682                 TAILQ_INSERT_TAIL(&iocom->ioq_rx.msgq, msg, qentry);
1683                 atomic_set_int(&iocom->flags, DMSG_IOCOMF_RWORK);
1684         }
1685 }
1686
1687 /*
1688  * This is a shortcut to formulate a reply to msg with a simple error code,
1689  * It can reply to and terminate a transaction, or it can reply to a one-way
1690  * messages.  A DMSG_LNK_ERROR command code is utilized to encode
1691  * the error code (which can be 0).  Not all transactions are terminated
1692  * with DMSG_LNK_ERROR status (the low level only cares about the
1693  * MSGF_DELETE flag), but most are.
1694  *
1695  * Replies to one-way messages are a bit of an oxymoron but the feature
1696  * is used by the debug (DBG) protocol.
1697  *
1698  * The reply contains no extended data.
1699  */
1700 void
1701 dmsg_msg_reply(dmsg_msg_t *msg, uint32_t error)
1702 {
1703         dmsg_state_t *state = msg->state;
1704         dmsg_msg_t *nmsg;
1705         uint32_t cmd;
1706
1707         /*
1708          * Reply with a simple error code and terminate the transaction.
1709          */
1710         cmd = DMSG_LNK_ERROR;
1711
1712         /*
1713          * Check if our direction has even been initiated yet, set CREATE.
1714          *
1715          * Check what direction this is (command or reply direction).  Note
1716          * that txcmd might not have been initiated yet.
1717          *
1718          * If our direction has already been closed we just return without
1719          * doing anything.
1720          */
1721         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1722                 if (state->txcmd & DMSGF_DELETE)
1723                         return;
1724                 if (state->txcmd & DMSGF_REPLY)
1725                         cmd |= DMSGF_REPLY;
1726                 cmd |= DMSGF_DELETE;
1727         } else {
1728                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1729                         cmd |= DMSGF_REPLY;
1730         }
1731
1732         /*
1733          * Allocate the message and associate it with the existing state.
1734          * We cannot pass DMSGF_CREATE to msg_alloc() because that may
1735          * allocate new state.  We have our state already.
1736          */
1737         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1738         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1739                 if ((state->txcmd & DMSGF_CREATE) == 0)
1740                         nmsg->any.head.cmd |= DMSGF_CREATE;
1741         }
1742         nmsg->any.head.error = error;
1743
1744         dmsg_msg_write(nmsg);
1745 }
1746
1747 /*
1748  * Similar to dmsg_msg_reply() but leave the transaction open.  That is,
1749  * we are generating a streaming reply or an intermediate acknowledgement
1750  * of some sort as part of the higher level protocol, with more to come
1751  * later.
1752  */
1753 void
1754 dmsg_msg_result(dmsg_msg_t *msg, uint32_t error)
1755 {
1756         dmsg_state_t *state = msg->state;
1757         dmsg_msg_t *nmsg;
1758         uint32_t cmd;
1759
1760
1761         /*
1762          * Reply with a simple error code and terminate the transaction.
1763          */
1764         cmd = DMSG_LNK_ERROR;
1765
1766         /*
1767          * Check if our direction has even been initiated yet, set CREATE.
1768          *
1769          * Check what direction this is (command or reply direction).  Note
1770          * that txcmd might not have been initiated yet.
1771          *
1772          * If our direction has already been closed we just return without
1773          * doing anything.
1774          */
1775         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1776                 if (state->txcmd & DMSGF_DELETE)
1777                         return;
1778                 if (state->txcmd & DMSGF_REPLY)
1779                         cmd |= DMSGF_REPLY;
1780                 /* continuing transaction, do not set MSGF_DELETE */
1781         } else {
1782                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1783                         cmd |= DMSGF_REPLY;
1784         }
1785         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1786         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1787                 if ((state->txcmd & DMSGF_CREATE) == 0)
1788                         nmsg->any.head.cmd |= DMSGF_CREATE;
1789         }
1790         nmsg->any.head.error = error;
1791
1792         dmsg_msg_write(nmsg);
1793 }
1794
1795 /*
1796  * Terminate a transaction given a state structure by issuing a DELETE.
1797  * (the state structure must not be &iocom->state0)
1798  */
1799 void
1800 dmsg_state_reply(dmsg_state_t *state, uint32_t error)
1801 {
1802         dmsg_msg_t *nmsg;
1803         uint32_t cmd = DMSG_LNK_ERROR | DMSGF_DELETE;
1804
1805         /*
1806          * Nothing to do if we already transmitted a delete
1807          */
1808         if (state->txcmd & DMSGF_DELETE)
1809                 return;
1810
1811         /*
1812          * Set REPLY if the other end initiated the command.  Otherwise
1813          * we are the command direction.
1814          */
1815         if (state->txcmd & DMSGF_REPLY)
1816                 cmd |= DMSGF_REPLY;
1817
1818         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1819         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1820                 if ((state->txcmd & DMSGF_CREATE) == 0)
1821                         nmsg->any.head.cmd |= DMSGF_CREATE;
1822         }
1823         nmsg->any.head.error = error;
1824         dmsg_msg_write(nmsg);
1825 }
1826
1827 /*
1828  * Terminate a transaction given a state structure by issuing a DELETE.
1829  * (the state structure must not be &iocom->state0)
1830  */
1831 void
1832 dmsg_state_result(dmsg_state_t *state, uint32_t error)
1833 {
1834         dmsg_msg_t *nmsg;
1835         uint32_t cmd = DMSG_LNK_ERROR;
1836
1837         /*
1838          * Nothing to do if we already transmitted a delete
1839          */
1840         if (state->txcmd & DMSGF_DELETE)
1841                 return;
1842
1843         /*
1844          * Set REPLY if the other end initiated the command.  Otherwise
1845          * we are the command direction.
1846          */
1847         if (state->txcmd & DMSGF_REPLY)
1848                 cmd |= DMSGF_REPLY;
1849
1850         nmsg = dmsg_msg_alloc(state, 0, cmd, NULL, NULL);
1851         if ((state->flags & DMSG_STATE_ROOT) == 0) {
1852                 if ((state->txcmd & DMSGF_CREATE) == 0)
1853                         nmsg->any.head.cmd |= DMSGF_CREATE;
1854         }
1855         nmsg->any.head.error = error;
1856         dmsg_msg_write(nmsg);
1857 }
1858
1859 /************************************************************************
1860  *                      TRANSACTION STATE HANDLING                      *
1861  ************************************************************************
1862  *
1863  */
1864
1865 /*
1866  * Process state tracking for a message after reception, prior to execution.
1867  * Possibly route the message (consuming it).
1868  *
1869  * Called with msglk held and the msg dequeued.
1870  *
1871  * All messages are called with dummy state and return actual state.
1872  * (One-off messages often just return the same dummy state).
1873  *
1874  * May request that caller discard the message by setting *discardp to 1.
1875  * The returned state is not used in this case and is allowed to be NULL.
1876  *
1877  * --
1878  *
1879  * These routines handle persistent and command/reply message state via the
1880  * CREATE and DELETE flags.  The first message in a command or reply sequence
1881  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1882  *
1883  * There can be any number of intermediate messages belonging to the same
1884  * sequence sent inbetween the CREATE message and the DELETE message,
1885  * which set neither flag.  This represents a streaming command or reply.
1886  *
1887  * Any command message received with CREATE set expects a reply sequence to
1888  * be returned.  Reply sequences work the same as command sequences except the
1889  * REPLY bit is also sent.  Both the command side and reply side can
1890  * degenerate into a single message with both CREATE and DELETE set.  Note
1891  * that one side can be streaming and the other side not, or neither, or both.
1892  *
1893  * The msgid is unique for the initiator.  That is, two sides sending a new
1894  * message can use the same msgid without colliding.
1895  *
1896  * --
1897  *
1898  * ABORT sequences work by setting the ABORT flag along with normal message
1899  * state.  However, ABORTs can also be sent on half-closed messages, that is
1900  * even if the command or reply side has already sent a DELETE, as long as
1901  * the message has not been fully closed it can still send an ABORT+DELETE
1902  * to terminate the half-closed message state.
1903  *
1904  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1905  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1906  * also race, and in this situation the other side might have already
1907  * initiated a new unrelated command with the same message id.  Since
1908  * the abort has not set the CREATE flag the situation can be detected
1909  * and the message will also be discarded.
1910  *
1911  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1912  * The ABORT request is essentially integrated into the command instead
1913  * of being sent later on.  In this situation the command implementation
1914  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1915  * special-case non-blocking operation for the command.
1916  *
1917  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1918  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1919  *        one-way messages are not supported.
1920  *
1921  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1922  *        simply ignored.
1923  *
1924  * --
1925  *
1926  * One-off messages (no reply expected) are sent without an established
1927  * transaction.  CREATE and DELETE are left clear and the msgid is usually 0.
1928  * For one-off messages sent over circuits msgid generally MUST be 0.
1929  *
1930  * One-off messages cannot be aborted and typically aren't processed
1931  * by these routines.  Order is still guaranteed for messages sent over
1932  * the same circuit.  The REPLY bit can be used to distinguish whether
1933  * a one-off message is a command or reply.  For example, one-off replies
1934  * will typically just contain status updates.
1935  */
1936 static int
1937 dmsg_state_msgrx(dmsg_msg_t *msg)
1938 {
1939         dmsg_iocom_t *iocom = msg->state->iocom;
1940         dmsg_state_t *state;
1941         dmsg_state_t *pstate;
1942         dmsg_state_t sdummy;
1943         int error;
1944
1945         pthread_mutex_lock(&iocom->mtx);
1946
1947         /*
1948          * Lookup the circuit (pstate).  The circuit will be an open
1949          * transaction.  The REVCIRC bit in the message tells us which side
1950          * initiated it.
1951          */
1952         if (msg->any.head.circuit) {
1953                 sdummy.msgid = msg->any.head.circuit;
1954
1955                 if (msg->any.head.cmd & DMSGF_REVCIRC) {
1956                         pstate = RB_FIND(dmsg_state_tree,
1957                                          &iocom->statewr_tree,
1958                                          &sdummy);
1959                 } else {
1960                         pstate = RB_FIND(dmsg_state_tree,
1961                                          &iocom->staterd_tree,
1962                                          &sdummy);
1963                 }
1964                 if (pstate == NULL) {
1965                         fprintf(stderr,
1966                                 "missing parent in stacked trans %s\n",
1967                                 dmsg_msg_str(msg));
1968                         error = DMSG_IOQ_ERROR_TRANS;
1969                         pthread_mutex_unlock(&iocom->mtx);
1970                         assert(0);
1971                 }
1972         } else {
1973                 pstate = &iocom->state0;
1974         }
1975
1976         /*
1977          * Lookup the msgid.
1978          *
1979          * If received msg is a command state is on staterd_tree.
1980          * If received msg is a reply state is on statewr_tree.
1981          * Otherwise there is no state (retain &iocom->state0)
1982          */
1983         sdummy.msgid = msg->any.head.msgid;
1984         if (msg->any.head.cmd & DMSGF_REVTRANS)
1985                 state = RB_FIND(dmsg_state_tree, &iocom->statewr_tree, &sdummy);
1986         else
1987                 state = RB_FIND(dmsg_state_tree, &iocom->staterd_tree, &sdummy);
1988
1989         if (state) {
1990                 /*
1991                  * Message over an existing transaction (CREATE should not
1992                  * be set).
1993                  */
1994                 msg->state = state;
1995                 assert(pstate == state->parent);
1996         } else {
1997                 /*
1998                  * Either a new transaction (if CREATE set) or a one-off.
1999                  */
2000                 state = pstate;
2001         }
2002
2003         pthread_mutex_unlock(&iocom->mtx);
2004
2005         /*
2006          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
2007          * inside the case statements.
2008          *
2009          * Construct new state as necessary.
2010          */
2011         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
2012                                     DMSGF_REPLY)) {
2013         case DMSGF_CREATE:
2014         case DMSGF_CREATE | DMSGF_DELETE:
2015                 /*
2016                  * Create new sub-transaction under pstate.
2017                  * (any DELETE is handled in post-processing of msg).
2018                  *
2019                  * (During routing the msgid was made unique for this
2020                  * direction over the comlink, so our RB trees can be
2021                  * iocom-based instead of state-based).
2022                  */
2023                 if (state != pstate) {
2024                         fprintf(stderr,
2025                                 "duplicate transaction %s\n",
2026                                 dmsg_msg_str(msg));
2027                         error = DMSG_IOQ_ERROR_TRANS;
2028                         assert(0);
2029                         break;
2030                 }
2031
2032                 /*
2033                  * Allocate the new state.
2034                  */
2035                 state = malloc(sizeof(*state));
2036                 atomic_add_int(&dmsg_state_count, 1);
2037                 bzero(state, sizeof(*state));
2038                 TAILQ_INIT(&state->subq);
2039                 dmsg_state_hold(pstate);
2040                 state->refs = 1;
2041                 state->parent = pstate;
2042                 state->iocom = iocom;
2043                 state->flags = DMSG_STATE_DYNAMIC |
2044                                DMSG_STATE_OPPOSITE;
2045                 state->msgid = msg->any.head.msgid;
2046                 state->txcmd = DMSGF_REPLY;
2047                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2048                 state->icmd = state->rxcmd & DMSGF_BASECMDMASK;
2049                 msg->state = state;
2050                 pthread_mutex_lock(&iocom->mtx);
2051                 RB_INSERT(dmsg_state_tree, &iocom->staterd_tree, state);
2052                 TAILQ_INSERT_TAIL(&pstate->subq, state, entry);
2053                 state->flags |= DMSG_STATE_INSERTED;
2054
2055                 /*
2056                  * If the parent is a relay set up the state handler to
2057                  * automatically route the message.  Local processing will
2058                  * not occur if set.
2059                  *
2060                  * (state relays are seeded by SPAN processing)
2061                  */
2062                 if (pstate->relay)
2063                         state->func = dmsg_state_relay;
2064                 pthread_mutex_unlock(&iocom->mtx);
2065                 error = 0;
2066
2067                 if (DMsgDebugOpt) {
2068                         fprintf(stderr,
2069                                 "create state %p id=%08x on iocom staterd %p\n",
2070                                 state, (uint32_t)state->msgid, iocom);
2071                 }
2072                 break;
2073         case DMSGF_DELETE:
2074                 /*
2075                  * Persistent state is expected but might not exist if an
2076                  * ABORT+DELETE races the close.
2077                  *
2078                  * (any DELETE is handled in post-processing of msg).
2079                  */
2080                 if (state == pstate) {
2081                         if (msg->any.head.cmd & DMSGF_ABORT) {
2082                                 error = DMSG_IOQ_ERROR_EALREADY;
2083                         } else {
2084                                 fprintf(stderr, "missing-state %s\n",
2085                                         dmsg_msg_str(msg));
2086                                 error = DMSG_IOQ_ERROR_TRANS;
2087                                 assert(0);
2088                         }
2089                         break;
2090                 }
2091
2092                 /*
2093                  * Handle another ABORT+DELETE case if the msgid has already
2094                  * been reused.
2095                  */
2096                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2097                         if (msg->any.head.cmd & DMSGF_ABORT) {
2098                                 error = DMSG_IOQ_ERROR_EALREADY;
2099                         } else {
2100                                 fprintf(stderr, "reused-state %s\n",
2101                                         dmsg_msg_str(msg));
2102                                 error = DMSG_IOQ_ERROR_TRANS;
2103                                 assert(0);
2104                         }
2105                         break;
2106                 }
2107                 error = 0;
2108                 break;
2109         default:
2110                 /*
2111                  * Check for mid-stream ABORT command received, otherwise
2112                  * allow.
2113                  */
2114                 if (msg->any.head.cmd & DMSGF_ABORT) {
2115                         if ((state == pstate) ||
2116                             (state->rxcmd & DMSGF_CREATE) == 0) {
2117                                 error = DMSG_IOQ_ERROR_EALREADY;
2118                                 break;
2119                         }
2120                 }
2121                 error = 0;
2122                 break;
2123         case DMSGF_REPLY | DMSGF_CREATE:
2124         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
2125                 /*
2126                  * When receiving a reply with CREATE set the original
2127                  * persistent state message should already exist.
2128                  */
2129                 if (state == pstate) {
2130                         fprintf(stderr, "no-state(r) %s\n",
2131                                 dmsg_msg_str(msg));
2132                         error = DMSG_IOQ_ERROR_TRANS;
2133                         assert(0);
2134                         break;
2135                 }
2136                 assert(((state->rxcmd ^ msg->any.head.cmd) & DMSGF_REPLY) == 0);
2137                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
2138                 error = 0;
2139                 break;
2140         case DMSGF_REPLY | DMSGF_DELETE:
2141                 /*
2142                  * Received REPLY+ABORT+DELETE in case where msgid has
2143                  * already been fully closed, ignore the message.
2144                  */
2145                 if (state == pstate) {
2146                         if (msg->any.head.cmd & DMSGF_ABORT) {
2147                                 error = DMSG_IOQ_ERROR_EALREADY;
2148                         } else {
2149                                 fprintf(stderr, "no-state(r,d) %s\n",
2150                                         dmsg_msg_str(msg));
2151                                 error = DMSG_IOQ_ERROR_TRANS;
2152                                 assert(0);
2153                         }
2154                         break;
2155                 }
2156
2157                 /*
2158                  * Received REPLY+ABORT+DELETE in case where msgid has
2159                  * already been reused for an unrelated message,
2160                  * ignore the message.
2161                  */
2162                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
2163                         if (msg->any.head.cmd & DMSGF_ABORT) {
2164                                 error = DMSG_IOQ_ERROR_EALREADY;
2165                         } else {
2166                                 fprintf(stderr, "reused-state(r,d) %s\n",
2167                                         dmsg_msg_str(msg));
2168                                 error = DMSG_IOQ_ERROR_TRANS;
2169                                 assert(0);
2170                         }
2171                         break;
2172                 }
2173                 error = 0;
2174                 break;
2175         case DMSGF_REPLY:
2176                 /*
2177                  * Check for mid-stream ABORT reply received to sent command.
2178                  */
2179                 if (msg->any.head.cmd & DMSGF_ABORT) {
2180                         if (state == pstate ||
2181                             (state->rxcmd & DMSGF_CREATE) == 0) {
2182                                 error = DMSG_IOQ_ERROR_EALREADY;
2183                                 break;
2184                         }
2185                 }
2186                 error = 0;
2187                 break;
2188         }
2189
2190         /*
2191          * Calculate the easy-switch() transactional command.  Represents
2192          * the outer-transaction command for any transaction-create or
2193          * transaction-delete, and the inner message command for any
2194          * non-transaction or inside-transaction command.  tcmd will be
2195          * set to 0 for any messaging error condition.
2196          *
2197          * The two can be told apart because outer-transaction commands
2198          * always have a DMSGF_CREATE and/or DMSGF_DELETE flag.
2199          */
2200         if (msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE)) {
2201                 if ((state->flags & DMSG_STATE_ROOT) == 0) {
2202                         msg->tcmd = (msg->state->icmd & DMSGF_BASECMDMASK) |
2203                                     (msg->any.head.cmd & (DMSGF_CREATE |
2204                                                           DMSGF_DELETE |
2205                                                           DMSGF_REPLY));
2206                 } else {
2207                         msg->tcmd = 0;
2208                 }
2209         } else {
2210                 msg->tcmd = msg->any.head.cmd & DMSGF_CMDSWMASK;
2211         }
2212         return (error);
2213 }
2214
2215 /*
2216  * Route the message and handle pair-state processing.
2217  */
2218 void
2219 dmsg_state_relay(dmsg_msg_t *lmsg)
2220 {
2221         dmsg_state_t *lpstate;
2222         dmsg_state_t *rpstate;
2223         dmsg_state_t *lstate;
2224         dmsg_state_t *rstate;
2225         dmsg_msg_t *rmsg;
2226
2227         if ((lmsg->any.head.cmd & (DMSGF_CREATE | DMSGF_REPLY)) ==
2228             DMSGF_CREATE) {
2229                 /*
2230                  * New sub-transaction, establish new state and relay.
2231                  */
2232                 lstate = lmsg->state;
2233                 lpstate = lstate->parent;
2234                 rpstate = lpstate->relay;
2235                 assert(lstate->relay == NULL);
2236                 assert(rpstate != NULL);
2237
2238                 rmsg = dmsg_msg_alloc(rpstate,
2239                                       lmsg->aux_size,
2240                                       lmsg->any.head.cmd,
2241                                       dmsg_state_relay, NULL);
2242                 rstate = rmsg->state;
2243                 rstate->relay = lstate;
2244                 lstate->relay = rstate;
2245                 dmsg_state_hold(lstate);
2246                 dmsg_state_hold(rstate);
2247         } else {
2248                 /*
2249                  * State & relay already established
2250                  */
2251                 lstate = lmsg->state;
2252                 rstate = lstate->relay;
2253                 assert(rstate != NULL);
2254
2255                 rmsg = dmsg_msg_alloc(rstate,
2256                                       lmsg->aux_size,
2257                                       lmsg->any.head.cmd,
2258                                       dmsg_state_relay, NULL);
2259         }
2260         if (lmsg->hdr_size > sizeof(lmsg->any.head)) {
2261                 bcopy(&lmsg->any.head + 1, &rmsg->any.head + 1,
2262                       lmsg->hdr_size - sizeof(lmsg->any.head));
2263         }
2264         rmsg->any.head.error = lmsg->any.head.error;
2265         rmsg->any.head.reserved02 = lmsg->any.head.reserved02;
2266         rmsg->any.head.reserved18 = lmsg->any.head.reserved18;
2267         rmsg->aux_data = lmsg->aux_data;
2268         lmsg->aux_data = NULL;
2269         /*
2270         fprintf(stderr, "RELAY %08x\n", rmsg->any.head.cmd);
2271         */
2272         dmsg_msg_write(rmsg);
2273 }
2274
2275 /*
2276  * Cleanup and retire msg after processing
2277  */
2278 void
2279 dmsg_state_cleanuprx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2280 {
2281         dmsg_state_t *state;
2282         dmsg_state_t *pstate;
2283
2284         assert(msg->state->iocom == iocom);
2285         state = msg->state;
2286         if (state->flags & DMSG_STATE_ROOT) {
2287                 /*
2288                  * Free a non-transactional message, there is no state
2289                  * to worry about.
2290                  */
2291                 dmsg_msg_free(msg);
2292         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2293                 /*
2294                  * Message terminating transaction, destroy the related
2295                  * state, the original message, and this message (if it
2296                  * isn't the original message due to a CREATE|DELETE).
2297                  *
2298                  * It's possible for governing state to terminate while
2299                  * sub-transactions still exist.  This is allowed but
2300                  * will cause sub-transactions to recursively fail.
2301                  * Further reception of sub-transaction messages will be
2302                  * impossible because the circuit will no longer exist.
2303                  * (XXX need code to make sure that happens properly).
2304                  */
2305                 pthread_mutex_lock(&iocom->mtx);
2306                 state->rxcmd |= DMSGF_DELETE;
2307
2308                 if (state->txcmd & DMSGF_DELETE) {
2309                         assert(state->flags & DMSG_STATE_INSERTED);
2310                         if (state->rxcmd & DMSGF_REPLY) {
2311                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2312                                 RB_REMOVE(dmsg_state_tree,
2313                                           &iocom->statewr_tree, state);
2314                         } else {
2315                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2316                                 RB_REMOVE(dmsg_state_tree,
2317                                           &iocom->staterd_tree, state);
2318                         }
2319                         pstate = state->parent;
2320                         TAILQ_REMOVE(&pstate->subq, state, entry);
2321                         state->flags &= ~DMSG_STATE_INSERTED;
2322                         state->parent = NULL;
2323                         dmsg_state_drop(pstate);
2324
2325                         if (state->relay) {
2326                                 dmsg_state_drop(state->relay);
2327                                 state->relay = NULL;
2328                         }
2329                         dmsg_msg_free(msg);
2330                         dmsg_state_drop(state);
2331                 } else {
2332                         dmsg_msg_free(msg);
2333                 }
2334                 pthread_mutex_unlock(&iocom->mtx);
2335         } else {
2336                 /*
2337                  * Message not terminating transaction, leave state intact
2338                  * and free message if it isn't the CREATE message.
2339                  */
2340                 dmsg_msg_free(msg);
2341         }
2342 }
2343
2344 /*
2345  * Clean up the state after pulling out needed fields and queueing the
2346  * message for transmission.   This occurs in dmsg_msg_write().
2347  */
2348 static void
2349 dmsg_state_cleanuptx(dmsg_iocom_t *iocom, dmsg_msg_t *msg)
2350 {
2351         dmsg_state_t *state;
2352         dmsg_state_t *pstate;
2353
2354         assert(iocom == msg->state->iocom);
2355         state = msg->state;
2356         if (state->flags & DMSG_STATE_ROOT) {
2357                 ;
2358         } else if (msg->any.head.cmd & DMSGF_DELETE) {
2359                 /*
2360                  * Message terminating transaction, destroy the related
2361                  * state, the original message, and this message (if it
2362                  * isn't the original message due to a CREATE|DELETE).
2363                  *
2364                  * It's possible for governing state to terminate while
2365                  * sub-transactions still exist.  This is allowed but
2366                  * will cause sub-transactions to recursively fail.
2367                  * Further reception of sub-transaction messages will be
2368                  * impossible because the circuit will no longer exist.
2369                  * (XXX need code to make sure that happens properly).
2370                  */
2371                 pthread_mutex_lock(&iocom->mtx);
2372                 assert((state->txcmd & DMSGF_DELETE) == 0);
2373                 state->txcmd |= DMSGF_DELETE;
2374                 if (state->rxcmd & DMSGF_DELETE) {
2375                         assert(state->flags & DMSG_STATE_INSERTED);
2376                         if (state->txcmd & DMSGF_REPLY) {
2377                                 assert(msg->any.head.cmd & DMSGF_REPLY);
2378                                 RB_REMOVE(dmsg_state_tree,
2379                                           &iocom->staterd_tree, state);
2380                         } else {
2381                                 assert((msg->any.head.cmd & DMSGF_REPLY) == 0);
2382                                 RB_REMOVE(dmsg_state_tree,
2383                                           &iocom->statewr_tree, state);
2384                         }
2385                         pstate = state->parent;
2386                         TAILQ_REMOVE(&pstate->subq, state, entry);
2387                         state->flags &= ~DMSG_STATE_INSERTED;
2388                         state->parent = NULL;
2389                         dmsg_state_drop(pstate);
2390
2391                         if (state->relay) {
2392                                 dmsg_state_drop(state->relay);
2393                                 state->relay = NULL;
2394                         }
2395                         dmsg_state_drop(state); /* usually the last drop */
2396                 }
2397                 pthread_mutex_unlock(&iocom->mtx);
2398         }
2399 }
2400
2401 /*
2402  * Called with or without locks
2403  */
2404 void
2405 dmsg_state_hold(dmsg_state_t *state)
2406 {
2407         atomic_add_int(&state->refs, 1);
2408 }
2409
2410 void
2411 dmsg_state_drop(dmsg_state_t *state)
2412 {
2413         if (atomic_fetchadd_int(&state->refs, -1) == 1)
2414                 dmsg_state_free(state);
2415 }
2416
2417 /*
2418  * Called with iocom locked
2419  */
2420 static void
2421 dmsg_state_free(dmsg_state_t *state)
2422 {
2423         atomic_add_int(&dmsg_state_count, -1);
2424         if (DMsgDebugOpt) {
2425                 fprintf(stderr, "terminate state %p id=%08x\n",
2426                         state, (uint32_t)state->msgid);
2427         }
2428         assert((state->flags & (DMSG_STATE_ROOT | DMSG_STATE_INSERTED)) == 0);
2429         assert(TAILQ_EMPTY(&state->subq));
2430         assert(state->refs == 0);
2431         if (state->any.any != NULL)   /* XXX avoid deadlock w/exit & kernel */
2432                 closefrom(3);
2433         assert(state->any.any == NULL);
2434         free(state);
2435 }
2436
2437 /*
2438  * This swaps endian for a hammer2_msg_hdr.  Note that the extended
2439  * header is not adjusted, just the core header.
2440  */
2441 void
2442 dmsg_bswap_head(dmsg_hdr_t *head)
2443 {
2444         head->magic     = bswap16(head->magic);
2445         head->reserved02 = bswap16(head->reserved02);
2446         head->salt      = bswap32(head->salt);
2447
2448         head->msgid     = bswap64(head->msgid);
2449         head->circuit   = bswap64(head->circuit);
2450         head->reserved18= bswap64(head->reserved18);
2451
2452         head->cmd       = bswap32(head->cmd);
2453         head->aux_crc   = bswap32(head->aux_crc);
2454         head->aux_bytes = bswap32(head->aux_bytes);
2455         head->error     = bswap32(head->error);
2456         head->aux_descr = bswap64(head->aux_descr);
2457         head->reserved38= bswap32(head->reserved38);
2458         head->hdr_crc   = bswap32(head->hdr_crc);
2459 }