hammer2 - Messaging layer separation work part 2
[dragonfly.git] / sys / vfs / hammer2 / hammer2_msg.c
1 /*-
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34
35 #include "hammer2.h"
36
37 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
38
39 /*
40  * Process state tracking for a message after reception, prior to
41  * execution.
42  *
43  * Called with msglk held and the msg dequeued.
44  *
45  * All messages are called with dummy state and return actual state.
46  * (One-off messages often just return the same dummy state).
47  *
48  * May request that caller discard the message by setting *discardp to 1.
49  * The returned state is not used in this case and is allowed to be NULL.
50  *
51  * --
52  *
53  * These routines handle persistent and command/reply message state via the
54  * CREATE and DELETE flags.  The first message in a command or reply sequence
55  * sets CREATE, the last message in a command or reply sequence sets DELETE.
56  *
57  * There can be any number of intermediate messages belonging to the same
58  * sequence sent inbetween the CREATE message and the DELETE message,
59  * which set neither flag.  This represents a streaming command or reply.
60  *
61  * Any command message received with CREATE set expects a reply sequence to
62  * be returned.  Reply sequences work the same as command sequences except the
63  * REPLY bit is also sent.  Both the command side and reply side can
64  * degenerate into a single message with both CREATE and DELETE set.  Note
65  * that one side can be streaming and the other side not, or neither, or both.
66  *
67  * The msgid is unique for the initiator.  That is, two sides sending a new
68  * message can use the same msgid without colliding.
69  *
70  * --
71  *
72  * ABORT sequences work by setting the ABORT flag along with normal message
73  * state.  However, ABORTs can also be sent on half-closed messages, that is
74  * even if the command or reply side has already sent a DELETE, as long as
75  * the message has not been fully closed it can still send an ABORT+DELETE
76  * to terminate the half-closed message state.
77  *
78  * Since ABORT+DELETEs can race we silently discard ABORT's for message
79  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
80  * also race, and in this situation the other side might have already
81  * initiated a new unrelated command with the same message id.  Since
82  * the abort has not set the CREATE flag the situation can be detected
83  * and the message will also be discarded.
84  *
85  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86  * The ABORT request is essentially integrated into the command instead
87  * of being sent later on.  In this situation the command implementation
88  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89  * special-case non-blocking operation for the command.
90  *
91  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
92  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
93  *        one-way messages are not supported.
94  *
95  * NOTE!  If a command sequence does not support aborts the ABORT flag is
96  *        simply ignored.
97  *
98  * --
99  *
100  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
101  * set.  One-off messages cannot be aborted and typically aren't processed
102  * by these routines.  The REPLY bit can be used to distinguish whether a
103  * one-off message is a command or reply.  For example, one-off replies
104  * will typically just contain status updates.
105  */
106 int
107 hammer2_state_msgrx(hammer2_msg_t *msg)
108 {
109         hammer2_pfsmount_t *pmp;
110         hammer2_state_t *state;
111         int error;
112
113         pmp = msg->router->pmp;
114
115         /*
116          * XXX resolve msg->any.head.source and msg->any.head.target
117          *     into LNK_SPAN references.
118          *
119          * XXX replace msg->router
120          */
121
122         /*
123          * Make sure a state structure is ready to go in case we need a new
124          * one.  This is the only routine which uses freerd_state so no
125          * races are possible.
126          */
127         if ((state = pmp->freerd_state) == NULL) {
128                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
129                 state->pmp = pmp;
130                 state->flags = HAMMER2_STATE_DYNAMIC;
131                 pmp->freerd_state = state;
132         }
133
134         /*
135          * Lock RB tree and locate existing persistent state, if any.
136          *
137          * If received msg is a command state is on staterd_tree.
138          * If received msg is a reply state is on statewr_tree.
139          */
140         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
141
142         state->msgid = msg->any.head.msgid;
143         state->router = &pmp->router;
144         kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
145                 msg->any.head.cmd,
146                 (intmax_t)msg->any.head.msgid,
147                 (intmax_t)msg->any.head.source,
148                 (intmax_t)msg->any.head.target);
149         if (msg->any.head.cmd & DMSGF_REPLY)
150                 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
151         else
152                 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
153         msg->state = state;
154
155         /*
156          * Short-cut one-off or mid-stream messages (state may be NULL).
157          */
158         if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
159                                   DMSGF_ABORT)) == 0) {
160                 lockmgr(&pmp->msglk, LK_RELEASE);
161                 return(0);
162         }
163
164         /*
165          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
166          * inside the case statements.
167          */
168         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY)) {
169         case DMSGF_CREATE:
170         case DMSGF_CREATE | DMSGF_DELETE:
171                 /*
172                  * New persistant command received.
173                  */
174                 if (state) {
175                         kprintf("hammer2_state_msgrx: duplicate transaction\n");
176                         error = EINVAL;
177                         break;
178                 }
179                 state = pmp->freerd_state;
180                 pmp->freerd_state = NULL;
181                 msg->state = state;
182                 state->router = msg->router;
183                 state->msg = msg;
184                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
185                 state->txcmd = DMSGF_REPLY;
186                 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
187                 state->flags |= HAMMER2_STATE_INSERTED;
188                 error = 0;
189                 break;
190         case DMSGF_DELETE:
191                 /*
192                  * Persistent state is expected but might not exist if an
193                  * ABORT+DELETE races the close.
194                  */
195                 if (state == NULL) {
196                         if (msg->any.head.cmd & DMSGF_ABORT) {
197                                 error = EALREADY;
198                         } else {
199                                 kprintf("hammer2_state_msgrx: no state "
200                                         "for DELETE\n");
201                                 error = EINVAL;
202                         }
203                         break;
204                 }
205
206                 /*
207                  * Handle another ABORT+DELETE case if the msgid has already
208                  * been reused.
209                  */
210                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
211                         if (msg->any.head.cmd & DMSGF_ABORT) {
212                                 error = EALREADY;
213                         } else {
214                                 kprintf("hammer2_state_msgrx: state reused "
215                                         "for DELETE\n");
216                                 error = EINVAL;
217                         }
218                         break;
219                 }
220                 error = 0;
221                 break;
222         default:
223                 /*
224                  * Check for mid-stream ABORT command received, otherwise
225                  * allow.
226                  */
227                 if (msg->any.head.cmd & DMSGF_ABORT) {
228                         if (state == NULL ||
229                             (state->rxcmd & DMSGF_CREATE) == 0) {
230                                 error = EALREADY;
231                                 break;
232                         }
233                 }
234                 error = 0;
235                 break;
236         case DMSGF_REPLY | DMSGF_CREATE:
237         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
238                 /*
239                  * When receiving a reply with CREATE set the original
240                  * persistent state message should already exist.
241                  */
242                 if (state == NULL) {
243                         kprintf("hammer2_state_msgrx: no state match for "
244                                 "REPLY cmd=%08x msgid=%016jx\n",
245                                 msg->any.head.cmd,
246                                 (intmax_t)msg->any.head.msgid);
247                         error = EINVAL;
248                         break;
249                 }
250                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
251                 error = 0;
252                 break;
253         case DMSGF_REPLY | DMSGF_DELETE:
254                 /*
255                  * Received REPLY+ABORT+DELETE in case where msgid has
256                  * already been fully closed, ignore the message.
257                  */
258                 if (state == NULL) {
259                         if (msg->any.head.cmd & DMSGF_ABORT) {
260                                 error = EALREADY;
261                         } else {
262                                 kprintf("hammer2_state_msgrx: no state match "
263                                         "for REPLY|DELETE\n");
264                                 error = EINVAL;
265                         }
266                         break;
267                 }
268
269                 /*
270                  * Received REPLY+ABORT+DELETE in case where msgid has
271                  * already been reused for an unrelated message,
272                  * ignore the message.
273                  */
274                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
275                         if (msg->any.head.cmd & DMSGF_ABORT) {
276                                 error = EALREADY;
277                         } else {
278                                 kprintf("hammer2_state_msgrx: state reused "
279                                         "for REPLY|DELETE\n");
280                                 error = EINVAL;
281                         }
282                         break;
283                 }
284                 error = 0;
285                 break;
286         case DMSGF_REPLY:
287                 /*
288                  * Check for mid-stream ABORT reply received to sent command.
289                  */
290                 if (msg->any.head.cmd & DMSGF_ABORT) {
291                         if (state == NULL ||
292                             (state->rxcmd & DMSGF_CREATE) == 0) {
293                                 error = EALREADY;
294                                 break;
295                         }
296                 }
297                 error = 0;
298                 break;
299         }
300         lockmgr(&pmp->msglk, LK_RELEASE);
301         return (error);
302 }
303
304 void
305 hammer2_state_cleanuprx(hammer2_msg_t *msg)
306 {
307         hammer2_pfsmount_t *pmp = msg->router->pmp;
308         hammer2_state_t *state;
309
310         if ((state = msg->state) == NULL) {
311                 hammer2_msg_free(msg);
312         } else if (msg->any.head.cmd & DMSGF_DELETE) {
313                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
314                 state->rxcmd |= DMSGF_DELETE;
315                 if (state->txcmd & DMSGF_DELETE) {
316                         if (state->msg == msg)
317                                 state->msg = NULL;
318                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
319                         if (state->rxcmd & DMSGF_REPLY) {
320                                 KKASSERT(msg->any.head.cmd &
321                                          DMSGF_REPLY);
322                                 RB_REMOVE(hammer2_state_tree,
323                                           &pmp->statewr_tree, state);
324                         } else {
325                                 KKASSERT((msg->any.head.cmd &
326                                           DMSGF_REPLY) == 0);
327                                 RB_REMOVE(hammer2_state_tree,
328                                           &pmp->staterd_tree, state);
329                         }
330                         state->flags &= ~HAMMER2_STATE_INSERTED;
331                         lockmgr(&pmp->msglk, LK_RELEASE);
332                         hammer2_state_free(state);
333                 } else {
334                         lockmgr(&pmp->msglk, LK_RELEASE);
335                 }
336                 hammer2_msg_free(msg);
337         } else if (state->msg != msg) {
338                 hammer2_msg_free(msg);
339         }
340 }
341
342 /*
343  * Process state tracking for a message prior to transmission.
344  *
345  * Called with msglk held and the msg dequeued.
346  *
347  * One-off messages are usually with dummy state and msg->state may be NULL
348  * in this situation.
349  *
350  * New transactions (when CREATE is set) will insert the state.
351  *
352  * May request that caller discard the message by setting *discardp to 1.
353  * A NULL state may be returned in this case.
354  */
355 int
356 hammer2_state_msgtx(hammer2_msg_t *msg)
357 {
358         hammer2_pfsmount_t *pmp = msg->router->pmp;
359         hammer2_state_t *state;
360         int error;
361
362         /*
363          * Make sure a state structure is ready to go in case we need a new
364          * one.  This is the only routine which uses freewr_state so no
365          * races are possible.
366          */
367         if ((state = pmp->freewr_state) == NULL) {
368                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
369                 state->pmp = pmp;
370                 state->flags = HAMMER2_STATE_DYNAMIC;
371                 pmp->freewr_state = state;
372         }
373
374         /*
375          * Lock RB tree.  If persistent state is present it will have already
376          * been assigned to msg.
377          */
378         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
379         state = msg->state;
380
381         /*
382          * Short-cut one-off or mid-stream messages (state may be NULL).
383          */
384         if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
385                                   DMSGF_ABORT)) == 0) {
386                 lockmgr(&pmp->msglk, LK_RELEASE);
387                 return(0);
388         }
389
390
391         /*
392          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
393          * inside the case statements.
394          */
395         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
396                                     DMSGF_REPLY)) {
397         case DMSGF_CREATE:
398         case DMSGF_CREATE | DMSGF_DELETE:
399                 /*
400                  * Insert the new persistent message state and mark
401                  * half-closed if DELETE is set.  Since this is a new
402                  * message it isn't possible to transition into the fully
403                  * closed state here.
404                  *
405                  * XXX state must be assigned and inserted by
406                  *     hammer2_msg_write().  txcmd is assigned by us
407                  *     on-transmit.
408                  */
409                 KKASSERT(state != NULL);
410                 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
411                 state->rxcmd = DMSGF_REPLY;
412                 error = 0;
413                 break;
414         case DMSGF_DELETE:
415                 /*
416                  * Sent ABORT+DELETE in case where msgid has already
417                  * been fully closed, ignore the message.
418                  */
419                 if (state == NULL) {
420                         if (msg->any.head.cmd & DMSGF_ABORT) {
421                                 error = EALREADY;
422                         } else {
423                                 kprintf("hammer2_state_msgtx: no state match "
424                                         "for DELETE cmd=%08x msgid=%016jx\n",
425                                         msg->any.head.cmd,
426                                         (intmax_t)msg->any.head.msgid);
427                                 error = EINVAL;
428                         }
429                         break;
430                 }
431
432                 /*
433                  * Sent ABORT+DELETE in case where msgid has
434                  * already been reused for an unrelated message,
435                  * ignore the message.
436                  */
437                 if ((state->txcmd & DMSGF_CREATE) == 0) {
438                         if (msg->any.head.cmd & DMSGF_ABORT) {
439                                 error = EALREADY;
440                         } else {
441                                 kprintf("hammer2_state_msgtx: state reused "
442                                         "for DELETE\n");
443                                 error = EINVAL;
444                         }
445                         break;
446                 }
447                 error = 0;
448                 break;
449         default:
450                 /*
451                  * Check for mid-stream ABORT command sent
452                  */
453                 if (msg->any.head.cmd & DMSGF_ABORT) {
454                         if (state == NULL ||
455                             (state->txcmd & DMSGF_CREATE) == 0) {
456                                 error = EALREADY;
457                                 break;
458                         }
459                 }
460                 error = 0;
461                 break;
462         case DMSGF_REPLY | DMSGF_CREATE:
463         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
464                 /*
465                  * When transmitting a reply with CREATE set the original
466                  * persistent state message should already exist.
467                  */
468                 if (state == NULL) {
469                         kprintf("hammer2_state_msgtx: no state match "
470                                 "for REPLY | CREATE\n");
471                         error = EINVAL;
472                         break;
473                 }
474                 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
475                 error = 0;
476                 break;
477         case DMSGF_REPLY | DMSGF_DELETE:
478                 /*
479                  * When transmitting a reply with DELETE set the original
480                  * persistent state message should already exist.
481                  *
482                  * This is very similar to the REPLY|CREATE|* case except
483                  * txcmd is already stored, so we just add the DELETE flag.
484                  *
485                  * Sent REPLY+ABORT+DELETE in case where msgid has
486                  * already been fully closed, ignore the message.
487                  */
488                 if (state == NULL) {
489                         if (msg->any.head.cmd & DMSGF_ABORT) {
490                                 error = EALREADY;
491                         } else {
492                                 kprintf("hammer2_state_msgtx: no state match "
493                                         "for REPLY | DELETE\n");
494                                 error = EINVAL;
495                         }
496                         break;
497                 }
498
499                 /*
500                  * Sent REPLY+ABORT+DELETE in case where msgid has already
501                  * been reused for an unrelated message, ignore the message.
502                  */
503                 if ((state->txcmd & DMSGF_CREATE) == 0) {
504                         if (msg->any.head.cmd & DMSGF_ABORT) {
505                                 error = EALREADY;
506                         } else {
507                                 kprintf("hammer2_state_msgtx: state reused "
508                                         "for REPLY | DELETE\n");
509                                 error = EINVAL;
510                         }
511                         break;
512                 }
513                 error = 0;
514                 break;
515         case DMSGF_REPLY:
516                 /*
517                  * Check for mid-stream ABORT reply sent.
518                  *
519                  * One-off REPLY messages are allowed for e.g. status updates.
520                  */
521                 if (msg->any.head.cmd & DMSGF_ABORT) {
522                         if (state == NULL ||
523                             (state->txcmd & DMSGF_CREATE) == 0) {
524                                 error = EALREADY;
525                                 break;
526                         }
527                 }
528                 error = 0;
529                 break;
530         }
531         lockmgr(&pmp->msglk, LK_RELEASE);
532         return (error);
533 }
534
535 void
536 hammer2_state_cleanuptx(hammer2_msg_t *msg)
537 {
538         hammer2_pfsmount_t *pmp = msg->router->pmp;
539         hammer2_state_t *state;
540
541         if ((state = msg->state) == NULL) {
542                 hammer2_msg_free(msg);
543         } else if (msg->any.head.cmd & DMSGF_DELETE) {
544                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
545                 state->txcmd |= DMSGF_DELETE;
546                 if (state->rxcmd & DMSGF_DELETE) {
547                         if (state->msg == msg)
548                                 state->msg = NULL;
549                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
550                         if (state->txcmd & DMSGF_REPLY) {
551                                 KKASSERT(msg->any.head.cmd &
552                                          DMSGF_REPLY);
553                                 RB_REMOVE(hammer2_state_tree,
554                                           &pmp->staterd_tree, state);
555                         } else {
556                                 KKASSERT((msg->any.head.cmd &
557                                           DMSGF_REPLY) == 0);
558                                 RB_REMOVE(hammer2_state_tree,
559                                           &pmp->statewr_tree, state);
560                         }
561                         state->flags &= ~HAMMER2_STATE_INSERTED;
562                         lockmgr(&pmp->msglk, LK_RELEASE);
563                         hammer2_state_free(state);
564                 } else {
565                         lockmgr(&pmp->msglk, LK_RELEASE);
566                 }
567                 hammer2_msg_free(msg);
568         } else if (state->msg != msg) {
569                 hammer2_msg_free(msg);
570         }
571 }
572
573 void
574 hammer2_state_free(hammer2_state_t *state)
575 {
576         hammer2_pfsmount_t *pmp = state->pmp;
577         hammer2_msg_t *msg;
578
579         KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
580         msg = state->msg;
581         state->msg = NULL;
582         kfree(state, pmp->mmsg);
583         if (msg)
584                 hammer2_msg_free(msg);
585 }
586
587 hammer2_msg_t *
588 hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
589                   int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
590 {
591         hammer2_pfsmount_t *pmp = router->pmp;
592         hammer2_msg_t *msg;
593         hammer2_state_t *state;
594         size_t hbytes;
595
596         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
597         msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
598                       pmp->mmsg, M_WAITOK | M_ZERO);
599         msg->hdr_size = hbytes;
600         msg->router = router;
601         KKASSERT(router != NULL);
602         msg->any.head.magic = DMSG_HDR_MAGIC;
603         msg->any.head.source = 0;
604         msg->any.head.target = router->target;
605         msg->any.head.cmd = cmd;
606
607         if (cmd & DMSGF_CREATE) {
608                 /*
609                  * New transaction, requires tracking state and a unique
610                  * msgid to be allocated.
611                  */
612                 KKASSERT(msg->state == NULL);
613                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
614                 state->pmp = pmp;
615                 state->flags = HAMMER2_STATE_DYNAMIC;
616                 state->func = func;
617                 state->any.any = data;
618                 state->msg = msg;
619                 state->msgid = (uint64_t)(uintptr_t)state;
620                 state->router = msg->router;
621                 msg->state = state;
622                 msg->any.head.source = 0;
623                 msg->any.head.target = state->router->target;
624                 msg->any.head.msgid = state->msgid;
625
626                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
627                 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
628                         panic("duplicate msgid allocated");
629                 state->flags |= HAMMER2_STATE_INSERTED;
630                 msg->any.head.msgid = state->msgid;
631                 lockmgr(&pmp->msglk, LK_RELEASE);
632         }
633
634         return (msg);
635 }
636
637 void
638 hammer2_msg_free(hammer2_msg_t *msg)
639 {
640         hammer2_pfsmount_t *pmp = msg->router->pmp;
641
642         if (msg->aux_data && msg->aux_size) {
643                 kfree(msg->aux_data, pmp->mmsg);
644                 msg->aux_data = NULL;
645                 msg->aux_size = 0;
646                 msg->router = NULL;
647         }
648         kfree(msg, pmp->mmsg);
649 }
650
651 /*
652  * Indexed messages are stored in a red-black tree indexed by their
653  * msgid.  Only persistent messages are indexed.
654  */
655 int
656 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
657 {
658         if (state1->router < state2->router)
659                 return(-1);
660         if (state1->router > state2->router)
661                 return(1);
662         if (state1->msgid < state2->msgid)
663                 return(-1);
664         if (state1->msgid > state2->msgid)
665                 return(1);
666         return(0);
667 }
668
669 /*
670  * Write a message.  All requisit command flags have been set.
671  *
672  * If msg->state is non-NULL the message is written to the existing
673  * transaction.  msgid will be set accordingly.
674  *
675  * If msg->state is NULL and CREATE is set new state is allocated and
676  * (func, data) is installed.  A msgid is assigned.
677  *
678  * If msg->state is NULL and CREATE is not set the message is assumed
679  * to be a one-way message.  The originator must assign the msgid
680  * (or leave it 0, which is typical.
681  *
682  * This function merely queues the message to the management thread, it
683  * does not write to the message socket/pipe.
684  */
685 void
686 hammer2_msg_write(hammer2_msg_t *msg)
687 {
688         hammer2_pfsmount_t *pmp = msg->router->pmp;
689         hammer2_state_t *state;
690
691         if (msg->state) {
692                 /*
693                  * Continuance or termination of existing transaction.
694                  * The transaction could have been initiated by either end.
695                  *
696                  * (Function callback and aux data for the receive side can
697                  * be replaced or left alone).
698                  */
699                 state = msg->state;
700                 msg->any.head.msgid = state->msgid;
701                 msg->any.head.source = 0;
702                 msg->any.head.target = state->router->target;
703                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
704         } else {
705                 /*
706                  * One-off message (always uses msgid 0 to distinguish
707                  * between a possibly lost in-transaction message due to
708                  * competing aborts and a real one-off message?)
709                  */
710                 msg->any.head.msgid = 0;
711                 msg->any.head.source = 0;
712                 msg->any.head.target = msg->router->target;
713                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
714         }
715
716         /*
717          * Finish up the msg fields
718          */
719         msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
720         ++pmp->msg_seq;
721
722         msg->any.head.hdr_crc = 0;
723         msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
724
725         TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
726         hammer2_clusterctl_wakeup(pmp);
727         lockmgr(&pmp->msglk, LK_RELEASE);
728 }
729
730 /*
731  * Reply to a message and terminate our side of the transaction.
732  *
733  * If msg->state is non-NULL we are replying to a one-way message.
734  */
735 void
736 hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
737 {
738         hammer2_state_t *state = msg->state;
739         hammer2_msg_t *nmsg;
740         uint32_t cmd;
741
742         /*
743          * Reply with a simple error code and terminate the transaction.
744          */
745         cmd = DMSG_LNK_ERROR;
746
747         /*
748          * Check if our direction has even been initiated yet, set CREATE.
749          *
750          * Check what direction this is (command or reply direction).  Note
751          * that txcmd might not have been initiated yet.
752          *
753          * If our direction has already been closed we just return without
754          * doing anything.
755          */
756         if (state) {
757                 if (state->txcmd & DMSGF_DELETE)
758                         return;
759                 if ((state->txcmd & DMSGF_CREATE) == 0)
760                         cmd |= DMSGF_CREATE;
761                 if (state->txcmd & DMSGF_REPLY)
762                         cmd |= DMSGF_REPLY;
763                 cmd |= DMSGF_DELETE;
764         } else {
765                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
766                         cmd |= DMSGF_REPLY;
767         }
768         kprintf("MSG_REPLY state=%p msg %08x\n", state, cmd);
769
770         /* XXX messy mask cmd to avoid allocating state */
771         nmsg = hammer2_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
772                                  NULL, NULL);
773         nmsg->any.head.cmd = cmd;
774         nmsg->any.head.error = error;
775         nmsg->state = state;
776         hammer2_msg_write(nmsg);
777 }
778
779 /*
780  * Reply to a message and continue our side of the transaction.
781  *
782  * If msg->state is non-NULL we are replying to a one-way message and this
783  * function degenerates into the same as hammer2_msg_reply().
784  */
785 void
786 hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
787 {
788         hammer2_state_t *state = msg->state;
789         hammer2_msg_t *nmsg;
790         uint32_t cmd;
791
792         /*
793          * Return a simple result code, do NOT terminate the transaction.
794          */
795         cmd = DMSG_LNK_ERROR;
796
797         /*
798          * Check if our direction has even been initiated yet, set CREATE.
799          *
800          * Check what direction this is (command or reply direction).  Note
801          * that txcmd might not have been initiated yet.
802          *
803          * If our direction has already been closed we just return without
804          * doing anything.
805          */
806         if (state) {
807                 if (state->txcmd & DMSGF_DELETE)
808                         return;
809                 if ((state->txcmd & DMSGF_CREATE) == 0)
810                         cmd |= DMSGF_CREATE;
811                 if (state->txcmd & DMSGF_REPLY)
812                         cmd |= DMSGF_REPLY;
813                 /* continuing transaction, do not set MSGF_DELETE */
814         } else {
815                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
816                         cmd |= DMSGF_REPLY;
817         }
818
819         /* XXX messy mask cmd to avoid allocating state */
820         nmsg = hammer2_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
821                                  NULL, NULL);
822         nmsg->any.head.cmd = cmd;
823         nmsg->any.head.error = error;
824         nmsg->state = state;
825         hammer2_msg_write(nmsg);
826 }