socket: Replicate soreceive() to sorecvtcp() for cleanup and optimization
author Sepherosa Ziehau <sephe@dragonflybsd.org>
Thu, 30 Aug 2012 03:26:13 +0000 (11:26 +0800)
committer Sepherosa Ziehau <sephe@dragonflybsd.org>
Thu, 30 Aug 2012 03:26:13 +0000 (11:26 +0800)
sys/kern/uipc_socket.c
sys/netinet/tcp_usrreq.c
sys/sys/socketvar.h

index 59f5a74..e6eeb54 100644 (file)
@@ -1502,6 +1502,356 @@ done:
        return (error);
 }
 
+int
+sorecvtcp(struct socket *so, struct sockaddr **psa, struct uio *uio,
+         struct sockbuf *sio, struct mbuf **controlp, int *flagsp)
+{      /* Receive path replicated from soreceive(); returns 0 or an errno value. */
+       struct mbuf *m, *n;
+       struct mbuf *free_chain = NULL;
+       int flags, len, error, offset;
+       struct protosw *pr = so->so_proto;
+       int moff, type = 0;
+       size_t resid, orig_resid;
+
+       if (uio)
+               resid = uio->uio_resid;
+       else
+               resid = (size_t)(sio->sb_climit - sio->sb_cc);  /* no uio: budget is sio's sb_climit minus its sb_cc */
+       orig_resid = resid;
+
+       if (psa)
+               *psa = NULL;
+       if (controlp)
+               *controlp = NULL;
+       if (flagsp)
+               flags = *flagsp &~ MSG_EOR;     /* clear MSG_EOR on input; set below when an M_EOR mbuf is eaten */
+       else
+               flags = 0;
+       if (flags & MSG_OOB) {
+               m = m_get(MB_WAIT, MT_DATA);
+               if (m == NULL)
+                       return (ENOBUFS);
+               error = so_pru_rcvoob(so, m, flags & MSG_PEEK);
+               if (error)
+                       goto bad;
+               if (sio) {
+                       do {
+                               sbappend(sio, m);
+                               KKASSERT(resid >= (size_t)m->m_len);
+                               resid -= (size_t)m->m_len;
+                       } while (resid > 0 && m);       /* NOTE(review): m is never advanced in this loop - confirm intent */
+               } else {
+                       do {
+                               uio->uio_resid = resid;
+                               error = uiomove(mtod(m, caddr_t),
+                                               (int)szmin(resid, m->m_len),
+                                               uio);
+                               resid = uio->uio_resid;
+                               m = m_free(m);
+                       } while (uio->uio_resid && error == 0 && m);
+               }
+bad:
+               if (m)  /* NOTE(review): on the sio path m was already passed to sbappend() - verify this free is safe */
+                       m_freem(m);
+               return (error);
+       }
+       if ((so->so_state & SS_ISCONFIRMING) && resid)
+               so_pru_rcvd(so, 0);
+
+       /*
+        * The token interlocks against the protocol thread while
+        * ssb_lock is a blocking lock against other userland entities.
+        */
+       lwkt_gettoken(&so->so_rcv.ssb_token);
+restart:
+       error = ssb_lock(&so->so_rcv, SBLOCKWAIT(flags));
+       if (error)
+               goto done;
+
+       m = so->so_rcv.ssb_mb;
+       /*
+        * If we have less data than requested, block awaiting more
+        * (subject to any timeout) if:
+        *   1. the current count is less than the low water mark, or
+        *   2. MSG_WAITALL is set, and it is possible to do the entire
+        *      receive operation at once if we block (resid <= hiwat).
+        *   3. MSG_DONTWAIT is not set
+        * If MSG_WAITALL is set but resid is larger than the receive buffer,
+        * we have to do the receive in sections, and thus risk returning
+        * a short count if a timeout or signal occurs after we start.
+        */
+       if (m == NULL || (((flags & MSG_DONTWAIT) == 0 &&
+           (size_t)so->so_rcv.ssb_cc < resid) &&
+           (so->so_rcv.ssb_cc < so->so_rcv.ssb_lowat ||
+           ((flags & MSG_WAITALL) && resid <= (size_t)so->so_rcv.ssb_hiwat)) &&
+           m->m_nextpkt == 0 && (pr->pr_flags & PR_ATOMIC) == 0)) {
+               KASSERT(m != NULL || !so->so_rcv.ssb_cc, ("receive 1"));
+               if (so->so_error) {
+                       if (m)
+                               goto dontblock;
+                       error = so->so_error;
+                       if ((flags & MSG_PEEK) == 0)
+                               so->so_error = 0;
+                       goto release;
+               }
+               if (so->so_state & SS_CANTRCVMORE) {
+                       if (m)
+                               goto dontblock;
+                       else
+                               goto release;
+               }
+               for (; m; m = m->m_next) {
+                       if (m->m_type == MT_OOBDATA  || (m->m_flags & M_EOR)) {
+                               m = so->so_rcv.ssb_mb;
+                               goto dontblock;
+                       }
+               }
+               if ((so->so_state & (SS_ISCONNECTED|SS_ISCONNECTING)) == 0 &&
+                   (pr->pr_flags & PR_CONNREQUIRED)) {
+                       error = ENOTCONN;
+                       goto release;
+               }
+               if (resid == 0)
+                       goto release;
+               if (flags & (MSG_FNONBLOCKING|MSG_DONTWAIT)) {
+                       error = EWOULDBLOCK;
+                       goto release;
+               }
+               ssb_unlock(&so->so_rcv);
+               error = ssb_wait(&so->so_rcv);
+               if (error)
+                       goto done;
+               goto restart;
+       }
+dontblock:
+       if (uio && uio->uio_td && uio->uio_td->td_proc)
+               uio->uio_td->td_lwp->lwp_ru.ru_msgrcv++;
+
+       /*
+        * Note: m should be == ssb_mb here (asserted just below).  free_chain
+        * is handed to sbunlinkmbuf() and released with m_freem() at 'done';
+        * the original note warns that m_free*() can break a critical section.
+        */
+       KKASSERT(m == so->so_rcv.ssb_mb);
+
+       /*
+        * Skip any address mbufs prepending the record.
+        */
+       if (pr->pr_flags & PR_ADDR) {
+               KASSERT(m->m_type == MT_SONAME, ("receive 1a"));
+               orig_resid = 0;
+               if (psa)
+                       *psa = dup_sockaddr(mtod(m, struct sockaddr *));
+               if (flags & MSG_PEEK)
+                       m = m->m_next;
+               else
+                       m = sbunlinkmbuf(&so->so_rcv.sb, m, &free_chain);
+       }
+
+       /*
+        * Skip any control mbufs prepending the record (SCTP address first).
+        */
+#ifdef SCTP
+       if (pr->pr_flags & PR_ADDR_OPT) {
+               /*
+                * For SCTP we may be getting a
+                * whole message OR a partial delivery.
+                */
+               if (m && m->m_type == MT_SONAME) {
+                       orig_resid = 0;
+                       if (psa)
+                               *psa = dup_sockaddr(mtod(m, struct sockaddr *));
+                       if (flags & MSG_PEEK)
+                               m = m->m_next;
+                       else
+                               m = sbunlinkmbuf(&so->so_rcv.sb, m, &free_chain);
+               }
+       }
+#endif /* SCTP */
+       while (m && m->m_type == MT_CONTROL && error == 0) {
+               if (flags & MSG_PEEK) {
+                       if (controlp)
+                               *controlp = m_copy(m, 0, m->m_len);
+                       m = m->m_next;  /* XXX race */
+               } else {
+                       if (controlp) {
+                               n = sbunlinkmbuf(&so->so_rcv.sb, m, NULL);
+                               if (pr->pr_domain->dom_externalize &&
+                                   mtod(m, struct cmsghdr *)->cmsg_type ==
+                                   SCM_RIGHTS)
+                                  error = (*pr->pr_domain->dom_externalize)(m);
+                               *controlp = m;
+                               m = n;
+                       } else {
+                               m = sbunlinkmbuf(&so->so_rcv.sb, m, &free_chain);
+                       }
+               }
+               if (controlp && *controlp) {
+                       orig_resid = 0;
+                       controlp = &(*controlp)->m_next;
+               }
+       }
+
+       /*
+        * Flag OOB data; 'type' records the type of the first remaining mbuf.
+        */
+       if (m) {
+               type = m->m_type;
+               if (type == MT_OOBDATA)
+                       flags |= MSG_OOB;
+       }
+
+       /*
+        * Copy to the UIO, or hand the mbufs over to the SIO.
+        */
+       moff = 0;
+       offset = 0;
+       while (m && resid > 0 && error == 0) {
+               if (m->m_type == MT_OOBDATA) {
+                       if (type != MT_OOBDATA)
+                               break;
+               } else if (type == MT_OOBDATA)
+                       break;
+               else
+                   KASSERT(m->m_type == MT_DATA || m->m_type == MT_HEADER,
+                       ("receive 3"));
+               soclrstate(so, SS_RCVATMARK);
+               len = (resid > INT_MAX) ? INT_MAX : resid;      /* clamp: len is an int, resid is a size_t */
+               if (so->so_oobmark && len > so->so_oobmark - offset)
+                       len = so->so_oobmark - offset;
+               if (len > m->m_len - moff)
+                       len = m->m_len - moff;
+
+               /*
+                * Copy out to the UIO or pass the mbufs back to the SIO.
+                * The SIO is dealt with when we eat the mbuf, but deal
+                * with the resid here either way.
+                */
+               if (uio) {
+                       uio->uio_resid = resid;
+                       error = uiomove(mtod(m, caddr_t) + moff, len, uio);
+                       resid = uio->uio_resid;
+                       if (error)
+                               goto release;
+               } else {
+                       resid -= (size_t)len;
+               }
+
+               /*
+                * Eat the entire mbuf or just a piece of it
+                */
+               if (len == m->m_len - moff) {
+                       if (m->m_flags & M_EOR)
+                               flags |= MSG_EOR;
+#ifdef SCTP
+                       if (m->m_flags & M_NOTIFICATION)
+                               flags |= MSG_NOTIFICATION;
+#endif /* SCTP */
+                       if (flags & MSG_PEEK) {
+                               m = m->m_next;
+                               moff = 0;
+                       } else {
+                               if (sio) {
+                                       n = sbunlinkmbuf(&so->so_rcv.sb, m, NULL);
+                                       sbappend(sio, m);
+                                       m = n;
+                               } else {
+                                       m = sbunlinkmbuf(&so->so_rcv.sb, m, &free_chain);
+                               }
+                       }
+               } else {
+                       if (flags & MSG_PEEK) {
+                               moff += len;
+                       } else {
+                               if (sio) {
+                                       n = m_copym(m, 0, len, MB_WAIT);
+                                       if (n)
+                                               sbappend(sio, n);
+                               }
+                               m->m_data += len;
+                               m->m_len -= len;
+                               so->so_rcv.ssb_cc -= len;
+                       }
+               }
+               if (so->so_oobmark) {
+                       if ((flags & MSG_PEEK) == 0) {
+                               so->so_oobmark -= len;
+                               if (so->so_oobmark == 0) {
+                                       sosetstate(so, SS_RCVATMARK);
+                                       break;
+                               }
+                       } else {
+                               offset += len;
+                               if (offset == so->so_oobmark)
+                                       break;
+                       }
+               }
+               if (flags & MSG_EOR)
+                       break;
+               /*
+                * If the MSG_WAITALL flag is set (for non-atomic socket),
+                * we must not quit until resid == 0 or an error
+                * termination.  If a signal/timeout occurs, return
+                * with a short count but without error.
+                * Keep the receive sockbuf locked against other readers.
+                */
+               while ((flags & MSG_WAITALL) && m == NULL && 
+                      resid > 0 && !sosendallatonce(so) && 
+                      so->so_rcv.ssb_mb == NULL) {
+                       if (so->so_error || so->so_state & SS_CANTRCVMORE)
+                               break;
+                       /*
+                        * The window might have closed to zero, make
+                        * sure we send an ack now that we've drained
+                        * the buffer or we might end up blocking until
+                        * the idle takes over (5 seconds).
+                        */
+                       if (pr->pr_flags & PR_WANTRCVD && so->so_pcb)
+                               so_pru_rcvd(so, flags);
+                       error = ssb_wait(&so->so_rcv);
+                       if (error) {
+                               ssb_unlock(&so->so_rcv);
+                               error = 0;      /* signal/timeout: short count, no error */
+                               goto done;
+                       }
+                       m = so->so_rcv.ssb_mb;
+               }
+       }
+
+       /*
+        * If an atomic read was requested but unread data still remains
+        * in the record, set MSG_TRUNC.
+        */
+       if (m && pr->pr_flags & PR_ATOMIC)
+               flags |= MSG_TRUNC;
+
+       /*
+        * Cleanup.  If an atomic read was requested drop any unread data.
+        */
+       if ((flags & MSG_PEEK) == 0) {
+               if (m && (pr->pr_flags & PR_ATOMIC))
+                       sbdroprecord(&so->so_rcv.sb);
+               if ((pr->pr_flags & PR_WANTRCVD) && so->so_pcb)
+                       so_pru_rcvd(so, flags);
+       }
+
+       if (orig_resid == resid && orig_resid &&        /* resid unchanged: nothing was transferred, so retry */
+           (flags & MSG_EOR) == 0 && (so->so_state & SS_CANTRCVMORE) == 0) {
+               ssb_unlock(&so->so_rcv);
+               goto restart;
+       }
+
+       if (flagsp)
+               *flagsp |= flags;
+release:
+       ssb_unlock(&so->so_rcv);
+done:
+       lwkt_reltoken(&so->so_rcv.ssb_token);
+       if (free_chain)
+               m_freem(free_chain);
+       return (error);
+}
+
 /*
  * Shut a socket down.  Note that we do not get a frontend lock as we
  * want to be able to shut the socket down even if another thread is
index 53fc120..7063e5c 100644 (file)
@@ -909,7 +909,7 @@ struct pr_usrreqs tcp_usrreqs = {
        .pru_shutdown = tcp_usr_shutdown,
        .pru_sockaddr = in_setsockaddr_dispatch,
        .pru_sosend = sosendtcp,
-       .pru_soreceive = soreceive,
+       .pru_soreceive = sorecvtcp,
        .pru_savefaddr = tcp_usr_savefaddr
 };
 
@@ -933,7 +933,7 @@ struct pr_usrreqs tcp6_usrreqs = {
        .pru_shutdown = tcp_usr_shutdown,
        .pru_sockaddr = in6_mapped_sockaddr_dispatch,
        .pru_sosend = sosendtcp,
-       .pru_soreceive = soreceive,
+       .pru_soreceive = sorecvtcp,
        .pru_savefaddr = tcp6_usr_savefaddr
 };
 #endif /* INET6 */
index fe14c4c..b32d2e0 100644 (file)
@@ -473,6 +473,9 @@ int soopt_from_mbuf (struct sockopt *sopt, struct mbuf *m);
 int    soreceive (struct socket *so, struct sockaddr **paddr,
                       struct uio *uio, struct sockbuf *sio,
                       struct mbuf **controlp, int *flagsp);
+int    sorecvtcp (struct socket *so, struct sockaddr **paddr,
+                      struct uio *uio, struct sockbuf *sio,
+                      struct mbuf **controlp, int *flagsp);
 int    soreserve (struct socket *so, u_long sndcc, u_long rcvcc,
                   struct rlimit *rl);
 void   sorflush (struct socket *so);