hammer2 - Bring in the transaction state code from the hammer2 vfs
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
39 static int hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
41
42 /*
43  * Initialize a low-level ioq
44  */
45 void
46 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
47 {
48         bzero(ioq, sizeof(*ioq));
49         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
50         TAILQ_INIT(&ioq->msgq);
51 }
52
53 void
54 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
55 {
56         hammer2_msg_t *msg;
57
58         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
59                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
60                 hammer2_msg_free(iocom, msg);
61         }
62         if ((msg = ioq->msg) != NULL) {
63                 ioq->msg = NULL;
64                 hammer2_msg_free(iocom, msg);
65         }
66 }
67
68 /*
69  * Initialize a low-level communications channel
70  */
71 void
72 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
73 {
74         bzero(iocom, sizeof(*iocom));
75
76         RB_INIT(&iocom->staterd_tree);
77         RB_INIT(&iocom->statewr_tree);
78         TAILQ_INIT(&iocom->freeq);
79         TAILQ_INIT(&iocom->freeq_aux);
80         iocom->sock_fd = sock_fd;
81         iocom->alt_fd = alt_fd;
82         iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
83         hammer2_ioq_init(iocom, &iocom->ioq_rx);
84         hammer2_ioq_init(iocom, &iocom->ioq_tx);
85
86         /*
87          * Negotiate session crypto synchronously.  This will mark the
88          * connection as error'd if it fails.
89          */
90         hammer2_crypto_negotiate(iocom);
91
92         /*
93          * Make sure our fds are set to non-blocking for the iocom core.
94          */
95         if (sock_fd >= 0)
96                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
97 #if 0
98         /* if line buffered our single fgets() should be fine */
99         if (alt_fd >= 0)
100                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
101 #endif
102 }
103
104 void
105 hammer2_iocom_done(hammer2_iocom_t *iocom)
106 {
107         hammer2_msg_t *msg;
108
109         iocom->sock_fd = -1;
110         hammer2_ioq_done(iocom, &iocom->ioq_rx);
111         hammer2_ioq_done(iocom, &iocom->ioq_tx);
112         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
113                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
114                 free(msg);
115         }
116         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
117                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
118                 free(msg->aux_data);
119                 msg->aux_data = NULL;
120                 free(msg);
121         }
122 }
123
124 /*
125  * Allocate a new one-way message.
126  */
127 hammer2_msg_t *
128 hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
129 {
130         hammer2_msg_t *msg;
131         int hbytes;
132
133         if (aux_size) {
134                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
135                            ~HAMMER2_MSG_ALIGNMASK;
136                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
137                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
138         } else {
139                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
140                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
141         }
142         if (msg == NULL) {
143                 msg = malloc(sizeof(*msg));
144                 bzero(msg, sizeof(*msg));
145                 msg->aux_data = NULL;
146                 msg->aux_size = 0;
147         }
148         if (msg->aux_size != aux_size) {
149                 if (msg->aux_data) {
150                         free(msg->aux_data);
151                         msg->aux_data = NULL;
152                         msg->aux_size = 0;
153                 }
154                 if (aux_size) {
155                         msg->aux_data = malloc(aux_size);
156                         msg->aux_size = aux_size;
157                 }
158         }
159         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
160         if (hbytes)
161                 bzero(&msg->any.head, hbytes);
162         msg->hdr_size = hbytes;
163         msg->any.head.aux_icrc = 0;
164         msg->any.head.cmd = cmd;
165
166         return (msg);
167 }
168
169 /*
170  * Free a message so it can be reused afresh.
171  *
172  * NOTE: aux_size can be 0 with a non-NULL aux_data.
173  */
174 void
175 hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
176 {
177         if (msg->aux_data)
178                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
179         else
180                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
181 }
182
183 /*
184  * I/O core loop for an iocom.
185  */
186 void
187 hammer2_iocom_core(hammer2_iocom_t *iocom,
188                    void (*recvmsg_func)(hammer2_iocom_t *),
189                    void (*sendmsg_func)(hammer2_iocom_t *),
190                    void (*altmsg_func)(hammer2_iocom_t *))
191 {
192         struct pollfd fds[2];
193         int timeout;
194
195         iocom->recvmsg_callback = recvmsg_func;
196         iocom->sendmsg_callback = sendmsg_func;
197         iocom->altmsg_callback = altmsg_func;
198
199         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
200                 timeout = 5000;
201
202                 fds[0].fd = iocom->sock_fd;
203                 fds[0].events = 0;
204                 fds[0].revents = 0;
205
206                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
207                         fds[0].events |= POLLIN;
208                 else
209                         timeout = 0;
210                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
211                         if (iocom->flags & HAMMER2_IOCOMF_WREQ)
212                                 fds[0].events |= POLLOUT;
213                         else
214                                 timeout = 0;
215                 }
216
217                 if (iocom->alt_fd >= 0) {
218                         fds[1].fd = iocom->alt_fd;
219                         fds[1].events |= POLLIN;
220                         fds[1].revents = 0;
221                         poll(fds, 2, timeout);
222                 } else {
223                         poll(fds, 1, timeout);
224                 }
225                 if ((fds[0].revents & POLLIN) ||
226                     (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
227                         iocom->recvmsg_callback(iocom);
228                 }
229                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
230                         if ((fds[0].revents & POLLOUT) ||
231                             (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
232                                 iocom->sendmsg_callback(iocom);
233                         }
234                 }
235                 if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
236                         iocom->altmsg_callback(iocom);
237         }
238 }
239
240 /*
241  * Read the next ready message from the ioq, issuing I/O if needed.
242  * Caller should retry on a read-event when NULL is returned.
243  *
244  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
245  * be returned (and the caller must not call us again after that).
246  */
247 hammer2_msg_t *
248 hammer2_ioq_read(hammer2_iocom_t *iocom)
249 {
250         hammer2_ioq_t *ioq = &iocom->ioq_rx;
251         hammer2_msg_t *msg;
252         hammer2_msg_hdr_t *head;
253         ssize_t n;
254         size_t bytes;
255         size_t nmax;
256         uint16_t xcrc16;
257         uint32_t xcrc32;
258         int error;
259
260 again:
261         /*
262          * If a message is already pending we can just remove and
263          * return it.  Message state has already been processed.
264          */
265         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
266                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
267                 return (msg);
268         }
269
270         /*
271          * Message read in-progress (msg is NULL at the moment).  We don't
272          * allocate a msg until we have its core header.
273          */
274         bytes = ioq->fifo_end - ioq->fifo_beg;
275         nmax = sizeof(ioq->buf) - ioq->fifo_end;
276         msg = ioq->msg;
277
278         switch(ioq->state) {
279         case HAMMER2_MSGQ_STATE_HEADER1:
280                 /*
281                  * Load the primary header, fail on any non-trivial read
282                  * error or on EOF.  Since the primary header is the same
283                  * size is the message alignment it will never straddle
284                  * the end of the buffer.
285                  */
286                 if (bytes < (int)sizeof(msg->any.head)) {
287                         n = read(iocom->sock_fd,
288                                  ioq->buf + ioq->fifo_end,
289                                  nmax);
290                         if (n <= 0) {
291                                 if (n == 0) {
292                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
293                                         break;
294                                 }
295                                 if (errno != EINTR &&
296                                     errno != EINPROGRESS &&
297                                     errno != EAGAIN) {
298                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
299                                         break;
300                                 }
301                                 n = 0;
302                                 /* fall through */
303                         }
304                         ioq->fifo_end += n;
305                         bytes += n;
306                         nmax -= n;
307                 }
308
309                 /*
310                  * Insufficient data accumulated (msg is NULL, caller will
311                  * retry on event).
312                  */
313                 assert(msg == NULL);
314                 if (bytes < (int)sizeof(msg->any.head))
315                         break;
316
317                 /*
318                  * Calculate the header, decrypt data received so far.
319                  * Data will be decrypted in-place.  Partial blocks are
320                  * not immediately decrypted.
321                  */
322                 hammer2_crypto_decrypt(iocom, ioq);
323                 head = (void *)(ioq->buf + ioq->fifo_beg);
324
325                 /*
326                  * Check and fixup the core header.  Note that the icrc
327                  * has to be calculated before any fixups, but the crc
328                  * fields in the msg may have to be swapped like everything
329                  * else.
330                  */
331                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
332                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
333                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
334                         break;
335                 }
336
337                 xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
338                                         HAMMER2_MSGHDR_CRCBYTES);
339                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
340                         hammer2_bswap_head(head);
341                 }
342                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
343                 if (xcrc16 != head->icrc1) {
344                         ioq->error = HAMMER2_IOQ_ERROR_HCRC;
345                         break;
346                 }
347
348                 /*
349                  * Calculate the full header size and aux data size
350                  */
351                 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
352                               HAMMER2_MSG_ALIGN;
353                 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
354                 if (ioq->hbytes < sizeof(msg->any.head) ||
355                     ioq->hbytes > sizeof(msg->any) ||
356                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
357                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
358                         break;
359                 }
360
361                 /*
362                  * Finally allocate the message and copy the core header
363                  * to the embedded extended header.
364                  *
365                  * Initialize msg->aux_size to 0 and use it to track
366                  * the amount of data copied from the stream.
367                  */
368                 msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
369                 ioq->msg = msg;
370
371                 /*
372                  * We are either done or we fall-through
373                  */
374                 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
375                         bcopy(head, &msg->any.head, sizeof(msg->any.head));
376                         ioq->fifo_beg += ioq->hbytes;
377                         break;
378                 }
379
380                 /*
381                  * Fall through to the next state.  Make sure that the
382                  * extended header does not straddle the end of the buffer.
383                  * We still want to issue larger reads into our buffer,
384                  * book-keeping is easier if we don't bcopy() yet.
385                  */
386                 if (bytes + nmax < ioq->hbytes) {
387                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
388                         ioq->fifo_cdx -= ioq->fifo_beg;
389                         ioq->fifo_beg = 0;
390                         ioq->fifo_end = bytes;
391                         nmax = sizeof(ioq->buf) - ioq->fifo_end;
392                 }
393                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
394                 /* fall through */
395         case HAMMER2_MSGQ_STATE_HEADER2:
396                 /*
397                  * Fill out the extended header.
398                  */
399                 assert(msg != NULL);
400                 if (bytes < ioq->hbytes) {
401                         n = read(iocom->sock_fd,
402                                  msg->any.buf + ioq->fifo_end,
403                                  nmax);
404                         if (n <= 0) {
405                                 if (n == 0) {
406                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
407                                         break;
408                                 }
409                                 if (errno != EINTR &&
410                                     errno != EINPROGRESS &&
411                                     errno != EAGAIN) {
412                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
413                                         break;
414                                 }
415                                 n = 0;
416                                 /* fall through */
417                         }
418                         ioq->fifo_end += n;
419                         bytes += n;
420                         nmax -= n;
421                 }
422
423                 /*
424                  * Insufficient data accumulated (set msg NULL so caller will
425                  * retry on event).
426                  */
427                 if (bytes < ioq->hbytes) {
428                         msg = NULL;
429                         break;
430                 }
431
432                 /*
433                  * Calculate the extended header, decrypt data received
434                  * so far.
435                  */
436                 hammer2_crypto_decrypt(iocom, ioq);
437                 head = (void *)(ioq->buf + ioq->fifo_beg);
438
439                 /*
440                  * Check the crc on the extended header
441                  */
442                 if (ioq->hbytes > sizeof(hammer2_msg_hdr_t)) {
443                         xcrc32 = hammer2_icrc32(head + 1,
444                                                 ioq->hbytes - sizeof(*head));
445                         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
446                         if (head->icrc2 != xcrc16) {
447                                 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
448                                 break;
449                         }
450                 }
451
452                 /*
453                  * Copy the extended header into the msg and adjust the
454                  * FIFO.
455                  */
456                 bcopy(head, &msg->any, ioq->hbytes);
457
458                 /*
459                  * We are either done or we fall-through.
460                  */
461                 if (ioq->abytes == 0) {
462                         ioq->fifo_beg += ioq->hbytes;
463                         break;
464                 }
465
466                 /*
467                  * Must adjust nmax and bytes (and the state) when falling
468                  * through.
469                  */
470                 ioq->fifo_beg += ioq->hbytes;
471                 nmax -= ioq->hbytes;
472                 bytes -= ioq->hbytes;
473                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
474                 /* fall through */
475         case HAMMER2_MSGQ_STATE_AUXDATA1:
476                 /*
477                  * Copy the partial or complete payload from remaining
478                  * bytes in the FIFO.  We have to fall-through either
479                  * way so we can check the crc.
480                  *
481                  * Adjust msg->aux_size to the final actual value.
482                  */
483                 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
484                 if (ioq->already > ioq->abytes)
485                         ioq->already = ioq->abytes;
486                 if (bytes >= ioq->abytes) {
487                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
488                               ioq->abytes);
489                         msg->aux_size = ioq->abytes;
490                         ioq->fifo_beg += ioq->abytes;
491                         if (ioq->fifo_cdx < ioq->fifo_beg)
492                                 ioq->fifo_cdx = ioq->fifo_beg;
493                         bytes -= ioq->abytes;
494                 } else if (bytes) {
495                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
496                               bytes);
497                         msg->aux_size = bytes;
498                         ioq->fifo_beg += bytes;
499                         if (ioq->fifo_cdx < ioq->fifo_beg)
500                                 ioq->fifo_cdx = ioq->fifo_beg;
501                         bytes = 0;
502                 } else {
503                         msg->aux_size = 0;
504                 }
505                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
506                 /* fall through */
507         case HAMMER2_MSGQ_STATE_AUXDATA2:
508                 /*
509                  * Read the remainder of the payload directly into the
510                  * msg->aux_data buffer.
511                  */
512                 assert(msg);
513                 if (msg->aux_size < ioq->abytes) {
514                         assert(bytes == 0);
515                         n = read(iocom->sock_fd,
516                                  msg->aux_data + msg->aux_size,
517                                  ioq->abytes - msg->aux_size);
518                         if (n <= 0) {
519                                 if (n == 0) {
520                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
521                                         break;
522                                 }
523                                 if (errno != EINTR &&
524                                     errno != EINPROGRESS &&
525                                     errno != EAGAIN) {
526                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
527                                         break;
528                                 }
529                                 n = 0;
530                                 /* fall through */
531                         }
532                         msg->aux_size += n;
533                 }
534
535                 /*
536                  * Insufficient data accumulated (set msg NULL so caller will
537                  * retry on event).
538                  */
539                 if (msg->aux_size < ioq->abytes) {
540                         msg = NULL;
541                         break;
542                 }
543                 assert(msg->aux_size == ioq->abytes);
544                 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
545
546                 /*
547                  * Check aux_icrc, then we are done.
548                  */
549                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
550                 if (xcrc32 != msg->any.head.aux_icrc) {
551                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
552                         break;
553                 }
554                 break;
555         case HAMMER2_MSGQ_STATE_ERROR:
556         default:
557                 /*
558                  * We don't double-return errors, the caller should not
559                  * have called us again after getting an error msg.
560                  */
561                 assert(0);
562                 break;
563         }
564
565         /*
566          * Check the message sequence.  The iv[] should prevent any
567          * possibility of a replay but we add this check anyway.
568          */
569         if (msg && ioq->error == 0) {
570                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
571                         ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
572                 } else {
573                         ++ioq->seq;
574                 }
575         }
576
577         /*
578          * Process transactional state for the message.
579          */
580         if (msg && ioq->error == 0) {
581                 error = hammer2_state_msgrx(iocom, msg);
582                 if (error) {
583                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
584                                 hammer2_msg_free(iocom, msg);
585                                 goto again;
586                         }
587                         ioq->error = error;
588                 }
589         }
590
591         /*
592          * Handle error, RREQ, or completion
593          *
594          * NOTE: nmax and bytes are invalid at this point, we don't bother
595          *       to update them when breaking out.
596          */
597         if (ioq->error) {
598                 /*
599                  * An unrecoverable error occured during processing,
600                  * return a special error message.  Try to leave the
601                  * ioq state alone for post-mortem debugging.
602                  *
603                  * Link error messages are returned as one-way messages,
604                  * so no flags get set.  Source and target is 0 (link-level),
605                  * msgid is 0 (link-level).  All we really need to do is
606                  * set up magic, cmd, and error.
607                  */
608                 assert(ioq->msg == msg);
609                 if (msg == NULL)
610                         msg = hammer2_msg_alloc(iocom, 0, 0);
611                 else
612                         ioq->msg = NULL;
613
614                 if (msg->aux_data) {
615                         free(msg->aux_data);
616                         msg->aux_data = NULL;
617                         msg->aux_size = 0;
618                 }
619                 bzero(&msg->any.head, sizeof(msg->any.head));
620                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
621                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
622                 msg->any.head.error = ioq->error;
623                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
624                 iocom->flags |= HAMMER2_IOCOMF_EOF;
625         } else if (msg == NULL) {
626                 /*
627                  * Insufficient data received to finish building the message,
628                  * set RREQ and return NULL.
629                  *
630                  * Leave ioq->msg intact.
631                  * Leave the FIFO intact.
632                  */
633                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
634 #if 0
635                 ioq->fifo_cdx = 0;
636                 ioq->fifo_beg = 0;
637                 ioq->fifo_end = 0;
638 #endif
639         } else {
640                 /*
641                  * Return msg, clear the FIFO if it is now empty.
642                  * Flag RREQ if the caller needs to wait for a read-event
643                  * or not.
644                  *
645                  * The fifo has already been advanced past the message.
646                  * Trivially reset the FIFO indices if possible.
647                  */
648                 if (ioq->fifo_beg == ioq->fifo_end) {
649                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
650                         ioq->fifo_cdx = 0;
651                         ioq->fifo_beg = 0;
652                         ioq->fifo_end = 0;
653                 } else {
654                         iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
655                 }
656                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
657                 ioq->msg = NULL;
658         }
659         return (msg);
660 }
661
662 /*
663  * Calculate the header and data crc's and write a low-level message to
664  * the connection.  If aux_icrc is non-zero the aux_data crc is already
665  * assumed to have been set.
666  *
667  * A non-NULL msg is added to the queue but not necessarily flushed.
668  * Calling this function with msg == NULL will get a flush going.
669  */
670 void
671 hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
672 {
673         hammer2_ioq_t *ioq = &iocom->ioq_tx;
674         uint16_t xcrc16;
675         uint32_t xcrc32;
676         int hbytes;
677         int error;
678
679         assert(msg);
680
681         /*
682          * Process transactional state.
683          */
684         if (ioq->error == 0) {
685                 error = hammer2_state_msgtx(iocom, msg);
686                 if (error) {
687                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
688                                 hammer2_msg_free(iocom, msg);
689                         } else {
690                                 ioq->error = error;
691                         }
692                 }
693         }
694
695         /*
696          * Process terminal connection errors.
697          */
698         if (ioq->error) {
699                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
700                 ++ioq->msgcount;
701                 hammer2_iocom_drain(iocom);
702                 return;
703         }
704
705         /*
706          * Finish populating the msg fields.  The salt ensures that the iv[]
707          * array is ridiculously randomized and we also re-seed our PRNG
708          * every 32768 messages just to be sure.
709          */
710         msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
711         msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
712         ++ioq->seq;
713         if ((ioq->seq & 32767) == 0)
714                 srandomdev();
715
716         /*
717          * Calculate aux_icrc if 0, calculate icrc2, and finally
718          * calculate icrc1.
719          */
720         if (msg->aux_size && msg->any.head.aux_icrc == 0) {
721                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
722                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
723                 msg->any.head.aux_icrc = xcrc32;
724         }
725         msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
726         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
727
728         if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
729             sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
730                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
731                         HAMMER2_MSG_ALIGN;
732                 hbytes -= sizeof(msg->any.head);
733                 xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
734                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
735                 msg->any.head.icrc2 = xcrc16;
736         } else {
737                 msg->any.head.icrc2 = 0;
738         }
739         xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
740                                 HAMMER2_MSGHDR_CRCBYTES);
741         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
742         msg->any.head.icrc1 = xcrc16;
743
744         /*
745          * XXX Encrypt the message
746          */
747
748         /*
749          * Enqueue the message.
750          */
751         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
752         ++ioq->msgcount;
753         iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
754
755         /*
756          * Flush if we know we can write (WREQ not set) and if
757          * sufficient messages have accumulated.  Otherwise hold
758          * off to avoid piecemeal system calls.
759          */
760         if (iocom->flags & HAMMER2_IOCOMF_WREQ)
761                 return;
762         if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
763                 return;
764         hammer2_iocom_flush(iocom);
765 }
766
767 void
768 hammer2_iocom_flush(hammer2_iocom_t *iocom)
769 {
770         hammer2_ioq_t *ioq = &iocom->ioq_tx;
771         hammer2_msg_t *msg;
772         ssize_t nmax;
773         ssize_t nact;
774         struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
775         size_t hbytes;
776         size_t abytes;
777         int hoff;
778         int aoff;
779         int n;
780
781         /*
782          * Pump messages out the connection by building an iovec.
783          */
784         n = 0;
785         nmax = 0;
786
787         TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
788                 hoff = 0;
789                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
790                          HAMMER2_MSG_ALIGN;
791                 aoff = 0;
792                 abytes = msg->aux_size;
793                 if (n == 0) {
794                         hoff += ioq->hbytes;
795                         aoff += ioq->abytes;
796                 }
797                 if (hbytes - hoff > 0) {
798                         iov[n].iov_base = (char *)&msg->any.head + hoff;
799                         iov[n].iov_len = hbytes - hoff;
800                         nmax += hbytes - hoff;
801                         ++n;
802                         if (n == HAMMER2_IOQ_MAXIOVEC)
803                                 break;
804                 }
805                 if (abytes - aoff > 0) {
806                         assert(msg->aux_data != NULL);
807                         iov[n].iov_base = msg->aux_data + aoff;
808                         iov[n].iov_len = abytes - aoff;
809                         nmax += abytes - aoff;
810                         ++n;
811                         if (n == HAMMER2_IOQ_MAXIOVEC)
812                                 break;
813                 }
814         }
815         if (n == 0)
816                 return;
817
818         /*
819          * Encrypt and write the data.  The crypto code will move the
820          * data into the fifo and adjust the iov as necessary.  If
821          * encryption is disabled the iov is left alone.
822          *
823          * hammer2_crypto_encrypt_wrote()
824          */
825         n = hammer2_crypto_encrypt(iocom, ioq, iov, n);
826
827         /*
828          * Execute the writev() then figure out what happened.
829          */
830         nact = writev(iocom->sock_fd, iov, n);
831         if (nact < 0) {
832                 if (errno != EINTR &&
833                     errno != EINPROGRESS &&
834                     errno != EAGAIN) {
835                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
836                         hammer2_iocom_drain(iocom);
837                 } else {
838                         iocom->flags |= HAMMER2_IOCOMF_WREQ;
839                 }
840                 return;
841         }
842         hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
843         if (nact == nmax)
844                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
845         else
846                 iocom->flags |= HAMMER2_IOCOMF_WREQ;
847
848         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
849                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
850                          HAMMER2_MSG_ALIGN;
851                 abytes = msg->aux_size;
852
853                 if ((size_t)nact < hbytes - ioq->hbytes) {
854                         ioq->hbytes += nact;
855                         break;
856                 }
857                 nact -= hbytes - ioq->hbytes;
858                 ioq->hbytes = hbytes;
859                 if ((size_t)nact < abytes - ioq->abytes) {
860                         ioq->abytes += nact;
861                         break;
862                 }
863                 nact -= abytes - ioq->abytes;
864
865                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
866                 --ioq->msgcount;
867                 ioq->hbytes = 0;
868                 ioq->abytes = 0;
869
870                 hammer2_state_cleanuptx(iocom, msg);
871         }
872         if (msg == NULL) {
873                 iocom->flags |= HAMMER2_IOCOMF_WIDLE;
874                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
875         }
876         if (ioq->error) {
877                 iocom->flags |= HAMMER2_IOCOMF_EOF |
878                                 HAMMER2_IOCOMF_WIDLE;
879                 iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
880         }
881 }
882
883 /*
884  * Kill pending msgs on ioq_tx and adjust the flags such that no more
885  * write events will occur.  We don't kill read msgs because we want
886  * the caller to pull off our contrived terminal error msg to detect
887  * the connection failure.
888  */
889 void
890 hammer2_iocom_drain(hammer2_iocom_t *iocom)
891 {
892         hammer2_ioq_t *ioq = &iocom->ioq_tx;
893         hammer2_msg_t *msg;
894
895         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
896                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
897                 --ioq->msgcount;
898                 hammer2_msg_free(iocom, msg);
899         }
900         iocom->flags |= HAMMER2_IOCOMF_WIDLE;
901         iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
902 }
903
904 /*
905  * This is a shortcut to formulate a reply to msg with a simple error code.
906  * It can reply to transaction or one-way messages, or terminate one side
907  * of a stream.  A HAMMER2_LNK_ERROR command code is utilized to encode
908  * the error code (which can be 0).
909  *
910  * Replies to one-way messages are a bit of an oxymoron but the feature
911  * is used by the debug (DBG) protocol.
912  *
913  * The reply contains no data.
914  */
915 void
916 hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint16_t error)
917 {
918         hammer2_msg_t *nmsg;
919         uint32_t cmd;
920
921         cmd = HAMMER2_LNK_ERROR;
922         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
923                 /*
924                  * Reply to received reply, reply direction uses txcmd.
925                  * txcmd will be updated by hammer2_ioq_write().
926                  */
927                 if (msg->state) {
928                         if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
929                                 cmd |= HAMMER2_MSGF_CREATE;
930                         cmd |= HAMMER2_MSGF_DELETE;
931                 }
932         } else {
933                 /*
934                  * Reply to received command, reply direction uses rxcmd.
935                  * txcmd will be updated by hammer2_ioq_write().
936                  */
937                 cmd |= HAMMER2_MSGF_REPLY;
938                 if (msg->state) {
939                         if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
940                                 cmd |= HAMMER2_MSGF_CREATE;
941                         cmd |= HAMMER2_MSGF_DELETE;
942                 }
943         }
944         nmsg = hammer2_msg_alloc(iocom, 0, cmd);
945         nmsg->any.head.error = error;
946         hammer2_ioq_write(iocom, nmsg);
947 }
948
949 /************************************************************************
950  *                      TRANSACTION STATE HANDLING                      *
951  ************************************************************************
952  *
953  */
954
/*
 * Instantiate the red-black tree routines for hammer2_state_tree: nodes
 * are hammer2_state structures linked through their rbnode field and
 * ordered by hammer2_state_cmp().
 */
RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
956
957 /*
958  * Process state tracking for a message after reception, prior to
959  * execution.
960  *
961  * Called with msglk held and the msg dequeued.
962  *
 * A stack-local dummy state is used only as the lookup key; the matching
 * persistent state (or NULL, e.g. for one-off messages) is stored in
 * msg->state.
 *
 * Returns 0 if the message should be processed, or a HAMMER2_IOQ_ERROR_*
 * code if it should not.  msg->state is allowed to be NULL in that case.
968  *
969  * --
970  *
971  * These routines handle persistent and command/reply message state via the
972  * CREATE and DELETE flags.  The first message in a command or reply sequence
973  * sets CREATE, the last message in a command or reply sequence sets DELETE.
974  *
 * There can be any number of intermediate messages belonging to the same
 * sequence sent in between the CREATE message and the DELETE message,
 * which set neither flag.  This represents a streaming command or reply.
978  *
979  * Any command message received with CREATE set expects a reply sequence to
980  * be returned.  Reply sequences work the same as command sequences except the
981  * REPLY bit is also sent.  Both the command side and reply side can
982  * degenerate into a single message with both CREATE and DELETE set.  Note
983  * that one side can be streaming and the other side not, or neither, or both.
984  *
985  * The msgid is unique for the initiator.  That is, two sides sending a new
986  * message can use the same msgid without colliding.
987  *
988  * --
989  *
990  * ABORT sequences work by setting the ABORT flag along with normal message
991  * state.  However, ABORTs can also be sent on half-closed messages, that is
992  * even if the command or reply side has already sent a DELETE, as long as
993  * the message has not been fully closed it can still send an ABORT+DELETE
994  * to terminate the half-closed message state.
995  *
996  * Since ABORT+DELETEs can race we silently discard ABORT's for message
997  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
998  * also race, and in this situation the other side might have already
999  * initiated a new unrelated command with the same message id.  Since
1000  * the abort has not set the CREATE flag the situation can be detected
1001  * and the message will also be discarded.
1002  *
1003  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1004  * The ABORT request is essentially integrated into the command instead
1005  * of being sent later on.  In this situation the command implementation
1006  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1007  * special-case non-blocking operation for the command.
1008  *
1009  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1010  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1011  *        one-way messages are not supported.
1012  *
1013  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1014  *        simply ignored.
1015  *
1016  * --
1017  *
1018  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1019  * set.  One-off messages cannot be aborted and typically aren't processed
1020  * by these routines.  The REPLY bit can be used to distinguish whether a
1021  * one-off message is a command or reply.  For example, one-off replies
1022  * will typically just contain status updates.
1023  */
1024 static int
1025 hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1026 {
1027         hammer2_state_t *state;
1028         hammer2_state_t dummy;
1029         int error;
1030
1031         /*
1032          * Lock RB tree and locate existing persistent state, if any.
1033          *
1034          * If received msg is a command state is on staterd_tree.
1035          * If received msg is a reply state is on statewr_tree.
1036          */
1037         /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1038
1039         dummy.msgid = msg->any.head.msgid;
1040         dummy.source = msg->any.head.source;
1041         dummy.target = msg->any.head.target;
1042         iocom_printf(iocom, msg->any.head.cmd,
1043                      "received msg %08x msgid %u source=%u target=%u\n",
1044                       msg->any.head.cmd, msg->any.head.msgid,
1045                       msg->any.head.source, msg->any.head.target);
1046         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1047                 state = RB_FIND(hammer2_state_tree,
1048                                 &iocom->statewr_tree, &dummy);
1049         } else {
1050                 state = RB_FIND(hammer2_state_tree,
1051                                 &iocom->staterd_tree, &dummy);
1052         }
1053         msg->state = state;
1054
1055         /*
1056          * Short-cut one-off or mid-stream messages (state may be NULL).
1057          */
1058         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1059                                   HAMMER2_MSGF_ABORT)) == 0) {
1060                 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1061                 return(0);
1062         }
1063
1064         /*
1065          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1066          * inside the case statements.
1067          */
1068         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1069                                     HAMMER2_MSGF_REPLY)) {
1070         case HAMMER2_MSGF_CREATE:
1071         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1072                 /*
1073                  * New persistant command received.
1074                  */
1075                 if (state) {
1076                         iocom_printf(iocom, msg->any.head.cmd,
1077                                      "hammer2_state_msgrx: "
1078                                      "duplicate transaction\n");
1079                         error = HAMMER2_IOQ_ERROR_TRANS;
1080                         break;
1081                 }
1082                 state = malloc(sizeof(*state));
1083                 bzero(state, sizeof(*state));
1084                 state->iocom = iocom;
1085                 state->flags = HAMMER2_STATE_DYNAMIC;
1086                 state->msg = msg;
1087                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1088                 RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
1089                 state->flags |= HAMMER2_STATE_INSERTED;
1090                 msg->state = state;
1091                 error = 0;
1092                 break;
1093         case HAMMER2_MSGF_DELETE:
1094                 /*
1095                  * Persistent state is expected but might not exist if an
1096                  * ABORT+DELETE races the close.
1097                  */
1098                 if (state == NULL) {
1099                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1100                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1101                         } else {
1102                                 iocom_printf(iocom, msg->any.head.cmd,
1103                                              "hammer2_state_msgrx: "
1104                                              "no state for DELETE\n");
1105                                 error = HAMMER2_IOQ_ERROR_TRANS;
1106                         }
1107                         break;
1108                 }
1109
1110                 /*
1111                  * Handle another ABORT+DELETE case if the msgid has already
1112                  * been reused.
1113                  */
1114                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1115                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1116                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1117                         } else {
1118                                 iocom_printf(iocom, msg->any.head.cmd,
1119                                              "hammer2_state_msgrx: "
1120                                              "state reused for DELETE\n");
1121                                 error = HAMMER2_IOQ_ERROR_TRANS;
1122                         }
1123                         break;
1124                 }
1125                 error = 0;
1126                 break;
1127         default:
1128                 /*
1129                  * Check for mid-stream ABORT command received, otherwise
1130                  * allow.
1131                  */
1132                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1133                         if (state == NULL ||
1134                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1135                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1136                                 break;
1137                         }
1138                 }
1139                 error = 0;
1140                 break;
1141         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1142         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1143                 /*
1144                  * When receiving a reply with CREATE set the original
1145                  * persistent state message should already exist.
1146                  */
1147                 if (state == NULL) {
1148                         iocom_printf(iocom, msg->any.head.cmd,
1149                                      "hammer2_state_msgrx: "
1150                                      "no state match for REPLY cmd=%08x\n",
1151                                      msg->any.head.cmd);
1152                         error = HAMMER2_IOQ_ERROR_TRANS;
1153                         break;
1154                 }
1155                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1156                 error = 0;
1157                 break;
1158         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1159                 /*
1160                  * Received REPLY+ABORT+DELETE in case where msgid has
1161                  * already been fully closed, ignore the message.
1162                  */
1163                 if (state == NULL) {
1164                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1165                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1166                         } else {
1167                                 iocom_printf(iocom, msg->any.head.cmd,
1168                                              "hammer2_state_msgrx: "
1169                                              "no state match for "
1170                                              "REPLY|DELETE\n");
1171                                 error = HAMMER2_IOQ_ERROR_TRANS;
1172                         }
1173                         break;
1174                 }
1175
1176                 /*
1177                  * Received REPLY+ABORT+DELETE in case where msgid has
1178                  * already been reused for an unrelated message,
1179                  * ignore the message.
1180                  */
1181                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1182                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1183                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1184                         } else {
1185                                 iocom_printf(iocom, msg->any.head.cmd,
1186                                              "hammer2_state_msgrx: "
1187                                              "state reused for REPLY|DELETE\n");
1188                                 error = HAMMER2_IOQ_ERROR_TRANS;
1189                         }
1190                         break;
1191                 }
1192                 error = 0;
1193                 break;
1194         case HAMMER2_MSGF_REPLY:
1195                 /*
1196                  * Check for mid-stream ABORT reply received to sent command.
1197                  */
1198                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1199                         if (state == NULL ||
1200                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1201                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1202                                 break;
1203                         }
1204                 }
1205                 error = 0;
1206                 break;
1207         }
1208         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1209         return (error);
1210 }
1211
1212 void
1213 hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1214 {
1215         hammer2_state_t *state;
1216
1217         if ((state = msg->state) == NULL) {
1218                 hammer2_msg_free(iocom, msg);
1219         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1220                 /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1221                 state->rxcmd |= HAMMER2_MSGF_DELETE;
1222                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
1223                         if (state->msg == msg)
1224                                 state->msg = NULL;
1225                         assert(state->flags & HAMMER2_STATE_INSERTED);
1226                         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1227                                 RB_REMOVE(hammer2_state_tree,
1228                                           &iocom->statewr_tree, state);
1229                         } else {
1230                                 RB_REMOVE(hammer2_state_tree,
1231                                           &iocom->staterd_tree, state);
1232                         }
1233                         state->flags &= ~HAMMER2_STATE_INSERTED;
1234                         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1235                         hammer2_state_free(state);
1236                 } else {
1237                         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1238                 }
1239                 hammer2_msg_free(iocom, msg);
1240         } else if (state->msg != msg) {
1241                 hammer2_msg_free(iocom, msg);
1242         }
1243 }
1244
1245 /*
1246  * Process state tracking for a message prior to transmission.
1247  *
1248  * Called with msglk held and the msg dequeued.
1249  *
1250  * One-off messages are usually with dummy state and msg->state may be NULL
1251  * in this situation.
1252  *
1253  * New transactions (when CREATE is set) will insert the state.
1254  *
1255  * May request that caller discard the message by setting *discardp to 1.
1256  * A NULL state may be returned in this case.
1257  */
1258 static int
1259 hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1260 {
1261         hammer2_state_t *state;
1262         int error;
1263
1264         /*
1265          * Lock RB tree.  If persistent state is present it will have already
1266          * been assigned to msg.
1267          */
1268         /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1269         state = msg->state;
1270
1271         /*
1272          * Short-cut one-off or mid-stream messages (state may be NULL).
1273          */
1274         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1275                                   HAMMER2_MSGF_ABORT)) == 0) {
1276                 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1277                 return(0);
1278         }
1279
1280
1281         /*
1282          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1283          * inside the case statements.
1284          */
1285         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1286                                     HAMMER2_MSGF_REPLY)) {
1287         case HAMMER2_MSGF_CREATE:
1288         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1289                 /*
1290                  * Insert the new persistent message state and mark
1291                  * half-closed if DELETE is set.  Since this is a new
1292                  * message it isn't possible to transition into the fully
1293                  * closed state here.
1294                  *
1295                  * XXX state must be assigned and inserted by
1296                  *     hammer2_msg_write().  txcmd is assigned by us
1297                  *     on-transmit.
1298                  */
1299                 assert(state != NULL);
1300 #if 0
1301                 if (state == NULL) {
1302                         state = pmp->freerd_state;
1303                         pmp->freerd_state = NULL;
1304                         msg->state = state;
1305                         state->msg = msg;
1306                         state->msgid = msg->any.head.msgid;
1307                         state->source = msg->any.head.source;
1308                         state->target = msg->any.head.target;
1309                 }
1310                 assert((state->flags & HAMMER2_STATE_INSERTED) == 0);
1311                 if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
1312                         iocom_printf(iocom, msg->any.head.cmd,
1313                                     "hammer2_state_msgtx: "
1314                                     "duplicate transaction\n");
1315                         error = HAMMER2_IOQ_ERROR_TRANS;
1316                         break;
1317                 }
1318                 state->flags |= HAMMER2_STATE_INSERTED;
1319 #endif
1320                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1321                 error = 0;
1322                 break;
1323         case HAMMER2_MSGF_DELETE:
1324                 /*
1325                  * Sent ABORT+DELETE in case where msgid has already
1326                  * been fully closed, ignore the message.
1327                  */
1328                 if (state == NULL) {
1329                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1330                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1331                         } else {
1332                                 iocom_printf(iocom, msg->any.head.cmd,
1333                                              "hammer2_state_msgtx: "
1334                                              "no state match for DELETE\n");
1335                                 error = HAMMER2_IOQ_ERROR_TRANS;
1336                         }
1337                         break;
1338                 }
1339
1340                 /*
1341                  * Sent ABORT+DELETE in case where msgid has
1342                  * already been reused for an unrelated message,
1343                  * ignore the message.
1344                  */
1345                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
1346                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1347                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1348                         } else {
1349                                 iocom_printf(iocom, msg->any.head.cmd,
1350                                              "hammer2_state_msgtx: "
1351                                              "state reused for DELETE\n");
1352                                 error = HAMMER2_IOQ_ERROR_TRANS;
1353                         }
1354                         break;
1355                 }
1356                 error = 0;
1357                 break;
1358         default:
1359                 /*
1360                  * Check for mid-stream ABORT command sent
1361                  */
1362                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1363                         if (state == NULL ||
1364                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
1365                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1366                                 break;
1367                         }
1368                 }
1369                 error = 0;
1370                 break;
1371         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1372         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1373                 /*
1374                  * When transmitting a reply with CREATE set the original
1375                  * persistent state message should already exist.
1376                  */
1377                 if (state == NULL) {
1378                         iocom_printf(iocom, msg->any.head.cmd,
1379                                      "hammer2_state_msgtx: no state match "
1380                                      "for REPLY | CREATE\n");
1381                         error = HAMMER2_IOQ_ERROR_TRANS;
1382                         break;
1383                 }
1384                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1385                 error = 0;
1386                 break;
1387         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1388                 /*
1389                  * When transmitting a reply with DELETE set the original
1390                  * persistent state message should already exist.
1391                  *
1392                  * This is very similar to the REPLY|CREATE|* case except
1393                  * txcmd is already stored, so we just add the DELETE flag.
1394                  *
1395                  * Sent REPLY+ABORT+DELETE in case where msgid has
1396                  * already been fully closed, ignore the message.
1397                  */
1398                 if (state == NULL) {
1399                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1400                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1401                         } else {
1402                                 iocom_printf(iocom, msg->any.head.cmd,
1403                                              "hammer2_state_msgtx: "
1404                                              "no state match for "
1405                                              "REPLY | DELETE\n");
1406                                 error = HAMMER2_IOQ_ERROR_TRANS;
1407                         }
1408                         break;
1409                 }
1410
1411                 /*
1412                  * Sent REPLY+ABORT+DELETE in case where msgid has already
1413                  * been reused for an unrelated message, ignore the message.
1414                  */
1415                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
1416                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1417                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1418                         } else {
1419                                 iocom_printf(iocom, msg->any.head.cmd,
1420                                              "hammer2_state_msgtx: "
1421                                              "state reused for "
1422                                              "REPLY | DELETE\n");
1423                                 error = HAMMER2_IOQ_ERROR_TRANS;
1424                         }
1425                         break;
1426                 }
1427                 error = 0;
1428                 break;
1429         case HAMMER2_MSGF_REPLY:
1430                 /*
1431                  * Check for mid-stream ABORT reply sent.
1432                  *
1433                  * One-off REPLY messages are allowed for e.g. status updates.
1434                  */
1435                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1436                         if (state == NULL ||
1437                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
1438                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1439                                 break;
1440                         }
1441                 }
1442                 error = 0;
1443                 break;
1444         }
1445         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1446         return (error);
1447 }
1448
1449 static void
1450 hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1451 {
1452         hammer2_state_t *state;
1453
1454         if ((state = msg->state) == NULL) {
1455                 hammer2_msg_free(iocom, msg);
1456         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1457                 /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1458                 state->txcmd |= HAMMER2_MSGF_DELETE;
1459                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
1460                         if (state->msg == msg)
1461                                 state->msg = NULL;
1462                         assert(state->flags & HAMMER2_STATE_INSERTED);
1463                         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1464                                 RB_REMOVE(hammer2_state_tree,
1465                                           &iocom->staterd_tree, state);
1466                         } else {
1467                                 RB_REMOVE(hammer2_state_tree,
1468                                           &iocom->statewr_tree, state);
1469                         }
1470                         state->flags &= ~HAMMER2_STATE_INSERTED;
1471                         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1472                         hammer2_state_free(state);
1473                 } else {
1474                         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1475                 }
1476                 hammer2_msg_free(iocom, msg);
1477         } else if (state->msg != msg) {
1478                 hammer2_msg_free(iocom, msg);
1479         }
1480 }
1481
1482 void
1483 hammer2_state_free(hammer2_state_t *state)
1484 {
1485         hammer2_iocom_t *iocom = state->iocom;
1486         hammer2_msg_t *msg;
1487
1488         msg = state->msg;
1489         state->msg = NULL;
1490         free(state);
1491         if (msg)
1492                 hammer2_msg_free(iocom, msg);
1493         free(state);
1494 }
1495
1496 /*
1497  * Indexed messages are stored in a red-black tree indexed by their
1498  * msgid.  Only persistent messages are indexed.
1499  */
1500 int
1501 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
1502 {
1503         if (state1->source < state2->source)
1504                 return(-1);
1505         if (state1->source > state2->source)
1506                 return(1);
1507         if (state1->target < state2->target)
1508                 return(-1);
1509         if (state1->target > state2->target)
1510                 return(1);
1511         if (state1->msgid < state2->msgid)
1512                 return(-1);
1513         if (state1->msgid > state2->msgid)
1514                 return(1);
1515         return(0);
1516 }