From: Sepherosa Ziehau Date: Sun, 9 Nov 2008 09:20:09 +0000 (+0000) Subject: - Add priority message queue to msgport. Send a message with MSGF_PRIORITY X-Git-Url: https://gitweb.dragonflybsd.org/~lentferj/dragonfly.git/commitdiff_plain/e2ff0223a1c259e3518ec21d65a82cf5ca1d462a - Add priority message queue to msgport. Sending a message with the MSGF_PRIORITY flag will queue the message into the priority message queue of the target port. The priority message queue takes precedence over normal message queue, so the messages with MSGF_PRIORITY flag will be processed before other messages on the same target port. This could be used by deferring callout or operation that should not be delayed too long on the target port. - Add dropmsg function to msgport. Message must be marked with MSGF_DROPABLE, else dropmsg operation is not allowed. Message marked with MSGF_DROPABLE is not waitable, i.e. you could not call domsg on this kind of message. Currently only thread msgport supports this operation and this operation must be performed in the same thread of the msgport's owner thread. Discussed-with: dillon@ --- diff --git a/sys/kern/lwkt_msgport.c b/sys/kern/lwkt_msgport.c index 6a4bdaf184..8ebcc53a4c 100644 --- a/sys/kern/lwkt_msgport.c +++ b/sys/kern/lwkt_msgport.c @@ -34,7 +34,7 @@ * NOTE! This file may be compiled for userland libraries as well as for * the kernel. 
* - * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.51 2008/11/01 12:30:23 sephe Exp $ + * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.52 2008/11/09 09:20:09 sephe Exp $ */ #include @@ -188,6 +188,7 @@ static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg); static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags); static void *lwkt_thread_waitport(lwkt_port_t port, int flags); static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg); +static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg); static void *lwkt_spin_getport(lwkt_port_t port); static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg); @@ -207,6 +208,7 @@ static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg); static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags); static void *lwkt_panic_waitport(lwkt_port_t port, int flags); static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg); +static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg); /* * Core port initialization (internal) @@ -218,15 +220,18 @@ _lwkt_initport(lwkt_port_t port, int (*pportfn)(lwkt_port_t, lwkt_msg_t), int (*wmsgfn)(lwkt_msg_t, int), void *(*wportfn)(lwkt_port_t, int), - void (*rportfn)(lwkt_port_t, lwkt_msg_t)) + void (*rportfn)(lwkt_port_t, lwkt_msg_t), + void (*dmsgfn)(lwkt_port_t, lwkt_msg_t)) { bzero(port, sizeof(*port)); TAILQ_INIT(&port->mp_msgq); + TAILQ_INIT(&port->mp_msgq_prio); port->mp_getport = gportfn; port->mp_putport = pportfn; port->mp_waitmsg = wmsgfn; port->mp_waitport = wportfn; port->mp_replyport = rportfn; + port->mp_dropmsg = dmsgfn; } /* @@ -261,7 +266,8 @@ lwkt_initport_thread(lwkt_port_t port, thread_t td) lwkt_thread_putport, lwkt_thread_waitmsg, lwkt_thread_waitport, - lwkt_thread_replyport); + lwkt_thread_replyport, + lwkt_thread_dropmsg); port->mpu_td = td; } @@ -280,7 +286,8 @@ lwkt_initport_spin(lwkt_port_t port) lwkt_spin_putport, lwkt_spin_waitmsg, lwkt_spin_waitport, - lwkt_spin_replyport); + lwkt_spin_replyport, + 
lwkt_panic_dropmsg); spin_init(&port->mpu_spin); } @@ -299,7 +306,8 @@ lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz) lwkt_serialize_putport, lwkt_serialize_waitmsg, lwkt_serialize_waitport, - lwkt_serialize_replyport); + lwkt_serialize_replyport, + lwkt_panic_dropmsg); port->mpu_serialize = slz; } @@ -315,7 +323,8 @@ lwkt_initport_replyonly_null(lwkt_port_t port) lwkt_panic_putport, lwkt_panic_waitmsg, lwkt_panic_waitport, - lwkt_null_replyport); + lwkt_null_replyport, + lwkt_panic_dropmsg); } /* @@ -328,7 +337,7 @@ lwkt_initport_replyonly(lwkt_port_t port, { _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, lwkt_panic_waitmsg, lwkt_panic_waitport, - rportfn); + rportfn, lwkt_panic_dropmsg); } void @@ -337,7 +346,7 @@ lwkt_initport_putonly(lwkt_port_t port, { _lwkt_initport(port, lwkt_panic_getport, pportfn, lwkt_panic_waitmsg, lwkt_panic_waitport, - lwkt_panic_replyport); + lwkt_panic_replyport, lwkt_panic_dropmsg); } void @@ -346,17 +355,23 @@ lwkt_initport_panic(lwkt_port_t port) _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport, lwkt_panic_waitmsg, lwkt_panic_waitport, - lwkt_panic_replyport); + lwkt_panic_replyport, lwkt_panic_dropmsg); } static __inline void _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg) { + lwkt_msg_queue *queue; + /* * normal case, remove and return the message. 
*/ - TAILQ_REMOVE(&port->mp_msgq, msg, ms_node); + if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) + queue = &port->mp_msgq_prio; + else + queue = &port->mp_msgq; + TAILQ_REMOVE(queue, msg, ms_node); msg->ms_flags &= ~MSGF_QUEUED; } @@ -364,16 +379,38 @@ static __inline void _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg) { + lwkt_msg_queue *queue; + msg->ms_flags |= MSGF_QUEUED; - TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); + if (__predict_false(msg->ms_flags & MSGF_PRIORITY)) + queue = &port->mp_msgq_prio; + else + queue = &port->mp_msgq; + TAILQ_INSERT_TAIL(queue, msg, ms_node); +} + +static __inline +lwkt_msg_t +_lwkt_pollmsg(lwkt_port_t port) +{ + lwkt_msg_t msg; + + msg = TAILQ_FIRST(&port->mp_msgq_prio); + if (__predict_false(msg != NULL)) + return msg; + + /* + * Priority queue has no message, fallback to non-priority queue. + */ + return TAILQ_FIRST(&port->mp_msgq); } static __inline void _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg) { - TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node); - msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED; + _lwkt_pushmsg(port, msg); + msg->ms_flags |= MSGF_REPLY | MSGF_DONE; } /************************************************************************ @@ -500,6 +537,24 @@ lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg) } } +/* + * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg() + * + * This function could _only_ be used when caller is in the same thread + * as the message's target port owner thread. 
+ */ +static void +lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg) +{ + KASSERT(port->mpu_td == curthread, + ("message could only be dropped in the same thread " + "as the message target port thread\n")); + crit_enter_quick(port->mpu_td); + _lwkt_pullmsg(port, msg); + msg->ms_flags |= MSGF_DONE; + crit_exit_quick(port->mpu_td); +} + /* * lwkt_thread_putport() - Backend to lwkt_beginmsg() * @@ -583,7 +638,7 @@ lwkt_thread_getport(lwkt_port_t port) KKASSERT(port->mpu_td == curthread); crit_enter_quick(port->mpu_td); - if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) + if ((msg = _lwkt_pollmsg(port)) != NULL) _lwkt_pullmsg(port, msg); crit_exit_quick(port->mpu_td); return(msg); @@ -600,6 +655,9 @@ static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags) { + KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, + ("can't wait dropable message\n")); + if ((msg->ms_flags & MSGF_DONE) == 0) { /* * If the done bit was not set we have to block until it is. @@ -654,7 +712,7 @@ lwkt_thread_waitport(lwkt_port_t port, int flags) KKASSERT(port->mpu_td == td); crit_enter_quick(td); - while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { + while ((msg = _lwkt_pollmsg(port)) == NULL) { port->mp_flags |= MSGPORTF_WAITING; error = lwkt_sleep("waitport", flags); port->mp_flags &= ~MSGPORTF_WAITING; @@ -691,7 +749,7 @@ lwkt_spin_getport(lwkt_port_t port) lwkt_msg_t msg; spin_lock_wr(&port->mpu_spin); - if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) + if ((msg = _lwkt_pollmsg(port)) != NULL) _lwkt_pullmsg(port, msg); spin_unlock_wr(&port->mpu_spin); return(msg); @@ -727,6 +785,9 @@ lwkt_spin_waitmsg(lwkt_msg_t msg, int flags) int sentabort; int error; + KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, + ("can't wait dropable message\n")); + if ((msg->ms_flags & MSGF_DONE) == 0) { port = msg->ms_reply_port; sentabort = 0; @@ -790,7 +851,7 @@ lwkt_spin_waitport(lwkt_port_t port, int flags) int error; spin_lock_wr(&port->mpu_spin); - while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { + 
while ((msg = _lwkt_pollmsg(port)) == NULL) { port->mp_flags |= MSGPORTF_WAITING; error = msleep(port, &port->mpu_spin, flags, "waitport", 0); /* see note at the top on the MSGPORTF_WAITING flag */ @@ -857,7 +918,7 @@ lwkt_serialize_getport(lwkt_port_t port) ASSERT_SERIALIZED(port->mpu_serialize); - if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) + if ((msg = _lwkt_pollmsg(port)) != NULL) _lwkt_pullmsg(port, msg); return(msg); } @@ -886,6 +947,9 @@ lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags) int sentabort; int error; + KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0, + ("can't wait dropable message\n")); + if ((msg->ms_flags & MSGF_DONE) == 0) { port = msg->ms_reply_port; @@ -953,7 +1017,7 @@ lwkt_serialize_waitport(lwkt_port_t port, int flags) ASSERT_SERIALIZED(port->mpu_serialize); - while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) { + while ((msg = _lwkt_pollmsg(port)) == NULL) { port->mp_flags |= MSGPORTF_WAITING; error = serialize_sleep(port, port->mpu_serialize, flags, "waitport", 0); @@ -1043,3 +1107,9 @@ lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg) panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg); } +static +void +lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg) +{ + panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg); +} diff --git a/sys/sys/msgport.h b/sys/sys/msgport.h index 00b073856f..a4de8aaa4b 100644 --- a/sys/sys/msgport.h +++ b/sys/sys/msgport.h @@ -3,7 +3,7 @@ * * Implements LWKT messages and ports. 
* - * $DragonFly: src/sys/sys/msgport.h,v 1.29 2008/09/09 07:21:57 dillon Exp $ + * $DragonFly: src/sys/sys/msgport.h,v 1.30 2008/11/09 09:20:09 sephe Exp $ */ #ifndef _SYS_MSGPORT_H_ @@ -99,7 +99,9 @@ typedef struct lwkt_msg { #define MSGF_SYNC 0x0008 /* synchronous message operation */ #define MSGF_INTRANSIT 0x0010 /* in-transit (IPI) */ #define MSGF_NORESCHED 0x0020 /* do not reschedule target lwkt */ +#define MSGF_DROPABLE 0x0040 /* message supports drop */ #define MSGF_ABORTABLE 0x0080 /* message supports abort */ +#define MSGF_PRIORITY 0x0100 /* priority message */ #define MSGF_USER0 0x00010000 #define MSGF_USER1 0x00020000 @@ -146,6 +148,7 @@ MALLOC_DECLARE(M_LWKTMSG); */ typedef struct lwkt_port { lwkt_msg_queue mp_msgq; + lwkt_msg_queue mp_msgq_prio; int mp_flags; union { struct spinlock spin; @@ -158,6 +161,7 @@ typedef struct lwkt_port { int (*mp_waitmsg)(lwkt_msg_t, int flags); void * (*mp_waitport)(lwkt_port_t, int flags); void (*mp_replyport)(lwkt_port_t, lwkt_msg_t); + void (*mp_dropmsg)(lwkt_port_t, lwkt_msg_t); } lwkt_port; #ifdef _KERNEL diff --git a/sys/sys/msgport2.h b/sys/sys/msgport2.h index 78f2da47a7..c4a66104c6 100644 --- a/sys/sys/msgport2.h +++ b/sys/sys/msgport2.h @@ -3,7 +3,7 @@ * * Implements Inlines for LWKT messages and ports. * - * $DragonFly: src/sys/sys/msgport2.h,v 1.16 2008/11/01 11:17:52 sephe Exp $ + * $DragonFly: src/sys/sys/msgport2.h,v 1.17 2008/11/09 09:20:09 sephe Exp $ */ #ifndef _SYS_MSGPORT2_H_ @@ -100,5 +100,17 @@ lwkt_checkmsg(lwkt_msg_t msg) return(msg->ms_flags & MSGF_DONE); } +static __inline +void +lwkt_dropmsg(lwkt_msg_t msg) +{ + lwkt_port_t port; + + KKASSERT((msg->ms_flags & (MSGF_DROPABLE | MSGF_DONE | MSGF_QUEUED)) == + (MSGF_DROPABLE | MSGF_QUEUED)); + port = msg->ms_target_port; + port->mp_dropmsg(port, msg); +} + #endif /* _KERNEL */ #endif /* _SYS_MSGPORT2_H_ */