This is the initial implmentation of the LWKT messaging infrastructure.
authorMatthew Dillon <dillon@dragonflybsd.org>
Sun, 20 Jul 2003 01:37:22 +0000 (01:37 +0000)
committerMatthew Dillon <dillon@dragonflybsd.org>
Sun, 20 Jul 2003 01:37:22 +0000 (01:37 +0000)
Messages are sent to message ports and typically replied to a message port
embedded in the originating thread's thread structure (td_msgport).
The port functions match up and optimization client sync/asynch requests
verses target synch/asynch responses.

In this initial implementation a port must be owned by a particular thread,
and we use *asynch* IPI messaging to forward queueing and dequeueing operations
to the correct cpu.  Most of the IPI overhead will be absorbed by the fact
that these same IPIs also tend to schedule the threads in question, which on
the correct cpu (which is the one it will be on) costs nothing.

Message ports have in-context dispatch functions for initiating, aborting,
and replying to a message which can be overriden and will queue by default.

This code compiles but is as yet unreferenced, and almost certainly needs more
work.

sys/conf/files
sys/kern/lwkt_msgport.c [new file with mode: 0644]
sys/kern/lwkt_rwlock.c
sys/kern/lwkt_thread.c
sys/sys/msgport.h [new file with mode: 0644]
sys/sys/msgport2.h [new file with mode: 0644]
sys/sys/thread.h

index ec77129..01525bd 100644 (file)
@@ -1,5 +1,5 @@
 # $FreeBSD: src/sys/conf/files,v 1.340.2.137 2003/06/04 17:10:30 sam Exp $
-# $DragonFly: src/sys/conf/files,v 1.6 2003/06/29 03:28:41 dillon Exp $
+# $DragonFly: src/sys/conf/files,v 1.7 2003/07/20 01:37:19 dillon Exp $
 #
 # The long compile-with and dependency lines are required because of
 # limitations in config: backslash-newline doesn't work in strings, and
@@ -627,6 +627,7 @@ kern/kern_sig.c             standard
 kern/kern_subr.c       standard
 kern/kern_switch.c     standard
 kern/lwkt_thread.c     standard
+kern/lwkt_msgport.c    standard
 kern/lwkt_rwlock.c     standard
 kern/kern_synch.c      standard
 kern/kern_syscalls.c   standard
