hammer2 - Correct state recording bugs in vfs
[dragonfly.git] / sys / vfs / hammer2 / hammer2_msg.c
CommitLineData
26bf1a36
MD
1/*-
2 * Copyright (c) 2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 */
34
35#include "hammer2.h"
36
9b8b748f
MD
37RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
38
26bf1a36
MD
39/*
40 * Process state tracking for a message after reception, prior to
41 * execution.
42 *
43 * Called with msglk held and the msg dequeued.
44 *
45 * All messages are called with dummy state and return actual state.
46 * (One-off messages often just return the same dummy state).
47 *
48 * May request that caller discard the message by setting *discardp to 1.
49 * The returned state is not used in this case and is allowed to be NULL.
50 *
51 * --
52 *
53 * These routines handle persistent and command/reply message state via the
54 * CREATE and DELETE flags. The first message in a command or reply sequence
55 * sets CREATE, the last message in a command or reply sequence sets DELETE.
56 *
57 * There can be any number of intermediate messages belonging to the same
58 * sequence sent inbetween the CREATE message and the DELETE message,
59 * which set neither flag. This represents a streaming command or reply.
60 *
61 * Any command message received with CREATE set expects a reply sequence to
62 * be returned. Reply sequences work the same as command sequences except the
63 * REPLY bit is also sent. Both the command side and reply side can
64 * degenerate into a single message with both CREATE and DELETE set. Note
65 * that one side can be streaming and the other side not, or neither, or both.
66 *
67 * The msgid is unique for the initiator. That is, two sides sending a new
68 * message can use the same msgid without colliding.
69 *
70 * --
71 *
72 * ABORT sequences work by setting the ABORT flag along with normal message
73 * state. However, ABORTs can also be sent on half-closed messages, that is
74 * even if the command or reply side has already sent a DELETE, as long as
75 * the message has not been fully closed it can still send an ABORT+DELETE
76 * to terminate the half-closed message state.
77 *
78 * Since ABORT+DELETEs can race we silently discard ABORT's for message
79 * state which has already been fully closed. REPLY+ABORT+DELETEs can
80 * also race, and in this situation the other side might have already
81 * initiated a new unrelated command with the same message id. Since
82 * the abort has not set the CREATE flag the situation can be detected
83 * and the message will also be discarded.
84 *
85 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86 * The ABORT request is essentially integrated into the command instead
87 * of being sent later on. In this situation the command implementation
88 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89 * special-case non-blocking operation for the command.
90 *
91 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
92 * to be mid-stream aborts for command/reply sequences. ABORTs on
93 * one-way messages are not supported.
94 *
95 * NOTE! If a command sequence does not support aborts the ABORT flag is
96 * simply ignored.
97 *
98 * --
99 *
100 * One-off messages (no reply expected) are sent with neither CREATE or DELETE
101 * set. One-off messages cannot be aborted and typically aren't processed
102 * by these routines. The REPLY bit can be used to distinguish whether a
103 * one-off message is a command or reply. For example, one-off replies
104 * will typically just contain status updates.
105 */
106int
107hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
108{
109 hammer2_state_t *state;
110 int error;
111
112 /*
113 * Make sure a state structure is ready to go in case we need a new
114 * one. This is the only routine which uses freerd_state so no
115 * races are possible.
116 */
117 if ((state = pmp->freerd_state) == NULL) {
118 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
119 state->pmp = pmp;
120 state->flags = HAMMER2_STATE_DYNAMIC;
121 pmp->freerd_state = state;
122 }
123
124 /*
125 * Lock RB tree and locate existing persistent state, if any.
126 *
127 * If received msg is a command state is on staterd_tree.
128 * If received msg is a reply state is on statewr_tree.
129 */
130 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
131
132 state->msgid = msg->any.head.msgid;
8c280d5d
MD
133 state->spanid = msg->any.head.spanid;
134 kprintf("received msg %08x msgid %jx spanid=%jx\n",
135 msg->any.head.cmd,
136 (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.spanid);
26bf1a36
MD
137 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
138 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
139 else
140 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
141 msg->state = state;
142
143 /*
144 * Short-cut one-off or mid-stream messages (state may be NULL).
145 */
146 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
147 HAMMER2_MSGF_ABORT)) == 0) {
148 lockmgr(&pmp->msglk, LK_RELEASE);
149 return(0);
150 }
151
152 /*
153 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
154 * inside the case statements.
155 */
156 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
157 HAMMER2_MSGF_REPLY)) {
158 case HAMMER2_MSGF_CREATE:
159 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
160 /*
161 * New persistant command received.
162 */
163 if (state) {
164 kprintf("hammer2_state_msgrx: duplicate transaction\n");
165 error = EINVAL;
166 break;
167 }
168 state = pmp->freerd_state;
169 pmp->freerd_state = NULL;
170 msg->state = state;
171 state->msg = msg;
172 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
62e59746 173 state->txcmd = HAMMER2_MSGF_REPLY;
26bf1a36
MD
174 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
175 state->flags |= HAMMER2_STATE_INSERTED;
176 error = 0;
177 break;
178 case HAMMER2_MSGF_DELETE:
179 /*
180 * Persistent state is expected but might not exist if an
181 * ABORT+DELETE races the close.
182 */
183 if (state == NULL) {
184 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
185 error = EALREADY;
186 } else {
187 kprintf("hammer2_state_msgrx: no state "
188 "for DELETE\n");
189 error = EINVAL;
190 }
191 break;
192 }
193
194 /*
195 * Handle another ABORT+DELETE case if the msgid has already
196 * been reused.
197 */
198 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
199 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
200 error = EALREADY;
201 } else {
202 kprintf("hammer2_state_msgrx: state reused "
203 "for DELETE\n");
204 error = EINVAL;
205 }
206 break;
207 }
208 error = 0;
209 break;
210 default:
211 /*
212 * Check for mid-stream ABORT command received, otherwise
213 * allow.
214 */
215 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
216 if (state == NULL ||
217 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
218 error = EALREADY;
219 break;
220 }
221 }
222 error = 0;
223 break;
224 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
225 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
226 /*
227 * When receiving a reply with CREATE set the original
228 * persistent state message should already exist.
229 */
230 if (state == NULL) {
231 kprintf("hammer2_state_msgrx: no state match for "
62e59746
MD
232 "REPLY cmd=%08x msgid=%016jx\n",
233 msg->any.head.cmd,
234 (intmax_t)msg->any.head.msgid);
26bf1a36
MD
235 error = EINVAL;
236 break;
237 }
238 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
239 error = 0;
240 break;
241 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
242 /*
243 * Received REPLY+ABORT+DELETE in case where msgid has
244 * already been fully closed, ignore the message.
245 */
246 if (state == NULL) {
247 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
248 error = EALREADY;
249 } else {
250 kprintf("hammer2_state_msgrx: no state match "
251 "for REPLY|DELETE\n");
252 error = EINVAL;
253 }
254 break;
255 }
256
257 /*
258 * Received REPLY+ABORT+DELETE in case where msgid has
259 * already been reused for an unrelated message,
260 * ignore the message.
261 */
262 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
263 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
264 error = EALREADY;
265 } else {
266 kprintf("hammer2_state_msgrx: state reused "
267 "for REPLY|DELETE\n");
268 error = EINVAL;
269 }
270 break;
271 }
272 error = 0;
273 break;
274 case HAMMER2_MSGF_REPLY:
275 /*
276 * Check for mid-stream ABORT reply received to sent command.
277 */
278 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
279 if (state == NULL ||
280 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
281 error = EALREADY;
282 break;
283 }
284 }
285 error = 0;
286 break;
287 }
288 lockmgr(&pmp->msglk, LK_RELEASE);
289 return (error);
290}
291
292void
293hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
294{
295 hammer2_state_t *state;
296
297 if ((state = msg->state) == NULL) {
298 hammer2_msg_free(pmp, msg);
299 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
300 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
301 state->rxcmd |= HAMMER2_MSGF_DELETE;
302 if (state->txcmd & HAMMER2_MSGF_DELETE) {
303 if (state->msg == msg)
304 state->msg = NULL;
305 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
62e59746
MD
306 if (state->rxcmd & HAMMER2_MSGF_REPLY) {
307 KKASSERT(msg->any.head.cmd &
308 HAMMER2_MSGF_REPLY);
26bf1a36
MD
309 RB_REMOVE(hammer2_state_tree,
310 &pmp->statewr_tree, state);
311 } else {
62e59746
MD
312 KKASSERT((msg->any.head.cmd &
313 HAMMER2_MSGF_REPLY) == 0);
26bf1a36
MD
314 RB_REMOVE(hammer2_state_tree,
315 &pmp->staterd_tree, state);
316 }
317 state->flags &= ~HAMMER2_STATE_INSERTED;
318 lockmgr(&pmp->msglk, LK_RELEASE);
319 hammer2_state_free(state);
320 } else {
321 lockmgr(&pmp->msglk, LK_RELEASE);
322 }
323 hammer2_msg_free(pmp, msg);
324 } else if (state->msg != msg) {
325 hammer2_msg_free(pmp, msg);
326 }
327}
328
329/*
330 * Process state tracking for a message prior to transmission.
331 *
332 * Called with msglk held and the msg dequeued.
333 *
334 * One-off messages are usually with dummy state and msg->state may be NULL
335 * in this situation.
336 *
337 * New transactions (when CREATE is set) will insert the state.
338 *
339 * May request that caller discard the message by setting *discardp to 1.
340 * A NULL state may be returned in this case.
341 */
342int
343hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
344{
345 hammer2_state_t *state;
346 int error;
347
348 /*
349 * Make sure a state structure is ready to go in case we need a new
350 * one. This is the only routine which uses freewr_state so no
351 * races are possible.
352 */
353 if ((state = pmp->freewr_state) == NULL) {
354 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
355 state->pmp = pmp;
356 state->flags = HAMMER2_STATE_DYNAMIC;
357 pmp->freewr_state = state;
358 }
359
360 /*
361 * Lock RB tree. If persistent state is present it will have already
362 * been assigned to msg.
363 */
364 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
365 state = msg->state;
366
367 /*
368 * Short-cut one-off or mid-stream messages (state may be NULL).
369 */
370 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
371 HAMMER2_MSGF_ABORT)) == 0) {
372 lockmgr(&pmp->msglk, LK_RELEASE);
373 return(0);
374 }
375
376
377 /*
378 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
379 * inside the case statements.
380 */
381 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
382 HAMMER2_MSGF_REPLY)) {
383 case HAMMER2_MSGF_CREATE:
384 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
385 /*
386 * Insert the new persistent message state and mark
387 * half-closed if DELETE is set. Since this is a new
388 * message it isn't possible to transition into the fully
389 * closed state here.
9b8b748f
MD
390 *
391 * XXX state must be assigned and inserted by
392 * hammer2_msg_write(). txcmd is assigned by us
393 * on-transmit.
26bf1a36 394 */
9b8b748f 395 KKASSERT(state != NULL);
26bf1a36 396 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
62e59746 397 state->rxcmd = HAMMER2_MSGF_REPLY;
26bf1a36
MD
398 error = 0;
399 break;
400 case HAMMER2_MSGF_DELETE:
401 /*
402 * Sent ABORT+DELETE in case where msgid has already
403 * been fully closed, ignore the message.
404 */
405 if (state == NULL) {
406 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
407 error = EALREADY;
408 } else {
409 kprintf("hammer2_state_msgtx: no state match "
62e59746
MD
410 "for DELETE cmd=%08x msgid=%016jx\n",
411 msg->any.head.cmd,
412 (intmax_t)msg->any.head.msgid);
26bf1a36
MD
413 error = EINVAL;
414 }
415 break;
416 }
417
418 /*
419 * Sent ABORT+DELETE in case where msgid has
420 * already been reused for an unrelated message,
421 * ignore the message.
422 */
423 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
424 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
425 error = EALREADY;
426 } else {
427 kprintf("hammer2_state_msgtx: state reused "
428 "for DELETE\n");
429 error = EINVAL;
430 }
431 break;
432 }
433 error = 0;
434 break;
435 default:
436 /*
437 * Check for mid-stream ABORT command sent
438 */
439 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
440 if (state == NULL ||
441 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
442 error = EALREADY;
443 break;
444 }
445 }
446 error = 0;
447 break;
448 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
449 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
450 /*
451 * When transmitting a reply with CREATE set the original
452 * persistent state message should already exist.
453 */
454 if (state == NULL) {
455 kprintf("hammer2_state_msgtx: no state match "
456 "for REPLY | CREATE\n");
457 error = EINVAL;
458 break;
459 }
460 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
461 error = 0;
462 break;
463 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
464 /*
465 * When transmitting a reply with DELETE set the original
466 * persistent state message should already exist.
467 *
468 * This is very similar to the REPLY|CREATE|* case except
469 * txcmd is already stored, so we just add the DELETE flag.
470 *
471 * Sent REPLY+ABORT+DELETE in case where msgid has
472 * already been fully closed, ignore the message.
473 */
474 if (state == NULL) {
475 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
476 error = EALREADY;
477 } else {
478 kprintf("hammer2_state_msgtx: no state match "
479 "for REPLY | DELETE\n");
480 error = EINVAL;
481 }
482 break;
483 }
484
485 /*
486 * Sent REPLY+ABORT+DELETE in case where msgid has already
487 * been reused for an unrelated message, ignore the message.
488 */
489 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
490 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
491 error = EALREADY;
492 } else {
493 kprintf("hammer2_state_msgtx: state reused "
494 "for REPLY | DELETE\n");
495 error = EINVAL;
496 }
497 break;
498 }
499 error = 0;
500 break;
501 case HAMMER2_MSGF_REPLY:
502 /*
503 * Check for mid-stream ABORT reply sent.
504 *
505 * One-off REPLY messages are allowed for e.g. status updates.
506 */
507 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
508 if (state == NULL ||
509 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
510 error = EALREADY;
511 break;
512 }
513 }
514 error = 0;
515 break;
516 }
517 lockmgr(&pmp->msglk, LK_RELEASE);
518 return (error);
519}
520
521void
522hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
523{
524 hammer2_state_t *state;
525
526 if ((state = msg->state) == NULL) {
527 hammer2_msg_free(pmp, msg);
528 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
529 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
530 state->txcmd |= HAMMER2_MSGF_DELETE;
531 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
532 if (state->msg == msg)
533 state->msg = NULL;
534 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
62e59746
MD
535 if (state->txcmd & HAMMER2_MSGF_REPLY) {
536 KKASSERT(msg->any.head.cmd &
537 HAMMER2_MSGF_REPLY);
26bf1a36
MD
538 RB_REMOVE(hammer2_state_tree,
539 &pmp->staterd_tree, state);
540 } else {
62e59746
MD
541 KKASSERT((msg->any.head.cmd &
542 HAMMER2_MSGF_REPLY) == 0);
26bf1a36
MD
543 RB_REMOVE(hammer2_state_tree,
544 &pmp->statewr_tree, state);
545 }
546 state->flags &= ~HAMMER2_STATE_INSERTED;
547 lockmgr(&pmp->msglk, LK_RELEASE);
548 hammer2_state_free(state);
549 } else {
550 lockmgr(&pmp->msglk, LK_RELEASE);
551 }
552 hammer2_msg_free(pmp, msg);
553 } else if (state->msg != msg) {
554 hammer2_msg_free(pmp, msg);
555 }
556}
557
558void
559hammer2_state_free(hammer2_state_t *state)
560{
561 hammer2_pfsmount_t *pmp = state->pmp;
562 hammer2_msg_t *msg;
563
564 msg = state->msg;
565 state->msg = NULL;
566 kfree(state, pmp->mmsg);
567 if (msg)
568 hammer2_msg_free(pmp, msg);
569}
570
9b8b748f 571hammer2_msg_t *
8c280d5d 572hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid, uint32_t cmd)
9b8b748f
MD
573{
574 hammer2_msg_t *msg;
575 size_t hbytes;
576
577 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
578 msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
579 pmp->mmsg, M_WAITOK | M_ZERO);
580 msg->hdr_size = hbytes;
581 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
8c280d5d 582 msg->any.head.spanid = spanid;
9b8b748f
MD
583 msg->any.head.cmd = cmd;
584
585 return (msg);
586}
587
26bf1a36
MD
588void
589hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
590{
591 if (msg->aux_data && msg->aux_size) {
592 kfree(msg->aux_data, pmp->mmsg);
593 msg->aux_data = NULL;
594 msg->aux_size = 0;
595 }
596 kfree(msg, pmp->mmsg);
597}
598
599/*
600 * Indexed messages are stored in a red-black tree indexed by their
601 * msgid. Only persistent messages are indexed.
602 */
603int
604hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
605{
8c280d5d 606 if (state1->spanid < state2->spanid)
9b8b748f 607 return(-1);
8c280d5d 608 if (state1->spanid > state2->spanid)
9b8b748f 609 return(1);
26bf1a36
MD
610 if (state1->msgid < state2->msgid)
611 return(-1);
612 if (state1->msgid > state2->msgid)
613 return(1);
614 return(0);
615}
616
617/*
8c280d5d
MD
618 * Write a message. All requisit command flags have been set.
619 *
620 * If msg->state is non-NULL the message is written to the existing
621 * transaction. Both msgid and spanid will be set accordingly.
622 *
623 * If msg->state is NULL and CREATE is set new state is allocated and
624 * (func, data) is installed.
26bf1a36 625 *
8c280d5d
MD
626 * If msg->state is NULL and CREATE is not set the message is assumed
627 * to be a one-way message. The msgid and spanid must be set by the
628 * caller (msgid is typically set to 0 for this case).
629 *
630 * This function merely queues the message to the management thread, it
631 * does not write to the message socket/pipe.
26bf1a36 632 */
8c280d5d 633void
9b8b748f 634hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
8c280d5d 635 int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
26bf1a36 636{
9b8b748f 637 hammer2_state_t *state;
9b8b748f 638
8c280d5d
MD
639 if (msg->state) {
640 /*
641 * Continuance or termination of existing transaction.
642 * The transaction could have been initiated by either end.
643 *
644 * (Function callback and aux data for the receive side can
645 * be replaced or left alone).
646 */
647 msg->any.head.msgid = msg->state->msgid;
648 msg->any.head.spanid = msg->state->spanid;
649 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
650 if (func) {
651 msg->state->func = func;
652 msg->state->any.any = data;
653 }
654 } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
9b8b748f
MD
655 /*
656 * New transaction, requires tracking state and a unique
8c280d5d 657 * msgid to be allocated.
9b8b748f
MD
658 */
659 KKASSERT(msg->state == NULL);
660 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
661 state->pmp = pmp;
662 state->flags = HAMMER2_STATE_DYNAMIC;
663 state->func = func;
8c280d5d 664 state->any.any = data;
9b8b748f 665 state->msg = msg;
8c280d5d
MD
666 state->msgid = (uint64_t)(uintptr_t)state;
667 state->spanid = msg->any.head.spanid;
9b8b748f
MD
668 msg->state = state;
669
670 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
8c280d5d
MD
671 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
672 panic("duplicate msgid allocated");
9b8b748f 673 msg->any.head.msgid = state->msgid;
9b8b748f
MD
674 } else {
675 /*
8c280d5d
MD
676 * One-off message (always uses msgid 0 to distinguish
677 * between a possibly lost in-transaction message due to
678 * competing aborts and a real one-off message?)
9b8b748f
MD
679 */
680 msg->any.head.msgid = 0;
681 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
682 }
683
684 /*
8c280d5d 685 * Finish up the msg fields
9b8b748f 686 */
8c280d5d
MD
687 msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
688 ++pmp->msg_seq;
689
690 msg->any.head.hdr_crc = 0;
691 msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
9b8b748f
MD
692
693 TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
694 lockmgr(&pmp->msglk, LK_RELEASE);
8c280d5d
MD
695}
696
697/*
698 * Reply to a message and terminate our side of the transaction.
699 *
700 * If msg->state is non-NULL we are replying to a one-way message.
701 */
702void
703hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
704{
705 hammer2_state_t *state = msg->state;
706 uint32_t cmd;
707
708 /*
709 * Reply with a simple error code and terminate the transaction.
710 */
711 cmd = HAMMER2_LNK_ERROR;
712
713 /*
714 * Check if our direction has even been initiated yet, set CREATE.
715 *
716 * Check what direction this is (command or reply direction). Note
717 * that txcmd might not have been initiated yet.
718 *
719 * If our direction has already been closed we just return without
720 * doing anything.
721 */
722 if (state) {
62e59746
MD
723 if (state->txcmd & HAMMER2_MSGF_DELETE) {
724 hammer2_msg_free(pmp, msg);
8c280d5d 725 return;
62e59746 726 }
8c280d5d
MD
727 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
728 cmd |= HAMMER2_MSGF_CREATE;
62e59746 729 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
730 cmd |= HAMMER2_MSGF_REPLY;
731 cmd |= HAMMER2_MSGF_DELETE;
732 } else {
733 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
734 cmd |= HAMMER2_MSGF_REPLY;
735 }
736
737 msg = hammer2_msg_alloc(pmp, 0, cmd);
738 msg->any.head.error = error;
62e59746 739 msg->state = state;
8c280d5d
MD
740 hammer2_msg_write(pmp, msg, NULL, NULL);
741}
742
743/*
744 * Reply to a message and continue our side of the transaction.
745 *
746 * If msg->state is non-NULL we are replying to a one-way message and this
747 * function degenerates into the same as hammer2_msg_reply().
748 */
749void
750hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
751{
752 hammer2_state_t *state = msg->state;
753 uint32_t cmd;
754
755 /*
756 * Return a simple result code, do NOT terminate the transaction.
757 */
758 cmd = HAMMER2_LNK_ERROR;
759
760 /*
761 * Check if our direction has even been initiated yet, set CREATE.
762 *
763 * Check what direction this is (command or reply direction). Note
764 * that txcmd might not have been initiated yet.
765 *
766 * If our direction has already been closed we just return without
767 * doing anything.
768 */
769 if (state) {
62e59746
MD
770 if (state->txcmd & HAMMER2_MSGF_DELETE) {
771 hammer2_msg_free(pmp, msg);
8c280d5d 772 return;
62e59746 773 }
8c280d5d
MD
774 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
775 cmd |= HAMMER2_MSGF_CREATE;
62e59746 776 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
777 cmd |= HAMMER2_MSGF_REPLY;
778 /* continuing transaction, do not set MSGF_DELETE */
779 } else {
780 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
781 cmd |= HAMMER2_MSGF_REPLY;
782 }
9b8b748f 783
8c280d5d
MD
784 msg = hammer2_msg_alloc(pmp, 0, cmd);
785 msg->any.head.error = error;
62e59746 786 msg->state = state;
8c280d5d 787 hammer2_msg_write(pmp, msg, NULL, NULL);
26bf1a36 788}