Merge branches 'hammer2' and 'master' of ssh://crater.dragonflybsd.org/repository...
[dragonfly.git] / sbin / hammer2 / msg.c
1 /*
2  * Copyright (c) 2011-2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@dragonflybsd.org>
6  * by Venkatesh Srinivas <vsrinivas@dragonflybsd.org>
7  *
8  * Redistribution and use in source and binary forms, with or without
9  * modification, are permitted provided that the following conditions
10  * are met:
11  *
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice, this list of conditions and the following disclaimer.
14  * 2. Redistributions in binary form must reproduce the above copyright
15  *    notice, this list of conditions and the following disclaimer in
16  *    the documentation and/or other materials provided with the
17  *    distribution.
18  * 3. Neither the name of The DragonFly Project nor the names of its
19  *    contributors may be used to endorse or promote products derived
20  *    from this software without specific, prior written permission.
21  *
22  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
23  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
24  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
25  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
26  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
27  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
28  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
29  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
30  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
31  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
32  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
33  * SUCH DAMAGE.
34  */
35
36 #include "hammer2.h"
37
38 static int hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
39 static int hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
40 static void hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg);
41
42 /*
43  * Initialize a low-level ioq
44  */
45 void
46 hammer2_ioq_init(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
47 {
48         bzero(ioq, sizeof(*ioq));
49         ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
50         TAILQ_INIT(&ioq->msgq);
51 }
52
53 void
54 hammer2_ioq_done(hammer2_iocom_t *iocom __unused, hammer2_ioq_t *ioq)
55 {
56         hammer2_msg_t *msg;
57
58         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
59                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
60                 hammer2_msg_free(iocom, msg);
61         }
62         if ((msg = ioq->msg) != NULL) {
63                 ioq->msg = NULL;
64                 hammer2_msg_free(iocom, msg);
65         }
66 }
67
68 /*
69  * Initialize a low-level communications channel
70  */
71 void
72 hammer2_iocom_init(hammer2_iocom_t *iocom, int sock_fd, int alt_fd)
73 {
74         bzero(iocom, sizeof(*iocom));
75
76         RB_INIT(&iocom->staterd_tree);
77         RB_INIT(&iocom->statewr_tree);
78         TAILQ_INIT(&iocom->freeq);
79         TAILQ_INIT(&iocom->freeq_aux);
80         iocom->sock_fd = sock_fd;
81         iocom->alt_fd = alt_fd;
82         iocom->flags = HAMMER2_IOCOMF_RREQ | HAMMER2_IOCOMF_WIDLE;
83         hammer2_ioq_init(iocom, &iocom->ioq_rx);
84         hammer2_ioq_init(iocom, &iocom->ioq_tx);
85
86         /*
87          * Negotiate session crypto synchronously.  This will mark the
88          * connection as error'd if it fails.
89          */
90         hammer2_crypto_negotiate(iocom);
91
92         /*
93          * Make sure our fds are set to non-blocking for the iocom core.
94          */
95         if (sock_fd >= 0)
96                 fcntl(sock_fd, F_SETFL, O_NONBLOCK);
97 #if 0
98         /* if line buffered our single fgets() should be fine */
99         if (alt_fd >= 0)
100                 fcntl(alt_fd, F_SETFL, O_NONBLOCK);
101 #endif
102 }
103
104 void
105 hammer2_iocom_done(hammer2_iocom_t *iocom)
106 {
107         hammer2_msg_t *msg;
108
109         iocom->sock_fd = -1;
110         hammer2_ioq_done(iocom, &iocom->ioq_rx);
111         hammer2_ioq_done(iocom, &iocom->ioq_tx);
112         if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL) {
113                 TAILQ_REMOVE(&iocom->freeq, msg, qentry);
114                 free(msg);
115         }
116         if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL) {
117                 TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
118                 free(msg->aux_data);
119                 msg->aux_data = NULL;
120                 free(msg);
121         }
122 }
123
124 /*
125  * Allocate a new one-way message.
126  */
127 hammer2_msg_t *
128 hammer2_msg_alloc(hammer2_iocom_t *iocom, size_t aux_size, uint32_t cmd)
129 {
130         hammer2_msg_t *msg;
131         int hbytes;
132
133         if (aux_size) {
134                 aux_size = (aux_size + HAMMER2_MSG_ALIGNMASK) &
135                            ~HAMMER2_MSG_ALIGNMASK;
136                 if ((msg = TAILQ_FIRST(&iocom->freeq_aux)) != NULL)
137                         TAILQ_REMOVE(&iocom->freeq_aux, msg, qentry);
138         } else {
139                 if ((msg = TAILQ_FIRST(&iocom->freeq)) != NULL)
140                         TAILQ_REMOVE(&iocom->freeq, msg, qentry);
141         }
142         if (msg == NULL) {
143                 msg = malloc(sizeof(*msg));
144                 bzero(msg, sizeof(*msg));
145                 msg->aux_data = NULL;
146                 msg->aux_size = 0;
147         }
148         if (msg->aux_size != aux_size) {
149                 if (msg->aux_data) {
150                         free(msg->aux_data);
151                         msg->aux_data = NULL;
152                         msg->aux_size = 0;
153                 }
154                 if (aux_size) {
155                         msg->aux_data = malloc(aux_size);
156                         msg->aux_size = aux_size;
157                 }
158         }
159         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
160         if (hbytes)
161                 bzero(&msg->any.head, hbytes);
162         msg->hdr_size = hbytes;
163         msg->any.head.aux_icrc = 0;
164         msg->any.head.cmd = cmd;
165
166         return (msg);
167 }
168
169 /*
170  * Free a message so it can be reused afresh.
171  *
172  * NOTE: aux_size can be 0 with a non-NULL aux_data.
173  */
174 void
175 hammer2_msg_free(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
176 {
177         if (msg->aux_data)
178                 TAILQ_INSERT_TAIL(&iocom->freeq_aux, msg, qentry);
179         else
180                 TAILQ_INSERT_TAIL(&iocom->freeq, msg, qentry);
181 }
182
183 /*
184  * I/O core loop for an iocom.
185  */
186 void
187 hammer2_iocom_core(hammer2_iocom_t *iocom,
188                    void (*recvmsg_func)(hammer2_iocom_t *),
189                    void (*sendmsg_func)(hammer2_iocom_t *),
190                    void (*altmsg_func)(hammer2_iocom_t *))
191 {
192         struct pollfd fds[2];
193         int timeout;
194
195         iocom->recvmsg_callback = recvmsg_func;
196         iocom->sendmsg_callback = sendmsg_func;
197         iocom->altmsg_callback = altmsg_func;
198
199         while ((iocom->flags & HAMMER2_IOCOMF_EOF) == 0) {
200                 timeout = 5000;
201
202                 fds[0].fd = iocom->sock_fd;
203                 fds[0].events = 0;
204                 fds[0].revents = 0;
205
206                 if (iocom->flags & HAMMER2_IOCOMF_RREQ)
207                         fds[0].events |= POLLIN;
208                 else
209                         timeout = 0;
210                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
211                         if (iocom->flags & HAMMER2_IOCOMF_WREQ)
212                                 fds[0].events |= POLLOUT;
213                         else
214                                 timeout = 0;
215                 }
216
217                 if (iocom->alt_fd >= 0) {
218                         fds[1].fd = iocom->alt_fd;
219                         fds[1].events |= POLLIN;
220                         fds[1].revents = 0;
221                         poll(fds, 2, timeout);
222                 } else {
223                         poll(fds, 1, timeout);
224                 }
225                 if ((fds[0].revents & POLLIN) ||
226                     (iocom->flags & HAMMER2_IOCOMF_RREQ) == 0) {
227                         iocom->recvmsg_callback(iocom);
228                 }
229                 if ((iocom->flags & HAMMER2_IOCOMF_WIDLE) == 0) {
230                         if ((fds[0].revents & POLLOUT) ||
231                             (iocom->flags & HAMMER2_IOCOMF_WREQ) == 0) {
232                                 iocom->sendmsg_callback(iocom);
233                         }
234                 }
235                 if (iocom->alt_fd >= 0 && (fds[1].revents & POLLIN))
236                         iocom->altmsg_callback(iocom);
237         }
238 }
239
240 /*
241  * Read the next ready message from the ioq, issuing I/O if needed.
242  * Caller should retry on a read-event when NULL is returned.
243  *
244  * If an error occurs during reception a HAMMER2_LNK_ERROR msg will
245  * be returned for each open transaction, then the ioq and iocom
246  * will be errored out and a non-transactional HAMMER2_LNK_ERROR
247  * msg will be returned as the final message.  The caller should not call
248  * us again after the final message is returned.
249  */
250 hammer2_msg_t *
251 hammer2_ioq_read(hammer2_iocom_t *iocom)
252 {
253         hammer2_ioq_t *ioq = &iocom->ioq_rx;
254         hammer2_msg_t *msg;
255         hammer2_msg_hdr_t *head;
256         hammer2_state_t *state;
257         ssize_t n;
258         size_t bytes;
259         size_t nmax;
260         uint16_t xcrc16;
261         uint32_t xcrc32;
262         int error;
263
264 again:
265         /*
266          * If a message is already pending we can just remove and
267          * return it.  Message state has already been processed.
268          */
269         if ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
270                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
271                 return (msg);
272         }
273
274         /*
275          * Message read in-progress (msg is NULL at the moment).  We don't
276          * allocate a msg until we have its core header.
277          */
278         bytes = ioq->fifo_end - ioq->fifo_beg;
279         nmax = sizeof(ioq->buf) - ioq->fifo_end;
280         msg = ioq->msg;
281
282         switch(ioq->state) {
283         case HAMMER2_MSGQ_STATE_HEADER1:
284                 /*
285                  * Load the primary header, fail on any non-trivial read
286                  * error or on EOF.  Since the primary header is the same
287                  * size is the message alignment it will never straddle
288                  * the end of the buffer.
289                  */
290                 if (bytes < (int)sizeof(msg->any.head)) {
291                         n = read(iocom->sock_fd,
292                                  ioq->buf + ioq->fifo_end,
293                                  nmax);
294                         if (n <= 0) {
295                                 if (n == 0) {
296                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
297                                         break;
298                                 }
299                                 if (errno != EINTR &&
300                                     errno != EINPROGRESS &&
301                                     errno != EAGAIN) {
302                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
303                                         break;
304                                 }
305                                 n = 0;
306                                 /* fall through */
307                         }
308                         ioq->fifo_end += n;
309                         bytes += n;
310                         nmax -= n;
311                 }
312
313                 /*
314                  * Insufficient data accumulated (msg is NULL, caller will
315                  * retry on event).
316                  */
317                 assert(msg == NULL);
318                 if (bytes < (int)sizeof(msg->any.head))
319                         break;
320
321                 /*
322                  * Calculate the header, decrypt data received so far.
323                  * Data will be decrypted in-place.  Partial blocks are
324                  * not immediately decrypted.
325                  */
326                 hammer2_crypto_decrypt(iocom, ioq);
327                 head = (void *)(ioq->buf + ioq->fifo_beg);
328
329                 /*
330                  * Check and fixup the core header.  Note that the icrc
331                  * has to be calculated before any fixups, but the crc
332                  * fields in the msg may have to be swapped like everything
333                  * else.
334                  */
335                 if (head->magic != HAMMER2_MSGHDR_MAGIC &&
336                     head->magic != HAMMER2_MSGHDR_MAGIC_REV) {
337                         ioq->error = HAMMER2_IOQ_ERROR_SYNC;
338                         break;
339                 }
340
341                 xcrc32 = hammer2_icrc32((char *)head + HAMMER2_MSGHDR_CRCOFF,
342                                         HAMMER2_MSGHDR_CRCBYTES);
343                 if (head->magic == HAMMER2_MSGHDR_MAGIC_REV) {
344                         hammer2_bswap_head(head);
345                 }
346                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
347                 if (xcrc16 != head->icrc1) {
348                         ioq->error = HAMMER2_IOQ_ERROR_HCRC;
349                         break;
350                 }
351
352                 /*
353                  * Calculate the full header size and aux data size
354                  */
355                 ioq->hbytes = (head->cmd & HAMMER2_MSGF_SIZE) *
356                               HAMMER2_MSG_ALIGN;
357                 ioq->abytes = head->aux_bytes * HAMMER2_MSG_ALIGN;
358                 if (ioq->hbytes < sizeof(msg->any.head) ||
359                     ioq->hbytes > sizeof(msg->any) ||
360                     ioq->abytes > HAMMER2_MSGAUX_MAX) {
361                         ioq->error = HAMMER2_IOQ_ERROR_FIELD;
362                         break;
363                 }
364
365                 /*
366                  * Finally allocate the message and copy the core header
367                  * to the embedded extended header.
368                  *
369                  * Initialize msg->aux_size to 0 and use it to track
370                  * the amount of data copied from the stream.
371                  */
372                 msg = hammer2_msg_alloc(iocom, ioq->abytes, 0);
373                 ioq->msg = msg;
374
375                 /*
376                  * We are either done or we fall-through
377                  */
378                 if (ioq->hbytes == sizeof(msg->any.head) && ioq->abytes == 0) {
379                         bcopy(head, &msg->any.head, sizeof(msg->any.head));
380                         ioq->fifo_beg += ioq->hbytes;
381                         break;
382                 }
383
384                 /*
385                  * Fall through to the next state.  Make sure that the
386                  * extended header does not straddle the end of the buffer.
387                  * We still want to issue larger reads into our buffer,
388                  * book-keeping is easier if we don't bcopy() yet.
389                  */
390                 if (bytes + nmax < ioq->hbytes) {
391                         bcopy(ioq->buf + ioq->fifo_beg, ioq->buf, bytes);
392                         ioq->fifo_cdx -= ioq->fifo_beg;
393                         ioq->fifo_beg = 0;
394                         ioq->fifo_end = bytes;
395                         nmax = sizeof(ioq->buf) - ioq->fifo_end;
396                 }
397                 ioq->state = HAMMER2_MSGQ_STATE_HEADER2;
398                 /* fall through */
399         case HAMMER2_MSGQ_STATE_HEADER2:
400                 /*
401                  * Fill out the extended header.
402                  */
403                 assert(msg != NULL);
404                 if (bytes < ioq->hbytes) {
405                         n = read(iocom->sock_fd,
406                                  msg->any.buf + ioq->fifo_end,
407                                  nmax);
408                         if (n <= 0) {
409                                 if (n == 0) {
410                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
411                                         break;
412                                 }
413                                 if (errno != EINTR &&
414                                     errno != EINPROGRESS &&
415                                     errno != EAGAIN) {
416                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
417                                         break;
418                                 }
419                                 n = 0;
420                                 /* fall through */
421                         }
422                         ioq->fifo_end += n;
423                         bytes += n;
424                         nmax -= n;
425                 }
426
427                 /*
428                  * Insufficient data accumulated (set msg NULL so caller will
429                  * retry on event).
430                  */
431                 if (bytes < ioq->hbytes) {
432                         msg = NULL;
433                         break;
434                 }
435
436                 /*
437                  * Calculate the extended header, decrypt data received
438                  * so far.
439                  */
440                 hammer2_crypto_decrypt(iocom, ioq);
441                 head = (void *)(ioq->buf + ioq->fifo_beg);
442
443                 /*
444                  * Check the crc on the extended header
445                  */
446                 if (ioq->hbytes > sizeof(hammer2_msg_hdr_t)) {
447                         xcrc32 = hammer2_icrc32(head + 1,
448                                                 ioq->hbytes - sizeof(*head));
449                         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
450                         if (head->icrc2 != xcrc16) {
451                                 ioq->error = HAMMER2_IOQ_ERROR_XCRC;
452                                 break;
453                         }
454                 }
455
456                 /*
457                  * Copy the extended header into the msg and adjust the
458                  * FIFO.
459                  */
460                 bcopy(head, &msg->any, ioq->hbytes);
461
462                 /*
463                  * We are either done or we fall-through.
464                  */
465                 if (ioq->abytes == 0) {
466                         ioq->fifo_beg += ioq->hbytes;
467                         break;
468                 }
469
470                 /*
471                  * Must adjust nmax and bytes (and the state) when falling
472                  * through.
473                  */
474                 ioq->fifo_beg += ioq->hbytes;
475                 nmax -= ioq->hbytes;
476                 bytes -= ioq->hbytes;
477                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA1;
478                 /* fall through */
479         case HAMMER2_MSGQ_STATE_AUXDATA1:
480                 /*
481                  * Copy the partial or complete payload from remaining
482                  * bytes in the FIFO.  We have to fall-through either
483                  * way so we can check the crc.
484                  *
485                  * Adjust msg->aux_size to the final actual value.
486                  */
487                 ioq->already = ioq->fifo_cdx - ioq->fifo_beg;
488                 if (ioq->already > ioq->abytes)
489                         ioq->already = ioq->abytes;
490                 if (bytes >= ioq->abytes) {
491                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
492                               ioq->abytes);
493                         msg->aux_size = ioq->abytes;
494                         ioq->fifo_beg += ioq->abytes;
495                         if (ioq->fifo_cdx < ioq->fifo_beg)
496                                 ioq->fifo_cdx = ioq->fifo_beg;
497                         bytes -= ioq->abytes;
498                 } else if (bytes) {
499                         bcopy(ioq->buf + ioq->fifo_beg, msg->aux_data,
500                               bytes);
501                         msg->aux_size = bytes;
502                         ioq->fifo_beg += bytes;
503                         if (ioq->fifo_cdx < ioq->fifo_beg)
504                                 ioq->fifo_cdx = ioq->fifo_beg;
505                         bytes = 0;
506                 } else {
507                         msg->aux_size = 0;
508                 }
509                 ioq->state = HAMMER2_MSGQ_STATE_AUXDATA2;
510                 /* fall through */
511         case HAMMER2_MSGQ_STATE_AUXDATA2:
512                 /*
513                  * Read the remainder of the payload directly into the
514                  * msg->aux_data buffer.
515                  */
516                 assert(msg);
517                 if (msg->aux_size < ioq->abytes) {
518                         assert(bytes == 0);
519                         n = read(iocom->sock_fd,
520                                  msg->aux_data + msg->aux_size,
521                                  ioq->abytes - msg->aux_size);
522                         if (n <= 0) {
523                                 if (n == 0) {
524                                         ioq->error = HAMMER2_IOQ_ERROR_EOF;
525                                         break;
526                                 }
527                                 if (errno != EINTR &&
528                                     errno != EINPROGRESS &&
529                                     errno != EAGAIN) {
530                                         ioq->error = HAMMER2_IOQ_ERROR_SOCK;
531                                         break;
532                                 }
533                                 n = 0;
534                                 /* fall through */
535                         }
536                         msg->aux_size += n;
537                 }
538
539                 /*
540                  * Insufficient data accumulated (set msg NULL so caller will
541                  * retry on event).
542                  */
543                 if (msg->aux_size < ioq->abytes) {
544                         msg = NULL;
545                         break;
546                 }
547                 assert(msg->aux_size == ioq->abytes);
548                 hammer2_crypto_decrypt_aux(iocom, ioq, msg, ioq->already);
549
550                 /*
551                  * Check aux_icrc, then we are done.
552                  */
553                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
554                 if (xcrc32 != msg->any.head.aux_icrc) {
555                         ioq->error = HAMMER2_IOQ_ERROR_ACRC;
556                         break;
557                 }
558                 break;
559         case HAMMER2_MSGQ_STATE_ERROR:
560                 /*
561                  * Continued calls to drain recorded transactions (returning
562                  * a LNK_ERROR for each one), before we return the final
563                  * LNK_ERROR.
564                  */
565                 assert(msg == NULL);
566                 break;
567         default:
568                 /*
569                  * We don't double-return errors, the caller should not
570                  * have called us again after getting an error msg.
571                  */
572                 assert(0);
573                 break;
574         }
575
576         /*
577          * Check the message sequence.  The iv[] should prevent any
578          * possibility of a replay but we add this check anyway.
579          */
580         if (msg && ioq->error == 0) {
581                 if ((msg->any.head.salt & 255) != (ioq->seq & 255)) {
582                         ioq->error = HAMMER2_IOQ_ERROR_MSGSEQ;
583                 } else {
584                         ++ioq->seq;
585                 }
586         }
587
588         /*
589          * Process transactional state for the message.
590          */
591         if (msg && ioq->error == 0) {
592                 error = hammer2_state_msgrx(iocom, msg);
593                 if (error) {
594                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
595                                 hammer2_msg_free(iocom, msg);
596                                 goto again;
597                         }
598                         ioq->error = error;
599                 }
600         }
601
602         /*
603          * Handle error, RREQ, or completion
604          *
605          * NOTE: nmax and bytes are invalid at this point, we don't bother
606          *       to update them when breaking out.
607          */
608         if (ioq->error) {
609                 /*
610                  * An unrecoverable error causes all active receive
611                  * transactions to be terminated with a LNK_ERROR message.
612                  *
613                  * Once all active transactions are exhausted we set the
614                  * iocom ERROR flag and return a non-transactional LNK_ERROR
615                  * message, which should cause master processing loops to
616                  * terminate.
617                  */
618                 assert(ioq->msg == msg);
619                 if (msg) {
620                         hammer2_msg_free(iocom, msg);
621                         ioq->msg = NULL;
622                 }
623
624                 /*
625                  * No more I/O read processing
626                  */
627                 ioq->state = HAMMER2_MSGQ_STATE_ERROR;
628
629                 /*
630                  * Return LNK_ERROR for any open transaction, and finally
631                  * as a non-transactional message when no transactions are
632                  * left.
633                  */
634                 msg = hammer2_msg_alloc(iocom, 0, 0);
635                 bzero(&msg->any.head, sizeof(msg->any.head));
636                 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
637                 msg->any.head.cmd = HAMMER2_LNK_ERROR;
638                 msg->any.head.error = ioq->error;
639
640                 if ((state = RB_ROOT(&iocom->staterd_tree)) != NULL) {
641                         /*
642                          * Active transactions are still present.  Simulate
643                          * the other end sending us a DELETE.
644                          */
645                         state->txcmd |= HAMMER2_MSGF_DELETE;
646                         msg->state = state;
647                         msg->any.head.source = state->source;
648                         msg->any.head.target = state->target;
649                         msg->any.head.cmd |= HAMMER2_MSGF_ABORT |
650                                              HAMMER2_MSGF_DELETE;
651                 } else {
652                         /*
653                          * No active transactions remain
654                          */
655                         msg->state = NULL;
656                         iocom->flags |= HAMMER2_IOCOMF_EOF;
657                 }
658         } else if (msg == NULL) {
659                 /*
660                  * Insufficient data received to finish building the message,
661                  * set RREQ and return NULL.
662                  *
663                  * Leave ioq->msg intact.
664                  * Leave the FIFO intact.
665                  */
666                 iocom->flags |= HAMMER2_IOCOMF_RREQ;
667 #if 0
668                 ioq->fifo_cdx = 0;
669                 ioq->fifo_beg = 0;
670                 ioq->fifo_end = 0;
671 #endif
672         } else {
673                 /*
674                  * Return msg, clear the FIFO if it is now empty.
675                  * Flag RREQ if the caller needs to wait for a read-event
676                  * or not.
677                  *
678                  * The fifo has already been advanced past the message.
679                  * Trivially reset the FIFO indices if possible.
680                  */
681                 if (ioq->fifo_beg == ioq->fifo_end) {
682                         iocom->flags |= HAMMER2_IOCOMF_RREQ;
683                         ioq->fifo_cdx = 0;
684                         ioq->fifo_beg = 0;
685                         ioq->fifo_end = 0;
686                 } else {
687                         iocom->flags &= ~HAMMER2_IOCOMF_RREQ;
688                 }
689                 ioq->state = HAMMER2_MSGQ_STATE_HEADER1;
690                 ioq->msg = NULL;
691         }
692         return (msg);
693 }
694
695 /*
696  * Calculate the header and data crc's and write a low-level message to
697  * the connection.  If aux_icrc is non-zero the aux_data crc is already
698  * assumed to have been set.
699  *
700  * A non-NULL msg is added to the queue but not necessarily flushed.
701  * Calling this function with msg == NULL will get a flush going.
702  */
703 void
704 hammer2_ioq_write(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
705 {
706         hammer2_ioq_t *ioq = &iocom->ioq_tx;
707         uint16_t xcrc16;
708         uint32_t xcrc32;
709         int hbytes;
710         int error;
711
712         assert(msg);
713
714         /*
715          * Process transactional state.
716          */
717         if (ioq->error == 0) {
718                 error = hammer2_state_msgtx(iocom, msg);
719                 if (error) {
720                         if (error == HAMMER2_IOQ_ERROR_EALREADY) {
721                                 hammer2_msg_free(iocom, msg);
722                         } else {
723                                 ioq->error = error;
724                         }
725                 }
726         }
727
728         /*
729          * Process terminal connection errors.
730          */
731         if (ioq->error) {
732                 TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
733                 ++ioq->msgcount;
734                 hammer2_iocom_drain(iocom);
735                 return;
736         }
737
738         /*
739          * Finish populating the msg fields.  The salt ensures that the iv[]
740          * array is ridiculously randomized and we also re-seed our PRNG
741          * every 32768 messages just to be sure.
742          */
743         msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
744         msg->any.head.salt = (random() << 8) | (ioq->seq & 255);
745         ++ioq->seq;
746         if ((ioq->seq & 32767) == 0)
747                 srandomdev();
748
749         /*
750          * Calculate aux_icrc if 0, calculate icrc2, and finally
751          * calculate icrc1.
752          */
753         if (msg->aux_size && msg->any.head.aux_icrc == 0) {
754                 assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
755                 xcrc32 = hammer2_icrc32(msg->aux_data, msg->aux_size);
756                 msg->any.head.aux_icrc = xcrc32;
757         }
758         msg->any.head.aux_bytes = msg->aux_size / HAMMER2_MSG_ALIGN;
759         assert((msg->aux_size & HAMMER2_MSG_ALIGNMASK) == 0);
760
761         if ((msg->any.head.cmd & HAMMER2_MSGF_SIZE) >
762             sizeof(msg->any.head) / HAMMER2_MSG_ALIGN) {
763                 hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
764                         HAMMER2_MSG_ALIGN;
765                 hbytes -= sizeof(msg->any.head);
766                 xcrc32 = hammer2_icrc32(&msg->any.head + 1, hbytes);
767                 xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
768                 msg->any.head.icrc2 = xcrc16;
769         } else {
770                 msg->any.head.icrc2 = 0;
771         }
772         xcrc32 = hammer2_icrc32(msg->any.buf + HAMMER2_MSGHDR_CRCOFF,
773                                 HAMMER2_MSGHDR_CRCBYTES);
774         xcrc16 = (uint16_t)xcrc32 ^ (uint16_t)(xcrc32 >> 16);
775         msg->any.head.icrc1 = xcrc16;
776
777         /*
778          * XXX Encrypt the message
779          */
780
781         /*
782          * Enqueue the message.
783          */
784         TAILQ_INSERT_TAIL(&ioq->msgq, msg, qentry);
785         ++ioq->msgcount;
786         iocom->flags &= ~HAMMER2_IOCOMF_WIDLE;
787
788         /*
789          * Flush if we know we can write (WREQ not set) and if
790          * sufficient messages have accumulated.  Otherwise hold
791          * off to avoid piecemeal system calls.
792          */
793         if (iocom->flags & HAMMER2_IOCOMF_WREQ)
794                 return;
795         if (ioq->msgcount < HAMMER2_IOQ_MAXIOVEC / 2)
796                 return;
797         hammer2_iocom_flush(iocom);
798 }
799
/*
 * Push queued transmit data (iocom->ioq_tx) out the socket.
 *
 * An iovec is built over the not-yet-written portion of each queued
 * message: its header ((cmd & HAMMER2_MSGF_SIZE) units of
 * HAMMER2_MSG_ALIGN bytes) followed by its aux data.  Only the head of
 * the queue can be partially written; its progress is kept in
 * ioq->hbytes / ioq->abytes.  The iovec is passed through
 * hammer2_crypto_encrypt() before writev().
 *
 * After a successful writev() the written byte count is applied to the
 * queue: fully written messages are dequeued and handed to
 * hammer2_state_cleanuptx(), and a partially written head message has
 * its offsets advanced.  WREQ is set when the write was short or would
 * block and cleared once everything queued was written.  A writev()
 * failure other than EINTR/EINPROGRESS/EAGAIN sets
 * HAMMER2_IOQ_ERROR_SOCK and drains the queue.
 */
void
hammer2_iocom_flush(hammer2_iocom_t *iocom)
{
	hammer2_ioq_t *ioq = &iocom->ioq_tx;
	hammer2_msg_t *msg;
	ssize_t nmax;		/* bytes described by iov[] (pre-crypto) */
	ssize_t nact;		/* bytes actually accepted by writev() */
	struct iovec iov[HAMMER2_IOQ_MAXIOVEC];
	size_t hbytes;
	size_t abytes;
	int hoff;
	int aoff;
	int n;

	/*
	 * Pump messages out the connection by building an iovec.
	 */
	n = 0;
	nmax = 0;

	TAILQ_FOREACH(msg, &ioq->msgq, qentry) {
		hoff = 0;
		hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
			 HAMMER2_MSG_ALIGN;
		aoff = 0;
		abytes = msg->aux_size;
		/*
		 * Only the first message placed in iov[] (the queue head)
		 * can have been partially written by a previous flush;
		 * resume it at the recorded header/aux offsets.
		 */
		if (n == 0) {
			hoff += ioq->hbytes;
			aoff += ioq->abytes;
		}
		/*
		 * NOTE: these differences are computed in size_t, so the
		 * "> 0" tests are effectively "!= 0" and rely on
		 * hoff <= hbytes and aoff <= abytes.
		 */
		if (hbytes - hoff > 0) {
			iov[n].iov_base = (char *)&msg->any.head + hoff;
			iov[n].iov_len = hbytes - hoff;
			nmax += hbytes - hoff;
			++n;
			/* iov[] full, the remainder goes out on a later flush */
			if (n == HAMMER2_IOQ_MAXIOVEC)
				break;
		}
		if (abytes - aoff > 0) {
			assert(msg->aux_data != NULL);
			iov[n].iov_base = msg->aux_data + aoff;
			iov[n].iov_len = abytes - aoff;
			nmax += abytes - aoff;
			++n;
			if (n == HAMMER2_IOQ_MAXIOVEC)
				break;
		}
	}
	if (n == 0)
		return;

	/*
	 * Encrypt and write the data.  The crypto code will move the
	 * data into the fifo and adjust the iov as necessary.  If
	 * encryption is disabled the iov is left alone.
	 *
	 * hammer2_crypto_encrypt_wrote()
	 *
	 * NOTE(review): nmax was computed from the plaintext iov above;
	 * confirm it stays comparable with nact when encryption rewrites
	 * the iov.
	 */
	n = hammer2_crypto_encrypt(iocom, ioq, iov, n);

	/*
	 * Execute the writev() then figure out what happened.
	 */
	nact = writev(iocom->sock_fd, iov, n);
	if (nact < 0) {
		if (errno != EINTR &&
		    errno != EINPROGRESS &&
		    errno != EAGAIN) {
			/* fatal socket error, throw away everything queued */
			ioq->error = HAMMER2_IOQ_ERROR_SOCK;
			hammer2_iocom_drain(iocom);
		} else {
			/* transient, retry when the socket is writable */
			iocom->flags |= HAMMER2_IOCOMF_WREQ;
		}
		return;
	}
	hammer2_crypto_encrypt_wrote(iocom, ioq, nact);
	if (nact == nmax)
		iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
	else
		iocom->flags |= HAMMER2_IOCOMF_WREQ;

	/*
	 * Consume nact bytes from the head of the queue.  Fully written
	 * messages are retired; the first message that was not fully
	 * written keeps its progress in ioq->hbytes/ioq->abytes.
	 */
	while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
		hbytes = (msg->any.head.cmd & HAMMER2_MSGF_SIZE) *
			 HAMMER2_MSG_ALIGN;
		abytes = msg->aux_size;

		if ((size_t)nact < hbytes - ioq->hbytes) {
			ioq->hbytes += nact;
			break;
		}
		nact -= hbytes - ioq->hbytes;
		ioq->hbytes = hbytes;
		if ((size_t)nact < abytes - ioq->abytes) {
			ioq->abytes += nact;
			break;
		}
		nact -= abytes - ioq->abytes;

		TAILQ_REMOVE(&ioq->msgq, msg, qentry);
		--ioq->msgcount;
		ioq->hbytes = 0;
		ioq->abytes = 0;

		hammer2_state_cleanuptx(iocom, msg);
	}
	/* queue emptied: nothing left to write */
	if (msg == NULL) {
		iocom->flags |= HAMMER2_IOCOMF_WIDLE;
		iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
	}
	if (ioq->error) {
		iocom->flags |= HAMMER2_IOCOMF_EOF |
				HAMMER2_IOCOMF_WIDLE;
		iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
	}
}
915
916 /*
917  * Kill pending msgs on ioq_tx and adjust the flags such that no more
918  * write events will occur.  We don't kill read msgs because we want
919  * the caller to pull off our contrived terminal error msg to detect
920  * the connection failure.
921  */
922 void
923 hammer2_iocom_drain(hammer2_iocom_t *iocom)
924 {
925         hammer2_ioq_t *ioq = &iocom->ioq_tx;
926         hammer2_msg_t *msg;
927
928         while ((msg = TAILQ_FIRST(&ioq->msgq)) != NULL) {
929                 TAILQ_REMOVE(&ioq->msgq, msg, qentry);
930                 --ioq->msgcount;
931                 hammer2_msg_free(iocom, msg);
932         }
933         iocom->flags |= HAMMER2_IOCOMF_WIDLE;
934         iocom->flags &= ~HAMMER2_IOCOMF_WREQ;
935 }
936
937 /*
938  * This is a shortcut to formulate a reply to msg with a simple error code.
939  * It can reply to transaction or one-way messages, or terminate one side
940  * of a stream.  A HAMMER2_LNK_ERROR command code is utilized to encode
941  * the error code (which can be 0).
942  *
943  * Replies to one-way messages are a bit of an oxymoron but the feature
944  * is used by the debug (DBG) protocol.
945  *
946  * The reply contains no data.
947  */
948 void
949 hammer2_msg_reply(hammer2_iocom_t *iocom, hammer2_msg_t *msg, uint16_t error)
950 {
951         hammer2_msg_t *nmsg;
952         uint32_t cmd;
953
954         cmd = HAMMER2_LNK_ERROR;
955         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
956                 /*
957                  * Reply to received reply, reply direction uses txcmd.
958                  * txcmd will be updated by hammer2_ioq_write().
959                  */
960                 if (msg->state) {
961                         if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
962                                 cmd |= HAMMER2_MSGF_CREATE;
963                         cmd |= HAMMER2_MSGF_DELETE;
964                 }
965         } else {
966                 /*
967                  * Reply to received command, reply direction uses rxcmd.
968                  * txcmd will be updated by hammer2_ioq_write().
969                  */
970                 cmd |= HAMMER2_MSGF_REPLY;
971                 if (msg->state) {
972                         if ((msg->state->rxcmd & HAMMER2_MSGF_CREATE) == 0)
973                                 cmd |= HAMMER2_MSGF_CREATE;
974                         cmd |= HAMMER2_MSGF_DELETE;
975                 }
976         }
977         nmsg = hammer2_msg_alloc(iocom, 0, cmd);
978         nmsg->any.head.error = error;
979         hammer2_ioq_write(iocom, nmsg);
980 }
981
/************************************************************************
 *                      TRANSACTION STATE HANDLING                      *
 ************************************************************************
 *
 * Persistent state for CREATE ... DELETE command/reply sequences.
 */