diff --git a/sys/kern/lwkt_msgport.c b/sys/kern/lwkt_msgport.c
new file mode 100644 (file)
index 0000000..3e22b80
--- /dev/null
@@ -0,0 +1,394 @@
+/*
+ * Copyright (c) 2003 Matthew Dillon <dillon@backplane.com>
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions
+ * are met:
+ * 1. Redistributions of source code must retain the above copyright
+ *    notice, this list of conditions and the following disclaimer.
+ * 2. Redistributions in binary form must reproduce the above copyright
+ *    notice, this list of conditions and the following disclaimer in the
+ *    documentation and/or other materials provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+ * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+ * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+ * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+ * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+ * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+ * SUCH DAMAGE.
+ *
+ * $DragonFly: src/sys/kern/lwkt_msgport.c,v 1.1 2003/07/20 01:37:22 dillon Exp $
+ */
+
+#include <sys/param.h>
+#include <sys/systm.h>
+#include <sys/kernel.h>
+#include <sys/proc.h>
+#include <sys/rtprio.h>
+#include <sys/queue.h>
+#include <sys/sysctl.h>
+#include <sys/kthread.h>
+#include <machine/cpu.h>
+#include <sys/lock.h>
+
+#include <vm/vm.h>
+#include <vm/vm_param.h>
+#include <vm/vm_kern.h>
+#include <vm/vm_object.h>
+#include <vm/vm_page.h>
+#include <vm/vm_map.h>
+#include <vm/vm_pager.h>
+#include <vm/vm_extern.h>
+#include <vm/vm_zone.h>
+
+#include <sys/thread2.h>
+#include <sys/msgport2.h>
+
+#include <machine/stdarg.h>
+#include <machine/ipl.h>
+#ifdef SMP
+#include <machine/smp.h>
+#endif
+
+
+/************************************************************************
+ *                             MESSAGE FUNCTIONS                       *
+ ************************************************************************/
+
+static void lwkt_replyport_remote(lwkt_msg_t msg);
+static void lwkt_putport_remote(lwkt_msg_t msg);
+static void lwkt_abortport_remote(lwkt_port_t port);
+
+/*
+ * lwkt_sendmsg()
+ *
+ *     Send a message asynchronously.  This function requests asynchronous
+ *     completion and calls lwkt_beginmsg().  If the target port decides to
+ *     run the message synchronously this function will automatically queue
+ *     the message to the current thread's message queue to present a
+ *     consistent interface to the caller. 
+ *
+ *     The message's ms_cmd must be initialized and its ms_flags must be
+ *     at least zero'd out.  lwkt_sendmsg() will initialize the message's
+ *     reply port to the current thread's built-in reply port.
+ */
+void
+lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg)
+{
+    int error;
+
+    msg->ms_flags |= MSGF_ASYNC;
+    msg->ms_flags &= ~(MSGF_REPLY | MSGF_QUEUED);
+    msg->ms_reply_port = &curthread->td_msgport;
+    if ((error = lwkt_beginmsg(port, msg)) != EINPROGRESS) {
+       lwkt_replymsg(msg, error);
+    }
+}
+
+/*
+ * lwkt_domsg()
+ *
+ *     Send a message synchronously.  This function requests synchronous
+ *     completion and calls lwkt_beginmsg().  If the target port decides to
+ *     run the message asynchronously this function will block waiting for
+ *     the message to complete.  Since MSGF_ASYNC is not set the target
+ *     will not attempt to queue the reply to a reply port but will simply
+ *     wake up anyone waiting on the message.
+ *
+ *     A synchronous error code is always returned.
+ *
+ *     The message's ms_cmd must be initialized and its ms_flags must be
+ *     at least zero'd out.  lwkt_domsg() will initialize the message's
+ *     reply port to the current thread's built-in reply port.
+ */
+int
+lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg)
+{
+    int error;
+
+    msg->ms_flags &= ~(MSGF_ASYNC | MSGF_REPLY | MSGF_QUEUED);
+    msg->ms_reply_port = &curthread->td_msgport;
+    if ((error = lwkt_beginmsg(port, msg)) == EINPROGRESS) {
+       error = lwkt_waitmsg(msg);
+    }
+    return(error);
+}
+
+/*
+ * lwkt_waitmsg()
+ *
+ *     Wait for a message that we originated to complete, remove it from
+ *     the reply queue if necessary, and return its error code.
+ *
+ *     This call may be used in virtually any situation, including for the
+ *     case where you used lwkt_sendmsg() to initiate the message.
+ *
+ *     Note that we don't own the message any more so we cannot safely 
+ *     modify ms_flags, meaning we can't clear MSGF_ASYNC as an optimization.
+ *     However, since any remote cpu replying to the message will IPI the
+ *     message over to us for action, a critical section is sufficient to
+ *     protect td_msgq.
+ */
+int
+lwkt_waitmsg(lwkt_msg_t msg)
+{
+    lwkt_port_t port;
+
+    /*
+     * Done but not queued case (message was originally a synchronous request)
+     */
+    if ((msg->ms_flags & (MSGF_DONE|MSGF_REPLY)) == MSGF_DONE)
+       return(msg->ms_error);
+
+    port = msg->ms_reply_port;
+    KKASSERT(port->mp_td == curthread);        /* for now */
+    crit_enter();
+    if ((msg->ms_flags & MSGF_DONE) == 0) {
+       port->mp_flags |= MSGPORTF_WAITING;
+       do {
+           lwkt_deschedule_self();
+           lwkt_switch();
+       } while ((msg->ms_flags & MSGF_DONE) == 0);
+       port->mp_flags &= ~MSGPORTF_WAITING;
+    }
+    /*
+     * We own the message now.
+     */
+    if (msg->ms_flags & MSGF_QUEUED) {
+       msg->ms_flags &= ~MSGF_QUEUED;
+       TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
+    }
+    crit_exit();
+    return(msg->ms_error);
+}
+
+
+/************************************************************************
+ *                             PORT FUNCTIONS                          *
+ ************************************************************************/
+
+/*
+ * lwkt_initport()
+ *
+ *     Initialize a port for use and assign it to the specified thread.
+ */
+void
+lwkt_init_port(lwkt_port_t port, thread_t td)
+{
+    bzero(port, sizeof(*port));
+    TAILQ_INIT(&port->mp_msgq);
+    port->mp_td = td;
+    port->mp_beginmsg = lwkt_putport;
+    port->mp_abortmsg =  lwkt_abortport;
+    port->mp_returnmsg = lwkt_replyport;
+}
+
+/*
+ * lwkt_replyport()
+ *
+ *     This function is typically assigned to the mp_replymsg port vector.
+ *
+ *     The message is being returned to the specified port.  The port is
+ *     owned by the mp_td thread.  If we are on the same cpu as the mp_td
+ *     thread we can trivially queue the message to the messageq, otherwise
+ *     we have to send an ipi message to the correct cpu.   We then schedule
+ *     the target thread.
+ *
+ *     If MSGF_ASYNC is not set we do not bother queueing the message, we
+ *     just set the DONE bit.  
+ *
+ *     Note that the IPIQ callback function (*_remote) is entered with a
+ *     critical section already held.
+ */
+
+static __inline
+void
+_lwkt_replyport(lwkt_port_t port, lwkt_msg_t msg)
+{
+    thread_t td = port->mp_td;
+
+    if (td->td_cpu == mycpu->gd_cpuid) {
+       TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
+       msg->ms_flags |= MSGF_DONE | MSGF_REPLY | MSGF_QUEUED;
+       if (port->mp_flags & MSGPORTF_WAITING)
+           lwkt_schedule(td);
+    } else {
+       lwkt_send_ipiq(td->td_cpu, (ipifunc_t)lwkt_replyport_remote, msg);
+    }
+}
+
+static
+void
+lwkt_replyport_remote(lwkt_msg_t msg)
+{
+    _lwkt_replyport(msg->ms_reply_port, msg);
+}
+
+
+void
+lwkt_replyport(lwkt_port_t port, lwkt_msg_t msg)
+{
+    crit_enter();
+    if (msg->ms_flags & MSGF_ASYNC) {
+       _lwkt_replyport(port, msg);
+    } else {
+       msg->ms_flags |= MSGF_DONE;
+       if (port->mp_flags & MSGPORTF_WAITING)
+           lwkt_schedule(port->mp_td);
+    }
+    crit_exit();
+}
+
+/*
+ * lwkt_putport()
+ *
+ *     This function is typically assigned to the mp_beginmsg port vector.
+ *
+ *     Queue a message to the target port and wakeup the thread owning it.
+ *     This function always returns EINPROGRESS and may be assigned to a
+ *     message port's mp_beginmsg function vector.
+ */
+static
+__inline
+void
+_lwkt_putport(lwkt_port_t port, lwkt_msg_t msg)
+{
+    thread_t td = port->mp_td;
+
+    if (td->td_cpu == mycpu->gd_cpuid) {
+       TAILQ_INSERT_TAIL(&port->mp_msgq, msg, ms_node);
+       msg->ms_flags |= MSGF_QUEUED;
+       if (port->mp_flags & MSGPORTF_WAITING)
+           lwkt_schedule(td);
+    } else {
+       msg->ms_target_port = port;
+       lwkt_send_ipiq(td->td_cpu, (ipifunc_t)lwkt_putport_remote, msg);
+    }
+}
+
+static
+void
+lwkt_putport_remote(lwkt_msg_t msg)
+{
+    _lwkt_putport(msg->ms_target_port, msg);
+}
+
+int
+lwkt_putport(lwkt_port_t port, lwkt_msg_t msg)
+{
+    _lwkt_putport(port, msg);
+    return(EINPROGRESS);
+}
+
+/*
+ * lwkt_abortport()
+ *
+ *     This function is typically assigned to the mp_abortmsg port vector.
+ *
+ *     This function attempts to abort a message.  Aborts are always 
+ *     optional, so we could just do nothing if we wanted.  We get onto the
+ *     cpu owning the port, check to see if the message is queued on the 
+ *     port's message queue, and remove and abort it if it is.  Otherwise
+ *     we do nothing.  
+ *
+ *     Note that we cannot safely use ms_target_port because the port might
+ *     have forwarded the message on to some other port and we could race
+ *     against that use of ms_target_port.
+ */
+static
+__inline
+void
+_lwkt_abortport(lwkt_port_t port)
+{
+    thread_t td = port->mp_td;
+    lwkt_msg_t msg;
+
+    if (td->td_cpu == mycpu->gd_cpuid) {
+again:
+       msg = TAILQ_FIRST(&port->mp_msgq);
+       while (msg) {
+           if (msg->ms_abortreq) {
+               TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
+               msg->ms_flags &= ~MSGF_QUEUED;
+               lwkt_replymsg(msg, ECONNABORTED); /* YYY dangerous from IPI? */
+               goto again;
+           }
+           msg = TAILQ_NEXT(msg, ms_node);
+       }
+    } else {
+       lwkt_send_ipiq(td->td_cpu, (ipifunc_t)lwkt_abortport_remote, port);
+    }
+}
+
+static
+void
+lwkt_abortport_remote(lwkt_port_t port)
+{
+    _lwkt_abortport(port);
+}
+
+void
+lwkt_abortport(lwkt_port_t port, lwkt_msg_t msg)
+{
+    msg->ms_abortreq = 1;
+    _lwkt_abortport(port);
+}
+
+/*
+ * lwkt_getport()
+ *
+ *     Retrieve the next message from the port's message queue, return NULL
+ *     if no messages are pending.
+ *
+ *     The calling thread MUST own the port.
+ */
+void *
+lwkt_getport(lwkt_port_t port)
+{
+    lwkt_msg_t msg;
+
+    KKASSERT(port->mp_td == curthread);
+
+    crit_enter();
+    if ((msg = TAILQ_FIRST(&port->mp_msgq)) != NULL) {
+       TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
+       msg->ms_flags &= ~MSGF_QUEUED;
+    }
+    crit_exit();
+    return(msg);
+}
+
+/*
+ * lwkt_waitport()
+ *
+ *     Retrieve the next message from the port's message queue, block until
+ *     a message is ready.  This function never returns NULL.
+ */
+void *
+lwkt_waitport(lwkt_port_t port)
+{
+    lwkt_msg_t msg;
+
+    KKASSERT(port->mp_td == curthread);
+
+    crit_enter();
+    if ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL) {
+       port->mp_flags |= MSGPORTF_WAITING;
+       do {
+           lwkt_deschedule_self();
+           lwkt_switch();
+       } while ((msg = TAILQ_FIRST(&port->mp_msgq)) == NULL);
+       port->mp_flags &= ~MSGPORTF_WAITING;
+    }
+    TAILQ_REMOVE(&port->mp_msgq, msg, ms_node);
+    msg->ms_flags &= ~MSGF_QUEUED;
+    crit_exit();
+    return(msg);
+}
+
index 3ad1e74..4962e79 100644 (file)
@@ -25,7 +25,7 @@
  *
  * Implements simple shared/exclusive locks using LWKT. 
  *
