Merge branches 'hammer2' and 'master' of ssh://crater.dragonflybsd.org/repository...
[dragonfly.git] / sys / vfs / hammer2 / hammer2_msg.c
1 /*-
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34
35 #include "hammer2.h"
36
37 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
38
39 /*
40  * Process state tracking for a message after reception, prior to
41  * execution.
42  *
43  * Called with msglk held and the msg dequeued.
44  *
45  * All messages are called with dummy state and return actual state.
46  * (One-off messages often just return the same dummy state).
47  *
48  * May request that caller discard the message by setting *discardp to 1.
49  * The returned state is not used in this case and is allowed to be NULL.
50  *
51  * --
52  *
53  * These routines handle persistent and command/reply message state via the
54  * CREATE and DELETE flags.  The first message in a command or reply sequence
55  * sets CREATE, the last message in a command or reply sequence sets DELETE.
56  *
57  * There can be any number of intermediate messages belonging to the same
58  * sequence sent inbetween the CREATE message and the DELETE message,
59  * which set neither flag.  This represents a streaming command or reply.
60  *
61  * Any command message received with CREATE set expects a reply sequence to
62  * be returned.  Reply sequences work the same as command sequences except the
63  * REPLY bit is also sent.  Both the command side and reply side can
64  * degenerate into a single message with both CREATE and DELETE set.  Note
65  * that one side can be streaming and the other side not, or neither, or both.
66  *
67  * The msgid is unique for the initiator.  That is, two sides sending a new
68  * message can use the same msgid without colliding.
69  *
70  * --
71  *
72  * ABORT sequences work by setting the ABORT flag along with normal message
73  * state.  However, ABORTs can also be sent on half-closed messages, that is
74  * even if the command or reply side has already sent a DELETE, as long as
75  * the message has not been fully closed it can still send an ABORT+DELETE
76  * to terminate the half-closed message state.
77  *
78  * Since ABORT+DELETEs can race we silently discard ABORT's for message
79  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
80  * also race, and in this situation the other side might have already
81  * initiated a new unrelated command with the same message id.  Since
82  * the abort has not set the CREATE flag the situation can be detected
83  * and the message will also be discarded.
84  *
85  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86  * The ABORT request is essentially integrated into the command instead
87  * of being sent later on.  In this situation the command implementation
88  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89  * special-case non-blocking operation for the command.
90  *
91  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
92  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
93  *        one-way messages are not supported.
94  *
95  * NOTE!  If a command sequence does not support aborts the ABORT flag is
96  *        simply ignored.
97  *
98  * --
99  *
100  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
101  * set.  One-off messages cannot be aborted and typically aren't processed
102  * by these routines.  The REPLY bit can be used to distinguish whether a
103  * one-off message is a command or reply.  For example, one-off replies
104  * will typically just contain status updates.
105  */
106 int
107 hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
108 {
109         hammer2_state_t *state;
110         int error;
111
112         /*
113          * Make sure a state structure is ready to go in case we need a new
114          * one.  This is the only routine which uses freerd_state so no
115          * races are possible.
116          */
117         if ((state = pmp->freerd_state) == NULL) {
118                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
119                 state->pmp = pmp;
120                 state->flags = HAMMER2_STATE_DYNAMIC;
121                 pmp->freerd_state = state;
122         }
123
124         /*
125          * Lock RB tree and locate existing persistent state, if any.
126          *
127          * If received msg is a command state is on staterd_tree.
128          * If received msg is a reply state is on statewr_tree.
129          */
130         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
131
132         state->msgid = msg->any.head.msgid;
133         state->spanid = msg->any.head.spanid;
134         kprintf("received msg %08x msgid %jx spanid=%jx\n",
135                 msg->any.head.cmd,
136                 (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.spanid);
137         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
138                 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
139         else
140                 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
141         msg->state = state;
142
143         /*
144          * Short-cut one-off or mid-stream messages (state may be NULL).
145          */
146         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
147                                   HAMMER2_MSGF_ABORT)) == 0) {
148                 lockmgr(&pmp->msglk, LK_RELEASE);
149                 return(0);
150         }
151
152         /*
153          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
154          * inside the case statements.
155          */
156         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
157                                     HAMMER2_MSGF_REPLY)) {
158         case HAMMER2_MSGF_CREATE:
159         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
160                 /*
161                  * New persistant command received.
162                  */
163                 if (state) {
164                         kprintf("hammer2_state_msgrx: duplicate transaction\n");
165                         error = EINVAL;
166                         break;
167                 }
168                 state = pmp->freerd_state;
169                 pmp->freerd_state = NULL;
170                 msg->state = state;
171                 state->msg = msg;
172                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
173                 state->txcmd = HAMMER2_MSGF_REPLY;
174                 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
175                 state->flags |= HAMMER2_STATE_INSERTED;
176                 error = 0;
177                 break;
178         case HAMMER2_MSGF_DELETE:
179                 /*
180                  * Persistent state is expected but might not exist if an
181                  * ABORT+DELETE races the close.
182                  */
183                 if (state == NULL) {
184                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
185                                 error = EALREADY;
186                         } else {
187                                 kprintf("hammer2_state_msgrx: no state "
188                                         "for DELETE\n");
189                                 error = EINVAL;
190                         }
191                         break;
192                 }
193
194                 /*
195                  * Handle another ABORT+DELETE case if the msgid has already
196                  * been reused.
197                  */
198                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
199                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
200                                 error = EALREADY;
201                         } else {
202                                 kprintf("hammer2_state_msgrx: state reused "
203                                         "for DELETE\n");
204                                 error = EINVAL;
205                         }
206                         break;
207                 }
208                 error = 0;
209                 break;
210         default:
211                 /*
212                  * Check for mid-stream ABORT command received, otherwise
213                  * allow.
214                  */
215                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
216                         if (state == NULL ||
217                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
218                                 error = EALREADY;
219                                 break;
220                         }
221                 }
222                 error = 0;
223                 break;
224         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
225         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
226                 /*
227                  * When receiving a reply with CREATE set the original
228                  * persistent state message should already exist.
229                  */
230                 if (state == NULL) {
231                         kprintf("hammer2_state_msgrx: no state match for "
232                                 "REPLY cmd=%08x msgid=%016jx\n",
233                                 msg->any.head.cmd,
234                                 (intmax_t)msg->any.head.msgid);
235                         error = EINVAL;
236                         break;
237                 }
238                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
239                 error = 0;
240                 break;
241         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
242                 /*
243                  * Received REPLY+ABORT+DELETE in case where msgid has
244                  * already been fully closed, ignore the message.
245                  */
246                 if (state == NULL) {
247                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
248                                 error = EALREADY;
249                         } else {
250                                 kprintf("hammer2_state_msgrx: no state match "
251                                         "for REPLY|DELETE\n");
252                                 error = EINVAL;
253                         }
254                         break;
255                 }
256
257                 /*
258                  * Received REPLY+ABORT+DELETE in case where msgid has
259                  * already been reused for an unrelated message,
260                  * ignore the message.
261                  */
262                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
263                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
264                                 error = EALREADY;
265                         } else {
266                                 kprintf("hammer2_state_msgrx: state reused "
267                                         "for REPLY|DELETE\n");
268                                 error = EINVAL;
269                         }
270                         break;
271                 }
272                 error = 0;
273                 break;
274         case HAMMER2_MSGF_REPLY:
275                 /*
276                  * Check for mid-stream ABORT reply received to sent command.
277                  */
278                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
279                         if (state == NULL ||
280                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
281                                 error = EALREADY;
282                                 break;
283                         }
284                 }
285                 error = 0;
286                 break;
287         }
288         lockmgr(&pmp->msglk, LK_RELEASE);
289         return (error);
290 }
291
292 void
293 hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
294 {
295         hammer2_state_t *state;
296
297         if ((state = msg->state) == NULL) {
298                 hammer2_msg_free(pmp, msg);
299         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
300                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
301                 state->rxcmd |= HAMMER2_MSGF_DELETE;
302                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
303                         if (state->msg == msg)
304                                 state->msg = NULL;
305                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
306                         if (state->rxcmd & HAMMER2_MSGF_REPLY) {
307                                 KKASSERT(msg->any.head.cmd &
308                                          HAMMER2_MSGF_REPLY);
309                                 RB_REMOVE(hammer2_state_tree,
310                                           &pmp->statewr_tree, state);
311                         } else {
312                                 KKASSERT((msg->any.head.cmd &
313                                           HAMMER2_MSGF_REPLY) == 0);
314                                 RB_REMOVE(hammer2_state_tree,
315                                           &pmp->staterd_tree, state);
316                         }
317                         state->flags &= ~HAMMER2_STATE_INSERTED;
318                         lockmgr(&pmp->msglk, LK_RELEASE);
319                         hammer2_state_free(state);
320                 } else {
321                         lockmgr(&pmp->msglk, LK_RELEASE);
322                 }
323                 hammer2_msg_free(pmp, msg);
324         } else if (state->msg != msg) {
325                 hammer2_msg_free(pmp, msg);
326         }
327 }
328
329 /*
330  * Process state tracking for a message prior to transmission.
331  *
332  * Called with msglk held and the msg dequeued.
333  *
334  * One-off messages are usually with dummy state and msg->state may be NULL
335  * in this situation.
336  *
337  * New transactions (when CREATE is set) will insert the state.
338  *
339  * May request that caller discard the message by setting *discardp to 1.
340  * A NULL state may be returned in this case.
341  */
342 int
343 hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
344 {
345         hammer2_state_t *state;
346         int error;
347
348         /*
349          * Make sure a state structure is ready to go in case we need a new
350          * one.  This is the only routine which uses freewr_state so no
351          * races are possible.
352          */
353         if ((state = pmp->freewr_state) == NULL) {
354                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
355                 state->pmp = pmp;
356                 state->flags = HAMMER2_STATE_DYNAMIC;
357                 pmp->freewr_state = state;
358         }
359
360         /*
361          * Lock RB tree.  If persistent state is present it will have already
362          * been assigned to msg.
363          */
364         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
365         state = msg->state;
366
367         /*
368          * Short-cut one-off or mid-stream messages (state may be NULL).
369          */
370         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
371                                   HAMMER2_MSGF_ABORT)) == 0) {
372                 lockmgr(&pmp->msglk, LK_RELEASE);
373                 return(0);
374         }
375
376
377         /*
378          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
379          * inside the case statements.
380          */
381         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
382                                     HAMMER2_MSGF_REPLY)) {
383         case HAMMER2_MSGF_CREATE:
384         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
385                 /*
386                  * Insert the new persistent message state and mark
387                  * half-closed if DELETE is set.  Since this is a new
388                  * message it isn't possible to transition into the fully
389                  * closed state here.
390                  *
391                  * XXX state must be assigned and inserted by
392                  *     hammer2_msg_write().  txcmd is assigned by us
393                  *     on-transmit.
394                  */
395                 KKASSERT(state != NULL);
396                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
397                 state->rxcmd = HAMMER2_MSGF_REPLY;
398                 error = 0;
399                 break;
400         case HAMMER2_MSGF_DELETE:
401                 /*
402                  * Sent ABORT+DELETE in case where msgid has already
403                  * been fully closed, ignore the message.
404                  */
405                 if (state == NULL) {
406                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
407                                 error = EALREADY;
408                         } else {
409                                 kprintf("hammer2_state_msgtx: no state match "
410                                         "for DELETE cmd=%08x msgid=%016jx\n",
411                                         msg->any.head.cmd,
412                                         (intmax_t)msg->any.head.msgid);
413                                 error = EINVAL;
414                         }
415                         break;
416                 }
417
418                 /*
419                  * Sent ABORT+DELETE in case where msgid has
420                  * already been reused for an unrelated message,
421                  * ignore the message.
422                  */
423                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
424                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
425                                 error = EALREADY;
426                         } else {
427                                 kprintf("hammer2_state_msgtx: state reused "
428                                         "for DELETE\n");
429                                 error = EINVAL;
430                         }
431                         break;
432                 }
433                 error = 0;
434                 break;
435         default:
436                 /*
437                  * Check for mid-stream ABORT command sent
438                  */
439                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
440                         if (state == NULL ||
441                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
442                                 error = EALREADY;
443                                 break;
444                         }
445                 }
446                 error = 0;
447                 break;
448         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
449         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
450                 /*
451                  * When transmitting a reply with CREATE set the original
452                  * persistent state message should already exist.
453                  */
454                 if (state == NULL) {
455                         kprintf("hammer2_state_msgtx: no state match "
456                                 "for REPLY | CREATE\n");
457                         error = EINVAL;
458                         break;
459                 }
460                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
461                 error = 0;
462                 break;
463         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
464                 /*
465                  * When transmitting a reply with DELETE set the original
466                  * persistent state message should already exist.
467                  *
468                  * This is very similar to the REPLY|CREATE|* case except
469                  * txcmd is already stored, so we just add the DELETE flag.
470                  *
471                  * Sent REPLY+ABORT+DELETE in case where msgid has
472                  * already been fully closed, ignore the message.
473                  */
474                 if (state == NULL) {
475                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
476                                 error = EALREADY;
477                         } else {
478                                 kprintf("hammer2_state_msgtx: no state match "
479                                         "for REPLY | DELETE\n");
480                                 error = EINVAL;
481                         }
482                         break;
483                 }
484
485                 /*
486                  * Sent REPLY+ABORT+DELETE in case where msgid has already
487                  * been reused for an unrelated message, ignore the message.
488                  */
489                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
490                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
491                                 error = EALREADY;
492                         } else {
493                                 kprintf("hammer2_state_msgtx: state reused "
494                                         "for REPLY | DELETE\n");
495                                 error = EINVAL;
496                         }
497                         break;
498                 }
499                 error = 0;
500                 break;
501         case HAMMER2_MSGF_REPLY:
502                 /*
503                  * Check for mid-stream ABORT reply sent.
504                  *
505                  * One-off REPLY messages are allowed for e.g. status updates.
506                  */
507                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
508                         if (state == NULL ||
509                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
510                                 error = EALREADY;
511                                 break;
512                         }
513                 }
514                 error = 0;
515                 break;
516         }
517         lockmgr(&pmp->msglk, LK_RELEASE);
518         return (error);
519 }
520
521 void
522 hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
523 {
524         hammer2_state_t *state;
525
526         if ((state = msg->state) == NULL) {
527                 hammer2_msg_free(pmp, msg);
528         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
529                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
530                 state->txcmd |= HAMMER2_MSGF_DELETE;
531                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
532                         if (state->msg == msg)
533                                 state->msg = NULL;
534                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
535                         if (state->txcmd & HAMMER2_MSGF_REPLY) {
536                                 KKASSERT(msg->any.head.cmd &
537                                          HAMMER2_MSGF_REPLY);
538                                 RB_REMOVE(hammer2_state_tree,
539                                           &pmp->staterd_tree, state);
540                         } else {
541                                 KKASSERT((msg->any.head.cmd &
542                                           HAMMER2_MSGF_REPLY) == 0);
543                                 RB_REMOVE(hammer2_state_tree,
544                                           &pmp->statewr_tree, state);
545                         }
546                         state->flags &= ~HAMMER2_STATE_INSERTED;
547                         lockmgr(&pmp->msglk, LK_RELEASE);
548                         hammer2_state_free(state);
549                 } else {
550                         lockmgr(&pmp->msglk, LK_RELEASE);
551                 }
552                 hammer2_msg_free(pmp, msg);
553         } else if (state->msg != msg) {
554                 hammer2_msg_free(pmp, msg);
555         }
556 }
557
558 void
559 hammer2_state_free(hammer2_state_t *state)
560 {
561         hammer2_pfsmount_t *pmp = state->pmp;
562         hammer2_msg_t *msg;
563
564         msg = state->msg;
565         state->msg = NULL;
566         kfree(state, pmp->mmsg);
567         if (msg)
568                 hammer2_msg_free(pmp, msg);
569 }
570
571 hammer2_msg_t *
572 hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid, uint32_t cmd)
573 {
574         hammer2_msg_t *msg;
575         size_t hbytes;
576
577         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
578         msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
579                       pmp->mmsg, M_WAITOK | M_ZERO);
580         msg->hdr_size = hbytes;
581         msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
582         msg->any.head.spanid = spanid;
583         msg->any.head.cmd = cmd;
584
585         return (msg);
586 }
587
588 void
589 hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
590 {
591         if (msg->aux_data && msg->aux_size) {
592                 kfree(msg->aux_data, pmp->mmsg);
593                 msg->aux_data = NULL;
594                 msg->aux_size = 0;
595         }
596         kfree(msg, pmp->mmsg);
597 }
598
599 /*
600  * Indexed messages are stored in a red-black tree indexed by their
601  * msgid.  Only persistent messages are indexed.
602  */
603 int
604 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
605 {
606         if (state1->spanid < state2->spanid)
607                 return(-1);
608         if (state1->spanid > state2->spanid)
609                 return(1);
610         if (state1->msgid < state2->msgid)
611                 return(-1);
612         if (state1->msgid > state2->msgid)
613                 return(1);
614         return(0);
615 }
616
617 /*
618  * Write a message.  All requisit command flags have been set.
619  *
620  * If msg->state is non-NULL the message is written to the existing
621  * transaction.  Both msgid and spanid will be set accordingly.
622  *
623  * If msg->state is NULL and CREATE is set new state is allocated and
624  * (func, data) is installed.
625  *
626  * If msg->state is NULL and CREATE is not set the message is assumed
627  * to be a one-way message.  The msgid and spanid must be set by the
628  * caller (msgid is typically set to 0 for this case).
629  *
630  * This function merely queues the message to the management thread, it
631  * does not write to the message socket/pipe.
632  */
633 void
634 hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
635                   int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
636 {
637         hammer2_state_t *state;
638
639         if (msg->state) {
640                 /*
641                  * Continuance or termination of existing transaction.
642                  * The transaction could have been initiated by either end.
643                  *
644                  * (Function callback and aux data for the receive side can
645                  * be replaced or left alone).
646                  */
647                 msg->any.head.msgid = msg->state->msgid;
648                 msg->any.head.spanid = msg->state->spanid;
649                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
650                 if (func) {
651                         msg->state->func = func;
652                         msg->state->any.any = data;
653                 }
654         } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
655                 /*
656                  * New transaction, requires tracking state and a unique
657                  * msgid to be allocated.
658                  */
659                 KKASSERT(msg->state == NULL);
660                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
661                 state->pmp = pmp;
662                 state->flags = HAMMER2_STATE_DYNAMIC;
663                 state->func = func;
664                 state->any.any = data;
665                 state->msg = msg;
666                 state->msgid = (uint64_t)(uintptr_t)state;
667                 state->spanid = msg->any.head.spanid;
668                 msg->state = state;
669
670                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
671                 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
672                         panic("duplicate msgid allocated");
673                 msg->any.head.msgid = state->msgid;
674         } else {
675                 /*
676                  * One-off message (always uses msgid 0 to distinguish
677                  * between a possibly lost in-transaction message due to
678                  * competing aborts and a real one-off message?)
679                  */
680                 msg->any.head.msgid = 0;
681                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
682         }
683
684         /*
685          * Finish up the msg fields
686          */
687         msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
688         ++pmp->msg_seq;
689
690         msg->any.head.hdr_crc = 0;
691         msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
692
693         TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
694         lockmgr(&pmp->msglk, LK_RELEASE);
695 }
696
697 /*
698  * Reply to a message and terminate our side of the transaction.
699  *
700  * If msg->state is non-NULL we are replying to a one-way message.
701  */
702 void
703 hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
704 {
705         hammer2_state_t *state = msg->state;
706         uint32_t cmd;
707
708         /*
709          * Reply with a simple error code and terminate the transaction.
710          */
711         cmd = HAMMER2_LNK_ERROR;
712
713         /*
714          * Check if our direction has even been initiated yet, set CREATE.
715          *
716          * Check what direction this is (command or reply direction).  Note
717          * that txcmd might not have been initiated yet.
718          *
719          * If our direction has already been closed we just return without
720          * doing anything.
721          */
722         if (state) {
723                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
724                         hammer2_msg_free(pmp, msg);
725                         return;
726                 }
727                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
728                         cmd |= HAMMER2_MSGF_CREATE;
729                 if (state->txcmd & HAMMER2_MSGF_REPLY)
730                         cmd |= HAMMER2_MSGF_REPLY;
731                 cmd |= HAMMER2_MSGF_DELETE;
732         } else {
733                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
734                         cmd |= HAMMER2_MSGF_REPLY;
735         }
736
737         msg = hammer2_msg_alloc(pmp, 0, cmd);
738         msg->any.head.error = error;
739         msg->state = state;
740         hammer2_msg_write(pmp, msg, NULL, NULL);
741 }
742
743 /*
744  * Reply to a message and continue our side of the transaction.
745  *
746  * If msg->state is non-NULL we are replying to a one-way message and this
747  * function degenerates into the same as hammer2_msg_reply().
748  */
749 void
750 hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
751 {
752         hammer2_state_t *state = msg->state;
753         uint32_t cmd;
754
755         /*
756          * Return a simple result code, do NOT terminate the transaction.
757          */
758         cmd = HAMMER2_LNK_ERROR;
759
760         /*
761          * Check if our direction has even been initiated yet, set CREATE.
762          *
763          * Check what direction this is (command or reply direction).  Note
764          * that txcmd might not have been initiated yet.
765          *
766          * If our direction has already been closed we just return without
767          * doing anything.
768          */
769         if (state) {
770                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
771                         hammer2_msg_free(pmp, msg);
772                         return;
773                 }
774                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
775                         cmd |= HAMMER2_MSGF_CREATE;
776                 if (state->txcmd & HAMMER2_MSGF_REPLY)
777                         cmd |= HAMMER2_MSGF_REPLY;
778                 /* continuing transaction, do not set MSGF_DELETE */
779         } else {
780                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
781                         cmd |= HAMMER2_MSGF_REPLY;
782         }
783
784         msg = hammer2_msg_alloc(pmp, 0, cmd);
785         msg->any.head.error = error;
786         msg->state = state;
787         hammer2_msg_write(pmp, msg, NULL, NULL);
788 }