tcp: Implement asynchronous pru_connect
authorSepherosa Ziehau <sephe@dragonflybsd.org>
Tue, 20 Aug 2013 12:52:35 +0000 (20:52 +0800)
committerSepherosa Ziehau <sephe@dragonflybsd.org>
Wed, 28 Aug 2013 04:49:39 +0000 (12:49 +0800)
This is mainly used to improve TCP nonblocking connect(2) performance.

Before this commit, a user space thread issuing a nonblocking connect(2)
had to wait for the netisr to complete the SYN output.  This could be a
performance hit for nonblocking connect(2).  First, the user space
thread is put to sleep, even though the connect(2) is nonblocking.
Second, it does not make much sense for a nonblocking connect(2) to
wait for the SYN output.

TCP's asynchronous pru_connect implementation sets ISCONNECTING
before dispatching the netmsg to netisr0.  Errors such as EADDRNOTAVAIL,
i.e. out of local port space, are reported through kevent(2) or
getsockopt(2) SOL_SOCKET/SO_ERROR.

NFS and other kernel code still use the old synchronous pru_connect.
This commit only affects the connect(2) syscall.

The sysctl node kern.ipc.soconnect_async is added to enable or disable
asynchronous pru_connect.  It is enabled by default.

The performance measurement (i7-2600 w/ bnx(4)), using
tools/tools/netrate/accept_connect/kq_connect_client:

    kq_connect_client -4 SERVADDR -p SERVPORT -i 8 -c 32 -l 30
    (8 processes, each creating 32 connections simultaneously, for 30 secs)

16 runs average:

    asynchronous pru_connect        synchronous pru_connect
        220979.89 conns/s               189106.88 conns/s

This commit gives a ~17% performance improvement for nonblocking connect(2).

17 files changed:
sys/kern/uipc_msg.c
sys/kern/uipc_socket.c
sys/kern/uipc_syscalls.c
sys/net/netisr.c
sys/net/netmsg.h
sys/netgraph/ksocket/ng_ksocket.c
sys/netgraph7/bluetooth/socket/ng_btsocket_rfcomm.c
sys/netgraph7/ksocket/ng_ksocket.c
sys/netinet/in_proto.c
sys/netinet/tcp_usrreq.c
sys/netinet/udp_usrreq.c
sys/netproto/ncp/ncp_sock.c
sys/netproto/smb/smb_trantcp.c
sys/sys/protosw.h
sys/sys/socketops.h
sys/sys/socketvar.h
sys/vfs/nfs/nfs_socket.c

index 44845d3..7ab5543 100644 (file)
@@ -176,12 +176,55 @@ so_pru_connect(struct socket *so, struct sockaddr *nam, struct thread *td)
        msg.nm_nam = nam;
        msg.nm_td = td;
        msg.nm_m = NULL;
+       msg.nm_sndflags = 0;
        msg.nm_flags = 0;
-       msg.nm_reconnect = 0;
        error = lwkt_domsg(so->so_port, &msg.base.lmsg, 0);
        return (error);
 }
 