- * $DragonFly: src/sys/kern/Attic/lwkt_rwlock.c,v 1.3 2003/06/22 20:32:19 dillon Exp $
+ * $DragonFly: src/sys/kern/Attic/lwkt_rwlock.c,v 1.4 2003/07/20 01:37:22 dillon Exp $
  */
 
 #include <sys/param.h>
@@ -44,6 +44,8 @@ lwkt_rwlock_init(lwkt_rwlock_t lock)
     lwkt_init_wait(&lock->rw_wait);
 }
 
+#if 0
+
 void
 lwkt_exlock(lwkt_rwlock_t lock, const char *wmesg)
 {
@@ -91,7 +93,7 @@ lwkt_exunlock(lwkt_rwlock_t lock)
     if (--lock->rw_count == 0) {
        lock->rw_owner = NULL;
        if (lock->rw_requests)
-           lwkt_signal(&lock->rw_wait);
+           lwkt_signal(&lock->rw_wait, 1);
     }
     lwkt_reltoken(&lock->rw_token);
 }
@@ -103,8 +105,9 @@ lwkt_shunlock(lwkt_rwlock_t lock)
     KASSERT(lock->rw_owner == NULL, ("lwkt_shunlock: exclusive lock"));
     if (--lock->rw_count == 0) {
        if (lock->rw_requests)
-           lwkt_signal(&lock->rw_wait);
+           lwkt_signal(&lock->rw_wait, 1);
     }
     lwkt_reltoken(&lock->rw_token);
 }
 