/*
 * Instantiate the red-black tree functions backing RB_FIND, RB_INSERT
 * and RB_REMOVE for struct hammer2_state, linked through its rbnode
 * field and ordered by hammer2_state_cmp().  The state tracking code
 * that follows uses them on iocom->staterd_tree and iocom->statewr_tree.
 */
RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
989
990 /*
991  * Process state tracking for a message after reception, prior to
992  * execution.
993  *
994  * Called with msglk held and the msg dequeued.
995  *
996  * All messages are called with dummy state and return actual state.
997  * (One-off messages often just return the same dummy state).
998  *
999  * May request that caller discard the message by setting *discardp to 1.
1000  * The returned state is not used in this case and is allowed to be NULL.
1001  *
1002  * --
1003  *
1004  * These routines handle persistent and command/reply message state via the
1005  * CREATE and DELETE flags.  The first message in a command or reply sequence
1006  * sets CREATE, the last message in a command or reply sequence sets DELETE.
1007  *
1008  * There can be any number of intermediate messages belonging to the same
1009  * sequence sent inbetween the CREATE message and the DELETE message,
1010  * which set neither flag.  This represents a streaming command or reply.
1011  *
1012  * Any command message received with CREATE set expects a reply sequence to
1013  * be returned.  Reply sequences work the same as command sequences except the
1014  * REPLY bit is also sent.  Both the command side and reply side can
1015  * degenerate into a single message with both CREATE and DELETE set.  Note
1016  * that one side can be streaming and the other side not, or neither, or both.
1017  *
1018  * The msgid is unique for the initiator.  That is, two sides sending a new
1019  * message can use the same msgid without colliding.
1020  *
1021  * --
1022  *
1023  * ABORT sequences work by setting the ABORT flag along with normal message
1024  * state.  However, ABORTs can also be sent on half-closed messages, that is
1025  * even if the command or reply side has already sent a DELETE, as long as
1026  * the message has not been fully closed it can still send an ABORT+DELETE
1027  * to terminate the half-closed message state.
1028  *
1029  * Since ABORT+DELETEs can race we silently discard ABORT's for message
1030  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
1031  * also race, and in this situation the other side might have already
1032  * initiated a new unrelated command with the same message id.  Since
1033  * the abort has not set the CREATE flag the situation can be detected
1034  * and the message will also be discarded.
1035  *
1036  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
1037  * The ABORT request is essentially integrated into the command instead
1038  * of being sent later on.  In this situation the command implementation
1039  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
1040  * special-case non-blocking operation for the command.
1041  *
1042  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
1043  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
1044  *        one-way messages are not supported.
1045  *
1046  * NOTE!  If a command sequence does not support aborts the ABORT flag is
1047  *        simply ignored.
1048  *
1049  * --
1050  *
1051  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
1052  * set.  One-off messages cannot be aborted and typically aren't processed
1053  * by these routines.  The REPLY bit can be used to distinguish whether a
1054  * one-off message is a command or reply.  For example, one-off replies
1055  * will typically just contain status updates.
1056  */
1057 static int
1058 hammer2_state_msgrx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1059 {
1060         hammer2_state_t *state;
1061         hammer2_state_t dummy;
1062         int error;
1063
1064         /*
1065          * Lock RB tree and locate existing persistent state, if any.
1066          *
1067          * If received msg is a command state is on staterd_tree.
1068          * If received msg is a reply state is on statewr_tree.
1069          */
1070         /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1071
1072         dummy.msgid = msg->any.head.msgid;
1073         dummy.source = msg->any.head.source;
1074         dummy.target = msg->any.head.target;
1075         iocom_printf(iocom, msg->any.head.cmd,
1076                      "received msg %08x msgid %u source=%u target=%u\n",
1077                       msg->any.head.cmd, msg->any.head.msgid,
1078                       msg->any.head.source, msg->any.head.target);
1079         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1080                 state = RB_FIND(hammer2_state_tree,
1081                                 &iocom->statewr_tree, &dummy);
1082         } else {
1083                 state = RB_FIND(hammer2_state_tree,
1084                                 &iocom->staterd_tree, &dummy);
1085         }
1086         msg->state = state;
1087
1088         /*
1089          * Short-cut one-off or mid-stream messages (state may be NULL).
1090          */
1091         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1092                                   HAMMER2_MSGF_ABORT)) == 0) {
1093                 /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1094                 return(0);
1095         }
1096
1097         /*
1098          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
1099          * inside the case statements.
1100          */
1101         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
1102                                     HAMMER2_MSGF_REPLY)) {
1103         case HAMMER2_MSGF_CREATE:
1104         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1105                 /*
1106                  * New persistant command received.
1107                  */
1108                 if (state) {
1109                         iocom_printf(iocom, msg->any.head.cmd,
1110                                      "hammer2_state_msgrx: "
1111                                      "duplicate transaction\n");
1112                         error = HAMMER2_IOQ_ERROR_TRANS;
1113                         break;
1114                 }
1115                 state = malloc(sizeof(*state));
1116                 bzero(state, sizeof(*state));
1117                 state->iocom = iocom;
1118                 state->flags = HAMMER2_STATE_DYNAMIC;
1119                 state->msg = msg;
1120                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1121                 RB_INSERT(hammer2_state_tree, &iocom->staterd_tree, state);
1122                 state->flags |= HAMMER2_STATE_INSERTED;
1123                 msg->state = state;
1124                 error = 0;
1125                 break;
1126         case HAMMER2_MSGF_DELETE:
1127                 /*
1128                  * Persistent state is expected but might not exist if an
1129                  * ABORT+DELETE races the close.
1130                  */
1131                 if (state == NULL) {
1132                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1133                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1134                         } else {
1135                                 iocom_printf(iocom, msg->any.head.cmd,
1136                                              "hammer2_state_msgrx: "
1137                                              "no state for DELETE\n");
1138                                 error = HAMMER2_IOQ_ERROR_TRANS;
1139                         }
1140                         break;
1141                 }
1142
1143                 /*
1144                  * Handle another ABORT+DELETE case if the msgid has already
1145                  * been reused.
1146                  */
1147                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1148                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1149                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1150                         } else {
1151                                 iocom_printf(iocom, msg->any.head.cmd,
1152                                              "hammer2_state_msgrx: "
1153                                              "state reused for DELETE\n");
1154                                 error = HAMMER2_IOQ_ERROR_TRANS;
1155                         }
1156                         break;
1157                 }
1158                 error = 0;
1159                 break;
1160         default:
1161                 /*
1162                  * Check for mid-stream ABORT command received, otherwise
1163                  * allow.
1164                  */
1165                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1166                         if (state == NULL ||
1167                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1168                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1169                                 break;
1170                         }
1171                 }
1172                 error = 0;
1173                 break;
1174         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
1175         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
1176                 /*
1177                  * When receiving a reply with CREATE set the original
1178                  * persistent state message should already exist.
1179                  */
1180                 if (state == NULL) {
1181                         iocom_printf(iocom, msg->any.head.cmd,
1182                                      "hammer2_state_msgrx: "
1183                                      "no state match for REPLY cmd=%08x\n",
1184                                      msg->any.head.cmd);
1185                         error = HAMMER2_IOQ_ERROR_TRANS;
1186                         break;
1187                 }
1188                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
1189                 error = 0;
1190                 break;
1191         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
1192                 /*
1193                  * Received REPLY+ABORT+DELETE in case where msgid has
1194                  * already been fully closed, ignore the message.
1195                  */
1196                 if (state == NULL) {
1197                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1198                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1199                         } else {
1200                                 iocom_printf(iocom, msg->any.head.cmd,
1201                                              "hammer2_state_msgrx: "
1202                                              "no state match for "
1203                                              "REPLY|DELETE\n");
1204                                 error = HAMMER2_IOQ_ERROR_TRANS;
1205                         }
1206                         break;
1207                 }
1208
1209                 /*
1210                  * Received REPLY+ABORT+DELETE in case where msgid has
1211                  * already been reused for an unrelated message,
1212                  * ignore the message.
1213                  */
1214                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1215                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1216                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1217                         } else {
1218                                 iocom_printf(iocom, msg->any.head.cmd,
1219                                              "hammer2_state_msgrx: "
1220                                              "state reused for REPLY|DELETE\n");
1221                                 error = HAMMER2_IOQ_ERROR_TRANS;
1222                         }
1223                         break;
1224                 }
1225                 error = 0;
1226                 break;
1227         case HAMMER2_MSGF_REPLY:
1228                 /*
1229                  * Check for mid-stream ABORT reply received to sent command.
1230                  */
1231                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
1232                         if (state == NULL ||
1233                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
1234                                 error = HAMMER2_IOQ_ERROR_EALREADY;
1235                                 break;
1236                         }
1237                 }
1238                 error = 0;
1239                 break;
1240         }
1241         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1242         return (error);
1243 }
1244
1245 void
1246 hammer2_state_cleanuprx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1247 {
1248         hammer2_state_t *state;
1249
1250         if ((state = msg->state) == NULL) {
1251                 /*
1252                  * Free a non-transactional message, there is no state
1253                  * to worry about.
1254                  */
1255                 hammer2_msg_free(iocom, msg);
1256         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1257                 /*
1258                  * Message terminating transaction, destroy the related
1259                  * state, the original message, and this message (if it
1260                  * isn't the original message due to a CREATE|DELETE).
1261                  */
1262                 /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1263                 state->rxcmd |= HAMMER2_MSGF_DELETE;
1264                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
1265                         if (state->msg == msg)
1266                                 state->msg = NULL;
1267                         assert(state->flags & HAMMER2_STATE_INSERTED);
1268                         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1269                                 RB_REMOVE(hammer2_state_tree,
1270                                           &iocom->statewr_tree, state);
1271                         } else {
1272                                 RB_REMOVE(hammer2_state_tree,
1273                                           &iocom->staterd_tree, state);
1274                         }
1275                         state->flags &= ~HAMMER2_STATE_INSERTED;
1276                         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1277                         hammer2_state_free(state);
1278                 } else {
1279                         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1280                 }
1281                 hammer2_msg_free(iocom, msg);
1282         } else if (state->msg != msg) {
1283                 /*
1284                  * Message not terminating transaction, leave state intact
1285                  * and free message if it isn't the CREATE message.
1286                  */
1287                 hammer2_msg_free(iocom, msg);
1288         }
1289 }
1290
/*
 * Process state tracking for a message prior to transmission.
 *
 * Called with msglk held and the msg dequeued.
 *
 * No lookup is done here: if persistent state exists it must already
 * have been assigned to msg->state by the caller.  One-off messages
 * usually have no state, in which case msg->state may be NULL.
 *
 * For a new transaction (CREATE set) the state must already exist (this
 * is asserted); this function only records the transmitted command bits
 * in state->txcmd.  The disabled #if 0 code shows an alternative which
 * would insert the state here.
 *
 * Returns 0 if the message may be sent, HAMMER2_IOQ_ERROR_EALREADY if the
 * caller should silently discard it (an ABORT that raced the close or
 * reuse of its transaction), or HAMMER2_IOQ_ERROR_TRANS on a protocol
 * violation.
 */
