- Add priority message queue to msgport. Send a message with MSGF_PRIORITY
authorSepherosa Ziehau <sephe@dragonflybsd.org>
Sun, 9 Nov 2008 09:20:09 +0000 (09:20 +0000)
committerSepherosa Ziehau <sephe@dragonflybsd.org>
Sun, 9 Nov 2008 09:20:09 +0000 (09:20 +0000)
  flag will queue the message into the priority message queue of the target
  port.  The priority message queue takes precendence over normal message
  queue, so the messages with MSGF_PRIORITY flag will be processed before
  other messages on the same target port.  This could be used by defering
  callout or operation that should not be delayed too long on the target
  port.
- Add dropmsg function to msgport.  Message must be marked with MSGF_DROPABLE,
  else dropmsg operation is not allowed.  Message marked with MSGF_DROPABLE
  is not waitable, i.e. you could not call domsg on this kind of message.
  Currently only thread msgport supports this operation and this operation
  must be performed in the same thread of the msgport's owner thread.

Discussed-with: dillon@

sys/kern/lwkt_msgport.c
sys/sys/msgport.h
sys/sys/msgport2.h

index 6a4bdaf..8ebcc53 100644 (file)
@@ -34,7 +34,7 @@
  * NOTE! This file may be compiled for userland libraries as well as for
  * the kernel.
  *
- * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.51 2008/11/01 12:30:23 sephe Exp $
+ * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.52 2008/11/09 09:20:09 sephe Exp $
  */
 
 #include <sys/param.h>
@@ -188,6 +188,7 @@ static int lwkt_thread_putport(lwkt_port_t port, lwkt_msg_t msg);
 static int lwkt_thread_waitmsg(lwkt_msg_t msg, int flags);
 static void *lwkt_thread_waitport(lwkt_port_t port, int flags);
 static void lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg);
+static void lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
 
 static void *lwkt_spin_getport(lwkt_port_t port);
 static int lwkt_spin_putport(lwkt_port_t port, lwkt_msg_t msg);
@@ -207,6 +208,7 @@ static int lwkt_panic_putport(lwkt_port_t port, lwkt_msg_t msg);
 static int lwkt_panic_waitmsg(lwkt_msg_t msg, int flags);
 static void *lwkt_panic_waitport(lwkt_port_t port, int flags);
 static void lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg);