+#endif
index 770ef2b..b10d56d 100644 (file)
@@ -28,7 +28,7 @@
  *     to use a critical section to avoid problems.  Foreign thread 
  *     scheduling is queued via (async) IPIs.
  *
- * $DragonFly: src/sys/kern/lwkt_thread.c,v 1.24 2003/07/19 21:14:38 dillon Exp $
+ * $DragonFly: src/sys/kern/lwkt_thread.c,v 1.25 2003/07/20 01:37:22 dillon Exp $
  */
 
 #include <sys/param.h>
@@ -198,6 +198,7 @@ lwkt_init_thread(thread_t td, void *stack, int flags, struct globaldata *gd)
     td->td_gd = gd;
     td->td_pri = TDPRI_CRIT;
     td->td_cpu = gd->gd_cpuid; /* YYY don't really need this if have td_gd */
+    lwkt_init_port(&td->td_msgport, td);
     pmap_init_thread(td);
     crit_enter();
     TAILQ_INSERT_TAIL(&mycpu->gd_tdallq, td, td_allq);
@@ -852,6 +853,12 @@ lwkt_preempted_proc(void)
     return(td->td_proc);
 }
 
+typedef struct lwkt_gettoken_req {
+    lwkt_token_t tok;
+    int        cpu;
+} lwkt_gettoken_req;
+
+#if 0
 
 /*
  * This function deschedules the current thread and blocks on the specified
@@ -864,10 +871,6 @@ lwkt_preempted_proc(void)
  *
  * Note: wait queue signals normally ping-pong the cpu as an optimization.
  */
