Start working on the full-duplex journaling feature, where the target can
authorMatthew Dillon <dillon@dragonflybsd.org>
Tue, 22 Mar 2005 22:13:33 +0000 (22:13 +0000)
committerMatthew Dillon <dillon@dragonflybsd.org>
Tue, 22 Mar 2005 22:13:33 +0000 (22:13 +0000)
acknowledge the sequence space to prevent information loss if a journaling
stream is interrupted.  Implement a skeleton for the receiver thread.

Delete journals associated with a mount point that is undergoing an unmount.
(reported-by: Fabian <fabian.duelli@bluewin.ch>)

sys/kern/vfs_jops.c
sys/kern/vfs_journal.c
sys/kern/vfs_syscalls.c
sys/sys/journal.h
sys/sys/mount.h
sys/sys/mountctl.h

index b747847..930e830 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/kern/vfs_jops.c,v 1.11 2005/03/05 05:08:27 dillon Exp $
+ * $DragonFly: src/sys/kern/vfs_jops.c,v 1.12 2005/03/22 22:13:28 dillon Exp $
  */
 /*
  * Each mount point may have zero or more independantly configured journals
@@ -106,12 +106,14 @@ static int journal_install_vfs_journal(struct mount *mp, struct file *fp,
                            const struct mountctl_install_journal *info);
 static int journal_remove_vfs_journal(struct mount *mp,
                            const struct mountctl_remove_journal *info);
+static int journal_destroy(struct mount *mp, struct journal *jo, int flags);
 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
 static int journal_status_vfs_journal(struct mount *mp,
                       const struct mountctl_status_journal *info,
                       struct mountctl_journal_ret_status *rstat,
                       int buflen, int *res);
-static void journal_thread(void *info);
+static void journal_wthread(void *info);
+static void journal_rthread(void *info);
 
 static void *journal_reserve(struct journal *jo, 
                            struct journal_rawrecbeg **rawpp, 
@@ -279,7 +281,8 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
 
     jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO);
     bcopy(info->id, jo->id, sizeof(jo->id));
-    jo->flags = info->flags & ~(MC_JOURNAL_ACTIVE | MC_JOURNAL_STOP_REQ);
+    jo->flags = info->flags & ~(MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE |
+                               MC_JOURNAL_STOP_REQ);
 
     /*
      * Memory FIFO size, round to nearest power of 2
@@ -327,12 +330,19 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
        free(jo, M_JOURNAL);
     } else {
        fhold(fp);
-       jo->flags |= MC_JOURNAL_ACTIVE;
-       lwkt_create(journal_thread, jo, NULL, &jo->thread,
-                       TDF_STOPREQ, -1, "journal %.*s", JIDMAX, jo->id);
-       lwkt_setpri(&jo->thread, TDPRI_KERN_DAEMON);
-       lwkt_schedule(&jo->thread);
-
+       jo->flags |= MC_JOURNAL_WACTIVE;
+       lwkt_create(journal_wthread, jo, NULL, &jo->wthread,
+                       TDF_STOPREQ, -1, "journal w:%.*s", JIDMAX, jo->id);
+       lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON);
+       lwkt_schedule(&jo->wthread);
+
+       if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) {
+           jo->flags |= MC_JOURNAL_RACTIVE;
+           lwkt_create(journal_rthread, jo, NULL, &jo->rthread,
+                       TDF_STOPREQ, -1, "journal r:%.*s", JIDMAX, jo->id);
+           lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON);
+           lwkt_schedule(&jo->rthread);
+       }
        jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
        jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
        jrecord_done(&jrec, 0);
@@ -351,36 +361,56 @@ journal_remove_vfs_journal(struct mount *mp,
                           const struct mountctl_remove_journal *info)
 {
     struct journal *jo;
-    struct jrecord jrec;
     int error;
 
     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
        if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
            break;
     }
-    if (jo) {
-       error = 0;
-       TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
+    if (jo)
+       error = journal_destroy(mp, jo, info->flags);
+    else
+       error = EINVAL;
+    return (error);
+}
 
-       jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
-       jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
-       jrecord_done(&jrec, 0);
+/*
+ * Remove all journals associated with a mount point.  Usually called
+ * by the umount code.
+ */
+void
+journal_remove_all_journals(struct mount *mp, int flags)
+{
+    struct journal *jo;
 
-       jo->flags |= MC_JOURNAL_STOP_REQ | (info->flags & MC_JOURNAL_STOP_IMM);
-       wakeup(&jo->fifo);
-       while (jo->flags & MC_JOURNAL_ACTIVE) {
-           tsleep(jo, 0, "jwait", 0);
-       }
-       lwkt_free_thread(&jo->thread); /* XXX SMP */
-       if (jo->fp)
-           fdrop(jo->fp, curthread);
-       if (jo->fifo.membase)
-           free(jo->fifo.membase, M_JFIFO);
-       free(jo, M_JOURNAL);
-    } else {
-       error = EINVAL;
+    while ((jo = TAILQ_FIRST(&mp->mnt_jlist)) != NULL) {
+       journal_destroy(mp, jo, flags);
     }
-    return (error);
+}
+
+static int
+journal_destroy(struct mount *mp, struct journal *jo, int flags)
+{
+    struct jrecord jrec;
+
+    TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
+
+    jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
+    jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
+    jrecord_done(&jrec, 0);
+
+    jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
+    wakeup(&jo->fifo);
+    while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
+       tsleep(jo, 0, "jwait", 0);
+    }
+    lwkt_free_thread(&jo->wthread); /* XXX SMP */
+    if (jo->fp)
+       fdrop(jo->fp, curthread);
+    if (jo->fifo.membase)
+       free(jo->fifo.membase, M_JFIFO);
+    free(jo, M_JOURNAL);
+    return(0);
 }
 
 static int
