hammer2 - Config notifications, cleanup HAMMER2 VFS API
[dragonfly.git] / sys / vfs / hammer2 / hammer2_msg.c
CommitLineData
26bf1a36
MD
1/*-
2 * Copyright (c) 2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 */
34
35#include "hammer2.h"
36
9b8b748f
MD
37RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
38
26bf1a36
MD
39/*
40 * Process state tracking for a message after reception, prior to
41 * execution.
42 *
43 * Called with msglk held and the msg dequeued.
44 *
45 * All messages are called with dummy state and return actual state.
46 * (One-off messages often just return the same dummy state).
47 *
48 * May request that caller discard the message by setting *discardp to 1.
49 * The returned state is not used in this case and is allowed to be NULL.
50 *
51 * --
52 *
53 * These routines handle persistent and command/reply message state via the
54 * CREATE and DELETE flags. The first message in a command or reply sequence
55 * sets CREATE, the last message in a command or reply sequence sets DELETE.
56 *
57 * There can be any number of intermediate messages belonging to the same
58 * sequence sent inbetween the CREATE message and the DELETE message,
59 * which set neither flag. This represents a streaming command or reply.
60 *
61 * Any command message received with CREATE set expects a reply sequence to
62 * be returned. Reply sequences work the same as command sequences except the
63 * REPLY bit is also sent. Both the command side and reply side can
64 * degenerate into a single message with both CREATE and DELETE set. Note
65 * that one side can be streaming and the other side not, or neither, or both.
66 *
67 * The msgid is unique for the initiator. That is, two sides sending a new
68 * message can use the same msgid without colliding.
69 *
70 * --
71 *
72 * ABORT sequences work by setting the ABORT flag along with normal message
73 * state. However, ABORTs can also be sent on half-closed messages, that is
74 * even if the command or reply side has already sent a DELETE, as long as
75 * the message has not been fully closed it can still send an ABORT+DELETE
76 * to terminate the half-closed message state.
77 *
78 * Since ABORT+DELETEs can race we silently discard ABORT's for message
79 * state which has already been fully closed. REPLY+ABORT+DELETEs can
80 * also race, and in this situation the other side might have already
81 * initiated a new unrelated command with the same message id. Since
82 * the abort has not set the CREATE flag the situation can be detected
83 * and the message will also be discarded.
84 *
85 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86 * The ABORT request is essentially integrated into the command instead
87 * of being sent later on. In this situation the command implementation
88 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89 * special-case non-blocking operation for the command.
90 *
91 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
92 * to be mid-stream aborts for command/reply sequences. ABORTs on
93 * one-way messages are not supported.
94 *
95 * NOTE! If a command sequence does not support aborts the ABORT flag is
96 * simply ignored.
97 *
98 * --
99 *
100 * One-off messages (no reply expected) are sent with neither CREATE or DELETE
101 * set. One-off messages cannot be aborted and typically aren't processed
102 * by these routines. The REPLY bit can be used to distinguish whether a
103 * one-off message is a command or reply. For example, one-off replies
104 * will typically just contain status updates.
105 */
106int
10c86c4e 107hammer2_state_msgrx(hammer2_msg_t *msg)
26bf1a36 108{
10c86c4e 109 hammer2_pfsmount_t *pmp;
26bf1a36
MD
110 hammer2_state_t *state;
111 int error;
112
10c86c4e
MD
113 pmp = msg->router->pmp;
114
115 /*
116 * XXX resolve msg->any.head.source and msg->any.head.target
117 * into LNK_SPAN references.
118 *
119 * XXX replace msg->router
120 */
121
26bf1a36
MD
122 /*
123 * Make sure a state structure is ready to go in case we need a new
124 * one. This is the only routine which uses freerd_state so no
125 * races are possible.
126 */
127 if ((state = pmp->freerd_state) == NULL) {
128 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
129 state->pmp = pmp;
130 state->flags = HAMMER2_STATE_DYNAMIC;
131 pmp->freerd_state = state;
132 }
133
134 /*
135 * Lock RB tree and locate existing persistent state, if any.
136 *
137 * If received msg is a command state is on staterd_tree.
138 * If received msg is a reply state is on statewr_tree.
139 */
140 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
141
142 state->msgid = msg->any.head.msgid;
10c86c4e
MD
143 state->router = &pmp->router;
144 kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
8c280d5d 145 msg->any.head.cmd,
10c86c4e
MD
146 (intmax_t)msg->any.head.msgid,
147 (intmax_t)msg->any.head.source,
148 (intmax_t)msg->any.head.target);
26bf1a36
MD
149 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
150 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
151 else
152 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
153 msg->state = state;
154
155 /*
156 * Short-cut one-off or mid-stream messages (state may be NULL).
157 */
158 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
159 HAMMER2_MSGF_ABORT)) == 0) {
160 lockmgr(&pmp->msglk, LK_RELEASE);
161 return(0);
162 }
163
164 /*
165 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
166 * inside the case statements.
167 */
168 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
169 HAMMER2_MSGF_REPLY)) {
170 case HAMMER2_MSGF_CREATE:
171 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
172 /*
173 * New persistant command received.
174 */
175 if (state) {
176 kprintf("hammer2_state_msgrx: duplicate transaction\n");
177 error = EINVAL;
178 break;
179 }
180 state = pmp->freerd_state;
181 pmp->freerd_state = NULL;
182 msg->state = state;
10c86c4e 183 state->router = msg->router;
26bf1a36
MD
184 state->msg = msg;
185 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
62e59746 186 state->txcmd = HAMMER2_MSGF_REPLY;
26bf1a36
MD
187 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
188 state->flags |= HAMMER2_STATE_INSERTED;
189 error = 0;
190 break;
191 case HAMMER2_MSGF_DELETE:
192 /*
193 * Persistent state is expected but might not exist if an
194 * ABORT+DELETE races the close.
195 */
196 if (state == NULL) {
197 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
198 error = EALREADY;
199 } else {
200 kprintf("hammer2_state_msgrx: no state "
201 "for DELETE\n");
202 error = EINVAL;
203 }
204 break;
205 }
206
207 /*
208 * Handle another ABORT+DELETE case if the msgid has already
209 * been reused.
210 */
211 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
212 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
213 error = EALREADY;
214 } else {
215 kprintf("hammer2_state_msgrx: state reused "
216 "for DELETE\n");
217 error = EINVAL;
218 }
219 break;
220 }
221 error = 0;
222 break;
223 default:
224 /*
225 * Check for mid-stream ABORT command received, otherwise
226 * allow.
227 */
228 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
229 if (state == NULL ||
230 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
231 error = EALREADY;
232 break;
233 }
234 }
235 error = 0;
236 break;
237 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
238 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
239 /*
240 * When receiving a reply with CREATE set the original
241 * persistent state message should already exist.
242 */
243 if (state == NULL) {
244 kprintf("hammer2_state_msgrx: no state match for "
62e59746
MD
245 "REPLY cmd=%08x msgid=%016jx\n",
246 msg->any.head.cmd,
247 (intmax_t)msg->any.head.msgid);
26bf1a36
MD
248 error = EINVAL;
249 break;
250 }
251 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
252 error = 0;
253 break;
254 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
255 /*
256 * Received REPLY+ABORT+DELETE in case where msgid has
257 * already been fully closed, ignore the message.
258 */
259 if (state == NULL) {
260 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
261 error = EALREADY;
262 } else {
263 kprintf("hammer2_state_msgrx: no state match "
264 "for REPLY|DELETE\n");
265 error = EINVAL;
266 }
267 break;
268 }
269
270 /*
271 * Received REPLY+ABORT+DELETE in case where msgid has
272 * already been reused for an unrelated message,
273 * ignore the message.
274 */
275 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
276 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
277 error = EALREADY;
278 } else {
279 kprintf("hammer2_state_msgrx: state reused "
280 "for REPLY|DELETE\n");
281 error = EINVAL;
282 }
283 break;
284 }
285 error = 0;
286 break;
287 case HAMMER2_MSGF_REPLY:
288 /*
289 * Check for mid-stream ABORT reply received to sent command.
290 */
291 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
292 if (state == NULL ||
293 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
294 error = EALREADY;
295 break;
296 }
297 }
298 error = 0;
299 break;
300 }
301 lockmgr(&pmp->msglk, LK_RELEASE);
302 return (error);
303}
304
305void
10c86c4e 306hammer2_state_cleanuprx(hammer2_msg_t *msg)
26bf1a36 307{
10c86c4e 308 hammer2_pfsmount_t *pmp = msg->router->pmp;
26bf1a36
MD
309 hammer2_state_t *state;
310
311 if ((state = msg->state) == NULL) {
10c86c4e 312 hammer2_msg_free(msg);
26bf1a36
MD
313 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
314 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
315 state->rxcmd |= HAMMER2_MSGF_DELETE;
316 if (state->txcmd & HAMMER2_MSGF_DELETE) {
317 if (state->msg == msg)
318 state->msg = NULL;
319 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
62e59746
MD
320 if (state->rxcmd & HAMMER2_MSGF_REPLY) {
321 KKASSERT(msg->any.head.cmd &
322 HAMMER2_MSGF_REPLY);
26bf1a36
MD
323 RB_REMOVE(hammer2_state_tree,
324 &pmp->statewr_tree, state);
325 } else {
62e59746
MD
326 KKASSERT((msg->any.head.cmd &
327 HAMMER2_MSGF_REPLY) == 0);
26bf1a36
MD
328 RB_REMOVE(hammer2_state_tree,
329 &pmp->staterd_tree, state);
330 }
331 state->flags &= ~HAMMER2_STATE_INSERTED;
332 lockmgr(&pmp->msglk, LK_RELEASE);
333 hammer2_state_free(state);
334 } else {
335 lockmgr(&pmp->msglk, LK_RELEASE);
336 }
10c86c4e 337 hammer2_msg_free(msg);
26bf1a36 338 } else if (state->msg != msg) {
10c86c4e 339 hammer2_msg_free(msg);
26bf1a36
MD
340 }
341}
342
343/*
344 * Process state tracking for a message prior to transmission.
345 *
346 * Called with msglk held and the msg dequeued.
347 *
348 * One-off messages are usually with dummy state and msg->state may be NULL
349 * in this situation.
350 *
351 * New transactions (when CREATE is set) will insert the state.
352 *
353 * May request that caller discard the message by setting *discardp to 1.
354 * A NULL state may be returned in this case.
355 */
356int
10c86c4e 357hammer2_state_msgtx(hammer2_msg_t *msg)
26bf1a36 358{
10c86c4e 359 hammer2_pfsmount_t *pmp = msg->router->pmp;
26bf1a36
MD
360 hammer2_state_t *state;
361 int error;
362
363 /*
364 * Make sure a state structure is ready to go in case we need a new
365 * one. This is the only routine which uses freewr_state so no
366 * races are possible.
367 */
368 if ((state = pmp->freewr_state) == NULL) {
369 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
370 state->pmp = pmp;
371 state->flags = HAMMER2_STATE_DYNAMIC;
372 pmp->freewr_state = state;
373 }
374
375 /*
376 * Lock RB tree. If persistent state is present it will have already
377 * been assigned to msg.
378 */
379 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
380 state = msg->state;
381
382 /*
383 * Short-cut one-off or mid-stream messages (state may be NULL).
384 */
385 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
386 HAMMER2_MSGF_ABORT)) == 0) {
387 lockmgr(&pmp->msglk, LK_RELEASE);
388 return(0);
389 }
390
391
392 /*
393 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
394 * inside the case statements.
395 */
396 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
397 HAMMER2_MSGF_REPLY)) {
398 case HAMMER2_MSGF_CREATE:
399 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
400 /*
401 * Insert the new persistent message state and mark
402 * half-closed if DELETE is set. Since this is a new
403 * message it isn't possible to transition into the fully
404 * closed state here.
9b8b748f
MD
405 *
406 * XXX state must be assigned and inserted by
407 * hammer2_msg_write(). txcmd is assigned by us
408 * on-transmit.
26bf1a36 409 */
9b8b748f 410 KKASSERT(state != NULL);
26bf1a36 411 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
62e59746 412 state->rxcmd = HAMMER2_MSGF_REPLY;
26bf1a36
MD
413 error = 0;
414 break;
415 case HAMMER2_MSGF_DELETE:
416 /*
417 * Sent ABORT+DELETE in case where msgid has already
418 * been fully closed, ignore the message.
419 */
420 if (state == NULL) {
421 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
422 error = EALREADY;
423 } else {
424 kprintf("hammer2_state_msgtx: no state match "
62e59746
MD
425 "for DELETE cmd=%08x msgid=%016jx\n",
426 msg->any.head.cmd,
427 (intmax_t)msg->any.head.msgid);
26bf1a36
MD
428 error = EINVAL;
429 }
430 break;
431 }
432
433 /*
434 * Sent ABORT+DELETE in case where msgid has
435 * already been reused for an unrelated message,
436 * ignore the message.
437 */
438 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
439 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
440 error = EALREADY;
441 } else {
442 kprintf("hammer2_state_msgtx: state reused "
443 "for DELETE\n");
444 error = EINVAL;
445 }
446 break;
447 }
448 error = 0;
449 break;
450 default:
451 /*
452 * Check for mid-stream ABORT command sent
453 */
454 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
455 if (state == NULL ||
456 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
457 error = EALREADY;
458 break;
459 }
460 }
461 error = 0;
462 break;
463 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
464 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
465 /*
466 * When transmitting a reply with CREATE set the original
467 * persistent state message should already exist.
468 */
469 if (state == NULL) {
470 kprintf("hammer2_state_msgtx: no state match "
471 "for REPLY | CREATE\n");
472 error = EINVAL;
473 break;
474 }
475 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
476 error = 0;
477 break;
478 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
479 /*
480 * When transmitting a reply with DELETE set the original
481 * persistent state message should already exist.
482 *
483 * This is very similar to the REPLY|CREATE|* case except
484 * txcmd is already stored, so we just add the DELETE flag.
485 *
486 * Sent REPLY+ABORT+DELETE in case where msgid has
487 * already been fully closed, ignore the message.
488 */
489 if (state == NULL) {
490 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
491 error = EALREADY;
492 } else {
493 kprintf("hammer2_state_msgtx: no state match "
494 "for REPLY | DELETE\n");
495 error = EINVAL;
496 }
497 break;
498 }
499
500 /*
501 * Sent REPLY+ABORT+DELETE in case where msgid has already
502 * been reused for an unrelated message, ignore the message.
503 */
504 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
505 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
506 error = EALREADY;
507 } else {
508 kprintf("hammer2_state_msgtx: state reused "
509 "for REPLY | DELETE\n");
510 error = EINVAL;
511 }
512 break;
513 }
514 error = 0;
515 break;
516 case HAMMER2_MSGF_REPLY:
517 /*
518 * Check for mid-stream ABORT reply sent.
519 *
520 * One-off REPLY messages are allowed for e.g. status updates.
521 */
522 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
523 if (state == NULL ||
524 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
525 error = EALREADY;
526 break;
527 }
528 }
529 error = 0;
530 break;
531 }
532 lockmgr(&pmp->msglk, LK_RELEASE);
533 return (error);
534}
535
536void
10c86c4e 537hammer2_state_cleanuptx(hammer2_msg_t *msg)
26bf1a36 538{
10c86c4e 539 hammer2_pfsmount_t *pmp = msg->router->pmp;
26bf1a36
MD
540 hammer2_state_t *state;
541
542 if ((state = msg->state) == NULL) {
10c86c4e 543 hammer2_msg_free(msg);
26bf1a36
MD
544 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
545 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
546 state->txcmd |= HAMMER2_MSGF_DELETE;
547 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
548 if (state->msg == msg)
549 state->msg = NULL;
550 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
62e59746
MD
551 if (state->txcmd & HAMMER2_MSGF_REPLY) {
552 KKASSERT(msg->any.head.cmd &
553 HAMMER2_MSGF_REPLY);
26bf1a36
MD
554 RB_REMOVE(hammer2_state_tree,
555 &pmp->staterd_tree, state);
556 } else {
62e59746
MD
557 KKASSERT((msg->any.head.cmd &
558 HAMMER2_MSGF_REPLY) == 0);
26bf1a36
MD
559 RB_REMOVE(hammer2_state_tree,
560 &pmp->statewr_tree, state);
561 }
562 state->flags &= ~HAMMER2_STATE_INSERTED;
563 lockmgr(&pmp->msglk, LK_RELEASE);
564 hammer2_state_free(state);
565 } else {
566 lockmgr(&pmp->msglk, LK_RELEASE);
567 }
10c86c4e 568 hammer2_msg_free(msg);
26bf1a36 569 } else if (state->msg != msg) {
10c86c4e 570 hammer2_msg_free(msg);
26bf1a36
MD
571 }
572}
573
574void
575hammer2_state_free(hammer2_state_t *state)
576{
577 hammer2_pfsmount_t *pmp = state->pmp;
578 hammer2_msg_t *msg;
579
580 msg = state->msg;
581 state->msg = NULL;
582 kfree(state, pmp->mmsg);
583 if (msg)
10c86c4e 584 hammer2_msg_free(msg);
26bf1a36
MD
585}
586
9b8b748f 587hammer2_msg_t *
1a34728c
MD
588hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
589 int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
9b8b748f 590{
1a34728c 591 hammer2_pfsmount_t *pmp = router->pmp;
9b8b748f 592 hammer2_msg_t *msg;
1a34728c 593 hammer2_state_t *state;
9b8b748f
MD
594 size_t hbytes;
595
596 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
597 msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
1a34728c 598 pmp->mmsg, M_WAITOK | M_ZERO);
9b8b748f 599 msg->hdr_size = hbytes;
10c86c4e
MD
600 msg->router = router;
601 KKASSERT(router != NULL);
9b8b748f 602 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
10c86c4e
MD
603 msg->any.head.source = 0;
604 msg->any.head.target = router->target;
9b8b748f
MD
605 msg->any.head.cmd = cmd;
606
1a34728c
MD
607 if (cmd & HAMMER2_MSGF_CREATE) {
608 /*
609 * New transaction, requires tracking state and a unique
610 * msgid to be allocated.
611 */
612 KKASSERT(msg->state == NULL);
613 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
614 state->pmp = pmp;
615 state->flags = HAMMER2_STATE_DYNAMIC;
616 state->func = func;
617 state->any.any = data;
618 state->msg = msg;
619 state->msgid = (uint64_t)(uintptr_t)state;
620 state->router = msg->router;
621 msg->state = state;
622 msg->any.head.source = 0;
623 msg->any.head.target = state->router->target;
624 msg->any.head.msgid = state->msgid;
625
626 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
627 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
628 panic("duplicate msgid allocated");
629 msg->any.head.msgid = state->msgid;
630 lockmgr(&pmp->msglk, LK_RELEASE);
631 }
632
9b8b748f
MD
633 return (msg);
634}
635
26bf1a36 636void
10c86c4e 637hammer2_msg_free(hammer2_msg_t *msg)
26bf1a36 638{
10c86c4e
MD
639 hammer2_pfsmount_t *pmp = msg->router->pmp;
640
26bf1a36
MD
641 if (msg->aux_data && msg->aux_size) {
642 kfree(msg->aux_data, pmp->mmsg);
643 msg->aux_data = NULL;
644 msg->aux_size = 0;
10c86c4e 645 msg->router = NULL;
26bf1a36
MD
646 }
647 kfree(msg, pmp->mmsg);
648}
649
650/*
651 * Indexed messages are stored in a red-black tree indexed by their
652 * msgid. Only persistent messages are indexed.
653 */
654int
655hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
656{
10c86c4e 657 if (state1->router < state2->router)
9b8b748f 658 return(-1);
10c86c4e 659 if (state1->router > state2->router)
9b8b748f 660 return(1);
26bf1a36
MD
661 if (state1->msgid < state2->msgid)
662 return(-1);
663 if (state1->msgid > state2->msgid)
664 return(1);
665 return(0);
666}
667
668/*
8c280d5d
MD
669 * Write a message. All requisit command flags have been set.
670 *
671 * If msg->state is non-NULL the message is written to the existing
10c86c4e 672 * transaction. msgid will be set accordingly.
8c280d5d
MD
673 *
674 * If msg->state is NULL and CREATE is set new state is allocated and
10c86c4e 675 * (func, data) is installed. A msgid is assigned.
26bf1a36 676 *
8c280d5d 677 * If msg->state is NULL and CREATE is not set the message is assumed
10c86c4e
MD
678 * to be a one-way message. The originator must assign the msgid
679 * (or leave it 0, which is typical.
8c280d5d
MD
680 *
681 * This function merely queues the message to the management thread, it
682 * does not write to the message socket/pipe.
26bf1a36 683 */
8c280d5d 684void
1a34728c 685hammer2_msg_write(hammer2_msg_t *msg)
26bf1a36 686{
10c86c4e 687 hammer2_pfsmount_t *pmp = msg->router->pmp;
9b8b748f 688 hammer2_state_t *state;
9b8b748f 689
8c280d5d
MD
690 if (msg->state) {
691 /*
692 * Continuance or termination of existing transaction.
693 * The transaction could have been initiated by either end.
694 *
695 * (Function callback and aux data for the receive side can
696 * be replaced or left alone).
697 */
10c86c4e
MD
698 state = msg->state;
699 msg->any.head.msgid = state->msgid;
700 msg->any.head.source = 0;
701 msg->any.head.target = state->router->target;
8c280d5d 702 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
9b8b748f
MD
703 } else {
704 /*
8c280d5d
MD
705 * One-off message (always uses msgid 0 to distinguish
706 * between a possibly lost in-transaction message due to
707 * competing aborts and a real one-off message?)
9b8b748f
MD
708 */
709 msg->any.head.msgid = 0;
10c86c4e
MD
710 msg->any.head.source = 0;
711 msg->any.head.target = msg->router->target;
9b8b748f
MD
712 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
713 }
714
715 /*
8c280d5d 716 * Finish up the msg fields
9b8b748f 717 */
8c280d5d
MD
718 msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
719 ++pmp->msg_seq;
720
721 msg->any.head.hdr_crc = 0;
722 msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
9b8b748f
MD
723
724 TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
1a34728c 725 hammer2_clusterctl_wakeup(pmp);
9b8b748f 726 lockmgr(&pmp->msglk, LK_RELEASE);
8c280d5d
MD
727}
728
729/*
730 * Reply to a message and terminate our side of the transaction.
731 *
732 * If msg->state is non-NULL we are replying to a one-way message.
733 */
734void
10c86c4e 735hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
8c280d5d
MD
736{
737 hammer2_state_t *state = msg->state;
10c86c4e 738 hammer2_msg_t *nmsg;
8c280d5d
MD
739 uint32_t cmd;
740
741 /*
742 * Reply with a simple error code and terminate the transaction.
743 */
744 cmd = HAMMER2_LNK_ERROR;
745
746 /*
747 * Check if our direction has even been initiated yet, set CREATE.
748 *
749 * Check what direction this is (command or reply direction). Note
750 * that txcmd might not have been initiated yet.
751 *
752 * If our direction has already been closed we just return without
753 * doing anything.
754 */
755 if (state) {
62e59746 756 if (state->txcmd & HAMMER2_MSGF_DELETE) {
10c86c4e 757 hammer2_msg_free(msg);
8c280d5d 758 return;
62e59746 759 }
8c280d5d
MD
760 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
761 cmd |= HAMMER2_MSGF_CREATE;
62e59746 762 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
763 cmd |= HAMMER2_MSGF_REPLY;
764 cmd |= HAMMER2_MSGF_DELETE;
765 } else {
766 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
767 cmd |= HAMMER2_MSGF_REPLY;
768 }
769
1a34728c
MD
770 /* XXX messy mask cmd to avoid allocating state */
771 nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
772 NULL, NULL);
773 nmsg->any.head.cmd = cmd;
10c86c4e
MD
774 nmsg->any.head.error = error;
775 nmsg->state = state;
1a34728c 776 hammer2_msg_write(nmsg);
8c280d5d
MD
777}
778
779/*
780 * Reply to a message and continue our side of the transaction.
781 *
782 * If msg->state is non-NULL we are replying to a one-way message and this
783 * function degenerates into the same as hammer2_msg_reply().
784 */
785void
10c86c4e 786hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
8c280d5d
MD
787{
788 hammer2_state_t *state = msg->state;
10c86c4e 789 hammer2_msg_t *nmsg;
8c280d5d
MD
790 uint32_t cmd;
791
792 /*
793 * Return a simple result code, do NOT terminate the transaction.
794 */
795 cmd = HAMMER2_LNK_ERROR;
796
797 /*
798 * Check if our direction has even been initiated yet, set CREATE.
799 *
800 * Check what direction this is (command or reply direction). Note
801 * that txcmd might not have been initiated yet.
802 *
803 * If our direction has already been closed we just return without
804 * doing anything.
805 */
806 if (state) {
62e59746 807 if (state->txcmd & HAMMER2_MSGF_DELETE) {
10c86c4e 808 hammer2_msg_free(msg);
8c280d5d 809 return;
62e59746 810 }
8c280d5d
MD
811 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
812 cmd |= HAMMER2_MSGF_CREATE;
62e59746 813 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
814 cmd |= HAMMER2_MSGF_REPLY;
815 /* continuing transaction, do not set MSGF_DELETE */
816 } else {
817 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
818 cmd |= HAMMER2_MSGF_REPLY;
819 }
9b8b748f 820
1a34728c
MD
821 /* XXX messy mask cmd to avoid allocating state */
822 nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
823 NULL, NULL);
824 nmsg->any.head.cmd = cmd;
10c86c4e
MD
825 nmsg->any.head.error = error;
826 nmsg->state = state;
1a34728c 827 hammer2_msg_write(nmsg);
26bf1a36 828}