hammer2: More hammer2 msg cleanup, get reconnect ioctl working
[dragonfly.git] / sys / vfs / hammer2 / hammer2_msg.c
CommitLineData
26bf1a36
MD
1/*-
2 * Copyright (c) 2012 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 */
34
35#include "hammer2.h"
36
9b8b748f
MD
/*
 * Instantiate the red-black tree routines for hammer2_state_tree, whose
 * nodes are hammer2_state structures linked through their rbnode field and
 * ordered by hammer2_state_cmp().
 */
37RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
38
26bf1a36
MD
39/*
40 * Process state tracking for a message after reception, prior to
41 * execution.
42 *
43 * Called with msglk held and the msg dequeued.
44 *
45 * All messages are called with dummy state and return actual state.
46 * (One-off messages often just return the same dummy state).
47 *
48 * May request that caller discard the message by returning a non-zero error.
49 * msg->state is not used in this case and is allowed to be NULL.
50 *
51 * --
52 *
53 * These routines handle persistent and command/reply message state via the
54 * CREATE and DELETE flags. The first message in a command or reply sequence
55 * sets CREATE, the last message in a command or reply sequence sets DELETE.
56 *
57 * There can be any number of intermediate messages belonging to the same
58 * sequence sent in between the CREATE message and the DELETE message,
59 * which set neither flag. This represents a streaming command or reply.
60 *
61 * Any command message received with CREATE set expects a reply sequence to
62 * be returned. Reply sequences work the same as command sequences except the
63 * REPLY bit is also sent. Both the command side and reply side can
64 * degenerate into a single message with both CREATE and DELETE set. Note
65 * that one side can be streaming and the other side not, or neither, or both.
66 *
67 * The msgid is unique for the initiator. That is, two sides sending a new
68 * message can use the same msgid without colliding.
69 *
70 * --
71 *
72 * ABORT sequences work by setting the ABORT flag along with normal message
73 * state. However, ABORTs can also be sent on half-closed messages, that is
74 * even if the command or reply side has already sent a DELETE, as long as
75 * the message has not been fully closed it can still send an ABORT+DELETE
76 * to terminate the half-closed message state.
77 *
78 * Since ABORT+DELETEs can race we silently discard ABORT's for message
79 * state which has already been fully closed. REPLY+ABORT+DELETEs can
80 * also race, and in this situation the other side might have already
81 * initiated a new unrelated command with the same message id. Since
82 * the abort has not set the CREATE flag the situation can be detected
83 * and the message will also be discarded.
84 *
85 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86 * The ABORT request is essentially integrated into the command instead
87 * of being sent later on. In this situation the command implementation
88 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89 * special-case non-blocking operation for the command.
90 *
91 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
92 * to be mid-stream aborts for command/reply sequences. ABORTs on
93 * one-way messages are not supported.
94 *
95 * NOTE! If a command sequence does not support aborts the ABORT flag is
96 * simply ignored.
97 *
98 * --
99 *
100 * One-off messages (no reply expected) are sent with neither CREATE or DELETE
101 * set. One-off messages cannot be aborted and typically aren't processed
102 * by these routines. The REPLY bit can be used to distinguish whether a
103 * one-off message is a command or reply. For example, one-off replies
104 * will typically just contain status updates.
105 */
106int
10c86c4e 107hammer2_state_msgrx(hammer2_msg_t *msg)
26bf1a36 108{
10c86c4e 109 hammer2_pfsmount_t *pmp;
26bf1a36
MD
110 hammer2_state_t *state;
111 int error;
112
10c86c4e
MD
113 pmp = msg->router->pmp;
114
115 /*
116 * XXX resolve msg->any.head.source and msg->any.head.target
117 * into LNK_SPAN references.
118 *
119 * XXX replace msg->router
120 */
121
26bf1a36
MD
122 /*
123 * Make sure a state structure is ready to go in case we need a new
124 * one. This is the only routine which uses freerd_state so no
125 * races are possible.
126 */
127 if ((state = pmp->freerd_state) == NULL) {
128 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
129 state->pmp = pmp;
130 state->flags = HAMMER2_STATE_DYNAMIC;
131 pmp->freerd_state = state;
132 }
133
134 /*
135 * Lock RB tree and locate existing persistent state, if any.
136 *
137 * If received msg is a command state is on staterd_tree.
138 * If received msg is a reply state is on statewr_tree.
139 */
140 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
141
142 state->msgid = msg->any.head.msgid;
10c86c4e
MD
143 state->router = &pmp->router;
144 kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
8c280d5d 145 msg->any.head.cmd,
10c86c4e
MD
146 (intmax_t)msg->any.head.msgid,
147 (intmax_t)msg->any.head.source,
148 (intmax_t)msg->any.head.target);
26bf1a36
MD
149 if (msg->any.head.cmd & HAMMER2_MSGF_REPLY)
150 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
151 else
152 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
153 msg->state = state;
154
155 /*
156 * Short-cut one-off or mid-stream messages (state may be NULL).
157 */
158 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
159 HAMMER2_MSGF_ABORT)) == 0) {
160 lockmgr(&pmp->msglk, LK_RELEASE);
161 return(0);
162 }
163
164 /*
165 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
166 * inside the case statements.
167 */
168 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
169 HAMMER2_MSGF_REPLY)) {
170 case HAMMER2_MSGF_CREATE:
171 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
172 /*
173 * New persistant command received.
174 */
175 if (state) {
176 kprintf("hammer2_state_msgrx: duplicate transaction\n");
177 error = EINVAL;
178 break;
179 }
180 state = pmp->freerd_state;
181 pmp->freerd_state = NULL;
182 msg->state = state;
10c86c4e 183 state->router = msg->router;
26bf1a36
MD
184 state->msg = msg;
185 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
62e59746 186 state->txcmd = HAMMER2_MSGF_REPLY;
26bf1a36
MD
187 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
188 state->flags |= HAMMER2_STATE_INSERTED;
189 error = 0;
190 break;
191 case HAMMER2_MSGF_DELETE:
192 /*
193 * Persistent state is expected but might not exist if an
194 * ABORT+DELETE races the close.
195 */
196 if (state == NULL) {
197 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
198 error = EALREADY;
199 } else {
200 kprintf("hammer2_state_msgrx: no state "
201 "for DELETE\n");
202 error = EINVAL;
203 }
204 break;
205 }
206
207 /*
208 * Handle another ABORT+DELETE case if the msgid has already
209 * been reused.
210 */
211 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
212 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
213 error = EALREADY;
214 } else {
215 kprintf("hammer2_state_msgrx: state reused "
216 "for DELETE\n");
217 error = EINVAL;
218 }
219 break;
220 }
221 error = 0;
222 break;
223 default:
224 /*
225 * Check for mid-stream ABORT command received, otherwise
226 * allow.
227 */
228 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
229 if (state == NULL ||
230 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
231 error = EALREADY;
232 break;
233 }
234 }
235 error = 0;
236 break;
237 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
238 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
239 /*
240 * When receiving a reply with CREATE set the original
241 * persistent state message should already exist.
242 */
243 if (state == NULL) {
244 kprintf("hammer2_state_msgrx: no state match for "
62e59746
MD
245 "REPLY cmd=%08x msgid=%016jx\n",
246 msg->any.head.cmd,
247 (intmax_t)msg->any.head.msgid);
26bf1a36
MD
248 error = EINVAL;
249 break;
250 }
251 state->rxcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
252 error = 0;
253 break;
254 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
255 /*
256 * Received REPLY+ABORT+DELETE in case where msgid has
257 * already been fully closed, ignore the message.
258 */
259 if (state == NULL) {
260 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
261 error = EALREADY;
262 } else {
263 kprintf("hammer2_state_msgrx: no state match "
264 "for REPLY|DELETE\n");
265 error = EINVAL;
266 }
267 break;
268 }
269
270 /*
271 * Received REPLY+ABORT+DELETE in case where msgid has
272 * already been reused for an unrelated message,
273 * ignore the message.
274 */
275 if ((state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
276 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
277 error = EALREADY;
278 } else {
279 kprintf("hammer2_state_msgrx: state reused "
280 "for REPLY|DELETE\n");
281 error = EINVAL;
282 }
283 break;
284 }
285 error = 0;
286 break;
287 case HAMMER2_MSGF_REPLY:
288 /*
289 * Check for mid-stream ABORT reply received to sent command.
290 */
291 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
292 if (state == NULL ||
293 (state->rxcmd & HAMMER2_MSGF_CREATE) == 0) {
294 error = EALREADY;
295 break;
296 }
297 }
298 error = 0;
299 break;
300 }
301 lockmgr(&pmp->msglk, LK_RELEASE);
302 return (error);
303}
304
305void
10c86c4e 306hammer2_state_cleanuprx(hammer2_msg_t *msg)
26bf1a36 307{
10c86c4e 308 hammer2_pfsmount_t *pmp = msg->router->pmp;
26bf1a36
MD
309 hammer2_state_t *state;
310
311 if ((state = msg->state) == NULL) {
10c86c4e 312 hammer2_msg_free(msg);
26bf1a36
MD
313 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
314 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
315 state->rxcmd |= HAMMER2_MSGF_DELETE;
316 if (state->txcmd & HAMMER2_MSGF_DELETE) {
317 if (state->msg == msg)
318 state->msg = NULL;
319 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
62e59746
MD
320 if (state->rxcmd & HAMMER2_MSGF_REPLY) {
321 KKASSERT(msg->any.head.cmd &
322 HAMMER2_MSGF_REPLY);
26bf1a36
MD
323 RB_REMOVE(hammer2_state_tree,
324 &pmp->statewr_tree, state);
325 } else {
62e59746
MD
326 KKASSERT((msg->any.head.cmd &
327 HAMMER2_MSGF_REPLY) == 0);
26bf1a36
MD
328 RB_REMOVE(hammer2_state_tree,
329 &pmp->staterd_tree, state);
330 }
331 state->flags &= ~HAMMER2_STATE_INSERTED;
332 lockmgr(&pmp->msglk, LK_RELEASE);
333 hammer2_state_free(state);
334 } else {
335 lockmgr(&pmp->msglk, LK_RELEASE);
336 }
10c86c4e 337 hammer2_msg_free(msg);
26bf1a36 338 } else if (state->msg != msg) {
10c86c4e 339 hammer2_msg_free(msg);
26bf1a36
MD
340 }
341}
342
343/*
344 * Process state tracking for a message prior to transmission.
345 *
346 * Called with msglk held and the msg dequeued.
347 *
348 * One-off messages are usually with dummy state and msg->state may be NULL
349 * in this situation.
350 *
351 * New transactions (when CREATE is set) will insert the state.
352 *
353 * May request that caller discard the message by setting *discardp to 1.
354 * A NULL state may be returned in this case.
355 */
356int
10c86c4e 357hammer2_state_msgtx(hammer2_msg_t *msg)
26bf1a36 358{
10c86c4e 359 hammer2_pfsmount_t *pmp = msg->router->pmp;
26bf1a36
MD
360 hammer2_state_t *state;
361 int error;
362
363 /*
364 * Make sure a state structure is ready to go in case we need a new
365 * one. This is the only routine which uses freewr_state so no
366 * races are possible.
367 */
368 if ((state = pmp->freewr_state) == NULL) {
369 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
370 state->pmp = pmp;
371 state->flags = HAMMER2_STATE_DYNAMIC;
372 pmp->freewr_state = state;
373 }
374
375 /*
376 * Lock RB tree. If persistent state is present it will have already
377 * been assigned to msg.
378 */
379 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
380 state = msg->state;
381
382 /*
383 * Short-cut one-off or mid-stream messages (state may be NULL).
384 */
385 if ((msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
386 HAMMER2_MSGF_ABORT)) == 0) {
387 lockmgr(&pmp->msglk, LK_RELEASE);
388 return(0);
389 }
390
391
392 /*
393 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
394 * inside the case statements.
395 */
396 switch(msg->any.head.cmd & (HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE |
397 HAMMER2_MSGF_REPLY)) {
398 case HAMMER2_MSGF_CREATE:
399 case HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
400 /*
401 * Insert the new persistent message state and mark
402 * half-closed if DELETE is set. Since this is a new
403 * message it isn't possible to transition into the fully
404 * closed state here.
9b8b748f
MD
405 *
406 * XXX state must be assigned and inserted by
407 * hammer2_msg_write(). txcmd is assigned by us
408 * on-transmit.
26bf1a36 409 */
9b8b748f 410 KKASSERT(state != NULL);
26bf1a36 411 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
62e59746 412 state->rxcmd = HAMMER2_MSGF_REPLY;
26bf1a36
MD
413 error = 0;
414 break;
415 case HAMMER2_MSGF_DELETE:
416 /*
417 * Sent ABORT+DELETE in case where msgid has already
418 * been fully closed, ignore the message.
419 */
420 if (state == NULL) {
421 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
422 error = EALREADY;
423 } else {
424 kprintf("hammer2_state_msgtx: no state match "
62e59746
MD
425 "for DELETE cmd=%08x msgid=%016jx\n",
426 msg->any.head.cmd,
427 (intmax_t)msg->any.head.msgid);
26bf1a36
MD
428 error = EINVAL;
429 }
430 break;
431 }
432
433 /*
434 * Sent ABORT+DELETE in case where msgid has
435 * already been reused for an unrelated message,
436 * ignore the message.
437 */
438 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
439 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
440 error = EALREADY;
441 } else {
442 kprintf("hammer2_state_msgtx: state reused "
443 "for DELETE\n");
444 error = EINVAL;
445 }
446 break;
447 }
448 error = 0;
449 break;
450 default:
451 /*
452 * Check for mid-stream ABORT command sent
453 */
454 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
455 if (state == NULL ||
456 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
457 error = EALREADY;
458 break;
459 }
460 }
461 error = 0;
462 break;
463 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE:
464 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_CREATE | HAMMER2_MSGF_DELETE:
465 /*
466 * When transmitting a reply with CREATE set the original
467 * persistent state message should already exist.
468 */
469 if (state == NULL) {
470 kprintf("hammer2_state_msgtx: no state match "
471 "for REPLY | CREATE\n");
472 error = EINVAL;
473 break;
474 }
475 state->txcmd = msg->any.head.cmd & ~HAMMER2_MSGF_DELETE;
476 error = 0;
477 break;
478 case HAMMER2_MSGF_REPLY | HAMMER2_MSGF_DELETE:
479 /*
480 * When transmitting a reply with DELETE set the original
481 * persistent state message should already exist.
482 *
483 * This is very similar to the REPLY|CREATE|* case except
484 * txcmd is already stored, so we just add the DELETE flag.
485 *
486 * Sent REPLY+ABORT+DELETE in case where msgid has
487 * already been fully closed, ignore the message.
488 */
489 if (state == NULL) {
490 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
491 error = EALREADY;
492 } else {
493 kprintf("hammer2_state_msgtx: no state match "
494 "for REPLY | DELETE\n");
495 error = EINVAL;
496 }
497 break;
498 }
499
500 /*
501 * Sent REPLY+ABORT+DELETE in case where msgid has already
502 * been reused for an unrelated message, ignore the message.
503 */
504 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
505 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
506 error = EALREADY;
507 } else {
508 kprintf("hammer2_state_msgtx: state reused "
509 "for REPLY | DELETE\n");
510 error = EINVAL;
511 }
512 break;
513 }
514 error = 0;
515 break;
516 case HAMMER2_MSGF_REPLY:
517 /*
518 * Check for mid-stream ABORT reply sent.
519 *
520 * One-off REPLY messages are allowed for e.g. status updates.
521 */
522 if (msg->any.head.cmd & HAMMER2_MSGF_ABORT) {
523 if (state == NULL ||
524 (state->txcmd & HAMMER2_MSGF_CREATE) == 0) {
525 error = EALREADY;
526 break;
527 }
528 }
529 error = 0;
530 break;
531 }
532 lockmgr(&pmp->msglk, LK_RELEASE);
533 return (error);
534}
535
536void
10c86c4e 537hammer2_state_cleanuptx(hammer2_msg_t *msg)
26bf1a36 538{
10c86c4e 539 hammer2_pfsmount_t *pmp = msg->router->pmp;
26bf1a36
MD
540 hammer2_state_t *state;
541
542 if ((state = msg->state) == NULL) {
10c86c4e 543 hammer2_msg_free(msg);
26bf1a36
MD
544 } else if (msg->any.head.cmd & HAMMER2_MSGF_DELETE) {
545 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
546 state->txcmd |= HAMMER2_MSGF_DELETE;
547 if (state->rxcmd & HAMMER2_MSGF_DELETE) {
548 if (state->msg == msg)
549 state->msg = NULL;
550 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
62e59746
MD
551 if (state->txcmd & HAMMER2_MSGF_REPLY) {
552 KKASSERT(msg->any.head.cmd &
553 HAMMER2_MSGF_REPLY);
26bf1a36
MD
554 RB_REMOVE(hammer2_state_tree,
555 &pmp->staterd_tree, state);
556 } else {
62e59746
MD
557 KKASSERT((msg->any.head.cmd &
558 HAMMER2_MSGF_REPLY) == 0);
26bf1a36
MD
559 RB_REMOVE(hammer2_state_tree,
560 &pmp->statewr_tree, state);
561 }
562 state->flags &= ~HAMMER2_STATE_INSERTED;
563 lockmgr(&pmp->msglk, LK_RELEASE);
564 hammer2_state_free(state);
565 } else {
566 lockmgr(&pmp->msglk, LK_RELEASE);
567 }
10c86c4e 568 hammer2_msg_free(msg);
26bf1a36 569 } else if (state->msg != msg) {
10c86c4e 570 hammer2_msg_free(msg);
26bf1a36
MD
571 }
572}
573
574void
575hammer2_state_free(hammer2_state_t *state)
576{
577 hammer2_pfsmount_t *pmp = state->pmp;
578 hammer2_msg_t *msg;
579
70c3c3b7 580 KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
26bf1a36
MD
581 msg = state->msg;
582 state->msg = NULL;
583 kfree(state, pmp->mmsg);
584 if (msg)
10c86c4e 585 hammer2_msg_free(msg);
26bf1a36
MD
586}
587
9b8b748f 588hammer2_msg_t *
1a34728c
MD
589hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
590 int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
9b8b748f 591{
1a34728c 592 hammer2_pfsmount_t *pmp = router->pmp;
9b8b748f 593 hammer2_msg_t *msg;
1a34728c 594 hammer2_state_t *state;
9b8b748f
MD
595 size_t hbytes;
596
597 hbytes = (cmd & HAMMER2_MSGF_SIZE) * HAMMER2_MSG_ALIGN;
598 msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
1a34728c 599 pmp->mmsg, M_WAITOK | M_ZERO);
9b8b748f 600 msg->hdr_size = hbytes;
10c86c4e
MD
601 msg->router = router;
602 KKASSERT(router != NULL);
9b8b748f 603 msg->any.head.magic = HAMMER2_MSGHDR_MAGIC;
10c86c4e
MD
604 msg->any.head.source = 0;
605 msg->any.head.target = router->target;
9b8b748f
MD
606 msg->any.head.cmd = cmd;
607
1a34728c
MD
608 if (cmd & HAMMER2_MSGF_CREATE) {
609 /*
610 * New transaction, requires tracking state and a unique
611 * msgid to be allocated.
612 */
613 KKASSERT(msg->state == NULL);
614 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
615 state->pmp = pmp;
616 state->flags = HAMMER2_STATE_DYNAMIC;
617 state->func = func;
618 state->any.any = data;
619 state->msg = msg;
620 state->msgid = (uint64_t)(uintptr_t)state;
621 state->router = msg->router;
622 msg->state = state;
623 msg->any.head.source = 0;
624 msg->any.head.target = state->router->target;
625 msg->any.head.msgid = state->msgid;
626
627 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
628 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
629 panic("duplicate msgid allocated");
70c3c3b7 630 state->flags |= HAMMER2_STATE_INSERTED;
1a34728c
MD
631 msg->any.head.msgid = state->msgid;
632 lockmgr(&pmp->msglk, LK_RELEASE);
633 }
634
9b8b748f
MD
635 return (msg);
636}
637
26bf1a36 638void
10c86c4e 639hammer2_msg_free(hammer2_msg_t *msg)
26bf1a36 640{
10c86c4e
MD
641 hammer2_pfsmount_t *pmp = msg->router->pmp;
642
26bf1a36
MD
643 if (msg->aux_data && msg->aux_size) {
644 kfree(msg->aux_data, pmp->mmsg);
645 msg->aux_data = NULL;
646 msg->aux_size = 0;
10c86c4e 647 msg->router = NULL;
26bf1a36
MD
648 }
649 kfree(msg, pmp->mmsg);
650}
651
652/*
653 * Indexed messages are stored in a red-black tree indexed by their
654 * msgid. Only persistent messages are indexed.
655 */
656int
657hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
658{
10c86c4e 659 if (state1->router < state2->router)
9b8b748f 660 return(-1);
10c86c4e 661 if (state1->router > state2->router)
9b8b748f 662 return(1);
26bf1a36
MD
663 if (state1->msgid < state2->msgid)
664 return(-1);
665 if (state1->msgid > state2->msgid)
666 return(1);
667 return(0);
668}
669
670/*
8c280d5d
MD
671 * Write a message. All requisit command flags have been set.
672 *
673 * If msg->state is non-NULL the message is written to the existing
10c86c4e 674 * transaction. msgid will be set accordingly.
8c280d5d
MD
675 *
676 * If msg->state is NULL and CREATE is set new state is allocated and
10c86c4e 677 * (func, data) is installed. A msgid is assigned.
26bf1a36 678 *
8c280d5d 679 * If msg->state is NULL and CREATE is not set the message is assumed
10c86c4e
MD
680 * to be a one-way message. The originator must assign the msgid
681 * (or leave it 0, which is typical.
8c280d5d
MD
682 *
683 * This function merely queues the message to the management thread, it
684 * does not write to the message socket/pipe.
26bf1a36 685 */
8c280d5d 686void
1a34728c 687hammer2_msg_write(hammer2_msg_t *msg)
26bf1a36 688{
10c86c4e 689 hammer2_pfsmount_t *pmp = msg->router->pmp;
9b8b748f 690 hammer2_state_t *state;
9b8b748f 691
8c280d5d
MD
692 if (msg->state) {
693 /*
694 * Continuance or termination of existing transaction.
695 * The transaction could have been initiated by either end.
696 *
697 * (Function callback and aux data for the receive side can
698 * be replaced or left alone).
699 */
10c86c4e
MD
700 state = msg->state;
701 msg->any.head.msgid = state->msgid;
702 msg->any.head.source = 0;
703 msg->any.head.target = state->router->target;
8c280d5d 704 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
9b8b748f
MD
705 } else {
706 /*
8c280d5d
MD
707 * One-off message (always uses msgid 0 to distinguish
708 * between a possibly lost in-transaction message due to
709 * competing aborts and a real one-off message?)
9b8b748f
MD
710 */
711 msg->any.head.msgid = 0;
10c86c4e
MD
712 msg->any.head.source = 0;
713 msg->any.head.target = msg->router->target;
9b8b748f
MD
714 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
715 }
716
717 /*
8c280d5d 718 * Finish up the msg fields
9b8b748f 719 */
8c280d5d
MD
720 msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
721 ++pmp->msg_seq;
722
723 msg->any.head.hdr_crc = 0;
724 msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
9b8b748f
MD
725
726 TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
1a34728c 727 hammer2_clusterctl_wakeup(pmp);
9b8b748f 728 lockmgr(&pmp->msglk, LK_RELEASE);
8c280d5d
MD
729}
730
731/*
732 * Reply to a message and terminate our side of the transaction.
733 *
734 * If msg->state is non-NULL we are replying to a one-way message.
735 */
736void
10c86c4e 737hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
8c280d5d
MD
738{
739 hammer2_state_t *state = msg->state;
10c86c4e 740 hammer2_msg_t *nmsg;
8c280d5d
MD
741 uint32_t cmd;
742
743 /*
744 * Reply with a simple error code and terminate the transaction.
745 */
746 cmd = HAMMER2_LNK_ERROR;
747
748 /*
749 * Check if our direction has even been initiated yet, set CREATE.
750 *
751 * Check what direction this is (command or reply direction). Note
752 * that txcmd might not have been initiated yet.
753 *
754 * If our direction has already been closed we just return without
755 * doing anything.
756 */
757 if (state) {
70c3c3b7 758 if (state->txcmd & HAMMER2_MSGF_DELETE)
8c280d5d
MD
759 return;
760 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
761 cmd |= HAMMER2_MSGF_CREATE;
62e59746 762 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
763 cmd |= HAMMER2_MSGF_REPLY;
764 cmd |= HAMMER2_MSGF_DELETE;
765 } else {
766 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
767 cmd |= HAMMER2_MSGF_REPLY;
768 }
70c3c3b7 769 kprintf("MSG_REPLY state=%p msg %08x\n", state, cmd);
8c280d5d 770
1a34728c
MD
771 /* XXX messy mask cmd to avoid allocating state */
772 nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
773 NULL, NULL);
774 nmsg->any.head.cmd = cmd;
10c86c4e
MD
775 nmsg->any.head.error = error;
776 nmsg->state = state;
1a34728c 777 hammer2_msg_write(nmsg);
8c280d5d
MD
778}
779
780/*
781 * Reply to a message and continue our side of the transaction.
782 *
783 * If msg->state is non-NULL we are replying to a one-way message and this
784 * function degenerates into the same as hammer2_msg_reply().
785 */
786void
10c86c4e 787hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
8c280d5d
MD
788{
789 hammer2_state_t *state = msg->state;
10c86c4e 790 hammer2_msg_t *nmsg;
8c280d5d
MD
791 uint32_t cmd;
792
793 /*
794 * Return a simple result code, do NOT terminate the transaction.
795 */
796 cmd = HAMMER2_LNK_ERROR;
797
798 /*
799 * Check if our direction has even been initiated yet, set CREATE.
800 *
801 * Check what direction this is (command or reply direction). Note
802 * that txcmd might not have been initiated yet.
803 *
804 * If our direction has already been closed we just return without
805 * doing anything.
806 */
807 if (state) {
70c3c3b7 808 if (state->txcmd & HAMMER2_MSGF_DELETE)
8c280d5d
MD
809 return;
810 if ((state->txcmd & HAMMER2_MSGF_CREATE) == 0)
811 cmd |= HAMMER2_MSGF_CREATE;
62e59746 812 if (state->txcmd & HAMMER2_MSGF_REPLY)
8c280d5d
MD
813 cmd |= HAMMER2_MSGF_REPLY;
814 /* continuing transaction, do not set MSGF_DELETE */
815 } else {
816 if ((msg->any.head.cmd & HAMMER2_MSGF_REPLY) == 0)
817 cmd |= HAMMER2_MSGF_REPLY;
818 }
9b8b748f 819
1a34728c
MD
820 /* XXX messy mask cmd to avoid allocating state */
821 nmsg = hammer2_msg_alloc(msg->router, cmd & HAMMER2_MSGF_BASECMDMASK,
822 NULL, NULL);
823 nmsg->any.head.cmd = cmd;
10c86c4e
MD
824 nmsg->any.head.error = error;
825 nmsg->state = state;
1a34728c 826 hammer2_msg_write(nmsg);
26bf1a36 827}