393933ff634dded19863400ab249c29c874b7156
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
39 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40
41 /*
42  * Initialize a low-level ioq
43  */
44 void
45 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
46 {
47         bzero(ioq, sizeof(*ioq));
48         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
49         TAILQ_INIT(&ioq->msgq);
50 }
51
52 /*
53  * Cleanup queue.
54  *
55  * caller holds iocom->mtx.
56  */
57 void
58 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
59 {
60         hammer2_msg_t *msg;
61
62         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
63                 assert(0);      /* shouldn't happen */
64                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
65                 hammer2_msg_free(iocom, msg);
66         }
67         if ((msg = ioq->msg) != NULL) {
68                 ioq->msg = NULL;
69                 hammer2_msg_free(iocom, msg);
70         }
71 }
72
73 /*
74  * Initialize a low-level communications channel.
75  *
76  * NOTE: The state_func() is called at least once from the loop and can be
77  *       re-armed via hammer2_iocom_restate().
78  */
79 void
80 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd,
81                    void (*state_func)(hammer2_iocom_t *),
82                    void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
83                    void (*altmsg_func)(hammer2_iocom_t *))
84 {
85         bzero(iocom, sizeof(*iocom));
86
87         iocom->state_callback = state_func;
88         iocom->rcvmsg_callback = rcvmsg_func;
89         iocom->altmsg_callback = altmsg_func;
90
91         pthread_mutex_init(&iocom->mtx, NULL);
92         RB_INIT(&iocom->staterd_tree);
93         RB_INIT(&iocom->statewr_tree);
94         TAILQ_INIT(&iocom->freeq);
95         TAILQ_INIT(&iocom->freeq_aux);
96         TAILQ_INIT(&iocom->addrq);
97         TAILQ_INIT(&iocom->txmsgq);
98         iocom->sock_fd = sock_fd;
99         iocom->alt_fd = alt_fd;
100         iocom->flags = HAMMER2_IOCOMF_RREQ;
101         if (state_func)
102                 iocom->flags |= HAMMER2_IOCOMF_SWORK;
103         hammer2_ioq_init(iocom, &iocom->ioq_rx);
104         hammer2_ioq_init(iocom, &iocom->ioq_tx);
105         if (pipe(iocom->wakeupfds) < 0)
106                 assert(0);
107         fcntl(iocom->wakeupfds[0], F_SETFL, O_NONBLOCK);
108         fcntl(iocom->wakeupfds[1], F_SETFL, O_NONBLOCK);
109
110         /*
111          * Negotiate session crypto synchronously.  This will mark the
112          * connection as error'd if it fails.
113          */
114         hammer2_crypto_negotiate(iocom);
115
116         /*
117          * Make sure our fds are set to non-blocking for the iocom core.
118          */
119         if (sock_fd >= 0)
120                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
121 #if 0
122         /* if line buffered our single fgets() should be fine */
123         if (alt_fd >= 0)
124                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
125 #endif
126 }
127
128 /*
129  * May only be called from a callback from iocom_core.
130  *
131  * Adjust state machine functions, set flags to guarantee that both
132  * the recevmsg_func and the sendmsg_func is called at least once.
133  */
134 void
135 hammer2_iocom_restate(hammer2_iocom_t *iocom,
136                    void (*state_func)(hammer2_iocom_t *),
137                    void (*rcvmsg_func)(hammer2_iocom_t *, hammer2_msg_t *msg),
138                    void (*altmsg_func)(hammer2_iocom_t *))
139 {
140         iocom->state_callback = state_func;
141         iocom->rcvmsg_callback = rcvmsg_func;
142         iocom->altmsg_callback = altmsg_func;
143         if (state_func)
144                 iocom->flags |= HAMMER2_IOCOMF_SWORK;
145         else
146                 iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
147 }
148
149 /*
150  * Cleanup a terminating iocom.
151  *
152  * Caller should not hold iocom->mtx.  The iocom has already been disconnected
153  * from all possible references to it.
154  */
155 void
156 hammer2_iocom_done(hammer2_iocom_t *iocom)
157 {
158         hammer2_msg_t *msg;
159
160         if (iocom->sock_fd >= 0) {
161                 close(iocom->sock_fd);
162                 iocom->sock_fd = -1;
163         }
164         if (iocom->alt_fd >= 0) {
165                 close(iocom->alt_fd);
166                 iocom->alt_fd = -1;
167         }
168         hammer2_ioq_done(iocom, &iocom->ioq_rx);
169         hammer2_ioq_done(iocom, &iocom->ioq_tx);
170         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
171                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
172                 free(msg);
173         }
174         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
175                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
176                 free(msg->aux_data);
177                 msg->aux_data = NULL;
178                 free(msg);
179         }
180         if (iocom->wakeupfds[0] >= 0) {
181                 close(iocom->wakeupfds[0]);
182                 iocom->wakeupfds[0] = -1;
183         }
184         if (iocom->wakeupfds[1] >= 0) {
185                 close(iocom->wakeupfds[1]);
186                 iocom->wakeupfds[1] = -1;
187         }
188         pthread_mutex_destroy(&iocom->mtx);
189 }
190
191 /*
192  * Allocate a new one-way message.
193  */
194 hammer2_msg_t *
195 hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
196 {
197         hammer2_msg_t *msg;
198         int hbytes;
199
200         pthread_mutex_lock(&iocom->mtx);
201         if (aux_size) {
202                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
203                            ~HAMMER2_MSG_ALIGNMASK;
204                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
205                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
206         } else {
207                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
208                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
209         }
210         pthread_mutex_unlock(&iocom->mtx);
211         if (msg == NULL) {
212                 msg = malloc(sizeof(*msg));
213                 bzero(msg, sizeof(*msg));
214                 msg->aux_data = NULL;
215                 msg->aux_size = 0;
216         }
217         if (msg->aux_size != aux_size) {
218                 if (msg->aux_data) {
219                         free(msg->aux_data);
220                         msg->aux_data = NULL;
221                         msg->aux_size = 0;
222                 }
223                 if (aux_size) {
224                         msg->aux_data = malloc(aux_size);
225                         msg->aux_size = aux_size;
226                 }
227         }
228         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
229         if (hbytes)
230                 bzero(&msg->any.head, hbytes);
231         msg->hdr_size = hbytes;
232         msg->any.head.cmd = cmd;
233         msg->any.head.aux_descr = 0;
234         msg->any.head.aux_crc = 0;
235
236         return (msg);
237 }
238
239 /*
240  * Free a message so it can be reused afresh.
241  *
242  * NOTE: aux_size can be 0 with a non-NULL aux_data.
243  */
244 static
245 void
246 hammer2_msg_free_locked(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
247 {
248         msg->state = NULL;
249         if (msg->aux_data)
250                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
251         else
252                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
253 }
254
255 void
256 hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
257 {
258         pthread_mutex_lock(&iocom->mtx);
259         hammer2_msg_free_locked(iocom, msg);
260         pthread_mutex_unlock(&iocom->mtx);
261 }
262
263 /*
264  * I/O core loop for an iocom.
265  *
266  * Thread localized, iocom->mtx not held.
267  */
268 void
269 hammer2_iocom_core(hammer2_iocom_t *iocom)
270 {
271         struct pollfd fds[3];
272         char dummybuf[256];
273         hammer2_msg_t *msg;
274         int timeout;
275         int count;
276         int wi; /* wakeup pipe */
277         int si; /* socket */
278         int ai; /* alt bulk path socket */
279
280         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
281                 if ((iocom->flags & (HAMMER2_IOCOMF_RWORK |
282                                      HAMMER2_IOCOMF_WWORK |
283                                      HAMMER2_IOCOMF_PWORK |
284                                      HAMMER2_IOCOMF_SWORK |
285                                      HAMMER2_IOCOMF_ARWORK |
286                                      HAMMER2_IOCOMF_AWWORK)) == 0) {
287                         /*
288                          * Only poll if no immediate work is pending.
289                          * Otherwise we are just wasting our time calling
290                          * poll.
291                          */
292                         timeout = 5000;
293
294                         count = 0;
295                         wi = -1;
296                         si = -1;
297                         ai = -1;
298
299                         /*
300                          * Always check the inter-thread pipe, e.g.
301                          * for iocom->txmsgq work.
302                          */
303                         wi = count++;
304                         fds[wi].fd = iocom->wakeupfds[0];
305                         fds[wi].events = POLLIN;
306                         fds[wi].revents = 0;
307
308                         /*
309                          * Check the socket input/output direction as
310                          * requested
311                          */
312                         if (iocom->flags & (HAMMER2_IOCOMF_RREQ |
313                                             HAMMER2_IOCOMF_WREQ)) {
314                                 si = count++;
315                                 fds[si].fd = iocom->sock_fd;
316                                 fds[si].events = 0;
317                                 fds[si].revents = 0;
318
319                                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
320                                         fds[si].events |= POLLIN;
321                                 if (iocom->flags & HAMMER2_IOCOMF_WREQ)
322                                         fds[si].events |= POLLOUT;
323                         }
324
325                         /*
326                          * Check the alternative fd for work.
327                          */
328                         if (iocom->alt_fd >= 0) {
329                                 ai = count++;
330                                 fds[ai].fd = iocom->alt_fd;
331                                 fds[ai].events = POLLIN;
332                                 fds[ai].revents = 0;
333                         }
334                         poll(fds, count, timeout);
335
336                         if (wi >= 0 && (fds[wi].revents & POLLIN))
337                                 iocom->flags |= HAMMER2_IOCOMF_PWORK;
338                         if (si >= 0 && (fds[si].revents & POLLIN))
339                                 iocom->flags |= HAMMER2_IOCOMF_RWORK;
340                         if (si >= 0 && (fds[si].revents & POLLOUT))
341                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
342                         if (wi >= 0 && (fds[wi].revents & POLLOUT))
343                                 iocom->flags |= HAMMER2_IOCOMF_WWORK;
344                         if (ai >= 0 && (fds[ai].revents & POLLIN))
345                                 iocom->flags |= HAMMER2_IOCOMF_ARWORK;
346                 } else {
347                         /*
348                          * Always check the pipe
349                          */
350                         iocom->flags |= HAMMER2_IOCOMF_PWORK;
351                 }
352
353                 if (iocom->flags & HAMMER2_IOCOMF_SWORK) {
354                         iocom->flags &= ~HAMMER2_IOCOMF_SWORK;
355                         iocom->state_callback(iocom);
356                 }
357
358                 /*
359                  * Pending message queues from other threads wake us up
360                  * with a write to the wakeupfds[] pipe.  We have to clear
361                  * the pipe with a dummy read.
362                  */
363                 if (iocom->flags & HAMMER2_IOCOMF_PWORK) {
364                         iocom->flags &= ~HAMMER2_IOCOMF_PWORK;
365                         read(iocom->wakeupfds[0], dummybuf, sizeof(dummybuf));
366                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
367                         iocom->flags |= HAMMER2_IOCOMF_WWORK;
368                         if (TAILQ_FIRST(&iocom->txmsgq))
369                                 hammer2_iocom_flush1(iocom);
370                 }
371
372                 /*
373                  * Message write sequencing
374                  */
375                 if (iocom->flags & HAMMER2_IOCOMF_WWORK)
376                         hammer2_iocom_flush1(iocom);
377
378                 /*
379                  * Message read sequencing.  Run this after the write
380                  * sequencing in case the write sequencing allowed another
381                  * auto-DELETE to occur on the read side.
382                  */
383                 if (iocom->flags & HAMMER2_IOCOMF_RWORK) {
384                         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0 &&
385                                (msg = hammer2_ioq_read(iocom)) != NULL) {
386                                 fprintf(stderr,
387                                         "receive msg cmd=%08x msgid=%016jx\n",
388                                         msg->any.head.cmd,
389                                         (intmax_t)msg->any.head.msgid);
390                                 iocom->rcvmsg_callback(iocom, msg);
391                                 hammer2_state_cleanuprx(iocom, msg);
392                         }
393                 }
394
395                 if (iocom->flags & HAMMER2_IOCOMF_ARWORK)
396                         iocom->altmsg_callback(iocom);
397         }
398 }
399
400 /*
401  * Read the next ready message from the ioq, issuing I/O if needed.
402  * Caller should retry on a read-event when NULL is returned.
403  *
404  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
405  * be returned for each open transaction, then the ioq and iocom
406  * will be errored out and a non-transactional HAMMER2_LNK_ERROR
407  * msg will be returned as the final message.  The caller should not call
408  * us again after the final message is returned.
409  *
410  * Thread localized, iocom->mtx not held.
411  */
412 hammer2_msg_t *
413 hammer2_ioq_read(hammer2_iocom_t *iocom)
414 {
415         hammer2_ioq_t *ioq = &iocom->ioq_rx;
416         hammer2_msg_t *msg;
417         hammer2_msg_hdr_t *head;
418         hammer2_state_t *state;
419         ssize_t n;
420         size_t bytes;
421         size_t nmax;
422         uint32_t xcrc32;
423         int error;
424
425 again:
426         iocom->flags &= ~(HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_RWORK);
427
428         /*
429          * If a message is already pending we can just remove and
430          * return it.  Message state has already been processed.
431          */
432         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
433                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
434                 return (msg);
435         }
436
437         /*
438          * Message read in-progress (msg is NULL at the moment).  We don't
439          * allocate a msg until we have its core header.
440          */
441         bytes = ioq->fifo_end - ioq->fifo_beg;
442         nmax = sizeof(ioq->buf) - ioq->fifo_end;
443         msg = ioq->msg;
444
445         switch(ioq->state) {
446         case HAMMER2_MSGQ_STATE_HEADER1:
447                 /*
448                  * Load the primary header, fail on any non-trivial read
449                  * error or on EOF.  Since the primary header is the same
450                  * size is the message alignment it will never straddle
451                  * the end of the buffer.
452                  */
453                 if (bytes < (int)sizeof(msg->any.head)) {
454                         n = read(iocom->sock_fd,
455                                  ioq->buf + ioq->fifo_end,
456                                  nmax);
457                         if (n <= 0) {
458                                 if (n == 0) {
459                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
460                                         break;
461                                 }
462                                 if (errno != EINTR &&
463                                     errno != EINPROGRESS &&
464                                     errno != EAGAIN) {
465                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
466                                         break;
467                                 }
468                                 n = 0;
469                                 /* fall through */
470                         }
471                         ioq->fifo_end += n;
472                         bytes += n;
473                         nmax -= n;
474                 }
475
476                 /*
477                  * Insufficient data accumulated (msg is NULL, caller will
478                  * retry on event).
479                  */
480                 assert(msg == NULL);
481                 if (bytes < (int)sizeof(msg->any.head))
482                         break;
483
484                 /*
485                  * Calculate the header, decrypt data received so far.
486                  * Data will be decrypted in-place.  Partial blocks are
487                  * not immediately decrypted.
488                  *
489                  * WARNING!  The header might be in the wrong endian, we
490                  *           do not fix it up until we get the entire
491                  *           extended header.
492                  */
493                 hammer2_crypto_decrypt(iocom, ioq);
494                 head = (void *)(ioq->buf + ioq->fifo_beg);
495
496                 /*
497                  * Check and fixup the core header.  Note that the icrc
498                  * has to be calculated before any fixups, but the crc
499                  * fields in the msg may have to be swapped like everything
500                  * else.
501                  */
502                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
503                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
504                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
505                         break;
506                 }
507
508                 /*
509                  * Calculate the full header size and aux data size
510                  */
511                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
512                         ioq->hbytes = (bswap32(head->cmd) & HAMMER2_MSGF_SIZE) *
513                                       HAMMER2_MSG_ALIGN;
514                         ioq->abytes = bswap32(head->aux_bytes) *
515                                       HAMMER2_MSG_ALIGN;
516                 } else {
517                         ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
518                                       HAMMER2_MSG_ALIGN;
519                         ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
520                 }
521                 if (ioq->hbytes < sizeof(msg->any.head) ||
522                     ioq->hbytes > sizeof(msg->any) ||
523                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
524                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
525                         break;
526                 }
527
528                 /*
529                  * Finally allocate the message and copy the core header
530                  * to the embedded extended header.
531                  *
532                  * Initialize msg->aux_size to 0 and use it to track
533                  * the amount of data copied from the stream.
534                  */
535                 msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
536                 ioq->msg = msg;
537
538                 /*
539                  * We are either done or we fall-through
540                  */
541                 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
542                         bcopy(head, &msg->any.head, sizeof(msg->any.head));
543                         ioq->fifo_beg += ioq->hbytes;
544                         break;
545                 }
546
547                 /*
548                  * Fall through to the next state.  Make sure that the
549                  * extended header does not straddle the end of the buffer.
550                  * We still want to issue larger reads into our buffer,
551                  * book-keeping is easier if we don't bcopy() yet.
552                  */
553                 if (bytes + nmax < ioq->hbytes) {
554                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
555                         ioq->fifo_cdx -= ioq->fifo_beg;
556                         ioq->fifo_beg = 0;
557                         ioq->fifo_end = bytes;
558                         nmax = sizeof(ioq->buf) - ioq->fifo_end;
559                 }
560                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
561                 /* fall through */
562         case HAMMER2_MSGQ_STATE_HEADER2:
563                 /*
564                  * Fill out the extended header.
565                  */
566                 assert(msg != NULL);
567                 if (bytes < ioq->hbytes) {
568                         n = read(iocom->sock_fd,
569                                  msg->any.buf + ioq->fifo_end,
570                                  nmax);
571                         if (n <= 0) {
572                                 if (n == 0) {
573                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
574                                         break;
575                                 }
576                                 if (errno != EINTR &&
577                                     errno != EINPROGRESS &&
578                                     errno != EAGAIN) {
579                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
580                                         break;
581                                 }
582                                 n = 0;
583                                 /* fall through */
584                         }
585                         ioq->fifo_end += n;
586                         bytes += n;
587                         nmax -= n;
588                 }
589
590                 /*
591                  * Insufficient data accumulated (set msg NULL so caller will
592                  * retry on event).
593                  */
594                 if (bytes < ioq->hbytes) {
595                         msg = NULL;
596                         break;
597                 }
598
599                 /*
600                  * Calculate the extended header, decrypt data received
601                  * so far.  Handle endian-conversion for the entire extended
602                  * header.
603                  */
604                 hammer2_crypto_decrypt(iocom, ioq);
605                 head = (void *)(ioq->buf + ioq->fifo_beg);
606
607                 /*
608                  * Check the CRC.
609                  */
610                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV)
611                         xcrc32 = bswap32(head->hdr_crc);
612                 else
613                         xcrc32 = head->hdr_crc;
614                 head->hdr_crc = 0;
615                 if (hammer2_icrc32(head, ioq->hbytes) != xcrc32) {
616                         ioq->error = HAMMER2_IOQ_ERROR_XCRC;
617                         break;
618                 }
619                 head->hdr_crc = xcrc32;
620
621                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
622                         hammer2_bswap_head(head);
623                 }
624
625                 /*
626                  * Copy the extended header into the msg and adjust the
627                  * FIFO.
628                  */
629                 bcopy(head, &msg->any, ioq->hbytes);
630
631                 /*
632                  * We are either done or we fall-through.
633                  */
634                 if (ioq->abytes == 0) {
635                         ioq->fifo_beg += ioq->hbytes;
636                         break;
637                 }
638
639                 /*
640                  * Must adjust nmax and bytes (and the state) when falling
641                  * through.
642                  */
643                 ioq->fifo_beg += ioq->hbytes;
644                 nmax -= ioq->hbytes;
645                 bytes -= ioq->hbytes;
646                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
647                 /* fall through */
648         case HAMMER2_MSGQ_STATE_AUXDATA1:
649                 /*
650                  * Copy the partial or complete payload from remaining
651                  * bytes in the FIFO.  We have to fall-through either
652                  * way so we can check the crc.
653                  *
654                  * Adjust msg->aux_size to the final actual value.
655                  */
656                 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
657                 if (ioq->already > ioq->abytes)
658                         ioq->already = ioq->abytes;
659                 if (bytes >= ioq->abytes) {
660                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
661                               ioq->abytes);
662                         msg->aux_size = ioq->abytes;
663                         ioq->fifo_beg += ioq->abytes;
664                         if (ioq->fifo_cdx < ioq->fifo_beg)
665                                 ioq->fifo_cdx = ioq->fifo_beg;
666                         bytes -= ioq->abytes;
667                 } else if (bytes) {
668                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
669                               bytes);
670                         msg->aux_size = bytes;
671                         ioq->fifo_beg += bytes;
672                         if (ioq->fifo_cdx < ioq->fifo_beg)
673                                 ioq->fifo_cdx = ioq->fifo_beg;
674                         bytes = 0;
675                 } else {
676                         msg->aux_size = 0;
677                 }
678                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
679                 /* fall through */
680         case HAMMER2_MSGQ_STATE_AUXDATA2:
681                 /*
682                  * Read the remainder of the payload directly into the
683                  * msg->aux_data buffer.
684                  */
685                 assert(msg);
686                 if (msg->aux_size < ioq->abytes) {
687                         assert(bytes == 0);
688                         n = read(iocom->sock_fd,
689                                  msg->aux_data + msg->aux_size,
690                                  ioq->abytes - msg->aux_size);
691                         if (n <= 0) {
692                                 if (n == 0) {
693                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
694                                         break;
695                                 }
696                                 if (errno != EINTR &&
697                                     errno != EINPROGRESS &&
698                                     errno != EAGAIN) {
699                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
700                                         break;
701                                 }
702                                 n = 0;
703                                 /* fall through */
704                         }
705                         msg->aux_size += n;
706                 }
707
708                 /*
709                  * Insufficient data accumulated (set msg NULL so caller will
710                  * retry on event).
711                  */
712                 if (msg->aux_size < ioq->abytes) {
713                         msg = NULL;
714                         break;
715                 }
716                 assert(msg->aux_size == ioq->abytes);
717                 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
718
719                 /*
720                  * Check aux_crc, then we are done.
721                  */
722                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
723                 if (xcrc32 != msg->any.head.aux_crc) {
724                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
725                         break;
726                 }
727                 break;
728         case HAMMER2_MSGQ_STATE_ERROR:
729                 /*
730                  * Continued calls to drain recorded transactions (returning
731                  * a LNK_ERROR for each one), before we return the final
732                  * LNK_ERROR.
733                  */
734                 assert(msg == NULL);
735                 break;
736         default:
737                 /*
738                  * We don't double-return errors, the caller should not
739                  * have called us again after getting an error msg.
740                  */
741                 assert(0);
742                 break;
743         }
744
745         /*
746          * Check the message sequence.  The iv[] should prevent any
747          * possibility of a replay but we add this check anyway.
748          */
749         if (msg && ioq->error == 0) {
750                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
751                         ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
752                 } else {
753                         ++ioq->seq;
754                 }
755         }
756
757         /*
758          * Process transactional state for the message.
759          */
760         if (msg && ioq->error == 0) {
761                 error = hammer2_state_msgrx(iocom, msg);
762                 if (error) {
763                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
764                                 hammer2_msg_free(iocom, msg);
765                                 goto again;
766                         }
767                         ioq->error = error;
768                 }
769         }
770
771         /*
772          * Handle error, RREQ, or completion
773          *
774          * NOTE: nmax and bytes are invalid at this point, we don't bother
775          *       to update them when breaking out.
776          */
777         if (ioq->error) {
778                 /*
779                  * An unrecoverable error causes all active receive
780                  * transactions to be terminated with a LNK_ERROR message.
781                  *
782                  * Once all active transactions are exhausted we set the
783                  * iocom ERROR flag and return a non-transactional LNK_ERROR
784                  * message, which should cause master processing loops to
785                  * terminate.
786                  */
787                 assert(ioq->msg == msg);
788                 if (msg) {
789                         hammer2_msg_free(iocom, msg);
790                         ioq->msg = NULL;
791                 }
792
793                 /*
794                  * No more I/O read processing
795                  */
796                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
797
798                 /*
799                  * Simulate a remote LNK_ERROR DELETE msg for any open
800                  * transactions, ending with a final non-transactional
801                  * LNK_ERROR (that the session can detect) when no
802                  * transactions remain.
803                  */
804                 msg = hammer2_msg_alloc(iocom, 0, 0);
805                 bzero(&msg->any.head, sizeof(msg->any.head));
806                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
807                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
808                 msg->any.head.error = ioq->error;
809
810                 pthread_mutex_lock(&iocom->mtx);
811                 fprintf(stderr, "CHECK REMAINING RXMSGS\n");
812                 if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
813                         /*
814                          * Active remote transactions are still present.
815                          * Simulate the other end sending us a DELETE.
816                          */
817                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
818                                 fprintf(stderr, "SIMULATE DELETION RCONT %p\n", state);
819                                 hammer2_msg_free(iocom, msg);
820                                 msg = NULL;
821                         } else {
822                                 fprintf(stderr, "SIMULATE DELETION %p RD RXCMD %08x\n", state, state->rxcmd);
823                                 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
824                                 msg->state = state;
825                                 msg->any.head.spanid = state->spanid;
826                                 msg->any.head.msgid = state->msgid;
827                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
828                                                      HAMMER2_MSGF_DELETE;
829                         }
830                 } else if ((state = RB_ROOT(&iocom->statewr_tree)) != NULL) {
831                         /*
832                          * Active local transactions are still present.
833                          * Simulate the other end sending us a DELETE.
834                          */
835                         if (state->rxcmd & HAMMER2_MSGF_DELETE) {
836                                 fprintf(stderr, "SIMULATE DELETION WCONT STATE->txcmd = %08x rxcmd = %08x msgid=%016jx\n", state->txcmd, state->rxcmd, state->msgid );
837                                 hammer2_msg_free(iocom, msg);
838                                 msg = NULL;
839                         } else {
840                                 fprintf(stderr, "SIMULATE DELETION WD RXCMD %08x\n", state->txcmd);
841                                 /*state->txcmd |= HAMMER2_MSGF_DELETE;*/
842                                 msg->state = state;
843                                 msg->any.head.spanid = state->spanid;
844                                 msg->any.head.msgid = state->msgid;
845                                 msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
846                                                      HAMMER2_MSGF_DELETE |
847                                                      HAMMER2_MSGF_REPLY;
848                                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
849                                         msg->any.head.cmd |=
850                                                      HAMMER2_MSGF_CREATE;
851                                 }
852                         }
853                 } else {
854                         /*
855                          * No active local or remote transactions remain.
856                          * Generate a final LNK_ERROR and flag EOF.
857                          */
858                         msg->state = NULL;
859                         iocom->flags |= HAMMER2_IOCOMF_EOF;
860                         fprintf(stderr, "EOF ON SOCKET\n");
861                 }
862                 pthread_mutex_unlock(&iocom->mtx);
863
864                 /*
865                  * For the iocom error case we want to set RWORK to indicate
866                  * that more messages might be pending.
867                  *
868                  * It is possible to return NULL when there is more work to
869                  * do because each message has to be DELETEd in both
870                  * directions before we continue on with the next (though
871                  * this could be optimized).  The transmit direction will
872                  * re-set RWORK.
873                  */
874                 if (msg)
875                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
876         } else if (msg == NULL) {
877                 /*
878                  * Insufficient data received to finish building the message,
879                  * set RREQ and return NULL.
880                  *
881                  * Leave ioq->msg intact.
882                  * Leave the FIFO intact.
883                  */
884                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
885         } else {
886                 /*
887                  * Return msg.
888                  *
889                  * The fifo has already been advanced past the message.
890                  * Trivially reset the FIFO indices if possible.
891                  *
892                  * clear the FIFO if it is now empty and set RREQ to wait
893                  * for more from the socket.  If the FIFO is not empty set
894                  * TWORK to bypass the poll so we loop immediately.
895                  */
896                 if (ioq->fifo_beg == ioq->fifo_end) {
897                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
898                         ioq->fifo_cdx = 0;
899                         ioq->fifo_beg = 0;
900                         ioq->fifo_end = 0;
901                 } else {
902                         iocom->flags |= HAMMER2_IOCOMF_RWORK;
903                 }
904                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
905                 ioq->msg = NULL;
906         }
907         return (msg);
908 }
909
910 /*
911  * Calculate the header and data crc's and write a low-level message to
912  * the connection.  If aux_crc is non-zero the aux_data crc is already
913  * assumed to have been set.
914  *
915  * A non-NULL msg is added to the queue but not necessarily flushed.
916  * Calling this function with msg == NULL will get a flush going.
917  *
918  * Caller must hold iocom->mtx.
919  */
920 void
921 hammer2_iocom_flush1(hammer2_iocom_t *iocom)
922 {
923         hammer2_ioq_t *ioq = &iocom->ioq_tx;
924         hammer2_msg_t *msg;
925         uint32_t xcrc32;
926         int hbytes;
927         hammer2_msg_queue_t tmpq;
928
929         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
930         TAILQ_INIT(&tmpq);
931         pthread_mutex_lock(&iocom->mtx);
932         while ((msg = TAILQ_FIRST(&iocom->txmsgq)) != NULL) {
933                 TAILQ_REMOVE(&iocom->txmsgq, msg, qentry);
934                 TAILQ_INSERT_TAIL(&tmpq, msg, qentry);
935         }
936         pthread_mutex_unlock(&iocom->mtx);
937
938         while ((msg = TAILQ_FIRST(&tmpq)) != NULL) {
939                 /*
940                  * Process terminal connection errors.
941                  */
942                 TAILQ_REMOVE(&tmpq, msg, qentry);
943                 if (ioq->error) {
944                         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
945                         ++ioq->msgcount;
946                         continue;
947                 }
948
949                 /*
950                  * Finish populating the msg fields.  The salt ensures that
951                  * the iv[] array is ridiculously randomized and we also
952                  * re-seed our PRNG every 32768 messages just to be sure.
953                  */
954                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
955                 msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
956                 ++ioq->seq;
957                 if ((ioq->seq & 32767) == 0)
958                         srandomdev();
959
960                 /*
961                  * Calculate aux_crc if 0, then calculate hdr_crc.
962                  */
963                 if (msg->aux_size && msg->any.head.aux_crc == 0) {
964                         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
965                         xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
966                         msg->any.head.aux_crc = xcrc32;
967                 }
968                 msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
969                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
970
971                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
972                          HAMMER2_MSG_ALIGN;
973                 msg->any.head.hdr_crc = 0;
974                 msg->any.head.hdr_crc = hammer2_icrc32(&msg->any.head, hbytes);
975
976                 /*
977                  * Enqueue the message (the flush codes handles stream
978                  * encryption).
979                  */
980                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
981                 ++ioq->msgcount;
982         }
983         hammer2_iocom_flush2(iocom);
984 }
985
986 /*
987  * Thread localized, iocom->mtx not held by caller.
988  */
989 void
990 hammer2_iocom_flush2(hammer2_iocom_t *iocom)
991 {
992         hammer2_ioq_t *ioq = &iocom->ioq_tx;
993         hammer2_msg_t *msg;
994         ssize_t nmax;
995         ssize_t nact;
996         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
997         size_t hbytes;
998         size_t abytes;
999         int hoff;
1000         int aoff;
1001         int n;
1002
1003         if (ioq->error) {
1004                 hammer2_iocom_drain(iocom);
1005                 return;
1006         }
1007
1008         /*
1009          * Pump messages out the connection by building an iovec.
1010          */
1011         n = 0;
1012         nmax = 0;
1013
1014         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
1015                 hoff = 0;
1016                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1017                          HAMMER2_MSG_ALIGN;
1018                 aoff = 0;
1019                 abytes = msg->aux_size;
1020                 if (n == 0) {
1021                         hoff += ioq->hbytes;
1022                         aoff += ioq->abytes;
1023                 }
1024                 if (hbytes - hoff > 0) {
1025                         iov[n].iov_base = (char *)&msg->any.head + hoff;
1026                         iov[n].iov_len = hbytes - hoff;
1027                         nmax += hbytes - hoff;
1028                         ++n;
1029                         if (n == HAMMER2_IOQ_MAXIOVEC)
1030                                 break;
1031                 }
1032                 if (abytes - aoff > 0) {
1033                         assert(msg->aux_data != NULL);
1034                         iov[n].iov_base = msg->aux_data + aoff;
1035                         iov[n].iov_len = abytes - aoff;
1036                         nmax += abytes - aoff;
1037                         ++n;
1038                         if (n == HAMMER2_IOQ_MAXIOVEC)
1039                                 break;
1040                 }
1041         }
1042         if (n == 0)
1043                 return;
1044
1045         /*
1046          * Encrypt and write the data.  The crypto code will move the
1047          * data into the fifo and adjust the iov as necessary.  If
1048          * encryption is disabled the iov is left alone.
1049          *
1050          * hammer2_crypto_encrypt_wrote()
1051          */
1052         n = hammer2_crypto_encrypt(iocom, ioq, iov, n);
1053
1054         /*
1055          * Execute the writev() then figure out what happened.
1056          */
1057         nact = writev(iocom->sock_fd, iov, n);
1058         if (nact < 0) {
1059                 if (errno != EINTR &&
1060                     errno != EINPROGRESS &&
1061                     errno != EAGAIN) {
1062                         /*
1063                          * Fatal write error
1064                          */
1065                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
1066                         hammer2_iocom_drain(iocom);
1067                 } else {
1068                         /*
1069                          * Wait for socket buffer space
1070                          */
1071                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
1072                 }
1073                 return;
1074         }
1075
1076         /*
1077          * Indicate bytes written successfully.  If we were unable to
1078          * write the entire iov array then set WREQ to wait for more
1079          * socket buffer space.
1080          */
1081         hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
1082         if (nact != nmax)
1083                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
1084
1085         /*
1086          * Clean out the transmit queue based on what we successfully
1087          * sent.
1088          */
1089         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1090                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
1091                          HAMMER2_MSG_ALIGN;
1092                 abytes = msg->aux_size;
1093
1094                 if ((size_t)nact < hbytes - ioq->hbytes) {
1095                         ioq->hbytes += nact;
1096                         break;
1097                 }
1098                 nact -= hbytes - ioq->hbytes;
1099                 ioq->hbytes = hbytes;
1100                 if ((size_t)nact < abytes - ioq->abytes) {
1101                         ioq->abytes += nact;
1102                         break;
1103                 }
1104                 nact -= abytes - ioq->abytes;
1105
1106                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1107                 --ioq->msgcount;
1108                 ioq->hbytes = 0;
1109                 ioq->abytes = 0;
1110
1111                 hammer2_state_cleanuptx(iocom, msg);
1112         }
1113         if (ioq->error) {
1114                 hammer2_iocom_drain(iocom);
1115         }
1116 }
1117
1118 /*
1119  * Kill pending msgs on ioq_tx and adjust the flags such that no more
1120  * write events will occur.  We don't kill read msgs because we want
1121  * the caller to pull off our contrived terminal error msg to detect
1122  * the connection failure.
1123  *
1124  * Thread localized, iocom->mtx not held by caller.
1125  */
1126 void
1127 hammer2_iocom_drain(hammer2_iocom_t *iocom)
1128 {
1129         hammer2_ioq_t *ioq = &iocom->ioq_tx;
1130         hammer2_msg_t *msg;
1131
1132         iocom->flags &= ~(HAMMER2_IOCOMF_WREQ | HAMMER2_IOCOMF_WWORK);
1133
1134         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
1135                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
1136                 --ioq->msgcount;
1137                 hammer2_state_cleanuptx(iocom, msg);
1138         }
1139 }
1140
1141 /*
1142  * Write a message to an iocom, with additional state processing.
1143  */
1144 void
1145 hammer2_msg_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg,
1146                   void (*func)(hammer2_state_t *, hammer2_msg_t *),
1147                   void *data,
1148                   hammer2_state_t **statep)
1149 {
1150         hammer2_state_t *state;
1151         char dummy;
1152
1153         /*
1154          * Handle state processing, create state if necessary.
1155          */
1156         pthread_mutex_lock(&iocom->mtx);
1157         if ((state = msg->state) != NULL) {
1158                 /*
1159                  * Existing transaction (could be reply).  It is also
1160                  * possible for this to be the first reply (CREATE is set),
1161                  * in which case we populate state->txcmd.
1162                  */
1163                 msg->any.head.msgid = state->msgid;
1164                 msg->any.head.spanid = state->spanid;
1165                 if (func) {
1166                         state->func = func;
1167                         state->any.any = data;
1168                 }
1169                 assert(((state->txcmd ^ msg->any.head.cmd) &
1170                         HAMMER2_MSGF_REPLY) == 0);
1171                 if (msg->any.head.cmd & HAMMER2_MSGF_CREATE)
1172                         state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1173         } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
1174                 /*
1175                  * No existing state and CREATE is set, create new
1176                  * state for outgoing command.  This can't happen if
1177                  * REPLY is set as the state would already exist for
1178                  * a transaction reply.
1179                  */
1180                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1181
1182                 state = malloc(sizeof(*state));
1183                 bzero(state, sizeof(*state));
1184                 state->iocom = iocom;
1185                 state->flags = HAMMER2_STATE_DYNAMIC;
1186                 state->msg = msg;
1187                 state->msgid = (uint64_t)(uintptr_t)state;
1188                 state->spanid = msg->any.head.spanid;
1189                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1190                 state->rxcmd = HAMMER2_MSGF_REPLY;
1191                 state->func = func;
1192                 state->any.any = data;
1193                 RB_INSERT(hammer2_state_tree, &iocom->statewr_tree, state);
1194                 state->flags |= HAMMER2_STATE_INSERTED;
1195                 msg->state = state;
1196                 msg->any.head.msgid = state->msgid;
1197                 /* spanid set by caller */
1198         } else {
1199                 msg->any.head.msgid = 0;
1200                 /* spanid set by caller */
1201         }
1202
1203         if (statep)
1204                 *statep = state;
1205
1206         /*
1207          * Queue it for output, wake up the I/O pthread.  Note that the
1208          * I/O thread is responsible for generating the CRCs and encryption.
1209          */
1210         TAILQ_INSERT_TAIL(&iocom->txmsgq, msg, qentry);
1211         dummy = 0;
1212         write(iocom->wakeupfds[1], &dummy, 1);  /* XXX optimize me */
1213         pthread_mutex_unlock(&iocom->mtx);
1214 }
1215
1216 /*
1217  * This is a shortcut to formulate a reply to msg with a simple error code,
1218  * It can reply to and terminate a transaction, or it can reply to a one-way
1219  * messages.  A HAMMER2_LNK_ERROR command code is utilized to encode
1220  * the error code (which can be 0).  Not all transactions are terminated
1221  * with HAMMER2_LNK_ERROR status (the low level only cares about the
1222  * MSGF_DELETE flag), but most are.
1223  *
1224  * Replies to one-way messages are a bit of an oxymoron but the feature
1225  * is used by the debug (DBG) protocol.
1226  *
1227  * The reply contains no extended data.
1228  */
1229 void
1230 hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1231 {
1232         hammer2_state_t *state = msg->state;
1233         hammer2_msg_t *nmsg;
1234         uint32_t cmd;
1235
1236
1237         /*
1238          * Reply with a simple error code and terminate the transaction.
1239          */
1240         cmd = HAMMER2_LNK_ERROR;
1241
1242         /*
1243          * Check if our direction has even been initiated yet, set CREATE.
1244          *
1245          * Check what direction this is (command or reply direction).  Note
1246          * that txcmd might not have been initiated yet.
1247          *
1248          * If our direction has already been closed we just return without
1249          * doing anything.
1250          */
1251         if (state) {
1252                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1253                         return;
1254                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1255                         cmd |= HAMMER2_MSGF_CREATE;
1256                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1257                         cmd |= HAMMER2_MSGF_REPLY;
1258                 cmd |= HAMMER2_MSGF_DELETE;
1259         } else {
1260                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1261                         cmd |= HAMMER2_MSGF_REPLY;
1262         }
1263
1264         nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1265         nmsg->any.head.error = error;
1266         nmsg->state = msg->state;
1267         hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
1268 }
1269
1270 /*
1271  * Similar to hammer2_msg_reply() but leave the transaction open.  That is,
1272  * we are generating a streaming reply or an intermediate acknowledgement
1273  * of some sort as part of the higher level protocol, with more to come
1274  * later.
1275  */
1276 void
1277 hammer2_msg_result(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint32_t error)
1278 {
1279         hammer2_state_t *state = msg->state;
1280         hammer2_msg_t *nmsg;
1281         uint32_t cmd;
1282
1283
1284         /*
1285          * Reply with a simple error code and terminate the transaction.
1286          */
1287         cmd = HAMMER2_LNK_ERROR;
1288
1289         /*
1290          * Check if our direction has even been initiated yet, set CREATE.
1291          *
1292          * Check what direction this is (command or reply direction).  Note
1293          * that txcmd might not have been initiated yet.
1294          *
1295          * If our direction has already been closed we just return without
1296          * doing anything.
1297          */
1298         if (state) {
1299                 if (state->txcmd & HAMMER2_MSGF_DELETE)
1300                         return;
1301                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1302                         cmd |= HAMMER2_MSGF_CREATE;
1303                 if (state->txcmd & HAMMER2_MSGF_REPLY)
1304                         cmd |= HAMMER2_MSGF_REPLY;
1305                 /* continuing transaction, do not set MSGF_DELETE */
1306         } else {
1307                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
1308                         cmd |= HAMMER2_MSGF_REPLY;
1309         }
1310
1311         nmsg = hammer2_msg_alloc(iocom, 0, cmd);
1312         nmsg->any.head.error = error;
1313         nmsg->state = state;
1314         hammer2_msg_write(iocom, nmsg, NULL, NULL, NULL);
1315 }
1316
1317 /*
1318  * Terminate a transaction given a state structure by issuing a DELETE.
1319  */
1320 void
1321 hammer2_state_reply(hammer2_state_t *state, uint32_t error)
1322 {
1323         hammer2_msg_t *nmsg;
1324         uint32_t cmd = HAMMER2_LNK_ERROR | HAMMER2_MSGF_DELETE;
1325
1326         /*
1327          * Nothing to do if we already transmitted a delete
1328          */
1329         if (state->txcmd & HAMMER2_MSGF_DELETE)
1330                 return;
1331
1332         /*
1333          * We must also set CREATE if this is our first response to a
1334          * remote command.
1335          */
1336         if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
1337                 cmd |= HAMMER2_MSGF_CREATE;
1338
1339         /*
1340          * Set REPLY if the other end initiated the command.  Otherwise
1341          * we are the command direction.
1342          */
1343         if (state->txcmd & HAMMER2_MSGF_REPLY)
1344                 cmd |= HAMMER2_MSGF_REPLY;
1345
1346         nmsg = hammer2_msg_alloc(state->iocom, 0, cmd);
1347         nmsg->any.head.error = error;
1348         nmsg->state = state;
1349         hammer2_msg_write(state->iocom, nmsg, NULL, NULL, NULL);
1350 }
1351
/************************************************************************
 *                      TRANSACTION STATE HANDLING                      *
 ************************************************************************
 *
 */

