hammer2 - Message routing work
[dragonfly.git] / sys / vfs / hammer2 / hammer2_msg.c
1 /*-
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34
35 #include "hammer2.h"
36
37 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
38
39 /*
40  * Process state tracking for a message after reception, prior to
41  * execution.
42  *
43  * Called with msglk held and the msg dequeued.
44  *
45  * All messages are called with dummy state and return actual state.
46  * (One-off messages often just return the same dummy state).
47  *
48  * May request that caller discard the message by setting *discardp to 1.
49  * The returned state is not used in this case and is allowed to be NULL.
50  *
51  * --
52  *
53  * These routines handle persistent and command/reply message state via the
54  * CREATE and DELETE flags.  The first message in a command or reply sequence
55  * sets CREATE, the last message in a command or reply sequence sets DELETE.
56  *
57  * There can be any number of intermediate messages belonging to the same
58  * sequence sent inbetween the CREATE message and the DELETE message,
59  * which set neither flag.  This represents a streaming command or reply.
60  *
61  * Any command message received with CREATE set expects a reply sequence to
62  * be returned.  Reply sequences work the same as command sequences except the
63  * REPLY bit is also sent.  Both the command side and reply side can
64  * degenerate into a single message with both CREATE and DELETE set.  Note
65  * that one side can be streaming and the other side not, or neither, or both.
66  *
67  * The msgid is unique for the initiator.  That is, two sides sending a new
68  * message can use the same msgid without colliding.
69  *
70  * --
71  *
72  * ABORT sequences work by setting the ABORT flag along with normal message
73  * state.  However, ABORTs can also be sent on half-closed messages, that is
74  * even if the command or reply side has already sent a DELETE, as long as
75  * the message has not been fully closed it can still send an ABORT+DELETE
76  * to terminate the half-closed message state.
77  *
78  * Since ABORT+DELETEs can race we silently discard ABORT's for message
79  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
80  * also race, and in this situation the other side might have already
81  * initiated a new unrelated command with the same message id.  Since
82  * the abort has not set the CREATE flag the situation can be detected
83  * and the message will also be discarded.
84  *
85  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86  * The ABORT request is essentially integrated into the command instead
87  * of being sent later on.  In this situation the command implementation
88  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89  * special-case non-blocking operation for the command.
90  *
91  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
92  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
93  *        one-way messages are not supported.
94  *
95  * NOTE!  If a command sequence does not support aborts the ABORT flag is
96  *        simply ignored.
97  *
98  * --
99  *
100  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
101  * set.  One-off messages cannot be aborted and typically aren't processed
102  * by these routines.  The REPLY bit can be used to distinguish whether a
103  * one-off message is a command or reply.  For example, one-off replies
104  * will typically just contain status updates.
105  */
106 int
107 hammer2_state_msgrx(hammer2_msg_t *msg)
108 {
109         hammer2_pfsmount_t *pmp;
110         hammer2_state_t *state;
111         int error;
112
113         pmp = msg->router->pmp;
114
115         /*
116          * XXX resolve msg->any.head.source and msg->any.head.target
117          *     into LNK_SPAN references.
118          *
119          * XXX replace msg->router
120          */
121
122         /*
123          * Make sure a state structure is ready to go in case we need a new
124          * one.  This is the only routine which uses freerd_state so no
125          * races are possible.
126          */
127         if ((state = pmp->freerd_state) == NULL) {
128                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
129                 state->pmp = pmp;
130                 state->flags = HAMMER2_STATE_DYNAMIC;
131                 pmp->freerd_state = state;
132         }
133
134         /*
135          * Lock RB tree and locate existing persistent state, if any.
136          *
137          * If received msg is a command state is on staterd_tree.
138          * If received msg is a reply state is on statewr_tree.
139          */
140         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
141
142         state->msgid = msg->any.head.msgid;
143         state->router = &pmp->router;
144         kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
145                 msg->any.head.cmd,
146                 (intmax_t)msg->any.head.msgid,
147                 (intmax_t)msg->any.head.source,
148                 (intmax_t)msg->any.head.target);
149         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
150                 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
151         else
152                 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
153         msg->state = state;
154
155         /*
156          * Short-cut one-off or mid-stream messages (state may be NULL).
157          */
158         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
159                                   HAMMER2_MSGF_ABORT)) == 0) {
160                 lockmgr(&pmp->msglk, LK_RELEASE);
161                 return(0);
162         }
163
164         /*
165          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
166          * inside the case statements.
167          */
168         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
169                                     HAMMER2_MSGF_REPLY)) {
170         case HAMMER2_MSGF_CREATE:
171         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
172                 /*
173                  * New persistant command received.
174                  */
175                 if (state) {
176                         kprintf("hammer2_state_msgrx: duplicate transaction\n");
177                         error = EINVAL;
178                         break;
179                 }
180                 state = pmp->freerd_state;
181                 pmp->freerd_state = NULL;
182                 msg->state = state;
183                 state->router = msg->router;
184                 state->msg = msg;
185                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
186                 state->txcmd = HAMMER2_MSGF_REPLY;
187                 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
188                 state->flags |= HAMMER2_STATE_INSERTED;
189                 error = 0;
190                 break;
191         case HAMMER2_MSGF_DELETE:
192                 /*
193                  * Persistent state is expected but might not exist if an
194                  * ABORT+DELETE races the close.
195                  */
196                 if (state == NULL) {
197                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
198                                 error = EALREADY;
199                         } else {
200                                 kprintf("hammer2_state_msgrx: no state "
201                                         "for DELETE\n");
202                                 error = EINVAL;
203                         }
204                         break;
205                 }
206
207                 /*
208                  * Handle another ABORT+DELETE case if the msgid has already
209                  * been reused.
210                  */
211                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
212                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
213                                 error = EALREADY;
214                         } else {
215                                 kprintf("hammer2_state_msgrx: state reused "
216                                         "for DELETE\n");
217                                 error = EINVAL;
218                         }
219                         break;
220                 }
221                 error = 0;
222                 break;
223         default:
224                 /*
225                  * Check for mid-stream ABORT command received, otherwise
226                  * allow.
227                  */
228                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
229                         if (state == NULL ||
230                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
231                                 error = EALREADY;
232                                 break;
233                         }
234                 }
235                 error = 0;
236                 break;
237         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
238         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
239                 /*
240                  * When receiving a reply with CREATE set the original
241                  * persistent state message should already exist.
242                  */
243                 if (state == NULL) {
244                         kprintf("hammer2_state_msgrx: no state match for "
245                                 "REPLY cmd=%08x msgid=%016jx\n",
246                                 msg->any.head.cmd,
247                                 (intmax_t)msg->any.head.msgid);
248                         error = EINVAL;
249                         break;
250                 }
251                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
252                 error = 0;
253                 break;
254         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
255                 /*
256                  * Received REPLY+ABORT+DELETE in case where msgid has
257                  * already been fully closed, ignore the message.
258                  */
259                 if (state == NULL) {
260                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
261                                 error = EALREADY;
262                         } else {
263                                 kprintf("hammer2_state_msgrx: no state match "
264                                         "for REPLY|DELETE\n");
265                                 error = EINVAL;
266                         }
267                         break;
268                 }
269
270                 /*
271                  * Received REPLY+ABORT+DELETE in case where msgid has
272                  * already been reused for an unrelated message,
273                  * ignore the message.
274                  */
275                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
276                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
277                                 error = EALREADY;
278                         } else {
279                                 kprintf("hammer2_state_msgrx: state reused "
280                                         "for REPLY|DELETE\n");
281                                 error = EINVAL;
282                         }
283                         break;
284                 }
285                 error = 0;
286                 break;
287         case HAMMER2_MSGF_REPLY:
288                 /*
289                  * Check for mid-stream ABORT reply received to sent command.
290                  */
291                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
292                         if (state == NULL ||
293                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
294                                 error = EALREADY;
295                                 break;
296                         }
297                 }
298                 error = 0;
299                 break;
300         }
301         lockmgr(&pmp->msglk, LK_RELEASE);
302         return (error);
303 }
304
305 void
306 hammer2_state_cleanuprx(hammer2_msg_t *msg)
307 {
308         hammer2_pfsmount_t *pmp = msg->router->pmp;
309         hammer2_state_t *state;
310
311         if ((state = msg->state) == NULL) {
312                 hammer2_msg_free(msg);
313         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
314                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
315                 state->rxcmd |= HAMMER2_MSGF_DELETE;
316                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
317                         if (state->msg == msg)
318                                 state->msg = NULL;
319                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
320                         if (state->rxcmd & HAMMER2_MSGF_REPLY) {
321                                 KKASSERT(msg->any.head.cmd &
322                                          HAMMER2_MSGF_REPLY);
323                                 RB_REMOVE(hammer2_state_tree,
324                                           &pmp->statewr_tree, state);
325                         } else {
326                                 KKASSERT((msg->any.head.cmd &
327                                           HAMMER2_MSGF_REPLY) == 0);
328                                 RB_REMOVE(hammer2_state_tree,
329                                           &pmp->staterd_tree, state);
330                         }
331                         state->flags &= ~HAMMER2_STATE_INSERTED;
332                         lockmgr(&pmp->msglk, LK_RELEASE);
333                         hammer2_state_free(state);
334                 } else {
335                         lockmgr(&pmp->msglk, LK_RELEASE);
336                 }
337                 hammer2_msg_free(msg);
338         } else if (state->msg != msg) {
339                 hammer2_msg_free(msg);
340         }
341 }
342
343 /*
344  * Process state tracking for a message prior to transmission.
345  *
346  * Called with msglk held and the msg dequeued.
347  *
348  * One-off messages are usually with dummy state and msg->state may be NULL
349  * in this situation.
350  *
351  * New transactions (when CREATE is set) will insert the state.
352  *
353  * May request that caller discard the message by setting *discardp to 1.
354  * A NULL state may be returned in this case.
355  */
356 int
357 hammer2_state_msgtx(hammer2_msg_t *msg)
358 {
359         hammer2_pfsmount_t *pmp = msg->router->pmp;
360         hammer2_state_t *state;
361         int error;
362
363         /*
364          * Make sure a state structure is ready to go in case we need a new
365          * one.  This is the only routine which uses freewr_state so no
366          * races are possible.
367          */
368         if ((state = pmp->freewr_state) == NULL) {
369                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
370                 state->pmp = pmp;
371                 state->flags = HAMMER2_STATE_DYNAMIC;
372                 pmp->freewr_state = state;
373         }
374
375         /*
376          * Lock RB tree.  If persistent state is present it will have already
377          * been assigned to msg.
378          */
379         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
380         state = msg->state;
381
382         /*
383          * Short-cut one-off or mid-stream messages (state may be NULL).
384          */
385         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
386                                   HAMMER2_MSGF_ABORT)) == 0) {
387                 lockmgr(&pmp->msglk, LK_RELEASE);
388                 return(0);
389         }
390
391
392         /*
393          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
394          * inside the case statements.
395          */
396         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
397                                     HAMMER2_MSGF_REPLY)) {
398         case HAMMER2_MSGF_CREATE:
399         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
400                 /*
401                  * Insert the new persistent message state and mark
402                  * half-closed if DELETE is set.  Since this is a new
403                  * message it isn't possible to transition into the fully
404                  * closed state here.
405                  *
406                  * XXX state must be assigned and inserted by
407                  *     hammer2_msg_write().  txcmd is assigned by us
408                  *     on-transmit.
409                  */
410                 KKASSERT(state != NULL);
411                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
412                 state->rxcmd = HAMMER2_MSGF_REPLY;
413                 error = 0;
414                 break;
415         case HAMMER2_MSGF_DELETE:
416                 /*
417                  * Sent ABORT+DELETE in case where msgid has already
418                  * been fully closed, ignore the message.
419                  */
420                 if (state == NULL) {
421                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
422                                 error = EALREADY;
423                         } else {
424                                 kprintf("hammer2_state_msgtx: no state match "
425                                         "for DELETE cmd=%08x msgid=%016jx\n",
426                                         msg->any.head.cmd,
427                                         (intmax_t)msg->any.head.msgid);
428                                 error = EINVAL;
429                         }
430                         break;
431                 }
432
433                 /*
434                  * Sent ABORT+DELETE in case where msgid has
435                  * already been reused for an unrelated message,
436                  * ignore the message.
437                  */
438                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
439                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
440                                 error = EALREADY;
441                         } else {
442                                 kprintf("hammer2_state_msgtx: state reused "
443                                         "for DELETE\n");
444                                 error = EINVAL;
445                         }
446                         break;
447                 }
448                 error = 0;
449                 break;
450         default:
451                 /*
452                  * Check for mid-stream ABORT command sent
453                  */
454                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
455                         if (state == NULL ||
456                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
457                                 error = EALREADY;
458                                 break;
459                         }
460                 }
461                 error = 0;
462                 break;
463         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
464         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
465                 /*
466                  * When transmitting a reply with CREATE set the original
467                  * persistent state message should already exist.
468                  */
469                 if (state == NULL) {
470                         kprintf("hammer2_state_msgtx: no state match "
471                                 "for REPLY | CREATE\n");
472                         error = EINVAL;
473                         break;
474                 }
475                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
476                 error = 0;
477                 break;
478         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
479                 /*
480                  * When transmitting a reply with DELETE set the original
481                  * persistent state message should already exist.
482                  *
483                  * This is very similar to the REPLY|CREATE|* case except
484                  * txcmd is already stored, so we just add the DELETE flag.
485                  *
486                  * Sent REPLY+ABORT+DELETE in case where msgid has
487                  * already been fully closed, ignore the message.
488                  */
489                 if (state == NULL) {
490                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
491                                 error = EALREADY;
492                         } else {
493                                 kprintf("hammer2_state_msgtx: no state match "
494                                         "for REPLY | DELETE\n");
495                                 error = EINVAL;
496                         }
497                         break;
498                 }
499
500                 /*
501                  * Sent REPLY+ABORT+DELETE in case where msgid has already
502                  * been reused for an unrelated message, ignore the message.
503                  */
504                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
505                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
506                                 error = EALREADY;
507                         } else {
508                                 kprintf("hammer2_state_msgtx: state reused "
509                                         "for REPLY | DELETE\n");
510                                 error = EINVAL;
511                         }
512                         break;
513                 }
514                 error = 0;
515                 break;
516         case HAMMER2_MSGF_REPLY:
517                 /*
518                  * Check for mid-stream ABORT reply sent.
519                  *
520                  * One-off REPLY messages are allowed for e.g. status updates.
521                  */
522                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
523                         if (state == NULL ||
524                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
525                                 error = EALREADY;
526                                 break;
527                         }
528                 }
529                 error = 0;
530                 break;
531         }
532         lockmgr(&pmp->msglk, LK_RELEASE);
533         return (error);
534 }
535
536 void
537 hammer2_state_cleanuptx(hammer2_msg_t *msg)
538 {
539         hammer2_pfsmount_t *pmp = msg->router->pmp;
540         hammer2_state_t *state;
541
542         if ((state = msg->state) == NULL) {
543                 hammer2_msg_free(msg);
544         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
545                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
546                 state->txcmd |= HAMMER2_MSGF_DELETE;
547                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
548                         if (state->msg == msg)
549                                 state->msg = NULL;
550                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
551                         if (state->txcmd & HAMMER2_MSGF_REPLY) {
552                                 KKASSERT(msg->any.head.cmd &
553                                          HAMMER2_MSGF_REPLY);
554                                 RB_REMOVE(hammer2_state_tree,
555                                           &pmp->staterd_tree, state);
556                         } else {
557                                 KKASSERT((msg->any.head.cmd &
558                                           HAMMER2_MSGF_REPLY) == 0);
559                                 RB_REMOVE(hammer2_state_tree,
560                                           &pmp->statewr_tree, state);
561                         }
562                         state->flags &= ~HAMMER2_STATE_INSERTED;
563                         lockmgr(&pmp->msglk, LK_RELEASE);
564                         hammer2_state_free(state);
565                 } else {
566                         lockmgr(&pmp->msglk, LK_RELEASE);
567                 }
568                 hammer2_msg_free(msg);
569         } else if (state->msg != msg) {
570                 hammer2_msg_free(msg);
571         }
572 }
573
574 void
575 hammer2_state_free(hammer2_state_t *state)
576 {
577         hammer2_pfsmount_t *pmp = state->pmp;
578         hammer2_msg_t *msg;
579
580         msg = state->msg;
581         state->msg = NULL;
582         kfree(state, pmp->mmsg);
583         if (msg)
584                 hammer2_msg_free(msg);
585 }
586
587 hammer2_msg_t *
588 hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd)
589 {
590         hammer2_msg_t *msg;
591         size_t hbytes;
592
593         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
594         msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
595                       router->pmp->mmsg, M_WAITOK | M_ZERO);
596         msg->hdr_size = hbytes;
597         msg->router = router;
598         KKASSERT(router != NULL);
599         msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
600         msg->any.head.source = 0;
601         msg->any.head.target = router->target;
602         msg->any.head.cmd = cmd;
603
604         return (msg);
605 }
606
607 void
608 hammer2_msg_free(hammer2_msg_t *msg)
609 {
610         hammer2_pfsmount_t *pmp = msg->router->pmp;
611
612         if (msg->aux_data && msg->aux_size) {
613                 kfree(msg->aux_data, pmp->mmsg);
614                 msg->aux_data = NULL;
615                 msg->aux_size = 0;
616                 msg->router = NULL;
617         }
618         kfree(msg, pmp->mmsg);
619 }
620
621 /*
622  * Indexed messages are stored in a red-black tree indexed by their
623  * msgid.  Only persistent messages are indexed.
624  */
625 int
626 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
627 {
628         if (state1->router < state2->router)
629                 return(-1);
630         if (state1->router > state2->router)
631                 return(1);
632         if (state1->msgid < state2->msgid)
633                 return(-1);
634         if (state1->msgid > state2->msgid)
635                 return(1);
636         return(0);
637 }
638
639 /*
640  * Write a message.  All requisit command flags have been set.
641  *
642  * If msg->state is non-NULL the message is written to the existing
643  * transaction.  msgid will be set accordingly.
644  *
645  * If msg->state is NULL and CREATE is set new state is allocated and
646  * (func, data) is installed.  A msgid is assigned.
647  *
648  * If msg->state is NULL and CREATE is not set the message is assumed
649  * to be a one-way message.  The originator must assign the msgid
650  * (or leave it 0, which is typical.
651  *
652  * This function merely queues the message to the management thread, it
653  * does not write to the message socket/pipe.
654  */
655 void
656 hammer2_msg_write(hammer2_msg_t *msg,
657                   int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
658 {
659         hammer2_pfsmount_t *pmp = msg->router->pmp;
660         hammer2_state_t *state;
661
662         if (msg->state) {
663                 /*
664                  * Continuance or termination of existing transaction.
665                  * The transaction could have been initiated by either end.
666                  *
667                  * (Function callback and aux data for the receive side can
668                  * be replaced or left alone).
669                  */
670                 state = msg->state;
671                 msg->any.head.msgid = state->msgid;
672                 msg->any.head.source = 0;
673                 msg->any.head.target = state->router->target;
674                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
675                 if (func) {
676                         state->func = func;
677                         state->any.any = data;
678                 }
679         } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
680                 /*
681                  * New transaction, requires tracking state and a unique
682                  * msgid to be allocated.
683                  */
684                 KKASSERT(msg->state == NULL);
685                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
686                 state->pmp = pmp;
687                 state->flags = HAMMER2_STATE_DYNAMIC;
688                 state->func = func;
689                 state->any.any = data;
690                 state->msg = msg;
691                 state->msgid = (uint64_t)(uintptr_t)state;
692                 state->router = msg->router;
693                 msg->state = state;
694                 msg->any.head.source = 0;
695                 msg->any.head.target = state->router->target;
696                 msg->any.head.msgid = state->msgid;
697
698                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
699                 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
700                         panic("duplicate msgid allocated");
701                 msg->any.head.msgid = state->msgid;
702         } else {
703                 /*
704                  * One-off message (always uses msgid 0 to distinguish
705                  * between a possibly lost in-transaction message due to
706                  * competing aborts and a real one-off message?)
707                  */
708                 msg->any.head.msgid = 0;
709                 msg->any.head.source = 0;
710                 msg->any.head.target = msg->router->target;
711                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
712         }
713
714         /*
715          * Finish up the msg fields
716          */
717         msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
718         ++pmp->msg_seq;
719
720         msg->any.head.hdr_crc = 0;
721         msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
722
723         TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
724         lockmgr(&pmp->msglk, LK_RELEASE);
725 }
726
727 /*
728  * Reply to a message and terminate our side of the transaction.
729  *
730  * If msg->state is non-NULL we are replying to a one-way message.
731  */
732 void
733 hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
734 {
735         hammer2_state_t *state = msg->state;
736         hammer2_msg_t *nmsg;
737         uint32_t cmd;
738
739         /*
740          * Reply with a simple error code and terminate the transaction.
741          */
742         cmd = HAMMER2_LNK_ERROR;
743
744         /*
745          * Check if our direction has even been initiated yet, set CREATE.
746          *
747          * Check what direction this is (command or reply direction).  Note
748          * that txcmd might not have been initiated yet.
749          *
750          * If our direction has already been closed we just return without
751          * doing anything.
752          */
753         if (state) {
754                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
755                         hammer2_msg_free(msg);
756                         return;
757                 }
758                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
759                         cmd |= HAMMER2_MSGF_CREATE;
760                 if (state->txcmd & HAMMER2_MSGF_REPLY)
761                         cmd |= HAMMER2_MSGF_REPLY;
762                 cmd |= HAMMER2_MSGF_DELETE;
763         } else {
764                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
765                         cmd |= HAMMER2_MSGF_REPLY;
766         }
767
768         nmsg = hammer2_msg_alloc(msg->router, cmd);
769         nmsg->any.head.error = error;
770         nmsg->state = state;
771         hammer2_msg_write(nmsg, NULL, NULL);
772 }
773
774 /*
775  * Reply to a message and continue our side of the transaction.
776  *
777  * If msg->state is non-NULL we are replying to a one-way message and this
778  * function degenerates into the same as hammer2_msg_reply().
779  */
780 void
781 hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
782 {
783         hammer2_state_t *state = msg->state;
784         hammer2_msg_t *nmsg;
785         uint32_t cmd;
786
787         /*
788          * Return a simple result code, do NOT terminate the transaction.
789          */
790         cmd = HAMMER2_LNK_ERROR;
791
792         /*
793          * Check if our direction has even been initiated yet, set CREATE.
794          *
795          * Check what direction this is (command or reply direction).  Note
796          * that txcmd might not have been initiated yet.
797          *
798          * If our direction has already been closed we just return without
799          * doing anything.
800          */
801         if (state) {
802                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
803                         hammer2_msg_free(msg);
804                         return;
805                 }
806                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
807                         cmd |= HAMMER2_MSGF_CREATE;
808                 if (state->txcmd & HAMMER2_MSGF_REPLY)
809                         cmd |= HAMMER2_MSGF_REPLY;
810                 /* continuing transaction, do not set MSGF_DELETE */
811         } else {
812                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
813                         cmd |= HAMMER2_MSGF_REPLY;
814         }
815
816         nmsg = hammer2_msg_alloc(msg->router, cmd);
817         nmsg->any.head.error = error;
818         nmsg->state = state;
819         hammer2_msg_write(nmsg, NULL, NULL);
820 }