tcp: Implement asynchronized pru_rcvd
authorSepherosa Ziehau <sephe@dragonflybsd.org>
Mon, 3 Sep 2012 09:46:58 +0000 (17:46 +0800)
committerSepherosa Ziehau <sephe@dragonflybsd.org>
Tue, 4 Sep 2012 11:30:51 +0000 (19:30 +0800)
This mainly avoids extra scheduling cost on the reception path due to
lwkt_domsg().  lwkt_sendmsg() is now used to carry out TCP pru_rcvd.

Since TCP's pru_rcvd could be batched, one pru_rcvd netmsg is embedded
into struct socket to avoid pru_rcvd netmsg allocation for each pru_rcvd,
and this netmsg will be used by lwkt_sendmsg().  Whether this embedded
pcu_rcvd netmsg should be sent or not is determined by its MSG_DONE bit.
Since user thread and netisr thread could be on different CPUs, the
embedded pru_rcvd netmsg's MSG_DONE bit is protected by a spinlock.

To cope with the following race that could drop window updates,
tcp_usr_rcvd() replies asynchronized rcvd netmsg before tcp_output():

      netisr thread                     user thread

tcp_usr_rcvd()                   sorcvtcp()
{                                {
    tcp_output()                          :
          :                               :
          :                          sbunlinkmbuf()
          :                          if (rcvd & MSG_DONE) (2)
          :                              lwkt_sendmsg(rvcd)
          :                               :
    lwkt_replymsg(rcvd) (1)
}

At (2) window update is dropped, since rcvd netmsg is not replied yet at (1)

The result:
On i7-2600 (4C/8T, 3.4GHz):
32 parallel netperf -H 127.0.0.1 -t TCP_STREAM -P0 -l 30 (4 runs, unit: Mbps)

old   30253.88 30242.58 30162.55 30101.51
new   33962.74 33798.70 33499.92 33482.35

This gives ~12% performance improvement.

sys/kern/uipc_msg.c
sys/kern/uipc_socket.c
sys/kern/uipc_socket2.c
sys/net/netmsg.h
sys/netinet/in_proto.c
sys/netinet/tcp_subr.c
sys/netinet/tcp_usrreq.c
sys/netinet6/in6_proto.c
sys/sys/protosw.h
sys/sys/socketops.h
sys/sys/socketvar.h

index 86bfa9a..c33409c 100644 (file)
@@ -42,6 +42,7 @@
 #include <sys/thread.h>
 #include <sys/thread2.h>
 #include <sys/msgport2.h>
+#include <sys/spinlock2.h>
 #include <sys/mbuf.h>
 #include <vm/pmap.h>
 #include <net/netmsg2.h>
@@ -296,10 +297,25 @@ so_pru_rcvd(struct socket *so, int flags)
        netmsg_init(&msg.base, so, &curthread->td_msgport,
                    0, so->so_proto->pr_usrreqs->pru_rcvd);
        msg.nm_flags = flags;
+       msg.nm_pru_flags = 0;
        error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
        return (error);
 }
 
+void
+so_pru_rcvd_async(struct socket *so)
+{
+       lwkt_msg_t lmsg = &so->so_rcvd_msg.base.lmsg;
+
+       KASSERT(so->so_proto->pr_flags & PR_ASYNC_RCVD,
+           ("async pru_rcvd is not supported"));
+
+       spin_lock(&so->so_rcvd_spin);
+       if (lmsg->ms_flags & MSGF_DONE)
+               lwkt_sendmsg(so->so_port, lmsg);
+       spin_unlock(&so->so_rcvd_spin);
+}
+
 int
 so_pru_rcvoob(struct socket *so, struct mbuf *m, int flags)
 {
@@ -566,3 +582,22 @@ netmsg_so_notify_abort(netmsg_t msg)
         */
        lwkt_replymsg(&abrtmsg->base.lmsg, 0);
 }
+
+void
+so_async_rcvd_reply(struct socket *so)
+{
+       spin_lock(&so->so_rcvd_spin);
+       lwkt_replymsg(&so->so_rcvd_msg.base.lmsg, 0);
+       spin_unlock(&so->so_rcvd_spin);
+}
+
+void
+so_async_rcvd_drop(struct socket *so)
+{
+       lwkt_msg_t lmsg = &so->so_rcvd_msg.base.lmsg;
+
+       spin_lock(&so->so_rcvd_spin);
+       if ((lmsg->ms_flags & MSGF_DONE) == 0)
+               lwkt_dropmsg(lmsg);
+       spin_unlock(&so->so_rcvd_spin);
+}
index b944980..6d49d10 100644 (file)
@@ -95,6 +95,7 @@
 
 #include <sys/thread2.h>
 #include <sys/socketvar2.h>
