NFS - implement async state machine for nfs_readrpc_bio()
author: Matthew Dillon <dillon@apollo.backplane.com>
Fri, 17 Jul 2009 10:02:13 +0000 (03:02 -0700)
committer: Matthew Dillon <dillon@apollo.backplane.com>
Fri, 17 Jul 2009 10:02:13 +0000 (03:02 -0700)
Implement the basic state machine mechanics to allow the two
helper threads to separate transmit and receive work for any
number of NFS RPCs.

sys/vfs/nfs/nfs.h
sys/vfs/nfs/nfs_bio.c
sys/vfs/nfs/nfs_iod.c
sys/vfs/nfs/nfs_socket.c
sys/vfs/nfs/nfs_vfsops.c
sys/vfs/nfs/nfs_vnops.c
sys/vfs/nfs/nfsm_subs.c
sys/vfs/nfs/nfsm_subs.h
sys/vfs/nfs/nfsmount.h

index 40a1d48..c2c7fb8 100644 (file)
@@ -336,11 +336,14 @@ struct nlookupdata;
                ((e) != EINTR && (e) != ERESTART && (e) != EWOULDBLOCK && \
                ((s) & PR_CONNREQUIRED) == 0)
 
+struct nfsm_info;
+
 /*
  * Nfs outstanding request list element
  */
 struct nfsreq {
        TAILQ_ENTRY(nfsreq) r_chain;
+       struct nfsm_info *r_info;
        struct mtx_link r_link;
        struct mbuf     *r_mreq;
        struct mbuf     *r_mrep;
@@ -377,7 +380,7 @@ struct nfsreq {
 #define        R_SOCKERR       0x0010          /* Fatal error on socket */
 #define        R_TPRINTFMSG    0x0020          /* Did a tprintf msg. */
 #define        R_MUSTRESEND    0x0040          /* Must resend request */
-#define        R_GETONEREP     0x0080          /* Probe for one reply only */
+#define        R_UNUSED07      0x0080
 #define R_MASKTIMER    0x0100          /* Timer should ignore this req */
 #define R_LOCKED       0x0200          /* Locked by the timer */
 
@@ -622,25 +625,29 @@ extern struct nfsv3_diskless nfsv3_diskless;
 u_quad_t nfs_curusec (void);
 int    nfs_init (struct vfsconf *vfsp);
 int    nfs_uninit (struct vfsconf *vfsp);
-int    nfs_reply (struct nfsreq *);
+int    nfs_reply (struct nfsmount *nmp, struct nfsreq *);
 int    nfs_getreq (struct nfsrv_descript *,struct nfsd *,int);
 int    nfs_send (struct socket *, struct sockaddr *, struct mbuf *, 
                      struct nfsreq *);
 int    nfs_rephead (int, struct nfsrv_descript *, struct nfssvc_sock *,
                         int, struct mbuf **, struct mbuf **, caddr_t *);
-int    nfs_sndlock (struct nfsreq *);
-void   nfs_sndunlock (struct nfsreq *);
+int    nfs_sndlock (struct nfsmount *, struct nfsreq *);
+void   nfs_sndunlock (struct nfsmount *);
 int    nfs_slplock (struct nfssvc_sock *, int);
 void   nfs_slpunlock (struct nfssvc_sock *);
 int    nfs_disct (struct mbuf **, caddr_t *, int, int, caddr_t *);
 int    nfs_vinvalbuf (struct vnode *, int, int);
-int    nfs_readrpc (struct vnode *, struct uio *);
+int    nfs_readrpc_uio (struct vnode *, struct uio *);
+void   nfs_readrpc_bio (struct vnode *, struct bio *);
 int    nfs_writerpc (struct vnode *, struct uio *, int *, int *);
 int    nfs_commit (struct vnode *vp, u_quad_t offset, int cnt, 
                        struct thread *td);
 int    nfs_readdirrpc (struct vnode *, struct uio *);
-int    nfs_asyncio (struct vnode *vp, struct bio *, struct thread *);
-int    nfs_doio (struct vnode *vp, struct bio *, struct thread *);
+void   nfs_startio (struct vnode *vp, struct bio *, struct thread *);
+void   nfs_asyncio(struct vnode *vp, struct bio *bio);
+int    nfs_asyncok(struct nfsmount *nmp);
+int    nfs_iowait (struct bio *bio);
+
 int    nfs_readlinkrpc (struct vnode *, struct uio *);
 int    nfs_sigintr (struct nfsmount *, struct nfsreq *, struct thread *);
 int    nfs_readdirplusrpc (struct vnode *, struct uio *);
@@ -756,6 +763,7 @@ void        nfssvc_iod_reader(void *arg);
 void   nfssvc_iod_writer(void *arg);
 void   nfssvc_iod_stop(struct nfsmount *nmp);
 void   nfssvc_iod_writer_wakeup(struct nfsmount *nmp);
+void   nfssvc_iod_reader_wakeup(struct nfsmount *nmp);
 
 #endif /* _KERNEL */
 
index a10aa70..0cc3bf4 100644 (file)
@@ -48,7 +48,7 @@
 #include <sys/vnode.h>
 #include <sys/mount.h>
 #include <sys/kernel.h>
-#include <sys/buf2.h>
+#include <sys/mbuf.h>
 #include <sys/msfbuf.h>
 
 #include <vm/vm.h>
@@ -58,6 +58,7 @@
 #include <vm/vm_pager.h>
 #include <vm/vnode_pager.h>
 
+#include <sys/buf2.h>
 #include <sys/thread2.h>
 
 #include "rpcv2.h"
@@ -65,6 +66,9 @@
 #include "nfs.h"
 #include "nfsmount.h"
 #include "nfsnode.h"
+#include "xdr_subs.h"
+#include "nfsm_subs.h"
+
 
 static struct buf *nfs_getcacheblk(struct vnode *vp, off_t loffset,
                                   int size, struct thread *td);
@@ -161,7 +165,7 @@ nfs_getpages(struct vop_getpages_args *ap)
        uio.uio_rw = UIO_READ;
        uio.uio_td = td;
 
-       error = nfs_readrpc(vp, &uio);
+       error = nfs_readrpc_uio(vp, &uio);
        msf_buf_free(msf);
 
        if (error && (uio.uio_resid == count)) {
@@ -401,7 +405,7 @@ nfs_bioread(struct vnode *vp, struct uio *uio, int ioflag)
            if (np->n_flag & NDONTCACHE) {
                switch (vp->v_type) {
                case VREG:
-                       return (nfs_readrpc(vp, uio));
+                       return (nfs_readrpc_uio(vp, uio));
                case VLNK:
                        return (nfs_readlinkrpc(vp, uio));
                case VDIR:
@@ -421,7 +425,7 @@ nfs_bioread(struct vnode *vp, struct uio *uio, int ioflag)
                /*
                 * Start the read ahead(s), as required.
                 */
-               if (nmp->nm_readahead > 0) {
+               if (nmp->nm_readahead > 0 && nfs_asyncok(nmp)) {
                    for (nra = 0; nra < nmp->nm_readahead && nra < seqcount &&
                        (off_t)(lbn + 1 + nra) * biosize < np->n_size; nra++) {
                        rabn = lbn + 1 + nra;
@@ -433,12 +437,15 @@ nfs_bioread(struct vnode *vp, struct uio *uio, int ioflag)
                            if ((rabp->b_flags & (B_CACHE|B_DELWRI)) == 0) {
                                rabp->b_cmd = BUF_CMD_READ;
                                vfs_busy_pages(vp, rabp);
-                               if (nfs_asyncio(vp, &rabp->b_bio2, td)) {
+                               nfs_asyncio(vp, &rabp->b_bio2);
+#if 0
+                               if (nfs_startio(vp, &rabp->b_bio2, td)) {
                                    rabp->b_flags |= B_INVAL|B_ERROR;
                                    vfs_unbusy_pages(rabp);
                                    brelse(rabp);
                                    break;
                                }
+#endif
                            } else {
                                brelse(rabp);
                            }
@@ -496,7 +503,8 @@ again:
                    bp->b_bio2.bio_done = nfsiodone_sync;
                    bp->b_bio2.bio_flags |= BIO_SYNC;
                    vfs_busy_pages(vp, bp);
-                   error = nfs_doio(vp, &bp->b_bio2, td);
+                   nfs_startio(vp, &bp->b_bio2, td);
+                   error = nfs_iowait(&bp->b_bio2);
                    if (error) {
                        brelse(bp);
                        return (error);
@@ -526,7 +534,8 @@ again:
                    bp->b_bio2.bio_done = nfsiodone_sync;
                    bp->b_bio2.bio_flags |= BIO_SYNC;
                    vfs_busy_pages(vp, bp);
-                   error = nfs_doio(vp, &bp->b_bio2, td);
+                   nfs_startio(vp, &bp->b_bio2, td);
+                   error = nfs_iowait(&bp->b_bio2);
                    if (error) {
                        bp->b_flags |= B_ERROR | B_INVAL;
                        brelse(bp);
@@ -554,7 +563,8 @@ again:
                    bp->b_bio2.bio_done = nfsiodone_sync;
                    bp->b_bio2.bio_flags |= BIO_SYNC;
                    vfs_busy_pages(vp, bp);
-                   error = nfs_doio(vp, &bp->b_bio2, td);
+                   nfs_startio(vp, &bp->b_bio2, td);
+                   error = nfs_iowait(&bp->b_bio2);
                    if (error) {
                            brelse(bp);
                    }
@@ -585,7 +595,8 @@ again:
                                    bp->b_bio2.bio_done = nfsiodone_sync;
                                    bp->b_bio2.bio_flags |= BIO_SYNC;
                                    vfs_busy_pages(vp, bp);
-                                   error = nfs_doio(vp, &bp->b_bio2, td);
+                                   nfs_startio(vp, &bp->b_bio2, td);
+                                   error = nfs_iowait(&bp->b_bio2);
                                    /*
                                     * no error + B_INVAL == directory EOF,
                                     * use the block.
@@ -617,7 +628,7 @@ again:
                 * (You need the current block first, so that you have the
                 *  directory offset cookie of the next block.)
                 */
-               if (nmp->nm_readahead > 0 &&
+               if (nmp->nm_readahead > 0 && nfs_asyncok(nmp) &&
                    (bp->b_flags & B_INVAL) == 0 &&
                    (np->n_direofoffset == 0 ||
                    loffset + NFS_DIRBLKSIZ < np->n_direofoffset) &&
@@ -630,11 +641,14 @@ again:
                            if ((rabp->b_flags & (B_CACHE|B_DELWRI)) == 0) {
                                rabp->b_cmd = BUF_CMD_READ;
                                vfs_busy_pages(vp, rabp);
-                               if (nfs_asyncio(vp, &rabp->b_bio2, td)) {
+                               nfs_asyncio(vp, &rabp->b_bio2);
+#if 0
+                               if (nfs_startio(vp, &rabp->b_bio2, td)) {
                                    rabp->b_flags |= B_INVAL|B_ERROR;
                                    vfs_unbusy_pages(rabp);
                                    brelse(rabp);
                                }
+#endif
                            } else {
                                brelse(rabp);
                            }
@@ -958,7 +972,8 @@ again:
                        bp->b_bio2.bio_done = nfsiodone_sync;
                        bp->b_bio2.bio_flags |= BIO_SYNC;
                        vfs_busy_pages(vp, bp);
-                       error = nfs_doio(vp, &bp->b_bio2, td);
+                       nfs_startio(vp, &bp->b_bio2, td);
+                       error = nfs_iowait(&bp->b_bio2);
                        if (error) {
                                brelse(bp);
                                break;
@@ -1090,7 +1105,7 @@ again:
  * NULL.
  *
  * The caller must carefully deal with the possible B_INVAL state of
- * the buffer.  nfs_doio() clears B_INVAL (and nfs_asyncio() clears it
+ * the buffer.  nfs_startio() clears B_INVAL (and nfs_asyncio() clears it
  * indirectly), so synchronous reads can be issued without worrying about
  * the B_INVAL state.  We have to be a little more careful when dealing
  * with writes (see comments in nfs_write()) when extending a file past
@@ -1184,54 +1199,57 @@ nfs_vinvalbuf(struct vnode *vp, int flags, int intrflg)
 }
 
 /*
- * Initiate asynchronous I/O. Return an error if no nfsiods are available.
- * This is mainly to avoid queueing async I/O requests when the nfsiods
- * are all hung on a dead server.
- *
- * Note: nfs_asyncio() does not clear (B_ERROR|B_INVAL) but when the bp
- * is eventually dequeued by the async daemon, nfs_doio() *will*.
+ * Return true (non-zero) if the txthread and rxthread are operational
+ * and we do not already have too many not-yet-started BIO's built up.
  */
 int
-nfs_asyncio(struct vnode *vp, struct bio *bio, struct thread *td)
+nfs_asyncok(struct nfsmount *nmp)
+{
+       return (nmp->nm_bioqlen < 64 &&
+               nmp->nm_rxstate <= NFSSVC_PENDING &&
+               nmp->nm_txstate <= NFSSVC_PENDING);
+}
+
+/*
+ * The read-ahead code calls this to queue a bio to the txthread.
+ *
+ * We don't touch the bio otherwise... that is, we do not even
+ * construct or send the initial rpc.  The txthread will do it
+ * for us.
+ */
+void
+nfs_asyncio(struct vnode *vp, struct bio *bio)
 {
        struct buf *bp = bio->bio_buf;
-       struct nfsmount *nmp;
+       struct nfsmount *nmp = VFSTONFS(vp->v_mount);
 
        KKASSERT(vp->v_tag == VT_NFS);
-       nmp = VFSTONFS(vp->v_mount);
-
-       /*
-        * If no async daemons then return EIO to force caller to run the rpc
-        * synchronously.
-        */
-       if (nmp->nm_rxstate > NFSSVC_PENDING)
-               return (EIO);
-
        BUF_KERNPROC(bp);
-
-       /*
-        * The passed bio's buffer is not necessary associated with
-        * the NFS vnode it is being written to.  Store the NFS vnode
-        * in the BIO driver info.
-        */
        bio->bio_driver_info = vp;
        TAILQ_INSERT_TAIL(&nmp->nm_bioq, bio, bio_act);
        nmp->nm_bioqlen++;
        nfssvc_iod_writer_wakeup(nmp);
-       return (0);
 }
 
 /*
- * Do an I/O operation to/from a cache block. This may be called
- * synchronously or from an nfsiod.  The BIO is normalized for DEV_BSIZE.
+ * Initiate an I/O operation to/from a cache block.  If the BIO is
+ * flagged BIO_SYNC, or if the async thread is not running, the
+ * operation will be executed synchronously.
+ *
+ * Typically for BIO_SYNC the caller set up the completion and will
+ * call nfs_iowait() to obtain the error code, then brelse().
+ * iowait is a degenerate routine.
  *
- * A locked, completed I/O is returned and the caller is responsible for
- * brelse()'ing it.
+ * For async operation we set up a request and queue it to the transmit
+ * thread along with a done function to deal with cleanup after
+ * the RPC completes.  The presence of a done function causes the
+ * state machine to automatically move the req onto the reqrxq when
+ * a response is received.
  *
  * NOTE! TD MIGHT BE NULL
  */
-int
-nfs_doio(struct vnode *vp, struct bio *bio, struct thread *td)
+void
+nfs_startio(struct vnode *vp, struct bio *bio, struct thread *td)
 {
        struct buf *bp = bio->bio_buf;
        struct uio *uiop;
@@ -1254,10 +1272,12 @@ nfs_doio(struct vnode *vp, struct bio *bio, struct thread *td)
         * clear B_ERROR and B_INVAL state prior to initiating the I/O.  We
         * do this here so we do not have to do it in all the code that
         * calls us.
+        *
+        * NOTE: An EINPROGRESS response can be returned if the bio was
+        *       asynchronous.
         */
        bp->b_flags &= ~(B_ERROR | B_INVAL);
 
-
        KASSERT(bp->b_cmd != BUF_CMD_DONE, 
                ("nfs_doio: bp %p already marked done!", bp));
 
@@ -1268,11 +1288,18 @@ nfs_doio(struct vnode *vp, struct bio *bio, struct thread *td)
 
            switch (vp->v_type) {
            case VREG:
-               uiop->uio_offset = bio->bio_offset;
+               /*
+                * Note: NFS assumes BIO_SYNC is run synchronously, so
+                *       be sure to do that.
+                */
                nfsstats.read_bios++;
-               error = nfs_readrpc(vp, uiop);
-
-               if (!error) {
+               if ((bio->bio_flags & BIO_SYNC) == 0) {
+                       nfs_readrpc_bio(vp, bio);
+                       return;
+               }
+               uiop->uio_offset = bio->bio_offset;
+               error = nfs_readrpc_uio(vp, uiop);
+               if (error == 0) {
                    if (uiop->uio_resid) {
                        /*
                         * If we had a short read with no error, we must have
@@ -1282,12 +1309,12 @@ nfs_doio(struct vnode *vp, struct bio *bio, struct thread *td)
                         * Holes used to be able to occur due to pending 
                         * writes, but that is not possible any longer.
                         */
-                       int nread = bp->b_bcount - uiop->uio_resid;
-                       int left  = uiop->uio_resid;
+                       int nread = bp->b_bcount - bp->b_resid;
+                       int left  = bp->b_resid;
 
                        if (left > 0)
                                bzero((char *)bp->b_data + nread, left);
-                       uiop->uio_resid = 0;
+                       bp->b_resid = 0;
                    }
                }
                if (td && td->td_proc && (vp->v_flag & VTEXT) &&
@@ -1343,7 +1370,6 @@ nfs_doio(struct vnode *vp, struct bio *bio, struct thread *td)
                            bp->b_flags &= ~(B_NEEDCOMMIT | B_CLUSTEROK);
                            bp->b_resid = 0;
                            biodone(bio);
-                           return (0);
                    }
                    if (retv == NFSERR_STALEWRITEVERF) {
                            nfs_clearcommit(vp->v_mount);
@@ -1432,14 +1458,12 @@ nfs_doio(struct vnode *vp, struct bio *bio, struct thread *td)
            } else {
                bp->b_resid = 0;
                biodone(bio);
-               return (0);
            }
        }
        bp->b_resid = uiop->uio_resid;
        if (must_commit)
            nfs_clearcommit(vp->v_mount);
        biodone(bio);
-       return (error);
 }
 
 /*
@@ -1498,3 +1522,248 @@ nfsiodone_sync(struct bio *bio)
        bio->bio_flags = 0;
        bpdone(bio->bio_buf, 0);
 }
+
+/*
+ * If nfs_startio() was told to do the request BIO_SYNC it will
+ * complete the request before returning, so assert that the
+ * request is in-fact complete.
+ */
+int
+nfs_iowait(struct bio *bio)
+{
+       struct buf *bp = bio->bio_buf;
+
+       KKASSERT(bp->b_cmd == BUF_CMD_DONE);
+       if (bp->b_flags & B_EINTR)
+               return (EINTR);
+       if (bp->b_flags & B_ERROR)
+               return (bp->b_error ? bp->b_error : EIO);
+       return (0);
+}
+
+/*
+ * nfs read rpc - BIO version
+ */
+static void nfs_readrpc_bio_done(nfsm_info_t info);
+
+void
+nfs_readrpc_bio(struct vnode *vp, struct bio *bio)
+{
+       struct buf *bp = bio->bio_buf;
+       u_int32_t *tl;
+       struct nfsmount *nmp;
+       int error = 0, len, tsiz;
+       struct nfsm_info *info;
+
+       info = kmalloc(sizeof(*info), M_NFSREQ, M_WAITOK);
+       info->mrep = NULL;
+       info->v3 = NFS_ISV3(vp);
+
+       nmp = VFSTONFS(vp->v_mount);
+       tsiz = bp->b_bcount;
+       if (bio->bio_offset + tsiz > nmp->nm_maxfilesize) {
+               error = EFBIG;
+               goto nfsmout;
+       }
+       nfsstats.rpccnt[NFSPROC_READ]++;
+       len = (tsiz > nmp->nm_rsize) ? nmp->nm_rsize : tsiz;
+       nfsm_reqhead(info, vp, NFSPROC_READ,
+                    NFSX_FH(info->v3) + NFSX_UNSIGNED * 3);
+       ERROROUT(nfsm_fhtom(info, vp));
+       tl = nfsm_build(info, NFSX_UNSIGNED * 3);
+       if (info->v3) {
+               txdr_hyper(bio->bio_offset, tl);
+               *(tl + 2) = txdr_unsigned(len);
+       } else {
+               *tl++ = txdr_unsigned(bio->bio_offset);
+               *tl++ = txdr_unsigned(len);
+               *tl = 0;
+       }
+       info->bio = bio;
+       info->done = nfs_readrpc_bio_done;
+       nfsm_request_bio(info, vp, NFSPROC_READ, NULL,
+                        nfs_vpcred(vp, ND_READ));
+       return;
+nfsmout:
+       kfree(info, M_NFSREQ);
+       bp->b_error = error;
+       bp->b_flags |= B_ERROR;
+       biodone(bio);
+}
+
+static void
+nfs_readrpc_bio_done(nfsm_info_t info)
+{
+       struct nfsmount *nmp = VFSTONFS(info->vp->v_mount);
+       struct bio *bio = info->bio;
+       struct buf *bp = bio->bio_buf;
+       u_int32_t *tl;
+       int attrflag;
+       int retlen;
+       int eof;
+       int error = 0;
+
+       KKASSERT(info->state == NFSM_STATE_DONE);
+
+       if (info->v3) {
+               ERROROUT(nfsm_postop_attr(info, info->vp, &attrflag,
+                                        NFS_LATTR_NOSHRINK));
+               NULLOUT(tl = nfsm_dissect(info, 2 * NFSX_UNSIGNED));
+               eof = fxdr_unsigned(int, *(tl + 1));
+       } else {
+               ERROROUT(nfsm_loadattr(info, info->vp, NULL));
+               eof = 0;
+       }
+       NEGATIVEOUT(retlen = nfsm_strsiz(info, nmp->nm_rsize));
+       ERROROUT(nfsm_mtobio(info, bio, retlen));
+       m_freem(info->mrep);
+       info->mrep = NULL;
+
+       /*
+        * No error occurred, fill the hole if any
+        */
+       if (retlen < bp->b_bcount) {
+               bzero(bp->b_data + retlen, bp->b_bcount - retlen);
+       }
+       bp->b_resid = bp->b_bcount - retlen;
+#if 0
+       /* retlen */
+       tsiz -= retlen;
+       if (info.v3) {
+               if (eof || retlen == 0) {
+                       tsiz = 0;
+               }
+       } else if (retlen < len) {
+               tsiz = 0;
+       }
+#endif
+nfsmout:
+       if (error) {
+               bp->b_error = error;
+               bp->b_flags |= B_ERROR;
+       }
+       biodone(bio);
+}
+
+#if 0
+
+/*
+ * nfs write call - BIO version
+ */
+int
+nfs_writerpc_bio(struct vnode *vp, struct bio *bio, int *iomode, int *must_commit)
+{
+       u_int32_t *tl;
+       int32_t backup;
+       struct nfsmount *nmp = VFSTONFS(vp->v_mount);
+       int error = 0, len, tsiz, wccflag = NFSV3_WCCRATTR, rlen, commit;
+       int  committed = NFSV3WRITE_FILESYNC;
+       struct nfsm_info info;
+
+       info.mrep = NULL;
+       info.v3 = NFS_ISV3(vp);
+
+#ifndef DIAGNOSTIC
+       if (uiop->uio_iovcnt != 1)
+               panic("nfs: writerpc iovcnt > 1");
+#endif
+       *must_commit = 0;
+       tsiz = uiop->uio_resid;
+       if (uiop->uio_offset + tsiz > nmp->nm_maxfilesize)
+               return (EFBIG);
+       while (tsiz > 0) {
+               nfsstats.rpccnt[NFSPROC_WRITE]++;
+               len = (tsiz > nmp->nm_wsize) ? nmp->nm_wsize : tsiz;
+               nfsm_reqhead(&info, vp, NFSPROC_WRITE,
+                            NFSX_FH(info.v3) + 5 * NFSX_UNSIGNED + nfsm_rndup(len));
+               ERROROUT(nfsm_fhtom(&info, vp));
+               if (info.v3) {
+                       tl = nfsm_build(&info, 5 * NFSX_UNSIGNED);
+                       txdr_hyper(uiop->uio_offset, tl);
+                       tl += 2;
+                       *tl++ = txdr_unsigned(len);
+                       *tl++ = txdr_unsigned(*iomode);
+                       *tl = txdr_unsigned(len);
+               } else {
+                       u_int32_t x;
+
+                       tl = nfsm_build(&info, 4 * NFSX_UNSIGNED);
+                       /* Set both "begin" and "current" to non-garbage. */
+                       x = txdr_unsigned((u_int32_t)uiop->uio_offset);
+                       *tl++ = x;      /* "begin offset" */
+                       *tl++ = x;      /* "current offset" */
+                       x = txdr_unsigned(len);
+                       *tl++ = x;      /* total to this offset */
+                       *tl = x;        /* size of this write */
+               }
+               ERROROUT(nfsm_uiotom(&info, uiop, len));
+               NEGKEEPOUT(nfsm_request(&info, vp, NFSPROC_WRITE, uiop->uio_td,
+                                       nfs_vpcred(vp, ND_WRITE), &error));
+               if (info.v3) {
+                       /*
+                        * The write RPC returns a before and after mtime.  The
+                        * nfsm_wcc_data() macro checks the before n_mtime
+                        * against the before time and stores the after time
+                        * in the nfsnode's cached vattr and n_mtime field.
+                        * The NRMODIFIED bit will be set if the before
+                        * time did not match the original mtime.
+                        */
+                       wccflag = NFSV3_WCCCHK;
+                       ERROROUT(nfsm_wcc_data(&info, vp, &wccflag));
+                       if (error == 0) {
+                               NULLOUT(tl = nfsm_dissect(&info, 2 * NFSX_UNSIGNED + NFSX_V3WRITEVERF));
+                               rlen = fxdr_unsigned(int, *tl++);
+                               if (rlen == 0) {
+                                       error = NFSERR_IO;
+                                       m_freem(info.mrep);
+                                       info.mrep = NULL;
+                                       break;
+                               } else if (rlen < len) {
+                                       backup = len - rlen;
+                                       uiop->uio_iov->iov_base = (char *)uiop->uio_iov->iov_base - backup;
+                                       uiop->uio_iov->iov_len += backup;
+                                       uiop->uio_offset -= backup;
+                                       uiop->uio_resid += backup;
+                                       len = rlen;
+                               }
+                               commit = fxdr_unsigned(int, *tl++);
+
+                               /*
+                                * Return the lowest commitment level
+                                * obtained by any of the RPCs.
+                                */
+                               if (committed == NFSV3WRITE_FILESYNC)
+                                       committed = commit;
+                               else if (committed == NFSV3WRITE_DATASYNC &&
+                                       commit == NFSV3WRITE_UNSTABLE)
+                                       committed = commit;
+                               if ((nmp->nm_state & NFSSTA_HASWRITEVERF) == 0){
+                                   bcopy((caddr_t)tl, (caddr_t)nmp->nm_verf,
+                                       NFSX_V3WRITEVERF);
+                                   nmp->nm_state |= NFSSTA_HASWRITEVERF;
+                               } else if (bcmp((caddr_t)tl,
+                                   (caddr_t)nmp->nm_verf, NFSX_V3WRITEVERF)) {
+                                   *must_commit = 1;
+                                   bcopy((caddr_t)tl, (caddr_t)nmp->nm_verf,
+                                       NFSX_V3WRITEVERF);
+                               }
+                       }
+               } else {
+                       ERROROUT(nfsm_loadattr(&info, vp, NULL));
+               }
+               m_freem(info.mrep);
+               info.mrep = NULL;
+               if (error)
+                       break;
+               tsiz -= len;
+       }
+nfsmout:
+       if (vp->v_mount->mnt_flag & MNT_ASYNC)
+               committed = NFSV3WRITE_FILESYNC;
+       *iomode = committed;
+       if (error)
+               uiop->uio_resid = tsiz;
+       return (error);
+}
+
+#endif
index 7ec8ea5..c4de31b 100644 (file)
@@ -75,22 +75,58 @@ void
 nfssvc_iod_reader(void *arg)
 {
        struct nfsmount *nmp = arg;
+       struct nfsm_info *info;
+       struct nfsreq *req;
+       int error;
 
        if (nmp->nm_rxstate == NFSSVC_INIT)
                nmp->nm_rxstate = NFSSVC_PENDING;
        for (;;) {
                if (nmp->nm_rxstate == NFSSVC_WAITING) {
-                       tsleep(&nmp->nm_rxstate, 0, "nfsidl", 0);
+                       if (TAILQ_FIRST(&nmp->nm_reqq) == NULL &&
+                           TAILQ_FIRST(&nmp->nm_reqrxq) == NULL) {
+                               tsleep(&nmp->nm_rxstate, 0, "nfsidl", 0);
+                       } else {
+                               /*
+                                * This can happen during shutdown, we don't
+                                * want to hardloop.
+                                */
+                               error = nfs_reply(nmp, NULL);
+                               if (error && error != EWOULDBLOCK) {
+                                       tsleep(&nmp->nm_rxstate, 0,
+                                               "nfsxxx", hz / 10);
+                               }
+                       }
                        continue;
                }
                if (nmp->nm_rxstate != NFSSVC_PENDING)
                        break;
                nmp->nm_rxstate = NFSSVC_WAITING;
 
-#if 0
-               error = tsleep((caddr_t)&nfs_iodwant[myiod],
-                       PCATCH, "nfsidl", 0);
-#endif
+               /*
+                * Process requests which have received replies.  Only
+                * process the post-reply states.  If we get EINPROGRESS
+                * it means the request went back to an auth or retransmit
+                * state and we let the iod_writer thread deal with it.
+                *
+                * If the request completes we run the info->done call
+                * to finish up the I/O.
+                */
+               while ((req = TAILQ_FIRST(&nmp->nm_reqrxq)) != NULL) {
+                       TAILQ_REMOVE(&nmp->nm_reqrxq, req, r_chain);
+                       info = req->r_info;
+                       KKASSERT(info);
+                       info->error = nfs_request(info,
+                                                 NFSM_STATE_PROCESSREPLY,
+                                                 NFSM_STATE_DONE);
+                       if (info->error == EINPROGRESS) {
+                               kprintf("rxq: move info %p back to txq\n", info);
+                               TAILQ_INSERT_TAIL(&nmp->nm_reqtxq, req, r_chain);
+                               nfssvc_iod_writer_wakeup(nmp);
+                       } else {
+                               info->done(info);
+                       }
+               }
        }
        nmp->nm_rxthread = NULL;
        nmp->nm_rxstate = NFSSVC_DONE;
@@ -101,13 +137,18 @@ nfssvc_iod_reader(void *arg)
  * The writer sits on the send side of the client's socket and
  * does both the initial processing of BIOs and also transmission
  * and retransmission of nfsreq's.
+ *
+ * The writer processes both new BIOs from nm_bioq and retransmit
+ * or state machine jumpbacks from nm_reqtxq
  */
 void
 nfssvc_iod_writer(void *arg)
 {
        struct nfsmount *nmp = arg;
        struct bio *bio;
+       struct nfsreq *req;
        struct vnode *vp;
+       nfsm_info_t info;
 
        if (nmp->nm_txstate == NFSSVC_INIT)
                nmp->nm_txstate = NFSSVC_PENDING;
@@ -126,7 +167,29 @@ nfssvc_iod_writer(void *arg)
                        TAILQ_REMOVE(&nmp->nm_bioq, bio, bio_act);
                        nmp->nm_bioqlen--;
                        vp = bio->bio_driver_info;
-                       nfs_doio(vp, bio, NULL);
+                       nfs_startio(vp, bio, NULL);
+               }
+
+               /*
+                * Process reauths & retransmits.  If we get an EINPROGRESS
+                * it means the state transitioned to WAITREPLY or later.
+                * Otherwise the request completed (probably with an error
+                * since we didn't get to a replied state).
+                */
+               while ((req = TAILQ_FIRST(&nmp->nm_reqtxq)) != NULL) {
+                       TAILQ_REMOVE(&nmp->nm_reqtxq, req, r_chain);
+                       info = req->r_info;
+                       KKASSERT(info);
+                       info->error = nfs_request(info,
+                                                 NFSM_STATE_AUTH,
+                                                 NFSM_STATE_WAITREPLY);
+                       if (info->error == EINPROGRESS) {
+                               /*
+                               TAILQ_INSERT_TAIL(&nmp->nm_reqrxq, req, r_chain);
+                               */
+                       } else {
+                               info->done(info);
+                       }
                }
        }
        nmp->nm_txthread = NULL;
@@ -156,3 +219,12 @@ nfssvc_iod_writer_wakeup(struct nfsmount *nmp)
                wakeup(&nmp->nm_txstate);
        }
 }
+
+void
+nfssvc_iod_reader_wakeup(struct nfsmount *nmp)
+{
+       if (nmp->nm_rxstate == NFSSVC_WAITING) {
+               nmp->nm_rxstate = NFSSVC_PENDING;
+               wakeup(&nmp->nm_rxstate);
+       }
+}
index 443d89f..d5d4b6e 100644 (file)
@@ -131,6 +131,7 @@ static int nfs_request_auth(struct nfsreq *rep);
 static int nfs_request_try(struct nfsreq *rep);
 static int nfs_request_waitreply(struct nfsreq *rep);
 static int nfs_request_processreply(nfsm_info_t info, int);
+static void nfs_async_return(struct nfsmount *nmp, struct nfsreq *rep);
 
 /*
  * There is a congestion window for outstanding rpcs maintained per mount
@@ -153,13 +154,13 @@ struct nfsrtt nfsrtt;
 struct callout nfs_timer_handle;
 
 static int     nfs_msg (struct thread *,char *,char *);
-static int     nfs_rcvlock (struct nfsreq *);
-static void    nfs_rcvunlock (struct nfsreq *);
+static int     nfs_rcvlock (struct nfsmount *nmp, struct nfsreq *myreq);
+static void    nfs_rcvunlock (struct nfsmount *nmp);
 static void    nfs_realign (struct mbuf **pm, int hsiz);
-static int     nfs_receive (struct nfsreq *rep, struct sockaddr **aname,
-                                struct mbuf **mp);
+static int     nfs_receive (struct nfsmount *nmp, struct nfsreq *rep,
+                               struct sockaddr **aname, struct mbuf **mp);
 static void    nfs_softterm (struct nfsreq *rep);
-static int     nfs_reconnect (struct nfsreq *rep);
+static int     nfs_reconnect (struct nfsmount *nmp, struct nfsreq *rep);
 #ifndef NFS_NOSERVER 
 static int     nfsrv_getstream (struct nfssvc_sock *, int, int *);
 static void    nfs_timer_req(struct nfsreq *req);
@@ -383,10 +384,9 @@ bad:
  * nb: Must be called with the nfs_sndlock() set on the mount point.
  */
 static int
-nfs_reconnect(struct nfsreq *rep)
+nfs_reconnect(struct nfsmount *nmp, struct nfsreq *rep)
 {
        struct nfsreq *req;
-       struct nfsmount *nmp = rep->r_nmp;
        int error;
 
        nfs_disconnect(nmp);
@@ -428,15 +428,9 @@ nfs_disconnect(struct nfsmount *nmp)
 void
 nfs_safedisconnect(struct nfsmount *nmp)
 {
-       struct nfsreq dummyreq;
-
-       bzero(&dummyreq, sizeof(dummyreq));
-       dummyreq.r_nmp = nmp;
-       dummyreq.r_td = NULL;
-       mtx_link_init(&dummyreq.r_link);
-       nfs_rcvlock(&dummyreq);
+       nfs_rcvlock(nmp, NULL);
        nfs_disconnect(nmp);
-       nfs_rcvunlock(&dummyreq);
+       nfs_rcvunlock(nmp);
 }
 
 /*
@@ -528,7 +522,8 @@ nfs_send(struct socket *so, struct sockaddr *nam, struct mbuf *top,
  * we have read any of it, even if the system call has been interrupted.
  */
 static int
-nfs_receive(struct nfsreq *rep, struct sockaddr **aname, struct mbuf **mp)
+nfs_receive(struct nfsmount *nmp, struct nfsreq *rep,
+           struct sockaddr **aname, struct mbuf **mp)
 {
        struct socket *so;
        struct sockbuf sio;
@@ -546,7 +541,7 @@ nfs_receive(struct nfsreq *rep, struct sockaddr **aname, struct mbuf **mp)
         */
        *mp = NULL;
        *aname = NULL;
-       sotype = rep->r_nmp->nm_sotype;
+       sotype = nmp->nm_sotype;
 
        /*
         * For reliable protocols, lock against other senders/receivers
@@ -557,7 +552,7 @@ nfs_receive(struct nfsreq *rep, struct sockaddr **aname, struct mbuf **mp)
         * until we have an entire rpc request/reply.
         */
        if (sotype != SOCK_DGRAM) {
-               error = nfs_sndlock(rep);
+               error = nfs_sndlock(nmp, rep);
                if (error)
                        return (error);
 tryagain:
@@ -570,33 +565,33 @@ tryagain:
                 * attempt that has essentially shut down this
                 * mount point.
                 */
-               if (rep->r_mrep || (rep->r_flags & R_SOFTTERM)) {
-                       nfs_sndunlock(rep);
+               if (rep && (rep->r_mrep || (rep->r_flags & R_SOFTTERM))) {
+                       nfs_sndunlock(nmp);
                        return (EINTR);
                }
-               so = rep->r_nmp->nm_so;
-               if (!so) {
-                       error = nfs_reconnect(rep);
+               so = nmp->nm_so;
+               if (so == NULL) {
+                       error = nfs_reconnect(nmp, rep);
                        if (error) {
-                               nfs_sndunlock(rep);
+                               nfs_sndunlock(nmp);
                                return (error);
                        }
                        goto tryagain;
                }
-               while (rep->r_flags & R_MUSTRESEND) {
+               while (rep && (rep->r_flags & R_MUSTRESEND)) {
                        m = m_copym(rep->r_mreq, 0, M_COPYALL, MB_WAIT);
                        nfsstats.rpcretries++;
                        error = nfs_send(so, rep->r_nmp->nm_nam, m, rep);
                        if (error) {
                                if (error == EINTR || error == ERESTART ||
-                                   (error = nfs_reconnect(rep)) != 0) {
-                                       nfs_sndunlock(rep);
+                                   (error = nfs_reconnect(nmp, rep)) != 0) {
+                                       nfs_sndunlock(nmp);
                                        return (error);
                                }
                                goto tryagain;
                        }
                }
-               nfs_sndunlock(rep);
+               nfs_sndunlock(nmp);
                if (sotype == SOCK_STREAM) {
                        /*
                         * Get the length marker from the stream
@@ -629,7 +624,7 @@ tryagain:
                                 "short receive (%d/%d) from nfs server %s\n",
                                 (int)(sizeof(u_int32_t) - auio.uio_resid),
                                 (int)sizeof(u_int32_t),
-                                rep->r_nmp->nm_mountp->mnt_stat.f_mntfromname);
+                                nmp->nm_mountp->mnt_stat.f_mntfromname);
                            error = EPIPE;
                        }
                        if (error)
@@ -643,7 +638,7 @@ tryagain:
                            log(LOG_ERR, "%s (%d) from nfs server %s\n",
                                "impossible packet length",
                                len,
-                               rep->r_nmp->nm_mountp->mnt_stat.f_mntfromname);
+                               nmp->nm_mountp->mnt_stat.f_mntfromname);
                            error = EFBIG;
                            goto errout;
                        }
@@ -663,7 +658,7 @@ tryagain:
                            log(LOG_INFO,
                                "short receive (%d/%d) from nfs server %s\n",
                                len - auio.uio_resid, len,
-                               rep->r_nmp->nm_mountp->mnt_stat.f_mntfromname);
+                               nmp->nm_mountp->mnt_stat.f_mntfromname);
                            error = EPIPE;
                        }
                        *mp = sio.sb_mb;
@@ -707,19 +702,19 @@ errout:
                                log(LOG_INFO,
                                    "receive error %d from nfs server %s\n",
                                    error,
-                                rep->r_nmp->nm_mountp->mnt_stat.f_mntfromname);
+                                nmp->nm_mountp->mnt_stat.f_mntfromname);
                        }
-                       error = nfs_sndlock(rep);
+                       error = nfs_sndlock(nmp, rep);
                        if (!error) {
-                               error = nfs_reconnect(rep);
+                               error = nfs_reconnect(nmp, rep);
                                if (!error)
                                        goto tryagain;
                                else
-                                       nfs_sndunlock(rep);
+                                       nfs_sndunlock(nmp);
                        }
                }
        } else {
-               if ((so = rep->r_nmp->nm_so) == NULL)
+               if ((so = nmp->nm_so) == NULL)
                        return (EACCES);
                if (so->so_state & SS_ISCONNECTED)
                        getnam = NULL;
@@ -730,7 +725,7 @@ errout:
                        rcvflg = 0;
                        error =  so_pru_soreceive(so, getnam, NULL, &sio,
                                                  NULL, &rcvflg);
-                       if (error == EWOULDBLOCK &&
+                       if (error == EWOULDBLOCK && rep &&
                            (rep->r_flags & R_SOFTTERM)) {
                                m_freem(sio.sb_mb);
                                return (EINTR);
@@ -755,15 +750,18 @@ errout:
 
 /*
  * Implement receipt of reply on a socket.
+ *
  * We must search through the list of received datagrams matching them
  * with outstanding requests using the xid, until ours is found.
+ *
+ * If myrep is NULL we process packets on the socket until
+ * interrupted or until nm_reqrxq is non-empty.
  */
 /* ARGSUSED */
 int
-nfs_reply(struct nfsreq *myrep)
+nfs_reply(struct nfsmount *nmp, struct nfsreq *myrep)
 {
        struct nfsreq *rep;
-       struct nfsmount *nmp = myrep->r_nmp;
        struct sockaddr *nam;
        u_int32_t rxid;
        u_int32_t *tl;
@@ -789,24 +787,35 @@ nfs_reply(struct nfsreq *myrep)
                 */
                info.mrep = NULL;
 
-               error = nfs_rcvlock(myrep);
+               error = nfs_rcvlock(nmp, myrep);
                if (error == EALREADY)
                        return (0);
                if (error)
                        return (error);
+
+               /*
+                * If myrep is NULL we are the receiver helper thread.
+                * Stop waiting for incoming replies if there are
+                * replies sitting on reqrxq.
+                */
+               if (myrep == NULL && TAILQ_FIRST(&nmp->nm_reqrxq)) {
+                       nfs_rcvunlock(nmp);
+                       return(EWOULDBLOCK);
+               }
+
                /*
                 * Get the next Rpc reply off the socket
                 */
-               error = nfs_receive(myrep, &nam, &info.mrep);
-               nfs_rcvunlock(myrep);
+               error = nfs_receive(nmp, myrep, &nam, &info.mrep);
+               nfs_rcvunlock(nmp);
                if (error) {
                        /*
                         * Ignore routing errors on connectionless protocols??
                         */
                        if (NFSIGNORE_SOERROR(nmp->nm_soflags, error)) {
+                               if (nmp->nm_so == NULL)
+                                       return (error);
                                nmp->nm_so->so_error = 0;
-                               if (myrep->r_flags & R_GETONEREP)
-                                       return (0);
                                continue;
                        }
                        return (error);
@@ -826,8 +835,6 @@ nfs_reply(struct nfsreq *myrep)
                        m_freem(info.mrep);
                        info.mrep = NULL;
 nfsmout:
-                       if (myrep->r_flags & R_GETONEREP)
-                               return (0);
                        continue;
                }
 
@@ -910,7 +917,28 @@ nfsmout:
                        }
                        nmp->nm_timeouts = 0;
                        rep->r_mrep = info.mrep;
+
+                       /*
+                        * Wakeup anyone waiting explicitly for this reply.
+                        */
                        mtx_abort_ex_link(&rep->r_nmp->nm_rxlock, &rep->r_link);
+
+                       /*
+                        * Asynchronous replies are bound-over to the
+                        * rxthread.  Note that nmp->nm_reqqlen is not
+                        * decremented until the rxthread has finished
+                        * with the request.
+                        *
+                        * async is sometimes temporarily turned off to
+                        * avoid races.
+                        */
+                       if (rep->r_info && rep->r_info->async) {
+                               KKASSERT(rep->r_info->state ==
+                                        NFSM_STATE_WAITREPLY ||
+                                        rep->r_info->state ==
+                                        NFSM_STATE_TRY);
+                               nfs_async_return(nmp, rep);
+                       }
                }
                /*
                 * If not matched to a request, drop it.
@@ -925,8 +953,6 @@ nfsmout:
                                panic("nfsreply nil");
                        return (0);
                }
-               if (myrep->r_flags & R_GETONEREP)
-                       return (0);
        }
 }
 
@@ -940,11 +966,12 @@ nfsmout:
  * indicating that the rpc is still in progress.
  */
 int
-nfs_request(struct nfsm_info *info, nfsm_state_t target)
+nfs_request(struct nfsm_info *info, nfsm_state_t bstate, nfsm_state_t estate)
 {
+       struct nfsmount *nmp = info->nmp;
        struct nfsreq *req;
 
-       while (info->state == NFSM_STATE_DONE || info->state != target) {
+       while (info->state >= bstate && info->state < estate) {
                switch(info->state) {
                case NFSM_STATE_SETUP:
                        /*
@@ -981,9 +1008,36 @@ nfs_request(struct nfsm_info *info, nfsm_state_t target)
                         * Transmit or retransmit attempt.  An error in this
                         * state is ignored and we always move on to the
                         * next state.
+                        *
+                        * This can trivially race the receiver if the
+                        * request is asynchronous.  Temporarily turn
+                        * off async mode so the structure doesn't get
+                        * ripped out from under us, and resolve the
+                        * race.
+                        */
+                       if (info->async) {
+                               info->async = 0;
+                               info->error = nfs_request_try(info->req);
+                               crit_enter();
+                               info->async = 1;
+                               KKASSERT(info->state == NFSM_STATE_TRY);
+                               if (info->req->r_mrep)
+                                       nfs_async_return(nmp, info->req);
+                               else
+                                       info->state = NFSM_STATE_WAITREPLY;
+                               crit_exit();
+                       } else {
+                               info->error = nfs_request_try(info->req);
+                               info->state = NFSM_STATE_WAITREPLY;
+                       }
+
+                       /*
+                        * The backend can rip the request out from under
+                        * us at this point.  If we were async the estate
+                        * will be set to WAITREPLY.  Return immediately.
                         */
-                       info->error = nfs_request_try(info->req);
-                       info->state = NFSM_STATE_WAITREPLY;
+                       if (estate == NFSM_STATE_WAITREPLY)
+                               return (EINPROGRESS);
                        break;
                case NFSM_STATE_WAITREPLY:
                        /*
@@ -1023,9 +1077,7 @@ nfs_request(struct nfsm_info *info, nfsm_state_t target)
                        break;
                case NFSM_STATE_DONE:
                        /*
-                        * If the caller happens to re-call the state
-                        * machine after it returned completion, just
-                        * re-return the completion.
+                        * Shouldn't be reached
                         */
                        return (info->error);
                        /* NOT REACHED */
@@ -1033,9 +1085,11 @@ nfs_request(struct nfsm_info *info, nfsm_state_t target)
        }
 
        /*
-        * The target state (other then NFSM_STATE_DONE) was reached.
-        * Return EINPROGRESS.
+        * If we are done return the error code (if any).
+        * Otherwise return EINPROGRESS.
         */
+       if (info->state == NFSM_STATE_DONE)
+               return (info->error);
        return (EINPROGRESS);
 }
 
@@ -1081,6 +1135,13 @@ nfs_request_setup(nfsm_info_t info)
        req->r_mrest = info->mreq;
        req->r_mrest_len = i;
        req->r_cred = info->cred;
+
+       /*
+        * The presence of a non-NULL r_info in req indicates
+        * async completion via our helper threads.  See the receiver
+        * code.
+        */
+       req->r_info = info->async ? info : NULL;
        info->req = req;
        return(0);
 }
@@ -1185,8 +1246,10 @@ nfs_request_try(struct nfsreq *rep)
         * that we may block in this code so there is no atomicy guarentee.
         */
        crit_enter();
-       TAILQ_INSERT_TAIL(&nmp->nm_reqq, rep, r_chain);
        mtx_link_init(&rep->r_link);
+       TAILQ_INSERT_TAIL(&nmp->nm_reqq, rep, r_chain);/* XXX */
+       ++nmp->nm_reqqlen;
+       nfssvc_iod_reader_wakeup(nmp);
 
        error = 0;
 
@@ -1205,12 +1268,12 @@ nfs_request_try(struct nfsreq *rep)
            (nmp->nm_flag & NFSMNT_DUMBTIMR) ||
            nmp->nm_sent < nmp->nm_cwnd)) {
                if (nmp->nm_soflags & PR_CONNREQUIRED)
-                       error = nfs_sndlock(rep);
+                       error = nfs_sndlock(nmp, rep);
                if (!error) {
                        m2 = m_copym(rep->r_mreq, 0, M_COPYALL, MB_WAIT);
                        error = nfs_send(nmp->nm_so, nmp->nm_nam, m2, rep);
                        if (nmp->nm_soflags & PR_CONNREQUIRED)
-                               nfs_sndunlock(rep);
+                               nfs_sndunlock(nmp);
                }
                if (!error && (rep->r_flags & R_MUSTRESEND) == 0 &&
                    rep->r_mrep == NULL) {
@@ -1240,8 +1303,7 @@ nfs_request_waitreply(struct nfsreq *rep)
        struct nfsmount *nmp = rep->r_nmp;
        int error;
 
-
-       error = nfs_reply(rep);
+       error = nfs_reply(nmp, rep);
        crit_enter();
 
        /*
@@ -1253,6 +1315,7 @@ nfs_request_waitreply(struct nfsreq *rep)
                tsleep(&nfs_timer_raced, 0, "nfstrac", 0);
        }
        TAILQ_REMOVE(&nmp->nm_reqq, rep, r_chain);
+       --nmp->nm_reqqlen;
 
        /*
         * Decrement the outstanding request count.
@@ -1719,6 +1782,7 @@ nfs_nmcancelreqs(struct nfsmount *nmp)
                }
                nfs_softterm(req);
        }
+       /* XXX  the other two queues as well */
        crit_exit();
 
        for (i = 0; i < 30; i++) {
@@ -1735,6 +1799,22 @@ nfs_nmcancelreqs(struct nfsmount *nmp)
        return (EBUSY);
 }
 
+static void
+nfs_async_return(struct nfsmount *nmp, struct nfsreq *rep)
+{
+       KKASSERT(rep->r_info->state == NFSM_STATE_TRY ||
+                rep->r_info->state == NFSM_STATE_WAITREPLY);
+       rep->r_info->state = NFSM_STATE_PROCESSREPLY;
+       TAILQ_REMOVE(&nmp->nm_reqq, rep, r_chain);
+       if (rep->r_flags & R_SENT) {
+               rep->r_flags &= ~R_SENT;
+               nmp->nm_sent -= NFS_CWNDSCALE;
+       }
+       --nmp->nm_reqqlen;
+       TAILQ_INSERT_TAIL(&nmp->nm_reqrxq, rep, r_chain);
+       nfssvc_iod_reader_wakeup(nmp);
+}
+
 /*
  * Flag a request as being about to terminate (due to NFSMNT_INT/NFSMNT_SOFT).
  * The nm_send count is decremented now to avoid deadlocks when the process in
@@ -1743,16 +1823,26 @@ nfs_nmcancelreqs(struct nfsmount *nmp)
  * This routine must be called at splsoftclock() to protect r_flags and
  * nm_sent.
  */
-
 static void
 nfs_softterm(struct nfsreq *rep)
 {
+       struct nfsmount *nmp = rep->r_nmp;
+
        rep->r_flags |= R_SOFTTERM;
 
        if (rep->r_flags & R_SENT) {
                rep->r_nmp->nm_sent -= NFS_CWNDSCALE;
                rep->r_flags &= ~R_SENT;
        }
+
+       /*
+        * Asynchronous replies are bound-over to the
+        * rxthread.  Note that nmp->nm_reqqlen is not
+        * decremented until the rxthread has finished
+        * with the request.
+        */
+       if (rep->r_info && rep->r_info->async)
+               nfs_async_return(nmp, rep);
 }
 
 /*
@@ -1794,9 +1884,9 @@ nfs_sigintr(struct nfsmount *nmp, struct nfsreq *rep, struct thread *td)
  * in progress when a reconnect is necessary.
  */
 int
-nfs_sndlock(struct nfsreq *rep)
+nfs_sndlock(struct nfsmount *nmp, struct nfsreq *rep)
 {
-       mtx_t mtx = &rep->r_nmp->nm_txlock;
+       mtx_t mtx = &nmp->nm_txlock;
        struct thread *td;
        int slptimeo;
        int slpflag;
@@ -1804,12 +1894,12 @@ nfs_sndlock(struct nfsreq *rep)
 
        slpflag = 0;
        slptimeo = 0;
-       td = rep->r_td;
-       if (rep->r_nmp->nm_flag & NFSMNT_INT)
+       td = rep ? rep->r_td : NULL;
+       if (nmp->nm_flag & NFSMNT_INT)
                slpflag = PCATCH;
 
        while ((error = mtx_lock_ex_try(mtx)) != 0) {
-               if (nfs_sigintr(rep->r_nmp, rep, td)) {
+               if (nfs_sigintr(nmp, rep, td)) {
                        error = EINTR;
                        break;
                }
@@ -1822,7 +1912,7 @@ nfs_sndlock(struct nfsreq *rep)
                }
        }
        /* Always fail if our request has been cancelled. */
-       if (rep->r_flags & R_SOFTTERM) {
+       if (rep && (rep->r_flags & R_SOFTTERM)) {
                if (error == 0)
                        mtx_unlock(mtx);
                error = EINTR;
@@ -1834,17 +1924,20 @@ nfs_sndlock(struct nfsreq *rep)
  * Unlock the stream socket for others.
  */
 void
-nfs_sndunlock(struct nfsreq *rep)
+nfs_sndunlock(struct nfsmount *nmp)
 {
-       mtx_t mtx = &rep->r_nmp->nm_txlock;
-
-       mtx_unlock(mtx);
+       mtx_unlock(&nmp->nm_txlock);
 }
 
+/*
+ * Lock the receiver side of the socket.
+ *
+ * rep may be NULL.
+ */
 static int
-nfs_rcvlock(struct nfsreq *rep)
+nfs_rcvlock(struct nfsmount *nmp, struct nfsreq *rep)
 {
-       mtx_t mtx = &rep->r_nmp->nm_rxlock;
+       mtx_t mtx = &nmp->nm_rxlock;
        int slpflag;
        int slptimeo;
        int error;
@@ -1858,21 +1951,21 @@ nfs_rcvlock(struct nfsreq *rep)
         * We do not strictly need the second check just before the
         * tsleep(), but it's good defensive programming.
         */
-       if (rep->r_mrep != NULL)
+       if (rep && rep->r_mrep != NULL)
                return (EALREADY);
 
-       if (rep->r_nmp->nm_flag & NFSMNT_INT)
+       if (nmp->nm_flag & NFSMNT_INT)
                slpflag = PCATCH;
        else
                slpflag = 0;
        slptimeo = 0;
 
        while ((error = mtx_lock_ex_try(mtx)) != 0) {
-               if (nfs_sigintr(rep->r_nmp, rep, rep->r_td)) {
+               if (nfs_sigintr(nmp, rep, (rep ? rep->r_td : NULL))) {
                        error = EINTR;
                        break;
                }
-               if (rep->r_mrep != NULL) {
+               if (rep && rep->r_mrep != NULL) {
                        error = EALREADY;
                        break;
                }
@@ -1881,8 +1974,13 @@ nfs_rcvlock(struct nfsreq *rep)
                 * NOTE: can return ENOLCK, but in that case rep->r_mrep
                 *       will already be set.
                 */
-               error = mtx_lock_ex_link(mtx, &rep->r_link, "nfsrcvlk",
-                                        slpflag, slptimeo);
+               if (rep) {
+                       error = mtx_lock_ex_link(mtx, &rep->r_link,
+                                                "nfsrcvlk",
+                                                slpflag, slptimeo);
+               } else {
+                       error = mtx_lock_ex(mtx, "nfsrcvlk", slpflag, slptimeo);
+               }
                if (error == 0)
                        break;
 
@@ -1892,7 +1990,7 @@ nfs_rcvlock(struct nfsreq *rep)
                 * situation where a single iod could 'capture' the
                 * recieve lock.
                 */
-               if (rep->r_mrep != NULL) {
+               if (rep && rep->r_mrep != NULL) {
                        error = EALREADY;
                        break;
                }
@@ -1902,7 +2000,7 @@ nfs_rcvlock(struct nfsreq *rep)
                }
        }
        if (error == 0) {
-               if (rep->r_mrep != NULL) {
+               if (rep && rep->r_mrep != NULL) {
                        error = EALREADY;
                        mtx_unlock(mtx);
                }
@@ -1914,11 +2012,9 @@ nfs_rcvlock(struct nfsreq *rep)
  * Unlock the stream socket for others.
  */
 static void
-nfs_rcvunlock(struct nfsreq *rep)
+nfs_rcvunlock(struct nfsmount *nmp)
 {
-       mtx_t mtx = &rep->r_nmp->nm_rxlock;
-
-       mtx_unlock(mtx);
+       mtx_unlock(&nmp->nm_rxlock);
 }
 
 /*
index 0a8adbb..b33c5ee 100644 (file)
@@ -904,6 +904,8 @@ mountnfs(struct nfs_args *argp, struct mount *mp, struct sockaddr *nam,
                TAILQ_INIT(&nmp->nm_uidlruhead);
                TAILQ_INIT(&nmp->nm_bioq);
                TAILQ_INIT(&nmp->nm_reqq);
+               TAILQ_INIT(&nmp->nm_reqtxq);
+               TAILQ_INIT(&nmp->nm_reqrxq);
                mp->mnt_data = (qaddr_t)nmp;
        }
        vfs_getnewfsid(mp);
index 6a113f1..9872412 100644 (file)
@@ -399,7 +399,7 @@ nfs_access(struct vop_access_args *ap)
                        auio.uio_td = td;
 
                        if (vp->v_type == VREG) {
-                               error = nfs_readrpc(vp, &auio);
+                               error = nfs_readrpc_uio(vp, &auio);
                        } else if (vp->v_type == VDIR) {
                                char* bp;
                                bp = kmalloc(NFS_DIRBLKSIZ, M_TEMP, M_WAITOK);
@@ -1232,11 +1232,12 @@ nfsmout:
 }
 
 /*
- * nfs read rpc call
- * Ditto above
+ * nfs read rpc.
+ *
+ * If bio is non-NULL the rpc completes asynchronously via the bio.
  */
 int
-nfs_readrpc(struct vnode *vp, struct uio *uiop)
+nfs_readrpc_uio(struct vnode *vp, struct uio *uiop)
 {
        u_int32_t *tl;
        struct nfsmount *nmp;
@@ -2910,10 +2911,6 @@ nfs_bmap(struct vop_bmap_args *ap)
 
 /*
  * Strategy routine.
- *
- * For async requests when nfsiod(s) are running, queue the request by
- * calling nfs_asyncio(), otherwise just all nfs_doio() to do the
- * request.
  */
 static int
 nfs_strategy(struct vop_strategy_args *ap)
@@ -2922,7 +2919,6 @@ nfs_strategy(struct vop_strategy_args *ap)
        struct bio *nbio;
        struct buf *bp = bio->bio_buf;
        struct thread *td;
-       int error = 0;
 
        KASSERT(bp->b_cmd != BUF_CMD_DONE,
                ("nfs_strategy: buffer %p unexpectedly marked done", bp));
@@ -2947,9 +2943,12 @@ nfs_strategy(struct vop_strategy_args *ap)
         * queue the request, wake it up and wait for completion
         * otherwise just do it ourselves.
         */
-       if ((bio->bio_flags & BIO_SYNC) || nfs_asyncio(ap->a_vp, nbio, td))
-               error = nfs_doio(ap->a_vp, nbio, td);
-       return (error);
+       if (bio->bio_flags & BIO_SYNC) {
+               nfs_startio(ap->a_vp, nbio, td);
+       } else {
+               nfs_asyncio(ap->a_vp, nbio);
+       }
+       return(0);
 }
 
 /*
index 372568c..7b98604 100644 (file)
@@ -717,6 +717,23 @@ nfsm_mtouio(nfsm_info_t info, struct uio *uiop, int len)
  * Caller is expected to abort if a non-zero error is returned.
  */
 int
+nfsm_mtobio(nfsm_info_t info, struct bio *bio, int len)
+{
+       int error;
+
+       if (len > 0 &&
+          (error = nfsm_mbuftobio(&info->md, bio, len, &info->dpos)) != 0) {
+               m_freem(info->mrep);
+               info->mrep = NULL;
+               return(error);
+       }
+       return (0);
+}
+
+/*
+ * Caller is expected to abort if a non-zero error is returned.
+ */
+int
 nfsm_uiotom(nfsm_info_t info, struct uio *uiop, int len)
 {
        int error;
@@ -735,6 +752,9 @@ nfsm_uiotom(nfsm_info_t info, struct uio *uiop, int len)
  *
  * We load up the remaining info fields and run the request state
  * machine until it is done.
+ *
+ * This call runs the entire state machine and does not return until
+ * the command is complete.
  */
 int
 nfsm_request(nfsm_info_t info, struct vnode *vp, int procnum,
@@ -745,8 +765,11 @@ nfsm_request(nfsm_info_t info, struct vnode *vp, int procnum,
        info->vp = vp;
        info->td = td;
        info->cred = cred;
+       info->async = 0;
+       info->bio = NULL;
+       info->nmp = VFSTONFS(vp->v_mount);
 
-       *errorp = nfs_request(info, NFSM_STATE_DONE);
+       *errorp = nfs_request(info, NFSM_STATE_SETUP, NFSM_STATE_DONE);
        if (*errorp) {
                if ((*errorp & NFSERR_RETERR) == 0)
                        return(-1);
@@ -756,6 +779,40 @@ nfsm_request(nfsm_info_t info, struct vnode *vp, int procnum,
 }
 
 /*
+ * This call starts the state machine through the initial transmission.
+ * Completion is via the bio.  The info structure must have installed
+ * a 'done' callback.
+ *
+ * If we are unable to do the initial tx we generate the bio completion
+ * ourselves.
+ */
+void
+nfsm_request_bio(nfsm_info_t info, struct vnode *vp, int procnum,
+            thread_t td, struct ucred *cred)
+{
+       struct buf *bp;
+       int error;
+
+       info->state = NFSM_STATE_SETUP;
+       info->procnum = procnum;
+       info->vp = vp;
+       info->td = td;
+       info->cred = cred;
+       info->async = 1;
+       info->nmp = VFSTONFS(vp->v_mount);
+
+       error = nfs_request(info, NFSM_STATE_SETUP, NFSM_STATE_WAITREPLY);
+       if (error != EINPROGRESS) {
+               kprintf("nfsm_request_bio: early abort %d\n", error);
+               bp = info->bio->bio_buf;
+               if (error)
+                       bp->b_flags |= B_ERROR;
+               bp->b_error = error;
+               biodone(info->bio);
+       }
+}
+
+/*
  * Caller is expected to abort if a non-zero error is returned.
  */
 int
@@ -1025,6 +1082,66 @@ nfsm_mbuftouio(struct mbuf **mrep, struct uio *uiop, int siz, caddr_t *dpos)
 }
 
 /*
+ * copies mbuf chain to the bio buffer
+ */
+int
+nfsm_mbuftobio(struct mbuf **mrep, struct bio *bio, int size, caddr_t *dpos)
+{
+       struct buf *bp = bio->bio_buf;
+       char *mbufcp;
+       char *bio_cp;
+       int xfer, len;
+       struct mbuf *mp;
+       long rem;
+       int error = 0;
+       int bio_left;
+
+       mp = *mrep;
+       mbufcp = *dpos;
+       len = mtod(mp, caddr_t) + mp->m_len - mbufcp;
+       rem = nfsm_rndup(size) - size;
+
+       bio_left = bp->b_bcount;
+       bio_cp = bp->b_data;
+
+       while (size > 0) {
+               while (len == 0) {
+                       mp = mp->m_next;
+                       if (mp == NULL)
+                               return (EBADRPC);
+                       mbufcp = mtod(mp, caddr_t);
+                       len = mp->m_len;
+               }
+               if ((xfer = len) > size)
+                       xfer = size;
+               if (bio_left) {
+                       if (xfer > bio_left)
+                               xfer = bio_left;
+                       bcopy(mbufcp, bio_cp, xfer);
+               } else {
+                       /*
+                        * Not enough buffer space in the bio.
+                        */
+                       return(EFBIG);
+               }
+               size -= xfer;
+               bio_left -= xfer;
+               bio_cp += xfer;
+               len -= xfer;
+               mbufcp += xfer;
+       }
+       *dpos = mbufcp;
+       *mrep = mp;
+       if (rem > 0) {
+               if (len < rem)
+                       error = nfs_adv(mrep, dpos, rem, len);
+               else
+                       *dpos += rem;
+       }
+       return (error);
+}
+
+/*
  * copies a uio scatter/gather list to an mbuf chain.
  * NOTE: can ony handle iovcnt == 1
  */
index 4515aee..2e7d234 100644 (file)
@@ -80,7 +80,15 @@ struct nfsm_info {
        struct thread   *td;
        struct ucred    *cred;
        struct nfsreq   *req;
+       struct nfsmount *nmp;
+       int             async;  /* indicates async completion */
        int             error;
+
+       /*
+        * Retained state for higher level VOP and BIO operations
+        */
+       struct bio      *bio;
+       void            (*done)(struct nfsm_info *);
 };
 
 typedef struct nfsm_info *nfsm_info_t;
@@ -145,9 +153,14 @@ int        nfsm_strsiz(nfsm_info_t info, int maxlen);
 int    nfsm_srvstrsiz(nfsm_info_t info, int maxlen, int *errorp);
 int    nfsm_srvnamesiz(nfsm_info_t info, int *errorp);
 int    nfsm_mtouio(nfsm_info_t info, struct uio *uiop, int len);
+int    nfsm_mtobio(nfsm_info_t info, struct bio *bio, int len);
+
 int    nfsm_uiotom(nfsm_info_t info, struct uio *uiop, int len);
+int    nfsm_biotom(nfsm_info_t info, struct bio *bio, int len);
 int    nfsm_request(nfsm_info_t info, struct vnode *vp, int procnum,
                                thread_t td, struct ucred *cred, int *errorp);
+void   nfsm_request_bio(nfsm_info_t info, struct vnode *vp, int procnum,
+                               thread_t td, struct ucred *cred);
 int    nfsm_strtom(nfsm_info_t info, const void *data, int len, int maxlen);
 int    nfsm_reply(nfsm_info_t info, struct nfsrv_descript *nfsd,
                                struct nfssvc_sock *slp, int siz, int *errorp);
@@ -161,8 +174,12 @@ void       *_nfsm_clget(nfsm_info_t info, struct mbuf *mp1, struct mbuf *mp2,
 int    nfsm_srvsattr(nfsm_info_t info, struct vattr *vap);
 int    nfsm_mbuftouio(struct mbuf **mrep, struct uio *uiop,
                                int siz, caddr_t *dpos);
+int    nfsm_mbuftobio(struct mbuf **mrep, struct bio *bio,
+                               int siz, caddr_t *dpos);
 int    nfsm_uiotombuf (struct uio *uiop, struct mbuf **mq,
                                int siz, caddr_t *bpos);
+int    nfsm_biotombuf (struct bio *bio, struct mbuf **mq,
+                               int siz, caddr_t *bpos);
 int    nfsm_disct(struct mbuf **mdp, caddr_t *dposp, int siz,
                                int left, caddr_t *cp2);
 int    nfsm_strtmbuf (struct mbuf **, char **, const char *, long);
@@ -175,7 +192,7 @@ void        nfsm_srvpostop_attr(nfsm_info_t info, struct nfsrv_descript *nfsd,
 void   nfsm_srvfattr(struct nfsrv_descript *nfsd, struct vattr *vap,
                                struct nfs_fattr *fp);
 
-int     nfs_request (struct nfsm_info *, nfsm_state_t);
+int     nfs_request (struct nfsm_info *, nfsm_state_t, nfsm_state_t);
 
 #define nfsm_clget(info, mp1, mp2, bp, be)     \
        ((bp >= be) ? _nfsm_clget(info, mp1, mp2, bp, be) : (void *)bp)
index 0fbfb3a..859e249 100644 (file)
@@ -105,7 +105,9 @@ struct      nfsmount {
        TAILQ_HEAD(, nfsuid) nm_uidlruhead; /* Lists of nfsuid mappings */
        LIST_HEAD(, nfsuid) nm_uidhashtbl[NFS_MUIDHASHSIZ];
        TAILQ_HEAD(, bio) nm_bioq;      /* async io buffer queue */
-       TAILQ_HEAD(, nfsreq) nm_reqq;   /* nfsreq queue */
+       TAILQ_HEAD(, nfsreq) nm_reqtxq; /* nfsreq queue - tx processing */
+       TAILQ_HEAD(, nfsreq) nm_reqrxq; /* nfsreq queue - rx processing */
+       TAILQ_HEAD(, nfsreq) nm_reqq;   /* nfsreq queue - pending */
        int     nm_bioqlen;             /* number of buffers in queue */
        int     nm_reqqlen;             /* number of nfsreqs in queue */
        u_int64_t nm_maxfilesize;       /* maximum file size */