@@ -433,12 +463,13 @@ journal_status_vfs_journal(struct mount *mp,
     }
     return(error);
 }
+
 /*
  * The per-journal worker thread is responsible for writing out the
  * journal's FIFO to the target stream.
  */
 static void
-journal_thread(void *info)
+journal_wthread(void *info)
 {
     struct journal *jo = info;
     struct journal_rawrecbeg *rawp;
@@ -483,7 +514,7 @@ journal_thread(void *info)
         * help it. 
         *
         * If xindex is caught up to rindex it gets incremented along with
-        * rindex.  XXX
+        * rindex.  XXX SMP
         */
        if (rawp->streamid == JREC_STREAMID_PAD) {
            if (jo->fifo.rindex == jo->fifo.xindex)
@@ -535,20 +566,127 @@ journal_thread(void *info)
        }
 
        /*
-        * Advance rindex.  XXX for now also advance xindex, which will
-        * eventually be advanced only when the target acknowledges the
-        * sequence space.
+        * Advance rindex.  If the journal stream is not full duplex we also
+        * advance xindex, otherwise the rjournal thread is responsible for
+        * advancing xindex.
         */
        jo->fifo.rindex += bytes;
-       jo->fifo.xindex += bytes;
+       if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0)
+           jo->fifo.xindex += bytes;
        jo->total_acked += bytes;
        KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