static int
hammer2_state_msgtx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
{
	hammer2_state_t *state;
	int error;

	/*
	 * Lock RB tree.  If persistent state is present it will have already
	 * been assigned to msg.
	 */
	/*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
	state = msg->state;

	/*
	 * Short-cut one-off or mid-stream messages (state may be NULL).
	 */
	if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
				  HAMMER2_MSGF_ABORT)) == 0) {
		/*lockmgr(&pmp->msglk, LK_RELEASE);*/
		return(0);
	}


	/*
	 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
	 * inside the case statements.  txcmd records what we have sent
	 * in our direction (with DELETE masked off); a CREATE bit in
	 * txcmd is what distinguishes a live transaction from a reused
	 * msgid.
	 */
	switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
				    HAMMER2_MSGF_REPLY)) {
	case HAMMER2_MSGF_CREATE:
	case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
		/*
		 * Insert the new persistent message state and mark
		 * half-closed if DELETE is set.  Since this is a new
		 * message it isn't possible to transition into the fully
		 * closed state here.
		 *
		 * XXX state must be assigned and inserted by
		 *     hammer2_msg_write().  txcmd is assigned by us
		 *     on-transmit.
		 */
		assert(state != NULL);
#if 0
		if (state == NULL) {
			state = pmp->freerd_state;
			pmp->freerd_state = NULL;
			msg->state = state;
			state->msg = msg;
			state->msgid = msg->any.head.msgid;
			state->source = msg->any.head.source;
			state->target = msg->any.head.target;
		}
		assert((state->flags & HAMMER2_STATE_INSERTED) == 0);
		if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
			iocom_printf(iocom, msg->any.head.cmd,
				    "hammer2_state_msgtx: "
				    "duplicate transaction\n");
			error = HAMMER2_IOQ_ERROR_TRANS;
			break;
		}
		state->flags |= HAMMER2_STATE_INSERTED;
