hammer2 - Flesh out span code, API cleanups
[dragonfly.git] / sys / vfs / hammer2 / hammer2_msg.c
1 /*-
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34
35 #include "hammer2.h"
36
37 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
38
39 /*
40  * Process state tracking for a message after reception, prior to
41  * execution.
42  *
43  * Called with msglk held and the msg dequeued.
44  *
45  * All messages are called with dummy state and return actual state.
46  * (One-off messages often just return the same dummy state).
47  *
48  * May request that caller discard the message by setting *discardp to 1.
49  * The returned state is not used in this case and is allowed to be NULL.
50  *
51  * --
52  *
53  * These routines handle persistent and command/reply message state via the
54  * CREATE and DELETE flags.  The first message in a command or reply sequence
55  * sets CREATE, the last message in a command or reply sequence sets DELETE.
56  *
57  * There can be any number of intermediate messages belonging to the same
58  * sequence sent inbetween the CREATE message and the DELETE message,
59  * which set neither flag.  This represents a streaming command or reply.
60  *
61  * Any command message received with CREATE set expects a reply sequence to
62  * be returned.  Reply sequences work the same as command sequences except the
63  * REPLY bit is also sent.  Both the command side and reply side can
64  * degenerate into a single message with both CREATE and DELETE set.  Note
65  * that one side can be streaming and the other side not, or neither, or both.
66  *
67  * The msgid is unique for the initiator.  That is, two sides sending a new
68  * message can use the same msgid without colliding.
69  *
70  * --
71  *
72  * ABORT sequences work by setting the ABORT flag along with normal message
73  * state.  However, ABORTs can also be sent on half-closed messages, that is
74  * even if the command or reply side has already sent a DELETE, as long as
75  * the message has not been fully closed it can still send an ABORT+DELETE
76  * to terminate the half-closed message state.
77  *
78  * Since ABORT+DELETEs can race we silently discard ABORT's for message
79  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
80  * also race, and in this situation the other side might have already
81  * initiated a new unrelated command with the same message id.  Since
82  * the abort has not set the CREATE flag the situation can be detected
83  * and the message will also be discarded.
84  *
85  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86  * The ABORT request is essentially integrated into the command instead
87  * of being sent later on.  In this situation the command implementation
88  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89  * special-case non-blocking operation for the command.
90  *
91  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
92  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
93  *        one-way messages are not supported.
94  *
95  * NOTE!  If a command sequence does not support aborts the ABORT flag is
96  *        simply ignored.
97  *
98  * --
99  *
100  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
101  * set.  One-off messages cannot be aborted and typically aren't processed
102  * by these routines.  The REPLY bit can be used to distinguish whether a
103  * one-off message is a command or reply.  For example, one-off replies
104  * will typically just contain status updates.
105  */
106 int
107 hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
108 {
109         hammer2_state_t *state;
110         int error;
111
112         /*
113          * Make sure a state structure is ready to go in case we need a new
114          * one.  This is the only routine which uses freerd_state so no
115          * races are possible.
116          */
117         if ((state = pmp->freerd_state) == NULL) {
118                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
119                 state->pmp = pmp;
120                 state->flags = HAMMER2_STATE_DYNAMIC;
121                 pmp->freerd_state = state;
122         }
123
124         /*
125          * Lock RB tree and locate existing persistent state, if any.
126          *
127          * If received msg is a command state is on staterd_tree.
128          * If received msg is a reply state is on statewr_tree.
129          */
130         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
131
132         state->msgid = msg->any.head.msgid;
133         state->spanid = msg->any.head.spanid;
134         kprintf("received msg %08x msgid %jx spanid=%jx\n",
135                 msg->any.head.cmd,
136                 (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.spanid);
137         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
138                 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
139         else
140                 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
141         msg->state = state;
142
143         /*
144          * Short-cut one-off or mid-stream messages (state may be NULL).
145          */
146         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
147                                   HAMMER2_MSGF_ABORT)) == 0) {
148                 lockmgr(&pmp->msglk, LK_RELEASE);
149                 return(0);
150         }
151
152         /*
153          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
154          * inside the case statements.
155          */
156         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
157                                     HAMMER2_MSGF_REPLY)) {
158         case HAMMER2_MSGF_CREATE:
159         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
160                 /*
161                  * New persistant command received.
162                  */
163                 if (state) {
164                         kprintf("hammer2_state_msgrx: duplicate transaction\n");
165                         error = EINVAL;
166                         break;
167                 }
168                 state = pmp->freerd_state;
169                 pmp->freerd_state = NULL;
170                 msg->state = state;
171                 state->msg = msg;
172                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
173                 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
174                 state->flags |= HAMMER2_STATE_INSERTED;
175                 error = 0;
176                 break;
177         case HAMMER2_MSGF_DELETE:
178                 /*
179                  * Persistent state is expected but might not exist if an
180                  * ABORT+DELETE races the close.
181                  */
182                 if (state == NULL) {
183                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
184                                 error = EALREADY;
185                         } else {
186                                 kprintf("hammer2_state_msgrx: no state "
187                                         "for DELETE\n");
188                                 error = EINVAL;
189                         }
190                         break;
191                 }
192
193                 /*
194                  * Handle another ABORT+DELETE case if the msgid has already
195                  * been reused.
196                  */
197                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
198                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
199                                 error = EALREADY;
200                         } else {
201                                 kprintf("hammer2_state_msgrx: state reused "
202                                         "for DELETE\n");
203                                 error = EINVAL;
204                         }
205                         break;
206                 }
207                 error = 0;
208                 break;
209         default:
210                 /*
211                  * Check for mid-stream ABORT command received, otherwise
212                  * allow.
213                  */
214                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
215                         if (state == NULL ||
216                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
217                                 error = EALREADY;
218                                 break;
219                         }
220                 }
221                 error = 0;
222                 break;
223         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
224         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
225                 /*
226                  * When receiving a reply with CREATE set the original
227                  * persistent state message should already exist.
228                  */
229                 if (state == NULL) {
230                         kprintf("hammer2_state_msgrx: no state match for "
231                                 "REPLY cmd=%08x\n", msg->any.head.cmd);
232                         error = EINVAL;
233                         break;
234                 }
235                 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
236                 error = 0;
237                 break;
238         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
239                 /*
240                  * Received REPLY+ABORT+DELETE in case where msgid has
241                  * already been fully closed, ignore the message.
242                  */
243                 if (state == NULL) {
244                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
245                                 error = EALREADY;
246                         } else {
247                                 kprintf("hammer2_state_msgrx: no state match "
248                                         "for REPLY|DELETE\n");
249                                 error = EINVAL;
250                         }
251                         break;
252                 }
253
254                 /*
255                  * Received REPLY+ABORT+DELETE in case where msgid has
256                  * already been reused for an unrelated message,
257                  * ignore the message.
258                  */
259                 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
260                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
261                                 error = EALREADY;
262                         } else {
263                                 kprintf("hammer2_state_msgrx: state reused "
264                                         "for REPLY|DELETE\n");
265                                 error = EINVAL;
266                         }
267                         break;
268                 }
269                 error = 0;
270                 break;
271         case HAMMER2_MSGF_REPLY:
272                 /*
273                  * Check for mid-stream ABORT reply received to sent command.
274                  */
275                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
276                         if (state == NULL ||
277                             (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
278                                 error = EALREADY;
279                                 break;
280                         }
281                 }
282                 error = 0;
283                 break;
284         }
285         lockmgr(&pmp->msglk, LK_RELEASE);
286         return (error);
287 }
288
289 void
290 hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
291 {
292         hammer2_state_t *state;
293
294         if ((state = msg->state) == NULL) {
295                 hammer2_msg_free(pmp, msg);
296         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
297                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
298                 state->rxcmd |= HAMMER2_MSGF_DELETE;
299                 if (state->txcmd & HAMMER2_MSGF_DELETE) {
300                         if (state->msg == msg)
301                                 state->msg = NULL;
302                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
303                         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
304                                 RB_REMOVE(hammer2_state_tree,
305                                           &pmp->statewr_tree, state);
306                         } else {
307                                 RB_REMOVE(hammer2_state_tree,
308                                           &pmp->staterd_tree, state);
309                         }
310                         state->flags &= ~HAMMER2_STATE_INSERTED;
311                         lockmgr(&pmp->msglk, LK_RELEASE);
312                         hammer2_state_free(state);
313                 } else {
314                         lockmgr(&pmp->msglk, LK_RELEASE);
315                 }
316                 hammer2_msg_free(pmp, msg);
317         } else if (state->msg != msg) {
318                 hammer2_msg_free(pmp, msg);
319         }
320 }
321
322 /*
323  * Process state tracking for a message prior to transmission.
324  *
325  * Called with msglk held and the msg dequeued.
326  *
327  * One-off messages are usually with dummy state and msg->state may be NULL
328  * in this situation.
329  *
330  * New transactions (when CREATE is set) will insert the state.
331  *
332  * May request that caller discard the message by setting *discardp to 1.
333  * A NULL state may be returned in this case.
334  */
335 int
336 hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
337 {
338         hammer2_state_t *state;
339         int error;
340
341         /*
342          * Make sure a state structure is ready to go in case we need a new
343          * one.  This is the only routine which uses freewr_state so no
344          * races are possible.
345          */
346         if ((state = pmp->freewr_state) == NULL) {
347                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
348                 state->pmp = pmp;
349                 state->flags = HAMMER2_STATE_DYNAMIC;
350                 pmp->freewr_state = state;
351         }
352
353         /*
354          * Lock RB tree.  If persistent state is present it will have already
355          * been assigned to msg.
356          */
357         lockmgr(&pmp->msglk, LK_EXCLUSIVE);
358         state = msg->state;
359
360         /*
361          * Short-cut one-off or mid-stream messages (state may be NULL).
362          */
363         if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
364                                   HAMMER2_MSGF_ABORT)) == 0) {
365                 lockmgr(&pmp->msglk, LK_RELEASE);
366                 return(0);
367         }
368
369
370         /*
371          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
372          * inside the case statements.
373          */
374         switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
375                                     HAMMER2_MSGF_REPLY)) {
376         case HAMMER2_MSGF_CREATE:
377         case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
378                 /*
379                  * Insert the new persistent message state and mark
380                  * half-closed if DELETE is set.  Since this is a new
381                  * message it isn't possible to transition into the fully
382                  * closed state here.
383                  *
384                  * XXX state must be assigned and inserted by
385                  *     hammer2_msg_write().  txcmd is assigned by us
386                  *     on-transmit.
387                  */
388                 KKASSERT(state != NULL);
389 #if 0
390                 if (state == NULL) {
391                         state = pmp->freerd_state;
392                         pmp->freerd_state = NULL;
393                         msg->state = state;
394                         state->msg = msg;
395                         state->msgid = msg->any.head.msgid;
396                         state->spanid = msg->any.head.spanid;
397                 }
398                 KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
399                 if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
400                         kprintf("hammer2_state_msgtx: duplicate transaction\n");
401                         error = EINVAL;
402                         break;
403                 }
404                 state->flags |= HAMMER2_STATE_INSERTED;
405 #endif
406                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
407                 error = 0;
408                 break;
409         case HAMMER2_MSGF_DELETE:
410                 /*
411                  * Sent ABORT+DELETE in case where msgid has already
412                  * been fully closed, ignore the message.
413                  */
414                 if (state == NULL) {
415                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
416                                 error = EALREADY;
417                         } else {
418                                 kprintf("hammer2_state_msgtx: no state match "
419                                         "for DELETE\n");
420                                 error = EINVAL;
421                         }
422                         break;
423                 }
424
425                 /*
426                  * Sent ABORT+DELETE in case where msgid has
427                  * already been reused for an unrelated message,
428                  * ignore the message.
429                  */
430                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
431                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
432                                 error = EALREADY;
433                         } else {
434                                 kprintf("hammer2_state_msgtx: state reused "
435                                         "for DELETE\n");
436                                 error = EINVAL;
437                         }
438                         break;
439                 }
440                 error = 0;
441                 break;
442         default:
443                 /*
444                  * Check for mid-stream ABORT command sent
445                  */
446                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
447                         if (state == NULL ||
448                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
449                                 error = EALREADY;
450                                 break;
451                         }
452                 }
453                 error = 0;
454                 break;
455         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
456         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
457                 /*
458                  * When transmitting a reply with CREATE set the original
459                  * persistent state message should already exist.
460                  */
461                 if (state == NULL) {
462                         kprintf("hammer2_state_msgtx: no state match "
463                                 "for REPLY | CREATE\n");
464                         error = EINVAL;
465                         break;
466                 }
467                 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
468                 error = 0;
469                 break;
470         case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
471                 /*
472                  * When transmitting a reply with DELETE set the original
473                  * persistent state message should already exist.
474                  *
475                  * This is very similar to the REPLY|CREATE|* case except
476                  * txcmd is already stored, so we just add the DELETE flag.
477                  *
478                  * Sent REPLY+ABORT+DELETE in case where msgid has
479                  * already been fully closed, ignore the message.
480                  */
481                 if (state == NULL) {
482                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
483                                 error = EALREADY;
484                         } else {
485                                 kprintf("hammer2_state_msgtx: no state match "
486                                         "for REPLY | DELETE\n");
487                                 error = EINVAL;
488                         }
489                         break;
490                 }
491
492                 /*
493                  * Sent REPLY+ABORT+DELETE in case where msgid has already
494                  * been reused for an unrelated message, ignore the message.
495                  */
496                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
497                         if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
498                                 error = EALREADY;
499                         } else {
500                                 kprintf("hammer2_state_msgtx: state reused "
501                                         "for REPLY | DELETE\n");
502                                 error = EINVAL;
503                         }
504                         break;
505                 }
506                 error = 0;
507                 break;
508         case HAMMER2_MSGF_REPLY:
509                 /*
510                  * Check for mid-stream ABORT reply sent.
511                  *
512                  * One-off REPLY messages are allowed for e.g. status updates.
513                  */
514                 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
515                         if (state == NULL ||
516                             (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
517                                 error = EALREADY;
518                                 break;
519                         }
520                 }
521                 error = 0;
522                 break;
523         }
524         lockmgr(&pmp->msglk, LK_RELEASE);
525         return (error);
526 }
527
528 void
529 hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
530 {
531         hammer2_state_t *state;
532
533         if ((state = msg->state) == NULL) {
534                 hammer2_msg_free(pmp, msg);
535         } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
536                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
537                 state->txcmd |= HAMMER2_MSGF_DELETE;
538                 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
539                         if (state->msg == msg)
540                                 state->msg = NULL;
541                         KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
542                         if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
543                                 RB_REMOVE(hammer2_state_tree,
544                                           &pmp->staterd_tree, state);
545                         } else {
546                                 RB_REMOVE(hammer2_state_tree,
547                                           &pmp->statewr_tree, state);
548                         }
549                         state->flags &= ~HAMMER2_STATE_INSERTED;
550                         lockmgr(&pmp->msglk, LK_RELEASE);
551                         hammer2_state_free(state);
552                 } else {
553                         lockmgr(&pmp->msglk, LK_RELEASE);
554                 }
555                 hammer2_msg_free(pmp, msg);
556         } else if (state->msg != msg) {
557                 hammer2_msg_free(pmp, msg);
558         }
559 }
560
561 void
562 hammer2_state_free(hammer2_state_t *state)
563 {
564         hammer2_pfsmount_t *pmp = state->pmp;
565         hammer2_msg_t *msg;
566
567         msg = state->msg;
568         state->msg = NULL;
569         kfree(state, pmp->mmsg);
570         if (msg)
571                 hammer2_msg_free(pmp, msg);
572 }
573
574 hammer2_msg_t *
575 hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid, uint32_t cmd)
576 {
577         hammer2_msg_t *msg;
578         size_t hbytes;
579
580         hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
581         msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
582                       pmp->mmsg, M_WAITOK | M_ZERO);
583         msg->hdr_size = hbytes;
584         msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
585         msg->any.head.spanid = spanid;
586         msg->any.head.cmd = cmd;
587
588         return (msg);
589 }
590
591 void
592 hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
593 {
594         if (msg->aux_data && msg->aux_size) {
595                 kfree(msg->aux_data, pmp->mmsg);
596                 msg->aux_data = NULL;
597                 msg->aux_size = 0;
598         }
599         kfree(msg, pmp->mmsg);
600 }
601
602 /*
603  * Indexed messages are stored in a red-black tree indexed by their
604  * msgid.  Only persistent messages are indexed.
605  */
606 int
607 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
608 {
609         if (state1->spanid < state2->spanid)
610                 return(-1);
611         if (state1->spanid > state2->spanid)
612                 return(1);
613         if (state1->msgid < state2->msgid)
614                 return(-1);
615         if (state1->msgid > state2->msgid)
616                 return(1);
617         return(0);
618 }
619
620 /*
621  * Write a message.  All requisit command flags have been set.
622  *
623  * If msg->state is non-NULL the message is written to the existing
624  * transaction.  Both msgid and spanid will be set accordingly.
625  *
626  * If msg->state is NULL and CREATE is set new state is allocated and
627  * (func, data) is installed.
628  *
629  * If msg->state is NULL and CREATE is not set the message is assumed
630  * to be a one-way message.  The msgid and spanid must be set by the
631  * caller (msgid is typically set to 0 for this case).
632  *
633  * This function merely queues the message to the management thread, it
634  * does not write to the message socket/pipe.
635  */
636 void
637 hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
638                   int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
639 {
640         hammer2_state_t *state;
641
642         if (msg->state) {
643                 /*
644                  * Continuance or termination of existing transaction.
645                  * The transaction could have been initiated by either end.
646                  *
647                  * (Function callback and aux data for the receive side can
648                  * be replaced or left alone).
649                  */
650                 msg->any.head.msgid = msg->state->msgid;
651                 msg->any.head.spanid = msg->state->spanid;
652                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
653                 if (func) {
654                         msg->state->func = func;
655                         msg->state->any.any = data;
656                 }
657         } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
658                 /*
659                  * New transaction, requires tracking state and a unique
660                  * msgid to be allocated.
661                  */
662                 KKASSERT(msg->state == NULL);
663                 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
664                 state->pmp = pmp;
665                 state->flags = HAMMER2_STATE_DYNAMIC;
666                 state->func = func;
667                 state->any.any = data;
668                 state->msg = msg;
669                 state->msgid = (uint64_t)(uintptr_t)state;
670                 state->spanid = msg->any.head.spanid;
671                 msg->state = state;
672
673                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
674                 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
675                         panic("duplicate msgid allocated");
676                 msg->any.head.msgid = state->msgid;
677         } else {
678                 /*
679                  * One-off message (always uses msgid 0 to distinguish
680                  * between a possibly lost in-transaction message due to
681                  * competing aborts and a real one-off message?)
682                  */
683                 msg->any.head.msgid = 0;
684                 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
685         }
686
687         /*
688          * Finish up the msg fields
689          */
690         msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
691         ++pmp->msg_seq;
692
693         msg->any.head.hdr_crc = 0;
694         msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
695
696         TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
697         lockmgr(&pmp->msglk, LK_RELEASE);
698 }
699
700 /*
701  * Reply to a message and terminate our side of the transaction.
702  *
703  * If msg->state is non-NULL we are replying to a one-way message.
704  */
705 void
706 hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
707 {
708         hammer2_state_t *state = msg->state;
709         uint32_t cmd;
710
711         /*
712          * Reply with a simple error code and terminate the transaction.
713          */
714         cmd = HAMMER2_LNK_ERROR;
715
716         /*
717          * Check if our direction has even been initiated yet, set CREATE.
718          *
719          * Check what direction this is (command or reply direction).  Note
720          * that txcmd might not have been initiated yet.
721          *
722          * If our direction has already been closed we just return without
723          * doing anything.
724          */
725         if (state) {
726                 if (state->txcmd & HAMMER2_MSGF_DELETE)
727                         return;
728                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
729                         cmd |= HAMMER2_MSGF_CREATE;
730                 if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
731                         cmd |= HAMMER2_MSGF_REPLY;
732                 cmd |= HAMMER2_MSGF_DELETE;
733         } else {
734                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
735                         cmd |= HAMMER2_MSGF_REPLY;
736         }
737
738         msg = hammer2_msg_alloc(pmp, 0, cmd);
739         msg->any.head.error = error;
740         hammer2_msg_write(pmp, msg, NULL, NULL);
741 }
742
743 /*
744  * Reply to a message and continue our side of the transaction.
745  *
746  * If msg->state is non-NULL we are replying to a one-way message and this
747  * function degenerates into the same as hammer2_msg_reply().
748  */
749 void
750 hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
751 {
752         hammer2_state_t *state = msg->state;
753         uint32_t cmd;
754
755         /*
756          * Return a simple result code, do NOT terminate the transaction.
757          */
758         cmd = HAMMER2_LNK_ERROR;
759
760         /*
761          * Check if our direction has even been initiated yet, set CREATE.
762          *
763          * Check what direction this is (command or reply direction).  Note
764          * that txcmd might not have been initiated yet.
765          *
766          * If our direction has already been closed we just return without
767          * doing anything.
768          */
769         if (state) {
770                 if (state->txcmd & HAMMER2_MSGF_DELETE)
771                         return;
772                 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
773                         cmd |= HAMMER2_MSGF_CREATE;
774                 if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
775                         cmd |= HAMMER2_MSGF_REPLY;
776                 /* continuing transaction, do not set MSGF_DELETE */
777         } else {
778                 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
779                         cmd |= HAMMER2_MSGF_REPLY;
780         }
781
782         msg = hammer2_msg_alloc(pmp, 0, cmd);
783         msg->any.head.error = error;
784         hammer2_msg_write(pmp, msg, NULL, NULL);
785 }