/*
 * Instantiate the <sys/tree.h> red-black tree functions for
 * struct hammer2_state, linked through its rbnode field and ordered by
 * hammer2_state_cmp().  These back the RB_INSERT/RB_FIND/RB_ROOT calls on
 * the per-iocom staterd_tree and statewr_tree made in this file.
 */
RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
1359
1360 /*
1361  * Process state tracking for a message after reception, prior to
1362  * execution.
1363  *
1364  * Called with msglk held and the msg dequeued.
1365  *
1366  * All messages are called with dummy state and return actual state.
1367  * (One-off messages often just return the same dummy state).
1368  *
1369  * May request that caller discard the message by setting *discardp to 1.
1370  * The returned state is not used in this case and is allowed to be NULL.
1371  *
1372  * --
1373  *
1374  * These routines handle persistent and command/reply message state via the
1375  * CREATE and DELETE flags.  The first message in a command or reply sequence
1376  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1377  *
1378  * There can be any number of intermediate messages belonging to the same
1379  * sequence sent inbetween the CREATE message and the DELETE message,
1380  * which set neither flag.  This represents a streaming command or reply.
1381  *
1382  * Any command message received with CREATE set expects a reply sequence to
1383  * be returned.  Reply sequences work the same as command sequences except the
1384  * REPLY bit is also sent.  Both the command side and reply side can
1385  * degenerate into a single message with both CREATE and DELETE set.  Note
1386  * that one side can be streaming and the other side not, or neither, or both.
1387  *
1388  * The msgid is unique for the initiator.  That is, two sides sending a new
1389  * message can use the same msgid without colliding.
1390  *
1391  * --
1392  *
1393  * ABORT sequences work by setting the ABORT flag along with normal message
1394  * state.  However, ABORTs can also be sent on half-closed messages, that is
1395  * even if the command or reply side has already sent a DELETE, as long as
1396  * the message has not been fully closed it can still send an ABORT+DELETE
1397  * to terminate the half-closed message state.
1398  *
1399  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1400  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1401  * also race, and in this situation the other side might have already
1402  * initiated a new unrelated command with the same message id.  Since
1403  * the abort has not set the CREATE flag the situation can be detected
1404  * and the message will also be discarded.
1405  *
1406  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1407  * The ABORT request is essentially integrated into the command instead
1408  * of being sent later on.  In this situation the command implementation
1409  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1410  * special-case non-blocking operation for the command.
1411  *
1412  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1413  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1414  *        one-way messages are not supported.
1415  *
1416  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1417  *        simply ignored.
1418  *
1419  * --
1420  *
1421  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1422  * set.  One-off messages cannot be aborted and typically aren't processed
1423  * by these routines.  The REPLY bit can be used to distinguish whether a
1424  * one-off message is a command or reply.  For example, one-off replies
1425  * will typically just contain status updates.
1426  */
1427 static int
1428 hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1429 {
1430         hammer2_state_t *state;
1431         hammer2_state_t dummy;
1432         int error;
1433
1434         /*
1435          * Lock RB tree and locate existing persistent state, if any.
1436          *
1437          * If received msg is a command state is on staterd_tree.
1438          * If received msg is a reply state is on statewr_tree.
1439          */
1440
1441         dummy.msgid = msg->any.head.msgid;
1442         dummy.spanid = msg->any.head.spanid;
1443         pthread_mutex_lock(&iocom->mtx);
1444         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1445                 state = RB_FIND(hammer2_state_tree,
1446                                 &iocom->statewr_tree, &dummy);
1447         } else {
1448                 state = RB_FIND(hammer2_state_tree,
1449                                 &iocom->staterd_tree, &dummy);
1450         }
1451         msg->state = state;
1452         pthread_mutex_unlock(&iocom->mtx);
1453
1454         /*
1455          * Short-cut one-off or mid-stream messages (state may be NULL).
1456          */
1457         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1458                                   HAMMER2_MSGF_ABORT)) == 0) {
1459                 return(0);
1460         }
1461
1462         /*
1463          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1464          * inside the case statements.
1465          */
1466         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1467                                     HAMMER2_MSGF_REPLY)) {
1468         case HAMMER2_MSGF_CREATE:
1469         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1470                 /*
1471                  * New persistant command received.
1472                  */
1473                 if (state) {
1474                         fprintf(stderr, "hammer2_state_msgrx: "
1475                                         "duplicate transaction\n");
1476                         error = HAMMER2_IOQ_ERROR_TRANS;
1477                         break;
1478                 }
1479                 state = malloc(sizeof(*state));
1480                 bzero(state, sizeof(*state));
1481                 state->iocom = iocom;
1482                 state->flags = HAMMER2_STATE_DYNAMIC;
1483                 state->msg = msg;
1484                 state->txcmd = HAMMER2_MSGF_REPLY;
1485                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1486                 pthread_mutex_lock(&iocom->mtx);
1487                 RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
1488                 pthread_mutex_unlock(&iocom->mtx);
1489                 state->flags |= HAMMER2_STATE_INSERTED;
1490                 state->msgid = msg->any.head.msgid;
1491                 state->spanid = msg->any.head.spanid;
1492                 msg->state = state;
1493                 error = 0;
1494                 break;
1495         case HAMMER2_MSGF_DELETE:
1496                 /*
1497                  * Persistent state is expected but might not exist if an
1498                  * ABORT+DELETE races the close.
1499                  */
1500                 if (state == NULL) {
1501                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1502                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1503                         } else {
1504                                 fprintf(stderr, "hammer2_state_msgrx: "
1505                                                 "no state for DELETE\n");
1506                                 error = HAMMER2_IOQ_ERROR_TRANS;
1507                         }
1508                         break;
1509                 }
1510
1511                 /*
1512                  * Handle another ABORT+DELETE case if the msgid has already
1513                  * been reused.
1514                  */
1515                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1516                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1517                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1518                         } else {
1519                                 fprintf(stderr, "hammer2_state_msgrx: "
1520                                                 "state reused for DELETE\n");
1521                                 error = HAMMER2_IOQ_ERROR_TRANS;
1522                         }
1523                         break;
1524                 }
1525                 error = 0;
1526                 break;
1527         default:
1528                 /*
1529                  * Check for mid-stream ABORT command received, otherwise
1530                  * allow.
1531                  */
1532                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1533                         if (state == NULL ||
1534                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1535                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1536                                 break;
1537                         }
1538                 }
1539                 error = 0;
1540                 break;
1541         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1542         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1543                 /*
1544                  * When receiving a reply with CREATE set the original
1545                  * persistent state message should already exist.
1546                  */
1547                 if (state == NULL) {
1548                         fprintf(stderr,
1549                                 "hammer2_state_msgrx: no state match for REPLY"
1550                                 " cmd=%08x msgid=%016jx\n",
1551                                 msg->any.head.cmd,
1552                                 (intmax_t)msg->any.head.msgid);
1553                         error = HAMMER2_IOQ_ERROR_TRANS;
1554                         break;
1555                 }
1556                 assert(((state->rxcmd ^ msg->any.head.cmd) &
1557                         HAMMER2_MSGF_REPLY) == 0);
1558                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1559                 error = 0;
1560                 break;
1561         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1562                 /*
1563                  * Received REPLY+ABORT+DELETE in case where msgid has
1564                  * already been fully closed, ignore the message.
1565                  */
1566                 if (state == NULL) {
1567                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1568                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1569                         } else {
1570                                 fprintf(stderr, "hammer2_state_msgrx: "
1571                                                 "no state match for "
1572                                                 "REPLY|DELETE\n");
1573                                 error = HAMMER2_IOQ_ERROR_TRANS;
1574                         }
1575                         break;
1576                 }
1577
1578                 /*
1579                  * Received REPLY+ABORT+DELETE in case where msgid has
1580                  * already been reused for an unrelated message,
1581                  * ignore the message.
1582                  */
1583                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1584                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1585                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1586                         } else {
1587                                 fprintf(stderr, "hammer2_state_msgrx: "
1588                                                 "state reused for "
1589                                                 "REPLY|DELETE\n");
1590                                 error = HAMMER2_IOQ_ERROR_TRANS;
1591                         }
1592                         break;
1593                 }
1594                 error = 0;
1595                 break;
1596         case HAMMER2_MSGF_REPLY:
1597                 /*
1598                  * Check for mid-stream ABORT reply received to sent command.
1599                  */
1600                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1601                         if (state == NULL ||
1602                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1603                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1604                                 break;
1605                         }
1606                 }
1607                 error = 0;
1608                 break;
1609         }
1610         return (error);
1611 }
1612
1613 void
1614 hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1615 {
1616         hammer2_state_t *state;
1617
1618         if ((state = msg->state) == NULL) {
1619                 /*
1620                  * Free a non-transactional message, there is no state
1621                  * to worry about.
1622                  */
1623                 hammer2_msg_free(iocom, msg);
1624         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1625                 /*
1626                  * Message terminating transaction, destroy the related
1627                  * state, the original message, and this message (if it
1628                  * isn't the original message due to a CREATE|DELETE).
1629                  */
1630                 pthread_mutex_lock(&iocom->mtx);
1631                 state->rxcmd |= HAMMER2_MSGF_DELETE;
1632                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
1633                         if (state->msg == msg)
1634                                 state->msg = NULL;
1635                         assert(state->flags & HAMMER2_STATE_INSERTED);
1636                         if (state->rxcmd & HAMMER2_MSGF_REPLY) {
1637                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1638                                 RB_REMOVE(hammer2_state_tree,
1639                                           &iocom->statewr_tree, state);
1640                         } else {
1641                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1642                                 RB_REMOVE(hammer2_state_tree,
1643                                           &iocom->staterd_tree, state);
1644                         }
1645                         state->flags &= ~HAMMER2_STATE_INSERTED;
1646                         hammer2_state_free(state);
1647                 } else {
1648                         ;
1649                 }
1650                 pthread_mutex_unlock(&iocom->mtx);
1651                 hammer2_msg_free(iocom, msg);
1652         } else if (state->msg != msg) {
1653                 /*
1654                  * Message not terminating transaction, leave state intact
1655                  * and free message if it isn't the CREATE message.
1656                  */
1657                 hammer2_msg_free(iocom, msg);
1658         }
1659 }
1660
1661 static void
1662 hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1663 {
1664         hammer2_state_t *state;
1665
1666         if ((state = msg->state) == NULL) {
1667                 hammer2_msg_free(iocom, msg);
1668         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1669                 pthread_mutex_lock(&iocom->mtx);
1670                 state->txcmd |= HAMMER2_MSGF_DELETE;
1671                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
1672                         if (state->msg == msg)
1673                                 state->msg = NULL;
1674                         assert(state->flags & HAMMER2_STATE_INSERTED);
1675                         if (state->txcmd & HAMMER2_MSGF_REPLY) {
1676                                 assert(msg->any.head.cmd & HAMMER2_MSGF_REPLY);
1677                                 RB_REMOVE(hammer2_state_tree,
1678                                           &iocom->staterd_tree, state);
1679                         } else {
1680                                 assert((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0);
1681                                 RB_REMOVE(hammer2_state_tree,
1682                                           &iocom->statewr_tree, state);
1683                         }
1684                         state->flags &= ~HAMMER2_STATE_INSERTED;
1685                         hammer2_state_free(state);
1686                 } else {
1687                         ;
1688                 }
1689                 pthread_mutex_unlock(&iocom->mtx);
1690                 hammer2_msg_free(iocom, msg);
1691         } else if (state->msg != msg) {
1692                 hammer2_msg_free(iocom, msg);
1693         }
1694 }
1695
1696 /*
1697  * Called with iocom locked
1698  */
1699 void
1700 hammer2_state_free(hammer2_state_t *state)
1701 {
1702         hammer2_iocom_t *iocom = state->iocom;
1703         hammer2_msg_t *msg;
1704         char dummy;
1705
1706         fprintf(stderr, "STATE FREE %p\n", state);
1707
1708         assert(state->any.any == NULL);
1709         msg = state->msg;
1710         state->msg = NULL;
1711         if (msg)
1712                 hammer2_msg_free_locked(iocom, msg);
1713         free(state);
1714
1715         /*
1716          * When an iocom error is present we are trying to close down the
1717          * iocom, but we have to wait for all states to terminate before
1718          * we can do so.  The iocom rx code will terminate the receive side
1719          * for all transactions by simulating incoming DELETE messages,
1720          * but the state doesn't go away until both sides are terminated.
1721          *
1722          * We may have to wake up the rx code.
1723          */
1724         if (iocom->ioq_rx.error &&
1725             RB_EMPTY(&iocom->staterd_tree) &&
1726             RB_EMPTY(&iocom->statewr_tree)) {
1727                 dummy = 0;
1728                 write(iocom->wakeupfds[1], &dummy, 1);
1729         }
1730 }
1731
1732 /*
1733  * Indexed messages are stored in a red-black tree indexed by their
1734  * msgid.  Only persistent messages are indexed.
1735  */
1736 int
1737 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
1738 {
1739         if (state1->spanid < state2->spanid)
1740                 return(-1);
1741         if (state1->spanid > state2->spanid)
1742                 return(1);
1743         if (state1->msgid < state2->msgid)
1744                 return(-1);
1745         if (state1->msgid > state2->msgid)
1746                 return(1);
1747         return(0);
1748 }