hammer2 - Flesh out span code, API cleanups
[dragonfly.git] / sys / vfs / hammer2 / hammer2_msg.c
CommitLineData
26bf1a36
MD
1/*-
2 * Copyright (c) 2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 */
34
35#include "hammer2.h"
36
9b8b748f
MD
37RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
38
26bf1a36
MD
39/*
40 * Process state tracking for a message after reception, prior to
41 * execution.
42 *
43 * Called with msglk held and the msg dequeued.
44 *
45 * All messages are called with dummy state and return actual state.
46 * (One-off messages often just return the same dummy state).
47 *
48 * May request that caller discard the message by setting *discardp to 1.
49 * The returned state is not used in this case and is allowed to be NULL.
50 *
51 * --
52 *
53 * These routines handle persistent and command/reply message state via the
54 * CREATE and DELETE flags. The first message in a command or reply sequence
55 * sets CREATE, the last message in a command or reply sequence sets DELETE.
56 *
57 * There can be any number of intermediate messages belonging to the same
58 * sequence sent inbetween the CREATE message and the DELETE message,
59 * which set neither flag. This represents a streaming command or reply.
60 *
61 * Any command message received with CREATE set expects a reply sequence to
62 * be returned. Reply sequences work the same as command sequences except the
63 * REPLY bit is also sent. Both the command side and reply side can
64 * degenerate into a single message with both CREATE and DELETE set. Note
65 * that one side can be streaming and the other side not, or neither, or both.
66 *
67 * The msgid is unique for the initiator. That is, two sides sending a new
68 * message can use the same msgid without colliding.
69 *
70 * --
71 *
72 * ABORT sequences work by setting the ABORT flag along with normal message
73 * state. However, ABORTs can also be sent on half-closed messages, that is
74 * even if the command or reply side has already sent a DELETE, as long as
75 * the message has not been fully closed it can still send an ABORT+DELETE
76 * to terminate the half-closed message state.
77 *
78 * Since ABORT+DELETEs can race we silently discard ABORT's for message
79 * state which has already been fully closed. REPLY+ABORT+DELETEs can
80 * also race, and in this situation the other side might have already
81 * initiated a new unrelated command with the same message id. Since
82 * the abort has not set the CREATE flag the situation can be detected
83 * and the message will also be discarded.
84 *
85 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86 * The ABORT request is essentially integrated into the command instead
87 * of being sent later on. In this situation the command implementation
88 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89 * special-case non-blocking operation for the command.
90 *
91 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
92 * to be mid-stream aborts for command/reply sequences. ABORTs on
93 * one-way messages are not supported.
94 *
95 * NOTE! If a command sequence does not support aborts the ABORT flag is
96 * simply ignored.
97 *
98 * --
99 *
100 * One-off messages (no reply expected) are sent with neither CREATE or DELETE
101 * set. One-off messages cannot be aborted and typically aren't processed
102 * by these routines. The REPLY bit can be used to distinguish whether a
103 * one-off message is a command or reply. For example, one-off replies
104 * will typically just contain status updates.
105 */
106int
107hammer2_state_msgrx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
108{
109 hammer2_state_t *state;
110 int error;
111
112 /*
113 * Make sure a state structure is ready to go in case we need a new
114 * one. This is the only routine which uses freerd_state so no
115 * races are possible.
116 */
117 if ((state = pmp->freerd_state) == NULL) {
118 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
119 state->pmp = pmp;
120 state->flags = HAMMER2_STATE_DYNAMIC;
121 pmp->freerd_state = state;
122 }
123
124 /*
125 * Lock RB tree and locate existing persistent state, if any.
126 *
127 * If received msg is a command state is on staterd_tree.
128 * If received msg is a reply state is on statewr_tree.
129 */
130 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
131
132 state->msgid = msg->any.head.msgid;
8c280d5d
MD
133 state->spanid = msg->any.head.spanid;
134 kprintf("received msg %08x msgid %jx spanid=%jx\n",
135 msg->any.head.cmd,
136 (intmax_t)msg->any.head.msgid, (intmax_t)msg->any.head.spanid);
26bf1a36
MD
137 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
138 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
139 else
140 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
141 msg->state = state;
142
143 /*
144 * Short-cut one-off or mid-stream messages (state may be NULL).
145 */
146 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
147 HAMMER2_MSGF_ABORT)) == 0) {
148 lockmgr(&pmp->msglk, LK_RELEASE);
149 return(0);
150 }
151
152 /*
153 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
154 * inside the case statements.
155 */
156 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
157 HAMMER2_MSGF_REPLY)) {
158 case HAMMER2_MSGF_CREATE:
159 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
160 /*
161 * New persistant command received.
162 */
163 if (state) {
164 kprintf("hammer2_state_msgrx: duplicate transaction\n");
165 error = EINVAL;
166 break;
167 }
168 state = pmp->freerd_state;
169 pmp->freerd_state = NULL;
170 msg->state = state;
171 state->msg = msg;
172 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
173 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
174 state->flags |= HAMMER2_STATE_INSERTED;
175 error = 0;
176 break;
177 case HAMMER2_MSGF_DELETE:
178 /*
179 * Persistent state is expected but might not exist if an
180 * ABORT+DELETE races the close.
181 */
182 if (state == NULL) {
183 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
184 error = EALREADY;
185 } else {
186 kprintf("hammer2_state_msgrx: no state "
187 "for DELETE\n");
188 error = EINVAL;
189 }
190 break;
191 }
192
193 /*
194 * Handle another ABORT+DELETE case if the msgid has already
195 * been reused.
196 */
197 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
198 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
199 error = EALREADY;
200 } else {
201 kprintf("hammer2_state_msgrx: state reused "
202 "for DELETE\n");
203 error = EINVAL;
204 }
205 break;
206 }
207 error = 0;
208 break;
209 default:
210 /*
211 * Check for mid-stream ABORT command received, otherwise
212 * allow.
213 */
214 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
215 if (state == NULL ||
216 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
217 error = EALREADY;
218 break;
219 }
220 }
221 error = 0;
222 break;
223 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
224 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
225 /*
226 * When receiving a reply with CREATE set the original
227 * persistent state message should already exist.
228 */
229 if (state == NULL) {
230 kprintf("hammer2_state_msgrx: no state match for "
9b8b748f 231 "REPLY cmd=%08x\n", msg->any.head.cmd);
26bf1a36
MD
232 error = EINVAL;
233 break;
234 }
235 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
236 error = 0;
237 break;
238 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
239 /*
240 * Received REPLY+ABORT+DELETE in case where msgid has
241 * already been fully closed, ignore the message.
242 */
243 if (state == NULL) {
244 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
245 error = EALREADY;
246 } else {
247 kprintf("hammer2_state_msgrx: no state match "
248 "for REPLY|DELETE\n");
249 error = EINVAL;
250 }
251 break;
252 }
253
254 /*
255 * Received REPLY+ABORT+DELETE in case where msgid has
256 * already been reused for an unrelated message,
257 * ignore the message.
258 */
259 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
260 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
261 error = EALREADY;
262 } else {
263 kprintf("hammer2_state_msgrx: state reused "
264 "for REPLY|DELETE\n");
265 error = EINVAL;
266 }
267 break;
268 }
269 error = 0;
270 break;
271 case HAMMER2_MSGF_REPLY:
272 /*
273 * Check for mid-stream ABORT reply received to sent command.
274 */
275 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
276 if (state == NULL ||
277 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
278 error = EALREADY;
279 break;
280 }
281 }
282 error = 0;
283 break;
284 }
285 lockmgr(&pmp->msglk, LK_RELEASE);
286 return (error);
287}
288
289void
290hammer2_state_cleanuprx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
291{
292 hammer2_state_t *state;
293
294 if ((state = msg->state) == NULL) {
295 hammer2_msg_free(pmp, msg);
296 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
297 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
298 state->rxcmd |= HAMMER2_MSGF_DELETE;
299 if (state->txcmd & HAMMER2_MSGF_DELETE) {
300 if (state->msg == msg)
301 state->msg = NULL;
302 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
303 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
304 RB_REMOVE(hammer2_state_tree,
305 &pmp->statewr_tree, state);
306 } else {
307 RB_REMOVE(hammer2_state_tree,
308 &pmp->staterd_tree, state);
309 }
310 state->flags &= ~HAMMER2_STATE_INSERTED;
311 lockmgr(&pmp->msglk, LK_RELEASE);
312 hammer2_state_free(state);
313 } else {
314 lockmgr(&pmp->msglk, LK_RELEASE);
315 }
316 hammer2_msg_free(pmp, msg);
317 } else if (state->msg != msg) {
318 hammer2_msg_free(pmp, msg);
319 }
320}
321
322/*
323 * Process state tracking for a message prior to transmission.
324 *
325 * Called with msglk held and the msg dequeued.
326 *
327 * One-off messages are usually with dummy state and msg->state may be NULL
328 * in this situation.
329 *
330 * New transactions (when CREATE is set) will insert the state.
331 *
332 * May request that caller discard the message by setting *discardp to 1.
333 * A NULL state may be returned in this case.
334 */
335int
336hammer2_state_msgtx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
337{
338 hammer2_state_t *state;
339 int error;
340
341 /*
342 * Make sure a state structure is ready to go in case we need a new
343 * one. This is the only routine which uses freewr_state so no
344 * races are possible.
345 */
346 if ((state = pmp->freewr_state) == NULL) {
347 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
348 state->pmp = pmp;
349 state->flags = HAMMER2_STATE_DYNAMIC;
350 pmp->freewr_state = state;
351 }
352
353 /*
354 * Lock RB tree. If persistent state is present it will have already
355 * been assigned to msg.
356 */
357 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
358 state = msg->state;
359
360 /*
361 * Short-cut one-off or mid-stream messages (state may be NULL).
362 */
363 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
364 HAMMER2_MSGF_ABORT)) == 0) {
365 lockmgr(&pmp->msglk, LK_RELEASE);
366 return(0);
367 }
368
369
370 /*
371 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
372 * inside the case statements.
373 */
374 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
375 HAMMER2_MSGF_REPLY)) {
376 case HAMMER2_MSGF_CREATE:
377 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
378 /*
379 * Insert the new persistent message state and mark
380 * half-closed if DELETE is set. Since this is a new
381 * message it isn't possible to transition into the fully
382 * closed state here.
9b8b748f
MD
383 *
384 * XXX state must be assigned and inserted by
385 * hammer2_msg_write(). txcmd is assigned by us
386 * on-transmit.
26bf1a36 387 */
9b8b748f
MD
388 KKASSERT(state != NULL);
389#if 0
26bf1a36
MD
390 if (state == NULL) {
391 state = pmp->freerd_state;
392 pmp->freerd_state = NULL;
393 msg->state = state;
394 state->msg = msg;
9b8b748f 395 state->msgid = msg->any.head.msgid;
8c280d5d 396 state->spanid = msg->any.head.spanid;
26bf1a36
MD
397 }
398 KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
399 if (RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state)) {
400 kprintf("hammer2_state_msgtx: duplicate transaction\n");
401 error = EINVAL;
402 break;
403 }
404 state->flags |= HAMMER2_STATE_INSERTED;
9b8b748f 405#endif
26bf1a36
MD
406 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
407 error = 0;
408 break;
409 case HAMMER2_MSGF_DELETE:
410 /*
411 * Sent ABORT+DELETE in case where msgid has already
412 * been fully closed, ignore the message.
413 */
414 if (state == NULL) {
415 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
416 error = EALREADY;
417 } else {
418 kprintf("hammer2_state_msgtx: no state match "
419 "for DELETE\n");
420 error = EINVAL;
421 }
422 break;
423 }
424
425 /*
426 * Sent ABORT+DELETE in case where msgid has
427 * already been reused for an unrelated message,
428 * ignore the message.
429 */
430 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
431 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
432 error = EALREADY;
433 } else {
434 kprintf("hammer2_state_msgtx: state reused "
435 "for DELETE\n");
436 error = EINVAL;
437 }
438 break;
439 }
440 error = 0;
441 break;
442 default:
443 /*
444 * Check for mid-stream ABORT command sent
445 */
446 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
447 if (state == NULL ||
448 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
449 error = EALREADY;
450 break;
451 }
452 }
453 error = 0;
454 break;
455 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
456 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
457 /*
458 * When transmitting a reply with CREATE set the original
459 * persistent state message should already exist.
460 */
461 if (state == NULL) {
462 kprintf("hammer2_state_msgtx: no state match "
463 "for REPLY | CREATE\n");
464 error = EINVAL;
465 break;
466 }
467 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
468 error = 0;
469 break;
470 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
471 /*
472 * When transmitting a reply with DELETE set the original
473 * persistent state message should already exist.
474 *
475 * This is very similar to the REPLY|CREATE|* case except
476 * txcmd is already stored, so we just add the DELETE flag.
477 *
478 * Sent REPLY+ABORT+DELETE in case where msgid has
479 * already been fully closed, ignore the message.
480 */
481 if (state == NULL) {
482 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
483 error = EALREADY;
484 } else {
485 kprintf("hammer2_state_msgtx: no state match "
486 "for REPLY | DELETE\n");
487 error = EINVAL;
488 }
489 break;
490 }
491
492 /*
493 * Sent REPLY+ABORT+DELETE in case where msgid has already
494 * been reused for an unrelated message, ignore the message.
495 */
496 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
497 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
498 error = EALREADY;
499 } else {
500 kprintf("hammer2_state_msgtx: state reused "
501 "for REPLY | DELETE\n");
502 error = EINVAL;
503 }
504 break;
505 }
506 error = 0;
507 break;
508 case HAMMER2_MSGF_REPLY:
509 /*
510 * Check for mid-stream ABORT reply sent.
511 *
512 * One-off REPLY messages are allowed for e.g. status updates.
513 */
514 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
515 if (state == NULL ||
516 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
517 error = EALREADY;
518 break;
519 }
520 }
521 error = 0;
522 break;
523 }
524 lockmgr(&pmp->msglk, LK_RELEASE);
525 return (error);
526}
527
528void
529hammer2_state_cleanuptx(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
530{
531 hammer2_state_t *state;
532
533 if ((state = msg->state) == NULL) {
534 hammer2_msg_free(pmp, msg);
535 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
536 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
537 state->txcmd |= HAMMER2_MSGF_DELETE;
538 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
539 if (state->msg == msg)
540 state->msg = NULL;
541 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
542 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY) {
543 RB_REMOVE(hammer2_state_tree,
544 &pmp->staterd_tree, state);
545 } else {
546 RB_REMOVE(hammer2_state_tree,
547 &pmp->statewr_tree, state);
548 }
549 state->flags &= ~HAMMER2_STATE_INSERTED;
550 lockmgr(&pmp->msglk, LK_RELEASE);
551 hammer2_state_free(state);
552 } else {
553 lockmgr(&pmp->msglk, LK_RELEASE);
554 }
555 hammer2_msg_free(pmp, msg);
556 } else if (state->msg != msg) {
557 hammer2_msg_free(pmp, msg);
558 }
559}
560
561void
562hammer2_state_free(hammer2_state_t *state)
563{
564 hammer2_pfsmount_t *pmp = state->pmp;
565 hammer2_msg_t *msg;
566
567 msg = state->msg;
568 state->msg = NULL;
569 kfree(state, pmp->mmsg);
570 if (msg)
571 hammer2_msg_free(pmp, msg);
572}
573
9b8b748f 574hammer2_msg_t *
8c280d5d 575hammer2_msg_alloc(hammer2_pfsmount_t *pmp, uint64_t spanid, uint32_t cmd)
9b8b748f
MD
576{
577 hammer2_msg_t *msg;
578 size_t hbytes;
579
580 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
581 msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
582 pmp->mmsg, M_WAITOK | M_ZERO);
583 msg->hdr_size = hbytes;
584 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
8c280d5d 585 msg->any.head.spanid = spanid;
9b8b748f
MD
586 msg->any.head.cmd = cmd;
587
588 return (msg);
589}
590
26bf1a36
MD
591void
592hammer2_msg_free(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg)
593{
594 if (msg->aux_data && msg->aux_size) {
595 kfree(msg->aux_data, pmp->mmsg);
596 msg->aux_data = NULL;
597 msg->aux_size = 0;
598 }
599 kfree(msg, pmp->mmsg);
600}
601
602/*
603 * Indexed messages are stored in a red-black tree indexed by their
604 * msgid. Only persistent messages are indexed.
605 */
606int
607hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
608{
8c280d5d 609 if (state1->spanid < state2->spanid)
9b8b748f 610 return(-1);
8c280d5d 611 if (state1->spanid > state2->spanid)
9b8b748f 612 return(1);
26bf1a36
MD
613 if (state1->msgid < state2->msgid)
614 return(-1);
615 if (state1->msgid > state2->msgid)
616 return(1);
617 return(0);
618}
619
620/*
8c280d5d
MD
621 * Write a message. All requisit command flags have been set.
622 *
623 * If msg->state is non-NULL the message is written to the existing
624 * transaction. Both msgid and spanid will be set accordingly.
625 *
626 * If msg->state is NULL and CREATE is set new state is allocated and
627 * (func, data) is installed.
26bf1a36 628 *
8c280d5d
MD
629 * If msg->state is NULL and CREATE is not set the message is assumed
630 * to be a one-way message. The msgid and spanid must be set by the
631 * caller (msgid is typically set to 0 for this case).
632 *
633 * This function merely queues the message to the management thread, it
634 * does not write to the message socket/pipe.
26bf1a36 635 */
8c280d5d 636void
9b8b748f 637hammer2_msg_write(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg,
8c280d5d 638 int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
26bf1a36 639{
9b8b748f 640 hammer2_state_t *state;
9b8b748f 641
8c280d5d
MD
642 if (msg->state) {
643 /*
644 * Continuance or termination of existing transaction.
645 * The transaction could have been initiated by either end.
646 *
647 * (Function callback and aux data for the receive side can
648 * be replaced or left alone).
649 */
650 msg->any.head.msgid = msg->state->msgid;
651 msg->any.head.spanid = msg->state->spanid;
652 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
653 if (func) {
654 msg->state->func = func;
655 msg->state->any.any = data;
656 }
657 } else if (msg->any.head.cmd & HAMMER2_MSGF_CREATE) {
9b8b748f
MD
658 /*
659 * New transaction, requires tracking state and a unique
8c280d5d 660 * msgid to be allocated.
9b8b748f
MD
661 */
662 KKASSERT(msg->state == NULL);
663 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
664 state->pmp = pmp;
665 state->flags = HAMMER2_STATE_DYNAMIC;
666 state->func = func;
8c280d5d 667 state->any.any = data;
9b8b748f 668 state->msg = msg;
8c280d5d
MD
669 state->msgid = (uint64_t)(uintptr_t)state;
670 state->spanid = msg->any.head.spanid;
9b8b748f
MD
671 msg->state = state;
672
673 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
8c280d5d
MD
674 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
675 panic("duplicate msgid allocated");
9b8b748f 676 msg->any.head.msgid = state->msgid;
9b8b748f
MD
677 } else {
678 /*
8c280d5d
MD
679 * One-off message (always uses msgid 0 to distinguish
680 * between a possibly lost in-transaction message due to
681 * competing aborts and a real one-off message?)
9b8b748f
MD
682 */
683 msg->any.head.msgid = 0;
684 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
685 }
686
687 /*
8c280d5d 688 * Finish up the msg fields
9b8b748f 689 */
8c280d5d
MD
690 msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
691 ++pmp->msg_seq;
692
693 msg->any.head.hdr_crc = 0;
694 msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
9b8b748f
MD
695
696 TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
697 lockmgr(&pmp->msglk, LK_RELEASE);
8c280d5d
MD
698}
699
700/*
701 * Reply to a message and terminate our side of the transaction.
702 *
703 * If msg->state is non-NULL we are replying to a one-way message.
704 */
705void
706hammer2_msg_reply(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
707{
708 hammer2_state_t *state = msg->state;
709 uint32_t cmd;
710
711 /*
712 * Reply with a simple error code and terminate the transaction.
713 */
714 cmd = HAMMER2_LNK_ERROR;
715
716 /*
717 * Check if our direction has even been initiated yet, set CREATE.
718 *
719 * Check what direction this is (command or reply direction). Note
720 * that txcmd might not have been initiated yet.
721 *
722 * If our direction has already been closed we just return without
723 * doing anything.
724 */
725 if (state) {
726 if (state->txcmd & HAMMER2_MSGF_DELETE)
727 return;
728 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
729 cmd |= HAMMER2_MSGF_CREATE;
730 if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
731 cmd |= HAMMER2_MSGF_REPLY;
732 cmd |= HAMMER2_MSGF_DELETE;
733 } else {
734 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
735 cmd |= HAMMER2_MSGF_REPLY;
736 }
737
738 msg = hammer2_msg_alloc(pmp, 0, cmd);
739 msg->any.head.error = error;
740 hammer2_msg_write(pmp, msg, NULL, NULL);
741}
742
743/*
744 * Reply to a message and continue our side of the transaction.
745 *
746 * If msg->state is non-NULL we are replying to a one-way message and this
747 * function degenerates into the same as hammer2_msg_reply().
748 */
749void
750hammer2_msg_result(hammer2_pfsmount_t *pmp, hammer2_msg_t *msg, uint32_t error)
751{
752 hammer2_state_t *state = msg->state;
753 uint32_t cmd;
754
755 /*
756 * Return a simple result code, do NOT terminate the transaction.
757 */
758 cmd = HAMMER2_LNK_ERROR;
759
760 /*
761 * Check if our direction has even been initiated yet, set CREATE.
762 *
763 * Check what direction this is (command or reply direction). Note
764 * that txcmd might not have been initiated yet.
765 *
766 * If our direction has already been closed we just return without
767 * doing anything.
768 */
769 if (state) {
770 if (state->txcmd & HAMMER2_MSGF_DELETE)
771 return;
772 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
773 cmd |= HAMMER2_MSGF_CREATE;
774 if ((state->rxcmd & HAMMER2_MSGF_REPLY) == 0)
775 cmd |= HAMMER2_MSGF_REPLY;
776 /* continuing transaction, do not set MSGF_DELETE */
777 } else {
778 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
779 cmd |= HAMMER2_MSGF_REPLY;
780 }
9b8b748f 781
8c280d5d
MD
782 msg = hammer2_msg_alloc(pmp, 0, cmd);
783 msg->any.head.error = error;
784 hammer2_msg_write(pmp, msg, NULL, NULL);
26bf1a36 785}