2 * Copyright (c) 2012 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
/*
 * Instantiate the red-black tree operations (RB_FIND, RB_INSERT,
 * RB_REMOVE, ...) used below for hammer2_state_tree: nodes are
 * struct hammer2_state linked through their rbnode field and ordered
 * by hammer2_state_cmp().
 */
37 RB_GENERATE(hammer2_state_tree, hammer2_state, rbnode, hammer2_state_cmp);
40 * Process state tracking for a message after reception, prior to
43 * Called with msglk NOT held and the msg already dequeued; this routine acquires and releases pmp->msglk itself.
45 * All messages are called with dummy state and return actual state.
46 * (One-off messages often just return the same dummy state).
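48 * May request that the caller discard the message (NOTE(review): this routine takes no *discardp argument; presumably the request is signalled via its return value -- confirm).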
49 * The returned state is not used in this case and is allowed to be NULL.
53 * These routines handle persistent and command/reply message state via the
54 * CREATE and DELETE flags. The first message in a command or reply sequence
55 * sets CREATE, the last message in a command or reply sequence sets DELETE.
57 * There can be any number of intermediate messages belonging to the same
58 * sequence sent in between the CREATE message and the DELETE message,
59 * which set neither flag. This represents a streaming command or reply.
61 * Any command message received with CREATE set expects a reply sequence to
62 * be returned. Reply sequences work the same as command sequences except the
63 * REPLY bit is also sent. Both the command side and reply side can
64 * degenerate into a single message with both CREATE and DELETE set. Note
65 * that one side can be streaming and the other side not, or neither, or both.
67 * The msgid is unique for the initiator. That is, two sides sending a new
68 * message can use the same msgid without colliding.
72 * ABORT sequences work by setting the ABORT flag along with normal message
73 * state. However, ABORTs can also be sent on half-closed messages, that is
74 * even if the command or reply side has already sent a DELETE, as long as
75 * the message has not been fully closed it can still send an ABORT+DELETE
76 * to terminate the half-closed message state.
78 * Since ABORT+DELETEs can race, we silently discard ABORTs for message
79 * state which has already been fully closed. REPLY+ABORT+DELETEs can
80 * also race, and in this situation the other side might have already
81 * initiated a new unrelated command with the same message id. Since
82 * the abort has not set the CREATE flag the situation can be detected
83 * and the message will also be discarded.
85 * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
86 * The ABORT request is essentially integrated into the command instead
87 * of being sent later on. In this situation the command implementation
88 * detects that CREATE and ABORT are both set (vs ABORT alone) and can
89 * special-case non-blocking operation for the command.
91 * NOTE! Messages with ABORT set without CREATE or DELETE are considered
92 * to be mid-stream aborts for command/reply sequences. ABORTs on
93 * one-way messages are not supported.
95 * NOTE! If a command sequence does not support aborts the ABORT flag is
100 * One-off messages (no reply expected) are sent with neither CREATE nor DELETE
101 * set. One-off messages cannot be aborted and typically aren't processed
102 * by these routines. The REPLY bit can be used to distinguish whether a
103 * one-off message is a command or reply. For example, one-off replies
104 * will typically just contain status updates.
/*
 * hammer2_state_msgrx - receive-side state tracking for msg.
 *
 * Looks up the persistent state for msg->any.head.msgid while holding
 * pmp->msglk: replies are looked up in pmp->statewr_tree, commands in
 * pmp->staterd_tree.  Messages carrying none of CREATE/DELETE/ABORT
 * take an early exit.  On a new command (CREATE set, REPLY clear) the
 * preallocated pmp->freerd_state is consumed, initialized (rxcmd from
 * the message minus DELETE, txcmd = DMSGF_REPLY) and inserted into
 * staterd_tree.  ABORT+DELETE for closed or reused msgids is logged.
 *
 * NOTE(review): pmp->msglk is acquired here (LK_EXCLUSIVE) and released
 * on the visible exit paths, so callers must not already hold it.
 */
107 hammer2_state_msgrx(hammer2_msg_t *msg)
109 hammer2_pfsmount_t *pmp;
110 hammer2_state_t *state;
113 pmp = msg->router->pmp;
116 * XXX resolve msg->any.head.source and msg->any.head.target
117 * into LNK_SPAN references.
119 * XXX replace msg->router
123 * Make sure a state structure is ready to go in case we need a new
124 * one. This is the only routine which uses freerd_state so no
125 * races are possible.
127 if ((state = pmp->freerd_state) == NULL) {
128 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
130 state->flags = HAMMER2_STATE_DYNAMIC;
131 pmp->freerd_state = state;
135 * Lock RB tree and locate existing persistent state, if any.
137 * If received msg is a command state is on staterd_tree.
138 * If received msg is a reply state is on statewr_tree.
140 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
/* The spare freerd_state doubles as the RB_FIND search key. */
142 state->msgid = msg->any.head.msgid;
/*
 * NOTE(review): the search key uses &pmp->router, whereas a state
 * inserted in the CREATE case below takes msg->router, and
 * hammer2_state_cmp() orders by router first.  Lookups only match if
 * these are the same router -- verify.
 */
143 state->router = &pmp->router;
/* NOTE(review): unconditional kprintf per received message (debug output?). */
144 kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
146 (intmax_t)msg->any.head.msgid,
147 (intmax_t)msg->any.head.source,
148 (intmax_t)msg->any.head.target);
149 if (msg->any.head.cmd & DMSGF_REPLY)
150 state = RB_FIND(hammer2_state_tree, &pmp->statewr_tree, state);
152 state = RB_FIND(hammer2_state_tree, &pmp->staterd_tree, state);
156 * Short-cut one-off or mid-stream messages (state may be NULL).
158 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
159 DMSGF_ABORT)) == 0) {
160 lockmgr(&pmp->msglk, LK_RELEASE);
165 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
166 * inside the case statements.
168 switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY)) {
170 case DMSGF_CREATE | DMSGF_DELETE:
172 * New persistant command received.
175 kprintf("hammer2_state_msgrx: duplicate transaction\n");
/* Consume the preallocated spare; the next call allocates a new one. */
179 state = pmp->freerd_state;
180 pmp->freerd_state = NULL;
182 state->router = msg->router;
/* DELETE is masked off here; it is recorded in rxcmd during cleanup. */
184 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
185 state->txcmd = DMSGF_REPLY;
186 RB_INSERT(hammer2_state_tree, &pmp->staterd_tree, state);
187 state->flags |= HAMMER2_STATE_INSERTED;
192 * Persistent state is expected but might not exist if an
193 * ABORT+DELETE races the close.
196 if (msg->any.head.cmd & DMSGF_ABORT) {
199 kprintf("hammer2_state_msgrx: no state "
207 * Handle another ABORT+DELETE case if the msgid has already
210 if ((state->rxcmd & DMSGF_CREATE) == 0) {
211 if (msg->any.head.cmd & DMSGF_ABORT) {
214 kprintf("hammer2_state_msgrx: state reused "
224 * Check for mid-stream ABORT command received, otherwise
227 if (msg->any.head.cmd & DMSGF_ABORT) {
229 (state->rxcmd & DMSGF_CREATE) == 0) {
236 case DMSGF_REPLY | DMSGF_CREATE:
237 case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
239 * When receiving a reply with CREATE set the original
240 * persistent state message should already exist.
243 kprintf("hammer2_state_msgrx: no state match for "
244 "REPLY cmd=%08x msgid=%016jx\n",
246 (intmax_t)msg->any.head.msgid);
250 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
253 case DMSGF_REPLY | DMSGF_DELETE:
255 * Received REPLY+ABORT+DELETE in case where msgid has
256 * already been fully closed, ignore the message.
259 if (msg->any.head.cmd & DMSGF_ABORT) {
262 kprintf("hammer2_state_msgrx: no state match "
263 "for REPLY|DELETE\n");
270 * Received REPLY+ABORT+DELETE in case where msgid has
271 * already been reused for an unrelated message,
272 * ignore the message.
274 if ((state->rxcmd & DMSGF_CREATE) == 0) {
275 if (msg->any.head.cmd & DMSGF_ABORT) {
278 kprintf("hammer2_state_msgrx: state reused "
279 "for REPLY|DELETE\n");
288 * Check for mid-stream ABORT reply received to sent command.
290 if (msg->any.head.cmd & DMSGF_ABORT) {
292 (state->rxcmd & DMSGF_CREATE) == 0) {
300 lockmgr(&pmp->msglk, LK_RELEASE);
/*
 * hammer2_state_cleanuprx - post-processing for a received message.
 *
 * A message without state is simply freed.  A message carrying DELETE
 * marks the receive side closed (rxcmd |= DMSGF_DELETE) under
 * pmp->msglk.  If the transmit side is also closed, the state is
 * removed from its RB tree and freed: statewr_tree when rxcmd has REPLY
 * set, staterd_tree otherwise.  In the remaining case the message is
 * freed unless it is the one recorded in state->msg.
 */
305 hammer2_state_cleanuprx(hammer2_msg_t *msg)
307 hammer2_pfsmount_t *pmp = msg->router->pmp;
308 hammer2_state_t *state;
310 if ((state = msg->state) == NULL) {
311 hammer2_msg_free(msg);
312 } else if (msg->any.head.cmd & DMSGF_DELETE) {
313 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
314 state->rxcmd |= DMSGF_DELETE;
/* Both directions closed: the transaction is finished. */
315 if (state->txcmd & DMSGF_DELETE) {
316 if (state->msg == msg)
318 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
/* Reply-direction receive means we initiated it: state is on statewr_tree. */
319 if (state->rxcmd & DMSGF_REPLY) {
320 KKASSERT(msg->any.head.cmd &
322 RB_REMOVE(hammer2_state_tree,
323 &pmp->statewr_tree, state);
325 KKASSERT((msg->any.head.cmd &
327 RB_REMOVE(hammer2_state_tree,
328 &pmp->staterd_tree, state);
330 state->flags &= ~HAMMER2_STATE_INSERTED;
/* msglk is dropped before hammer2_state_free(), which may free a msg. */
331 lockmgr(&pmp->msglk, LK_RELEASE);
332 hammer2_state_free(state);
334 lockmgr(&pmp->msglk, LK_RELEASE);
336 hammer2_msg_free(msg);
337 } else if (state->msg != msg) {
338 hammer2_msg_free(msg);
343 * Process state tracking for a message prior to transmission.
345 * Called with msglk NOT held and the msg already dequeued; this routine acquires and releases pmp->msglk itself.
347 * One-off messages are usually sent with dummy state and msg->state may be NULL
350 * New transactions (when CREATE is set) will insert the state.
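352 * May request that the caller discard the message (NOTE(review): this routine takes no *discardp argument; presumably the request is signalled via its return value -- confirm).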
353 * A NULL state may be returned in this case.
/*
 * hammer2_state_msgtx - transmit-side state tracking for msg.
 *
 * Updates the transaction state's txcmd according to the message's
 * CREATE/DELETE/REPLY/ABORT flags while holding pmp->msglk.  Messages
 * carrying none of CREATE/DELETE/ABORT take an early exit.  For a new
 * command the state must already exist (KKASSERT in the CREATE case);
 * this routine only assigns txcmd/rxcmd.  ABORT+DELETE on closed or
 * reused msgids is logged.
 *
 * NOTE(review): pmp->msglk is acquired here (LK_EXCLUSIVE) and released
 * on the visible exit paths, so callers must not already hold it.
 * NOTE(review): pmp->freewr_state is preallocated below, but no visible
 * code in this function consumes it -- confirm whether it is still needed.
 */
356 hammer2_state_msgtx(hammer2_msg_t *msg)
358 hammer2_pfsmount_t *pmp = msg->router->pmp;
359 hammer2_state_t *state;
363 * Make sure a state structure is ready to go in case we need a new
364 * one. This is the only routine which uses freewr_state so no
365 * races are possible.
367 if ((state = pmp->freewr_state) == NULL) {
368 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
370 state->flags = HAMMER2_STATE_DYNAMIC;
371 pmp->freewr_state = state;
375 * Lock RB tree. If persistent state is present it will have already
376 * been assigned to msg.
378 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
382 * Short-cut one-off or mid-stream messages (state may be NULL).
384 if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
385 DMSGF_ABORT)) == 0) {
386 lockmgr(&pmp->msglk, LK_RELEASE);
392 * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
393 * inside the case statements.
395 switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
398 case DMSGF_CREATE | DMSGF_DELETE:
400 * Insert the new persistent message state and mark
401 * half-closed if DELETE is set. Since this is a new
402 * message it isn't possible to transition into the fully
405 * XXX state must be assigned and inserted by
406 * hammer2_msg_write(). txcmd is assigned by us
409 KKASSERT(state != NULL);
/* DELETE is masked off here; it is recorded in txcmd during cleanup. */
410 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
411 state->rxcmd = DMSGF_REPLY;
416 * Sent ABORT+DELETE in case where msgid has already
417 * been fully closed, ignore the message.
420 if (msg->any.head.cmd & DMSGF_ABORT) {
423 kprintf("hammer2_state_msgtx: no state match "
424 "for DELETE cmd=%08x msgid=%016jx\n",
426 (intmax_t)msg->any.head.msgid);
433 * Sent ABORT+DELETE in case where msgid has
434 * already been reused for an unrelated message,
435 * ignore the message.
437 if ((state->txcmd & DMSGF_CREATE) == 0) {
438 if (msg->any.head.cmd & DMSGF_ABORT) {
441 kprintf("hammer2_state_msgtx: state reused "
451 * Check for mid-stream ABORT command sent
453 if (msg->any.head.cmd & DMSGF_ABORT) {
455 (state->txcmd & DMSGF_CREATE) == 0) {
462 case DMSGF_REPLY | DMSGF_CREATE:
463 case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
465 * When transmitting a reply with CREATE set the original
466 * persistent state message should already exist.
469 kprintf("hammer2_state_msgtx: no state match "
470 "for REPLY | CREATE\n");
474 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
477 case DMSGF_REPLY | DMSGF_DELETE:
479 * When transmitting a reply with DELETE set the original
480 * persistent state message should already exist.
482 * This is very similar to the REPLY|CREATE|* case except
483 * txcmd is already stored, so we just add the DELETE flag.
485 * Sent REPLY+ABORT+DELETE in case where msgid has
486 * already been fully closed, ignore the message.
489 if (msg->any.head.cmd & DMSGF_ABORT) {
492 kprintf("hammer2_state_msgtx: no state match "
493 "for REPLY | DELETE\n");
500 * Sent REPLY+ABORT+DELETE in case where msgid has already
501 * been reused for an unrelated message, ignore the message.
503 if ((state->txcmd & DMSGF_CREATE) == 0) {
504 if (msg->any.head.cmd & DMSGF_ABORT) {
507 kprintf("hammer2_state_msgtx: state reused "
508 "for REPLY | DELETE\n");
517 * Check for mid-stream ABORT reply sent.
519 * One-off REPLY messages are allowed for e.g. status updates.
521 if (msg->any.head.cmd & DMSGF_ABORT) {
523 (state->txcmd & DMSGF_CREATE) == 0) {
531 lockmgr(&pmp->msglk, LK_RELEASE);
/*
 * hammer2_state_cleanuptx - post-processing for a transmitted message.
 *
 * Mirror image of hammer2_state_cleanuprx().  A message without state
 * is freed.  A message carrying DELETE marks the transmit side closed
 * (txcmd |= DMSGF_DELETE) under pmp->msglk.  If the receive side is also
 * closed, the state is removed from its RB tree and freed: staterd_tree
 * when txcmd has REPLY set, statewr_tree otherwise.  In the remaining
 * case the message is freed unless it is the one recorded in state->msg.
 */
536 hammer2_state_cleanuptx(hammer2_msg_t *msg)
538 hammer2_pfsmount_t *pmp = msg->router->pmp;
539 hammer2_state_t *state;
541 if ((state = msg->state) == NULL) {
542 hammer2_msg_free(msg);
543 } else if (msg->any.head.cmd & DMSGF_DELETE) {
544 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
545 state->txcmd |= DMSGF_DELETE;
/* Both directions closed: the transaction is finished. */
546 if (state->rxcmd & DMSGF_DELETE) {
547 if (state->msg == msg)
549 KKASSERT(state->flags & HAMMER2_STATE_INSERTED);
/* Sending replies means the peer initiated it: state is on staterd_tree. */
550 if (state->txcmd & DMSGF_REPLY) {
551 KKASSERT(msg->any.head.cmd &
553 RB_REMOVE(hammer2_state_tree,
554 &pmp->staterd_tree, state);
556 KKASSERT((msg->any.head.cmd &
558 RB_REMOVE(hammer2_state_tree,
559 &pmp->statewr_tree, state);
561 state->flags &= ~HAMMER2_STATE_INSERTED;
/* msglk is dropped before hammer2_state_free(), which may free a msg. */
562 lockmgr(&pmp->msglk, LK_RELEASE);
563 hammer2_state_free(state);
565 lockmgr(&pmp->msglk, LK_RELEASE);
567 hammer2_msg_free(msg);
568 } else if (state->msg != msg) {
569 hammer2_msg_free(msg);
/*
 * hammer2_state_free - release a state structure that has already been
 * removed from its RB tree (asserted via HAMMER2_STATE_INSERTED), then
 * free the message held in `msg`.
 *
 * NOTE(review): the state is kfree()d before hammer2_msg_free(msg), so
 * msg (presumably state->msg) must be captured beforehand.  The lines
 * doing so are not visible here -- confirm.
 */
574 hammer2_state_free(hammer2_state_t *state)
576 hammer2_pfsmount_t *pmp = state->pmp;
579 KKASSERT((state->flags & HAMMER2_STATE_INSERTED) == 0);
582 kfree(state, pmp->mmsg);
584 hammer2_msg_free(msg);
/*
 * hammer2_msg_alloc - allocate a zeroed message whose header area is
 * (cmd & DMSGF_SIZE) * DMSG_ALIGN bytes, addressed to router->target
 * with source 0 and the given cmd.
 *
 * If cmd has DMSGF_CREATE set, a new transaction state is also
 * allocated.  It gets `data` as its any.any pointer and a msgid, and is
 * inserted into pmp->statewr_tree under pmp->msglk.  A duplicate msgid
 * panics.
 */
588 hammer2_msg_alloc(hammer2_router_t *router, uint32_t cmd,
589 int (*func)(hammer2_state_t *, hammer2_msg_t *), void *data)
591 hammer2_pfsmount_t *pmp = router->pmp;
593 hammer2_state_t *state;
596 hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
597 msg = kmalloc(offsetof(struct hammer2_msg, any) + hbytes,
598 pmp->mmsg, M_WAITOK | M_ZERO);
599 msg->hdr_size = hbytes;
600 msg->router = router;
/*
 * NOTE(review): router was already dereferenced (router->pmp) at the
 * top of this function, so this assertion cannot catch a NULL router.
 * It would have to precede that dereference.
 */
601 KKASSERT(router != NULL);
602 msg->any.head.magic = DMSG_HDR_MAGIC;
603 msg->any.head.source = 0;
604 msg->any.head.target = router->target;
605 msg->any.head.cmd = cmd;
607 if (cmd & DMSGF_CREATE) {
609 * New transaction, requires tracking state and a unique
610 * msgid to be allocated.
612 KKASSERT(msg->state == NULL);
613 state = kmalloc(sizeof(*state), pmp->mmsg, M_WAITOK | M_ZERO);
615 state->flags = HAMMER2_STATE_DYNAMIC;
617 state->any.any = data;
/*
 * The msgid is the state's own address, so it is unique while the
 * state is allocated.  NOTE(review): this places a kernel pointer
 * value in the message header.
 */
619 state->msgid = (uint64_t)(uintptr_t)state;
620 state->router = msg->router;
/*
 * NOTE(review): source/target were already set above, and msgid is
 * stored again under msglk below.  These stores look redundant.
 */
622 msg->any.head.source = 0;
623 msg->any.head.target = state->router->target;
624 msg->any.head.msgid = state->msgid;
626 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
627 if (RB_INSERT(hammer2_state_tree, &pmp->statewr_tree, state))
628 panic("duplicate msgid allocated");
629 state->flags |= HAMMER2_STATE_INSERTED;
630 msg->any.head.msgid = state->msgid;
631 lockmgr(&pmp->msglk, LK_RELEASE);
/*
 * hammer2_msg_free - free msg back to pmp->mmsg.  The aux_data buffer is
 * also freed when both aux_data and aux_size are non-zero.
 *
 * NOTE(review): an aux_data buffer with aux_size == 0 is not released by
 * the visible code.  Confirm this combination cannot occur.
 */
638 hammer2_msg_free(hammer2_msg_t *msg)
640 hammer2_pfsmount_t *pmp = msg->router->pmp;
642 if (msg->aux_data && msg->aux_size) {
643 kfree(msg->aux_data, pmp->mmsg);
644 msg->aux_data = NULL;
648 kfree(msg, pmp->mmsg);
652 * Indexed messages are stored in a red-black tree keyed by their
653 * router and then their msgid. Only persistent messages are indexed.
/*
 * hammer2_state_cmp - RB-tree comparator for hammer2_state_tree.
 * Orders states by router pointer first, then by msgid.  The return
 * statements are not visible here; presumably -1 / 1 / 0 -- confirm.
 *
 * NOTE(review): ISO C leaves relational comparison of pointers to
 * distinct objects undefined.  Comparing (uintptr_t) casts of the router
 * pointers would be strictly portable.
 */
656 hammer2_state_cmp(hammer2_state_t *state1, hammer2_state_t *state2)
658 if (state1->router < state2->router)
660 if (state1->router > state2->router)
662 if (state1->msgid < state2->msgid)
664 if (state1->msgid > state2->msgid)
670 * Write a message. All requisite command flags have been set.
672 * If msg->state is non-NULL the message is written to the existing
673 * transaction. msgid will be set accordingly.
675 * If msg->state is NULL and CREATE is set new state is allocated and
676 * (func, data) is installed. A msgid is assigned.
678 * If msg->state is NULL and CREATE is not set the message is assumed
679 * to be a one-way message. One-way messages are always sent with
680 * msgid 0; any msgid assigned by the originator is overwritten.
682 * This function merely queues the message to the management thread, it
683 * does not write to the message socket/pipe.
/*
 * hammer2_msg_write - finish the header of msg and queue it.
 *
 * msgid/source/target are taken from the transaction state when there
 * is one.  Otherwise msgid is 0 and target is msg->router->target.
 * Then, under pmp->msglk, the salt is set from pmp->msg_seq, hdr_crc is
 * computed over the first msg->hdr_size header bytes, msg is appended
 * to pmp->msgq, and hammer2_clusterctl_wakeup() is called.
 */
686 hammer2_msg_write(hammer2_msg_t *msg)
688 hammer2_pfsmount_t *pmp = msg->router->pmp;
689 hammer2_state_t *state;
693 * Continuance or termination of existing transaction.
694 * The transaction could have been initiated by either end.
696 * (Function callback and aux data for the receive side can
697 * be replaced or left alone).
700 msg->any.head.msgid = state->msgid;
701 msg->any.head.source = 0;
702 msg->any.head.target = state->router->target;
703 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
706 * One-off message (always uses msgid 0 to distinguish
707 * between a possibly lost in-transaction message due to
708 * competing aborts and a real one-off message?)
710 msg->any.head.msgid = 0;
711 msg->any.head.source = 0;
712 msg->any.head.target = msg->router->target;
713 lockmgr(&pmp->msglk, LK_EXCLUSIVE);
717 * Finish up the msg fields
/* NOTE(review): only the low 8 bits of msg_seq are used; no random part. */
719 msg->any.head.salt = /* (random << 8) | */ (pmp->msg_seq & 255);
/* hdr_crc is zeroed first so the CRC covers the header with crc = 0. */
722 msg->any.head.hdr_crc = 0;
723 msg->any.head.hdr_crc = hammer2_icrc32(msg->any.buf, msg->hdr_size);
725 TAILQ_INSERT_TAIL(&pmp->msgq, msg, qentry);
726 hammer2_clusterctl_wakeup(pmp);
727 lockmgr(&pmp->msglk, LK_RELEASE);
731 * Reply to a message and terminate our side of the transaction.
733 * If msg->state is NULL we are replying to a one-way message.
/*
 * hammer2_msg_reply - send a DMSG_LNK_ERROR carrying `error` in reply to
 * msg and close our side of the transaction.  The CREATE/DELETE/REPLY
 * flags are chosen from msg->state->txcmd and msg's own REPLY bit (not
 * all of the flag-setting lines are visible here).
 *
 * NOTE(review): state->txcmd is read with no visible NULL check, yet a
 * message without a transaction may have a NULL state.  Confirm a guard
 * exists.
 */
736 hammer2_msg_reply(hammer2_msg_t *msg, uint32_t error)
738 hammer2_state_t *state = msg->state;
743 * Reply with a simple error code and terminate the transaction.
745 cmd = DMSG_LNK_ERROR;
748 * Check if our direction has even been initiated yet, set CREATE.
750 * Check what direction this is (command or reply direction). Note
751 * that txcmd might not have been initiated yet.
753 * If our direction has already been closed we just return without
757 if (state->txcmd & DMSGF_DELETE)
759 if ((state->txcmd & DMSGF_CREATE) == 0)
761 if (state->txcmd & DMSGF_REPLY)
765 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
/* NOTE(review): unconditional kprintf per reply (debug output?). */
768 kprintf("MSG_REPLY state=%p msg %08x\n", state, cmd);
770 /* XXX messy mask cmd to avoid allocating state */
771 nmsg = hammer2_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
/* The full cmd (including CREATE) is stored only after allocation. */
773 nmsg->any.head.cmd = cmd;
774 nmsg->any.head.error = error;
776 hammer2_msg_write(nmsg);
780 * Reply to a message and continue our side of the transaction.
782 * If msg->state is NULL we are replying to a one-way message and this
783 * function behaves the same as hammer2_msg_reply().
/*
 * hammer2_msg_result - send a DMSG_LNK_ERROR carrying `error` in reply to
 * msg WITHOUT closing our side of the transaction (no DMSGF_DELETE is
 * added).  Flag selection otherwise mirrors hammer2_msg_reply().
 *
 * NOTE(review): state->txcmd is read with no visible NULL check.
 * Confirm a guard exists for messages without transaction state.
 */
786 hammer2_msg_result(hammer2_msg_t *msg, uint32_t error)
788 hammer2_state_t *state = msg->state;
793 * Return a simple result code, do NOT terminate the transaction.
795 cmd = DMSG_LNK_ERROR;
798 * Check if our direction has even been initiated yet, set CREATE.
800 * Check what direction this is (command or reply direction). Note
801 * that txcmd might not have been initiated yet.
803 * If our direction has already been closed we just return without
807 if (state->txcmd & DMSGF_DELETE)
809 if ((state->txcmd & DMSGF_CREATE) == 0)
811 if (state->txcmd & DMSGF_REPLY)
813 /* continuing transaction, do not set DMSGF_DELETE */
815 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
819 /* XXX messy mask cmd to avoid allocating state */
820 nmsg = hammer2_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
/* The full cmd (including CREATE) is stored only after allocation. */
822 nmsg->any.head.cmd = cmd;
823 nmsg->any.head.error = error;
825 hammer2_msg_write(nmsg);