+int
+so_pru_connect_async(struct socket *so, struct sockaddr *nam, struct thread *td)
+{
+       struct netmsg_pru_connect *msg;
+       int error, flags;
+
+       KASSERT(so->so_proto->pr_usrreqs->pru_preconnect != NULL,
+           ("async pru_connect is not supported"));
+
+       /* NOTE: sockaddr immediately follows netmsg */
+       msg = kmalloc(sizeof(*msg) + nam->sa_len, M_LWKTMSG, M_NOWAIT);
+       if (msg == NULL) {
+               /*
+                * Could not allocate netmsg+sockaddr w/o waiting;
+                * fall back to synchronous pru_connect.
+                */
+               return so_pru_connect(so, nam, td);
+       }
+
+       error = so->so_proto->pr_usrreqs->pru_preconnect(so, nam, td);
+       if (error) {
+               kfree(msg, M_LWKTMSG);
+               return error;
+       }
+
+       flags = PRUC_ASYNC;     /* protocol posts errors to so_error */
+       if (td != NULL && (so->so_proto->pr_flags & PR_ACONN_HOLDTD)) {
+               lwkt_hold(td);  /* protocol lwkt_rele()s it (PRUC_HELDTD) */
+               flags |= PRUC_HELDTD;
+       }
+
+       netmsg_init(&msg->base, so, &netisr_afree_rport, 0,
+           so->so_proto->pr_usrreqs->pru_connect);
+       msg->nm_nam = (struct sockaddr *)(msg + 1);
+       memcpy(msg->nm_nam, nam, nam->sa_len);
+       msg->nm_td = td;
+       msg->nm_m = NULL;
+       msg->nm_sndflags = 0;
+       msg->nm_flags = flags;
+       lwkt_sendmsg(so->so_port, &msg->base.lmsg);    /* no lwkt_domsg wait */
+       return 0;
+}
+
 int
 so_pru_connect2(struct socket *so1, struct socket *so2)
 {
index 16b0d39..a464edc 100644 (file)
@@ -145,6 +145,10 @@ int use_sendfile_async = 1;
 SYSCTL_INT(_kern_ipc, OID_AUTO, sendfile_async, CTLFLAG_RW,
     &use_sendfile_async, 0, "sendfile uses asynchronized pru_send");
 
+int use_soconnect_async = 1;   /* 0: connect(2) uses sync pru_connect */
+SYSCTL_INT(_kern_ipc, OID_AUTO, soconnect_async, CTLFLAG_RW,
+    &use_soconnect_async, 0, "soconnect uses asynchronized pru_connect");
+
 /*
  * Socket operation routines.
  * These routines are called by the routines in
@@ -695,7 +699,8 @@ soaccept(struct socket *so, struct sockaddr **nam)
 }
 
 int
-soconnect(struct socket *so, struct sockaddr *nam, struct thread *td)
+soconnect(struct socket *so, struct sockaddr *nam, struct thread *td,
+    boolean_t sync)
 {
        int error;
 
@@ -717,7 +722,10 @@ soconnect(struct socket *so, struct sockaddr *nam, struct thread *td)
                 * from biting us.
                 */
                so->so_error = 0;
-               error = so_pru_connect(so, nam, td);
+               if (!sync && so->so_proto->pr_usrreqs->pru_preconnect)
+                       error = so_pru_connect_async(so, nam, td);
+               else
+                       error = so_pru_connect(so, nam, td);
        }
        return (error);
 }
index 47d416a..7b1a698 100644 (file)
@@ -82,6 +82,7 @@
 
 extern int use_soaccept_pred_fast;
 extern int use_sendfile_async;
+extern int use_soconnect_async;
 
 /*
  * System call interface to the socket abstraction.
@@ -512,7 +513,7 @@ kern_connect(int s, int fflags, struct sockaddr *sa)
                error = EALREADY;
                goto done;
        }
-       error = soconnect(so, sa, td);
+       error = soconnect(so, sa, td, use_soconnect_async ? FALSE : TRUE);
        if (error)
                goto bad;
        if ((fflags & FNONBLOCK) && (so->so_state & SS_ISCONNECTING)) {
index bf0f808..7444e26 100644 (file)
@@ -299,8 +299,16 @@ netmsg_service_loop(void *arg)
                                 * ops can change ports on us.  Chase the
                                 * port.
                                 */
+#ifdef foo
+                               /*
+                                * This could be quite common for protocols
+                                * which support asynchronous pru_connect,
+                                * e.g. TCP, so kprintf socket port chasing
+                                * could be too verbose for the console.
+                                */
                                kprintf("netmsg_service_loop: Warning, "
                                        "port changed so=%p\n", msg->nm_so);
