kernel - Cluster fixes + Enable clustering for HAMMER1
authorMatthew Dillon <dillon@apollo.backplane.com>
Thu, 22 Mar 2012 23:45:55 +0000 (16:45 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Fri, 30 Mar 2012 01:31:42 +0000 (18:31 -0700)
* Add cluster_awrite(), which replaces vfs_bio_awrite() and has the same
  basic semantics as bawrite().

* Remove vfs_bio_awrite(), which had an odd API that required the buffer
  to be locked but not removed from its queues.

* Make cluster operations work on disk device buffers as well as on
  regular files.

* Add a blkflags argument to getcacheblk(), allowing GETBLK_NOWAIT to
  be passed to it.

* Enhance cluster_wbuild() to support cluster_awrite() by having it take
  an optional bp to incorporate into the cluster.  The caller disposes of
  the bp by calling bawrite() if the cluster_wbuild() code could not use it.

* Adjust cluster_write() and related code so that checks against
  the file EOF do not break when variable block sizes are used.

* Fix a bug in calls made to buf_checkwrite().  The caller is required to
  initiate the I/O if the function returns good (0).  HAMMER1 uses this
  to save side effects and blows up if the I/O is then not initiated.

* Enable clustering in HAMMER1 for both data and meta-data.

sys/kern/vfs_bio.c
sys/kern/vfs_cluster.c
sys/kern/vfs_subr.c
sys/sys/buf.h
sys/vfs/hammer/hammer_io.c
sys/vfs/hammer/hammer_vnops.c
sys/vfs/tmpfs/tmpfs_vnops.c

index b6a96eb..1cda8d0 100644 (file)
@@ -1862,96 +1862,6 @@ vfs_vmio_release(struct buf *bp)
 }
 
 /*
- * vfs_bio_awrite:
- *
- *     Implement clustered async writes for clearing out B_DELWRI buffers.
- *     This is much better then the old way of writing only one buffer at
- *     a time.  Note that we may not be presented with the buffers in the 
- *     correct order, so we search for the cluster in both directions.
- *
- *     The buffer is locked on call.
- */
-int
-vfs_bio_awrite(struct buf *bp)
-{
-       int i;
-       int j;
-       off_t loffset = bp->b_loffset;
-       struct vnode *vp = bp->b_vp;
-       int nbytes;
-       struct buf *bpa;
-       int nwritten;
-       int size;
-
-       /*
-        * right now we support clustered writing only to regular files.  If
-        * we find a clusterable block we could be in the middle of a cluster
-        * rather then at the beginning.
-        *
-        * NOTE: b_bio1 contains the logical loffset and is aliased
-        * to b_loffset.  b_bio2 contains the translated block number.
-        */
-       if ((vp->v_type == VREG) && 
-           (vp->v_mount != 0) && /* Only on nodes that have the size info */
-           (bp->b_flags & (B_CLUSTEROK | B_INVAL)) == B_CLUSTEROK) {
-
-               size = vp->v_mount->mnt_stat.f_iosize;
-
-               for (i = size; i < MAXPHYS; i += size) {
-                       if ((bpa = findblk(vp, loffset + i, FINDBLK_TEST)) &&
-                           BUF_REFCNT(bpa) == 0 &&
-                           ((bpa->b_flags & (B_DELWRI | B_CLUSTEROK | B_INVAL)) ==
-                           (B_DELWRI | B_CLUSTEROK)) &&
-                           (bpa->b_bufsize == size)) {
-                               if ((bpa->b_bio2.bio_offset == NOOFFSET) ||
-                                   (bpa->b_bio2.bio_offset !=
-                                    bp->b_bio2.bio_offset + i))
-                                       break;
-                       } else {
-                               break;
-                       }
-               }
-               for (j = size; i + j <= MAXPHYS && j <= loffset; j += size) {
-                       if ((bpa = findblk(vp, loffset - j, FINDBLK_TEST)) &&
-                           BUF_REFCNT(bpa) == 0 &&
-                           ((bpa->b_flags & (B_DELWRI | B_CLUSTEROK | B_INVAL)) ==
-                           (B_DELWRI | B_CLUSTEROK)) &&
-                           (bpa->b_bufsize == size)) {
-                               if ((bpa->b_bio2.bio_offset == NOOFFSET) ||
-                                   (bpa->b_bio2.bio_offset !=
-                                    bp->b_bio2.bio_offset - j))
-                                       break;
-                       } else {
-                               break;
-                       }
-               }
-               j -= size;
-               nbytes = (i + j);
-
-               /*
-                * this is a possible cluster write
-                */
-               if (nbytes != size) {
-                       BUF_UNLOCK(bp);
-                       nwritten = cluster_wbuild(vp, size,
-                                                 loffset - j, nbytes);
-                       return nwritten;
-               }
-       }
-
-       /*
-        * default (old) behavior, writing out only one block
-        *
-        * XXX returns b_bufsize instead of b_bcount for nwritten?
-        */
-       nwritten = bp->b_bufsize;
-       bremfree(bp);
-       bawrite(bp);
-
-       return nwritten;
-}
-
-/*
  * getnewbuf:
  *
  *     Find and initialize a new buffer header, freeing up existing buffers 
@@ -2716,16 +2626,16 @@ flushbufqueues(struct buf *marker, bufq_type_t q)
                 *
                 * NOTE: buf_checkwrite is MPSAFE.
                 */