+#include <sys/spinlock2.h>
 
 #include <machine/limits.h>
 
@@ -161,7 +162,7 @@ SYSCTL_INT(_kern_ipc, OID_AUTO, sendfile_async, CTLFLAG_RW,
  * the protocols can be easily modified to do this.
  */
 struct socket *
-soalloc(int waitok)
+soalloc(int waitok, struct protosw *pr)
 {
        struct socket *so;
        unsigned waitmask;
@@ -170,11 +171,16 @@ soalloc(int waitok)
        so = kmalloc(sizeof(struct socket), M_SOCKET, M_ZERO|waitmask);
        if (so) {
                /* XXX race condition for reentrant kernel */
+               so->so_proto = pr;
                TAILQ_INIT(&so->so_aiojobq);
                TAILQ_INIT(&so->so_rcv.ssb_kq.ki_mlist);
                TAILQ_INIT(&so->so_snd.ssb_kq.ki_mlist);
                lwkt_token_init(&so->so_rcv.ssb_token, "rcvtok");
                lwkt_token_init(&so->so_snd.ssb_token, "sndtok");
+               spin_init(&so->so_rcvd_spin);
+               netmsg_init(&so->so_rcvd_msg.base, so, &netisr_adone_rport,
+                   MSGF_DROPABLE, so->so_proto->pr_usrreqs->pru_rcvd);
+               so->so_rcvd_msg.nm_pru_flags |= PRUR_ASYNC;
                so->so_state = SS_NOFDREF;
                so->so_refs = 1;
        }
@@ -209,7 +215,7 @@ socreate(int dom, struct socket **aso, int type,
 
        if (prp->pr_type != type)
                return (EPROTOTYPE);
-       so = soalloc(p != NULL);
+       so = soalloc(p != NULL, prp);
        if (so == NULL)
                return (ENOBUFS);
 
@@ -239,7 +245,6 @@ socreate(int dom, struct socket **aso, int type,
        TAILQ_INIT(&so->so_comp);
        so->so_type = type;
        so->so_cred = crhold(p->p_ucred);
-       so->so_proto = prp;
        ai.sb_rlimit = &p->p_rlimit[RLIMIT_SBSIZE];
        ai.p_ucred = p->p_ucred;
        ai.fd_rdir = p->p_fd->fd_rdir;
@@ -1718,7 +1723,7 @@ dontblock:
                         * the idle takes over (5 seconds).
                         */
                        if (so->so_pcb)
-                               so_pru_rcvd(so, flags);
+                               so_pru_rcvd_async(so);
                        error = ssb_wait(&so->so_rcv);
                        if (error) {
                                ssb_unlock(&so->so_rcv);
@@ -1734,7 +1739,7 @@ dontblock:
         */
        if ((flags & MSG_PEEK) == 0) {
                if (so->so_pcb)
-                       so_pru_rcvd(so, flags);
+                       so_pru_rcvd_async(so);
        }
 
        if (orig_resid == resid && orig_resid &&
index dc3ca6a..21a3a21 100644 (file)
@@ -336,7 +336,7 @@ sonewconn_faddr(struct socket *head, int connstatus,
 
        if (head->so_qlen > 3 * head->so_qlimit / 2)
                return (NULL);
-       so = soalloc(1);
+       so = soalloc(1, head->so_proto);
        if (so == NULL)
                return (NULL);
 
@@ -360,7 +360,6 @@ sonewconn_faddr(struct socket *head, int connstatus,
         *       soreference().
         */
        so->so_state = head->so_state | SS_NOFDREF | SS_ASSERTINPROG;
-       so->so_proto = head->so_proto;
        so->so_cred = crhold(head->so_cred);
        ai.sb_rlimit = NULL;
        ai.p_ucred = NULL;
index 82c4d48..68d0161 100644 (file)
@@ -170,8 +170,11 @@ struct netmsg_pru_peeraddr {
 struct netmsg_pru_rcvd {
        struct netmsg_base      base;
        int                     nm_flags;
+       int                     nm_pru_flags;   /* PRUR_xxx */
 };
 
+#define PRUR_ASYNC             0x1
+
 struct netmsg_pru_rcvoob {
        struct netmsg_base      base;
        struct mbuf             *nm_m;
index aea13c1..461362f 100644 (file)
@@ -143,7 +143,8 @@ struct protosw inetsw[] = {
        .pr_type = SOCK_STREAM,
        .pr_domain = &inetdomain,
        .pr_protocol = IPPROTO_TCP,
-       .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_MPSAFE|PR_ASYNC_SEND,
+       .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|
+           PR_MPSAFE|PR_ASYNC_SEND|PR_ASYNC_RCVD,
 
        .pr_input = tcp_input,
        .pr_output = NULL,
index 9a88fc7..7af28f9 100644 (file)
@@ -87,6 +87,7 @@
 #include <sys/proc.h>
 #include <sys/priv.h>
 #include <sys/socket.h>
+#include <sys/socketops.h>
 #include <sys/socketvar.h>
 #include <sys/protosw.h>
 #include <sys/random.h>
@@ -1008,6 +1009,8 @@ no_valid_rt:
        if (tp->t_flags & TF_LISTEN)
                syncache_destroy(tp);
 
+       so_async_rcvd_drop(so);
+
        /*
         * NOTE:
         * pcbdetach removes any wildcard hash entry on the current CPU.
index 7063e5c..76ef199 100644 (file)
@@ -86,6 +86,7 @@
 #endif /* INET6 */
 #include <sys/socket.h>
 #include <sys/socketvar.h>
+#include <sys/socketops.h>
 #include <sys/protosw.h>
 
 #include <sys/thread2.h>
@@ -719,13 +720,19 @@ static void
 tcp_usr_rcvd(netmsg_t msg)
 {
        struct socket *so = msg->rcvd.base.nm_so;
-       int error = 0;
+       int error = 0, noreply = 0;
        struct inpcb *inp;
        struct tcpcb *tp;
 
        COMMON_START(so, inp, 0);
+
+       if (msg->rcvd.nm_pru_flags & PRUR_ASYNC) {
+               noreply = 1;
+               so_async_rcvd_reply(so);
+       }
        tcp_output(tp);
-       COMMON_END(PRU_RCVD);
+
+       COMMON_END1(PRU_RCVD, noreply);
 }
 
 /*
index 7ee6989..b19aae4 100644 (file)
@@ -187,7 +187,7 @@ struct protosw inet6sw[] = {
        .pr_domain = &inet6domain,
        .pr_protocol = IPPROTO_TCP,
        .pr_flags = PR_CONNREQUIRED | PR_WANTRCVD | PR_LISTEN |
-                   PR_MPSAFE | PR_LASTHDR | PR_ASYNC_SEND,
+                   PR_MPSAFE | PR_LASTHDR | PR_ASYNC_SEND | PR_ASYNC_RCVD,
 
        .pr_input = tcp6_input,
        .pr_output = NULL,
index ed71cf2..39d602e 100644 (file)
@@ -145,6 +145,7 @@ struct protosw {
 #define PR_MPSAFE      0x0100          /* protocal is MPSAFE */
 #define PR_SYNC_PORT   0x0200          /* synchronous port (no proto thrds) */
 #define PR_ASYNC_SEND  0x0400          /* async pru_send */
+#define PR_ASYNC_RCVD  0x0400          /* async pru_rcvd */
 
 /*
  * The arguments to usrreq are:
index b7d776c..9f0fb8b 100644 (file)
@@ -93,6 +93,9 @@ void so_pru_disconnect_direct (struct socket *so);
 int so_pru_listen (struct socket *so, struct thread *td);
 int so_pru_peeraddr (struct socket *so, struct sockaddr **nam);
 int so_pru_rcvd (struct socket *so, int flags);
+void so_pru_rcvd_async (struct socket *so);
+void so_async_rcvd_reply (struct socket *so);
+void so_async_rcvd_drop (struct socket *so);
 int so_pru_rcvoob (struct socket *so, struct mbuf *m, int flags);
 void so_pru_sync (struct socket *so);
 int so_pru_send (struct socket *so, int flags, struct mbuf *m,
index b32d2e0..ce17e31 100644 (file)
 #include <net/netmsg.h>
 #endif
 
+#ifndef _SYS_SPINLOCK_H_
+#include <sys/spinlock.h>
+#endif
+
 struct accept_filter;
 
 /*
@@ -157,6 +161,9 @@ struct socket {
 
        struct netmsg_base so_clomsg;
        struct sockaddr *so_faddr;
+
+       struct spinlock so_rcvd_spin;
+       struct netmsg_pru_rcvd so_rcvd_msg;
 };
 
 #endif
@@ -431,7 +438,7 @@ void        soaborta (struct socket *so);
 void   soabort_oncpu (struct socket *so);
 int    soaccept (struct socket *so, struct sockaddr **nam);
 void   soaccept_generic (struct socket *so);
-struct socket *soalloc (int waitok);
+struct socket *soalloc (int waitok, struct protosw *);
 int    sobind (struct socket *so, struct sockaddr *nam, struct thread *td);
 void   socantrcvmore (struct socket *so);
 void   socantsendmore (struct socket *so);