From 95b801b2a313c3986761ad670f92f4d1e179f50e Mon Sep 17 00:00:00 2001 From: Matthew Dillon Date: Fri, 24 Sep 2010 18:01:25 -0700 Subject: [PATCH] network - NFS Stability / races * Fix additional races in the upcall interactions which can cause a nfsd to not get woken up when it should. * Medium revamp of the SLP_* flags. * Add NFSRV_RECLIMIT() macro and do a better job limiting the number of pending records taken off a socket. * Fix a memory leak in the M_NFSSVC kmalloc pool. Each time the server lost a socket it leaked a little memory. Now it does not. --- sys/vfs/nfs/nfs.h | 21 ++++--- sys/vfs/nfs/nfs_socket.c | 112 +++++++++++++++++++---------------- sys/vfs/nfs/nfs_syscalls.c | 118 +++++++++++++++++++++++++++---------- 3 files changed, 161 insertions(+), 90 deletions(-) diff --git a/sys/vfs/nfs/nfs.h b/sys/vfs/nfs/nfs.h index 050c5585e0..bc10a9cb16 100644 --- a/sys/vfs/nfs/nfs.h +++ b/sys/vfs/nfs/nfs.h @@ -477,7 +477,8 @@ struct nfssvc_sock { STAILQ_HEAD(, nfsrv_rec) ns_rec; struct mbuf *ns_frag; int ns_numrec; - int ns_flag; + u_int32_t ns_flag; + u_int32_t ns_needq_upcall; struct mtx ns_solock; int ns_cc; int ns_reclen; @@ -490,13 +491,15 @@ struct nfssvc_sock { }; /* Bits for "ns_flag" */ -#define SLP_VALID 0x01 -#define SLP_DOREC 0x02 -#define SLP_NEEDQ 0x04 -#define SLP_DISCONN 0x08 -#define SLP_GETSTREAM 0x10 -#define SLP_LASTFRAG 0x20 -#define SLP_ALLFLAGS 0xff +#define SLP_VALID 0x0001 +#define SLP_DOREC 0x0002 /* socket holding at least one record */ +#define SLP_NEEDQ 0x0004 /* socket needs general rx processing */ +#define SLP_DISCONN 0x0008 +#define SLP_GETSTREAM 0x0010 /* receive stream interlock */ +#define SLP_LASTFRAG 0x0020 + +#define SLP_ALLFLAGS 0xff +#define SLP_ACTION_MASK (SLP_NEEDQ | SLP_DOREC | SLP_DISCONN) extern TAILQ_HEAD(nfssvc_sockhead, nfssvc_sock) nfssvc_sockhead; extern int nfssvc_sockhead_flag; @@ -530,6 +533,8 @@ struct nfsd { #define NFS_LATTR_NOSHRINK 0x01 #define NFS_LATTR_NOMTIMECHECK 0x02 +#define NFSRV_RECLIMIT(slp) ((slp)->ns_numrec >= nfsd_waiting + 4) + /* * This structure is used by the server for describing each request. * Some fields are used only when write request gathering is performed. diff --git a/sys/vfs/nfs/nfs_socket.c b/sys/vfs/nfs/nfs_socket.c index 4603e4adb5..caee990a25 100644 --- a/sys/vfs/nfs/nfs_socket.c +++ b/sys/vfs/nfs/nfs_socket.c @@ -1743,11 +1743,14 @@ nfs_timer_callout(void *arg /* never used */) cur_usec = nfs_curusec(); TAILQ_FOREACH(slp, &nfssvc_sockhead, ns_chain) { - /* XXX race against removal */ - lwkt_gettoken(&slp->ns_token); - if (slp->ns_tq.lh_first && slp->ns_tq.lh_first->nd_time<=cur_usec) - nfsrv_wakenfsd(slp, 1); - lwkt_reltoken(&slp->ns_token); + /* XXX race against removal */ + if (lwkt_trytoken(&slp->ns_token)) { + if (slp->ns_tq.lh_first && + (slp->ns_tq.lh_first->nd_time <= cur_usec)) { + nfsrv_wakenfsd(slp, 1); + } + lwkt_reltoken(&slp->ns_token); + } } #endif /* NFS_NOSERVER */ @@ -2484,21 +2487,32 @@ nfs_msg(struct thread *td, char *server, char *msg) #ifndef NFS_NOSERVER +/* + * Socket upcall routine for nfsd sockets. This runs in the protocol + * thread and passes waitflag == MB_DONTWAIT. + */ void nfsrv_rcv_upcall(struct socket *so, void *arg, int waitflag) { struct nfssvc_sock *slp = (struct nfssvc_sock *)arg; + if (slp->ns_needq_upcall == 0) { + slp->ns_needq_upcall = 1; /* ok to race */ + lwkt_gettoken(&nfs_token); + nfsrv_wakenfsd(slp, 1); + lwkt_reltoken(&nfs_token); + } +#if 0 lwkt_gettoken(&slp->ns_token); + slp->ns_flag |= SLP_NEEDQ; nfsrv_rcv(so, arg, waitflag); lwkt_reltoken(&slp->ns_token); +#endif } /* - * Socket upcall routine for the nfsd sockets. - * The caddr_t arg is a pointer to the "struct nfssvc_sock". - * Essentially do as much as possible non-blocking, else punt and it will - * be called with MB_WAIT from an nfsd. + * Process new data on a receive socket. Essentially do as much as we can + * non-blocking, else punt and it will be called with MB_WAIT from an nfsd. * * slp->ns_token is held on call */ @@ -2523,19 +2537,15 @@ nfsrv_rcv(struct socket *so, void *arg, int waitflag) * end up holding onto an unreasonable number of mbufs for requests * waiting for service. * - * This should give pretty good feedback to the TCP - * layer and prevents a memory crunch for other protocols. + * This should give pretty good feedback to the TCP layer and + * prevents a memory crunch for other protocols. * * Note that the same service socket can be dispatched to several - * nfs servers simultaniously. - * - * the tcp protocol callback calls us with MB_DONTWAIT. - * nfsd calls us with MB_WAIT (typically). + * nfs servers simultaniously. The tcp protocol callback calls us + * with MB_DONTWAIT. nfsd calls us with MB_WAIT (typically). */ - if (waitflag == MB_DONTWAIT && slp->ns_numrec >= nfsd_waiting / 2 + 1) { - slp->ns_flag |= SLP_NEEDQ; - goto dorecs; - } + if (NFSRV_RECLIMIT(slp)) + return; /* * Handle protocol specifics to parse an RPC request. We always @@ -2549,15 +2559,9 @@ nfsrv_rcv(struct socket *so, void *arg, int waitflag) * entity is messing around with the TCP stream at any given * moment. The receive sockbuf's lock in soreceive is not * sufficient. - * - * Note that this procedure can be called from any number of - * NFS severs *OR* can be upcalled directly from a TCP - * protocol thread without the lock. */ - if (slp->ns_flag & SLP_GETSTREAM) { - slp->ns_flag |= SLP_NEEDQ; - goto dorecs; - } + if (slp->ns_flag & SLP_GETSTREAM) + return; slp->ns_flag |= SLP_GETSTREAM; /* @@ -2568,12 +2572,10 @@ nfsrv_rcv(struct socket *so, void *arg, int waitflag) flags = MSG_DONTWAIT; error = so_pru_soreceive(so, &nam, NULL, &sio, NULL, &flags); if (error || sio.sb_mb == NULL) { - if (error == EWOULDBLOCK) - slp->ns_flag |= SLP_NEEDQ; - else + if (error != EWOULDBLOCK) slp->ns_flag |= SLP_DISCONN; - slp->ns_flag &= ~SLP_GETSTREAM; - goto dorecs; + slp->ns_flag &= ~(SLP_GETSTREAM | SLP_NEEDQ); + goto done; } m = sio.sb_mb; if (slp->ns_rawend) { @@ -2589,15 +2591,11 @@ nfsrv_rcv(struct socket *so, void *arg, int waitflag) /* * Now try and parse as many record(s) as we can out of the - * raw stream data. + * raw stream data. This will set SLP_DOREC. */ error = nfsrv_getstream(slp, waitflag, &nparallel_wakeup); - if (error) { - if (error == EPERM) - slp->ns_flag |= SLP_DISCONN; - else - slp->ns_flag |= SLP_NEEDQ; - } + if (error && error != EWOULDBLOCK) + slp->ns_flag |= SLP_DISCONN; slp->ns_flag &= ~SLP_GETSTREAM; } else { /* @@ -2626,28 +2624,32 @@ nfsrv_rcv(struct socket *so, void *arg, int waitflag) rec->nr_packet = sio.sb_mb; STAILQ_INSERT_TAIL(&slp->ns_rec, rec, nr_link); ++slp->ns_numrec; + slp->ns_flag |= SLP_DOREC; ++nparallel_wakeup; + } else { + slp->ns_flag &= ~SLP_NEEDQ; } if (error) { if ((so->so_proto->pr_flags & PR_CONNREQUIRED) - && error != EWOULDBLOCK) { + && error != EWOULDBLOCK) { slp->ns_flag |= SLP_DISCONN; - goto dorecs; + break; } } + if (NFSRV_RECLIMIT(slp)) + break; } while (sio.sb_mb); } /* * If we were upcalled from the tcp protocol layer and we have * fully parsed records ready to go, or there is new data pending, - * or something went wrong, try to wake up an nfsd thread to deal + * or something went wrong, try to wake up a nfsd thread to deal * with it. */ -dorecs: - if (waitflag == MB_DONTWAIT && - (slp->ns_numrec > 0 || - (slp->ns_flag & (SLP_NEEDQ | SLP_DISCONN)))) { +done: + /* XXX this code is currently not executed (nfsrv_rcv_upcall) */ + if (waitflag == MB_DONTWAIT && (slp->ns_flag & SLP_ACTION_MASK)) { lwkt_gettoken(&nfs_token); nfsrv_wakenfsd(slp, nparallel_wakeup); lwkt_reltoken(&nfs_token); @@ -2775,6 +2777,7 @@ nfsrv_getstream(struct nfssvc_sock *slp, int waitflag, int *countp) rec->nr_packet = slp->ns_frag; STAILQ_INSERT_TAIL(&slp->ns_rec, rec, nr_link); ++slp->ns_numrec; + slp->ns_flag |= SLP_DOREC; ++*countp; } slp->ns_frag = NULL; @@ -2812,6 +2815,9 @@ nfs_checkpkt(struct mbuf *m __unused, int len __unused) /* * Parse an RPC header. + * + * If the socket is invalid or no records are pending we return ENOBUFS. + * The caller must deal with NEEDQ races. */ int nfsrv_dorec(struct nfssvc_sock *slp, struct nfsd *nfsd, @@ -2829,7 +2835,8 @@ nfsrv_dorec(struct nfssvc_sock *slp, struct nfsd *nfsd, rec = STAILQ_FIRST(&slp->ns_rec); STAILQ_REMOVE_HEAD(&slp->ns_rec, nr_link); KKASSERT(slp->ns_numrec > 0); - --slp->ns_numrec; + if (--slp->ns_numrec == 0) + slp->ns_flag &= ~SLP_DOREC; nam = rec->nr_address; m = rec->nr_packet; kfree(rec, M_NFSRVDESC); @@ -2875,16 +2882,19 @@ nfsrv_wakenfsd(struct nfssvc_sock *slp, int nparallel) nd->nfsd_flag &= ~NFSD_WAITING; if (nd->nfsd_slp) panic("nfsd wakeup"); - slp->ns_sref++; + nfsrv_slpref(slp); nd->nfsd_slp = slp; wakeup((caddr_t)nd); if (--nparallel == 0) break; } } - if (nparallel) { - slp->ns_flag |= SLP_DOREC; + + /* + * If we couldn't assign slp then the NFSDs are all busy and + * we set a flag indicating that there is pending work. + */ + if (nparallel) nfsd_head_flag |= NFSD_CHECKSLP; - } } #endif /* NFS_NOSERVER */ diff --git a/sys/vfs/nfs/nfs_syscalls.c b/sys/vfs/nfs/nfs_syscalls.c index 32af58d902..df7a669847 100644 --- a/sys/vfs/nfs/nfs_syscalls.c +++ b/sys/vfs/nfs/nfs_syscalls.c @@ -404,16 +404,15 @@ nfssvc_addsock(struct file *fp, struct sockaddr *mynam, struct thread *td) atomic_clear_int(&so->so_snd.ssb_flags, SSB_NOINTR); so->so_snd.ssb_timeo = 0; - slp = (struct nfssvc_sock *)kmalloc(sizeof (struct nfssvc_sock), - M_NFSSVC, M_WAITOK | M_ZERO); + slp = kmalloc(sizeof (struct nfssvc_sock), M_NFSSVC, M_WAITOK | M_ZERO); mtx_init(&slp->ns_solock); STAILQ_INIT(&slp->ns_rec); TAILQ_INIT(&slp->ns_uidlruhead); lwkt_token_init(&slp->ns_token, 1, "nfssrv_token"); lwkt_gettoken(&nfs_token); - TAILQ_INSERT_TAIL(&nfssvc_sockhead, slp, ns_chain); nfsrv_slpref(slp); + TAILQ_INSERT_TAIL(&nfssvc_sockhead, slp, ns_chain); lwkt_gettoken(&slp->ns_token); slp->ns_so = so; @@ -445,7 +444,7 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) struct nfsd *nfsd = nsd->nsd_nfsd; struct nfsrv_descript *nd = NULL; struct mbuf *m, *mreq; - int error = 0, cacherep, sotype, writes_todo; + int error, cacherep, sotype, writes_todo; int procrastinate; u_quad_t cur_usec; @@ -472,23 +471,22 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) (nfsd_head_flag & NFSD_CHECKSLP) == 0) { nfsd->nfsd_flag |= NFSD_WAITING; nfsd_waiting++; - error = tsleep((caddr_t)nfsd, PCATCH, "nfsd", 0); + error = tsleep(nfsd, PCATCH, "nfsd", 0); nfsd_waiting--; - if (error) + if (error && nfsd->nfsd_slp == NULL) goto done; } if (nfsd->nfsd_slp == NULL && - (nfsd_head_flag & NFSD_CHECKSLP) != 0) { + (nfsd_head_flag & NFSD_CHECKSLP)) { TAILQ_FOREACH(slp, &nfssvc_sockhead, ns_chain) { - if ((slp->ns_flag & (SLP_VALID | SLP_DOREC)) - == (SLP_VALID | SLP_DOREC)) { - slp->ns_flag &= ~SLP_DOREC; + if ((slp->ns_flag & SLP_ACTION_MASK) || + slp->ns_needq_upcall) { nfsrv_slpref(slp); nfsd->nfsd_slp = slp; break; } } - if (slp == 0) + if (slp == NULL) nfsd_head_flag &= ~NFSD_CHECKSLP; } if ((slp = nfsd->nfsd_slp) == NULL) @@ -497,14 +495,25 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) lwkt_reltoken(&nfs_token); lwkt_gettoken(&slp->ns_token); + if (slp->ns_needq_upcall) { + slp->ns_needq_upcall = 0; + slp->ns_flag |= SLP_NEEDQ; + } + if (slp->ns_flag & SLP_VALID) { - if (slp->ns_flag & SLP_DISCONN) + /* + * We can both process additional received + * data into new records and process existing + * records. This keeps the pipeline hot by + * allowing the tcp socket to continue to + * drain while we are processing records. + */ + if (slp->ns_flag & SLP_DISCONN) { nfsrv_zapsock(slp); - else if (slp->ns_flag & SLP_NEEDQ) { - slp->ns_flag &= ~SLP_NEEDQ; - (void) nfs_slplock(slp, 1); + } else if (slp->ns_flag & SLP_NEEDQ) { + (void)nfs_slplock(slp, 1); nfsrv_rcv(slp->ns_so, (caddr_t)slp, - MB_WAIT); + MB_WAIT); nfs_slpunlock(slp); } error = nfsrv_dorec(slp, nfsd, &nd); @@ -514,15 +523,32 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) error = 0; cacherep = RC_DOIT; writes_todo = 1; - } else + } else { writes_todo = 0; + } nfsd->nfsd_flag |= NFSD_REQINPROG; + } else { + slp->ns_flag &= ~SLP_ACTION_MASK; + error = 0; } } else { error = 0; slp = nfsd->nfsd_slp; + lwkt_reltoken(&nfs_token); lwkt_gettoken(&slp->ns_token); + + if (slp->ns_needq_upcall) { + slp->ns_needq_upcall = 0; + slp->ns_flag |= SLP_NEEDQ; + } + if (NFSRV_RECLIMIT(slp) == 0 && + (slp->ns_flag & SLP_NEEDQ)) { + (void)nfs_slplock(slp, 1); + nfsrv_rcv(slp->ns_so, (caddr_t)slp, + MB_WAIT); + nfs_slpunlock(slp); + } } /* @@ -533,15 +559,22 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) kfree((caddr_t)nd, M_NFSRVDESC); nd = NULL; } - nfsd->nfsd_slp = NULL; nfsd->nfsd_flag &= ~NFSD_REQINPROG; - lwkt_reltoken(&slp->ns_token); - lwkt_gettoken(&nfs_token); - nfsrv_slpderef(slp); + if (slp->ns_flag & SLP_ACTION_MASK) { + lwkt_reltoken(&slp->ns_token); + lwkt_gettoken(&nfs_token); + } else { + nfsd->nfsd_slp = NULL; + lwkt_reltoken(&slp->ns_token); + lwkt_gettoken(&nfs_token); + nfsrv_slpderef(slp); + } continue; } /* + * Execute the NFS request - handle the server side cache + * * nfs_token not held here. slp token is held. */ sotype = slp->ns_so->so_type; @@ -600,6 +633,8 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) } /* + * Execute the NFS request - direct execution + * * Loop to get all the write rpc replies that have been * gathered together. * @@ -616,10 +651,12 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) procrastinate > 0) ) { error = nfsrv_writegather(&nd, slp, - nfsd->nfsd_td, &mreq); + nfsd->nfsd_td, &mreq); } else { + /* NOT YET lwkt_reltoken(&slp->ns_token); */ error = (*(nfsrv3_procs[nd->nd_procnum]))(nd, - slp, nfsd->nfsd_td, &mreq); + slp, nfsd->nfsd_td, &mreq); + /* NOT YET lwkt_gettoken(&slp->ns_token); */ } if (mreq == NULL) break; @@ -634,6 +671,7 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) nfsstats.srvrpccnt[nd->nd_procnum]++; nfsrv_updatecache(nd, TRUE, mreq); nd->nd_mrep = NULL; + /* FALL THROUGH */ case RC_REPLY: m = mreq; siz = 0; @@ -654,8 +692,10 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) */ if (sotype == SOCK_STREAM) { M_PREPEND(m, NFSX_UNSIGNED, MB_WAIT); - if (m == NULL) - return (ENOBUFS); + if (m == NULL) { + error = ENOBUFS; + goto skip; + } *mtod(m, u_int32_t *) = htonl(0x80000000 | siz); } if (slp->ns_so->so_proto->pr_flags & PR_CONNREQUIRED) @@ -666,13 +706,14 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) error = EPIPE; m_freem(m); } +skip: if (nfsrtton) nfsd_rt(sotype, nd, cacherep); if (nd->nd_nam2) FREE(nd->nd_nam2, M_SONAME); if (nd->nd_mrep) m_freem(nd->nd_mrep); - if (error == EPIPE) + if (error == EPIPE || error == ENOBUFS) nfsrv_zapsock(slp); if (slp->ns_so->so_proto->pr_flags & PR_CONNREQUIRED) nfs_slpunlock(slp); @@ -680,6 +721,7 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) kfree((caddr_t)nd, M_NFSRVDESC); lwkt_reltoken(&slp->ns_token); lwkt_gettoken(&nfs_token); + nfsd->nfsd_slp = NULL; nfsrv_slpderef(slp); goto done; } @@ -716,10 +758,15 @@ nfssvc_nfsd(struct nfsd_srvargs *nsd, caddr_t argp, struct thread *td) */ if (nfsrv_dorec(slp, nfsd, &nd)) { nfsd->nfsd_flag &= ~NFSD_REQINPROG; - nfsd->nfsd_slp = NULL; - lwkt_reltoken(&slp->ns_token); - lwkt_gettoken(&nfs_token); - nfsrv_slpderef(slp); + if (slp->ns_flag & SLP_ACTION_MASK) { + lwkt_reltoken(&slp->ns_token); + lwkt_gettoken(&nfs_token); + } else { + nfsd->nfsd_slp = NULL; + lwkt_reltoken(&slp->ns_token); + lwkt_gettoken(&nfs_token); + nfsrv_slpderef(slp); + } } else { lwkt_reltoken(&slp->ns_token); lwkt_gettoken(&nfs_token); @@ -773,6 +820,8 @@ nfsrv_zapsock(struct nfssvc_sock *slp) m_freem(rec->nr_packet); kfree(rec, M_NFSRVDESC); } + KKASSERT(slp->ns_numrec == 0); + TAILQ_FOREACH_MUTABLE(nuidp, &slp->ns_uidlruhead, nu_lru, nnuidp) { LIST_REMOVE(nuidp, nu_hash); @@ -789,6 +838,7 @@ nfsrv_zapsock(struct nfssvc_sock *slp) } LIST_INIT(&slp->ns_tq); crit_exit(); + nfsrv_slpderef(slp); } } @@ -802,9 +852,13 @@ void nfsrv_slpderef(struct nfssvc_sock *slp) { ASSERT_LWKT_TOKEN_HELD(&nfs_token); - if (--slp->ns_sref == 0 && (slp->ns_flag & SLP_VALID) == 0) { + if (slp->ns_sref == 1) { + KKASSERT((slp->ns_flag & SLP_VALID) == 0); TAILQ_REMOVE(&nfssvc_sockhead, slp, ns_chain); + slp->ns_sref = 0; kfree((caddr_t)slp, M_NFSSVC); + } else { + --slp->ns_sref; } } @@ -865,8 +919,10 @@ nfsrv_init(int terminating) TAILQ_FOREACH_MUTABLE(slp, &nfssvc_sockhead, ns_chain, nslp) { if (slp->ns_flag & SLP_VALID) nfsrv_zapsock(slp); + /* TAILQ_REMOVE(&nfssvc_sockhead, slp, ns_chain); kfree((caddr_t)slp, M_NFSSVC); + */ } nfsrv_cleancache(); /* And clear out server cache */ } else { -- 2.41.0