272fa5a8c2d1e1d0f45dd135c602cd52fb0c1e62
[dragonfly.git] / sys / vfs / hammer2 / hammer2_msg.c
1 /*-
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34
35 #include "hammer2.h"
36
/*
 * Instantiate the red-black tree operations for hammer2_state_tree.
 * Nodes are hammer2_state structures linked through their rbnode field
 * and ordered by hammer2_state_cmp() (router pointer first, then msgid).
 */
RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
38
39 /*
40  * Process state tracking for a message after reception, prior to
41  * execution.
42  *
43  * Called with msglk held and the msg dequeued.
44  *
45  * All messages are called with dummy state and return actual state.
46  * (One-off messages often just return the same dummy state).
47  *
48  * May request that caller discard the message by setting *discardp to 1.
49  * The returned state is not used in this case and is allowed to be NULL.
50  *
51  * --
52  *
53  * These routines handle persistent and command/reply message state via the
54  * CREATE and DELETE flags.  The first message in a command or reply sequence
55  * sets CREATE, the last message in a command or reply sequence sets DELETE.
56  *
57  * There can be any number of intermediate messages belonging to the same
58  * sequence sent inbetween the CREATE message and the DELETE message,
59  * which set neither flag.  This represents a streaming command or reply.
60  *
61  * Any command message received with CREATE set expects a reply sequence to
62  * be returned.  Reply sequences work the same as command sequences except the
63  * REPLY bit is also sent.  Both the command side and reply side can
64  * degenerate into a single message with both CREATE and DELETE set.  Note
65  * that one side can be streaming and the other side not, or neither, or both.
66  *
67  * The msgid is unique for the initiator.  That is, two sides sending a new
68  * message can use the same msgid without colliding.
69  *
70  * --
71  *
72  * ABORT sequences work by setting the ABORT flag along with normal message
73  * state.  However, ABORTs can also be sent on half-closed messages, that is
74  * even if the command or reply side has already sent a DELETE, as long as
75  * the message has not been fully closed it can still send an ABORT+DELETE
76  * to terminate the half-closed message state.
77  *
78  * Since ABORT+DELETEs can race we silently discard ABORT's for message
79  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
80  * also race, and in this situation the other side might have already
81  * initiated a new unrelated command with the same message id.  Since
82  * the abort has not set the CREATE flag the situation can be detected
83  * and the message will also be discarded.
84  *
85  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86  * The ABORT request is essentially integrated into the command instead
87  * of being sent later on.  In this situation the command implementation
88  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89  * special-case non-blocking operation for the command.
90  *
91  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
92  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
93  *        one-way messages are not supported.
94  *
95  * NOTE!  If a command sequence does not support aborts the ABORT flag is
96  *        simply ignored.
97  *
98  * --
99  *
100  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
101  * set.  One-off messages cannot be aborted and typically aren't processed
102  * by these routines.  The REPLY bit can be used to distinguish whether a
103  * one-off message is a command or reply.  For example, one-off replies
104  * will typically just contain status updates.
105  */
106 int
107 hammer2_state_msgrx(hammer2_msg_t *msg)
108 {
109         hammer2_pfsmount_t *pmp;
110         hammer2_state_t *state;
111         int error;
112
113         pmp = msg->router->pmp;
114
115         /*
116          * XXX resolve msg->any.head.source and msg->any.head.target
117          *     into LNK_SPAN references.
118          *
119          * XXX replace msg->router
120          */
121
122         /*
123          * Make sure a state structure is ready to go in case we need a new
124          * one.  This is the only routine which uses freerd_state so no
125          * races are possible.
126          */
127         if ((state = pmp->freerd_state) == NULL) {
128                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
129                 state->pmp = pmp;
130                 state->flags = HAMMER2_STATE_DYNAMIC;
131                 pmp->freerd_state = state;
132         }
133
134         /*
135          * Lock RB tree and locate existing persistent state, if any.
136          *
137          * If received msg is a command state is on staterd_tree.
138          * If received msg is a reply state is on statewr_tree.
139          */
140         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
141
142         state->msgid = msg->any.head.msgid;
143         state->router = &pmp->router;
144         kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
145                 msg->any.head.cmd,
146                 (intmax_t)msg->any.head.msgid,
147                 (intmax_t)msg->any.head.source,
148                 (intmax_t)msg->any.head.target);
149         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
150                 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
151         else
152                 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
153         msg->state = state;
154
155         /*
156          * Short-cut one-off or mid-stream messages (state may be NULL).
157          */
158         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
159                                   HAMMER2_MSGF_ABORT)) == 0) {
160                 lockmgr(&pmp->msglk, LK_RELEASE);
161                 return(0);
162         }
163
164         /*
165          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
166          * inside the case statements.
167          */
168         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
169                                     HAMMER2_MSGF_REPLY)) {
170         case HAMMER2_MSGF_CREATE:
171         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
172                 /*
173                  * New persistant command received.
174                  */
175                 if (state) {
176                         kprintf("hammer2_state_msgrx: duplicate transaction\n");
177                         error = EINVAL;
178                         break;
179                 }
180                 state = pmp->freerd_state;
181                 pmp->freerd_state = NULL;
182                 msg->state = state;
183                 state->router = msg->router;
184                 state->msg = msg;
185                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
186                 state->txcmd = HAMMER2_MSGF_REPLY;
187                 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
188                 state->flags |= HAMMER2_STATE_INSERTED;
189                 error = 0;
190                 break;
191         case HAMMER2_MSGF_DELETE:
192                 /*
193                  * Persistent state is expected but might not exist if an
194                  * ABORT+DELETE races the close.
195                  */
196                 if (state == NULL) {
197                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
198                                 error = EALREADY;
199                         } else {
200                                 kprintf("hammer2_state_msgrx: no state "
201                                         "for DELETE\n");
202                                 error = EINVAL;
203                         }
204                         break;
205                 }
206
207                 /*
208                  * Handle another ABORT+DELETE case if the msgid has already
209                  * been reused.
210                  */
211                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
212                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
213                                 error = EALREADY;
214                         } else {
215                                 kprintf("hammer2_state_msgrx: state reused "
216                                         "for DELETE\n");
217                                 error = EINVAL;
218                         }
219                         break;
220                 }
221                 error = 0;
222                 break;
223         default:
224                 /*
225                  * Check for mid-stream ABORT command received, otherwise
226                  * allow.
227                  */
228                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
229                         if (state == NULL ||
230                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
231                                 error = EALREADY;
232                                 break;
233                         }
234                 }
235                 error = 0;
236                 break;
237         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
238         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
239                 /*
240                  * When receiving a reply with CREATE set the original
241                  * persistent state message should already exist.
242                  */
243                 if (state == NULL) {
244                         kprintf("hammer2_state_msgrx: no state match for "
245                                 "REPLY cmd=%08x msgid=%016jx\n",
246                                 msg->any.head.cmd,
247                                 (intmax_t)msg->any.head.msgid);
248                         error = EINVAL;
249                         break;
250                 }
251                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
252                 error = 0;
253                 break;
254         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
255                 /*
256                  * Received REPLY+ABORT+DELETE in case where msgid has
257                  * already been fully closed, ignore the message.
258                  */
259                 if (state == NULL) {
260                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
261                                 error = EALREADY;
262                         } else {
263                                 kprintf("hammer2_state_msgrx: no state match "
264                                         "for REPLY|DELETE\n");
265                                 error = EINVAL;
266                         }
267                         break;
268                 }
269
270                 /*
271                  * Received REPLY+ABORT+DELETE in case where msgid has
272                  * already been reused for an unrelated message,
273                  * ignore the message.
274                  */
275                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
276                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
277                                 error = EALREADY;
278                         } else {
279                                 kprintf("hammer2_state_msgrx: state reused "
280                                         "for REPLY|DELETE\n");
281                                 error = EINVAL;
282                         }
283                         break;
284                 }
285                 error = 0;
286                 break;
287         case HAMMER2_MSGF_REPLY:
288                 /*
289                  * Check for mid-stream ABORT reply received to sent command.
290                  */
291                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
292                         if (state == NULL ||
293                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
294                                 error = EALREADY;
295                                 break;
296                         }
297                 }
298                 error = 0;
299                 break;
300         }
301         lockmgr(&pmp->msglk, LK_RELEASE);
302         return (error);
303 }
304
305 void
306 hammer2_state_cleanuprx(hammer2_msg_t *msg)
307 {
308         hammer2_pfsmount_t *pmp = msg->router->pmp;
309         hammer2_state_t *state;
310
311         if ((state = msg->state) == NULL) {
312                 hammer2_msg_free(msg);
313         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
314                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
315                 state->rxcmd |= HAMMER2_MSGF_DELETE;
316                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
317                         if (state->msg == msg)
318                                 state->msg = NULL;
319                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
320                         if (state->rxcmd & HAMMER2_MSGF_REPLY) {
321                                 KKASSERT(msg->any.head.cmd &
322                                          HAMMER2_MSGF_REPLY);
323                                 RB_REMOVE(hammer2_state_tree,
324                                           &pmp->statewr_tree, state);
325                         } else {
326                                 KKASSERT((msg->any.head.cmd &
327                                           HAMMER2_MSGF_REPLY) == 0);
328                                 RB_REMOVE(hammer2_state_tree,
329                                           &pmp->staterd_tree, state);
330                         }
331                         state->flags &= ~HAMMER2_STATE_INSERTED;
332                         lockmgr(&pmp->msglk, LK_RELEASE);
333                         hammer2_state_free(state);
334                 } else {
335                         lockmgr(&pmp->msglk, LK_RELEASE);
336                 }
337                 hammer2_msg_free(msg);
338         } else if (state->msg != msg) {
339                 hammer2_msg_free(msg);
340         }
341 }
342
343 /*
344  * Process state tracking for a message prior to transmission.
345  *
346  * Called with msglk held and the msg dequeued.
347  *
348  * One-off messages are usually with dummy state and msg->state may be NULL
349  * in this situation.
350  *
351  * New transactions (when CREATE is set) will insert the state.
352  *
353  * May request that caller discard the message by setting *discardp to 1.
354  * A NULL state may be returned in this case.
355  */
356 int
357 hammer2_state_msgtx(hammer2_msg_t *msg)
358 {
359         hammer2_pfsmount_t *pmp = msg->router->pmp;
360         hammer2_state_t *state;
361         int error;
362
363         /*
364          * Make sure a state structure is ready to go in case we need a new
365          * one.  This is the only routine which uses freewr_state so no
366          * races are possible.
367          */
368         if ((state = pmp->freewr_state) == NULL) {
369                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
370                 state->pmp = pmp;
371                 state->flags = HAMMER2_STATE_DYNAMIC;
372                 pmp->freewr_state = state;
373         }
374
375         /*
376          * Lock RB tree.  If persistent state is present it will have already
377          * been assigned to msg.
378          */
379         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
380         state = msg->state;
381
382         /*
383          * Short-cut one-off or mid-stream messages (state may be NULL).
384          */
385         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
386                                   HAMMER2_MSGF_ABORT)) == 0) {
387                 lockmgr(&pmp->msglk, LK_RELEASE);
388                 return(0);
389         }
390
391
392         /*
393          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
394          * inside the case statements.
395          */
396         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
397                                     HAMMER2_MSGF_REPLY)) {
398         case HAMMER2_MSGF_CREATE:
399         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
400                 /*
401                  * Insert the new persistent message state and mark
402                  * half-closed if DELETE is set.  Since this is a new
403                  * message it isn't possible to transition into the fully
404                  * closed state here.
405                  *
406                  * XXX state must be assigned and inserted by
407                  *     hammer2_msg_write().  txcmd is assigned by us
408                  *     on-transmit.
409                  */
410                 KKASSERT(state != NULL);
411                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
412                 state->rxcmd = HAMMER2_MSGF_REPLY;
413                 error = 0;
414                 break;
415         case HAMMER2_MSGF_DELETE:
416                 /*
417                  * Sent ABORT+DELETE in case where msgid has already
418                  * been fully closed, ignore the message.
419                  */
420                 if (state == NULL) {
421                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
422                                 error = EALREADY;
423                         } else {
424                                 kprintf("hammer2_state_msgtx: no state match "
425                                         "for DELETE cmd=%08x msgid=%016jx\n",
426                                         msg->any.head.cmd,
427                                         (intmax_t)msg->any.head.msgid);
428                                 error = EINVAL;
429                         }
430                         break;
431                 }
432
433                 /*
434                  * Sent ABORT+DELETE in case where msgid has
435                  * already been reused for an unrelated message,
436                  * ignore the message.
437                  */
438                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
439                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
440                                 error = EALREADY;
441                         } else {
442                                 kprintf("hammer2_state_msgtx: state reused "
443                                         "for DELETE\n");
444                                 error = EINVAL;
445                         }
446                         break;
447                 }
448                 error = 0;
449                 break;
450         default:
451                 /*
452                  * Check for mid-stream ABORT command sent
453                  */
454                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
455                         if (state == NULL ||
456                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
457                                 error = EALREADY;
458                                 break;
459                         }
460                 }
461                 error = 0;
462                 break;
463         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
464         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
465                 /*
466                  * When transmitting a reply with CREATE set the original
467                  * persistent state message should already exist.
468                  */
469                 if (state == NULL) {
470                         kprintf("hammer2_state_msgtx: no state match "
471                                 "for REPLY | CREATE\n");
472                         error = EINVAL;
473                         break;
474                 }
475                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
476                 error = 0;
477                 break;
478         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
479                 /*
480                  * When transmitting a reply with DELETE set the original
481                  * persistent state message should already exist.
482                  *
483                  * This is very similar to the REPLY|CREATE|* case except
484                  * txcmd is already stored, so we just add the DELETE flag.
485                  *
486                  * Sent REPLY+ABORT+DELETE in case where msgid has
487                  * already been fully closed, ignore the message.
488                  */
489                 if (state == NULL) {
490                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
491                                 error = EALREADY;
492                         } else {
493                                 kprintf("hammer2_state_msgtx: no state match "
494                                         "for REPLY | DELETE\n");
495                                 error = EINVAL;
496                         }
497                         break;
498                 }
499
500                 /*
501                  * Sent REPLY+ABORT+DELETE in case where msgid has already
502                  * been reused for an unrelated message, ignore the message.
503                  */
504                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
505                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
506                                 error = EALREADY;
507                         } else {
508                                 kprintf("hammer2_state_msgtx: state reused "
509                                         "for REPLY | DELETE\n");
510                                 error = EINVAL;
511                         }
512                         break;
513                 }
514                 error = 0;
515                 break;
516         case HAMMER2_MSGF_REPLY:
517                 /*
518                  * Check for mid-stream ABORT reply sent.
519                  *
520                  * One-off REPLY messages are allowed for e.g. status updates.
521                  */
522                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
523                         if (state == NULL ||
524                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
525                                 error = EALREADY;
526                                 break;
527                         }
528                 }
529                 error = 0;
530                 break;
531         }
532         lockmgr(&pmp->msglk, LK_RELEASE);
533         return (error);
534 }
535
536 void
537 hammer2_state_cleanuptx(hammer2_msg_t *msg)
538 {
539         hammer2_pfsmount_t *pmp = msg->router->pmp;
540         hammer2_state_t *state;
541
542         if ((state = msg->state) == NULL) {
543                 hammer2_msg_free(msg);
544         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
545                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
546                 state->txcmd |= HAMMER2_MSGF_DELETE;
547                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
548                         if (state->msg == msg)
549                                 state->msg = NULL;
550                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
551                         if (state->txcmd & HAMMER2_MSGF_REPLY) {
552                                 KKASSERT(msg->any.head.cmd &
553                                          HAMMER2_MSGF_REPLY);
554                                 RB_REMOVE(hammer2_state_tree,
555                                           &pmp->staterd_tree, state);
556                         } else {
557                                 KKASSERT((msg->any.head.cmd &
558                                           HAMMER2_MSGF_REPLY) == 0);
559                                 RB_REMOVE(hammer2_state_tree,
560                                           &pmp->statewr_tree, state);
561                         }
562                         state->flags &= ~HAMMER2_STATE_INSERTED;
563                         lockmgr(&pmp->msglk, LK_RELEASE);
564                         hammer2_state_free(state);
565                 } else {
566                         lockmgr(&pmp->msglk, LK_RELEASE);
567                 }
568                 hammer2_msg_free(msg);
569         } else if (state->msg != msg) {
570                 hammer2_msg_free(msg);
571         }
572 }
573
574 void
575 hammer2_state_free(hammer2_state_t *state)
576 {
577         hammer2_pfsmount_t *pmp = state->pmp;
578         hammer2_msg_t *msg;
579
580         msg = state->msg;
581         state->msg = NULL;
582         kfree(state, pmp->mmsg);
583         if (msg)
584                 hammer2_msg_free(msg);
585 }
586
587 hammer2_msg_t *
588 hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
589                   int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
590 {
591         hammer2_pfsmount_t *pmp = router->pmp;
592         hammer2_msg_t *msg;
593         hammer2_state_t *state;
594         size_t hbytes;
595
596         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
597         msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
598                       pmp->mmsg, M_WAITOK | M_ZERO);
599         msg->hdr_size = hbytes;
600         msg->router = router;
601         KKASSERT(router != NULL);
602         msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
603         msg->any.head.source = 0;
604         msg->any.head.target = router->target;
605         msg->any.head.cmd = cmd;
606
607         if (cmd & HAMMER2_MSGF_CREATE) {
608                 /*
609                  * New transaction, requires tracking state and a unique
610                  * msgid to be allocated.
611                  */
612                 KKASSERT(msg->state == NULL);
613                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
614                 state->pmp = pmp;
615                 state->flags = HAMMER2_STATE_DYNAMIC;
616                 state->func = func;
617                 state->any.any = data;
618                 state->msg = msg;
619                 state->msgid = (uint64_t)(uintptr_t)state;
620                 state->router = msg->router;
621                 msg->state = state;
622                 msg->any.head.source = 0;
623                 msg->any.head.target = state->router->target;
624                 msg->any.head.msgid = state->msgid;
625
626                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
627                 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
628                         panic("duplicate msgid allocated");
629                 msg->any.head.msgid = state->msgid;
630                 lockmgr(&pmp->msglk, LK_RELEASE);
631         }
632
633         return (msg);
634 }
635
636 void
637 hammer2_msg_free(hammer2_msg_t *msg)
638 {
639         hammer2_pfsmount_t *pmp = msg->router->pmp;
640
641         if (msg->aux_data && msg->aux_size) {
642                 kfree(msg->aux_data, pmp->mmsg);
643                 msg->aux_data = NULL;
644                 msg->aux_size = 0;
645                 msg->router = NULL;
646         }
647         kfree(msg, pmp->mmsg);
648 }
649
650 /*
651  * Indexed messages are stored in a red-black tree indexed by their
652  * msgid.  Only persistent messages are indexed.
653  */
654 int
655 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
656 {
657         if (state1->router < state2->router)
658                 return(-1);
659         if (state1->router > state2->router)
660                 return(1);
661         if (state1->msgid < state2->msgid)
662                 return(-1);
663         if (state1->msgid > state2->msgid)
664                 return(1);
665         return(0);
666 }
667
668 /*
669  * Write a message.  All requisit command flags have been set.
670  *
671  * If msg->state is non-NULL the message is written to the existing
672  * transaction.  msgid will be set accordingly.
673  *
674  * If msg->state is NULL and CREATE is set new state is allocated and
675  * (func, data) is installed.  A msgid is assigned.
676  *
677  * If msg->state is NULL and CREATE is not set the message is assumed
678  * to be a one-way message.  The originator must assign the msgid
679  * (or leave it 0, which is typical.
680  *
681  * This function merely queues the message to the management thread, it
682  * does not write to the message socket/pipe.
683  */
684 void
685 hammer2_msg_write(hammer2_msg_t *msg)
686 {
687         hammer2_pfsmount_t *pmp = msg->router->pmp;
688         hammer2_state_t *state;
689
690         if (msg->state) {
691                 /*
692                  * Continuance or termination of existing transaction.
693                  * The transaction could have been initiated by either end.
694                  *
695                  * (Function callback and aux data for the receive side can
696                  * be replaced or left alone).
697                  */
698                 state = msg->state;
699                 msg->any.head.msgid = state->msgid;
700                 msg->any.head.source = 0;
701                 msg->any.head.target = state->router->target;
702                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
703         } else {
704                 /*
705                  * One-off message (always uses msgid 0 to distinguish
706                  * between a possibly lost in-transaction message due to
707                  * competing aborts and a real one-off message?)
708                  */
709                 msg->any.head.msgid = 0;
710                 msg->any.head.source = 0;
711                 msg->any.head.target = msg->router->target;
712                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
713         }
714
715         /*
716          * Finish up the msg fields
717          */
718         msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
719         ++pmp->msg_seq;
720
721         msg->any.head.hdr_crc = 0;
722         msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
723
724         TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
725         hammer2_clusterctl_wakeup(pmp);
726         lockmgr(&pmp->msglk, LK_RELEASE);
727 }
728
729 /*
730  * Reply to a message and terminate our side of the transaction.
731  *
732  * If msg->state is non-NULL we are replying to a one-way message.
733  */
734 void
735 hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
736 {
737         hammer2_state_t *state = msg->state;
738         hammer2_msg_t *nmsg;
739         uint32_t cmd;
740
741         /*
742          * Reply with a simple error code and terminate the transaction.
743          */
744         cmd = HAMMER2_LNK_ERROR;
745
746         /*
747          * Check if our direction has even been initiated yet, set CREATE.
748          *
749          * Check what direction this is (command or reply direction).  Note
750          * that txcmd might not have been initiated yet.
751          *
752          * If our direction has already been closed we just return without
753          * doing anything.
754          */
755         if (state) {
756                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
757                         hammer2_msg_free(msg);
758                         return;
759                 }
760                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
761                         cmd |= HAMMER2_MSGF_CREATE;
762                 if (state->txcmd & HAMMER2_MSGF_REPLY)
763                         cmd |= HAMMER2_MSGF_REPLY;
764                 cmd |= HAMMER2_MSGF_DELETE;
765         } else {
766                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
767                         cmd |= HAMMER2_MSGF_REPLY;
768         }
769
770         /* XXX messy mask cmd to avoid allocating state */
771         nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
772                                  NULL, NULL);
773         nmsg->any.head.cmd = cmd;
774         nmsg->any.head.error = error;
775         nmsg->state = state;
776         hammer2_msg_write(nmsg);
777 }
778
779 /*
780  * Reply to a message and continue our side of the transaction.
781  *
782  * If msg->state is non-NULL we are replying to a one-way message and this
783  * function degenerates into the same as hammer2_msg_reply().
784  */
785 void
786 hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
787 {
788         hammer2_state_t *state = msg->state;
789         hammer2_msg_t *nmsg;
790         uint32_t cmd;
791
792         /*
793          * Return a simple result code, do NOT terminate the transaction.
794          */
795         cmd = HAMMER2_LNK_ERROR;
796
797         /*
798          * Check if our direction has even been initiated yet, set CREATE.
799          *
800          * Check what direction this is (command or reply direction).  Note
801          * that txcmd might not have been initiated yet.
802          *
803          * If our direction has already been closed we just return without
804          * doing anything.
805          */
806         if (state) {
807                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
808                         hammer2_msg_free(msg);
809                         return;
810                 }
811                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
812                         cmd |= HAMMER2_MSGF_CREATE;
813                 if (state->txcmd & HAMMER2_MSGF_REPLY)
814                         cmd |= HAMMER2_MSGF_REPLY;
815                 /* continuing transaction, do not set MSGF_DELETE */
816         } else {
817                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
818                         cmd |= HAMMER2_MSGF_REPLY;
819         }
820
821         /* XXX messy mask cmd to avoid allocating state */
822         nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
823                                  NULL, NULL);
824         nmsg->any.head.cmd = cmd;
825         nmsg->any.head.error = error;
826         nmsg->state = state;
827         hammer2_msg_write(nmsg);
828 }