#endif
		state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
		error = 0;
		break;
	case HAMMER2_MSGF_DELETE:
		/*
		 * Sent ABORT+DELETE in case where msgid has already
		 * been fully closed, ignore the message.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
			} else {
				iocom_printf(iocom, msg->any.head.cmd,
					     "hammer2_state_msgtx: "
					     "no state match for DELETE\n");
				error = HAMMER2_IOQ_ERROR_TRANS;
			}
			break;
		}

		/*
		 * Sent ABORT+DELETE in case where msgid has
		 * already been reused for an unrelated message,
		 * ignore the message.
		 */
		if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
			} else {
				iocom_printf(iocom, msg->any.head.cmd,
					     "hammer2_state_msgtx: "
					     "state reused for DELETE\n");
				error = HAMMER2_IOQ_ERROR_TRANS;
			}
			break;
		}
		error = 0;
		break;
	default:
		/*
		 * Check for mid-stream ABORT command sent
		 */
		if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
			if (state == NULL ||
			    (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
	case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
		/*
		 * When transmitting a reply with CREATE set the original
		 * persistent state message should already exist.
		 */
		if (state == NULL) {
			iocom_printf(iocom, msg->any.head.cmd,
				     "hammer2_state_msgtx: no state match "
				     "for REPLY | CREATE\n");
			error = HAMMER2_IOQ_ERROR_TRANS;
			break;
		}
		state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
		error = 0;
		break;
	case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
		/*
		 * When transmitting a reply with DELETE set the original
		 * persistent state message should already exist.
		 *
		 * This is very similar to the REPLY|CREATE|* case except
		 * txcmd is already stored, so we just add the DELETE flag.
		 *
		 * Sent REPLY+ABORT+DELETE in case where msgid has
		 * already been fully closed, ignore the message.
		 */
		if (state == NULL) {
			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
			} else {
				iocom_printf(iocom, msg->any.head.cmd,
					     "hammer2_state_msgtx: "
					     "no state match for "
					     "REPLY | DELETE\n");
				error = HAMMER2_IOQ_ERROR_TRANS;
			}
			break;
		}

		/*
		 * Sent REPLY+ABORT+DELETE in case where msgid has already
		 * been reused for an unrelated message, ignore the message.
		 */
		if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
			if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
			} else {
				iocom_printf(iocom, msg->any.head.cmd,
					     "hammer2_state_msgtx: "
					     "state reused for "
					     "REPLY | DELETE\n");
				error = HAMMER2_IOQ_ERROR_TRANS;
			}
			break;
		}
		error = 0;
		break;
	case HAMMER2_MSGF_REPLY:
		/*
		 * Check for mid-stream ABORT reply sent.
		 *
		 * One-off REPLY messages are allowed for e.g. status updates.
		 */
		if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
			if (state == NULL ||
			    (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
				error = HAMMER2_IOQ_ERROR_EALREADY;
				break;
			}
		}
		error = 0;
		break;
	}
	/*lockmgr(&pmp->msglk, LK_RELEASE);*/
	return (error);
}
1494
1495 static void
1496 hammer2_state_cleanuptx(hammer2_iocom_t *iocom, hammer2_msg_t *msg)
1497 {
1498         hammer2_state_t *state;
1499
1500         if ((state = msg->state) == NULL) {
1501                 hammer2_msg_free(iocom, msg);
1502         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
1503                 /*lockmgr(&pmp->msglk, LK_EXCLUSIVE);*/
1504                 state->txcmd |= HAMMER2_MSGF_DELETE;
1505                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
1506                         if (state->msg == msg)
1507                                 state->msg = NULL;
1508                         assert(state->flags & HAMMER2_STATE_INSERTED);
1509                         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
1510                                 RB_REMOVE(hammer2_state_tree,
1511                                           &iocom->staterd_tree, state);
1512                         } else {
1513                                 RB_REMOVE(hammer2_state_tree,
1514                                           &iocom->statewr_tree, state);
1515                         }
1516                         state->flags &= ~HAMMER2_STATE_INSERTED;
1517                         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1518                         hammer2_state_free(state);
1519                 } else {
1520                         /*lockmgr(&pmp->msglk, LK_RELEASE);*/
1521                 }
1522                 hammer2_msg_free(iocom, msg);
1523         } else if (state->msg != msg) {
1524                 hammer2_msg_free(iocom, msg);
1525         }
1526 }
1527
1528 void
1529 hammer2_state_free(hammer2_state_t *state)
1530 {
1531         hammer2_iocom_t *iocom = state->iocom;
1532         hammer2_msg_t *msg;
1533
1534         msg = state->msg;
1535         state->msg = NULL;
1536         if (msg)
1537                 hammer2_msg_free(iocom, msg);
1538         free(state);
1539 }
1540
1541 /*
1542  * Indexed messages are stored in a red-black tree indexed by their
1543  * msgid.  Only persistent messages are indexed.
1544  */
1545 int
1546 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
1547 {
1548         if (state1->source < state2->source)
1549                 return(-1);
1550         if (state1->source > state2->source)
1551                 return(1);
1552         if (state1->target < state2->target)
1553                 return(-1);
1554         if (state1->target > state2->target)
1555                 return(1);
1556         if (state1->msgid < state2->msgid)
1557                 return(-1);
1558         if (state1->msgid > state2->msgid)
1559                 return(1);
1560         return(0);
1561 }