+static void lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg);
 
 /*
  * Core port initialization (internal)
@@ -218,15 +220,18 @@ _lwkt_initport(lwkt_port_t port,
               int (*pportfn)(lwkt_port_t, lwkt_msg_t),
               int (*wmsgfn)(lwkt_msg_t, int),
               void *(*wportfn)(lwkt_port_t, int),
-              void (*rportfn)(lwkt_port_t, lwkt_msg_t))
+              void (*rportfn)(lwkt_port_t, lwkt_msg_t),
+              void (*dmsgfn)(lwkt_port_t, lwkt_msg_t))
 {
     bzero(port, sizeof(*port));
     TAILQ_INIT(&port->mp_msgq);
+    TAILQ_INIT(&port->mp_msgq_prio);
     port->mp_getport = gportfn;
     port->mp_putport = pportfn;
     port->mp_waitmsg =  wmsgfn;
     port->mp_waitport =  wportfn;
     port->mp_replyport = rportfn;
+    port->mp_dropmsg = dmsgfn;
 }
 
 /*
@@ -261,7 +266,8 @@ lwkt_initport_thread(lwkt_port_t port, thread_t td)
                   lwkt_thread_putport,
                   lwkt_thread_waitmsg,
                   lwkt_thread_waitport,
-                  lwkt_thread_replyport);
+                  lwkt_thread_replyport,
+                  lwkt_thread_dropmsg);
     port->mpu_td = td;
 }
 
@@ -280,7 +286,8 @@ lwkt_initport_spin(lwkt_port_t port)
                   lwkt_spin_putport,
                   lwkt_spin_waitmsg,
                   lwkt_spin_waitport,
-                  lwkt_spin_replyport);
+                  lwkt_spin_replyport,
+                  lwkt_panic_dropmsg);
     spin_init(&port->mpu_spin);
 }
 
@@ -299,7 +306,8 @@ lwkt_initport_serialize(lwkt_port_t port, struct lwkt_serialize *slz)
                   lwkt_serialize_putport,
                   lwkt_serialize_waitmsg,
                   lwkt_serialize_waitport,
-                  lwkt_serialize_replyport);
+                  lwkt_serialize_replyport,
+                  lwkt_panic_dropmsg);
     port->mpu_serialize = slz;
 }
 
@@ -315,7 +323,8 @@ lwkt_initport_replyonly_null(lwkt_port_t port)
                   lwkt_panic_putport,
                   lwkt_panic_waitmsg,
                   lwkt_panic_waitport,
-                  lwkt_null_replyport);
+                  lwkt_null_replyport,
+                  lwkt_panic_dropmsg);
 }
 
 /*
@@ -328,7 +337,7 @@ lwkt_initport_replyonly(lwkt_port_t port,
 {
     _lwkt_initport(port, lwkt_panic_getport, lwkt_panic_putport,
                         lwkt_panic_waitmsg, lwkt_panic_waitport,
-                        rportfn);
+                        rportfn, lwkt_panic_dropmsg);
 }
 
 void
@@ -337,7 +346,7 @@ lwkt_initport_putonly(lwkt_port_t port,
 {
     _lwkt_initport(port, lwkt_panic_getport, pportfn,
                         lwkt_panic_waitmsg, lwkt_panic_waitport,
-                        lwkt_panic_replyport);
+                        lwkt_panic_replyport, lwkt_panic_dropmsg);
 }
 
 void
@@ -346,17 +355,23 @@ lwkt_initport_panic(lwkt_port_t port)
     _lwkt_initport(port,
                   lwkt_panic_getport, lwkt_panic_putport,
                   lwkt_panic_waitmsg, lwkt_panic_waitport,
-                  lwkt_panic_replyport);
+                  lwkt_panic_replyport, lwkt_panic_dropmsg);
 }
 
 static __inline
 void
 _lwkt_pullmsg(lwkt_port_t port, lwkt_msg_t msg)
 {
+    lwkt_msg_queue *queue;
+
     /*
      * normal case, remove and return the message.
      */
-    TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
+    if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
+       queue = &port->mp_msgq_prio;
+    else
+       queue = &port->mp_msgq;
+    TAILQ_REMOVE(queue, msg, ms_node);
     msg->ms_flags &= ~MSGF_QUEUED;
 }
 
