hammer2 - update documentation, begin working on callback I/O
[dragonfly.git] / sys / vfs / hammer2 / hammer2_io.c
index 0ce2ee3..2fe5be1 100644 (file)
@@ -41,7 +41,6 @@
  * using smaller allocations, without causing deadlocks.
  *
  */
-static void hammer2_io_callback(struct bio *bio);
 static int hammer2_io_cleanup_callback(hammer2_io_t *dio, void *arg);
 
 static int
@@ -63,21 +62,23 @@ struct hammer2_cleanupcb_info {
        int     count;
 };
 
-
 #define HAMMER2_DIO_INPROG     0x80000000
-#define HAMMER2_DIO_GOOD       0x40000000
-#define HAMMER2_DIO_WAITING    0x20000000
-#define HAMMER2_DIO_DIRTY      0x10000000
+#define HAMMER2_DIO_GOOD       0x40000000      /* buf/bio is good */
+#define HAMMER2_DIO_WAITING    0x20000000      /* iocb's queued */
+#define HAMMER2_DIO_DIRTY      0x10000000      /* flush on last drop */
 
 #define HAMMER2_DIO_MASK       0x0FFFFFFF
 
+#define HAMMER2_GETBLK_GOOD    0
+#define HAMMER2_GETBLK_QUEUED  1
+#define HAMMER2_GETBLK_OWNED   2
+
 /*
- * Acquire the requested dio, set *ownerp based on state.  If state is good
- * *ownerp is set to 0, otherwise *ownerp is set to DIO_INPROG and the
- * caller must resolve the buffer.
+ * Allocate/Locate the requested dio, reference it, issue or queue iocb.
  */
