a6572f8a2329dcfdc2df9d30ac07e814c8b0f792
[dragonfly.git] / sys / kern / kern_dmsg.c
1 /*-
2  * Copyright (c) 2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 #include <sys/param.h>
35 #include <sys/types.h>
36 #include <sys/kernel.h>
37 #include <sys/conf.h>
38 #include <sys/systm.h>
39 #include <sys/queue.h>
40 #include <sys/tree.h>
41 #include <sys/malloc.h>
42 #include <sys/mount.h>
43 #include <sys/socket.h>
44 #include <sys/vnode.h>
45 #include <sys/file.h>
46 #include <sys/proc.h>
47 #include <sys/priv.h>
48 #include <sys/thread.h>
49 #include <sys/globaldata.h>
50 #include <sys/limits.h>
51
52 #include <sys/dmsg.h>
53
/*
 * Emit the red-black tree support functions for kdmsg_state_tree
 * (element type kdmsg_state, linkage field rbnode, comparator
 * kdmsg_state_cmp).
 */
RB_GENERATE(kdmsg_state_tree, kdmsg_state, rbnode, kdmsg_state_cmp);

/*
 * Entry points of the per-connection writer and reader threads.  Both
 * receive the kdmsg_iocom_t as their thread argument.
 */
static void kdmsg_iocom_thread_wr(void *arg);
static void kdmsg_iocom_thread_rd(void *arg);
58
59 /*
60  * Initialize the roll-up communications structure for a network
61  * messaging session.  This function does not install the socket.
62  */
63 void
64 kdmsg_iocom_init(kdmsg_iocom_t *iocom, void *handle,
65                  struct malloc_type *mmsg,
66                  int (*lnk_rcvmsg)(kdmsg_msg_t *msg),
67                  int (*dbg_rcvmsg)(kdmsg_msg_t *msg),
68                  int (*misc_rcvmsg)(kdmsg_msg_t *msg))
69 {
70         bzero(iocom, sizeof(*iocom));
71         iocom->handle = handle;
72         iocom->mmsg = mmsg;
73         iocom->lnk_rcvmsg = lnk_rcvmsg;
74         iocom->dbg_rcvmsg = dbg_rcvmsg;
75         iocom->misc_rcvmsg = misc_rcvmsg;
76         iocom->router.iocom = iocom;
77         lockinit(&iocom->msglk, "h2msg", 0, 0);
78         TAILQ_INIT(&iocom->msgq);
79         RB_INIT(&iocom->staterd_tree);
80         RB_INIT(&iocom->statewr_tree);
81 }
82
83 /*
84  * [Re]connect using the passed file pointer.  The caller must ref the
85  * fp for us.  We own that ref now.
86  */
87 void
88 kdmsg_iocom_reconnect(kdmsg_iocom_t *iocom, struct file *fp,
89                       const char *subsysname)
90 {
91         /*
92          * Destroy the current connection
93          */
94         atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
95         while (iocom->msgrd_td || iocom->msgwr_td) {
96                 wakeup(&iocom->msg_ctl);
97                 tsleep(iocom, 0, "clstrkl", hz);
98         }
99
100         /*
101          * Drop communications descriptor
102          */
103         if (iocom->msg_fp) {
104                 fdrop(iocom->msg_fp);
105                 iocom->msg_fp = NULL;
106         }
107         kprintf("RESTART CONNECTION\n");
108
109         /*
110          * Setup new communications descriptor
111          */
112         iocom->msg_ctl = 0;
113         iocom->msg_fp = fp;
114         iocom->msg_seq = 0;
115
116         lwkt_create(kdmsg_iocom_thread_rd, iocom, &iocom->msgrd_td,
117                     NULL, 0, -1, "%s-msgrd", subsysname);
118         lwkt_create(kdmsg_iocom_thread_wr, iocom, &iocom->msgwr_td,
119                     NULL, 0, -1, "%s-msgwr", subsysname);
120 }
121
122 /*
123  * Disconnect and clean up
124  */
125 void
126 kdmsg_iocom_uninit(kdmsg_iocom_t *iocom)
127 {
128         /*
129          * Ask the cluster controller to go away
130          */
131         atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
132
133         while (iocom->msgrd_td || iocom->msgwr_td) {
134                 wakeup(&iocom->msg_ctl);
135                 tsleep(iocom, 0, "clstrkl", hz);
136         }
137
138         /*
139          * Drop communications descriptor
140          */
141         if (iocom->msg_fp) {
142                 fdrop(iocom->msg_fp);
143                 iocom->msg_fp = NULL;
144         }
145 }
146
147 /*
148  * Cluster controller thread.  Perform messaging functions.  We have one
149  * thread for the reader and one for the writer.  The writer handles
150  * shutdown requests (which should break the reader thread).
151  */
152 static
153 void
154 kdmsg_iocom_thread_rd(void *arg)
155 {
156         kdmsg_iocom_t *iocom = arg;
157         dmsg_hdr_t hdr;
158         kdmsg_msg_t *msg;
159         kdmsg_state_t *state;
160         size_t hbytes;
161         int error = 0;
162
163         while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILL) == 0) {
164                 /*
165                  * Retrieve the message from the pipe or socket.
166                  */
167                 error = fp_read(iocom->msg_fp, &hdr, sizeof(hdr),
168                                 NULL, 1, UIO_SYSSPACE);
169                 if (error)
170                         break;
171                 if (hdr.magic != DMSG_HDR_MAGIC) {
172                         kprintf("kdmsg: bad magic: %04x\n", hdr.magic);
173                         error = EINVAL;
174                         break;
175                 }
176                 hbytes = (hdr.cmd & DMSGF_SIZE) * DMSG_ALIGN;
177                 if (hbytes < sizeof(hdr) || hbytes > DMSG_AUX_MAX) {
178                         kprintf("kdmsg: bad header size %zd\n", hbytes);
179                         error = EINVAL;
180                         break;
181                 }
182                 /* XXX messy: mask cmd to avoid allocating state */
183                 msg = kdmsg_msg_alloc(&iocom->router,
184                                         hdr.cmd & DMSGF_BASECMDMASK,
185                                         NULL, NULL);
186                 msg->any.head = hdr;
187                 msg->hdr_size = hbytes;
188                 if (hbytes > sizeof(hdr)) {
189                         error = fp_read(iocom->msg_fp, &msg->any.head + 1,
190                                         hbytes - sizeof(hdr),
191                                         NULL, 1, UIO_SYSSPACE);
192                         if (error) {
193                                 kprintf("kdmsg: short msg received\n");
194                                 error = EINVAL;
195                                 break;
196                         }
197                 }
198                 msg->aux_size = hdr.aux_bytes * DMSG_ALIGN;
199                 if (msg->aux_size > DMSG_AUX_MAX) {
200                         kprintf("kdmsg: illegal msg payload size %zd\n",
201                                 msg->aux_size);
202                         error = EINVAL;
203                         break;
204                 }
205                 if (msg->aux_size) {
206                         msg->aux_data = kmalloc(msg->aux_size, iocom->mmsg,
207                                                 M_WAITOK | M_ZERO);
208                         error = fp_read(iocom->msg_fp, msg->aux_data,
209                                         msg->aux_size,
210                                         NULL, 1, UIO_SYSSPACE);
211                         if (error) {
212                                 kprintf("kdmsg: short msg payload received\n");
213                                 break;
214                         }
215                 }
216
217                 /*
218                  * State machine tracking, state assignment for msg,
219                  * returns error and discard status.  Errors are fatal
220                  * to the connection except for EALREADY which forces
221                  * a discard without execution.
222                  */
223                 error = kdmsg_state_msgrx(msg);
224                 if (error) {
225                         /*
226                          * Raw protocol or connection error
227                          */
228                         kdmsg_msg_free(msg);
229                         if (error == EALREADY)
230                                 error = 0;
231                 } else if (msg->state && msg->state->func) {
232                         /*
233                          * Message related to state which already has a
234                          * handling function installed for it.
235                          */
236                         error = msg->state->func(msg->state, msg);
237                         kdmsg_state_cleanuprx(msg);
238                 } else if ((msg->any.head.cmd & DMSGF_PROTOS) ==
239                            DMSG_PROTO_LNK) {
240                         /*
241                          * Message related to the LNK protocol set
242                          */
243                         error = iocom->lnk_rcvmsg(msg);
244                         kdmsg_state_cleanuprx(msg);
245                 } else if ((msg->any.head.cmd & DMSGF_PROTOS) ==
246                            DMSG_PROTO_DBG) {
247                         /*
248                          * Message related to the DBG protocol set
249                          */
250                         error = iocom->dbg_rcvmsg(msg);
251                         kdmsg_state_cleanuprx(msg);
252                 } else {
253                         /*
254                          * Other higher-level messages (e.g. vnops)
255                          */
256                         error = iocom->misc_rcvmsg(msg);
257                         kdmsg_state_cleanuprx(msg);
258                 }
259                 msg = NULL;
260         }
261
262         if (error)
263                 kprintf("kdmsg: read failed error %d\n", error);
264
265         lockmgr(&iocom->msglk, LK_EXCLUSIVE);
266         if (msg) {
267                 if (msg->state && msg->state->msg == msg)
268                         msg->state->msg = NULL;
269                 kdmsg_msg_free(msg);
270         }
271
272         if ((state = iocom->freerd_state) != NULL) {
273                 iocom->freerd_state = NULL;
274                 kdmsg_state_free(state);
275         }
276
277         /*
278          * Shutdown the socket before waiting for the transmit side.
279          *
280          * If we are dying due to e.g. a socket disconnect verses being
281          * killed explicity we have to set KILL in order to kick the tx
282          * side when it might not have any other work to do.  KILL might
283          * already be set if we are in an unmount or reconnect.
284          */
285         fp_shutdown(iocom->msg_fp, SHUT_RDWR);
286
287         atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILL);
288         wakeup(&iocom->msg_ctl);
289
290         /*
291          * Wait for the transmit side to drain remaining messages
292          * before cleaning up the rx state.  The transmit side will
293          * set KILLTX and wait for the rx side to completely finish
294          * (set msgrd_td to NULL) before cleaning up any remaining
295          * tx states.
296          */
297         lockmgr(&iocom->msglk, LK_RELEASE);
298         atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLRX);
299         wakeup(&iocom->msg_ctl);
300         while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILLTX) == 0) {
301                 wakeup(&iocom->msg_ctl);
302                 tsleep(iocom, 0, "clstrkw", hz);
303         }
304
305         iocom->msgrd_td = NULL;
306
307         /*
308          * iocom can be ripped out from under us at this point but
309          * wakeup() is safe.
310          */
311         wakeup(iocom);
312         lwkt_exit();
313 }
314
315 static
316 void
317 kdmsg_iocom_thread_wr(void *arg)
318 {
319         kdmsg_iocom_t *iocom = arg;
320         kdmsg_msg_t *msg;
321         kdmsg_state_t *state;
322         ssize_t res;
323         int error = 0;
324         int retries = 20;
325
326         /*
327          * Transmit loop
328          */
329         msg = NULL;
330         lockmgr(&iocom->msglk, LK_EXCLUSIVE);
331
332         while ((iocom->msg_ctl & KDMSG_CLUSTERCTL_KILL) == 0 && error == 0) {
333                 /*
334                  * Sleep if no messages pending.  Interlock with flag while
335                  * holding msglk.
336                  */
337                 if (TAILQ_EMPTY(&iocom->msgq)) {
338                         atomic_set_int(&iocom->msg_ctl,
339                                        KDMSG_CLUSTERCTL_SLEEPING);
340                         lksleep(&iocom->msg_ctl, &iocom->msglk, 0, "msgwr", hz);
341                         atomic_clear_int(&iocom->msg_ctl,
342                                          KDMSG_CLUSTERCTL_SLEEPING);
343                 }
344
345                 while ((msg = TAILQ_FIRST(&iocom->msgq)) != NULL) {
346                         /*
347                          * Remove msg from the transmit queue and do
348                          * persist and half-closed state handling.
349                          */
350                         TAILQ_REMOVE(&iocom->msgq, msg, qentry);
351                         lockmgr(&iocom->msglk, LK_RELEASE);
352
353                         error = kdmsg_state_msgtx(msg);
354                         if (error == EALREADY) {
355                                 error = 0;
356                                 kdmsg_msg_free(msg);
357                                 lockmgr(&iocom->msglk, LK_EXCLUSIVE);
358                                 continue;
359                         }
360                         if (error) {
361                                 kdmsg_msg_free(msg);
362                                 lockmgr(&iocom->msglk, LK_EXCLUSIVE);
363                                 break;
364                         }
365
366                         /*
367                          * Dump the message to the pipe or socket.
368                          */
369                         error = fp_write(iocom->msg_fp, &msg->any,
370                                          msg->hdr_size, &res, UIO_SYSSPACE);
371                         if (error || res != msg->hdr_size) {
372                                 if (error == 0)
373                                         error = EINVAL;
374                                 lockmgr(&iocom->msglk, LK_EXCLUSIVE);
375                                 break;
376                         }
377                         if (msg->aux_size) {
378                                 error = fp_write(iocom->msg_fp,
379                                                  msg->aux_data, msg->aux_size,
380                                                  &res, UIO_SYSSPACE);
381                                 if (error || res != msg->aux_size) {
382                                         if (error == 0)
383                                                 error = EINVAL;
384                                         lockmgr(&iocom->msglk, LK_EXCLUSIVE);
385                                         break;
386                                 }
387                         }
388                         kdmsg_state_cleanuptx(msg);
389                         lockmgr(&iocom->msglk, LK_EXCLUSIVE);
390                 }
391         }
392
393         /*
394          * Cleanup messages pending transmission and release msgq lock.
395          */
396         if (error)
397                 kprintf("kdmsg: write failed error %d\n", error);
398
399         if (msg) {
400                 if (msg->state && msg->state->msg == msg)
401                         msg->state->msg = NULL;
402                 kdmsg_msg_free(msg);
403         }
404
405         /*
406          * Shutdown the socket.  This will cause the rx thread to get an
407          * EOF and ensure that both threads get to a termination state.
408          */
409         fp_shutdown(iocom->msg_fp, SHUT_RDWR);
410
411         /*
412          * Set KILLTX (which the rx side waits for), then wait for the RX
413          * side to completely finish before we clean out any remaining
414          * command states.
415          */
416         lockmgr(&iocom->msglk, LK_RELEASE);
417         atomic_set_int(&iocom->msg_ctl, KDMSG_CLUSTERCTL_KILLTX);
418         wakeup(&iocom->msg_ctl);
419         while (iocom->msgrd_td) {
420                 wakeup(&iocom->msg_ctl);
421                 tsleep(iocom, 0, "clstrkw", hz);
422         }
423         lockmgr(&iocom->msglk, LK_EXCLUSIVE);
424
425         /*
426          * Simulate received MSGF_DELETE's for any remaining states.
427          */
428 cleanuprd:
429         RB_FOREACH(state, kdmsg_state_tree, &iocom->staterd_tree) {
430                 if (state->func &&
431                     (state->rxcmd & DMSGF_DELETE) == 0) {
432                         lockmgr(&iocom->msglk, LK_RELEASE);
433                         msg = kdmsg_msg_alloc(&iocom->router, DMSG_LNK_ERROR,
434                                               NULL, NULL);
435                         if ((state->rxcmd & DMSGF_CREATE) == 0)
436                                 msg->any.head.cmd |= DMSGF_CREATE;
437                         msg->any.head.cmd |= DMSGF_DELETE;
438                         msg->state = state;
439                         state->rxcmd = msg->any.head.cmd &
440                                        ~DMSGF_DELETE;
441                         msg->state->func(state, msg);
442                         kdmsg_state_cleanuprx(msg);
443                         lockmgr(&iocom->msglk, LK_EXCLUSIVE);
444                         goto cleanuprd;
445                 }
446                 if (state->func == NULL) {
447                         state->flags &= ~KDMSG_STATE_INSERTED;
448                         RB_REMOVE(kdmsg_state_tree,
449                                   &iocom->staterd_tree, state);
450                         kdmsg_state_free(state);
451                         goto cleanuprd;
452                 }
453         }
454
455         /*
456          * NOTE: We have to drain the msgq to handle situations
457          *       where received states have built up output
458          *       messages, to avoid creating messages with
459          *       duplicate CREATE/DELETE flags.
460          */
461 cleanupwr:
462         kdmsg_drain_msgq(iocom);
463         RB_FOREACH(state, kdmsg_state_tree, &iocom->statewr_tree) {
464                 if (state->func &&
465                     (state->rxcmd & DMSGF_DELETE) == 0) {
466                         lockmgr(&iocom->msglk, LK_RELEASE);
467                         msg = kdmsg_msg_alloc(&iocom->router, DMSG_LNK_ERROR,
468                                               NULL, NULL);
469                         if ((state->rxcmd & DMSGF_CREATE) == 0)
470                                 msg->any.head.cmd |= DMSGF_CREATE;
471                         msg->any.head.cmd |= DMSGF_DELETE |
472                                              DMSGF_REPLY;
473                         msg->state = state;
474                         state->rxcmd = msg->any.head.cmd &
475                                        ~DMSGF_DELETE;
476                         msg->state->func(state, msg);
477                         kdmsg_state_cleanuprx(msg);
478                         lockmgr(&iocom->msglk, LK_EXCLUSIVE);
479                         goto cleanupwr;
480                 }
481                 if (state->func == NULL) {
482                         state->flags &= ~KDMSG_STATE_INSERTED;
483                         RB_REMOVE(kdmsg_state_tree,
484                                   &iocom->statewr_tree, state);
485                         kdmsg_state_free(state);
486                         goto cleanupwr;
487                 }
488         }
489
490         kdmsg_drain_msgq(iocom);
491         if (--retries == 0)
492                 panic("kdmsg: comm thread shutdown couldn't drain");
493         if (RB_ROOT(&iocom->statewr_tree))
494                 goto cleanupwr;
495
496         if ((state = iocom->freewr_state) != NULL) {
497                 iocom->freewr_state = NULL;
498                 kdmsg_state_free(state);
499         }
500
501         lockmgr(&iocom->msglk, LK_RELEASE);
502
503         /*
504          * The state trees had better be empty now
505          */
506         KKASSERT(RB_EMPTY(&iocom->staterd_tree));
507         KKASSERT(RB_EMPTY(&iocom->statewr_tree));
508         KKASSERT(iocom->conn_state == NULL);
509
510         /*
511          * iocom can be ripped out from under us once msgwr_td is set to NULL.
512          * The wakeup is safe.
513          */
514         iocom->msgwr_td = NULL;
515         wakeup(iocom);
516         lwkt_exit();
517 }
518
519 /*
520  * This cleans out the pending transmit message queue, adjusting any
521  * persistent states properly in the process.
522  *
523  * Caller must hold pmp->iocom.msglk
524  */
525 void
526 kdmsg_drain_msgq(kdmsg_iocom_t *iocom)
527 {
528         kdmsg_msg_t *msg;
529
530         /*
531          * Clean out our pending transmit queue, executing the
532          * appropriate state adjustments.  If this tries to open
533          * any new outgoing transactions we have to loop up and
534          * clean them out.
535          */
536         while ((msg = TAILQ_FIRST(&iocom->msgq)) != NULL) {
537                 TAILQ_REMOVE(&iocom->msgq, msg, qentry);
538                 lockmgr(&iocom->msglk, LK_RELEASE);
539                 if (msg->state && msg->state->msg == msg)
540                         msg->state->msg = NULL;
541                 if (kdmsg_state_msgtx(msg))
542                         kdmsg_msg_free(msg);
543                 else
544                         kdmsg_state_cleanuptx(msg);
545                 lockmgr(&iocom->msglk, LK_EXCLUSIVE);
546         }
547 }
548
549 /*
550  * Process state tracking for a message after reception, prior to
551  * execution.
552  *
553  * Called by the receive thread without msglk held (it is acquired here).
554  *
555  * All messages are called with dummy state and return actual state.
556  * (One-off messages often just return the same dummy state).
557  *
558  * May request that the caller discard the message by returning EALREADY.
559  * msg->state is not used in this case and is allowed to be NULL.
560  *
561  * --
562  *
563  * These routines handle persistent and command/reply message state via the
564  * CREATE and DELETE flags.  The first message in a command or reply sequence
565  * sets CREATE, the last message in a command or reply sequence sets DELETE.
566  *
567  * There can be any number of intermediate messages belonging to the same
568  * sequence sent in between the CREATE message and the DELETE message,
569  * which set neither flag.  This represents a streaming command or reply.
570  *
571  * Any command message received with CREATE set expects a reply sequence to
572  * be returned.  Reply sequences work the same as command sequences except the
573  * REPLY bit is also sent.  Both the command side and reply side can
574  * degenerate into a single message with both CREATE and DELETE set.  Note
575  * that one side can be streaming and the other side not, or neither, or both.
576  *
577  * The msgid is unique for the initiator.  That is, two sides sending a new
578  * message can use the same msgid without colliding.
579  *
580  * --
581  *
582  * ABORT sequences work by setting the ABORT flag along with normal message
583  * state.  However, ABORTs can also be sent on half-closed messages, that is
584  * even if the command or reply side has already sent a DELETE, as long as
585  * the message has not been fully closed it can still send an ABORT+DELETE
586  * to terminate the half-closed message state.
587  *
588  * Since ABORT+DELETEs can race we silently discard ABORT's for message
589  * state which has already been fully closed.  REPLY+ABORT+DELETEs can
590  * also race, and in this situation the other side might have already
591  * initiated a new unrelated command with the same message id.  Since
592  * the abort has not set the CREATE flag the situation can be detected
593  * and the message will also be discarded.
594  *
595  * Non-blocking requests can be initiated with ABORT+CREATE[+DELETE].
596  * The ABORT request is essentially integrated into the command instead
597  * of being sent later on.  In this situation the command implementation
598  * detects that CREATE and ABORT are both set (vs ABORT alone) and can
599  * special-case non-blocking operation for the command.
600  *
601  * NOTE!  Messages with ABORT set without CREATE or DELETE are considered
602  *        to be mid-stream aborts for command/reply sequences.  ABORTs on
603  *        one-way messages are not supported.
604  *
605  * NOTE!  If a command sequence does not support aborts the ABORT flag is
606  *        simply ignored.
607  *
608  * --
609  *
610  * One-off messages (no reply expected) are sent with neither CREATE or DELETE
611  * set.  One-off messages cannot be aborted and typically aren't processed
612  * by these routines.  The REPLY bit can be used to distinguish whether a
613  * one-off message is a command or reply.  For example, one-off replies
614  * will typically just contain status updates.
615  */
616 int
617 kdmsg_state_msgrx(kdmsg_msg_t *msg)
618 {
619         kdmsg_iocom_t *iocom;
620         kdmsg_state_t *state;
621         int error;
622
623         iocom = msg->router->iocom;
624
625         /*
626          * XXX resolve msg->any.head.source and msg->any.head.target
627          *     into LNK_SPAN references.
628          *
629          * XXX replace msg->router
630          */
631
632         /*
633          * Make sure a state structure is ready to go in case we need a new
634          * one.  This is the only routine which uses freerd_state so no
635          * races are possible.
636          */
637         if ((state = iocom->freerd_state) == NULL) {
638                 state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
639                 state->flags = KDMSG_STATE_DYNAMIC;
640                 iocom->freerd_state = state;
641         }
642
643         /*
644          * Lock RB tree and locate existing persistent state, if any.
645          *
646          * If received msg is a command state is on staterd_tree.
647          * If received msg is a reply state is on statewr_tree.
648          */
649         lockmgr(&iocom->msglk, LK_EXCLUSIVE);
650
651         state->msgid = msg->any.head.msgid;
652         state->router = &iocom->router;
653         kprintf("received msg %08x msgid %jx source=%jx target=%jx\n",
654                 msg->any.head.cmd,
655                 (intmax_t)msg->any.head.msgid,
656                 (intmax_t)msg->any.head.source,
657                 (intmax_t)msg->any.head.target);
658         if (msg->any.head.cmd & DMSGF_REPLY)
659                 state = RB_FIND(kdmsg_state_tree, &iocom->statewr_tree, state);
660         else
661                 state = RB_FIND(kdmsg_state_tree, &iocom->staterd_tree, state);
662         msg->state = state;
663
664         /*
665          * Short-cut one-off or mid-stream messages (state may be NULL).
666          */
667         if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
668                                   DMSGF_ABORT)) == 0) {
669                 lockmgr(&iocom->msglk, LK_RELEASE);
670                 return(0);
671         }
672
673         /*
674          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
675          * inside the case statements.
676          */
677         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE | DMSGF_REPLY)) {
678         case DMSGF_CREATE:
679         case DMSGF_CREATE | DMSGF_DELETE:
680                 /*
681                  * New persistant command received.
682                  */
683                 if (state) {
684                         kprintf("kdmsg_state_msgrx: duplicate transaction\n");
685                         error = EINVAL;
686                         break;
687                 }
688                 state = iocom->freerd_state;
689                 iocom->freerd_state = NULL;
690                 msg->state = state;
691                 state->router = msg->router;
692                 state->msg = msg;
693                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
694                 state->txcmd = DMSGF_REPLY;
695                 RB_INSERT(kdmsg_state_tree, &iocom->staterd_tree, state);
696                 state->flags |= KDMSG_STATE_INSERTED;
697                 error = 0;
698                 break;
699         case DMSGF_DELETE:
700                 /*
701                  * Persistent state is expected but might not exist if an
702                  * ABORT+DELETE races the close.
703                  */
704                 if (state == NULL) {
705                         if (msg->any.head.cmd & DMSGF_ABORT) {
706                                 error = EALREADY;
707                         } else {
708                                 kprintf("kdmsg_state_msgrx: no state "
709                                         "for DELETE\n");
710                                 error = EINVAL;
711                         }
712                         break;
713                 }
714
715                 /*
716                  * Handle another ABORT+DELETE case if the msgid has already
717                  * been reused.
718                  */
719                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
720                         if (msg->any.head.cmd & DMSGF_ABORT) {
721                                 error = EALREADY;
722                         } else {
723                                 kprintf("kdmsg_state_msgrx: state reused "
724                                         "for DELETE\n");
725                                 error = EINVAL;
726                         }
727                         break;
728                 }
729                 error = 0;
730                 break;
731         default:
732                 /*
733                  * Check for mid-stream ABORT command received, otherwise
734                  * allow.
735                  */
736                 if (msg->any.head.cmd & DMSGF_ABORT) {
737                         if (state == NULL ||
738                             (state->rxcmd & DMSGF_CREATE) == 0) {
739                                 error = EALREADY;
740                                 break;
741                         }
742                 }
743                 error = 0;
744                 break;
745         case DMSGF_REPLY | DMSGF_CREATE:
746         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
747                 /*
748                  * When receiving a reply with CREATE set the original
749                  * persistent state message should already exist.
750                  */
751                 if (state == NULL) {
752                         kprintf("kdmsg_state_msgrx: no state match for "
753                                 "REPLY cmd=%08x msgid=%016jx\n",
754                                 msg->any.head.cmd,
755                                 (intmax_t)msg->any.head.msgid);
756                         error = EINVAL;
757                         break;
758                 }
759                 state->rxcmd = msg->any.head.cmd & ~DMSGF_DELETE;
760                 error = 0;
761                 break;
762         case DMSGF_REPLY | DMSGF_DELETE:
763                 /*
764                  * Received REPLY+ABORT+DELETE in case where msgid has
765                  * already been fully closed, ignore the message.
766                  */
767                 if (state == NULL) {
768                         if (msg->any.head.cmd & DMSGF_ABORT) {
769                                 error = EALREADY;
770                         } else {
771                                 kprintf("kdmsg_state_msgrx: no state match "
772                                         "for REPLY|DELETE\n");
773                                 error = EINVAL;
774                         }
775                         break;
776                 }
777
778                 /*
779                  * Received REPLY+ABORT+DELETE in case where msgid has
780                  * already been reused for an unrelated message,
781                  * ignore the message.
782                  */
783                 if ((state->rxcmd & DMSGF_CREATE) == 0) {
784                         if (msg->any.head.cmd & DMSGF_ABORT) {
785                                 error = EALREADY;
786                         } else {
787                                 kprintf("kdmsg_state_msgrx: state reused "
788                                         "for REPLY|DELETE\n");
789                                 error = EINVAL;
790                         }
791                         break;
792                 }
793                 error = 0;
794                 break;
795         case DMSGF_REPLY:
796                 /*
797                  * Check for mid-stream ABORT reply received to sent command.
798                  */
799                 if (msg->any.head.cmd & DMSGF_ABORT) {
800                         if (state == NULL ||
801                             (state->rxcmd & DMSGF_CREATE) == 0) {
802                                 error = EALREADY;
803                                 break;
804                         }
805                 }
806                 error = 0;
807                 break;
808         }
809         lockmgr(&iocom->msglk, LK_RELEASE);
810         return (error);
811 }
812
813 void
814 kdmsg_state_cleanuprx(kdmsg_msg_t *msg)
815 {
816         kdmsg_iocom_t *iocom;
817         kdmsg_state_t *state;
818
819         iocom = msg->router->iocom;
820
821         if ((state = msg->state) == NULL) {
822                 kdmsg_msg_free(msg);
823         } else if (msg->any.head.cmd & DMSGF_DELETE) {
824                 lockmgr(&iocom->msglk, LK_EXCLUSIVE);
825                 state->rxcmd |= DMSGF_DELETE;
826                 if (state->txcmd & DMSGF_DELETE) {
827                         if (state->msg == msg)
828                                 state->msg = NULL;
829                         KKASSERT(state->flags & KDMSG_STATE_INSERTED);
830                         if (state->rxcmd & DMSGF_REPLY) {
831                                 KKASSERT(msg->any.head.cmd &
832                                          DMSGF_REPLY);
833                                 RB_REMOVE(kdmsg_state_tree,
834                                           &iocom->statewr_tree, state);
835                         } else {
836                                 KKASSERT((msg->any.head.cmd &
837                                           DMSGF_REPLY) == 0);
838                                 RB_REMOVE(kdmsg_state_tree,
839                                           &iocom->staterd_tree, state);
840                         }
841                         state->flags &= ~KDMSG_STATE_INSERTED;
842                         lockmgr(&iocom->msglk, LK_RELEASE);
843                         kdmsg_state_free(state);
844                 } else {
845                         lockmgr(&iocom->msglk, LK_RELEASE);
846                 }
847                 kdmsg_msg_free(msg);
848         } else if (state->msg != msg) {
849                 kdmsg_msg_free(msg);
850         }
851 }
852
853 /*
854  * Process state tracking for a message prior to transmission.
855  *
856  * Called with msglk held and the msg dequeued.
857  *
858  * One-off messages are usually with dummy state and msg->state may be NULL
859  * in this situation.
860  *
861  * New transactions (when CREATE is set) will insert the state.
862  *
863  * May request that caller discard the message by setting *discardp to 1.
864  * A NULL state may be returned in this case.
865  */
866 int
867 kdmsg_state_msgtx(kdmsg_msg_t *msg)
868 {
869         kdmsg_iocom_t *iocom;
870         kdmsg_state_t *state;
871         int error;
872
873         iocom = msg->router->iocom;
874
875         /*
876          * Make sure a state structure is ready to go in case we need a new
877          * one.  This is the only routine which uses freewr_state so no
878          * races are possible.
879          */
880         if ((state = iocom->freewr_state) == NULL) {
881                 state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
882                 state->flags = KDMSG_STATE_DYNAMIC;
883                 state->router = &iocom->router;
884                 iocom->freewr_state = state;
885         }
886
887         /*
888          * Lock RB tree.  If persistent state is present it will have already
889          * been assigned to msg.
890          */
891         lockmgr(&iocom->msglk, LK_EXCLUSIVE);
892         state = msg->state;
893
894         /*
895          * Short-cut one-off or mid-stream messages (state may be NULL).
896          */
897         if ((msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
898                                   DMSGF_ABORT)) == 0) {
899                 lockmgr(&iocom->msglk, LK_RELEASE);
900                 return(0);
901         }
902
903
904         /*
905          * Switch on CREATE, DELETE, REPLY, and also handle ABORT from
906          * inside the case statements.
907          */
908         switch(msg->any.head.cmd & (DMSGF_CREATE | DMSGF_DELETE |
909                                     DMSGF_REPLY)) {
910         case DMSGF_CREATE:
911         case DMSGF_CREATE | DMSGF_DELETE:
912                 /*
913                  * Insert the new persistent message state and mark
914                  * half-closed if DELETE is set.  Since this is a new
915                  * message it isn't possible to transition into the fully
916                  * closed state here.
917                  *
918                  * XXX state must be assigned and inserted by
919                  *     kdmsg_msg_write().  txcmd is assigned by us
920                  *     on-transmit.
921                  */
922                 KKASSERT(state != NULL);
923                 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
924                 state->rxcmd = DMSGF_REPLY;
925                 error = 0;
926                 break;
927         case DMSGF_DELETE:
928                 /*
929                  * Sent ABORT+DELETE in case where msgid has already
930                  * been fully closed, ignore the message.
931                  */
932                 if (state == NULL) {
933                         if (msg->any.head.cmd & DMSGF_ABORT) {
934                                 error = EALREADY;
935                         } else {
936                                 kprintf("kdmsg_state_msgtx: no state match "
937                                         "for DELETE cmd=%08x msgid=%016jx\n",
938                                         msg->any.head.cmd,
939                                         (intmax_t)msg->any.head.msgid);
940                                 error = EINVAL;
941                         }
942                         break;
943                 }
944
945                 /*
946                  * Sent ABORT+DELETE in case where msgid has
947                  * already been reused for an unrelated message,
948                  * ignore the message.
949                  */
950                 if ((state->txcmd & DMSGF_CREATE) == 0) {
951                         if (msg->any.head.cmd & DMSGF_ABORT) {
952                                 error = EALREADY;
953                         } else {
954                                 kprintf("kdmsg_state_msgtx: state reused "
955                                         "for DELETE\n");
956                                 error = EINVAL;
957                         }
958                         break;
959                 }
960                 error = 0;
961                 break;
962         default:
963                 /*
964                  * Check for mid-stream ABORT command sent
965                  */
966                 if (msg->any.head.cmd & DMSGF_ABORT) {
967                         if (state == NULL ||
968                             (state->txcmd & DMSGF_CREATE) == 0) {
969                                 error = EALREADY;
970                                 break;
971                         }
972                 }
973                 error = 0;
974                 break;
975         case DMSGF_REPLY | DMSGF_CREATE:
976         case DMSGF_REPLY | DMSGF_CREATE | DMSGF_DELETE:
977                 /*
978                  * When transmitting a reply with CREATE set the original
979                  * persistent state message should already exist.
980                  */
981                 if (state == NULL) {
982                         kprintf("kdmsg_state_msgtx: no state match "
983                                 "for REPLY | CREATE\n");
984                         error = EINVAL;
985                         break;
986                 }
987                 state->txcmd = msg->any.head.cmd & ~DMSGF_DELETE;
988                 error = 0;
989                 break;
990         case DMSGF_REPLY | DMSGF_DELETE:
991                 /*
992                  * When transmitting a reply with DELETE set the original
993                  * persistent state message should already exist.
994                  *
995                  * This is very similar to the REPLY|CREATE|* case except
996                  * txcmd is already stored, so we just add the DELETE flag.
997                  *
998                  * Sent REPLY+ABORT+DELETE in case where msgid has
999                  * already been fully closed, ignore the message.
1000                  */
1001                 if (state == NULL) {
1002                         if (msg->any.head.cmd & DMSGF_ABORT) {
1003                                 error = EALREADY;
1004                         } else {
1005                                 kprintf("kdmsg_state_msgtx: no state match "
1006                                         "for REPLY | DELETE\n");
1007                                 error = EINVAL;
1008                         }
1009                         break;
1010                 }
1011
1012                 /*
1013                  * Sent REPLY+ABORT+DELETE in case where msgid has already
1014                  * been reused for an unrelated message, ignore the message.
1015                  */
1016                 if ((state->txcmd & DMSGF_CREATE) == 0) {
1017                         if (msg->any.head.cmd & DMSGF_ABORT) {
1018                                 error = EALREADY;
1019                         } else {
1020                                 kprintf("kdmsg_state_msgtx: state reused "
1021                                         "for REPLY | DELETE\n");
1022                                 error = EINVAL;
1023                         }
1024                         break;
1025                 }
1026                 error = 0;
1027                 break;
1028         case DMSGF_REPLY:
1029                 /*
1030                  * Check for mid-stream ABORT reply sent.
1031                  *
1032                  * One-off REPLY messages are allowed for e.g. status updates.
1033                  */
1034                 if (msg->any.head.cmd & DMSGF_ABORT) {
1035                         if (state == NULL ||
1036                             (state->txcmd & DMSGF_CREATE) == 0) {
1037                                 error = EALREADY;
1038                                 break;
1039                         }
1040                 }
1041                 error = 0;
1042                 break;
1043         }
1044         lockmgr(&iocom->msglk, LK_RELEASE);
1045         return (error);
1046 }
1047
1048 void
1049 kdmsg_state_cleanuptx(kdmsg_msg_t *msg)
1050 {
1051         kdmsg_iocom_t *iocom;
1052         kdmsg_state_t *state;
1053
1054         iocom = msg->router->iocom;
1055
1056         if ((state = msg->state) == NULL) {
1057                 kdmsg_msg_free(msg);
1058         } else if (msg->any.head.cmd & DMSGF_DELETE) {
1059                 lockmgr(&iocom->msglk, LK_EXCLUSIVE);
1060                 state->txcmd |= DMSGF_DELETE;
1061                 if (state->rxcmd & DMSGF_DELETE) {
1062                         if (state->msg == msg)
1063                                 state->msg = NULL;
1064                         KKASSERT(state->flags & KDMSG_STATE_INSERTED);
1065                         if (state->txcmd & DMSGF_REPLY) {
1066                                 KKASSERT(msg->any.head.cmd &
1067                                          DMSGF_REPLY);
1068                                 RB_REMOVE(kdmsg_state_tree,
1069                                           &iocom->staterd_tree, state);
1070                         } else {
1071                                 KKASSERT((msg->any.head.cmd &
1072                                           DMSGF_REPLY) == 0);
1073                                 RB_REMOVE(kdmsg_state_tree,
1074                                           &iocom->statewr_tree, state);
1075                         }
1076                         state->flags &= ~KDMSG_STATE_INSERTED;
1077                         lockmgr(&iocom->msglk, LK_RELEASE);
1078                         kdmsg_state_free(state);
1079                 } else {
1080                         lockmgr(&iocom->msglk, LK_RELEASE);
1081                 }
1082                 kdmsg_msg_free(msg);
1083         } else if (state->msg != msg) {
1084                 kdmsg_msg_free(msg);
1085         }
1086 }
1087
1088 void
1089 kdmsg_state_free(kdmsg_state_t *state)
1090 {
1091         kdmsg_iocom_t *iocom;
1092         kdmsg_msg_t *msg;
1093
1094         iocom = state->router->iocom;
1095
1096         KKASSERT((state->flags & KDMSG_STATE_INSERTED) == 0);
1097         msg = state->msg;
1098         state->msg = NULL;
1099         kfree(state, iocom->mmsg);
1100         if (msg)
1101                 kdmsg_msg_free(msg);
1102 }
1103
1104 kdmsg_msg_t *
1105 kdmsg_msg_alloc(kdmsg_router_t *router, uint32_t cmd,
1106                 int (*func)(kdmsg_state_t *, kdmsg_msg_t *), void *data)
1107 {
1108         kdmsg_iocom_t *iocom;
1109         kdmsg_msg_t *msg;
1110         kdmsg_state_t *state;
1111         size_t hbytes;
1112
1113         iocom = router->iocom;
1114         hbytes = (cmd & DMSGF_SIZE) * DMSG_ALIGN;
1115         msg = kmalloc(offsetof(struct kdmsg_msg, any) + hbytes,
1116                       iocom->mmsg, M_WAITOK | M_ZERO);
1117         msg->hdr_size = hbytes;
1118         msg->router = router;
1119         KKASSERT(router != NULL);
1120         msg->any.head.magic = DMSG_HDR_MAGIC;
1121         msg->any.head.source = 0;
1122         msg->any.head.target = router->target;
1123         msg->any.head.cmd = cmd;
1124
1125         if (cmd & DMSGF_CREATE) {
1126                 /*
1127                  * New transaction, requires tracking state and a unique
1128                  * msgid to be allocated.
1129                  */
1130                 KKASSERT(msg->state == NULL);
1131                 state = kmalloc(sizeof(*state), iocom->mmsg, M_WAITOK | M_ZERO);
1132                 state->flags = KDMSG_STATE_DYNAMIC;
1133                 state->func = func;
1134                 state->any.any = data;
1135                 state->msg = msg;
1136                 state->msgid = (uint64_t)(uintptr_t)state;
1137                 state->router = msg->router;
1138                 msg->state = state;
1139                 msg->any.head.source = 0;
1140                 msg->any.head.target = state->router->target;
1141                 msg->any.head.msgid = state->msgid;
1142
1143                 lockmgr(&iocom->msglk, LK_EXCLUSIVE);
1144                 if (RB_INSERT(kdmsg_state_tree, &iocom->statewr_tree, state))
1145                         panic("duplicate msgid allocated");
1146                 state->flags |= KDMSG_STATE_INSERTED;
1147                 msg->any.head.msgid = state->msgid;
1148                 lockmgr(&iocom->msglk, LK_RELEASE);
1149         }
1150
1151         return (msg);
1152 }
1153
1154 void
1155 kdmsg_msg_free(kdmsg_msg_t *msg)
1156 {
1157         kdmsg_iocom_t *iocom;
1158
1159         iocom = msg->router->iocom;
1160
1161         if (msg->aux_data && msg->aux_size) {
1162                 kfree(msg->aux_data, iocom->mmsg);
1163                 msg->aux_data = NULL;
1164                 msg->aux_size = 0;
1165                 msg->router = NULL;
1166         }
1167         kfree(msg, iocom->mmsg);
1168 }
1169
1170 /*
1171  * Indexed messages are stored in a red-black tree indexed by their
1172  * msgid.  Only persistent messages are indexed.
1173  */
1174 int
1175 kdmsg_state_cmp(kdmsg_state_t *state1, kdmsg_state_t *state2)
1176 {
1177         if (state1->router < state2->router)
1178                 return(-1);
1179         if (state1->router > state2->router)
1180                 return(1);
1181         if (state1->msgid < state2->msgid)
1182                 return(-1);
1183         if (state1->msgid > state2->msgid)
1184                 return(1);
1185         return(0);
1186 }
1187
1188 /*
1189  * Write a message.  All requisit command flags have been set.
1190  *
1191  * If msg->state is non-NULL the message is written to the existing
1192  * transaction.  msgid will be set accordingly.
1193  *
1194  * If msg->state is NULL and CREATE is set new state is allocated and
1195  * (func, data) is installed.  A msgid is assigned.
1196  *
1197  * If msg->state is NULL and CREATE is not set the message is assumed
1198  * to be a one-way message.  The originator must assign the msgid
1199  * (or leave it 0, which is typical.
1200  *
1201  * This function merely queues the message to the management thread, it
1202  * does not write to the message socket/pipe.
1203  */
1204 void
1205 kdmsg_msg_write(kdmsg_msg_t *msg)
1206 {
1207         kdmsg_iocom_t *iocom;
1208         kdmsg_state_t *state;
1209
1210         iocom = msg->router->iocom;
1211
1212         if (msg->state) {
1213                 /*
1214                  * Continuance or termination of existing transaction.
1215                  * The transaction could have been initiated by either end.
1216                  *
1217                  * (Function callback and aux data for the receive side can
1218                  * be replaced or left alone).
1219                  */
1220                 state = msg->state;
1221                 msg->any.head.msgid = state->msgid;
1222                 msg->any.head.source = 0;
1223                 msg->any.head.target = state->router->target;
1224                 lockmgr(&iocom->msglk, LK_EXCLUSIVE);
1225         } else {
1226                 /*
1227                  * One-off message (always uses msgid 0 to distinguish
1228                  * between a possibly lost in-transaction message due to
1229                  * competing aborts and a real one-off message?)
1230                  */
1231                 msg->any.head.msgid = 0;
1232                 msg->any.head.source = 0;
1233                 msg->any.head.target = msg->router->target;
1234                 lockmgr(&iocom->msglk, LK_EXCLUSIVE);
1235         }
1236
1237         /*
1238          * Finish up the msg fields
1239          */
1240         msg->any.head.salt = /* (random << 8) | */ (iocom->msg_seq & 255);
1241         ++iocom->msg_seq;
1242
1243         msg->any.head.hdr_crc = 0;
1244         msg->any.head.hdr_crc = iscsi_crc32(msg->any.buf, msg->hdr_size);
1245
1246         TAILQ_INSERT_TAIL(&iocom->msgq, msg, qentry);
1247
1248         if (iocom->msg_ctl & KDMSG_CLUSTERCTL_SLEEPING) {
1249                 atomic_clear_int(&iocom->msg_ctl,
1250                                  KDMSG_CLUSTERCTL_SLEEPING);
1251                 wakeup(&iocom->msg_ctl);
1252         }
1253
1254         lockmgr(&iocom->msglk, LK_RELEASE);
1255 }
1256
1257 /*
1258  * Reply to a message and terminate our side of the transaction.
1259  *
1260  * If msg->state is non-NULL we are replying to a one-way message.
1261  */
1262 void
1263 kdmsg_msg_reply(kdmsg_msg_t *msg, uint32_t error)
1264 {
1265         kdmsg_state_t *state = msg->state;
1266         kdmsg_msg_t *nmsg;
1267         uint32_t cmd;
1268
1269         /*
1270          * Reply with a simple error code and terminate the transaction.
1271          */
1272         cmd = DMSG_LNK_ERROR;
1273
1274         /*
1275          * Check if our direction has even been initiated yet, set CREATE.
1276          *
1277          * Check what direction this is (command or reply direction).  Note
1278          * that txcmd might not have been initiated yet.
1279          *
1280          * If our direction has already been closed we just return without
1281          * doing anything.
1282          */
1283         if (state) {
1284                 if (state->txcmd & DMSGF_DELETE)
1285                         return;
1286                 if ((state->txcmd & DMSGF_CREATE) == 0)
1287                         cmd |= DMSGF_CREATE;
1288                 if (state->txcmd & DMSGF_REPLY)
1289                         cmd |= DMSGF_REPLY;
1290                 cmd |= DMSGF_DELETE;
1291         } else {
1292                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1293                         cmd |= DMSGF_REPLY;
1294         }
1295         kprintf("MSG_REPLY state=%p msg %08x\n", state, cmd);
1296
1297         /* XXX messy mask cmd to avoid allocating state */
1298         nmsg = kdmsg_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
1299                                NULL, NULL);
1300         nmsg->any.head.cmd = cmd;
1301         nmsg->any.head.error = error;
1302         nmsg->state = state;
1303         kdmsg_msg_write(nmsg);
1304 }
1305
1306 /*
1307  * Reply to a message and continue our side of the transaction.
1308  *
1309  * If msg->state is non-NULL we are replying to a one-way message and this
1310  * function degenerates into the same as kdmsg_msg_reply().
1311  */
1312 void
1313 kdmsg_msg_result(kdmsg_msg_t *msg, uint32_t error)
1314 {
1315         kdmsg_state_t *state = msg->state;
1316         kdmsg_msg_t *nmsg;
1317         uint32_t cmd;
1318
1319         /*
1320          * Return a simple result code, do NOT terminate the transaction.
1321          */
1322         cmd = DMSG_LNK_ERROR;
1323
1324         /*
1325          * Check if our direction has even been initiated yet, set CREATE.
1326          *
1327          * Check what direction this is (command or reply direction).  Note
1328          * that txcmd might not have been initiated yet.
1329          *
1330          * If our direction has already been closed we just return without
1331          * doing anything.
1332          */
1333         if (state) {
1334                 if (state->txcmd & DMSGF_DELETE)
1335                         return;
1336                 if ((state->txcmd & DMSGF_CREATE) == 0)
1337                         cmd |= DMSGF_CREATE;
1338                 if (state->txcmd & DMSGF_REPLY)
1339                         cmd |= DMSGF_REPLY;
1340                 /* continuing transaction, do not set MSGF_DELETE */
1341         } else {
1342                 if ((msg->any.head.cmd & DMSGF_REPLY) == 0)
1343                         cmd |= DMSGF_REPLY;
1344         }
1345
1346         /* XXX messy mask cmd to avoid allocating state */
1347         nmsg = kdmsg_msg_alloc(msg->router, cmd & DMSGF_BASECMDMASK,
1348                                NULL, NULL);
1349         nmsg->any.head.cmd = cmd;
1350         nmsg->any.head.error = error;
1351         nmsg->state = state;
1352         kdmsg_msg_write(nmsg);
1353 }