From 96c6eb294d6698d9a56882102cb936d326b2a3c7 Mon Sep 17 00:00:00 2001
From: Sepherosa Ziehau
Date: Mon, 3 Sep 2012 17:46:58 +0800
Subject: [PATCH] tcp: Implement asynchronized pru_rcvd

This mainly avoids the extra scheduling cost on the reception path caused
by lwkt_domsg(); lwkt_sendmsg() is now used to carry out TCP pru_rcvd.

Since TCP's pru_rcvd can be batched, one pru_rcvd netmsg is embedded into
struct socket to avoid allocating a pru_rcvd netmsg for each pru_rcvd, and
this embedded netmsg is the one handed to lwkt_sendmsg(). Whether the
embedded pru_rcvd netmsg should be sent or not is determined by its
MSGF_DONE bit: it is sent only when that bit is set. Since the user thread
and the netisr thread can be on different CPUs, the embedded pru_rcvd
netmsg's MSGF_DONE bit is protected by a spinlock.

To cope with the following race, which could drop window updates,
tcp_usr_rcvd() replies to the asynchronous rcvd netmsg before calling
tcp_output(). The diagram shows the ordering that is being avoided, i.e.
what would happen if the reply were sent after tcp_output():

    netisr thread                  user thread

    tcp_usr_rcvd()                 sorcvtcp()
    {                              {
        tcp_output()                   :
            :                          :
            :                      sbunlinkmbuf()
            :                      if (rcvd & MSGF_DONE) (2)
            :                          lwkt_sendmsg(rcvd)
            :                          :
        lwkt_replymsg(rcvd) (1)
    }

At (2) the window update is dropped, since the rcvd netmsg has not yet
been replied to at (1).

The result:
On i7-2600 (4C/8T, 3.4GHz):
32 parallel netperf -H 127.0.0.1 -t TCP_STREAM -P0 -l 30
(4 runs, unit: Mbps)

old  30253.88  30242.58  30162.55  30101.51
new  33962.74  33798.70  33499.92  33482.35

This gives ~12% performance improvement.
--- sys/kern/uipc_msg.c | 35 +++++++++++++++++++++++++++++++++++ sys/kern/uipc_socket.c | 15 ++++++++++----- sys/kern/uipc_socket2.c | 3 +-- sys/net/netmsg.h | 3 +++ sys/netinet/in_proto.c | 3 ++- sys/netinet/tcp_subr.c | 3 +++ sys/netinet/tcp_usrreq.c | 11 +++++++++-- sys/netinet6/in6_proto.c | 2 +- sys/sys/protosw.h | 1 + sys/sys/socketops.h | 3 +++ sys/sys/socketvar.h | 9 ++++++++- 11 files changed, 76 insertions(+), 12 deletions(-) diff --git a/sys/kern/uipc_msg.c b/sys/kern/uipc_msg.c index 86bfa9a586..c33409c8d2 100644 --- a/sys/kern/uipc_msg.c +++ b/sys/kern/uipc_msg.c @@ -42,6 +42,7 @@ #include #include #include +#include #include #include #include @@ -296,10 +297,25 @@ so_pru_rcvd(struct socket *so, int flags) netmsg_init(&msg.base, so, &curthread->td_msgport, 0, so->so_proto->pr_usrreqs->pru_rcvd); msg.nm_flags = flags; + msg.nm_pru_flags = 0; error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0); return (error); } +void +so_pru_rcvd_async(struct socket *so) +{ + lwkt_msg_t lmsg = &so->so_rcvd_msg.base.lmsg; + + KASSERT(so->so_proto->pr_flags & PR_ASYNC_RCVD, + ("async pru_rcvd is not supported")); + + spin_lock(&so->so_rcvd_spin); + if (lmsg->ms_flags & MSGF_DONE) + lwkt_sendmsg(so->so_port, lmsg); + spin_unlock(&so->so_rcvd_spin); +} + int so_pru_rcvoob(struct socket *so, struct mbuf *m, int flags) { @@ -566,3 +582,22 @@ netmsg_so_notify_abort(netmsg_t msg) */ lwkt_replymsg(&abrtmsg->base.lmsg, 0); } + +void +so_async_rcvd_reply(struct socket *so) +{ + spin_lock(&so->so_rcvd_spin); + lwkt_replymsg(&so->so_rcvd_msg.base.lmsg, 0); + spin_unlock(&so->so_rcvd_spin); +} + +void +so_async_rcvd_drop(struct socket *so) +{ + lwkt_msg_t lmsg = &so->so_rcvd_msg.base.lmsg; + + spin_lock(&so->so_rcvd_spin); + if ((lmsg->ms_flags & MSGF_DONE) == 0) + lwkt_dropmsg(lmsg); + spin_unlock(&so->so_rcvd_spin); +} diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c index b944980493..6d49d105b0 100644 --- a/sys/kern/uipc_socket.c +++ b/sys/kern/uipc_socket.c @@ -95,6 
+95,7 @@ #include #include +#include #include @@ -161,7 +162,7 @@ SYSCTL_INT(_kern_ipc, OID_AUTO, sendfile_async, CTLFLAG_RW, * the protocols can be easily modified to do this. */ struct socket * -soalloc(int waitok) +soalloc(int waitok, struct protosw *pr) { struct socket *so; unsigned waitmask; @@ -170,11 +171,16 @@ soalloc(int waitok) so = kmalloc(sizeof(struct socket), M_SOCKET, M_ZERO|waitmask); if (so) { /* XXX race condition for reentrant kernel */ + so->so_proto = pr; TAILQ_INIT(&so->so_aiojobq); TAILQ_INIT(&so->so_rcv.ssb_kq.ki_mlist); TAILQ_INIT(&so->so_snd.ssb_kq.ki_mlist); lwkt_token_init(&so->so_rcv.ssb_token, "rcvtok"); lwkt_token_init(&so->so_snd.ssb_token, "sndtok"); + spin_init(&so->so_rcvd_spin); + netmsg_init(&so->so_rcvd_msg.base, so, &netisr_adone_rport, + MSGF_DROPABLE, so->so_proto->pr_usrreqs->pru_rcvd); + so->so_rcvd_msg.nm_pru_flags |= PRUR_ASYNC; so->so_state = SS_NOFDREF; so->so_refs = 1; } @@ -209,7 +215,7 @@ socreate(int dom, struct socket **aso, int type, if (prp->pr_type != type) return (EPROTOTYPE); - so = soalloc(p != NULL); + so = soalloc(p != NULL, prp); if (so == NULL) return (ENOBUFS); @@ -239,7 +245,6 @@ socreate(int dom, struct socket **aso, int type, TAILQ_INIT(&so->so_comp); so->so_type = type; so->so_cred = crhold(p->p_ucred); - so->so_proto = prp; ai.sb_rlimit = &p->p_rlimit[RLIMIT_SBSIZE]; ai.p_ucred = p->p_ucred; ai.fd_rdir = p->p_fd->fd_rdir; @@ -1718,7 +1723,7 @@ dontblock: * the idle takes over (5 seconds). 
*/ if (so->so_pcb) - so_pru_rcvd(so, flags); + so_pru_rcvd_async(so); error = ssb_wait(&so->so_rcv); if (error) { ssb_unlock(&so->so_rcv); @@ -1734,7 +1739,7 @@ dontblock: */ if ((flags & MSG_PEEK) == 0) { if (so->so_pcb) - so_pru_rcvd(so, flags); + so_pru_rcvd_async(so); } if (orig_resid == resid && orig_resid && diff --git a/sys/kern/uipc_socket2.c b/sys/kern/uipc_socket2.c index dc3ca6a9be..21a3a210d1 100644 --- a/sys/kern/uipc_socket2.c +++ b/sys/kern/uipc_socket2.c @@ -336,7 +336,7 @@ sonewconn_faddr(struct socket *head, int connstatus, if (head->so_qlen > 3 * head->so_qlimit / 2) return (NULL); - so = soalloc(1); + so = soalloc(1, head->so_proto); if (so == NULL) return (NULL); @@ -360,7 +360,6 @@ sonewconn_faddr(struct socket *head, int connstatus, * soreference(). */ so->so_state = head->so_state | SS_NOFDREF | SS_ASSERTINPROG; - so->so_proto = head->so_proto; so->so_cred = crhold(head->so_cred); ai.sb_rlimit = NULL; ai.p_ucred = NULL; diff --git a/sys/net/netmsg.h b/sys/net/netmsg.h index 82c4d48d4f..68d01618bc 100644 --- a/sys/net/netmsg.h +++ b/sys/net/netmsg.h @@ -170,8 +170,11 @@ struct netmsg_pru_peeraddr { struct netmsg_pru_rcvd { struct netmsg_base base; int nm_flags; + int nm_pru_flags; /* PRUR_xxx */ }; +#define PRUR_ASYNC 0x1 + struct netmsg_pru_rcvoob { struct netmsg_base base; struct mbuf *nm_m; diff --git a/sys/netinet/in_proto.c b/sys/netinet/in_proto.c index aea13c16de..461362f0a1 100644 --- a/sys/netinet/in_proto.c +++ b/sys/netinet/in_proto.c @@ -143,7 +143,8 @@ struct protosw inetsw[] = { .pr_type = SOCK_STREAM, .pr_domain = &inetdomain, .pr_protocol = IPPROTO_TCP, - .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_MPSAFE|PR_ASYNC_SEND, + .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD| + PR_MPSAFE|PR_ASYNC_SEND|PR_ASYNC_RCVD, .pr_input = tcp_input, .pr_output = NULL, diff --git a/sys/netinet/tcp_subr.c b/sys/netinet/tcp_subr.c index 9a88fc732c..7af28f9791 100644 --- a/sys/netinet/tcp_subr.c +++ b/sys/netinet/tcp_subr.c @@ -87,6 +87,7 @@ #include 
#include #include +#include #include #include #include @@ -1008,6 +1009,8 @@ no_valid_rt: if (tp->t_flags & TF_LISTEN) syncache_destroy(tp); + so_async_rcvd_drop(so); + /* * NOTE: * pcbdetach removes any wildcard hash entry on the current CPU. diff --git a/sys/netinet/tcp_usrreq.c b/sys/netinet/tcp_usrreq.c index 7063e5cf02..76ef199a0e 100644 --- a/sys/netinet/tcp_usrreq.c +++ b/sys/netinet/tcp_usrreq.c @@ -86,6 +86,7 @@ #endif /* INET6 */ #include #include +#include #include #include @@ -719,13 +720,19 @@ static void tcp_usr_rcvd(netmsg_t msg) { struct socket *so = msg->rcvd.base.nm_so; - int error = 0; + int error = 0, noreply = 0; struct inpcb *inp; struct tcpcb *tp; COMMON_START(so, inp, 0); + + if (msg->rcvd.nm_pru_flags & PRUR_ASYNC) { + noreply = 1; + so_async_rcvd_reply(so); + } tcp_output(tp); - COMMON_END(PRU_RCVD); + + COMMON_END1(PRU_RCVD, noreply); } /* diff --git a/sys/netinet6/in6_proto.c b/sys/netinet6/in6_proto.c index 7ee69897b9..b19aae40a8 100644 --- a/sys/netinet6/in6_proto.c +++ b/sys/netinet6/in6_proto.c @@ -187,7 +187,7 @@ struct protosw inet6sw[] = { .pr_domain = &inet6domain, .pr_protocol = IPPROTO_TCP, .pr_flags = PR_CONNREQUIRED | PR_WANTRCVD | PR_LISTEN | - PR_MPSAFE | PR_LASTHDR | PR_ASYNC_SEND, + PR_MPSAFE | PR_LASTHDR | PR_ASYNC_SEND | PR_ASYNC_RCVD, .pr_input = tcp6_input, .pr_output = NULL, diff --git a/sys/sys/protosw.h b/sys/sys/protosw.h index ed71cf21ec..39d602e7fe 100644 --- a/sys/sys/protosw.h +++ b/sys/sys/protosw.h @@ -145,6 +145,7 @@ struct protosw { #define PR_MPSAFE 0x0100 /* protocal is MPSAFE */ #define PR_SYNC_PORT 0x0200 /* synchronous port (no proto thrds) */ #define PR_ASYNC_SEND 0x0400 /* async pru_send */ +#define PR_ASYNC_RCVD 0x0800 /* async pru_rcvd */ /* * The arguments to usrreq are: diff --git a/sys/sys/socketops.h b/sys/sys/socketops.h index b7d776cd48..9f0fb8bef4 100644 --- a/sys/sys/socketops.h +++ b/sys/sys/socketops.h @@ -93,6 +93,9 @@ void so_pru_disconnect_direct (struct socket *so); int so_pru_listen 
(struct socket *so, struct thread *td); int so_pru_peeraddr (struct socket *so, struct sockaddr **nam); int so_pru_rcvd (struct socket *so, int flags); +void so_pru_rcvd_async (struct socket *so); +void so_async_rcvd_reply (struct socket *so); +void so_async_rcvd_drop (struct socket *so); int so_pru_rcvoob (struct socket *so, struct mbuf *m, int flags); void so_pru_sync (struct socket *so); int so_pru_send (struct socket *so, int flags, struct mbuf *m, diff --git a/sys/sys/socketvar.h b/sys/sys/socketvar.h index b32d2e0b68..ce17e31c04 100644 --- a/sys/sys/socketvar.h +++ b/sys/sys/socketvar.h @@ -60,6 +60,10 @@ #include #endif +#ifndef _SYS_SPINLOCK_H_ +#include +#endif + struct accept_filter; /* @@ -157,6 +161,9 @@ struct socket { struct netmsg_base so_clomsg; struct sockaddr *so_faddr; + + struct spinlock so_rcvd_spin; + struct netmsg_pru_rcvd so_rcvd_msg; }; #endif @@ -431,7 +438,7 @@ void soaborta (struct socket *so); void soabort_oncpu (struct socket *so); int soaccept (struct socket *so, struct sockaddr **nam); void soaccept_generic (struct socket *so); -struct socket *soalloc (int waitok); +struct socket *soalloc (int waitok, struct protosw *); int sobind (struct socket *so, struct sockaddr *nam, struct thread *td); void socantrcvmore (struct socket *so); void socantsendmore (struct socket *so); -- 2.41.0