From: Sepherosa Ziehau
Date: Thu, 30 Aug 2012 03:26:13 +0000 (+0800)
Subject: socket: Replicate soreceive() to sorecvtcp() for cleanup and optimization
X-Git-Tag: v3.2.0~227
X-Git-Url: https://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/9510c423338710dcd491232d84457cfd99d5fbb9

socket: Replicate soreceive() to sorecvtcp() for cleanup and optimization
---

diff --git a/sys/kern/uipc_socket.c b/sys/kern/uipc_socket.c
index 59f5a741ef..e6eeb54e0e 100644
--- a/sys/kern/uipc_socket.c
+++ b/sys/kern/uipc_socket.c
@@ -1502,6 +1502,356 @@ done:
 	return (error);
 }
 
+int
+sorecvtcp(struct socket *so, struct sockaddr **psa, struct uio *uio,
+	  struct sockbuf *sio, struct mbuf **controlp, int *flagsp)
+{
+	struct mbuf *m, *n;
+	struct mbuf *free_chain = NULL;
+	int flags, len, error, offset;
+	struct protosw *pr = so->so_proto;
+	int moff, type = 0;
+	size_t resid, orig_resid;
+
+	if (uio)
+		resid = uio->uio_resid;
+	else
+		resid = (size_t)(sio->sb_climit - sio->sb_cc);
+	orig_resid = resid;
+
+	if (psa)
+		*psa = NULL;
+	if (controlp)
+		*controlp = NULL;
+	if (flagsp)
+		flags = *flagsp &~ MSG_EOR;
+	else
+		flags = 0;
+	if (flags & MSG_OOB) {
+		m = m_get(MB_WAIT, MT_DATA);
+		if (m == NULL)
+			return (ENOBUFS);
+		error = so_pru_rcvoob(so, m, flags & MSG_PEEK);
+		if (error)
+			goto bad;
+		if (sio) {
+			do {
+				sbappend(sio, m);
+				KKASSERT(resid >= (size_t)m->m_len);
+				resid -= (size_t)m->m_len;
+			} while (resid > 0 && m);
+		} else {
+			do {
+				uio->uio_resid = resid;
+				error = uiomove(mtod(m, caddr_t),
+						(int)szmin(resid, m->m_len),
+						uio);
+				resid = uio->uio_resid;
+				m = m_free(m);
+			} while (uio->uio_resid && error == 0 && m);
+		}
+bad:
+		if (m)
+			m_freem(m);
+		return (error);
+	}
+	if ((so->so_state & SS_ISCONFIRMING) && resid)
+		so_pru_rcvd(so, 0);
+
+	/*
+	 * The token interlocks against the protocol thread while
+	 * ssb_lock is a blocking lock against other userland entities.
+	 */
+	lwkt_gettoken(&so->so_rcv.ssb_token);
+restart:
+	error = ssb_lock(&so->so_rcv, SBLOCKWAIT(flags));
+	if (error)
+		goto done;
+
+	m = so->so_rcv.ssb_mb;
+	/*
+	 * If we have less data than requested, block awaiting more
+	 * (subject to any timeout) if:
+	 *   1. the current count is less than the low water mark, or
+	 *   2. MSG_WAITALL is set, and it is possible to do the entire
+	 *	receive operation at once if we block (resid <= hiwat).
+	 *   3. MSG_DONTWAIT is not set
+	 * If MSG_WAITALL is set but resid is larger than the receive buffer,
+	 * we have to do the receive in sections, and thus risk returning
+	 * a short count if a timeout or signal occurs after we start.
+	 */
+	if (m == NULL || (((flags & MSG_DONTWAIT) == 0 &&
+	    (size_t)so->so_rcv.ssb_cc < resid) &&
+	    (so->so_rcv.ssb_cc < so->so_rcv.ssb_lowat ||
+	    ((flags & MSG_WAITALL) && resid <= (size_t)so->so_rcv.ssb_hiwat)) &&
+	    m->m_nextpkt == 0 && (pr->pr_flags & PR_ATOMIC) == 0)) {
+		KASSERT(m != NULL || !so->so_rcv.ssb_cc, ("receive 1"));
+		if (so->so_error) {
+			if (m)
+				goto dontblock;
+			error = so->so_error;
+			if ((flags & MSG_PEEK) == 0)
+				so->so_error = 0;
+			goto release;
+		}
+		if (so->so_state & SS_CANTRCVMORE) {
+			if (m)
+				goto dontblock;
+			else
+				goto release;
+		}
+		for (; m; m = m->m_next) {
+			if (m->m_type == MT_OOBDATA || (m->m_flags & M_EOR)) {
+				m = so->so_rcv.ssb_mb;
+				goto dontblock;
+			}
+		}
+		if ((so->so_state & (SS_ISCONNECTED|SS_ISCONNECTING)) == 0 &&
+		    (pr->pr_flags & PR_CONNREQUIRED)) {
+			error = ENOTCONN;
+			goto release;
+		}
+		if (resid == 0)
+			goto release;
+		if (flags & (MSG_FNONBLOCKING|MSG_DONTWAIT)) {
+			error = EWOULDBLOCK;
+			goto release;
+		}
+		ssb_unlock(&so->so_rcv);
+		error = ssb_wait(&so->so_rcv);
+		if (error)
+			goto done;
+		goto restart;
+	}
+dontblock:
+	if (uio && uio->uio_td && uio->uio_td->td_proc)
+		uio->uio_td->td_lwp->lwp_ru.ru_msgrcv++;
+
+	/*
+	 * note: m should be == sb_mb here.  Cache the next record while
+	 * cleaning up.  Note that calling m_free*() will break out critical
+	 * section.
+	 */
+	KKASSERT(m == so->so_rcv.ssb_mb);
+
+	/*
+	 * Skip any address mbufs prepending the record.
+	 */
+	if (pr->pr_flags & PR_ADDR) {
+		KASSERT(m->m_type == MT_SONAME, ("receive 1a"));
+		orig_resid = 0;
+		if (psa)
+			*psa = dup_sockaddr(mtod(m, struct sockaddr *));
+		if (flags & MSG_PEEK)
+			m = m->m_next;
+		else
+			m = sbunlinkmbuf(&so->so_rcv.sb, m, &free_chain);
+	}
+
+	/*
+	 * Skip any control mbufs prepending the record.
+	 */
+#ifdef SCTP
+	if (pr->pr_flags & PR_ADDR_OPT) {
+		/*
+		 * For SCTP we may be getting a
+		 * whole message OR a partial delivery.
+		 */
+		if (m && m->m_type == MT_SONAME) {
+			orig_resid = 0;
+			if (psa)
+				*psa = dup_sockaddr(mtod(m, struct sockaddr *));
+			if (flags & MSG_PEEK)
+				m = m->m_next;
+			else
+				m = sbunlinkmbuf(&so->so_rcv.sb, m, &free_chain);
+		}
+	}
+#endif /* SCTP */
+	while (m && m->m_type == MT_CONTROL && error == 0) {
+		if (flags & MSG_PEEK) {
+			if (controlp)
+				*controlp = m_copy(m, 0, m->m_len);
+			m = m->m_next;	/* XXX race */
+		} else {
+			if (controlp) {
+				n = sbunlinkmbuf(&so->so_rcv.sb, m, NULL);
+				if (pr->pr_domain->dom_externalize &&
+				    mtod(m, struct cmsghdr *)->cmsg_type ==
+				    SCM_RIGHTS)
+					error = (*pr->pr_domain->dom_externalize)(m);
+				*controlp = m;
+				m = n;
+			} else {
+				m = sbunlinkmbuf(&so->so_rcv.sb, m, &free_chain);
+			}
+		}
+		if (controlp && *controlp) {
+			orig_resid = 0;
+			controlp = &(*controlp)->m_next;
+		}
+	}
+
+	/*
+	 * flag OOB data.
+	 */
+	if (m) {
+		type = m->m_type;
+		if (type == MT_OOBDATA)
+			flags |= MSG_OOB;
+	}
+
+	/*
+	 * Copy to the UIO or mbuf return chain (*mp).
+	 */
+	moff = 0;
+	offset = 0;
+	while (m && resid > 0 && error == 0) {
+		if (m->m_type == MT_OOBDATA) {
+			if (type != MT_OOBDATA)
+				break;
+		} else if (type == MT_OOBDATA)
+			break;
+		else
+			KASSERT(m->m_type == MT_DATA || m->m_type == MT_HEADER,
+				("receive 3"));
+		soclrstate(so, SS_RCVATMARK);
+		len = (resid > INT_MAX) ? INT_MAX : resid;
+		if (so->so_oobmark && len > so->so_oobmark - offset)
+			len = so->so_oobmark - offset;
+		if (len > m->m_len - moff)
+			len = m->m_len - moff;
+
+		/*
+		 * Copy out to the UIO or pass the mbufs back to the SIO.
+		 * The SIO is dealt with when we eat the mbuf, but deal
+		 * with the resid here either way.
+		 */
+		if (uio) {
+			uio->uio_resid = resid;
+			error = uiomove(mtod(m, caddr_t) + moff, len, uio);
+			resid = uio->uio_resid;
+			if (error)
+				goto release;
+		} else {
+			resid -= (size_t)len;
+		}
+
+		/*
+		 * Eat the entire mbuf or just a piece of it
+		 */
+		if (len == m->m_len - moff) {
+			if (m->m_flags & M_EOR)
+				flags |= MSG_EOR;
+#ifdef SCTP
+			if (m->m_flags & M_NOTIFICATION)
+				flags |= MSG_NOTIFICATION;
+#endif /* SCTP */
+			if (flags & MSG_PEEK) {
+				m = m->m_next;
+				moff = 0;
+			} else {
+				if (sio) {
+					n = sbunlinkmbuf(&so->so_rcv.sb, m, NULL);
+					sbappend(sio, m);
+					m = n;
+				} else {
+					m = sbunlinkmbuf(&so->so_rcv.sb, m, &free_chain);
+				}
+			}
+		} else {
+			if (flags & MSG_PEEK) {
+				moff += len;
+			} else {
+				if (sio) {
+					n = m_copym(m, 0, len, MB_WAIT);
+					if (n)
+						sbappend(sio, n);
+				}
+				m->m_data += len;
+				m->m_len -= len;
+				so->so_rcv.ssb_cc -= len;
+			}
+		}
+		if (so->so_oobmark) {
+			if ((flags & MSG_PEEK) == 0) {
+				so->so_oobmark -= len;
+				if (so->so_oobmark == 0) {
+					sosetstate(so, SS_RCVATMARK);
+					break;
+				}
+			} else {
+				offset += len;
+				if (offset == so->so_oobmark)
+					break;
+			}
+		}
+		if (flags & MSG_EOR)
+			break;
+		/*
+		 * If the MSG_WAITALL flag is set (for non-atomic socket),
+		 * we must not quit until resid == 0 or an error
+		 * termination.  If a signal/timeout occurs, return
+		 * with a short count but without error.
+		 * Keep signalsockbuf locked against other readers.
+		 */
+		while ((flags & MSG_WAITALL) && m == NULL &&
+		       resid > 0 && !sosendallatonce(so) &&
+		       so->so_rcv.ssb_mb == NULL) {
+			if (so->so_error || so->so_state & SS_CANTRCVMORE)
+				break;
+			/*
+			 * The window might have closed to zero, make
+			 * sure we send an ack now that we've drained
+			 * the buffer or we might end up blocking until
+			 * the idle takes over (5 seconds).
+			 */
+			if (pr->pr_flags & PR_WANTRCVD && so->so_pcb)
+				so_pru_rcvd(so, flags);
+			error = ssb_wait(&so->so_rcv);
+			if (error) {
+				ssb_unlock(&so->so_rcv);
+				error = 0;
+				goto done;
+			}
+			m = so->so_rcv.ssb_mb;
+		}
+	}
+
+	/*
+	 * If an atomic read was requested but unread data still remains
+	 * in the record, set MSG_TRUNC.
+	 */
+	if (m && pr->pr_flags & PR_ATOMIC)
+		flags |= MSG_TRUNC;
+
+	/*
+	 * Cleanup.  If an atomic read was requested drop any unread data.
+	 */
+	if ((flags & MSG_PEEK) == 0) {
+		if (m && (pr->pr_flags & PR_ATOMIC))
+			sbdroprecord(&so->so_rcv.sb);
+		if ((pr->pr_flags & PR_WANTRCVD) && so->so_pcb)
+			so_pru_rcvd(so, flags);
+	}
+
+	if (orig_resid == resid && orig_resid &&
+	    (flags & MSG_EOR) == 0 && (so->so_state & SS_CANTRCVMORE) == 0) {
+		ssb_unlock(&so->so_rcv);
+		goto restart;
+	}
+
+	if (flagsp)
+		*flagsp |= flags;
+release:
+	ssb_unlock(&so->so_rcv);
+done:
+	lwkt_reltoken(&so->so_rcv.ssb_token);
+	if (free_chain)
+		m_freem(free_chain);
+	return (error);
+}
+
 /*
  * Shut a socket down.  Note that we do not get a frontend lock as we
  * want to be able to shut the socket down even if another thread is
diff --git a/sys/netinet/tcp_usrreq.c b/sys/netinet/tcp_usrreq.c
index 53fc1208df..7063e5cf02 100644
--- a/sys/netinet/tcp_usrreq.c
+++ b/sys/netinet/tcp_usrreq.c
@@ -909,7 +909,7 @@ struct pr_usrreqs tcp_usrreqs = {
 	.pru_shutdown = tcp_usr_shutdown,
 	.pru_sockaddr = in_setsockaddr_dispatch,
 	.pru_sosend = sosendtcp,
-	.pru_soreceive = soreceive,
+	.pru_soreceive = sorecvtcp,
 	.pru_savefaddr = tcp_usr_savefaddr
 };
 
@@ -933,7 +933,7 @@ struct pr_usrreqs tcp6_usrreqs = {
 	.pru_shutdown = tcp_usr_shutdown,
 	.pru_sockaddr = in6_mapped_sockaddr_dispatch,
 	.pru_sosend = sosendtcp,
-	.pru_soreceive = soreceive,
+	.pru_soreceive = sorecvtcp,
 	.pru_savefaddr = tcp6_usr_savefaddr
 };
 #endif /* INET6 */
diff --git a/sys/sys/socketvar.h b/sys/sys/socketvar.h
index fe14c4cdaa..b32d2e0b68 100644
--- a/sys/sys/socketvar.h
+++ b/sys/sys/socketvar.h
@@ -473,6 +473,9 @@ int soopt_from_mbuf (struct sockopt *sopt, struct mbuf *m);
 int	soreceive (struct socket *so, struct sockaddr **paddr,
 	    struct uio *uio, struct sockbuf *sio,
 	    struct mbuf **controlp, int *flagsp);
+int	sorecvtcp (struct socket *so, struct sockaddr **paddr,
+	    struct uio *uio, struct sockbuf *sio,
+	    struct mbuf **controlp, int *flagsp);
 int	soreserve (struct socket *so, u_long sndcc, u_long rcvcc,
 	    struct rlimit *rl);
 void	sorflush (struct socket *so);
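
The hunks above only swap the pru_soreceive entry in tcp_usrreqs and tcp6_usrreqs, so the new function is reached through the normal pr_usrreqs dispatch rather than by changing any caller. The sketch below is illustrative only and not part of the commit; it assumes the usual DragonFly layout in which struct protosw carries a pr_usrreqs pointer, and the caller name example_soreceive_dispatch is hypothetical.

#include <sys/param.h>
#include <sys/socket.h>
#include <sys/socketvar.h>
#include <sys/protosw.h>

/*
 * Hypothetical caller, shown only to illustrate the dispatch path.
 * Before this commit every protocol resolved to the generic soreceive();
 * with .pru_soreceive = sorecvtcp in tcp_usrreqs/tcp6_usrreqs, a TCP
 * socket now reaches the TCP-specific copy while other protocols keep
 * using soreceive().
 */
static int
example_soreceive_dispatch(struct socket *so, struct uio *uio, int *flagsp)
{
	/* uio-only read: no source address, mbuf chain, or control data */
	return ((*so->so_proto->pr_usrreqs->pru_soreceive)(so, NULL, uio,
							   NULL, NULL, flagsp));
}

At this point sorecvtcp() is still a verbatim copy of soreceive(); giving TCP its own entry point is presumably what allows the generic-protocol branches to be trimmed from the TCP receive path later without disturbing other protocols, per the "cleanup and optimization" in the subject line.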