-       if (jo->flags & MC_JOURNAL_WWAIT) {
-           jo->flags &= ~MC_JOURNAL_WWAIT;     /* XXX hysteresis */
-           wakeup(&jo->fifo.windex);
+       if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
+           if (jo->flags & MC_JOURNAL_WWAIT) {
+               jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
+               wakeup(&jo->fifo.windex);
+           }
+       }
+    }
+    jo->flags &= ~MC_JOURNAL_WACTIVE;
+    wakeup(jo);
+    wakeup(&jo->fifo.windex);
+}
+
+/*
+ * A second per-journal worker thread is created for two-way journaling
+ * streams to deal with the return acknowledgement stream.
+ */
+static void
+journal_rthread(void *info)
+{
+    struct journal_rawrecbeg *rawp;
+    struct journal_ackrecord ack;
+    struct journal *jo = info;
+    int64_t transid;
+    int error;
+    int count;
+    int bytes;
+    int index;
+
+    transid = 0;
+    error = 0;
+
+    for (;;) {
+       /*
+        * We have been asked to stop
+        */
+       if (jo->flags & MC_JOURNAL_STOP_REQ)
+               break;
+
+       /*
+        * If we have no active transaction id, get one from the return
+        * stream.
+        */
+       if (transid == 0) {
+           for (index = 0; index < sizeof(ack); index += count) {
+               error = fp_read(jo->fp, &ack, sizeof(ack), &count);
+               if (error)
+                   break;
+               if (count == 0)
+                   tsleep(&jo->fifo.xindex, 0, "jread", hz);
+           }
+           if (error) {
+               printf("read error %d on receive stream\n", error);
+               break;
+           }
+           if (ack.rbeg.begmagic != JREC_BEGMAGIC ||
+               ack.rend.endmagic != JREC_ENDMAGIC
+           ) {
+               printf("bad begmagic or endmagic on receive stream\n");
+               break;
+           }
+           transid = ack.rbeg.transid;
        }
+
+       /*
+        * Calculate the number of unacknowledged bytes.  If there are no
+        * unacknowledged bytes then unsent data was acknowledged, report,
+        * sleep a bit, and loop in that case.  This should not happen 
+        * normally.  The ack record is thrown away.
+        */
+       bytes = jo->fifo.rindex - jo->fifo.xindex;
+
+       if (bytes == 0) {
+           printf("warning: unsent data acknowledged\n");
+           tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
+           transid = 0;
+           continue;
+       }
+
+       /*
+        * Since rindex has advanceted, the record pointed to by xindex
+        * must be a valid record.
+        */
+       rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
+       KKASSERT(rawp->begmagic == JREC_BEGMAGIC);
+       KKASSERT(rawp->recsize <= bytes);
+
+       /*
+        * The target can acknowledge several records at once.
+        */
+       if (rawp->transid < transid) {
+           printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+           jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           if (jo->flags & MC_JOURNAL_WWAIT) {
+               jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
+               wakeup(&jo->fifo.windex);
+           }
+           continue;
+       }
+       if (rawp->transid == transid) {
+           printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+           jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           if (jo->flags & MC_JOURNAL_WWAIT) {
+               jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
+               wakeup(&jo->fifo.windex);
+           }
+           transid = 0;
+           continue;
+       }
+       printf("warning: unsent data(2) acknowledged\n");
+       transid = 0;
     }
-    jo->flags &= ~MC_JOURNAL_ACTIVE;
+    jo->flags &= ~MC_JOURNAL_RACTIVE;
     wakeup(jo);
     wakeup(&jo->fifo.windex);
 }
@@ -557,10 +695,13 @@ journal_thread(void *info)
  * This builds a pad record which the journaling thread will skip over.  Pad
  * records are required when we are unable to reserve sufficient stream space
  * due to insufficient space at the end of the physical memory fifo.
+ *
+ * Even though the record is not transmitted, a normal transid must be 
+ * assigned to it so link recovery operations after a failure work properly.
  */
 static
 void
-journal_build_pad(struct journal_rawrecbeg *rawp, int recsize)
+journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid)
 {
     struct journal_rawrecend *rendp;
     
@@ -568,7 +709,7 @@ journal_build_pad(struct journal_rawrecbeg *rawp, int recsize)
 
     rawp->streamid = JREC_STREAMID_PAD;
     rawp->recsize = recsize;   /* must be 16-byte aligned */
-    rawp->seqno = 0;
+    rawp->transid = transid;
     /*
      * WARNING, rendp may overlap rawp->seqno.  This is necessary to
      * allow PAD records to fit in 16 bytes.  Use cpu_mb1() to
@@ -610,7 +751,7 @@ journal_commit_wakeup(struct journal *jo)
  * specified amount of payload space.  *rawpp will be set to point to the
  * base of the new stream record and a pointer to the base of the payload
  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
- * making this call.
+ * making this call.  The raw record header will be partially initialized.
  *
  * A stream can be extended, aborted, or committed by other API calls
  * below.  This may result in a sequence of potentially disconnected
@@ -693,16 +834,20 @@ journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
         * journaling code must also be aware the reserved sections occuring
         * after this one will also not be written out even if completed
         * until this one is completed.
+        *
+        * The transaction id must accomodate real and potential pad creation.
         */
        rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
        if (req != bytes) {
-           journal_build_pad(rawp, availtoend);
+           journal_build_pad(rawp, availtoend, jo->transid);
+           ++jo->transid;
            rawp = (void *)jo->fifo.membase;
        }
        rawp->begmagic = JREC_INCOMPLETEMAGIC;  /* updated by abort/commit */
        rawp->recsize = bytes;                  /* (unaligned size) */
        rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
-       rawp->seqno = 0;                        /* set by caller */
+       rawp->transid = jo->transid;
+       jo->transid += 2;
 
        /*
         * Issue a memory barrier to guarentee that the record data has been
@@ -882,7 +1027,8 @@ journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
            jo->fifo.windex -= osize - nsize;
        } else {
            /* we cannot backindex the fifo, emplace a pad in the dead space */
-           journal_build_pad((void *)((char *)rawp + nsize), osize - nsize);
+           journal_build_pad((void *)((char *)rawp + nsize), osize - nsize,
+                               rawp->transid + 1);
        }
     }
 
