hammer2 - spanning tree and messaging work
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
39 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40
41 /*
42  * Initialize a low-level ioq
43  */
44 void
45 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
46 {
47         bzero(ioq, sizeof(*ioq));
48         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
49         TAILQ_INIT(&ioq->msgq);
50 }
51
52 /*
53  * Cleanup queue.
54  *
55  * caller holds iocom->mtx.
56  */
57 void
58 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
59 {
60         hammer2_msg_t *msg;
61
62         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
63                 assert(0);      /* shouldn't happen */
64                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
65                 hammer2_msg_free(iocom, msg);
66         }
67         if ((msg = ioq->msg) != NULL) {
68                 ioq->msg = NULL;
69                 hammer2_msg_free(iocom, msg);
70         }
71 }
72
73 /*
74  * Initialize a low-level communications channel
75  */
76 void
77 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
78 {
79         bzero(iocom, sizeof(*iocom));
80
81         pthread_mutex_init(&iocom->mtx, NULL);
82         RB_INIT(&iocom->staterd_tree);
83         RB_INIT(&iocom->statewr_tree);
84         TAILQ_INIT(&iocom->freeq);
85         TAILQ_INIT(&iocom->freeq_aux);
86         TAILQ_INIT(&iocom->addrq);
87         TAILQ_INIT(&iocom->txmsgq);
88         iocom->sock_fd = sock_fd;
89         iocom->alt_fd = alt_fd;
90         iocom->flags = HAMMER2_IOCOMF_RREQ;
91         hammer2_ioq_init(iocom, &iocom->ioq_rx);
92         hammer2_ioq_init(iocom, &iocom->ioq_tx);
93         if (pipe(iocom->wakeupfds) < 0)
94                 assert(0);
95         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
96         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
97
98         /*
99          * Negotiate session crypto synchronously.  This will mark the
100          * connection as error'd if it fails.
101          */
102         hammer2_crypto_negotiate(iocom);
103
104         /*
105          * Make sure our fds are set to non-blocking for the iocom core.
106          */
107         if (sock_fd >= 0)
108                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
109 #if 0
110         /* if line buffered our single fgets() should be fine */
111         if (alt_fd >= 0)
112                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
113 #endif
114 }
115
116 /*
117  * Cleanup a terminating iocom.
118  *
119  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
120  * from all possible references to it.
121  */
122 void
123 hammer2_iocom_done(hammer2_iocom_t *iocom)
124 {
125         hammer2_msg_t *msg;
126
127         if (iocom->sock_fd >= 0) {
128                 close(iocom->sock_fd);
129                 iocom->sock_fd = -1;
130         }
131         if (iocom->alt_fd >= 0) {
132                 close(iocom->alt_fd);
133                 iocom->alt_fd = -1;
134         }
135         hammer2_ioq_done(iocom, &iocom->ioq_rx);
136         hammer2_ioq_done(iocom, &iocom->ioq_tx);
137         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
138                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
139                 free(msg);
140         }
141         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
142                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
143                 free(msg->aux_data);
144                 msg->aux_data = NULL;
145                 free(msg);
146         }
147         if (iocom->wakeupfds[0] >= 0) {
148                 close(iocom->wakeupfds[0]);
149                 iocom->wakeupfds[0] = -1;
150         }
151         if (iocom->wakeupfds[1] >= 0) {
152                 close(iocom->wakeupfds[1]);
153                 iocom->wakeupfds[1] = -1;
154         }
155         pthread_mutex_destroy(&iocom->mtx);
156 }
157
158 /*
159  * Allocate a new one-way message.
160  */
161 hammer2_msg_t *
162 hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
163 {
164         hammer2_msg_t *msg;
165         int hbytes;
166
167         pthread_mutex_lock(&iocom->mtx);
168         if (aux_size) {
169                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
170                            ~HAMMER2_MSG_ALIGNMASK;
171                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
172                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
173         } else {
174                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
175                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
176         }
177         pthread_mutex_unlock(&iocom->mtx);
178         if (msg == NULL) {
179                 msg = malloc(sizeof(*msg));
180                 bzero(msg, sizeof(*msg));
181                 msg->aux_data = NULL;
182                 msg->aux_size = 0;
183         }
184         if (msg->aux_size != aux_size) {
185                 if (msg->aux_data) {
186                         free(msg->aux_data);
187                         msg->aux_data = NULL;
188                         msg->aux_size = 0;
189                 }
190                 if (aux_size) {
191                         msg->aux_data = malloc(aux_size);
192                         msg->aux_size = aux_size;
193                 }
194         }
195         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
196         if (hbytes)
197                 bzero(&msg->any.head, hbytes);
198         msg->hdr_size = hbytes;
199         msg->any.head.cmd = cmd;
200         msg->any.head.aux_descr = 0;
201         msg->any.head.aux_crc = 0;
202
203         return (msg);
204 }
205
206 /*
207  * Free a message so it can be reused afresh.
208  *
209  * NOTE: aux_size can be 0 with a non-NULL aux_data.
210  */
211 static
212 void
213 hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
214 {
215         msg->state = NULL;
216         if (msg->aux_data)
217                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
218         else
219                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
220 }
221
222 void
223 hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
224 {
225         pthread_mutex_lock(&iocom->mtx);
226         hammer2_msg_free_locked(iocom, msg);
227         pthread_mutex_unlock(&iocom->mtx);
228 }
229
230 /*
231  * I/O core loop for an iocom.
232  *
233  * Thread localized, iocom->mtx not held.
234  */
235 void
236 hammer2_iocom_core(hammer2_iocom_t *iocom,
237                    void (*recvmsg_func)(hammer2_iocom_t *),
238                    void (*sendmsg_func)(hammer2_iocom_t *),
239                    void (*altmsg_func)(hammer2_iocom_t *))
240 {
241         struct pollfd fds[3];
242         char dummybuf[256];
243         int timeout;
244         int count;
245         int wi; /* wakeup pipe */
246         int si; /* socket */
247         int ai; /* alt bulk path socket */
248
249         iocom->recvmsg_callback = recvmsg_func;
250         iocom->sendmsg_callback = sendmsg_func;
251         iocom->altmsg_callback = altmsg_func;
252
253         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
254                 if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
255                                      HAMMER2_IOCOMF_WWORK |
256                                      HAMMER2_IOCOMF_PWORK |
257                                      HAMMER2_IOCOMF_ARWORK |
258                                      HAMMER2_IOCOMF_AWWORK)) == 0) {
259                         /*
260                          * Only poll if no immediate work is pending.
261                          * Otherwise we are just wasting our time calling
262                          * poll.
263                          */
264                         timeout = 5000;
265
266                         count = 0;
267                         wi = -1;
268                         si = -1;
269                         ai = -1;
270
271                         /*
272                          * Always check the inter-thread pipe, e.g.
273                          * for iocom->txmsgq work.
274                          */
275                         wi = count++;
276                         fds[wi].fd = iocom->wakeupfds[0];
277                         fds[wi].events = POLLIN;
278                         fds[wi].revents = 0;
279
280                         /*
281                          * Check the socket input/output direction as
282                          * requested
283                          */
284                         if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
285                                             HAMMER2_IOCOMF_WREQ)) {
286                                 si = count++;
287                                 fds[si].fd = iocom->sock_fd;
288                                 fds[si].events = 0;
289                                 fds[si].revents = 0;
290
291                                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
292                                         fds[si].events |= POLLIN;
293                                 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
294                                         fds[si].events |= POLLOUT;
295                         }
296
297                         /*
298                          * Check the alternative fd for work.
299                          */
300                         if (iocom->alt_fd >= 0) {
301                                 ai = count++;
302                                 fds[ai].fd = iocom->alt_fd;
303                                 fds[ai].events = POLLIN;
304                                 fds[ai].revents = 0;
305                         }
306                         poll(fds, count, timeout);
307
308                         if (wi >= 0 && (fds[wi].revents & POLLIN))
309                                 iocom->flags |= HAMMER2_IOCOMF_PWORK;
310                         if (si >= 0 && (fds[si].revents & POLLIN))
311                                 iocom->flags |= HAMMER2_IOCOMF_RWORK;
312                         if (si >= 0 && (fds[si].revents & POLLOUT))
313                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
314                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
315                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
316                         if (ai >= 0 && (fds[ai].revents & POLLIN))
317                                 iocom->flags |= HAMMER2_IOCOMF_ARWORK;
318                 } else {
319                         /*
320                          * Always check the pipe
321                          */
322                         iocom->flags |= HAMMER2_IOCOMF_PWORK;
323                 }
324
325                 /*
326                  * Pending message queues from other threads wake us up
327                  * with a write to the wakeupfds[] pipe.  We have to clear
328                  * the pipe with a dummy read.
329                  */
330                 if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
331                         iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
332                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
333                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
334                         iocom->flags |= HAMMER2_IOCOMF_WWORK;
335                         if (TAILQ_FIRST(&iocom->txmsgq))
336                                 iocom->sendmsg_callback(iocom);
337                 }
338
339                 /*
340                  * Message write sequencing
341                  */
342                 if (iocom->flags & HAMMER2_IOCOMF_WWORK)
343                         iocom->sendmsg_callback(iocom);
344
345                 /*
346                  * Message read sequencing.  Run this after the write
347                  * sequencing in case the write sequencing allowed another
348                  * auto-DELETE to occur on the read side.
349                  */
350                 if (iocom->flags & HAMMER2_IOCOMF_RWORK)
351                         iocom->recvmsg_callback(iocom);
352
353                 if (iocom->flags & HAMMER2_IOCOMF_ARWORK)
354                         iocom->altmsg_callback(iocom);
355         }
356 }
357
358 /*
359  * Read the next ready message from the ioq, issuing I/O if needed.
360  * Caller should retry on a read-event when NULL is returned.
361  *
362  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
363  * be returned for each open transaction, then the ioq and iocom
364  * will be errored out and a non-transactional HAMMER2_LNK_ERROR
365  * msg will be returned as the final message.  The caller should not call
366  * us again after the final message is returned.
367  *
368  * Thread localized, iocom->mtx not held.
369  */
370 hammer2_msg_t *
371 hammer2_ioq_read(hammer2_iocom_t *iocom)
372 {
373         hammer2_ioq_t *ioq = &iocom->ioq_rx;
374         hammer2_msg_t *msg;
375         hammer2_msg_hdr_t *head;
376         hammer2_state_t *state;
377         ssize_t n;
378         size_t bytes;
379         size_t nmax;
380         uint32_t xcrc32;
381         int error;
382
383 again:
384         iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
385
386         /*
387          * If a message is already pending we can just remove and
388          * return it.  Message state has already been processed.
389          */
390         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
391                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
392                 return (msg);
393         }
394
395         /*
396          * Message read in-progress (msg is NULL at the moment).  We don't
397          * allocate a msg until we have its core header.
398          */
399         bytes = ioq->fifo_end - ioq->fifo_beg;
400         nmax = sizeof(ioq->buf) - ioq->fifo_end;
401         msg = ioq->msg;
402
403         switch(ioq->state) {
404         case HAMMER2_MSGQ_STATE_HEADER1:
405                 /*
406                  * Load the primary header, fail on any non-trivial read
407                  * error or on EOF.  Since the primary header is the same
408                  * size as the message alignment it will never straddle
409                  * the end of the buffer.
410                  */
411                 if (bytes < (int)sizeof(msg->any.head)) {
412                         n = read(iocom->sock_fd,
413                                  ioq->buf + ioq->fifo_end,
414                                  nmax);
415                         if (n <= 0) {
416                                 if (n == 0) {
417                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
418                                         break;
419                                 }
420                                 if (errno != EINTR &&
421                                     errno != EINPROGRESS &&
422                                     errno != EAGAIN) {
423                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
424                                         break;
425                                 }
426                                 n = 0;
427                                 /* fall through */
428                         }
429                         ioq->fifo_end += n;
430                         bytes += n;
431                         nmax -= n;
432                 }
433
434                 /*
435                  * Insufficient data accumulated (msg is NULL, caller will
436                  * retry on event).
437                  */
438                 assert(msg == NULL);
439                 if (bytes < (int)sizeof(msg->any.head))
440                         break;
441
442                 /*
443                  * Calculate the header, decrypt data received so far.
444                  * Data will be decrypted in-place.  Partial blocks are
445                  * not immediately decrypted.
446                  *
447                  * WARNING!  The header might be in the wrong endian, we
448                  *           do not fix it up until we get the entire
449                  *           extended header.
450                  */
451                 hammer2_crypto_decrypt(iocom, ioq);
452                 head = (void *)(ioq->buf + ioq->fifo_beg);
453
454                 /*
455                  * Check and fixup the core header.  Note that the icrc
456                  * has to be calculated before any fixups, but the crc
457                  * fields in the msg may have to be swapped like everything
458                  * else.
459                  */
460                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
461                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
462                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
463                         break;
464                 }
465
466                 /*
467                  * Calculate the full header size and aux data size
468                  */
469                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
470                         ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
471                                       HAMMER2_MSG_ALIGN;
472                         ioq->abytes = bswap32(head->aux_bytes) *
473                                       HAMMER2_MSG_ALIGN;
474                 } else {
475                         ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
476                                       HAMMER2_MSG_ALIGN;
477                         ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
478                 }
479                 if (ioq->hbytes < sizeof(msg->any.head) ||
480                     ioq->hbytes > sizeof(msg->any) ||
481                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
482                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
483                         break;
484                 }
485
486                 /*
487                  * Finally allocate the message and copy the core header
488                  * to the embedded extended header.
489                  *
490                  * Initialize msg->aux_size to 0 and use it to track
491                  * the amount of data copied from the stream.
492                  */
493                 msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
494                 ioq->msg = msg;
495
496                 /*
497                  * We are either done or we fall-through
498                  */
499                 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
500                         bcopy(head, &msg->any.head, sizeof(msg->any.head));
501                         ioq->fifo_beg += ioq->hbytes;
502                         break;
503                 }
504
505                 /*
506                  * Fall through to the next state.  Make sure that the
507                  * extended header does not straddle the end of the buffer.
508                  * We still want to issue larger reads into our buffer,
509                  * book-keeping is easier if we don't bcopy() yet.
510                  */
511                 if (bytes + nmax < ioq->hbytes) {
512                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
513                         ioq->fifo_cdx -= ioq->fifo_beg;
514                         ioq->fifo_beg = 0;
515                         ioq->fifo_end = bytes;
516                         nmax = sizeof(ioq->buf) - ioq->fifo_end;
517                 }
518                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
519                 /* fall through */
520         case HAMMER2_MSGQ_STATE_HEADER2:
521                 /*
522                  * Fill out the extended header.
523                  */
524                 assert(msg != NULL);
525                 if (bytes < ioq->hbytes) {
526                         n = read(iocom->sock_fd,
527                                  msg->any.buf + ioq->fifo_end,
528                                  nmax);
529                         if (n <= 0) {
530                                 if (n == 0) {
531                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
532                                         break;
533                                 }
534                                 if (errno != EINTR &&
535                                     errno != EINPROGRESS &&
536                                     errno != EAGAIN) {
537                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
538                                         break;
539                                 }
540                                 n = 0;
541                                 /* fall through */
542                         }
543                         ioq->fifo_end += n;
544                         bytes += n;
545                         nmax -= n;
546                 }
547
548                 /*
549                  * Insufficient data accumulated (set msg NULL so caller will
550                  * retry on event).
551                  */
552                 if (bytes < ioq->hbytes) {
553                         msg = NULL;
554                         break;
555                 }
556
557                 /*
558                  * Calculate the extended header, decrypt data received
559                  * so far.  Handle endian-conversion for the entire extended
560                  * header.
561                  */
562                 hammer2_crypto_decrypt(iocom, ioq);
563                 head = (void *)(ioq->buf + ioq->fifo_beg);
564
565                 /*
566                  * Check the CRC.
567                  */
568                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
569                         xcrc32 = bswap32(head->hdr_crc);
570                 else
571                         xcrc32 = head->hdr_crc;
572                 head->hdr_crc = 0;
573                 if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
574                         ioq->error = HAMMER2_IOQ_ERROR_XCRC;
575                         break;
576                 }
577                 head->hdr_crc = xcrc32;
578
579                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
580                         hammer2_bswap_head(head);
581                 }
582
583                 /*
584                  * Copy the extended header into the msg and adjust the
585                  * FIFO.
586                  */
587                 bcopy(head, &msg->any, ioq->hbytes);
588
589                 /*
590                  * We are either done or we fall-through.
591                  */
592                 if (ioq->abytes == 0) {
593                         ioq->fifo_beg += ioq->hbytes;
594                         break;
595                 }
596
597                 /*
598                  * Must adjust nmax and bytes (and the state) when falling
599                  * through.
600                  */
601                 ioq->fifo_beg += ioq->hbytes;
602                 nmax -= ioq->hbytes;
603                 bytes -= ioq->hbytes;
604                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
605                 /* fall through */
606         case HAMMER2_MSGQ_STATE_AUXDATA1:
607                 /*
608                  * Copy the partial or complete payload from remaining
609                  * bytes in the FIFO.  We have to fall-through either
610                  * way so we can check the crc.
611                  *
612                  * Adjust msg->aux_size to the final actual value.
613                  */
614                 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
615                 if (ioq->already > ioq->abytes)
616                         ioq->already = ioq->abytes;
617                 if (bytes >= ioq->abytes) {
618                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
619                               ioq->abytes);
620                         msg->aux_size = ioq->abytes;
621                         ioq->fifo_beg += ioq->abytes;
622                         if (ioq->fifo_cdx < ioq->fifo_beg)
623                                 ioq->fifo_cdx = ioq->fifo_beg;
624                         bytes -= ioq->abytes;
625                 } else if (bytes) {
626                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
627                               bytes);
628                         msg->aux_size = bytes;
629                         ioq->fifo_beg += bytes;
630                         if (ioq->fifo_cdx < ioq->fifo_beg)
631                                 ioq->fifo_cdx = ioq->fifo_beg;
632                         bytes = 0;
633                 } else {
634                         msg->aux_size = 0;
635                 }
636                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
637                 /* fall through */
638         case HAMMER2_MSGQ_STATE_AUXDATA2:
639                 /*
640                  * Read the remainder of the payload directly into the
641                  * msg->aux_data buffer.
642                  */
643                 assert(msg);
644                 if (msg->aux_size < ioq->abytes) {
645                         assert(bytes == 0);
646                         n = read(iocom->sock_fd,
647                                  msg->aux_data + msg->aux_size,
648                                  ioq->abytes - msg->aux_size);
649                         if (n <= 0) {
650                                 if (n == 0) {
651                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
652                                         break;
653                                 }
654                                 if (errno != EINTR &&
655                                     errno != EINPROGRESS &&
656                                     errno != EAGAIN) {
657                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
658                                         break;
659                                 }
660                                 n = 0;
661                                 /* fall through */
662                         }
663                         msg->aux_size += n;
664                 }
665
666                 /*
667                  * Insufficient data accumulated (set msg NULL so caller will
668                  * retry on event).
669                  */
670                 if (msg->aux_size < ioq->abytes) {
671                         msg = NULL;
672                         break;
673                 }
674                 assert(msg->aux_size == ioq->abytes);
675                 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
676
677                 /*
678                  * Check aux_crc, then we are done.
679                  */
680                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
681                 if (xcrc32 != msg->any.head.aux_crc) {
682                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
683                         break;
684                 }
685                 break;
686         case HAMMER2_MSGQ_STATE_ERROR:
687                 /*
688                  * Continued calls to drain recorded transactions (returning
689                  * a LNK_ERROR for each one), before we return the final
690                  * LNK_ERROR.
691                  */
692                 assert(msg == NULL);
693                 break;
694         default:
695                 /*
696                  * We don't double-return errors, the caller should not
697                  * have called us again after getting an error msg.
698                  */
699                 assert(0);
700                 break;
701         }
702
703         /*
704          * Check the message sequence.  The iv[] should prevent any
705          * possibility of a replay but we add this check anyway.
706          */
707         if (msg && ioq->error == 0) {
708                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
709                         ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
710                 } else {
711                         ++ioq->seq;
712                 }
713         }
714
715         /*
716          * Process transactional state for the message.
717          */
718         if (msg && ioq->error == 0) {
719                 error = hammer2_state_msgrx(iocom, msg);
720                 if (error) {
721                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
722                                 hammer2_msg_free(iocom, msg);
723                                 goto again;
724                         }
725                         ioq->error = error;
726                 }
727         }
728
729         /*
730          * Handle error, RREQ, or completion
731          *
732          * NOTE: nmax and bytes are invalid at this point, we don't bother
733          *       to update them when breaking out.
734          */
735         if (ioq->error) {
736                 /*
737                  * An unrecoverable error causes all active receive
738                  * transactions to be terminated with a LNK_ERROR message.
739                  *
740                  * Once all active transactions are exhausted we set the
741                  * iocom ERROR flag and return a non-transactional LNK_ERROR
742                  * message, which should cause master processing loops to
743                  * terminate.
744                  */
745                 assert(ioq->msg == msg);
746                 if (msg) {
747                         hammer2_msg_free(iocom, msg);
748                         ioq->msg = NULL;
749                 }
750
751                 /*
752                  * No more I/O read processing
753                  */
754                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
755
756                 /*
757                  * Simulate a remote LNK_ERROR DELETE msg for any open
758                  * transactions, ending with a final non-transactional
759                  * LNK_ERROR (that the session can detect) when no
760                  * transactions remain.
761                  */
762                 msg = hammer2_msg_alloc(iocom, 0, 0);
763                 bzero(&msg->any.head, sizeof(msg->any.head));
764                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
765                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
766                 msg->any.head.error = ioq->error;
767
768                 pthread_mutex_lock(&iocom->mtx);
769                 if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
770                         /*
771                          * Active remote transactions are still present.
772                          * Simulate the other end sending us a DELETE.
773                          */
774                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
775                                 fprintf(stderr, "SIMULATE DELETION RCONT %p\n", state);
776                                 hammer2_msg_free(iocom, msg);
777                                 msg = NULL;
778                         } else {
779                                 fprintf(stderr, "SIMULATE DELETION %p RD RXCMD %08x\n", state, state->rxcmd);
780                                 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
781                                 msg->state = state;
782                                 msg->any.head.spanid = state->spanid;
783                                 msg->any.head.msgid = state->msgid;
784                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
785                                                      HAMMER2_MSGF_DELETE;
786                         }
787                 } else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) {
788                         /*
789                          * Active local transactions are still present.
790                          * Simulate the other end sending us a DELETE.
791                          */
792                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
793                                 fprintf(stderr, "SIMULATE DELETION WCONT\n");
794                                 hammer2_msg_free(iocom, msg);
795                                 msg = NULL;
796                         } else {
797                                 fprintf(stderr, "SIMULATE DELETION WD RXCMD %08x\n", state->txcmd);
798                                 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
799                                 msg->state = state;
800                                 msg->any.head.spanid = state->spanid;
801                                 msg->any.head.msgid = state->msgid;
802                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
803                                                      HAMMER2_MSGF_DELETE |
804                                                      HAMMER2_MSGF_REPLY;
805                                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
806                                         msg->any.head.cmd |=
807                                                      HAMMER2_MSGF_CREATE;
808                                 }
809                         }
810                 } else {
811                         /*
812                          * No active local or remote transactions remain.
813                          * Generate a final LNK_ERROR and flag EOF.
814                          */
815                         msg->state = NULL;
816                         iocom->flags |= HAMMER2_IOCOMF_EOF;
817                         fprintf(stderr, "EOF ON SOCKET\n");
818                 }
819                 pthread_mutex_unlock(&iocom->mtx);
820
821                 /*
822                  * For the iocom error case we want to set RWORK to indicate
823                  * that more messages might be pending.
824                  *
825                  * It is possible to return NULL when there is more work to
826                  * do because each message has to be DELETEd in both
827                  * directions before we continue on with the next (though
828                  * this could be optimized).  The transmit direction will
829                  * re-set RWORK.
830                  */
831                 if (msg)
832                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
833         } else if (msg == NULL) {
834                 /*
835                  * Insufficient data received to finish building the message,
836                  * set RREQ and return NULL.
837                  *
838                  * Leave ioq->msg intact.
839                  * Leave the FIFO intact.
840                  */
841                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
842         } else {
843                 /*
844                  * Return msg.
845                  *
846                  * The fifo has already been advanced past the message.
847                  * Trivially reset the FIFO indices if possible.
848                  *
849                  * clear the FIFO if it is now empty and set RREQ to wait
850                  * for more from the socket.  If the FIFO is not empty set
851                  * TWORK to bypass the poll so we loop immediately.
852                  */
853                 if (ioq->fifo_beg == ioq->fifo_end) {
854                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
855                         ioq->fifo_cdx = 0;
856                         ioq->fifo_beg = 0;
857                         ioq->fifo_end = 0;
858                 } else {
859                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
860                 }
861                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
862                 ioq->msg = NULL;
863         }
864         return (msg);
865 }
866
867 /*
868  * Calculate the header and data crc's and write a low-level message to
869  * the connection.  If aux_crc is non-zero the aux_data crc is already
870  * assumed to have been set.
871  *
872  * A non-NULL msg is added to the queue but not necessarily flushed.
873  * Calling this function with msg == NULL will get a flush going.
874  *
875  * Caller must hold iocom->mtx.
876  */
877 void
878 hammer2_iocom_flush1(hammer2_iocom_t *iocom)
879 {
880         hammer2_ioq_t *ioq = &iocom->ioq_tx;
881         hammer2_msg_t *msg;
882         uint32_t xcrc32;
883         int hbytes;
884         hammer2_msg_queue_t tmpq;
885
886         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
887         TAILQ_INIT(&tmpq);
888         pthread_mutex_lock(&iocom->mtx);
889         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
890                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
891                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
892         }
893         pthread_mutex_unlock(&iocom->mtx);
894
895         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
896                 /*
897                  * Process terminal connection errors.
898                  */
899                 TAILQ_REMOVE(&tmpq, msg, qentry);
900                 if (ioq->error) {
901                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
902                         ++ioq->msgcount;
903                         continue;
904                 }
905
906                 /*
907                  * Finish populating the msg fields.  The salt ensures that
908                  * the iv[] array is ridiculously randomized and we also
909                  * re-seed our PRNG every 32768 messages just to be sure.
910                  */
911                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
912                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
913                 ++ioq->seq;
914                 if ((ioq->seq & 32767) == 0)
915                         srandomdev();
916
917                 /*
918                  * Calculate aux_crc if 0, then calculate hdr_crc.
919                  */
920                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
921                         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
922                         xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
923                         msg->any.head.aux_crc = xcrc32;
924                 }
925                 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
926                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
927
928                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
929                          HAMMER2_MSG_ALIGN;
930                 msg->any.head.hdr_crc = 0;
931                 msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
932
933                 /*
934                  * Enqueue the message (the flush codes handles stream
935                  * encryption).
936                  */
937                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
938                 ++ioq->msgcount;
939         }
940         hammer2_iocom_flush2(iocom);
941 }
942
943 /*
944  * Thread localized, iocom->mtx not held by caller.
945  */
946 void
947 hammer2_iocom_flush2(hammer2_iocom_t *iocom)
948 {
949         hammer2_ioq_t *ioq = &iocom->ioq_tx;
950         hammer2_msg_t *msg;
951         ssize_t nmax;
952         ssize_t nact;
953         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
954         size_t hbytes;
955         size_t abytes;
956         int hoff;
957         int aoff;
958         int n;
959
960         if (ioq->error) {
961                 hammer2_iocom_drain(iocom);
962                 return;
963         }
964
965         /*
966          * Pump messages out the connection by building an iovec.
967          */
968         n = 0;
969         nmax = 0;
970
971         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
972                 hoff = 0;
973                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
974                          HAMMER2_MSG_ALIGN;
975                 aoff = 0;
976                 abytes = msg->aux_size;
977                 if (n == 0) {
978                         hoff += ioq->hbytes;
979                         aoff += ioq->abytes;
980                 }
981                 if (hbytes - hoff > 0) {
982                         iov[n].iov_base = (char *)&msg->any.head + hoff;
983                         iov[n].iov_len = hbytes - hoff;
984                         nmax += hbytes - hoff;
985                         ++n;
986                         if (n == HAMMER2_IOQ_MAXIOVEC)
987                                 break;
988                 }
989                 if (abytes - aoff > 0) {
990                         assert(msg->aux_data != NULL);
991                         iov[n].iov_base = msg->aux_data + aoff;
992                         iov[n].iov_len = abytes - aoff;
993                         nmax += abytes - aoff;
994                         ++n;
995                         if (n == HAMMER2_IOQ_MAXIOVEC)
996                                 break;
997                 }
998         }
999         if (n == 0)
1000                 return;
1001
1002         /*
1003          * Encrypt and write the data.  The crypto code will move the
1004          * data into the fifo and adjust the iov as necessary.  If
1005          * encryption is disabled the iov is left alone.
1006          *
1007          * hammer2_crypto_encrypt_wrote()
1008          */
1009         n = hammer2_crypto_encrypt(iocom, ioq, iov, n);
1010
1011         /*
1012          * Execute the writev() then figure out what happened.
1013          */
1014         nact = writev(iocom->sock_fd, iov, n);
1015         if (nact < 0) {
1016                 if (errno != EINTR &&
1017                     errno != EINPROGRESS &&
1018                     errno != EAGAIN) {
1019                         /*
1020                          * Fatal write error
1021                          */
1022                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
1023                         hammer2_iocom_drain(iocom);
1024                 } else {
1025                         /*
1026                          * Wait for socket buffer space
1027                          */
1028                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
1029                 }
1030                 return;
1031         }
1032
1033         /*
1034          * Indicate bytes written successfully.  If we were unable to
1035          * write the entire iov array then set WREQ to wait for more
1036          * socket buffer space.
1037          */
1038         hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
1039         if (nact != nmax)
1040                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1041
1042         /*
1043          * Clean out the transmit queue based on what we successfully
1044          * sent.
1045          */
1046         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1047                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1048                          HAMMER2_MSG_ALIGN;
1049                 abytes = msg->aux_size;
1050
1051                 if ((size_t)nact < hbytes - ioq->hbytes) {
1052                         ioq->hbytes += nact;
1053                         break;
1054                 }
1055                 nact -= hbytes - ioq->hbytes;
1056                 ioq->hbytes = hbytes;
1057                 if ((size_t)nact < abytes - ioq->abytes) {
1058                         ioq->abytes += nact;
1059                         break;
1060                 }
1061                 nact -= abytes - ioq->abytes;
1062
1063                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1064                 --ioq->msgcount;
1065                 ioq->hbytes = 0;
1066                 ioq->abytes = 0;
1067
1068                 hammer2_state_cleanuptx(iocom, msg);
1069         }
1070         if (ioq->error) {
1071                 hammer2_iocom_drain(iocom);
1072         }
1073 }
1074
1075 /*
1076  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1077  * write events will occur.  We don't kill read msgs because we want
1078  * the caller to pull off our contrived terminal error msg to detect
1079  * the connection failure.
1080  *
1081  * Thread localized, iocom->mtx not held by caller.
1082  */
1083 void
1084 hammer2_iocom_drain(hammer2_iocom_t *iocom)
1085 {
1086         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1087         hammer2_msg_t *msg;
1088
1089         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
1090
1091         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1092                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1093                 --ioq->msgcount;
1094                 hammer2_state_cleanuptx(iocom, msg);
1095         }
1096 }
1097
1098 /*
1099  * Write a message to an iocom, with additional state processing.
1100  */
1101 void
1102 hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
1103                   void (*func)(hammer2_state_t *, hammer2_msg_t *),
1104                   void *data,
1105                   hammer2_state_t **statep)
1106 {
1107         hammer2_state_t *state;
1108         char dummy;
1109
1110         /*
1111          * Handle state processing, create state if necessary.
1112          */
1113         pthread_mutex_lock(&iocom->mtx);
1114         if ((state = msg->state) != NULL) {
1115                 /*
1116                  * Existing transaction (could be reply).  It is also
1117                  * possible for this to be the first reply (CREATE is set),
1118                  * in which case we populate state->txcmd.
1119                  */
1120                 msg->any.head.msgid = state->msgid;
1121                 msg->any.head.spanid = state->spanid;
1122                 if (func) {
1123                         state->func = func;
1124                         state->any.any = data;
1125                 }
1126                 assert(((state->txcmd ^ msg->any.head.cmd) &
1127                         HAMMER2_MSGF_REPLY) == 0);
1128                 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1129                         state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1130         } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
1131                 /*
1132                  * No existing state and CREATE is set, create new
1133                  * state for outgoing command.  This can't happen if
1134                  * REPLY is set as the state would already exist for
1135                  * a transaction reply.
1136                  */
1137                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1138
1139                 state = malloc(sizeof(*state));
1140                 bzero(state, sizeof(*state));
1141                 state->iocom = iocom;
1142                 state->flags = HAMMER2_STATE_DYNAMIC;
1143                 state->msg = msg;
1144                 state->msgid = (uint64_t)(uintptr_t)state;
1145                 state->spanid = msg->any.head.spanid;
1146                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1147                 state->rxcmd = HAMMER2_MSGF_REPLY;
1148                 state->func = func;
1149                 state->any.any = data;
1150                 RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
1151                 state->flags |= HAMMER2_STATE_INSERTED;
1152                 msg->state = state;
1153                 msg->any.head.msgid = state->msgid;
1154                 /* spanid set by caller */
1155         } else {
1156                 msg->any.head.msgid = 0;
1157                 /* spanid set by caller */
1158         }
1159
1160         if (statep)
1161                 *statep = state;
1162
1163         /*
1164          * Queue it for output, wake up the I/O pthread.  Note that the
1165          * I/O thread is responsible for generating the CRCs and encryption.
1166          */
1167         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1168         dummy = 0;
1169         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1170         pthread_mutex_unlock(&iocom->mtx);
1171 }
1172
1173 /*
1174  * This is a shortcut to formulate a reply to msg with a simple error code,
1175  * It can reply to and terminate a transaction, or it can reply to a one-way
1176  * messages.  A HAMMER2_LNK_ERROR command code is utilized to encode
1177  * the error code (which can be 0).  Not all transactions are terminated
1178  * with HAMMER2_LNK_ERROR status (the low level only cares about the
1179  * MSGF_DELETE flag), but most are.
1180  *
1181  * Replies to one-way messages are a bit of an oxymoron but the feature
1182  * is used by the debug (DBG) protocol.
1183  *
1184  * The reply contains no extended data.
1185  */
1186 void
1187 hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1188 {
1189         hammer2_state_t *state = msg->state;
1190         hammer2_msg_t *nmsg;
1191         uint32_t cmd;
1192
1193
1194         /*
1195          * Reply with a simple error code and terminate the transaction.
1196          */
1197         cmd = HAMMER2_LNK_ERROR;
1198
1199         /*
1200          * Check if our direction has even been initiated yet, set CREATE.
1201          *
1202          * Check what direction this is (command or reply direction).  Note
1203          * that txcmd might not have been initiated yet.
1204          *
1205          * If our direction has already been closed we just return without
1206          * doing anything.
1207          */
1208         if (state) {
1209                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1210                         return;
1211                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1212                         cmd |= HAMMER2_MSGF_CREATE;
1213                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1214                         cmd |= HAMMER2_MSGF_REPLY;
1215                 cmd |= HAMMER2_MSGF_DELETE;
1216         } else {
1217                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1218                         cmd |= HAMMER2_MSGF_REPLY;
1219         }
1220
1221         nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1222         nmsg->any.head.error = error;
1223         nmsg->state = msg->state;
1224         hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
1225 }
1226
1227 /*
1228  * Similar to hammer2_msg_reply() but leave the transaction open.  That is,
1229  * we are generating a streaming reply or an intermediate acknowledgement
1230  * of some sort as part of the higher level protocol, with more to come
1231  * later.
1232  */
1233 void
1234 hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1235 {
1236         hammer2_state_t *state = msg->state;
1237         hammer2_msg_t *nmsg;
1238         uint32_t cmd;
1239
1240
1241         /*
1242          * Reply with a simple error code and terminate the transaction.
1243          */
1244         cmd = HAMMER2_LNK_ERROR;
1245
1246         /*
1247          * Check if our direction has even been initiated yet, set CREATE.
1248          *
1249          * Check what direction this is (command or reply direction).  Note
1250          * that txcmd might not have been initiated yet.
1251          *
1252          * If our direction has already been closed we just return without
1253          * doing anything.
1254          */
1255         if (state) {
1256                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1257                         return;
1258                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1259                         cmd |= HAMMER2_MSGF_CREATE;
1260                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1261                         cmd |= HAMMER2_MSGF_REPLY;
1262                 /* continuing transaction, do not set MSGF_DELETE */
1263         } else {
1264                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1265                         cmd |= HAMMER2_MSGF_REPLY;
1266         }
1267
1268         nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1269         nmsg->any.head.error = error;
1270         nmsg->state = state;
1271         hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
1272 }
1273
1274 /*
1275  * Terminate a transaction given a state structure by issuing a DELETE.
1276  */
1277 void
1278 hammer2_state_reply(hammer2_state_t *state, uint32_t error)
1279 {
1280         hammer2_msg_t *nmsg;
1281         uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
1282
1283         /*
1284          * Nothing to do if we already transmitted a delete
1285          */
1286         if (state->txcmd & HAMMER2_MSGF_DELETE)
1287                 return;
1288
1289         /*
1290          * We must also set CREATE if this is our first response to a
1291          * remote command.
1292          */
1293         if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1294                 cmd |= HAMMER2_MSGF_CREATE;
1295
1296         /*
1297          * Set REPLY if the other end initiated the command.  Otherwise
1298          * we are the command direction.
1299          */
1300         if (state->txcmd & HAMMER2_MSGF_REPLY)
1301                 cmd |= HAMMER2_MSGF_REPLY;
1302
1303         nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
1304         nmsg->any.head.error = error;
1305         nmsg->state = state;
1306         hammer2_msg_write(state->iocom, nmsg, NULL, NULL, NULL);
1307 }
1308
1309 /************************************************************************
1310  *                      TRANSACTION STATE HANDLING                      *
1311  ************************************************************************
1312  *
1313  */
1314
/*
 * Generate the red-black tree functions for hammer2_state_tree.  Nodes
 * are hammer2_state structures linked through their rbnode field and
 * ordered by hammer2_state_cmp().  The RB_INSERT/RB_FIND calls on
 * iocom->staterd_tree and iocom->statewr_tree in this file use it.
 */