-typedef struct lwkt_gettoken_req {
-    lwkt_token_t tok;
-    int        cpu;
-} lwkt_gettoken_req;
 
 void
 lwkt_block(lwkt_wait_t w, const char *wmesg, int *gen)
@@ -881,7 +884,13 @@ lwkt_block(lwkt_wait_t w, const char *wmesg, int *gen)
        ++w->wa_count;
        td->td_wait = w;
        td->td_wmesg = wmesg;
+again:
        lwkt_switch();
+       lwkt_regettoken(&w->wa_token);
+       if (td->td_wmesg != NULL) {
+           _lwkt_dequeue(td);
+           goto again;
+       }
     }
     /* token might be lost, doesn't matter for gen update */
     *gen = w->wa_gen;
@@ -897,14 +906,15 @@ lwkt_block(lwkt_wait_t w, const char *wmesg, int *gen)
  * queue.  YYY implement as sysctl.
  */
 void
-lwkt_signal(lwkt_wait_t w)
+lwkt_signal(lwkt_wait_t w, int count)
 {
     thread_t td;
     int count;
 
     lwkt_gettoken(&w->wa_token);
     ++w->wa_gen;
-    count = w->wa_count;
+    if (count < 0)
+       count = w->wa_count;
     while ((td = TAILQ_FIRST(&w->wa_waitq)) != NULL && count) {
        --count;
        --w->wa_count;
@@ -921,6 +931,8 @@ lwkt_signal(lwkt_wait_t w)
     lwkt_reltoken(&w->wa_token);
 }
 
+#endif
+
 /*
  * Acquire ownership of a token
  *
diff --git a/sys/sys/msgport.h b/sys/sys/msgport.h
new file mode 100644 (file)
index 0000000..ddd3105
--- /dev/null
@@ -0,0 +1,67 @@
+/*
+ * SYS/MSGPORT.H
+ *
+ *     Implements LWKT messages and ports.
+ * 
+ * $DragonFly: src/sys/sys/msgport.h,v 1.1 2003/07/20 01:37:22 dillon Exp $
+ */
+
+#ifndef _SYS_MSGPORT_H_
+#define _SYS_MSGPORT_H_
+
+#ifndef _SYS_QUEUE_H_
+#include <sys/queue.h>         /* TAILQ_* macros */
+#endif
+
+struct lwkt_msg;
+struct lwkt_port;
+
+typedef struct lwkt_msg                *lwkt_msg_t;
+typedef struct lwkt_port       *lwkt_port_t;
+
+/*
+ * The standard message and port structure for communications between
+ * threads.  See kern/lwkt_msgport.c for documentation on how messages and
+ * ports work.
+ */
+typedef struct lwkt_msg {
+    TAILQ_ENTRY(lwkt_msg) ms_node;     /* link node (not always used) */
+    lwkt_port_t ms_target_port;                /* only used in certain situations */
+    lwkt_port_t        ms_reply_port;          /* asynch replies returned here */
+    int                ms_abortreq;            /* set asynchronously */
+    int                ms_cmd;
+    int                ms_flags;
+    int                ms_error;
+} lwkt_msg;
+
+#define MSGF_DONE      0x0001          /* asynch message is complete */
+#define MSGF_REPLY     0x0002          /* asynch message has been returned */
+#define MSGF_QUEUED    0x0004          /* message has been queued sanitychk */
+#define MSGF_ASYNC     0x0008          /* sync/async hint */
+
+typedef struct lwkt_port {
+    lwkt_msg_queue     mp_msgq;
+    int                        mp_flags;
+    thread_t           mp_td;
+    int                        (*mp_beginmsg)(lwkt_port_t port, lwkt_msg_t msg);
+    void               (*mp_abortmsg)(lwkt_port_t port, lwkt_msg_t msg);
+    void               (*mp_returnmsg)(lwkt_port_t port, lwkt_msg_t msg);
+} lwkt_port;
+
+#define MSGPORTF_WAITING       0x0001
+
+#ifdef _KERNEL
+
+extern void lwkt_init_port(lwkt_port_t port, thread_t td);
+extern void lwkt_sendmsg(lwkt_port_t port, lwkt_msg_t msg);
+extern int lwkt_domsg(lwkt_port_t port, lwkt_msg_t msg);
+extern int lwkt_waitmsg(lwkt_msg_t msg);
+extern void lwkt_replyport(lwkt_port_t port, lwkt_msg_t msg);
+extern void lwkt_abortport(lwkt_port_t port, lwkt_msg_t msg);
+extern int lwkt_putport(lwkt_port_t port, lwkt_msg_t msg);
+extern void *lwkt_getport(lwkt_port_t port);
+extern void *lwkt_waitport(lwkt_port_t port);
+
+#endif
+
+#endif
diff --git a/sys/sys/msgport2.h b/sys/sys/msgport2.h
new file mode 100644 (file)
index 0000000..cb7b2e0
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * SYS/MSGPORT2.H
+ *
+ *     Implements Inlines for LWKT messages and ports.
+ * 
+ * $DragonFly: src/sys/sys/msgport2.h,v 1.1 2003/07/20 01:37:22 dillon Exp $
+ */
+
+#ifndef _SYS_MSGPORT2_H_
+#define _SYS_MSGPORT2_H_
+
+#ifdef _KERNEL
+
+static __inline
+int
+lwkt_beginmsg(lwkt_port_t port, lwkt_msg_t msg)
+{
+    return(port->mp_beginmsg(port, msg));
+}
+
+static __inline
+int
+lwkt_forwardmsg(lwkt_port_t port, lwkt_msg_t msg)
+{
+    return(port->mp_beginmsg(port, msg));
+}
+
+static __inline
+void
+lwkt_abortmsg(lwkt_port_t port, lwkt_msg_t msg)
+{
+    port->mp_abortmsg(port, msg);
+}
+
+static __inline
+void
+lwkt_replymsg(lwkt_msg_t msg, int error)
+{
+    lwkt_port_t port = msg->ms_reply_port;
+    msg->ms_error = error;
+    port->mp_returnmsg(port, msg);
+}
+
+#endif
+
+#endif
index 2573457..225d102 100644 (file)
@@ -4,7 +4,7 @@
  *     Implements the architecture independant portion of the LWKT 
  *     subsystem.
  * 
- * $DragonFly: src/sys/sys/thread.h,v 1.24 2003/07/12 17:54:36 dillon Exp $
+ * $DragonFly: src/sys/sys/thread.h,v 1.25 2003/07/20 01:37:22 dillon Exp $
  */
 
 #ifndef _SYS_THREAD_H_
@@ -20,18 +20,16 @@ struct thread;
 struct lwkt_queue;
 struct lwkt_token;
 struct lwkt_wait;
-struct lwkt_msg;
 struct lwkt_ipiq;
-struct lwkt_port;
 struct lwkt_cpu_msg;
 struct lwkt_cpu_port;
 struct lwkt_rwlock;
+struct lwkt_msg;
+struct lwkt_port;
 
 typedef struct lwkt_queue      *lwkt_queue_t;
 typedef struct lwkt_token      *lwkt_token_t;
 typedef struct lwkt_wait       *lwkt_wait_t;
-typedef struct lwkt_msg                *lwkt_msg_t;
-typedef struct lwkt_port       *lwkt_port_t;
 typedef struct lwkt_cpu_msg    *lwkt_cpu_msg_t;
 typedef struct lwkt_cpu_port   *lwkt_cpu_port_t;
 typedef struct lwkt_rwlock     *lwkt_rwlock_t;
@@ -44,6 +42,9 @@ typedef TAILQ_HEAD(lwkt_msg_queue, lwkt_msg) lwkt_msg_queue;
 #ifndef _MACHINE_THREAD_H_
 #include <machine/thread.h>            /* md_thread */
 #endif
+#ifndef _SYS_MSGPORT_H_
+#include <sys/msgport.h>
+#endif
 
 /*
  * Tokens arbitrate access to information.  They are 'soft' arbitrators
@@ -71,29 +72,6 @@ typedef struct lwkt_wait {
     int                wa_count;
 } lwkt_wait;
 
-/*
- * The standard message and port structure for communications between
- * threads.
- */
-typedef struct lwkt_msg {
-    TAILQ_ENTRY(lwkt_msg) ms_node;
-    lwkt_port_t        ms_replyport;
-    int                ms_cmd;
-    int                ms_flags;
-    int                ms_error;
-} lwkt_msg;
-
-#define MSGF_DONE      0x0001
-#define MSGF_REPLY     0x0002
-#define MSGF_QUEUED    0x0004
-
-typedef struct lwkt_port {
-    lwkt_msg_queue     mp_msgq;
-    lwkt_wait          mp_wait;
-} lwkt_port;
-
-#define mp_token       mp_wait.wa_token
-
 #define MAXCPUFIFO      16     /* power of 2 */
 #define MAXCPUFIFO_MASK        (MAXCPUFIFO - 1)
 
@@ -148,6 +126,7 @@ struct md_intr_info;
 struct thread {
     TAILQ_ENTRY(thread) td_threadq;
     TAILQ_ENTRY(thread) td_allq;
+    lwkt_port  td_msgport;     /* built-in message port for replies */
     struct proc        *td_proc;       /* (optional) associated process */
     struct pcb *td_pcb;        /* points to pcb and top of kstack */
     struct globaldata *td_gd;  /* associated with this cpu */
@@ -262,7 +241,7 @@ extern void lwkt_hold(thread_t td);
 extern void lwkt_rele(thread_t td);
 
 extern void lwkt_block(lwkt_wait_t w, const char *wmesg, int *gen);
-extern void lwkt_signal(lwkt_wait_t w);
+extern void lwkt_signal(lwkt_wait_t w, int count);
 extern int lwkt_trytoken(lwkt_token_t tok);
 extern int lwkt_gettoken(lwkt_token_t tok);
 extern int lwkt_gentoken(lwkt_token_t tok, int *gen);