@@ -1352,9 +1498,9 @@ jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
 
     save = jrecord_push(jrec, JTYPE_VATTR);
     if (vat->va_type != VNON)
-       jrecord_leaf(jrec, JLEAF_UID, &vat->va_type, sizeof(vat->va_type));
+       jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type));
     if (vat->va_uid != VNOVAL)
-       jrecord_leaf(jrec, JLEAF_UID, &vat->va_mode, sizeof(vat->va_mode));
+       jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode));
     if (vat->va_nlink != VNOVAL)
        jrecord_leaf(jrec, JLEAF_NLINK, &vat->va_nlink, sizeof(vat->va_nlink));
     if (vat->va_uid != VNOVAL)
index 291aae3..4351d69 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/kern/vfs_journal.c,v 1.11 2005/03/05 05:08:27 dillon Exp $
+ * $DragonFly: src/sys/kern/vfs_journal.c,v 1.12 2005/03/22 22:13:28 dillon Exp $
  */
 /*
  * Each mount point may have zero or more independantly configured journals
@@ -106,12 +106,14 @@ static int journal_install_vfs_journal(struct mount *mp, struct file *fp,
                            const struct mountctl_install_journal *info);
 static int journal_remove_vfs_journal(struct mount *mp,
                            const struct mountctl_remove_journal *info);
+static int journal_destroy(struct mount *mp, struct journal *jo, int flags);
 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
 static int journal_status_vfs_journal(struct mount *mp,
                       const struct mountctl_status_journal *info,
                       struct mountctl_journal_ret_status *rstat,
                       int buflen, int *res);
-static void journal_thread(void *info);
+static void journal_wthread(void *info);
+static void journal_rthread(void *info);
 
 static void *journal_reserve(struct journal *jo, 
                            struct journal_rawrecbeg **rawpp, 
@@ -279,7 +281,8 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
 
     jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO);
     bcopy(info->id, jo->id, sizeof(jo->id));
-    jo->flags = info->flags & ~(MC_JOURNAL_ACTIVE | MC_JOURNAL_STOP_REQ);
+    jo->flags = info->flags & ~(MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE |
+                               MC_JOURNAL_STOP_REQ);
 
     /*
      * Memory FIFO size, round to nearest power of 2
@@ -327,12 +330,19 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
        free(jo, M_JOURNAL);
     } else {
        fhold(fp);
-       jo->flags |= MC_JOURNAL_ACTIVE;
-       lwkt_create(journal_thread, jo, NULL, &jo->thread,
-                       TDF_STOPREQ, -1, "journal %.*s", JIDMAX, jo->id);
-       lwkt_setpri(&jo->thread, TDPRI_KERN_DAEMON);
-       lwkt_schedule(&jo->thread);
-
+       jo->flags |= MC_JOURNAL_WACTIVE;
+       lwkt_create(journal_wthread, jo, NULL, &jo->wthread,
+                       TDF_STOPREQ, -1, "journal w:%.*s", JIDMAX, jo->id);
+       lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON);
+       lwkt_schedule(&jo->wthread);
+
+       if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) {
+           jo->flags |= MC_JOURNAL_RACTIVE;
+           lwkt_create(journal_rthread, jo, NULL, &jo->rthread,
+                       TDF_STOPREQ, -1, "journal r:%.*s", JIDMAX, jo->id);
+           lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON);
+           lwkt_schedule(&jo->rthread);
+       }
        jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
        jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
        jrecord_done(&jrec, 0);
@@ -351,36 +361,56 @@ journal_remove_vfs_journal(struct mount *mp,
                           const struct mountctl_remove_journal *info)
 {
     struct journal *jo;
-    struct jrecord jrec;
     int error;
 
     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
        if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
            break;
     }
-    if (jo) {
-       error = 0;
-       TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
+    if (jo)
+       error = journal_destroy(mp, jo, info->flags);
+    else
+       error = EINVAL;
+    return (error);
+}
 
-       jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
-       jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
-       jrecord_done(&jrec, 0);
+/*
+ * Remove all journals associated with a mount point.  Usually called
+ * by the umount code.
+ */
+void
+journal_remove_all_journals(struct mount *mp, int flags)
+{
+    struct journal *jo;
 
-       jo->flags |= MC_JOURNAL_STOP_REQ | (info->flags & MC_JOURNAL_STOP_IMM);
-       wakeup(&jo->fifo);
-       while (jo->flags & MC_JOURNAL_ACTIVE) {
-           tsleep(jo, 0, "jwait", 0);
-       }
-       lwkt_free_thread(&jo->thread); /* XXX SMP */
-       if (jo->fp)
-           fdrop(jo->fp, curthread);
-       if (jo->fifo.membase)
-           free(jo->fifo.membase, M_JFIFO);
-       free(jo, M_JOURNAL);
-    } else {
-       error = EINVAL;
+    while ((jo = TAILQ_FIRST(&mp->mnt_jlist)) != NULL) {
+       journal_destroy(mp, jo, flags);
     }
-    return (error);
+}
+
+static int
+journal_destroy(struct mount *mp, struct journal *jo, int flags)
+{
+    struct jrecord jrec;
+
+    TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
+
+    jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
+    jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
+    jrecord_done(&jrec, 0);
+
+    jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
+    wakeup(&jo->fifo);
+    while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
+       tsleep(jo, 0, "jwait", 0);
+    }
+    lwkt_free_thread(&jo->wthread); /* XXX SMP */
+    if (jo->fp)
+       fdrop(jo->fp, curthread);
+    if (jo->fifo.membase)
+       free(jo->fifo.membase, M_JFIFO);
+    free(jo, M_JOURNAL);
+    return(0);
 }
 
 static int