-hammer2_io_t *
-hammer2_io_getblk(hammer2_mount_t *hmp, off_t lbase, int lsize, int *ownerp)
+void
+hammer2_io_getblk(hammer2_mount_t *hmp, off_t lbase, int lsize,
+                 hammer2_iocb_t *iocb)
 {
        hammer2_io_t *dio;
        hammer2_io_t *xio;
@@ -111,6 +112,8 @@ hammer2_io_getblk(hammer2_mount_t *hmp, off_t lbase, int lsize, int *ownerp)
                dio->pbase = pbase;
                dio->psize = psize;
                dio->refs = 1;
+               spin_init(&dio->spin, "h2dio");
+               TAILQ_INIT(&dio->iocbq);
                spin_lock(&hmp->io_spin);
                xio = RB_INSERT(hammer2_io_tree, &hmp->iotree, dio);
                if (xio == NULL) {
@@ -130,87 +133,165 @@ hammer2_io_getblk(hammer2_mount_t *hmp, off_t lbase, int lsize, int *ownerp)
        /*
         * Obtain/Validate the buffer.
         */
+       iocb->dio = dio;
+
        for (;;) {
                refs = dio->refs;
                cpu_ccfence();
 
                /*
-                * Stop if the buffer is good.  Once set GOOD the flag cannot
-                * be cleared until refs drops to 0.
+                * Issue the iocb immediately if the buffer is already good.
+                * Once set, GOOD cannot be cleared until refs drops to 0.
                 */
                if (refs & HAMMER2_DIO_GOOD) {
-                       *ownerp = 0;
-                       goto done;
+                       iocb->callback(iocb);
+                       break;
                }
 
                /*
-                * We need to acquire the in-progress lock on the buffer
+                * Try to own the buffer.  If we cannot we queue the iocb.
                 */
                if (refs & HAMMER2_DIO_INPROG) {
-                       tsleep_interlock(dio, 0);
+                       spin_lock(&dio->spin);
                        if (atomic_cmpset_int(&dio->refs, refs,
                                              refs | HAMMER2_DIO_WAITING)) {
-                               tsleep(dio, PINTERLOCKED, "h2dio", 0);
+                               iocb->flags |= HAMMER2_IOCB_ONQ |
+                                              HAMMER2_IOCB_INPROG;
+                               TAILQ_INSERT_TAIL(&dio->iocbq, iocb, entry);
+                               spin_unlock(&dio->spin);
+                               break;
                        }
+                       spin_unlock(&dio->spin);
                        /* retry */
                } else {
                        if (atomic_cmpset_int(&dio->refs, refs,
                                              refs | HAMMER2_DIO_INPROG)) {
+                               iocb->flags |= HAMMER2_IOCB_INPROG;
+                               iocb->callback(iocb);
                                break;
                        }
+                       /* retry */
                }
                /* retry */
        }
+       if (dio->act < 5)
+               ++dio->act;
+}
+
+/*
+ * The iocb is done.
+ */
+void
+hammer2_io_complete(hammer2_iocb_t *iocb)
+{
+       hammer2_io_t *dio = iocb->dio;
+       uint32_t orefs;
+       uint32_t nrefs;
+       uint32_t oflags;
+       uint32_t nflags;
+
+       /*
+        * If IOCB_INPROG is not set then the completion was synchronous.
+        * We can set IOCB_DONE safely without having to worry about waiters.
+        * XXX
+        */
+       if ((iocb->flags & HAMMER2_IOCB_INPROG) == 0) {
+               iocb->flags |= HAMMER2_IOCB_DONE;
+               return;
+       }
 
        /*
-        * We need to do more work before the buffer is usable
+        * bp is held for all comers, make sure the lock is not owned by
+        * a particular thread.
         */
-       *ownerp = HAMMER2_DIO_INPROG;
-done:
-       if (dio->act < 5)
-               ++dio->act;
-       return(dio);
+       if (iocb->flags & HAMMER2_IOCB_DIDBP)
+               BUF_KERNPROC(dio->bp);
+
+       /*
+        * Set the GOOD bit on completion with no error if dio->bp is
+        * not NULL.  Only applicable if INPROG was set.
+        */
+       if (dio->bp && iocb->error == 0)
+               atomic_set_int(&dio->refs, HAMMER2_DIO_GOOD);
+
+       for (;;) {
+               oflags = iocb->flags;
+               cpu_ccfence();
+               nflags = oflags;
+               nflags &= ~(HAMMER2_IOCB_DIDBP |
+                           HAMMER2_IOCB_WAKEUP |
+                           HAMMER2_IOCB_INPROG);
+               nflags |= HAMMER2_IOCB_DONE;
+
+               if (atomic_cmpset_int(&iocb->flags, oflags, nflags)) {
+                       if (oflags & HAMMER2_IOCB_WAKEUP)
+                               wakeup(iocb);
+                       /* SMP: iocb is now stale */
+                       break;
+               }
+       }
+       iocb = NULL;
+
+       /*
+        * Now finish up the dio.  If another iocb is pending, chain to it,
+        * otherwise clear INPROG (and WAITING).
+        */
+       for (;;) {
+               orefs = dio->refs;
+               nrefs = orefs & ~(HAMMER2_DIO_WAITING | HAMMER2_DIO_INPROG);
+
+               if ((orefs & HAMMER2_DIO_WAITING) && TAILQ_FIRST(&dio->iocbq)) {
+                       spin_lock(&dio->spin);
+                       iocb = TAILQ_FIRST(&dio->iocbq);
+                       if (iocb) {
+                               TAILQ_REMOVE(&dio->iocbq, iocb, entry);
+                               spin_unlock(&dio->spin);
+                               iocb->callback(iocb);   /* chained */
+                               break;
+                       }
+                       spin_unlock(&dio->spin);
+                       /* retry */
+               } else if (atomic_cmpset_int(&dio->refs, orefs, nrefs)) {
+                       break;
+               } /* else retry */
+               /* retry */
+       }
+       /* SMP: dio is stale now */
 }
 
 /*
- * If part of an asynchronous I/O the asynchronous I/O is biodone()'d.
  *
- * If the caller owned INPROG then the dio will be set GOOD or not
- * depending on whether the caller disposed of dio->bp or not.
  */
-static
 void
-hammer2_io_complete(hammer2_io_t *dio, int owner)
+hammer2_iocb_wait(hammer2_iocb_t *iocb)
 {
-       int refs;
-       int good;
+       uint32_t oflags;
+       uint32_t nflags;
 
-       while (owner & HAMMER2_DIO_INPROG) {
-               refs = dio->refs;
+       for (;;) {
+               oflags = iocb->flags;
                cpu_ccfence();
-               good = dio->bp ? HAMMER2_DIO_GOOD : 0;
-               if (atomic_cmpset_int(&dio->refs, refs,
-                                     (refs & ~(HAMMER2_DIO_WAITING |
-                                               HAMMER2_DIO_INPROG)) |
-                                     good)) {
-                       if (refs & HAMMER2_DIO_WAITING)
-                               wakeup(dio);
-                       if (good)
-                               BUF_KERNPROC(dio->bp);
+               nflags = oflags | HAMMER2_IOCB_WAKEUP;
+               if (oflags & HAMMER2_IOCB_DONE)
                        break;
+               tsleep_interlock(iocb, 0);
+               if (atomic_cmpset_int(&iocb->flags, oflags, nflags)) {
+                       tsleep(iocb, PINTERLOCKED, "h2iocb", hz);
                }
-               /* retry */
        }
+
 }
 
 /*
- * Release our ref on *diop, dispose of the underlying buffer.
+ * Release our ref on *diop, dispose of the underlying buffer, and flush
+ * on last drop if it was dirty.
  */
 void
 hammer2_io_putblk(hammer2_io_t **diop)
 {
        hammer2_mount_t *hmp;
        hammer2_io_t *dio;
+       hammer2_iocb_t iocb;
        struct buf *bp;
        off_t peof;
        off_t pbase;
@@ -220,6 +301,9 @@ hammer2_io_putblk(hammer2_io_t **diop)
        dio = *diop;
        *diop = NULL;
 
+       /*
+        * Drop refs; on the 1->0 transition clear flags and set INPROG.
+        */
        for (;;) {
                refs = dio->refs;
 
@@ -242,22 +326,18 @@ hammer2_io_putblk(hammer2_io_t **diop)
        }
 
        /*
-        * Locked INPROG on 1->0 transition and we cleared DIO_GOOD (which is
-        * legal only on the last ref).  This allows us to dispose of the
-        * buffer.  refs is now 0.
+        * We have set DIO_INPROG to gain control of the buffer and we have
+        * cleared DIO_GOOD to prevent other accessors from thinking it is
+        * still good.
         *
-        * The instant we call io_complete dio is a free agent again and
-        * can be ripped out from under us.  Acquisition of the dio after
-        * this point will require a shared or exclusive spinlock.
+        * We can now dispose of the buffer, and should do it before calling
+        * io_complete() in case there's a race against a new reference
+        * which causes io_complete() to chain and instantiate the bp again.
         */
-       hmp = dio->hmp;
-       bp = dio->bp;
-       dio->bp = NULL;
        pbase = dio->pbase;
        psize = dio->psize;
-       atomic_add_int(&hmp->iofree_count, 1);
-       hammer2_io_complete(dio, HAMMER2_DIO_INPROG);   /* clears INPROG */
-       dio = NULL;     /* dio stale */
+       bp = dio->bp;
+       dio->bp = NULL;
 
        if (refs & HAMMER2_DIO_GOOD) {
                KKASSERT(bp != NULL);
@@ -275,8 +355,30 @@ hammer2_io_putblk(hammer2_io_t **diop)
                } else {
                        bqrelse(bp);
                }
+       } else if (bp) {
+               if (refs & HAMMER2_DIO_DIRTY) {
+                       bdwrite(bp);
+               } else {
+                       brelse(bp);
+               }
        }
 
+       /*
+        * The instant we call io_complete dio is a free agent again and
+        * can be ripped out from under us.
+        *
+        * We can clean up our final DIO_INPROG by simulating an iocb
+        * completion.
+        */
+       hmp = dio->hmp;                         /* extract fields */
+       atomic_add_int(&hmp->iofree_count, 1);
+       cpu_ccfence();
+
+       iocb.dio = dio;
+       iocb.flags = HAMMER2_IOCB_INPROG;
+       hammer2_io_complete(&iocb);
+       dio = NULL;                             /* dio stale */
+
        /*
         * We cache free buffers so re-use cases can use a shared lock, but
         * if too many build up we have to clean them out.
@@ -297,7 +399,7 @@ hammer2_io_putblk(hammer2_io_t **diop)
 }
 
 /*
- * Cleanup any dio's with no references which are not in-progress.
+ * Clean up any dio's with (INPROG | refs) == 0.
  */
 static
 int
@@ -336,6 +438,9 @@ hammer2_io_cleanup(hammer2_mount_t *hmp, struct hammer2_io_tree *tree)
        }
 }
 
+/*
+ * Returns a pointer to the requested data.
+ */
 char *
 hammer2_io_data(hammer2_io_t *dio, off_t lbase)
 {
@@ -349,157 +454,188 @@ hammer2_io_data(hammer2_io_t *dio, off_t lbase)
        return(bp->b_data + off);
 }
 
+/*
+ * Helpers for hammer2_io_new*() functions
+ */
 static
-int
-_hammer2_io_new(hammer2_mount_t *hmp, off_t lbase, int lsize,
-               hammer2_io_t **diop, int dozero, int quick)
+void
+hammer2_iocb_new_callback(hammer2_iocb_t *iocb)
 {
-       hammer2_io_t *dio;
-       int owner;
-       int error;
+       hammer2_io_t *dio = iocb->dio;
+       int gbctl = (iocb->flags & HAMMER2_IOCB_QUICK) ? GETBLK_NOWAIT : 0;
 
-       dio = *diop = hammer2_io_getblk(hmp, lbase, lsize, &owner);
-       if (owner) {
-               if (lsize == dio->psize) {
-                       dio->bp = getblk(hmp->devvp,
-                                            dio->pbase, dio->psize,
-                                            (quick ? GETBLK_NOWAIT : 0),
-                                            0);
-                       if (dio->bp) {
-                               vfs_bio_clrbuf(dio->bp);
-                               if (quick) {
-                                       dio->bp->b_flags |= B_CACHE;
-                                       bqrelse(dio->bp);
+       /*
+        * If INPROG is not set the dio already has a good buffer and we
+        * can't mess with it other than zero the requested range.
+        *
+        * If INPROG is set it gets a bit messy.
+        */
+       if (iocb->flags & HAMMER2_IOCB_INPROG) {
+               if ((iocb->flags & HAMMER2_IOCB_READ) == 0) {
+                       if (iocb->lsize == dio->psize) {
+                               /*
+                                * Fully covered buffer, try to optimize to
+                                * avoid any I/O.
+                                */
+                               if (dio->bp == NULL) {
+                                       dio->bp = getblk(dio->hmp->devvp,
+                                                        dio->pbase, dio->psize,
+                                                        gbctl, 0);
+                               }
+                               if (dio->bp) {
+                                       vfs_bio_clrbuf(dio->bp);
+                                       if (iocb->flags & HAMMER2_IOCB_QUICK) {
+                                               dio->bp->b_flags |= B_CACHE;
+                                               bqrelse(dio->bp);
+                                               dio->bp = NULL;
+                                       }
+                               }
+                       } else if (iocb->flags & HAMMER2_IOCB_QUICK) {
+                               /*
+                                * Partial buffer, quick mode.  Do nothing.
+                                */
+                       } else if (dio->bp == NULL ||
+                                  (dio->bp->b_flags & B_CACHE) == 0) {
+                               /*
+                                * Partial buffer, normal mode, requires
+                                * read-before-write.  Chain the read.
+                                */
+                               if (dio->bp) {
+                                       if (dio->refs & HAMMER2_DIO_DIRTY)
+                                               bdwrite(dio->bp);
+                                       else
+                                               bqrelse(dio->bp);
                                        dio->bp = NULL;
                                }
-                       }
-                       error = 0;
-               } else if (quick) {
-                       /* do nothing */
-                       error = 0;
-               } else {
-                       error = bread(hmp->devvp, dio->pbase,
-                                     dio->psize, &dio->bp);
-               }
-               if (error) {
-                       brelse(dio->bp);
-                       dio->bp = NULL;
+                               iocb->flags |= HAMMER2_IOCB_READ;
+                               breadcb(dio->hmp->devvp,
+                                       dio->pbase, dio->psize,
+                                       hammer2_io_callback, iocb);
+                               return;
+                       } /* else buffer is good */
                }
-               hammer2_io_complete(dio, owner);
-       } else {
-               error = 0;
        }
        if (dio->bp) {
-               if (dozero)
-                       bzero(hammer2_io_data(dio, lbase), lsize);
+               if (iocb->flags & HAMMER2_IOCB_ZERO)
+                       bzero(hammer2_io_data(dio, iocb->lbase), iocb->lsize);
                atomic_set_int(&dio->refs, HAMMER2_DIO_DIRTY);
        }
-       return error;
+       hammer2_io_complete(iocb);
+}
+
+static
+int
+_hammer2_io_new(hammer2_mount_t *hmp, off_t lbase, int lsize,
+               hammer2_io_t **diop, int flags)
+{
+       hammer2_iocb_t iocb;
+       hammer2_io_t *dio;
+
+       iocb.callback = hammer2_iocb_new_callback;
+       iocb.cluster = NULL;
+       iocb.chain = NULL;
+       iocb.ptr = NULL;
+       iocb.lbase = lbase;
+       iocb.lsize = lsize;
+       iocb.flags = flags;
+       iocb.error = 0;
+       hammer2_io_getblk(hmp, lbase, lsize, &iocb);
+       if ((iocb.flags & HAMMER2_IOCB_DONE) == 0)
+               hammer2_iocb_wait(&iocb);
+       dio = *diop = iocb.dio;
+
+       return (iocb.error);
 }
 
 int
 hammer2_io_new(hammer2_mount_t *hmp, off_t lbase, int lsize,
               hammer2_io_t **diop)
 {
-       return(_hammer2_io_new(hmp, lbase, lsize, diop, 1, 0));
+       return(_hammer2_io_new(hmp, lbase, lsize, diop, HAMMER2_IOCB_ZERO));
 }
 
 int
 hammer2_io_newnz(hammer2_mount_t *hmp, off_t lbase, int lsize,
               hammer2_io_t **diop)
 {
-       return(_hammer2_io_new(hmp, lbase, lsize, diop, 0, 0));
+       return(_hammer2_io_new(hmp, lbase, lsize, diop, 0));
 }
 
 int
 hammer2_io_newq(hammer2_mount_t *hmp, off_t lbase, int lsize,
               hammer2_io_t **diop)
 {
-       return(_hammer2_io_new(hmp, lbase, lsize, diop, 0, 1));
+       return(_hammer2_io_new(hmp, lbase, lsize, diop, HAMMER2_IOCB_QUICK));
 }
 
-int
-hammer2_io_bread(hammer2_mount_t *hmp, off_t lbase, int lsize,
-               hammer2_io_t **diop)
+static
+void
+hammer2_iocb_bread_callback(hammer2_iocb_t *iocb)
 {
-       hammer2_io_t *dio;
+       hammer2_io_t *dio = iocb->dio;
        off_t peof;
-       int owner;
        int error;
 
-       dio = *diop = hammer2_io_getblk(hmp, lbase, lsize, &owner);
-       if (owner) {
+       if (iocb->flags & HAMMER2_IOCB_INPROG) {
                if (hammer2_cluster_enable) {
                        peof = (dio->pbase + HAMMER2_SEGMASK64) &
                               ~HAMMER2_SEGMASK64;
-                       error = cluster_read(hmp->devvp, peof, dio->pbase,
+                       error = cluster_read(dio->hmp->devvp, peof, dio->pbase,
                                             dio->psize,
                                             dio->psize, HAMMER2_PBUFSIZE*4,
                                             &dio->bp);
                } else {
-                       error = bread(hmp->devvp, dio->pbase,
+                       error = bread(dio->hmp->devvp, dio->pbase,
                                      dio->psize, &dio->bp);
                }
                if (error) {
                        brelse(dio->bp);
                        dio->bp = NULL;
                }
-               hammer2_io_complete(dio, owner);
-       } else {
-               error = 0;
        }
-       return error;
+       hammer2_io_complete(iocb);
 }
 
-void
-hammer2_io_breadcb(hammer2_mount_t *hmp, off_t lbase, int lsize,
-                 void (*callback)(hammer2_io_t *dio,
-                                  hammer2_cluster_t *arg_l,
-                                  hammer2_chain_t *arg_c,
-                                  void *arg_p, off_t arg_o),
-                 hammer2_cluster_t *arg_l, hammer2_chain_t *arg_c,
-                 void *arg_p, off_t arg_o)
+int
+hammer2_io_bread(hammer2_mount_t *hmp, off_t lbase, int lsize,
+               hammer2_io_t **diop)
 {
+       hammer2_iocb_t iocb;
        hammer2_io_t *dio;
-       int owner;
-       int error;
 
-       dio = hammer2_io_getblk(hmp, lbase, lsize, &owner);
-       if (owner) {
-               dio->callback = callback;
-               dio->arg_l = arg_l;
-               dio->arg_c = arg_c;
-               dio->arg_p = arg_p;
-               dio->arg_o = arg_o;
-               breadcb(hmp->devvp, dio->pbase, dio->psize,
-                       hammer2_io_callback, dio);
-       } else {
-               error = 0;
-               callback(dio, arg_l, arg_c, arg_p, arg_o);
-               hammer2_io_bqrelse(&dio);
-       }
+       iocb.callback = hammer2_iocb_bread_callback;
+       iocb.cluster = NULL;
+       iocb.chain = NULL;
+       iocb.ptr = NULL;
+       iocb.lbase = lbase;
+       iocb.lsize = lsize;
+       iocb.flags = 0;
+       iocb.error = 0;
+       hammer2_io_getblk(hmp, lbase, lsize, &iocb);
+       if ((iocb.flags & HAMMER2_IOCB_DONE) == 0)
+               hammer2_iocb_wait(&iocb);
+       dio = *diop = iocb.dio;
+
+       return (iocb.error);
 }
 
-static void
+/*
+ * System buf/bio async callback extracts the iocb and chains
+ * to the iocb callback.
+ */
+void
 hammer2_io_callback(struct bio *bio)
 {
        struct buf *dbp = bio->bio_buf;
-       hammer2_io_t *dio = bio->bio_caller_info1.ptr;
+       hammer2_iocb_t *iocb = bio->bio_caller_info1.ptr;
+       hammer2_io_t *dio;
 
+       dio = iocb->dio;
        if ((bio->bio_flags & BIO_DONE) == 0)
                bpdone(dbp, 0);
        bio->bio_flags &= ~(BIO_DONE | BIO_SYNC);
        dio->bp = bio->bio_buf;
-       KKASSERT((dio->bp->b_flags & B_ERROR) == 0); /* XXX */
-       hammer2_io_complete(dio, HAMMER2_DIO_INPROG);
-
-       /*
-        * We still have the ref and DIO_GOOD is now set so nothing else
-        * should mess with the callback fields until we release the dio.
-        */
-       dio->callback(dio, dio->arg_l, dio->arg_c, dio->arg_p, dio->arg_o);
-       hammer2_io_bqrelse(&dio);
-       /* TODO: async load meta-data and assign chain->dio */
+       iocb->callback(iocb);
 }
 
 void