Implement the full-duplex ack protocol. Refurbish some of the memory
authorMatthew Dillon <dillon@dragonflybsd.org>
Wed, 6 Jul 2005 06:02:23 +0000 (06:02 +0000)
committerMatthew Dillon <dillon@dragonflybsd.org>
Wed, 6 Jul 2005 06:02:23 +0000 (06:02 +0000)
FIFO statistics.

sys/kern/vfs_jops.c
sys/kern/vfs_journal.c
sys/sys/journal.h
sys/sys/mountctl.h

index bea030a..506133c 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/kern/vfs_jops.c,v 1.16 2005/07/05 06:19:29 dillon Exp $
+ * $DragonFly: src/sys/kern/vfs_jops.c,v 1.17 2005/07/06 06:02:22 dillon Exp $
  */
 /*
  * Each mount point may have zero or more independantly configured journals
@@ -324,7 +324,7 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
        error = ENOMEM;
 
     /*
-     * Create the worker thread and generate the association record.
+     * Create the worker threads and generate the association record.
      */
     if (error) {
        free(jo, M_JOURNAL);
@@ -392,6 +392,7 @@ static int
 journal_destroy(struct mount *mp, struct journal *jo, int flags)
 {
     struct jrecord jrec;
+    int wcount;
 
     TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
 
@@ -401,10 +402,17 @@ journal_destroy(struct mount *mp, struct journal *jo, int flags)
 
     jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
     wakeup(&jo->fifo);
+    wcount = 0;
     while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
-       tsleep(jo, 0, "jwait", 0);
+       tsleep(jo, 0, "jwait", hz);
+       if (++wcount % 10 == 0) {
+           printf("Warning: journal %s waiting for descriptors to close\n",
+               jo->id);
+       }
     }
     lwkt_free_thread(&jo->wthread); /* XXX SMP */
+    if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX)
+       lwkt_free_thread(&jo->rthread); /* XXX SMP */
     if (jo->fp)
        fdrop(jo->fp, curthread);
     if (jo->fifo.membase)