+#endif
                                lwkt_forwardmsg(msg->nm_so->so_port,
                                                &msg->lmsg);
                        } else {
index e2c7d17..3c090ec 100644 (file)
@@ -137,14 +137,16 @@ struct netmsg_pru_connect {
        struct sockaddr         *nm_nam;
        struct thread           *nm_td;
        struct mbuf             *nm_m;          /* connect with send */
-       int                     nm_flags;       /* connect with send */
-       int                     nm_reconnect;   /* message control */
+       int                     nm_sndflags;    /* connect with send, PRUS_ */
+       int                     nm_flags;       /* message control */
 };
 
-#define NMSG_RECONNECT_RECONNECT       0x0001  /* thread port change */
-#define NMSG_RECONNECT_NAMALLOC                0x0002  /* nm_nam allocated */
-#define NMSG_RECONNECT_PUSH            0x0004  /* call tcp_output */
-#define NMSG_RECONNECT_FALLBACK                0x0008  /* fallback to ipv4 */
+#define PRUC_RECONNECT         0x0001          /* thread port change */
+#define PRUC_NAMALLOC          0x0002          /* nm_nam allocated */
+#define PRUC_PUSH              0x0004          /* call tcp_output */
+#define PRUC_FALLBACK          0x0008          /* fallback to ipv4 */
+#define PRUC_ASYNC             0x0010          /* async; errors go to so_error */
+#define PRUC_HELDTD            0x0020          /* nm_td held; lwkt_rele when done */
 
 struct netmsg_pru_connect2 {
        struct netmsg_base      base;
index 1e23676..78f76d8 100644 (file)
@@ -726,7 +726,7 @@ ng_ksocket_rcvmsg(node_p node, struct ng_mesg *msg,
                        /* Do connect */
                        if ((so->so_state & SS_ISCONNECTING) != 0)
                                ERROUT(EALREADY);
-                       if ((error = soconnect(so, sa, td)) != 0) {
+                       if ((error = soconnect(so, sa, td, TRUE)) != 0) {
                                soclrstate(so, SS_ISCONNECTING);
                                ERROUT(error);
                        }
index f90661a..4490224 100644 (file)
@@ -1356,7 +1356,7 @@ ng_btsocket_rfcomm_session_create(ng_btsocket_rfcomm_session_p *sp,
                l2sa.l2cap_psm = htole16(NG_L2CAP_PSM_RFCOMM);
                bcopy(dst, &l2sa.l2cap_bdaddr, sizeof(l2sa.l2cap_bdaddr));
 
-               error = soconnect(s->l2so, (struct sockaddr *) &l2sa, td);
+               error = soconnect(s->l2so, (struct sockaddr *) &l2sa, td, TRUE);
                if (error != 0)
                        goto bad;
        }
index 097fc5a..4c3ac9b 100644 (file)
@@ -747,7 +747,7 @@ ng_ksocket_rcvmsg(node_p node, item_p item, hook_p lasthook)
                        /* Do connect */
                        if ((so->so_state & SS_ISCONNECTING) != 0)
                                ERROUT(EALREADY);
-                       if ((error = soconnect(so, sa, td)) != 0) {
+                       if ((error = soconnect(so, sa, td, TRUE)) != 0) {
                                soclrstate(so, SS_ISCONNECTING);
                                ERROUT(error);
                        }
index 268d887..513a0c7 100644 (file)
@@ -140,8 +140,8 @@ struct protosw inetsw[] = {
        .pr_type = SOCK_STREAM,
        .pr_domain = &inetdomain,
        .pr_protocol = IPPROTO_TCP,
-       .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|
-           PR_MPSAFE|PR_ASYNC_SEND|PR_ASYNC_RCVD,
+       .pr_flags = PR_CONNREQUIRED|PR_WANTRCVD|PR_MPSAFE|
+           PR_ASYNC_SEND|PR_ASYNC_RCVD|PR_ACONN_HOLDTD,
 
        .pr_input = tcp_input,
        .pr_output = NULL,
index 87a5271..446b6e2 100644 (file)
@@ -529,6 +529,12 @@ out:
                m_freem(msg->connect.nm_m);
                msg->connect.nm_m = NULL;
        }
+       if (msg->connect.nm_flags & PRUC_HELDTD)
+               lwkt_rele(td);
+       if (error && (msg->connect.nm_flags & PRUC_ASYNC)) {
+               so->so_error = error;
+               soisdisconnected(so);
+       }
        lwkt_replymsg(&msg->lmsg, error);
 }
 
@@ -574,7 +580,7 @@ tcp6_usr_connect(netmsg_t msg)
                inp->inp_vflag |= INP_IPV4;
                inp->inp_vflag &= ~INP_IPV6;
                msg->connect.nm_nam = (struct sockaddr *)sinp;
-               msg->connect.nm_reconnect |= NMSG_RECONNECT_NAMALLOC;
+               msg->connect.nm_flags |= PRUC_NAMALLOC;
                tcp_connect(msg);
                /* msg is invalid now */
                return;
@@ -583,7 +589,7 @@ tcp6_usr_connect(netmsg_t msg)
        inp->inp_vflag |= INP_IPV6;
        inp->inp_inc.inc_isipv6 = 1;
 
-       msg->connect.nm_reconnect |= NMSG_RECONNECT_FALLBACK;
+       msg->connect.nm_flags |= PRUC_FALLBACK;
        tcp6_connect(msg);
        /* msg is invalid now */
        return;
@@ -881,6 +887,21 @@ tcp6_usr_savefaddr(struct socket *so, const struct sockaddr *faddr)
 }
 #endif
 
+static int
+tcp_usr_preconnect(struct socket *so, const struct sockaddr *nam,
+    struct thread *td __unused)
+{
+       const struct sockaddr_in *sinp;
+
+       sinp = (const struct sockaddr_in *)nam;
+       if (sinp->sin_family == AF_INET &&
+           IN_MULTICAST(ntohl(sinp->sin_addr.s_addr)))
+               return EAFNOSUPPORT;    /* reject multicast peer up front */
+
+       soisconnecting(so);     /* mark ISCONNECTING before async dispatch */
+       return 0;
+}
+
 /* xxx - should be const */
 struct pr_usrreqs tcp_usrreqs = {
        .pru_abort = tcp_usr_abort,
@@ -902,7 +923,8 @@ struct pr_usrreqs tcp_usrreqs = {
        .pru_sockaddr = in_setsockaddr_dispatch,
        .pru_sosend = sosendtcp,
        .pru_soreceive = sorecvtcp,
-       .pru_savefaddr = tcp_usr_savefaddr
+       .pru_savefaddr = tcp_usr_savefaddr,
+       .pru_preconnect = tcp_usr_preconnect
 };
 
 #ifdef INET6
@@ -1042,8 +1064,8 @@ tcp_connect(netmsg_t msg)
        /*
         * Reconnect our pcb if we have to
         */
-       if (msg->connect.nm_reconnect & NMSG_RECONNECT_RECONNECT) {
-               msg->connect.nm_reconnect &= ~NMSG_RECONNECT_RECONNECT;
+       if (msg->connect.nm_flags & PRUC_RECONNECT) {
+               msg->connect.nm_flags &= ~PRUC_RECONNECT;
                in_pcblink(so->so_pcb, &tcbinfo[mycpu->gd_cpuid]);
        }
 
@@ -1108,14 +1130,20 @@ tcp_connect(netmsg_t msg)
                 */
                in_pcbunlink(so->so_pcb, &tcbinfo[mycpu->gd_cpuid]);
                sosetport(so, port);
-               msg->connect.nm_reconnect |= NMSG_RECONNECT_RECONNECT;
+               msg->connect.nm_flags |= PRUC_RECONNECT;
                msg->connect.base.nm_dispatch = tcp_connect;
 
                lwkt_forwardmsg(port, &msg->connect.base.lmsg);
                /* msg invalid now */
                return;
+       } else if (msg->connect.nm_flags & PRUC_HELDTD) {
+               /*
+                * The original thread is no longer needed; release it.
+                */
+               lwkt_rele(td);
+               msg->connect.nm_flags &= ~PRUC_HELDTD;
        }
-       error = tcp_connect_oncpu(tp, msg->connect.nm_flags,
+       error = tcp_connect_oncpu(tp, msg->connect.nm_sndflags,
                                  msg->connect.nm_m, sin, if_sin);
        msg->connect.nm_m = NULL;
 out:
@@ -1123,10 +1151,16 @@ out:
                m_freem(msg->connect.nm_m);
                msg->connect.nm_m = NULL;
        }
-       if (msg->connect.nm_reconnect & NMSG_RECONNECT_NAMALLOC) {
+       if (msg->connect.nm_flags & PRUC_NAMALLOC) {
                kfree(msg->connect.nm_nam, M_LWKTMSG);
                msg->connect.nm_nam = NULL;
        }
+       if (msg->connect.nm_flags & PRUC_HELDTD)
+               lwkt_rele(td);
+       if (error && (msg->connect.nm_flags & PRUC_ASYNC)) {
+               so->so_error = error;
+               soisdisconnected(so);
+       }
        lwkt_replymsg(&msg->connect.base.lmsg, error);
        /* msg invalid now */
 }
@@ -1151,8 +1185,8 @@ tcp6_connect(netmsg_t msg)
        /*
         * Reconnect our pcb if we have to
         */
-       if (msg->connect.nm_reconnect & NMSG_RECONNECT_RECONNECT) {
-               msg->connect.nm_reconnect &= ~NMSG_RECONNECT_RECONNECT;
+       if (msg->connect.nm_flags & PRUC_RECONNECT) {
+               msg->connect.nm_flags &= ~PRUC_RECONNECT;
                in_pcblink(so->so_pcb, &tcbinfo[mycpu->gd_cpuid]);
        }
 
@@ -1190,18 +1224,18 @@ tcp6_connect(netmsg_t msg)
 
                in_pcbunlink(so->so_pcb, &tcbinfo[mycpu->gd_cpuid]);
                sosetport(so, port);
-               msg->connect.nm_reconnect |= NMSG_RECONNECT_RECONNECT;
+               msg->connect.nm_flags |= PRUC_RECONNECT;
                msg->connect.base.nm_dispatch = tcp6_connect;
 
                lwkt_forwardmsg(port, &msg->connect.base.lmsg);
                /* msg invalid now */
                return;
        }
-       error = tcp6_connect_oncpu(tp, msg->connect.nm_flags,
+       error = tcp6_connect_oncpu(tp, msg->connect.nm_sndflags,
                                   &msg->connect.nm_m, sin6, addr6);
        /* nm_m may still be intact */
 out:
-       if (error && (msg->connect.nm_reconnect & NMSG_RECONNECT_FALLBACK)) {
+       if (error && (msg->connect.nm_flags & PRUC_FALLBACK)) {
                tcp_connect(msg);
                /* msg invalid now */
        } else {
@@ -1209,7 +1243,7 @@ out:
                        m_freem(msg->connect.nm_m);
                        msg->connect.nm_m = NULL;
                }
-               if (msg->connect.nm_reconnect & NMSG_RECONNECT_NAMALLOC) {
+               if (msg->connect.nm_flags & PRUC_NAMALLOC) {
                        kfree(msg->connect.nm_nam, M_LWKTMSG);
                        msg->connect.nm_nam = NULL;
                }
index 6762629..d1d7838 100644 (file)
@@ -1129,10 +1129,10 @@ udp_connect(netmsg_t msg)
                goto out;
        }
 
-       if (msg->connect.nm_reconnect & NMSG_RECONNECT_RECONNECT) {
+       if (msg->connect.nm_flags & PRUC_RECONNECT) {
                panic("UDP does not support RECONNECT");
 #ifdef notyet
-               msg->connect.nm_reconnect &= ~NMSG_RECONNECT_RECONNECT;
+               msg->connect.nm_flags &= ~PRUC_RECONNECT;
                in_pcblink(inp, &udbinfo);
 #endif
        }
@@ -1188,7 +1188,7 @@ udp_connect(netmsg_t msg)
                in_pcbunlink(so->so_pcb, &udbinfo);
                /* in_pcbunlink(so->so_pcb, &udbinfo[mycpu->gd_cpuid]); */
                sosetport(so, port);
-               msg->connect.nm_reconnect |= NMSG_RECONNECT_RECONNECT;
+               msg->connect.nm_flags |= PRUC_RECONNECT;
                msg->connect.base.nm_dispatch = udp_connect;
 
                lwkt_forwardmsg(port, &msg->connect.base.lmsg);
index 515f0e0..3c611a7 100644 (file)
@@ -88,7 +88,7 @@ static int
 ncp_soconnect(struct socket *so,struct sockaddr *target, struct thread *td) {
        int error;
 
-       error = soconnect(so, target, td);
+       error = soconnect(so, target, td, TRUE);
        if (error)
                return error;
        /*
index 777f640..05b0ca0 100644 (file)
@@ -164,7 +164,7 @@ nb_connect_in(struct nbpcb *nbp, struct sockaddr_in *to, struct thread *td)
        nb_setsockopt_int(so, IPPROTO_TCP, TCP_NODELAY, 1);
        atomic_clear_int(&so->so_rcv.ssb_flags, SSB_NOINTR);
        atomic_clear_int(&so->so_snd.ssb_flags, SSB_NOINTR);
-       error = soconnect(so, (struct sockaddr*)to, td);
+       error = soconnect(so, (struct sockaddr*)to, td, TRUE);
 
        /*
         * If signals are allowed nbssn_recv() can wind up in a hard loop
index e249e16..6af56f8 100644 (file)
@@ -147,6 +147,7 @@ struct protosw {
 #define PR_ASYNC_SEND  0x0400          /* async pru_send */
 #define PR_ASYNC_RCVD  0x0800          /* async pru_rcvd */
 #define PR_ASEND_HOLDTD        0x1000          /* async pru_send hold orig thread */
+#define PR_ACONN_HOLDTD        0x2000          /* async pru_connect hold orig thread */
 
 /*
  * The arguments to usrreq are:
@@ -229,6 +230,11 @@ struct pru_attach_info {
  *
  *     pru_savefaddr() - called synchronoutly by protocol thread. Typically
  *                       save the foreign address into socket.so_faddr.
+ *
+ *     pru_preconnect() - called synchronously from user context.  Typically
+ *                         prepares for later asynchronous pru_connect, e.g.
+ *                         sets ISCONNECTING.  Non-NULL means asynchronous
+ *                         pru_connect is supported.
  */
 struct pr_usrreqs {
        void    (*pru_abort) (netmsg_t msg);
@@ -272,6 +278,11 @@ struct pr_usrreqs {
        /* synchronously called by protocol thread */
        void    (*pru_savefaddr) (struct socket *so,
                                      const struct sockaddr *addr);
+
+       /* synchronously called by user thread. */
+       int     (*pru_preconnect) (struct socket *so,
+                                     const struct sockaddr *addr,
+                                     struct thread *td);
 };
 
 typedef int (*pru_sosend_fn_t) (struct socket *so, struct sockaddr *addr,
index 9f0fb8b..bedaa86 100644 (file)
@@ -83,6 +83,8 @@ int so_pru_attach_direct(struct socket *so, int proto,
                struct pru_attach_info *ai);
 int so_pru_bind (struct socket *so, struct sockaddr *nam, struct thread *td);
 int so_pru_connect (struct socket *so, struct sockaddr *nam, struct thread *td);
+int so_pru_connect_async (struct socket *so, struct sockaddr *nam,
+               struct thread *td);
 int so_pru_connect2 (struct socket *so1, struct socket *so2);
 int so_pru_control_direct(struct socket *so, u_long cmd, caddr_t data,
                struct ifnet *ifp);
index 288ccf1..d87b952 100644 (file)
@@ -444,7 +444,8 @@ void        socantrcvmore (struct socket *so);
 void   socantsendmore (struct socket *so);
 int    socket_wait (struct socket *so, struct timespec *ts, int *res);
 int    soclose (struct socket *so, int fflags);
-int    soconnect (struct socket *so, struct sockaddr *nam, struct thread *td);
+int    soconnect (struct socket *so, struct sockaddr *nam, struct thread *td,
+           boolean_t sync);
 int    soconnect2 (struct socket *so1, struct socket *so2);
 int    socreate (int dom, struct socket **aso, int type, int proto,
            struct thread *td);
index 61e3da7..81a7d70 100644 (file)
@@ -255,7 +255,7 @@ nfs_connect(struct nfsmount *nmp, struct nfsreq *rep)
                        goto bad;
                }
        } else {
-               error = soconnect(so, nmp->nm_nam, td);
+               error = soconnect(so, nmp->nm_nam, td, TRUE);
                if (error)
                        goto bad;