@@ -433,12 +463,13 @@ journal_status_vfs_journal(struct mount *mp,
     }
     return(error);
 }
+
 /*
  * The per-journal worker thread is responsible for writing out the
  * journal's FIFO to the target stream.
  */
 static void
-journal_thread(void *info)
+journal_wthread(void *info)
 {
     struct journal *jo = info;
     struct journal_rawrecbeg *rawp;
@@ -483,7 +514,7 @@ journal_thread(void *info)
         * help it. 
         *
         * If xindex is caught up to rindex it gets incremented along with
-        * rindex.  XXX
+        * rindex.  XXX SMP
         */
        if (rawp->streamid == JREC_STREAMID_PAD) {
            if (jo->fifo.rindex == jo->fifo.xindex)
@@ -535,20 +566,127 @@ journal_thread(void *info)
        }
 
        /*
-        * Advance rindex.  XXX for now also advance xindex, which will
-        * eventually be advanced only when the target acknowledges the
-        * sequence space.
+        * Advance rindex.  If the journal stream is not full duplex we also
+        * advance xindex, otherwise the rjournal thread is responsible for
+        * advancing xindex.
         */
        jo->fifo.rindex += bytes;
-       jo->fifo.xindex += bytes;
+       if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0)
+           jo->fifo.xindex += bytes;
        jo->total_acked += bytes;
        KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