@@ -453,9 +461,10 @@ journal_status_vfs_journal(struct mount *mp,
        bcopy(jo->id, rstat->id, sizeof(jo->id));
        rstat->index = index;
        rstat->membufsize = jo->fifo.size;
-       rstat->membufused = jo->fifo.xindex - jo->fifo.rindex;
-       rstat->membufiopend = jo->fifo.windex - jo->fifo.rindex;
+       rstat->membufused = jo->fifo.windex - jo->fifo.xindex;
+       rstat->membufunacked = jo->fifo.rindex - jo->fifo.xindex;
        rstat->bytessent = jo->total_acked;
+       rstat->fifostalls = jo->fifostalls;
        ++rstat;
        ++index;
        *res += sizeof(*rstat);
@@ -512,13 +521,14 @@ journal_wthread(void *info)
        /*
         * Skip any pad records.  We do not write out pad records if we can
         * help it. 
-        *
-        * If xindex is caught up to rindex it gets incremented along with
-        * rindex.  XXX SMP
         */
        if (rawp->streamid == JREC_STREAMID_PAD) {
-           if (jo->fifo.rindex == jo->fifo.xindex)
-               jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
+               if (jo->fifo.rindex == jo->fifo.xindex) {
+                   jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+                   jo->total_acked += (rawp->recsize + 15) & ~15;
+               }
+           }
            jo->fifo.rindex += (rawp->recsize + 15) & ~15;
            jo->total_acked += bytes;
            KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
@@ -549,14 +559,20 @@ journal_wthread(void *info)
         * For now assume blocking I/O.  Since we are record-aware the
         * code cannot yet handle partial writes.
         *
+        * We bump rindex prior to issuing the write to avoid racing
+        * the acknowledgement coming back (which could prevent the ack
+        * from bumping xindex).  Restarts are always based on xindex so
+        * we do not try to undo the rindex if an error occurs.
+        *
         * XXX EWOULDBLOCK/NBIO
         * XXX notification on failure
         * XXX permanent verses temporary failures
         * XXX two-way acknowledgement stream in the return direction / xindex
         */
        bytes = res;
+       jo->fifo.rindex += bytes;
        error = fp_write(jo->fp, 
-                       jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
+                       jo->fifo.membase + ((jo->fifo.rindex - bytes) & jo->fifo.mask),
                        bytes, &res);
        if (error) {
            printf("journal_thread(%s) write, error %d\n", jo->id, error);
@@ -570,10 +586,10 @@ journal_wthread(void *info)
         * advance xindex, otherwise the rjournal thread is responsible for
         * advancing xindex.
         */
-       jo->fifo.rindex += bytes;
-       if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0)
+       if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
            jo->fifo.xindex += bytes;
-       jo->total_acked += bytes;
+           jo->total_acked += bytes;
+       }
        KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
        if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
            if (jo->flags & MC_JOURNAL_WWAIT) {
@@ -601,7 +617,6 @@ journal_rthread(void *info)
     int error;
     int count;
     int bytes;
-    int index;
 
     transid = 0;
     error = 0;
@@ -618,13 +633,12 @@ journal_rthread(void *info)
         * stream.
         */
        if (transid == 0) {
-           for (index = 0; index < sizeof(ack); index += count) {
-               error = fp_read(jo->fp, &ack, sizeof(ack), &count);
-               if (error)
-                   break;
-               if (count == 0)
-                   tsleep(&jo->fifo.xindex, 0, "jread", hz);
-           }
+           error = fp_read(jo->fp, &ack, sizeof(ack), &count, 1);
+#if 0
+           printf("fp_read ack error %d count %d\n", error, count);
+#endif
+           if (error || count != sizeof(ack))
+               break;
            if (error) {
                printf("read error %d on receive stream\n", error);
                break;
@@ -647,14 +661,14 @@ journal_rthread(void *info)
        bytes = jo->fifo.rindex - jo->fifo.xindex;
 
        if (bytes == 0) {
-           printf("warning: unsent data acknowledged\n");
+           printf("warning: unsent data acknowledged transid %08llx\n", transid);
            tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
            transid = 0;
            continue;
        }
 
        /*
-        * Since rindex has advanceted, the record pointed to by xindex
+        * Since rindex has advanced, the record pointed to by xindex
         * must be a valid record.
         */
        rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
@@ -665,8 +679,11 @@ journal_rthread(void *info)
         * The target can acknowledge several records at once.
         */
        if (rawp->transid < transid) {
+#if 1
            printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+#endif
            jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           jo->total_acked += (rawp->recsize + 15) & ~15;
            if (jo->flags & MC_JOURNAL_WWAIT) {
                jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
                wakeup(&jo->fifo.windex);
@@ -674,8 +691,11 @@ journal_rthread(void *info)
            continue;
        }
        if (rawp->transid == transid) {
+#if 1
            printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+#endif
            jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           jo->total_acked += (rawp->recsize + 15) & ~15;
            if (jo->flags & MC_JOURNAL_WWAIT) {
                jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
                wakeup(&jo->fifo.windex);
@@ -683,7 +703,7 @@ journal_rthread(void *info)
            transid = 0;
            continue;
        }
-       printf("warning: unsent data(2) acknowledged\n");
+       printf("warning: unsent data(2) acknowledged transid %08llx\n", transid);
        transid = 0;
     }
     jo->flags &= ~MC_JOURNAL_RACTIVE;
@@ -821,6 +841,7 @@ journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
        if (avail < req) {
            /* XXX MC_JOURNAL_STOP_IMM */
            jo->flags |= MC_JOURNAL_WWAIT;
+           ++jo->fifostalls;
            tsleep(&jo->fifo.windex, 0, "jwrite", 0);
            continue;
        }
index 39ff201..90f454a 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/kern/vfs_journal.c,v 1.16 2005/07/05 06:19:29 dillon Exp $
+ * $DragonFly: src/sys/kern/vfs_journal.c,v 1.17 2005/07/06 06:02:22 dillon Exp $
  */
 /*
  * Each mount point may have zero or more independantly configured journals
@@ -324,7 +324,7 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
        error = ENOMEM;
 
     /*
-     * Create the worker thread and generate the association record.
+     * Create the worker threads and generate the association record.
      */
     if (error) {
        free(jo, M_JOURNAL);
@@ -392,6 +392,7 @@ static int
 journal_destroy(struct mount *mp, struct journal *jo, int flags)
 {
     struct jrecord jrec;
+    int wcount;
 
     TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
 
@@ -401,10 +402,17 @@ journal_destroy(struct mount *mp, struct journal *jo, int flags)
 
     jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
     wakeup(&jo->fifo);
+    wcount = 0;
     while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
-       tsleep(jo, 0, "jwait", 0);
+       tsleep(jo, 0, "jwait", hz);
+       if (++wcount % 10 == 0) {
+           printf("Warning: journal %s waiting for descriptors to close\n",
+               jo->id);
+       }
     }
     lwkt_free_thread(&jo->wthread); /* XXX SMP */
+    if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX)
+       lwkt_free_thread(&jo->rthread); /* XXX SMP */
     if (jo->fp)
        fdrop(jo->fp, curthread);
     if (jo->fifo.membase)
@@ -453,9 +461,10 @@ journal_status_vfs_journal(struct mount *mp,
        bcopy(jo->id, rstat->id, sizeof(jo->id));
        rstat->index = index;
        rstat->membufsize = jo->fifo.size;
-       rstat->membufused = jo->fifo.xindex - jo->fifo.rindex;
-       rstat->membufiopend = jo->fifo.windex - jo->fifo.rindex;
+       rstat->membufused = jo->fifo.windex - jo->fifo.xindex;
+       rstat->membufunacked = jo->fifo.rindex - jo->fifo.xindex;
        rstat->bytessent = jo->total_acked;
+       rstat->fifostalls = jo->fifostalls;
        ++rstat;
        ++index;
        *res += sizeof(*rstat);
@@ -512,13 +521,14 @@ journal_wthread(void *info)
        /*
         * Skip any pad records.  We do not write out pad records if we can
         * help it. 
-        *
-        * If xindex is caught up to rindex it gets incremented along with
-        * rindex.  XXX SMP
         */
        if (rawp->streamid == JREC_STREAMID_PAD) {
-           if (jo->fifo.rindex == jo->fifo.xindex)
-               jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
+               if (jo->fifo.rindex == jo->fifo.xindex) {
+                   jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+                   jo->total_acked += (rawp->recsize + 15) & ~15;
+               }
+           }
            jo->fifo.rindex += (rawp->recsize + 15) & ~15;
            jo->total_acked += bytes;
            KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
@@ -549,14 +559,20 @@ journal_wthread(void *info)
         * For now assume blocking I/O.  Since we are record-aware the
         * code cannot yet handle partial writes.
         *
+        * We bump rindex prior to issuing the write to avoid racing
+        * the acknowledgement coming back (which could prevent the ack
+        * from bumping xindex).  Restarts are always based on xindex so
+        * we do not try to undo the rindex if an error occurs.
+        *
         * XXX EWOULDBLOCK/NBIO
         * XXX notification on failure
         * XXX permanent verses temporary failures
         * XXX two-way acknowledgement stream in the return direction / xindex
         */
        bytes = res;
+       jo->fifo.rindex += bytes;
        error = fp_write(jo->fp, 
-                       jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
+                       jo->fifo.membase + ((jo->fifo.rindex - bytes) & jo->fifo.mask),
                        bytes, &res);
        if (error) {
            printf("journal_thread(%s) write, error %d\n", jo->id, error);
@@ -570,10 +586,10 @@ journal_wthread(void *info)
         * advance xindex, otherwise the rjournal thread is responsible for
         * advancing xindex.
         */
-       jo->fifo.rindex += bytes;
-       if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0)
+       if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
            jo->fifo.xindex += bytes;
-       jo->total_acked += bytes;
+           jo->total_acked += bytes;
+       }
        KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
        if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
            if (jo->flags & MC_JOURNAL_WWAIT) {
@@ -601,7 +617,6 @@ journal_rthread(void *info)
     int error;
     int count;
     int bytes;
-    int index;
 
     transid = 0;
     error = 0;
@@ -618,13 +633,12 @@ journal_rthread(void *info)
         * stream.
         */
        if (transid == 0) {
-           for (index = 0; index < sizeof(ack); index += count) {
-               error = fp_read(jo->fp, &ack, sizeof(ack), &count);
-               if (error)
-                   break;
-               if (count == 0)
-                   tsleep(&jo->fifo.xindex, 0, "jread", hz);
-           }
+           error = fp_read(jo->fp, &ack, sizeof(ack), &count, 1);
+#if 0
+           printf("fp_read ack error %d count %d\n", error, count);
+#endif
+           if (error || count != sizeof(ack))
+               break;
            if (error) {
                printf("read error %d on receive stream\n", error);
                break;
@@ -647,14 +661,14 @@ journal_rthread(void *info)
        bytes = jo->fifo.rindex - jo->fifo.xindex;
 
        if (bytes == 0) {
-           printf("warning: unsent data acknowledged\n");
+           printf("warning: unsent data acknowledged transid %08llx\n", transid);
            tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
            transid = 0;
            continue;
        }
 
        /*
-        * Since rindex has advanceted, the record pointed to by xindex
+        * Since rindex has advanced, the record pointed to by xindex
         * must be a valid record.
         */
        rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
@@ -665,8 +679,11 @@ journal_rthread(void *info)
         * The target can acknowledge several records at once.
         */
        if (rawp->transid < transid) {
+#if 1
            printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+#endif
            jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           jo->total_acked += (rawp->recsize + 15) & ~15;
            if (jo->flags & MC_JOURNAL_WWAIT) {
                jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
                wakeup(&jo->fifo.windex);
@@ -674,8 +691,11 @@ journal_rthread(void *info)
            continue;
        }
        if (rawp->transid == transid) {
+#if 1
            printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+#endif
            jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           jo->total_acked += (rawp->recsize + 15) & ~15;
            if (jo->flags & MC_JOURNAL_WWAIT) {
                jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
                wakeup(&jo->fifo.windex);
@@ -683,7 +703,7 @@ journal_rthread(void *info)
            transid = 0;
            continue;
        }
-       printf("warning: unsent data(2) acknowledged\n");
+       printf("warning: unsent data(2) acknowledged transid %08llx\n", transid);
        transid = 0;
     }
     jo->flags &= ~MC_JOURNAL_RACTIVE;
@@ -821,6 +841,7 @@ journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
        if (avail < req) {
            /* XXX MC_JOURNAL_STOP_IMM */
            jo->flags |= MC_JOURNAL_WWAIT;
+           ++jo->fifostalls;
            tsleep(&jo->fifo.windex, 0, "jwrite", 0);
            continue;
        }
index b138530..8912424 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/sys/journal.h,v 1.4 2005/07/04 21:05:54 dillon Exp $
+ * $DragonFly: src/sys/sys/journal.h,v 1.5 2005/07/06 06:02:23 dillon Exp $
  */
 
 #ifndef _SYS_JOURNAL_H_
@@ -151,7 +151,8 @@ struct journal_ackrecord {
 #define JREC_STREAMID_PAD      (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END|0x0001)
 #define JREC_STREAMID_DISCONT  0x0002  /* discontinuity */
 #define JREC_STREAMID_ANNOTATE 0x0003  /* annotation */
-                               /* 0x0004-0x007F reserved by DragonFly */
+#define JREC_STREAMID_ACK      0x0004  /* acknowledgement */
+                               /* 0x0005-0x007F reserved by DragonFly */
                                /* 0x0080-0x00FF for third party use */
 #define JREC_STREAMID_JMIN     0x0100  /* lowest allowed general id */
 #define JREC_STREAMID_JMAX     0x2000  /* (one past the highest allowed id) */
index 47abb2b..d6dc846 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/sys/mountctl.h,v 1.7 2005/03/22 22:13:33 dillon Exp $
+ * $DragonFly: src/sys/sys/mountctl.h,v 1.8 2005/07/06 06:02:23 dillon Exp $
  */
 
 #ifndef _SYS_MOUNTCTL_H_
@@ -103,16 +103,18 @@ struct mountctl_journal_ret_status {
        int     flags;
        int64_t membufsize;
        int64_t membufused;
-       int64_t membufiopend;
+       int64_t membufunacked;
        int64_t swapbufsize;
        int64_t swapbufused;
-       int64_t swapbufiopend;
+       int64_t swapbufunacked;
        int64_t transidstart;
        int64_t transidcurrent;
-       int64_t transidiopend;
+       int64_t transidunacked;
        int64_t transidacked;
        int64_t bytessent;
        int64_t bytesacked;
+       int64_t fifostalls;
+       int64_t reserved[4];
        struct timeval lastack;
 };
 
@@ -142,6 +144,7 @@ struct journal {
        int             flags;          /* journaling flags */
        int64_t         transid;
        int64_t         total_acked;
+       int64_t         fifostalls;
        struct journal_memfifo fifo;
        struct thread   wthread;
        struct thread   rthread;