1315 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
1316
1317 /*
1318  * Process state tracking for a message after reception, prior to
1319  * execution.
1320  *
1321  * Called with msglk held and the msg dequeued.
1322  *
1323  * All messages are called with dummy state and return actual state.
1324  * (One-off messages often just return the same dummy state).
1325  *
1326  * May request that caller discard the message by returning
1327  * HAMMER2_IOQ_ERROR_EALREADY.  msg->state is allowed to be NULL in this case.
1328  *
1329  * --
1330  *
1331  * These routines handle persistent and command/reply message state via the
1332  * CREATE and DELETE flags.  The first message in a command or reply sequence
1333  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1334  *
1335  * There can be any number of intermediate messages belonging to the same
1336  * sequence sent in between the CREATE message and the DELETE message,
1337  * which set neither flag.  This represents a streaming command or reply.
1338  *
1339  * Any command message received with CREATE set expects a reply sequence to
1340  * be returned.  Reply sequences work the same as command sequences except the
1341  * REPLY bit is also sent.  Both the command side and reply side can
1342  * degenerate into a single message with both CREATE and DELETE set.  Note
1343  * that one side can be streaming and the other side not, or neither, or both.
1344  *
1345  * The msgid is unique for the initiator.  That is, two sides sending a new
1346  * message can use the same msgid without colliding.
1347  *
1348  * --
1349  *
1350  * ABORT sequences work by setting the ABORT flag along with normal message
1351  * state.  However, ABORTs can also be sent on half-closed messages, that is
1352  * even if the command or reply side has already sent a DELETE, as long as
1353  * the message has not been fully closed it can still send an ABORT+DELETE
1354  * to terminate the half-closed message state.
1355  *
1356  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1357  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1358  * also race, and in this situation the other side might have already
1359  * initiated a new unrelated command with the same message id.  Since
1360  * the abort has not set the CREATE flag the situation can be detected
1361  * and the message will also be discarded.
1362  *
1363  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1364  * The ABORT request is essentially integrated into the command instead
1365  * of being sent later on.  In this situation the command implementation
1366  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1367  * special-case non-blocking operation for the command.
1368  *
1369  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1370  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1371  *        one-way messages are not supported.
1372  *
1373  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1374  *        simply ignored.
1375  *
1376  * --
1377  *
1378  * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
1379  * set.  One-off messages cannot be aborted and typically aren't processed
1380  * by these routines.  The REPLY bit can be used to distinguish whether a
1381  * one-off message is a command or reply.  For example, one-off replies
1382  * will typically just contain status updates.
1383  */
1384 static int
1385 hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1386 {
1387         hammer2_state_t *state;
1388         hammer2_state_t dummy;
1389         int error;
1390
1391         /*
1392          * Lock RB tree and locate existing persistent state, if any.
1393          *
1394          * If received msg is a command state is on staterd_tree.
1395          * If received msg is a reply state is on statewr_tree.
1396          */
1397
1398         dummy.msgid = msg->any.head.msgid;
1399         dummy.spanid = msg->any.head.spanid;
1400 #if 0
1401         iocom_printf(iocom, msg->any.head.cmd,
1402                      "received msg %08x msgid %jx spanid=%jx\n",
1403                       msg->any.head.cmd,
1404                       (intmax_t)msg->any.head.msgid,
1405                       (intmax_t)msg->any.head.spanid);
1406 #endif
1407         pthread_mutex_lock(&iocom->mtx);
1408         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1409                 state = RB_FIND(hammer2_state_tree,
1410                                 &iocom->statewr_tree, &dummy);
1411         } else {
1412                 state = RB_FIND(hammer2_state_tree,
1413                                 &iocom->staterd_tree, &dummy);
1414         }
1415         msg->state = state;
1416         pthread_mutex_unlock(&iocom->mtx);
1417
1418         /*
1419          * Short-cut one-off or mid-stream messages (state may be NULL).
1420          */
1421         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1422                                   HAMMER2_MSGF_ABORT)) == 0) {
1423                 return(0);
1424         }
1425
1426         /*
1427          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1428          * inside the case statements.
1429          */
1430         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1431                                     HAMMER2_MSGF_REPLY)) {
1432         case HAMMER2_MSGF_CREATE:
1433         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1434                 /*
1435                  * New persistant command received.
1436                  */
1437                 if (state) {
1438                         iocom_printf(iocom, msg->any.head.cmd,
1439                                      "hammer2_state_msgrx: "
1440                                      "duplicate transaction\n");
1441                         error = HAMMER2_IOQ_ERROR_TRANS;
1442                         break;
1443                 }
1444                 state = malloc(sizeof(*state));
1445                 bzero(state, sizeof(*state));
1446                 state->iocom = iocom;
1447                 state->flags = HAMMER2_STATE_DYNAMIC;
1448                 state->msg = msg;
1449                 state->txcmd = HAMMER2_MSGF_REPLY;
1450                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1451                 pthread_mutex_lock(&iocom->mtx);
1452                 RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
1453                 pthread_mutex_unlock(&iocom->mtx);
1454                 state->flags |= HAMMER2_STATE_INSERTED;
1455                 state->msgid = msg->any.head.msgid;
1456                 state->spanid = msg->any.head.spanid;
1457                 msg->state = state;
1458                 error = 0;
1459                 break;
1460         case HAMMER2_MSGF_DELETE:
1461                 /*
1462                  * Persistent state is expected but might not exist if an
1463                  * ABORT+DELETE races the close.
1464                  */
1465                 if (state == NULL) {
1466                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1467                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1468                         } else {
1469                                 iocom_printf(iocom, msg->any.head.cmd,
1470                                              "hammer2_state_msgrx: "
1471                                              "no state for DELETE\n");
1472                                 error = HAMMER2_IOQ_ERROR_TRANS;
1473                         }
1474                         break;
1475                 }
1476
1477                 /*
1478                  * Handle another ABORT+DELETE case if the msgid has already
1479                  * been reused.
1480                  */
1481                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1482                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1483                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1484                         } else {
1485                                 iocom_printf(iocom, msg->any.head.cmd,
1486                                              "hammer2_state_msgrx: "
1487                                              "state reused for DELETE\n");
1488                                 error = HAMMER2_IOQ_ERROR_TRANS;
1489                         }
1490                         break;
1491                 }
1492                 error = 0;
1493                 break;
1494         default:
1495                 /*
1496                  * Check for mid-stream ABORT command received, otherwise
1497                  * allow.
1498                  */
1499                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1500                         if (state == NULL ||
1501                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1502                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1503                                 break;
1504                         }
1505                 }
1506                 error = 0;
1507                 break;
1508         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1509         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1510                 /*
1511                  * When receiving a reply with CREATE set the original
1512                  * persistent state message should already exist.
1513                  */
1514                 if (state == NULL) {
1515                         iocom_printf(iocom, msg->any.head.cmd,
1516                                      "hammer2_state_msgrx: "
1517                                      "no state match for REPLY cmd=%08x\n",
1518                                      msg->any.head.cmd);
1519                         error = HAMMER2_IOQ_ERROR_TRANS;
1520                         break;
1521                 }
1522                 assert(((state->rxcmd ^ msg->any.head.cmd) &
1523                         HAMMER2_MSGF_REPLY) == 0);
1524                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1525                 error = 0;
1526                 break;
1527         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1528                 /*
1529                  * Received REPLY+ABORT+DELETE in case where msgid has
1530                  * already been fully closed, ignore the message.
1531                  */
1532                 if (state == NULL) {
1533                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1534                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1535                         } else {
1536                                 iocom_printf(iocom, msg->any.head.cmd,
1537                                              "hammer2_state_msgrx: "
1538                                              "no state match for "
1539                                              "REPLY|DELETE\n");
1540                                 error = HAMMER2_IOQ_ERROR_TRANS;
1541                         }
1542                         break;
1543                 }
1544
1545                 /*
1546                  * Received REPLY+ABORT+DELETE in case where msgid has
1547                  * already been reused for an unrelated message,
1548                  * ignore the message.
1549                  */
1550                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1551                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1552                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1553                         } else {
1554                                 iocom_printf(iocom, msg->any.head.cmd,
1555                                              "hammer2_state_msgrx: "
1556                                              "state reused for REPLY|DELETE\n");
1557                                 error = HAMMER2_IOQ_ERROR_TRANS;
1558                         }
1559                         break;
1560                 }
1561                 error = 0;
1562                 break;
1563         case HAMMER2_MSGF_REPLY:
1564                 /*
1565                  * Check for mid-stream ABORT reply received to sent command.
1566                  */
1567                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1568                         if (state == NULL ||
1569                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1570                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1571                                 break;
1572                         }
1573                 }
1574                 error = 0;
1575                 break;
1576         }
1577         return (error);
1578 }
1579
1580 void
1581 hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1582 {
1583         hammer2_state_t *state;
1584
1585         if ((state = msg->state) == NULL) {
1586                 /*
1587                  * Free a non-transactional message, there is no state
1588                  * to worry about.
1589                  */
1590                 hammer2_msg_free(iocom, msg);
1591         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1592                 /*
1593                  * Message terminating transaction, destroy the related
1594                  * state, the original message, and this message (if it
1595                  * isn't the original message due to a CREATE|DELETE).
1596                  */
1597                 pthread_mutex_lock(&iocom->mtx);
1598                 state->rxcmd |= HAMMER2_MSGF_DELETE;
1599                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
1600                         if (state->msg == msg)
1601                                 state->msg = NULL;
1602                         assert(state->flags & HAMMER2_STATE_INSERTED);
1603                         if (state->rxcmd & HAMMER2_MSGF_REPLY) {
1604                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1605                                 RB_REMOVE(hammer2_state_tree,
1606                                           &iocom->statewr_tree, state);
1607                         } else {
1608                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1609                                 RB_REMOVE(hammer2_state_tree,
1610                                           &iocom->staterd_tree, state);
1611                         }
1612                         state->flags &= ~HAMMER2_STATE_INSERTED;
1613                         hammer2_state_free(state);
1614                 } else {
1615                         ;
1616                 }
1617                 pthread_mutex_unlock(&iocom->mtx);
1618                 hammer2_msg_free(iocom, msg);
1619         } else if (state->msg != msg) {
1620                 /*
1621                  * Message not terminating transaction, leave state intact
1622                  * and free message if it isn't the CREATE message.
1623                  */
1624                 hammer2_msg_free(iocom, msg);
1625         }
1626 }
1627
1628 static void
1629 hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1630 {
1631         hammer2_state_t *state;
1632
1633         if ((state = msg->state) == NULL) {
1634                 hammer2_msg_free(iocom, msg);
1635         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1636                 pthread_mutex_lock(&iocom->mtx);
1637                 state->txcmd |= HAMMER2_MSGF_DELETE;
1638                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
1639                         if (state->msg == msg)
1640                                 state->msg = NULL;
1641                         assert(state->flags & HAMMER2_STATE_INSERTED);
1642                         if (state->txcmd & HAMMER2_MSGF_REPLY) {
1643                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1644                                 RB_REMOVE(hammer2_state_tree,
1645                                           &iocom->staterd_tree, state);
1646                         } else {
1647                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1648                                 RB_REMOVE(hammer2_state_tree,
1649                                           &iocom->statewr_tree, state);
1650                         }
1651                         state->flags &= ~HAMMER2_STATE_INSERTED;
1652                         hammer2_state_free(state);
1653                 } else {
1654                         ;
1655                 }
1656                 pthread_mutex_unlock(&iocom->mtx);
1657                 hammer2_msg_free(iocom, msg);
1658         } else if (state->msg != msg) {
1659                 hammer2_msg_free(iocom, msg);
1660         }
1661 }
1662
1663 /*
1664  * Called with iocom locked
1665  */
1666 void
1667 hammer2_state_free(hammer2_state_t *state)
1668 {
1669         hammer2_iocom_t *iocom = state->iocom;
1670         hammer2_msg_t *msg;
1671         char dummy;
1672
1673         fprintf(stderr, "STATE FREE %p\n", state);
1674
1675         assert(state->any.any == NULL);
1676         msg = state->msg;
1677         state->msg = NULL;
1678         if (msg)
1679                 hammer2_msg_free_locked(iocom, msg);
1680         free(state);
1681
1682         /*
1683          * When an iocom error is present we are trying to close down the
1684          * iocom, but we have to wait for all states to terminate before
1685          * we can do so.  The iocom rx code will terminate the receive side
1686          * for all transactions by simulating incoming DELETE messages,
1687          * but the state doesn't go away until both sides are terminated.
1688          *
1689          * We may have to wake up the rx code.
1690          */
1691         if (iocom->ioq_rx.error &&
1692             RB_EMPTY(&iocom->staterd_tree) &&
1693             RB_EMPTY(&iocom->statewr_tree)) {
1694                 dummy = 0;
1695                 write(iocom->wakeupfds[1], &dummy, 1);
1696         }
1697 }
1698
1699 /*
1700  * Indexed messages are stored in a red-black tree indexed by their
1701  * msgid.  Only persistent messages are indexed.
1702  */
1703 int
1704 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
1705 {
1706         if (state1->spanid < state2->spanid)
1707                 return(-1);
1708         if (state1->spanid > state2->spanid)
1709                 return(1);
1710         if (state1->msgid < state2->msgid)
1711                 return(-1);
1712         if (state1->msgid > state2->msgid)
1713                 return(1);
1714         return(0);
1715 }