-       if (jo->flags & MC_JOURNAL_WWAIT) {
-           jo->flags &= ~MC_JOURNAL_WWAIT;     /* XXX hysteresis */
-           wakeup(&jo->fifo.windex);
+       if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
+           if (jo->flags & MC_JOURNAL_WWAIT) {
+               jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
+               wakeup(&jo->fifo.windex);
+           }
+       }
+    }
+    jo->flags &= ~MC_JOURNAL_WACTIVE;
+    wakeup(jo);
+    wakeup(&jo->fifo.windex);
+}
+
+/*
+ * A second per-journal worker thread is created for two-way journaling
+ * streams to deal with the return acknowledgement stream.
+ */
+static void
+journal_rthread(void *info)
+{
+    struct journal_rawrecbeg *rawp;
+    struct journal_ackrecord ack;
+    struct journal *jo = info;
+    int64_t transid;
+    int error;
+    int count;
+    int bytes;
+    int index;
+
+    transid = 0;
+    error = 0;
+
+    for (;;) {
+       /*
+        * We have been asked to stop
+        */
+       if (jo->flags & MC_JOURNAL_STOP_REQ)
+               break;
+
+       /*
+        * If we have no active transaction id, get one from the return
+        * stream.
+        */
+       if (transid == 0) {
+           for (index = 0; index < sizeof(ack); index += count) {
+               error = fp_read(jo->fp, &ack, sizeof(ack), &count);
+               if (error)
+                   break;
+               if (count == 0)
+                   tsleep(&jo->fifo.xindex, 0, "jread", hz);
+           }
+           if (error) {
+               printf("read error %d on receive stream\n", error);
+               break;
+           }
+           if (ack.rbeg.begmagic != JREC_BEGMAGIC ||
+               ack.rend.endmagic != JREC_ENDMAGIC
+           ) {
+               printf("bad begmagic or endmagic on receive stream\n");
+               break;
+           }
+           transid = ack.rbeg.transid;
        }
+
+       /*
+        * Calculate the number of unacknowledged bytes.  If there are no
+        * unacknowledged bytes then unsent data was acknowledged, report,
+        * sleep a bit, and loop in that case.  This should not happen 
+        * normally.  The ack record is thrown away.
+        */
+       bytes = jo->fifo.rindex - jo->fifo.xindex;
+
+       if (bytes == 0) {
+           printf("warning: unsent data acknowledged\n");
+           tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
+           transid = 0;
+           continue;
+       }
+
+       /*
+        * Since rindex has advanceted, the record pointed to by xindex
+        * must be a valid record.
+        */
+       rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
+       KKASSERT(rawp->begmagic == JREC_BEGMAGIC);
+       KKASSERT(rawp->recsize <= bytes);
+
+       /*
+        * The target can acknowledge several records at once.
+        */
+       if (rawp->transid < transid) {
+           printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+           jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           if (jo->flags & MC_JOURNAL_WWAIT) {
+               jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
+               wakeup(&jo->fifo.windex);
+           }
+           continue;
+       }
+       if (rawp->transid == transid) {
+           printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+           jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+           if (jo->flags & MC_JOURNAL_WWAIT) {
+               jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
+               wakeup(&jo->fifo.windex);
+           }
+           transid = 0;
+           continue;
+       }
+       printf("warning: unsent data(2) acknowledged\n");
+       transid = 0;
     }
-    jo->flags &= ~MC_JOURNAL_ACTIVE;
+    jo->flags &= ~MC_JOURNAL_RACTIVE;
     wakeup(jo);
     wakeup(&jo->fifo.windex);
 }
@@ -557,10 +695,13 @@ journal_thread(void *info)
  * This builds a pad record which the journaling thread will skip over.  Pad
  * records are required when we are unable to reserve sufficient stream space
  * due to insufficient space at the end of the physical memory fifo.
+ *
+ * Even though the record is not transmitted, a normal transid must be 
+ * assigned to it so link recovery operations after a failure work properly.
  */
 static
 void