+               bremfree(bp);
                if (LIST_FIRST(&bp->b_dep) != NULL && buf_checkwrite(bp)) {
-                       bremfree(bp);
                        brelse(bp);
                } else if (bp->b_flags & B_ERROR) {
                        tsleep(bp, 0, "bioer", 1);
                        bp->b_flags &= ~B_AGE;
-                       vfs_bio_awrite(bp);
+                       cluster_awrite(bp);
                } else {
                        bp->b_flags |= B_AGE;
-                       vfs_bio_awrite(bp);
+                       cluster_awrite(bp);
                }
                spin_lock(&bufqspin);
                ++r;
@@ -2892,12 +2802,13 @@ findblk(struct vnode *vp, off_t loffset, int flags)
  *     still be fully cached after reinstantiation to be returned.
  */
 struct buf *
-getcacheblk(struct vnode *vp, off_t loffset, int blksize)
+getcacheblk(struct vnode *vp, off_t loffset, int blksize, int blkflags)
 {
        struct buf *bp;
+       int fndflags = (blkflags & GETBLK_NOWAIT) ? FINDBLK_NBLOCK : 0;
 
        if (blksize) {
-               bp = getblk(vp, loffset, blksize, 0, 0);
+               bp = getblk(vp, loffset, blksize, blkflags, 0);
                if (bp) {
                        if ((bp->b_flags & (B_INVAL | B_CACHE | B_RAM)) ==
                            B_CACHE) {
@@ -2908,7 +2819,7 @@ getcacheblk(struct vnode *vp, off_t loffset, int blksize)
                        }
                }
        } else {
-               bp = findblk(vp, loffset, 0);
+               bp = findblk(vp, loffset, fndflags);
                if (bp) {
                        if ((bp->b_flags & (B_INVAL | B_CACHE | B_RAM)) ==
                            B_CACHE) {
index cfdfc10..d72eecc 100644 (file)
@@ -76,6 +76,8 @@ static struct buf *
                            struct buf *fbp);
 static void cluster_callback (struct bio *);
 static void cluster_setram (struct buf *);
+static int cluster_wbuild(struct vnode *vp, struct buf **bpp, int blksize,
+                           off_t start_loffset, int bytes);
 
 static int write_behind = 1;
 SYSCTL_INT(_vfs, OID_AUTO, write_behind, CTLFLAG_RW, &write_behind, 0,
@@ -665,7 +667,7 @@ cluster_wbuild_wb(struct vnode *vp, int blksize, off_t start_loffset, int len)
                start_loffset -= len;
                /* fall through */
        case 1:
-               r = cluster_wbuild(vp, blksize, start_loffset, len);
+               r = cluster_wbuild(vp, NULL, blksize, start_loffset, len);
                /* fall through */
        default:
                /* fall through */
@@ -727,7 +729,7 @@ cluster_write(struct buf *bp, off_t filesize, int blksize, int seqcount)
                         * flush.
                         */
                        cursize = vp->v_lastw - vp->v_cstart + blksize;
-                       if (bp->b_loffset + blksize != filesize ||
+                       if (bp->b_loffset + blksize < filesize ||
                            loffset != vp->v_lastw + blksize || vp->v_clen <= cursize) {
                                if (!async && seqcount > 0) {
                                        cluster_wbuild_wb(vp, blksize,
@@ -778,7 +780,7 @@ cluster_write(struct buf *bp, off_t filesize, int blksize, int seqcount)
                 * existing cluster.
                 */
                if ((vp->v_type == VREG) &&
-                   bp->b_loffset + blksize != filesize &&
+                   bp->b_loffset + blksize < filesize &&
                    (bp->b_bio2.bio_offset == NOOFFSET) &&
                    (VOP_BMAP(vp, loffset, &bp->b_bio2.bio_offset, &maxclen, NULL, BUF_CMD_WRITE) ||
                     bp->b_bio2.bio_offset == NOOFFSET)) {
@@ -827,38 +829,81 @@ cluster_write(struct buf *bp, off_t filesize, int blksize, int seqcount)
        vp->v_lasta = bp->b_bio2.bio_offset;
 }
 
+/*
+ * This is the clustered version of bawrite().  It works similarly to
+ * cluster_write() except I/O on the buffer is guaranteed to occur.
+ */
+int
+cluster_awrite(struct buf *bp)
+{
+       int total;
+
+       /*
+        * Don't bother if it isn't clusterable.
+        */
+       if ((bp->b_flags & B_CLUSTEROK) == 0 ||
+           bp->b_vp == NULL ||
+           (bp->b_vp->v_flag & VOBJBUF) == 0) {
+               total = bp->b_bufsize;
+               bawrite(bp);
+               return (total);
+       }
+
+       total = cluster_wbuild(bp->b_vp, &bp, bp->b_bufsize,
+                              bp->b_loffset, vmaxiosize(bp->b_vp));
+       if (bp)
+               bawrite(bp);
+
+       return total;
+}
 
 /*
  * This is an awful lot like cluster_rbuild...wish they could be combined.
  * The last lbn argument is the current block on which I/O is being
  * performed.  Check to see that it doesn't fall in the middle of
  * the current block (if last_bp == NULL).
+ *
+ * cluster_wbuild() normally does not guarantee anything.  If bpp is
+ * non-NULL and cluster_wbuild() is able to incorporate it into the
+ * I/O it will set *bpp to NULL, otherwise it will leave it alone and
+ * the caller must dispose of *bpp.
  */
-int
-cluster_wbuild(struct vnode *vp, int blksize, off_t start_loffset, int bytes)
+static int
+cluster_wbuild(struct vnode *vp, struct buf **bpp,
+              int blksize, off_t start_loffset, int bytes)
 {
        struct buf *bp, *tbp;
        int i, j;
        int totalwritten = 0;
+       int must_initiate;
        int maxiosize = vmaxiosize(vp);
 
        while (bytes > 0) {
                /*
-                * If the buffer is not delayed-write (i.e. dirty), or it 
-                * is delayed-write but either locked or inval, it cannot 
-                * partake in the clustered write.
+                * If the buffer matches the passed locked & removed buffer
+                * we use the passed buffer (which might not be B_DELWRI).
+                *
+                * Otherwise locate the buffer and determine if it is
+                * compatible.
                 */
-               tbp = findblk(vp, start_loffset, FINDBLK_NBLOCK);
-               if (tbp == NULL ||
-                   (tbp->b_flags & (B_LOCKED | B_INVAL | B_DELWRI)) != B_DELWRI ||
-                   (LIST_FIRST(&tbp->b_dep) && buf_checkwrite(tbp))) {
-                       if (tbp)
-                               BUF_UNLOCK(tbp);
-                       start_loffset += blksize;
-                       bytes -= blksize;
-                       continue;
+               if (bpp && (*bpp)->b_loffset == start_loffset) {
+                       tbp = *bpp;
+                       *bpp = NULL;
+                       bpp = NULL;
+               } else {
+                       tbp = findblk(vp, start_loffset, FINDBLK_NBLOCK);
+                       if (tbp == NULL ||
+                           (tbp->b_flags & (B_LOCKED | B_INVAL | B_DELWRI)) !=
+                            B_DELWRI ||
+                           (LIST_FIRST(&tbp->b_dep) && buf_checkwrite(tbp))) {
+                               if (tbp)
+                                       BUF_UNLOCK(tbp);
+                               start_loffset += blksize;
+                               bytes -= blksize;
+                               continue;
+                       }
+                       bremfree(tbp);
                }
-               bremfree(tbp);
                KKASSERT(tbp->b_cmd == BUF_CMD_DONE);
 
                /*
@@ -911,9 +956,18 @@ cluster_wbuild(struct vnode *vp, int blksize, off_t start_loffset, int bytes)
                 * From this location in the file, scan forward to see
                 * if there are buffers with adjacent data that need to
                 * be written as well.
+                *
+                * IO *must* be initiated on index 0 at this point
+                * (particularly when called from cluster_awrite()).
                 */
                for (i = 0; i < bytes; (i += blksize), (start_loffset += blksize)) {
-                       if (i != 0) { /* If not the first buffer */
+                       if (i == 0) {
+                               must_initiate = 1;
+                       } else {
+                               /*
+                                * Not first buffer.
+                                */
+                               must_initiate = 0;
                                tbp = findblk(vp, start_loffset,
                                              FINDBLK_NBLOCK);
                                /*
@@ -932,9 +986,7 @@ cluster_wbuild(struct vnode *vp, int blksize, off_t start_loffset, int bytes)
                                     B_INVAL | B_DELWRI | B_NEEDCOMMIT))
                                    != (B_DELWRI | B_CLUSTEROK |
                                     (bp->b_flags & (B_VMIO | B_NEEDCOMMIT))) ||
-                                   (tbp->b_flags & B_LOCKED) ||
-                                   (LIST_FIRST(&tbp->b_dep) &&
-                                    buf_checkwrite(tbp))
+                                   (tbp->b_flags & B_LOCKED)
                                ) {
                                        BUF_UNLOCK(tbp);
                                        break;
@@ -944,15 +996,24 @@ cluster_wbuild(struct vnode *vp, int blksize, off_t start_loffset, int bytes)
                                 * Check that the combined cluster
                                 * would make sense with regard to pages
                                 * and would not be too large
+                                *
+                                * WARNING! buf_checkwrite() must be the last
+                                *          check made.  If it returns 0 then
+                                *          we must initiate the I/O.
                                 */
                                if ((tbp->b_bcount != blksize) ||
                                  ((bp->b_bio2.bio_offset + i) !=
                                    tbp->b_bio2.bio_offset) ||
                                  ((tbp->b_xio.xio_npages + bp->b_xio.xio_npages) >
-                                   (maxiosize / PAGE_SIZE))) {
+                                   (maxiosize / PAGE_SIZE)) ||
+                                 (LIST_FIRST(&tbp->b_dep) &&
+                                  buf_checkwrite(tbp))
+                               ) {
                                        BUF_UNLOCK(tbp);
                                        break;
                                }
+                               if (LIST_FIRST(&tbp->b_dep))
+                                       must_initiate = 1;
                                /*
                                 * Ok, it's passed all the tests,
                                 * so remove it from the free list
@@ -960,7 +1021,7 @@ cluster_wbuild(struct vnode *vp, int blksize, off_t start_loffset, int bytes)
                                 */
                                bremfree(tbp);
                                KKASSERT(tbp->b_cmd == BUF_CMD_DONE);
-                       } /* end of code for non-first buffers only */
+                       }
 
                        /*
                         * If the IO is via the VM then we do some
@@ -973,8 +1034,15 @@ cluster_wbuild(struct vnode *vp, int blksize, off_t start_loffset, int bytes)
                        if (tbp->b_flags & B_VMIO) {
                                vm_page_t m;
 
-                               if (i != 0) { /* if not first buffer */
-                                       for (j = 0; j < tbp->b_xio.xio_npages; ++j) {
+                               /*
+                                * Try to avoid deadlocks with the VM system.
+                                * However, we cannot abort the I/O if
+                                * must_initiate is non-zero.
+                                */
+                               if (must_initiate == 0) {
+                                       for (j = 0;
+                                            j < tbp->b_xio.xio_npages;
+                                            ++j) {
                                                m = tbp->b_xio.xio_pages[j];
                                                if (m->flags & PG_BUSY) {
                                                        bqrelse(tbp);
@@ -1012,12 +1080,13 @@ cluster_wbuild(struct vnode *vp, int blksize, off_t start_loffset, int bytes)
                                buf_start(tbp);
                }
        finishcluster:
-               pmap_qenter(trunc_page((vm_offset_t) bp->b_data),
-                       (vm_page_t *) bp->b_xio.xio_pages, bp->b_xio.xio_npages);
+               pmap_qenter(trunc_page((vm_offset_t)bp->b_data),
+                           (vm_page_t *)bp->b_xio.xio_pages,
+                           bp->b_xio.xio_npages);
                if (bp->b_bufsize > bp->b_kvasize) {
-                       panic(
-                           "cluster_wbuild: b_bufsize(%d) > b_kvasize(%d)\n",
-                           bp->b_bufsize, bp->b_kvasize);
+                       panic("cluster_wbuild: b_bufsize(%d) "
+                             "> b_kvasize(%d)\n",
+                             bp->b_bufsize, bp->b_kvasize);
                }
                totalwritten += bp->b_bufsize;
                bp->b_dirtyoff = 0;
index ae1489c..ec42fe6 100644 (file)
@@ -401,31 +401,22 @@ vinvalbuf_bp(struct buf *bp, void *data)
        }
 
        /*
-        * Note that vfs_bio_awrite expects buffers to reside
-        * on a queue, while bwrite() and brelse() do not.
-        *
         * NOTE:  NO B_LOCKED CHECK.  Also no buf_checkwrite()
         * check.  This code will write out the buffer, period.
         */
+       bremfree(bp);
        if (((bp->b_flags & (B_DELWRI | B_INVAL)) == B_DELWRI) &&
            (info->flags & V_SAVE)) {
-               if (bp->b_flags & B_CLUSTEROK) {
-                       vfs_bio_awrite(bp);
-               } else {
-                       bremfree(bp);
-                       bawrite(bp);
-               }
+               cluster_awrite(bp);
        } else if (info->flags & V_SAVE) {
                /*
                 * Cannot set B_NOCACHE on a clean buffer as this will
                 * destroy the VM backing store which might actually
                 * be dirty (and unsynchronized).
                 */
-               bremfree(bp);
                bp->b_flags |= (B_INVAL | B_RELBUF);
                brelse(bp);
        } else {
-               bremfree(bp);
                bp->b_flags |= (B_INVAL | B_NOCACHE | B_RELBUF);
                brelse(bp);
        }
@@ -857,13 +848,8 @@ vfsync_bp(struct buf *bp, void *data)
                 * this to support limited MNT_LAZY flushes.
                 */
                vp->v_lazyw = bp->b_loffset;
-               if ((vp->v_flag & VOBJBUF) && (bp->b_flags & B_CLUSTEROK)) {
-                       info->lazycount += vfs_bio_awrite(bp);
-               } else {
-                       info->lazycount += bp->b_bufsize;
-                       bremfree(bp);
-                       bawrite(bp);
-               }
+               bremfree(bp);
+               info->lazycount += cluster_awrite(bp);
                waitrunningbufspace();
                vm_wait_nominal();
                if (info->lazylimit && info->lazycount >= info->lazylimit)
index 6024b57..b2f7e4f 100644 (file)
@@ -420,13 +420,13 @@ void      bundirty (struct buf *);
 int    bowrite (struct buf *);
 void   brelse (struct buf *);
 void   bqrelse (struct buf *);
-int    vfs_bio_awrite (struct buf *);
+int    cluster_awrite (struct buf *);
 struct buf *getpbuf (int *);
 struct buf *getpbuf_kva (int *);
 int    inmem (struct vnode *, off_t);
 struct buf *findblk (struct vnode *, off_t, int);
 struct buf *getblk (struct vnode *, off_t, int, int, int);
-struct buf *getcacheblk (struct vnode *, off_t, int);
+struct buf *getcacheblk (struct vnode *, off_t, int, int);
 struct buf *geteblk (int);
 struct buf *getnewbuf(int, int, int, int);
 void   bqhold(struct buf *bp);
@@ -443,7 +443,6 @@ void        biodone_sync (struct bio *);
 void   cluster_append(struct bio *, struct buf *);
 int    cluster_readx (struct vnode *, off_t, off_t, int,
            size_t, size_t, struct buf **);
-int    cluster_wbuild (struct vnode *, int, off_t, int);
 void   cluster_write (struct buf *, off_t, int, int);
 int    physread (struct dev_read_args *);
 int    physwrite (struct dev_write_args *);
index 2196292..b32b115 100644 (file)
@@ -589,6 +589,7 @@ hammer_io_release(struct hammer_io *io, int flush)
                case HAMMER_STRUCTURE_UNDO_BUFFER:
                        if (io->released == 0) {
                                io->released = 1;
+                               bp->b_flags |= B_CLUSTEROK;
                                bdwrite(bp);
                        }
                        break;
@@ -743,7 +744,7 @@ hammer_io_flush(struct hammer_io *io, int reclaim)
        lwkt_gettoken(&hmp->io_token);
        TAILQ_INSERT_TAIL(&hmp->iorun_list, io, iorun_entry);
        lwkt_reltoken(&hmp->io_token);
-       bawrite(bp);
+       cluster_awrite(bp);
        hammer_io_flush_mark(io->volume);
 }
 
index 2d27510..cd17cd3 100644 (file)
@@ -433,7 +433,7 @@ skip:
                }
                bp->b_flags &= ~B_IODEBUG;
 
-               /* bp->b_flags |= B_CLUSTEROK; temporarily disabled */
+               bp->b_flags |= B_CLUSTEROK;
                n = blksize - offset;
                if (n > uio->uio_resid)
                        n = uio->uio_resid;
@@ -500,6 +500,7 @@ hammer_vop_write(struct vop_write_args *ap)
        struct uio *uio;
        int offset;
        off_t base_offset;
+       int64_t cluster_eof;
        struct buf *bp;
        int kflags;
        int error;
@@ -792,7 +793,7 @@ hammer_vop_write(struct vop_write_args *ap)
                }
                kflags |= NOTE_WRITE;
                hammer_stats_file_write += n;
-               /* bp->b_flags |= B_CLUSTEROK; temporarily disabled */
+               bp->b_flags |= B_CLUSTEROK;
                if (ip->ino_data.size < uio->uio_offset) {
                        ip->ino_data.size = uio->uio_offset;
                        flags = HAMMER_INODE_SDIRTY;
@@ -840,25 +841,22 @@ hammer_vop_write(struct vop_write_args *ap)
                 *        configure a HAMMER file as swap, or when HAMMER
                 *        is serving NFS (for commits).  Ick ick.
                 */
-               bp->b_flags |= B_AGE;
+               bp->b_flags |= B_AGE | B_CLUSTEROK;
                if (ap->a_ioflag & IO_SYNC) {
                        bwrite(bp);
                } else if ((ap->a_ioflag & IO_DIRECT) && endofblk) {
                        bawrite(bp);
                } else if (ap->a_ioflag & IO_ASYNC) {
                        bawrite(bp);
+               } else if (hammer_cluster_enable &&
+                          !(ap->a_vp->v_mount->mnt_flag & MNT_NOCLUSTERW)) {
+                       if (base_offset < HAMMER_XDEMARC)
+                               cluster_eof = hammer_blockdemarc(base_offset,
+                                                        ip->ino_data.size);
+                       else
+                               cluster_eof = ip->ino_data.size;
+                       cluster_write(bp, cluster_eof, blksize, seqcount);
                } else {
-#if 0
-               if (offset + n == blksize) {
-                       if (hammer_cluster_enable == 0 ||
-                           (ap->a_vp->v_mount->mnt_flag & MNT_NOCLUSTERW)) {
-                               bawrite(bp);
-                       } else {
-                               cluster_write(bp, ip->ino_data.size,
-                                             blksize, seqcount);
-                       }
-               } else {
-#endif
                        bdwrite(bp);
                }
        }
index e8be50c..4cbf406 100644 (file)
@@ -453,9 +453,8 @@ tmpfs_read (struct vop_read_args *ap)
                 */
                offset = (size_t)uio->uio_offset & BMASK;
                base_offset = (off_t)uio->uio_offset - offset;
-               bp = getcacheblk(vp, base_offset, BSIZE);
-               if (bp == NULL)
-               {
+               bp = getcacheblk(vp, base_offset, BSIZE, 0);
+               if (bp == NULL) {
                        lwkt_gettoken(&vp->v_mount->mnt_token);
                        error = bread(vp, base_offset, BSIZE, &bp);
                        if (error) {