@@ -364,16 +379,38 @@ static __inline
 void
 _lwkt_pushmsg(lwkt_port_t port, lwkt_msg_t msg)
 {
+    lwkt_msg_queue *queue;
+
     msg->ms_flags |= MSGF_QUEUED;
-    TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
+    if (__predict_false(msg->ms_flags & MSGF_PRIORITY))
+       queue = &port->mp_msgq_prio;
+    else
+       queue = &port->mp_msgq;
+    TAILQ_INSERT_TAIL(queue, msg, ms_node);
+}
+
+static __inline
+lwkt_msg_t
+_lwkt_pollmsg(lwkt_port_t port)
+{
+    lwkt_msg_t msg;
+
+    msg = TAILQ_FIRST(&port->mp_msgq_prio);
+    if (__predict_false(msg != NULL))
+       return msg;
+
+    /*
+     * Priority queue has no message, fallback to non-priority queue.
+     */
+    return TAILQ_FIRST(&port->mp_msgq);
 }
 
 static __inline
 void
 _lwkt_enqueue_reply(lwkt_port_t port, lwkt_msg_t msg)
 {
-    TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
-    msg->ms_flags |= MSGF_REPLY | MSGF_DONE | MSGF_QUEUED;
+    _lwkt_pushmsg(port, msg);
+    msg->ms_flags |= MSGF_REPLY | MSGF_DONE;
 }
 
 /************************************************************************
@@ -500,6 +537,24 @@ lwkt_thread_replyport(lwkt_port_t port, lwkt_msg_t msg)
     }
 }
 
+/*
+ * lwkt_thread_dropmsg() - Backend to lwkt_dropmsg()
+ *
+ * This function could _only_ be used when caller is in the same thread
+ * as the message's target port owner thread.
+ */
+static void
+lwkt_thread_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
+{
+    KASSERT(port->mpu_td == curthread,
+           ("message could only be dropped in the same thread "
+            "as the message target port thread\n"));
+    crit_enter_quick(port->mpu_td);
+    _lwkt_pullmsg(port, msg);
+    msg->ms_flags |= MSGF_DONE;
+    crit_exit_quick(port->mpu_td);
+}
+
 /*
  * lwkt_thread_putport() - Backend to lwkt_beginmsg()
  *
@@ -583,7 +638,7 @@ lwkt_thread_getport(lwkt_port_t port)
     KKASSERT(port->mpu_td == curthread);
 
     crit_enter_quick(port->mpu_td);
-    if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
+    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
     crit_exit_quick(port->mpu_td);
     return(msg);
@@ -600,6 +655,9 @@ static
 int
 lwkt_thread_waitmsg(lwkt_msg_t msg, int flags)
 {
+    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
+           ("can't wait dropable message\n"));
+
     if ((msg->ms_flags & MSGF_DONE) == 0) {
        /*
         * If the done bit was not set we have to block until it is.
@@ -654,7 +712,7 @@ lwkt_thread_waitport(lwkt_port_t port, int flags)
 
     KKASSERT(port->mpu_td == td);
     crit_enter_quick(td);
-    while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
+    while ((msg = _lwkt_pollmsg(port)) == NULL) {
        port->mp_flags |= MSGPORTF_WAITING;
        error = lwkt_sleep("waitport", flags);
        port->mp_flags &= ~MSGPORTF_WAITING;
@@ -691,7 +749,7 @@ lwkt_spin_getport(lwkt_port_t port)
     lwkt_msg_t msg;
 
     spin_lock_wr(&port->mpu_spin);
-    if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
+    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
     spin_unlock_wr(&port->mpu_spin);
     return(msg);
@@ -727,6 +785,9 @@ lwkt_spin_waitmsg(lwkt_msg_t msg, int flags)
     int sentabort;
     int error;
 
+    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
+           ("can't wait dropable message\n"));
+
     if ((msg->ms_flags & MSGF_DONE) == 0) {
        port = msg->ms_reply_port;
        sentabort = 0;
@@ -790,7 +851,7 @@ lwkt_spin_waitport(lwkt_port_t port, int flags)
     int error;
 
     spin_lock_wr(&port->mpu_spin);
-    while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
+    while ((msg = _lwkt_pollmsg(port)) == NULL) {
        port->mp_flags |= MSGPORTF_WAITING;
        error = msleep(port, &port->mpu_spin, flags, "waitport", 0);
        /* see note at the top on the MSGPORTF_WAITING flag */
@@ -857,7 +918,7 @@ lwkt_serialize_getport(lwkt_port_t port)
 
     ASSERT_SERIALIZED(port->mpu_serialize);
 
-    if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL)
+    if ((msg = _lwkt_pollmsg(port)) != NULL)
        _lwkt_pullmsg(port, msg);
     return(msg);
 }
@@ -886,6 +947,9 @@ lwkt_serialize_waitmsg(lwkt_msg_t msg, int flags)
     int sentabort;
     int error;
 