-journal_build_pad(struct journal_rawrecbeg *rawp, int recsize)
+journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid)
 {
     struct journal_rawrecend *rendp;
     
@@ -568,7 +709,7 @@ journal_build_pad(struct journal_rawrecbeg *rawp, int recsize)
 
     rawp->streamid = JREC_STREAMID_PAD;
     rawp->recsize = recsize;   /* must be 16-byte aligned */
-    rawp->seqno = 0;
+    rawp->transid = transid;
     /*
      * WARNING, rendp may overlap rawp->seqno.  This is necessary to
      * allow PAD records to fit in 16 bytes.  Use cpu_mb1() to
@@ -610,7 +751,7 @@ journal_commit_wakeup(struct journal *jo)
  * specified amount of payload space.  *rawpp will be set to point to the
  * base of the new stream record and a pointer to the base of the payload
  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
- * making this call.
+ * making this call.  The raw record header will be partially initialized.
  *
  * A stream can be extended, aborted, or committed by other API calls
  * below.  This may result in a sequence of potentially disconnected
@@ -693,16 +834,20 @@ journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
         * journaling code must also be aware the reserved sections occuring
         * after this one will also not be written out even if completed
         * until this one is completed.
+        *
+        * The transaction id must accomodate real and potential pad creation.
         */
        rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
        if (req != bytes) {
-           journal_build_pad(rawp, availtoend);
+           journal_build_pad(rawp, availtoend, jo->transid);
+           ++jo->transid;
            rawp = (void *)jo->fifo.membase;
        }
        rawp->begmagic = JREC_INCOMPLETEMAGIC;  /* updated by abort/commit */
        rawp->recsize = bytes;                  /* (unaligned size) */
        rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
-       rawp->seqno = 0;                        /* set by caller */
+       rawp->transid = jo->transid;
+       jo->transid += 2;
 
        /*
         * Issue a memory barrier to guarentee that the record data has been
@@ -882,7 +1027,8 @@ journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
            jo->fifo.windex -= osize - nsize;
        } else {
            /* we cannot backindex the fifo, emplace a pad in the dead space */
-           journal_build_pad((void *)((char *)rawp + nsize), osize - nsize);
+           journal_build_pad((void *)((char *)rawp + nsize), osize - nsize,
+                               rawp->transid + 1);
        }
     }
 
@@ -1352,9 +1498,9 @@ jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
 
     save = jrecord_push(jrec, JTYPE_VATTR);
     if (vat->va_type != VNON)
-       jrecord_leaf(jrec, JLEAF_UID, &vat->va_type, sizeof(vat->va_type));
+       jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type));
     if (vat->va_uid != VNOVAL)
-       jrecord_leaf(jrec, JLEAF_UID, &vat->va_mode, sizeof(vat->va_mode));
+       jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode));
     if (vat->va_nlink != VNOVAL)
        jrecord_leaf(jrec, JLEAF_NLINK, &vat->va_nlink, sizeof(vat->va_nlink));
     if (vat->va_uid != VNOVAL)
index 8b42517..c00784a 100644 (file)
@@ -37,7 +37,7 @@
  *
  *     @(#)vfs_syscalls.c      8.13 (Berkeley) 4/15/94
  * $FreeBSD: src/sys/kern/vfs_syscalls.c,v 1.151.2.18 2003/04/04 20:35:58 tegge Exp $
- * $DragonFly: src/sys/kern/vfs_syscalls.c,v 1.58 2005/02/02 21:34:18 joerg Exp $
+ * $DragonFly: src/sys/kern/vfs_syscalls.c,v 1.59 2005/03/22 22:13:28 dillon Exp $
  */
 
 #include <sys/param.h>
@@ -47,6 +47,7 @@
 #include <sys/sysent.h>
 #include <sys/malloc.h>
 #include <sys/mount.h>
+#include <sys/mountctl.h>
 #include <sys/sysproto.h>
 #include <sys/filedesc.h>
 #include <sys/kernel.h>
@@ -561,6 +562,13 @@ dounmount(struct mount *mp, int flags, struct thread *td)
                        wakeup(mp);
                return (error);
        }
+       /*
+        * Clean up any journals still associated with the mount after
+        * filesystem activity has ceased.
+        */
+       journal_remove_all_journals(mp, 
+           ((flags & MNT_FORCE) ? MC_JOURNAL_STOP_IMM : 0));
+
        TAILQ_REMOVE(&mountlist, mp, mnt_list);
 
        /*
index c8a50da..2e4d514 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/sys/journal.h,v 1.2 2005/03/05 05:08:30 dillon Exp $
+ * $DragonFly: src/sys/sys/journal.h,v 1.3 2005/03/22 22:13:32 dillon Exp $
  */
 
 #ifndef _SYS_JOURNAL_H_
@@ -107,7 +107,7 @@ struct journal_rawrecbeg {
        u_int16_t begmagic;     /* recovery scan, endianess detection */
        u_int16_t streamid;     /* start/stop bits and stream identifier */
        int32_t recsize;        /* stream data block (incls beg & end) */
-       int64_t seqno;          /* sequence number or transaction id */
+       int64_t transid;        /* sequence number or transaction id */
        /* ADDITIONAL DATA */
 };
 
@@ -117,6 +117,13 @@ struct journal_rawrecend {
        int32_t recsize;        /* same as rawrecbeg->recsize, for rev scan */
 };
 
+struct journal_ackrecord {
+       struct journal_rawrecbeg        rbeg;
+       int32_t                         filler0;
+       int32_t                         filler1;
+       struct journal_rawrecend        rend;
+};
+
 /*
  * Constants for stream record magic numbers.    The incomplete magic
  * number code is used internally by the memory FIFO reservation API
@@ -278,6 +285,7 @@ struct journal_subrecord {
 #define JLEAF_FLAGS            0x041A
 #define JLEAF_UDEV             0x041B
 #define JLEAF_FILEREV          0x041C
+#define JLEAF_VTYPE            0x041D
 
 /*
  * Low level journal data file structures
index 2a2d74a..122a543 100644 (file)
@@ -32,7 +32,7 @@
  *
  *     @(#)mount.h     8.21 (Berkeley) 5/20/95
  * $FreeBSD: src/sys/sys/mount.h,v 1.89.2.7 2003/04/04 20:35:57 tegge Exp $
- * $DragonFly: src/sys/sys/mount.h,v 1.16 2004/12/29 02:40:03 dillon Exp $
+ * $DragonFly: src/sys/sys/mount.h,v 1.17 2005/03/22 22:13:33 dillon Exp $
  */
 
 #ifndef _SYS_MOUNT_H_
@@ -518,6 +518,8 @@ int vfs_stduninit (struct vfsconf *);
 int    vfs_stdextattrctl (struct mount *mp, int cmd, const char *attrname,
                caddr_t arg, struct thread *p);
 int     journal_mountctl(struct vop_mountctl_args *ap);
+void   journal_remove_all_journals(struct mount *mp, int flags);
+
 
 #else /* !_KERNEL */
 
index 36bfa92..47abb2b 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/sys/mountctl.h,v 1.6 2005/02/28 17:40:51 dillon Exp $
+ * $DragonFly: src/sys/sys/mountctl.h,v 1.7 2005/03/22 22:13:33 dillon Exp $
  */
 
 #ifndef _SYS_MOUNTCTL_H_
@@ -70,12 +70,15 @@ struct mountctl_install_journal {
        int     unused04;
 };
 
-#define MC_JOURNAL_ACTIVE              0x00000001      /* journal is active */
+#define MC_JOURNAL_UNUSED0001          0x00000001
 #define MC_JOURNAL_STOP_REQ            0x00000002      /* stop request pend */
 #define MC_JOURNAL_STOP_IMM            0x00000004      /* STOP+trash fifo */
+#define MC_JOURNAL_WACTIVE             0x00000008      /* wthread running */
+#define MC_JOURNAL_RACTIVE             0x00000010      /* rthread running */
 #define MC_JOURNAL_WWAIT               0x00000040      /* write stall */
 #define MC_JOURNAL_WANT_AUDIT          0x00010000      /* audit trail */
 #define MC_JOURNAL_WANT_REVERSABLE     0x00020000      /* reversable stream */
+#define MC_JOURNAL_WANT_FULLDUPLEX     0x00040000      /* has ack stream */
 
 struct mountctl_remove_journal {
        char    id[JIDMAX];
@@ -140,7 +143,8 @@ struct journal {
        int64_t         transid;
        int64_t         total_acked;
        struct journal_memfifo fifo;
-       struct thread   thread;
+       struct thread   wthread;
+       struct thread   rthread;
 };
 
 /*