+    KASSERT((msg->ms_flags & MSGF_DROPABLE) == 0,
+           ("can't wait dropable message\n"));
+
     if ((msg->ms_flags & MSGF_DONE) == 0) {
        port = msg->ms_reply_port;
 
@@ -953,7 +1017,7 @@ lwkt_serialize_waitport(lwkt_port_t port, int flags)
 
     ASSERT_SERIALIZED(port->mpu_serialize);
 
-    while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
+    while ((msg = _lwkt_pollmsg(port)) == NULL) {
        port->mp_flags |= MSGPORTF_WAITING;
        error = serialize_sleep(port, port->mpu_serialize, flags,
                                "waitport", 0);
@@ -1043,3 +1107,9 @@ lwkt_panic_replyport(lwkt_port_t port, lwkt_msg_t msg)
     panic("lwkt_replymsg() is illegal on port %p msg %p", port, msg);
 }
 
+static
+void
+lwkt_panic_dropmsg(lwkt_port_t port, lwkt_msg_t msg)
+{
+    panic("lwkt_dropmsg() is illegal on port %p msg %p", port, msg);
+}
index 00b0738..a4de8aa 100644 (file)
@@ -3,7 +3,7 @@
  *
  *     Implements LWKT messages and ports.
  * 
- * $DragonFly: src/sys/sys/msgport.h,v 1.29 2008/09/09 07:21:57 dillon Exp $
+ * $DragonFly: src/sys/sys/msgport.h,v 1.30 2008/11/09 09:20:09 sephe Exp $
  */
 
 #ifndef _SYS_MSGPORT_H_
@@ -99,7 +99,9 @@ typedef struct lwkt_msg {
 #define MSGF_SYNC      0x0008          /* synchronous message operation */
 #define MSGF_INTRANSIT 0x0010          /* in-transit (IPI) */
 #define MSGF_NORESCHED 0x0020          /* do not reschedule target lwkt */
+#define MSGF_DROPABLE  0x0040          /* message supports drop */
 #define MSGF_ABORTABLE 0x0080          /* message supports abort */
+#define MSGF_PRIORITY  0x0100          /* priority message */
 
 #define MSGF_USER0     0x00010000
 #define MSGF_USER1     0x00020000
@@ -146,6 +148,7 @@ MALLOC_DECLARE(M_LWKTMSG);
  */
 typedef struct lwkt_port {
     lwkt_msg_queue     mp_msgq;
+    lwkt_msg_queue     mp_msgq_prio;
     int                        mp_flags;
     union {
        struct spinlock spin;
@@ -158,6 +161,7 @@ typedef struct lwkt_port {
     int                        (*mp_waitmsg)(lwkt_msg_t, int flags);
     void *             (*mp_waitport)(lwkt_port_t, int flags);
     void               (*mp_replyport)(lwkt_port_t, lwkt_msg_t);
+    void               (*mp_dropmsg)(lwkt_port_t, lwkt_msg_t);
 } lwkt_port;
 
 #ifdef _KERNEL
index 78f2da4..c4a6610 100644 (file)
@@ -3,7 +3,7 @@
  *
  *     Implements Inlines for LWKT messages and ports.
  * 
- * $DragonFly: src/sys/sys/msgport2.h,v 1.16 2008/11/01 11:17:52 sephe Exp $
+ * $DragonFly: src/sys/sys/msgport2.h,v 1.17 2008/11/09 09:20:09 sephe Exp $
  */
 
 #ifndef _SYS_MSGPORT2_H_
@@ -100,5 +100,17 @@ lwkt_checkmsg(lwkt_msg_t msg)
     return(msg->ms_flags & MSGF_DONE);
 }
 
+static __inline
+void
+lwkt_dropmsg(lwkt_msg_t msg)
+{
+    lwkt_port_t port;
+
+    KKASSERT((msg->ms_flags & (MSGF_DROPABLE | MSGF_DONE | MSGF_QUEUED)) ==
+            (MSGF_DROPABLE | MSGF_QUEUED));
+    port = msg->ms_target_port;
+    port->mp_dropmsg(port, msg);
+}
+
 #endif /* _KERNEL */
 #endif /* _SYS_MSGPORT2_H_ */