Journaling layer work. Lock down the journaling data format and most
authorMatthew Dillon <dillon@dragonflybsd.org>
Thu, 30 Dec 2004 21:41:06 +0000 (21:41 +0000)
committerMatthew Dillon <dillon@dragonflybsd.org>
Thu, 30 Dec 2004 21:41:06 +0000 (21:41 +0000)
of the record-building API.

The Journaling data format consists of two layers, A logical stream
abstraction layer and a recursive subrecord layer.

The memory FIFO and worker thread only deals with the logical stream
abstraction layer.  subrecord data is broken down into logical stream
records which the worker thread then writes out to the journal.  Space
for a logical stream record is 'reserved' and then filled in by the
journaling operation.  Other threads can reserve their own space in the
memory FIFO, even if earlier reservations have not yet been committed.
The worker thread will only write out completed records and it currently
does so in sequential order, so the worker thread itself may stall
temporarily if the next reservation in the FIFO has not yet been completed.
(this will probably have to be changed in the future but for now its the
easiest solution, allowing for some parallelism without creating too big
a mess).

Each logical stream is a (typically) short-lived entity, usually
encompassing a single VFS operation, but may be made up of multiple
stream records.  The stream records contain a stream id and bits specifying
whether the record is beginning a new logical stream, in the middle
somewhere, or ending a logical stream.  Small transactions may be able
to fit in a single record in which case multiple bits may be set.
Very large transactions, for example when someone does a write(... 10MB),
are fully supported and would likely generate a large number of stream
records.  Such transactions would not necessarily stall other operations
from other processes, however, since they would be broken up into smaller
pieces for output to the journal.

The stream layer serves to multiplex individual logical streams onto
the memory FIFO and out the journaling stream descriptor.

The recursive subrecord layer identifies the transaction as well as any
other necessary data, including UNDO data if the journal is reversable.
A single transaction may contain several sub-records identifying the bits
making up the transaction (for example, a 'mkdir' transaction would need
a subrecord identifying the userid, groupid, file modes, and path).

The record formats also allow for transactional aborts, even if some of the
data has already been pushed out to the descriptor due to limited buffer
space.  And, finally, while the subrecord's header format includes a record
size field, this value may not be known for subrecords representing
recusive 'pushes' since the header may be flushed out to the journal long
before the record is completed.  This case is also fully supported.

NOTE: The memory FIFO used to ship data to the worker thread is serialized
by the BGL for the moment, but will eventually be made per-cpu to support
lockless operation under SMP.

sys/kern/vfs_jops.c
sys/kern/vfs_journal.c
sys/sys/kern_syscall.h
sys/sys/mountctl.h

index 82031f1..64243bf 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/kern/vfs_jops.c,v 1.3 2004/12/29 02:40:02 dillon Exp $
+ * $DragonFly: src/sys/kern/vfs_jops.c,v 1.4 2004/12/30 21:41:04 dillon Exp $
  */
 /*
  * Each mount point may have zero or more independantly configured journals
@@ -56,7 +56,7 @@
  * well as to allow such filesystems to take direct advantage of the kernel's
  * journaling layer so they don't have to roll their own.
  *
- * In addition, the journaling thread will have access to much larger 
+ * In addition, the worker thread will have access to much larger 
  * spooling areas then the memory buffer is able to provide by e.g. 
  * reserving swap space, in order to absorb potentially long interruptions
  * of off-site journaling streams, and to prevent 'slow' off-site linkages
  * possible, reducing the number of gratuitous thread switches and taking
  * advantage of cpu caches through the use of shorter batched code paths
  * rather then trying to do everything in the context of the process
- * originating the filesystem op.  
+ * originating the filesystem op.  In the future the memory FIFO can be
+ * made per-cpu to remove BGL or other locking requirements.
  */
-
 #include <sys/param.h>
 #include <sys/systm.h>
 #include <sys/buf.h>
 #include <sys/conf.h>
 #include <sys/kernel.h>
+#include <sys/queue.h>
 #include <sys/lock.h>
 #include <sys/malloc.h>
 #include <sys/mount.h>
@@ -104,9 +105,33 @@ static int journal_remove_vfs_journal(struct mount *mp,
                            const struct mountctl_remove_journal *info);
 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
 static void journal_thread(void *info);
-static void journal_write_record(struct journal *jo, const char *ctl,
-                           const char *buf, int bytes);
-static void journal_write(struct journal *jo, const char *buf, int bytes);
+
+static void *journal_reserve(struct journal *jo, 
+                           struct journal_rawrecbeg **rawpp, 
+                           int16_t streamid, int bytes);
+static void *journal_extend(struct journal *jo,
+                           struct journal_rawrecbeg **rawpp,
+                           int truncbytes, int bytes, int *newstreamrecp);
+static void journal_abort(struct journal *jo, 
+                           struct journal_rawrecbeg **rawpp);
+static void journal_commit(struct journal *jo, 
+                           struct journal_rawrecbeg **rawpp, 
+                           int bytes, int closeout);
+
+static void jrecord_init(struct journal *jo, 
+                           struct jrecord *jrec, int16_t streamid);
+static struct journal_subrecord *jrecord_push(
+                           struct jrecord *jrec, int16_t rectype);
+static void jrecord_pop(struct jrecord *jrec, struct journal_subrecord *parent);
+static struct journal_subrecord *jrecord_write(struct jrecord *jrec,
+                           int16_t rectype, int bytes);
+static void jrecord_data(struct jrecord *jrec, const void *buf, int bytes);
+static void jrecord_done(struct jrecord *jrec, int abortit);
+
+static void jrecord_write_path(struct jrecord *jrec, 
+                           int16_t rectype, struct namecache *ncp);
+static void jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat);
+
 
 static int journal_nmkdir(struct vop_nmkdir_args *ap);
 
@@ -117,7 +142,7 @@ static struct vnodeopv_entry_desc journal_vnodeop_entries[] = {
     { NULL, NULL }
 };
 
-static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structure");
+static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
 static MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
 
 int
@@ -199,13 +224,17 @@ journal_detach(struct mount *mp)
 }
 
 /*
- * Install a journal on a mount point
+ * Install a journal on a mount point.  Each journal has an associated worker
+ * thread which is responsible for buffering and spooling the data to the
+ * target.  A mount point may have multiple journals attached to it.  An
+ * initial start record is generated when the journal is associated.
  */
 static int
 journal_install_vfs_journal(struct mount *mp, struct file *fp, 
                            const struct mountctl_install_journal *info)
 {
     struct journal *jo;
+    struct jrecord jrec;
     int error = 0;
     int size;
 
@@ -216,7 +245,7 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
     /*
      * Memory FIFO size, round to nearest power of 2
      */
-    if (info->flags & MC_JOURNAL_MBSIZE_PROVIDED) {
+    if (info->membufsize) {
        if (info->membufsize < 65536)
            size = 65536;
        else if (info->membufsize > 128 * 1024 * 1024)
@@ -234,7 +263,7 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
      * Other parameters.  If not specified the starting transaction id
      * will be the current date.
      */
-    if (info->flags & MC_JOURNAL_TRANSID_PROVIDED) {
+    if (info->transid) {
        jo->transid = info->transid;
     } else {
        struct timespec ts;
@@ -252,6 +281,9 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
     if (jo->fifo.membase == NULL)
        error = ENOMEM;
 
+    /*
+     * Create the worker thread and generate the association record.
+     */
     if (error) {
        free(jo, M_JOURNAL);
     } else {
@@ -261,17 +293,26 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
                        TDF_STOPREQ, -1, "journal %.*s", JIDMAX, jo->id);
        lwkt_setpri(&jo->thread, TDPRI_KERN_DAEMON);
        lwkt_schedule(&jo->thread);
-       journal_write_record(jo, "INSTALL", NULL, 0);
 
+       jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
+       jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
+       jrecord_done(&jrec, 0);
        TAILQ_INSERT_TAIL(&mp->mnt_jlist, jo, jentry);
     }
     return(error);
 }
 
+/*
+ * Disassociate a journal from a mount point and terminate its worker thread.
+ * A final termination record is written out before the file pointer is
+ * dropped.
+ */
 static int
-journal_remove_vfs_journal(struct mount *mp, const struct mountctl_remove_journal *info)
+journal_remove_vfs_journal(struct mount *mp, 
+                          const struct mountctl_remove_journal *info)
 {
     struct journal *jo;
+    struct jrecord jrec;
     int error;
 
     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
@@ -281,7 +322,11 @@ journal_remove_vfs_journal(struct mount *mp, const struct mountctl_remove_journa
     if (jo) {
        error = 0;
        TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
-       journal_write_record(jo, "REMOVE", NULL, 0); /* XXX sequencing */
+
+       jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
+       jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
+       jrecord_done(&jrec, 0);
+
        jo->flags |= MC_JOURNAL_STOP_REQ | (info->flags & MC_JOURNAL_STOP_IMM);
        wakeup(&jo->fifo);
        while (jo->flags & MC_JOURNAL_ACTIVE) {
@@ -305,34 +350,95 @@ journal_resync_vfs_journal(struct mount *mp, const void *ctl)
     return(EINVAL);
 }
 
+/*
+ * The per-journal worker thread is responsible for writing out the
+ * journal's FIFO to the target stream.
+ */
 static void
 journal_thread(void *info)
 {
     struct journal *jo = info;
+    struct journal_rawrecbeg *rawp;
     int bytes;
     int error;
+    int avail;
     int res;
 
     for (;;) {
-       bytes = (jo->fifo.windex - jo->fifo.rindex) & jo->fifo.mask;
-       if (bytes == 0 || (jo->flags & MC_JOURNAL_STOP_IMM)) {
+       /*
+        * Calculate the number of bytes available to write.  This buffer
+        * area may contain reserved records so we can't just write it out
+        * without further checks.
+        */
+       bytes = jo->fifo.windex - jo->fifo.rindex;
+
+       /*
+        * sleep if no bytes are available or if an incomplete record is
+        * encountered (it needs to be filled in before we can write it
+        * out), and skip any pad records that we encounter.
+        */
+       if (bytes == 0) {
            if (jo->flags & MC_JOURNAL_STOP_REQ)
                break;
-           tsleep(&jo->fifo, 0, "jfifo", 0); /* XXX add heartbeat */
+           tsleep(&jo->fifo, 0, "jfifo", hz);
+           continue;
+       }
+       rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
+       if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
+           tsleep(&jo->fifo, 0, "jpad", hz);
+           continue;
+       }
+       if (rawp->streamid == JREC_STREAMID_PAD) {
+           jo->fifo.rindex += (rawp->recsize + 15) & ~15;
+           KKASSERT(jo->fifo.windex - jo->fifo.rindex > 0);
+           continue;
+       }
+
+       /*
+        * Figure out how much we can write out, beware the buffer wrap
+        * case.
+        */
+       res = 0;
+       avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
+       while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
+           res += (rawp->recsize + 15) & ~15;
+           if (res >= avail) {
+               KKASSERT(res == avail);
+               break;
+           }
        }
-       if (bytes > jo->fifo.size - jo->fifo.windex)
-           bytes = jo->fifo.size - jo->fifo.windex;
-       error = fp_write(jo->fp, jo->fifo.membase + jo->fifo.rindex, bytes, &res);
+
+       /*
+        * Issue the write and deal with any errors or other conditions.
+        * For now assume blocking I/O.  Since we are record-aware the
+        * code cannot yet handle partial writes.
+        *
+        * XXX EWOULDBLOCK/NBIO
+        * XXX notification on failure
+        * XXX two-way acknowledgement stream in the return direction / xindex
+        */
+       bytes = res;
+       error = fp_write(jo->fp, 
+                       jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
+                       bytes, &res);
        if (error) {
            printf("journal_thread(%s) write, error %d\n", jo->id, error);
-           jo->fifo.rindex = jo->fifo.windex; /* XXX flag out-of-sync */
+           /* XXX */
        } else {
+           KKASSERT(res == bytes);
            printf("journal_thread(%s) write %d\n", jo->id, res);
-           jo->fifo.rindex = (jo->fifo.rindex + res) & jo->fifo.mask;
-           if (jo->flags & MC_JOURNAL_WWAIT) {
-               jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
-               wakeup(&jo->fifo.windex);
-           }
+       }
+
+       /*
+        * Advance rindex.  XXX for now also advance xindex, which will
+        * eventually be advanced when the target acknowledges the sequence
+        * space.
+        */
+       jo->fifo.rindex += bytes;
+       jo->fifo.xindex += bytes;
+       if (jo->flags & MC_JOURNAL_WWAIT) {
+           jo->flags &= ~MC_JOURNAL_WWAIT;     /* XXX hysteresis */
+           wakeup(&jo->fifo.windex);
        }
     }
     jo->flags &= ~MC_JOURNAL_ACTIVE;
@@ -340,48 +446,695 @@ journal_thread(void *info)
     wakeup(&jo->fifo.windex);
 }
 
-static 
-void 
-journal_write_record(struct journal *jo, const char *ctl, 
-                    const char *buf, int bytes)
+static __inline
+void
+journal_build_pad(struct journal_rawrecbeg *rawp, int recsize)
 {
-    char head[64];
-
-    if (jo->flags & MC_JOURNAL_STOP_REQ)
-       return;
-    snprintf(head, sizeof(head), "%016llx %s\n", jo->transid, ctl);
-    /* XXX locking (token) or cmpexgl space reservation */
-    ++jo->transid;     /* XXX embed nanotime, force monotonic */
-    journal_write(jo, head, strlen(head));
-    if (bytes)
-       journal_write(jo, buf, bytes);
+    struct journal_rawrecend *rendp;
+    
+    KKASSERT((recsize & 15) == 0 && recsize >= 16);
+
+    rawp->begmagic = JREC_BEGMAGIC;
+    rawp->streamid = JREC_STREAMID_PAD;
+    rawp->recsize = recsize;   /* must be 16-byte aligned */
+    rawp->seqno = 0;
+    /*
+     * WARNING, rendp may overlap rawp->seqno.  This is necessary to
+     * allow PAD records to fit in 16 bytes.  Use cpu_mb1() to
+     * hopefully cause the compiler to not make any assumptions.
+     */
+    cpu_mb1();
+    rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
+    rendp->endmagic = JREC_ENDMAGIC;
+    rendp->check = 0;
+    rendp->recsize = rawp->recsize;
 }
 
-static
+/*
+ * Wake up the worker thread if the FIFO is more then half full or if
+ * someone is waiting for space to be freed up.  Otherwise let the 
+ * heartbeat deal with it.  Being able to avoid waking up the worker
+ * is the key to the journal's cpu efficiency.
+ */
+static __inline
 void
-journal_write(struct journal *jo, const char *buf, int bytes)
+journal_commit_wakeup(struct journal *jo)
 {
     int avail;
 
-    while (bytes && (jo->flags & MC_JOURNAL_ACTIVE)) {
-       avail = (jo->fifo.windex - jo->fifo.rindex) & jo->fifo.mask;
-       avail = jo->fifo.size - avail - 1;
-       if (avail == 0) {
-           if (jo->flags & MC_JOURNAL_STOP_IMM)
-               break;
+    avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
+    KKASSERT(avail >= 0);
+    if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
+       wakeup(&jo->fifo);
+}
+
+/*
+ * Create a new BEGIN stream record with the specified streamid and the
+ * specified amount of payload space.  *rawpp will be set to point to the
+ * base of the new stream record and a pointer to the base of the payload
+ * space will be returned.  *rawpp does not need to be pre-NULLd prior to
+ * making this call.
+ *
+ * A stream can be extended, aborted, or committed by other API calls
+ * below.  This may result in a sequence of potentially disconnected
+ * stream records to be output to the journaling target.  The first record
+ * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
+ * while the last record on commit or abort will be marked JREC_STREAMCTL_END
+ * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
+ * up being the same as the first, in which case the bits are all set in
+ * the first record.
+ *
+ * The stream record is created in an incomplete state by setting the begin
+ * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
+ * flushing the fifo past our record until we have finished populating it.
+ * Other threads can reserve and operate on their own space without stalling
+ * but the stream output will stall until we have completed operations.  The
+ * memory FIFO is intended to be large enough to absorb such situations
+ * without stalling out other threads.
+ */
+static
+void *
+journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
+               int16_t streamid, int bytes)
+{
+    struct journal_rawrecbeg *rawp;
+    int avail;
+    int availtoend;
+    int req;
+
+    /*
+     * Add header and trailer overheads to the passed payload.  Note that
+     * the passed payload size need not be aligned in any way.
+     */
+    bytes += sizeof(struct journal_rawrecbeg);
+    bytes += sizeof(struct journal_rawrecend);
+
+    for (;;) {
+       /*
+        * First, check boundary conditions.  If the request would wrap around
+        * we have to skip past the ending block and return to the beginning
+        * of the FIFO's buffer.  Calculate 'req' which is the actual number
+        * of bytes being reserved, including wrap-around dead space.
+        *
+        * Note that availtoend is not truncated to avail and so cannot be
+        * used to determine whether the reservation is possible by itself.
+        * Also, since all fifo ops are 16-byte aligned, we can check
+        * the size before calculating the aligned size.
+        */
+       availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
+       if (bytes > availtoend) 
+           req = bytes + availtoend;   /* add pad to end */
+       else
+           req = bytes;
+
+       /*
+        * Next calculate the total available space and see if it is
+        * sufficient.  We cannot overwrite previously buffered data
+        * past xindex because otherwise we would not be able to restart
+        * a broken link at the target's last point of commit.
+        */
+       avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
+       KKASSERT(avail >= 0 && (avail & 15) == 0);
+
+       if (avail < req) {
+           /* XXX MC_JOURNAL_STOP_IMM */
            jo->flags |= MC_JOURNAL_WWAIT;
            tsleep(&jo->fifo.windex, 0, "jwrite", 0);
            continue;
        }
-       if (avail > jo->fifo.size - jo->fifo.windex)
-           avail = jo->fifo.size - jo->fifo.windex;
-       if (avail > bytes)
-           avail = bytes;
-       bcopy(buf, jo->fifo.membase + jo->fifo.windex, avail);
-       bytes -= avail;
-       jo->fifo.windex = (jo->fifo.windex + avail) & jo->fifo.mask;
-       tsleep(&jo->fifo, 0, "jfifo", 0);       /* XXX hysteresis */
+
+       /*
+        * Create a pad record for any dead space and create an incomplete
+        * record for the live space, then return a pointer to the
+        * contiguous buffer space that was requested.
+        *
+        * NOTE: The worker thread will not flush past an incomplete
+        * record, so the reserved space can be filled in at-will.  The
+        * journaling code must also be aware the reserved sections occuring
+        * after this one will also not be written out even if completed
+        * until this one is completed.
+        */
+       rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
+       if (req != bytes) {
+           journal_build_pad(rawp, req - bytes);
+           rawp = (void *)jo->fifo.membase;
+       }
+       rawp->begmagic = JREC_INCOMPLETEMAGIC;  /* updated by abort/commit */
+       rawp->recsize = bytes;                  /* (unaligned size) */
+       rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
+       rawp->seqno = 0;                        /* set by caller */
+
+       /*
+        * Issue a memory barrier to guarentee that the record data has been
+        * properly initialized before we advance the write index and return
+        * a pointer to the reserved record.  Otherwise the worker thread
+        * could accidently run past us.
+        *
+        * Note that stream records are always 16-byte aligned.
+        */
+       cpu_mb1();
+       jo->fifo.windex += (req + 15) & ~15;
+       *rawpp = rawp;
+       return(rawp + 1);
+    }
+    /* not reached */
+    *rawpp = NULL;
+    return(NULL);
+}
+
+/*
+ * Extend a previous reservation by the specified number of payload bytes.
+ * If it is not possible to extend the existing reservation due to either
+ * another thread having reserved space after us or due to a boundary
+ * condition, the current reservation will be committed and possibly
+ * truncated and a new reservation with the specified payload size will
+ * be created. *rawpp is set to the new reservation in this case but the
+ * caller cannot depend on a comparison with the old rawp to determine if
+ * this case occurs because we could end up using the same memory FIFO
+ * offset for the new stream record.
+ *
+ * In either case this function will return a pointer to the base of the
+ * extended payload space.
+ *
+ * If a new stream block is created the caller needs to recalculate payload
+ * byte counts, if the same stream block is used the caller needs to extend
+ * its current notion of the payload byte count.
+ */
+static void *
+journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp, 
+               int truncbytes, int bytes, int *newstreamrecp)
+{
+    struct journal_rawrecbeg *rawp;
+    int16_t streamid;
+    int availtoend;
+    int avail;
+    int osize;
+    int nsize;
+    int wbase;
+    void *rptr;
+
+    *newstreamrecp = 0;
+    rawp = *rawpp;
+    osize = (rawp->recsize + 15) & ~15;
+    nsize = (rawp->recsize + bytes + 15) & ~15;
+    wbase = (char *)rawp - jo->fifo.membase;
+
+    /*
+     * If the aligned record size does not change we can trivially extend
+     * the record.
+     */
+    if (nsize == osize) {
+       rawp->recsize += bytes;
+       return((char *)rawp + rawp->recsize - bytes);
+    }
+
+    /*
+     * If the fifo's write index hasn't been modified since we made the
+     * reservation and we do not hit any boundary conditions, we can 
+     * trivially extend the record.
+     */
+    if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
+       availtoend = jo->fifo.size - wbase;
+       avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
+       KKASSERT((availtoend & 15) == 0);
+       KKASSERT((avail & 15) == 0);
+       if (nsize <= avail && nsize <= availtoend) {
+           jo->fifo.windex += nsize - osize;
+           rawp->recsize += bytes;
+           return((char *)rawp + rawp->recsize - bytes);
+       }
+    }
+
+    /*
+     * It was not possible to extend the buffer.  Commit the current
+     * buffer and create a new one.  We manually clear the BEGIN mark that
+     * journal_reserve() creates (because this is a continuing record, not
+     * the start of a new stream).
+     */
+    streamid = rawp->streamid & JREC_STREAMID_MASK;
+    journal_commit(jo, rawpp, truncbytes, 0);
+    rptr = journal_reserve(jo, rawpp, streamid, bytes);
+    rawp = *rawpp;
+    rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
+    *newstreamrecp = 1;
+    return(rptr);
+}
+
+/*
+ * Abort a journal record.  If the transaction record represents a stream
+ * BEGIN and we can reverse the fifo's write index we can simply reverse
+ * index the entire record, as if it were never reserved in the first place.
+ *
+ * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
+ * with the payload truncated to 0 bytes.
+ */
+static void
+journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
+{
+    struct journal_rawrecbeg *rawp;
+    int osize;
+
+    rawp = *rawpp;
+    osize = (rawp->recsize + 15) & ~15;
+
+    if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
+       (jo->fifo.windex & jo->fifo.mask) == 
+        (char *)rawp - jo->fifo.membase + osize)
+    {
+       jo->fifo.windex -= osize;
+       *rawpp = NULL;
+    } else {
+       rawp->streamid |= JREC_STREAMCTL_ABORTED;
+       journal_commit(jo, rawpp, 0, 1);
+    }
+}
+
+/*
+ * Commit a journal record and potentially truncate it to the specified
+ * number of payload bytes.  If you do not want to truncate the record,
+ * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
+ * field includes header and trailer and will not be correct.  Note that
+ * passing 0 will truncate the entire data payload of the record.
+ *
+ * The logical stream is terminated by this function.
+ *
+ * If truncation occurs, and it is not possible to physically optimize the
+ * memory FIFO due to other threads having reserved space after ours,
+ * the remaining reserved space will be covered by a pad record.
+ */
+static void
+journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
+               int bytes, int closeout)
+{
+    struct journal_rawrecbeg *rawp;
+    struct journal_rawrecend *rendp;
+    int osize;
+    int nsize;
+
+    rawp = *rawpp;
+    *rawpp = NULL;
+
+    KKASSERT((char *)rawp >= jo->fifo.membase &&
+            (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
+    KKASSERT(((intptr_t)rawp & 15) == 0);
+
+    /*
+     * Truncate the record if requested.  If the FIFO write index as still
+     * at the end of our record we can optimally backindex it.  Otherwise
+     * we have to insert a pad record.
+     *
+     * We calculate osize which is the 16-byte-aligned original recsize.
+     * We calculate nsize which is the 16-byte-aligned new recsize.
+     *
+     * Due to alignment issues or in case the passed truncation bytes is
+     * the same as the original payload, windex will be equal to nindex.
+     */
+    if (bytes >= 0) {
+       KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
+       osize = (rawp->recsize + 15) & ~15;
+       rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
+                       sizeof(struct journal_rawrecend);
+       nsize = (rawp->recsize + 15) & ~15;
+       if (osize == nsize) {
+           /* do nothing */
+       } else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
+           /* we are able to backindex the fifo */
+           jo->fifo.windex -= osize - nsize;
+       } else {
+           /* we cannot backindex the fifo, emplace a pad in the dead space */
+           journal_build_pad((void *)((char *)rawp + osize), osize - nsize);
+       }
     }
+
+    /*
+     * Fill in the trailer.  Note that unlike pad records, the trailer will
+     * never overlap the header.
+     */
+    rendp = (void *)((char *)rawp + 
+           ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
+    rendp->endmagic = JREC_ENDMAGIC;
+    rendp->recsize = rawp->recsize;
+    rendp->check = 0;          /* XXX check word, disabled for now */
+
+    /*
+     * Fill in begmagic last.  This will allow the worker thread to proceed.
+     * Use a memory barrier to guarentee write ordering.  Mark the stream
+     * as terminated if closeout is set.  This is the typical case.
+     */
+    if (closeout)
+       rawp->streamid |= JREC_STREAMCTL_END;
+    cpu_mb1();                 /* memory barrier */
+    rawp->begmagic = JREC_BEGMAGIC;
+
+    journal_commit_wakeup(jo);
+}
+
+/************************************************************************
+ *                     TRANSACTION SUPPORT ROUTINES                    *
+ ************************************************************************
+ *
+ * JRECORD_*() - routines to create subrecord transactions and embed them
+ *              in the logical streams managed by the journal_*() routines.
+ */
+
+static int16_t sid = JREC_STREAMID_JMIN;
+
+/*
+ * Initialize the passed jrecord structure and start a new stream transaction
+ * by reserving an initial build space in the journal's memory FIFO.
+ */
+static void
+jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
+{
+    bzero(jrec, sizeof(*jrec));
+    jrec->jo = jo;
+    if (streamid < 0) {
+       streamid = sid++;       /* XXX need to track stream ids! */
+       if (sid == JREC_STREAMID_JMAX)
+           sid = JREC_STREAMID_JMIN;
+    }
+    jrec->streamid = streamid;
+    jrec->stream_residual = JREC_DEFAULTSIZE;
+    jrec->stream_reserved = jrec->stream_residual;
+    jrec->stream_ptr = 
+       journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
+}
+
+/*
+ * Push a recursive record type.  All pushes should have matching pops.
+ * The old parent is returned and the newly pushed record becomes the
+ * new parent.  Note that the old parent's pointer may already be invalid
+ * or may become invalid if jrecord_write() had to build a new stream
+ * record, so the caller should not mess with the returned pointer in
+ * any way other then to save it.
+ */
+static 
+struct journal_subrecord *
+jrecord_push(struct jrecord *jrec, int16_t rectype)
+{
+    struct journal_subrecord *save;
+
+    save = jrec->parent;
+    jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
+    jrec->last = NULL;
+    KKASSERT(jrec->parent != NULL);
+    ++jrec->pushcount;
+    ++jrec->pushptrgood;       /* cleared on flush */
+    return(save);
+}
+
+/*
+ * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
+ * on the last record written within the subtransaction.  If the last 
+ * record written is not accessible or if the subtransaction is empty,
+ * we must write out a pad record with JMASK_LAST set before popping.
+ *
+ * When popping a subtransaction the parent record's recsize field
+ * will be properly set.  If the parent pointer is no longer valid
+ * (which can occur if the data has already been flushed out to the
+ * stream), the protocol spec allows us to leave it 0.
+ *
+ * The saved parent pointer which we restore may or may not be valid,
+ * and if not valid may or may not be NULL, depending on the value
+ * of pushptrgood.
+ */
+static void
+jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
+{
+    struct journal_subrecord *last;
+
+    KKASSERT(jrec->pushcount > 0);
+    KKASSERT(jrec->residual == 0);
+
+    /*
+     * Set JMASK_LAST on the last record we wrote at the current
+     * level.  If last is NULL we either no longer have access to the
+     * record or the subtransaction was empty and we must write out a pad
+     * record.
+     */
+    if ((last = jrec->last) == NULL) {
+       jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
+       last = jrec->last;      /* reload after possible flush */
+    } else {
+       last->rectype |= JMASK_LAST;
+    }
+
+    /*
+     * pushptrgood tells us how many levels of parent record pointers
+     * are valid.  The jrec only stores the current parent record pointer
+     * (and it is only valid if pushptrgood != 0).  The higher level parent
+     * record pointers are saved by the routines calling jrecord_push() and
+     * jrecord_pop().  These pointers may become stale and we determine
+     * that fact by tracking the count of valid parent pointers with 
+     * pushptrgood.  Pointers become invalid when their related stream
+     * record gets pushed out.
+     *
+     * [parentA]
+     *   [node X]
+     *    [parentB]
+     *      [node Y]
+     *      [node Z]
+     *    (pop B)      see NOTE B
+     * (pop A)         see NOTE A
+     *
+     * NOTE B: This pop sets LAST in node Z if the node is still accessible,
+     *         else a PAD record is appended and LAST is set in that.
+     *
+     *         This pop sets the record size in parentB if parentB is still
+     *         accessible, else the record size is left 0 (the scanner must
+     *         deal with that).
+     *
+     *         This pop sets the new 'last' record to parentB, the pointer
+     *         to which may or may not still be accessible.
+     *
+     * NOTE A: This pop sets LAST in parentB if the node is still accessible,
+     *         else a PAD record is appended and LAST is set in that.
+     *
+     *         This pop sets the record size in parentA if parentA is still
+     *         accessible, else the record size is left 0 (the scanner must
+     *         deal with that).
+     *
+     *         This pop sets the new 'last' record to parentA, the pointer
+     *         to which may or may not still be accessible.
+     *
+     * Also note that the last record in the stream transaction, which in
+     * the above example is parentA, does not currently have the LAST bit
+     * set.
+     *
+     * The current parent becomes the last record relative to the
+     * saved parent passed into us.  It's validity is based on 
+     * whether pushptrgood is non-zero prior to decrementing.  The saved
+     * parent becomes the new parent, and its validity is based on whether
+     * pushptrgood is non-zero after decrementing.
+     *
+     * The old jrec->parent may be NULL if it is no longer accessible.
+     * If pushptrgood is non-zero, however, it is guarenteed to not
+     * be NULL (since no flush occured).
+     */
+    jrec->last = jrec->parent;
+    --jrec->pushcount;
+    if (jrec->pushptrgood) {
+       KKASSERT(jrec->last != NULL && last != NULL);
+       if (--jrec->pushptrgood == 0) {
+           jrec->parent = NULL;        /* 'save' contains garbage or NULL */
+       } else {
+           KKASSERT(save != NULL);
+           jrec->parent = save;        /* 'save' must not be NULL */
+       }
+
+       /*
+        * Set the record size in the old parent.  'last' still points to
+        * the original last record in the subtransaction being popped,
+        * jrec->last points to the old parent (which became the last
+        * record relative to the new parent being popped into).
+        */
+       jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
+    } else {
+       jrec->parent = NULL;
+       KKASSERT(jrec->last == NULL);
+    }
+}
+
+/*
+ * Write a leaf record out and return a pointer to its base.  The leaf
+ * record may contain potentially megabytes of data which is supplied
+ * in jrecord_data() calls.  The exact amount must be specified in this
+ * call.
+ */
+static
+struct journal_subrecord *
+jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
+{
+    struct journal_subrecord *last;
+    int pusheditout;
+
+    /*
+     * Try to catch some obvious errors.  Nesting records must specify a
+     * size of 0, and there should be no left-overs from previous operations
+     * (such as incomplete data writeouts).
+     */
+    KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
+    KKASSERT(jrec->residual == 0);
+
+    /*
+     * Check to see if the current stream record has enough room for
+     * the new subrecord header.  If it doesn't we extend the current
+     * stream record.
+     *
+     * This may have the side effect of pushing out the current stream record
+     * and creating a new one.  We must adjust our stream tracking fields
+     * accordingly.
+     */
+    if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
+       jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
+                               jrec->stream_reserved - jrec->stream_residual,
+                               JREC_DEFAULTSIZE, &pusheditout);
+       if (pusheditout) {
+           jrec->stream_reserved = JREC_DEFAULTSIZE;
+           jrec->stream_residual = JREC_DEFAULTSIZE;
+           jrec->parent = NULL;        /* no longer accessible */
+           jrec->pushptrgood = 0;      /* restored parents in pops no good */
+       } else {
+           jrec->stream_reserved += JREC_DEFAULTSIZE;
+           jrec->stream_residual += JREC_DEFAULTSIZE;
+       }
+    }
+    last = (void *)jrec->stream_ptr;
+    last->rectype = rectype;
+    last->reserved = 0;
+    last->recsize = sizeof(struct journal_subrecord) + bytes;
+    jrec->last = last;
+    jrec->residual = bytes;            /* remaining data to be posted */
+    jrec->residual_align = -bytes & 7; /* post-data alignment required */
+    return(last);
+}
+
+/*
+ * Write out the data associated with a leaf record.  Any number of calls
+ * to this routine may be made as long as the byte count adds up to the
+ * amount originally specified in jrecord_write().
+ *
+ * The act of writing out the leaf data may result in numerous stream records
+ * being pushed out.   Callers should be aware that even the associated
+ * subrecord header may become inaccessible due to stream record pushouts.
+ */
+static void
+jrecord_data(struct jrecord *jrec, const void *buf, int bytes)
+{
+    int pusheditout;
+    int extsize;
+
+    KKASSERT(bytes >= 0 && bytes <= jrec->residual);
+
+    /*
+     * Push out stream records as long as there is insufficient room to hold
+     * the remaining data.
+     */
+    while (jrec->stream_residual < bytes) {
+       /*
+        * Fill in any remaining space in the current stream record.
+        */
+       bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
+       buf = (const char *)buf + jrec->stream_residual;
+       bytes -= jrec->stream_residual;
+       /*jrec->stream_ptr += jrec->stream_residual;*/
+       jrec->stream_residual = 0;
+       jrec->residual -= jrec->stream_residual;
+
+       /*
+        * Try to extend the current stream record, but no more then 1/4
+        * the size of the FIFO.
+        */
+       extsize = jrec->jo->fifo.size >> 2;
+       if (extsize > bytes)
+           extsize = (bytes + 15) & ~15;
+
+       jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
+                               jrec->stream_reserved - jrec->stream_residual,
+                               extsize, &pusheditout);
+       if (pusheditout) {
+           jrec->stream_reserved = extsize;
+           jrec->stream_residual = extsize;
+           jrec->parent = NULL;        /* no longer accessible */
+           jrec->last = NULL;          /* no longer accessible */
+           jrec->pushptrgood = 0;      /* restored parents in pops no good */
+       } else {
+           jrec->stream_reserved += extsize;
+           jrec->stream_residual += extsize;
+       }
+    }
+
+    /*
+     * Push out any remaining bytes into the current stream record.
+     */
+    if (bytes) {
+       bcopy(buf, jrec->stream_ptr, bytes);
+       jrec->stream_ptr += bytes;
+       jrec->stream_residual -= bytes;
+       jrec->residual -= bytes;
+    }
+
+    /*
+     * Handle data alignment requirements for the subrecord.  Because the
+     * stream record's data space is more strictly aligned, it must already
+     * have sufficient space to hold any subrecord alignment slop.
+     */
+    if (jrec->residual == 0 && jrec->residual_align) {
+       KKASSERT(jrec->residual_align <= jrec->stream_residual);
+       bzero(jrec->stream_ptr, jrec->residual_align);
+       jrec->stream_ptr += jrec->residual_align;
+       jrec->stream_residual -= jrec->residual_align;
+       jrec->residual_align = 0;
+    }
+}
+
+/*
+ * We are finished with a transaction.  If abortit is not set then we must
+ * be at the top level with no residual subrecord data left to output.
+ * If abortit is set then we can be in any state.
+ *
+ * The stream record will be committed or aborted as specified and jrecord
+ * resources will be cleaned up.
+ */
+static void
+jrecord_done(struct jrecord *jrec, int abortit)
+{
+    KKASSERT(jrec->rawp != NULL);
+
+    if (abortit) {
+       journal_abort(jrec->jo, &jrec->rawp);
+    } else {
+       KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
+       journal_commit(jrec->jo, &jrec->rawp, 
+                       jrec->stream_reserved - jrec->stream_residual, 1);
+    }
+
+    /*
+     * jrec should not be used beyond this point without another init,
+     * but clean up some fields to ensure that we panic if it is.
+     *
+     * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
+     */
+    jrec->jo = NULL;
+    jrec->stream_ptr = NULL;
+}
+
+/************************************************************************
+ *                     LEAF RECORD SUPPORT ROUTINES                    *
+ ************************************************************************
+ *
+ * These routine create leaf subrecords representing common filesystem
+ * structures.
+ */
+
+static void
+jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
+{
+}
+
+static void
+jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
+{
 }
 
 /************************************************************************
@@ -392,10 +1145,34 @@ static
 int
 journal_nmkdir(struct vop_nmkdir_args *ap)
 {
+    struct mount *mp;
+    struct journal *jo;
+    struct jrecord jrec;
+    void *save;                /* warning, save pointers do not always remain valid */
     int error;
 
-    printf("JMKDIR %s\n", ap->a_ncp->nc_name);
     error = vop_journal_operate_ap(&ap->a_head);
+    mp = ap->a_head.a_ops->vv_mount;
+    if (error == 0) {
+       TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
+           jrecord_init(jo, &jrec, -1);
+           if (jo->flags & MC_JOURNAL_WANT_REVERSABLE) {
+               save = jrecord_push(&jrec, JTYPE_UNDO);
+               /* XXX undo operations */
+               jrecord_pop(&jrec, save);
+           }
+#if 0
+           if (jo->flags & MC_JOURNAL_WANT_AUDIT) {
+               jrecord_write_audit(&jrec);
+           }
+#endif
+           save = jrecord_push(&jrec, JTYPE_MKDIR);
+           jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
+           jrecord_write_vattr(&jrec, ap->a_vap);
+           jrecord_pop(&jrec, save);
+           jrecord_done(&jrec, 0);
+       }
+    }
     return (error);
 }
 
index 68f6101..3e86e73 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/kern/vfs_journal.c,v 1.3 2004/12/29 02:40:02 dillon Exp $
+ * $DragonFly: src/sys/kern/vfs_journal.c,v 1.4 2004/12/30 21:41:04 dillon Exp $
  */
 /*
  * Each mount point may have zero or more independantly configured journals
@@ -56,7 +56,7 @@
  * well as to allow such filesystems to take direct advantage of the kernel's
  * journaling layer so they don't have to roll their own.
  *
- * In addition, the journaling thread will have access to much larger 
+ * In addition, the worker thread will have access to much larger 
  * spooling areas then the memory buffer is able to provide by e.g. 
  * reserving swap space, in order to absorb potentially long interruptions
  * of off-site journaling streams, and to prevent 'slow' off-site linkages
  * possible, reducing the number of gratuitous thread switches and taking
  * advantage of cpu caches through the use of shorter batched code paths
  * rather then trying to do everything in the context of the process
- * originating the filesystem op.  
+ * originating the filesystem op.  In the future the memory FIFO can be
+ * made per-cpu to remove BGL or other locking requirements.
  */
-
 #include <sys/param.h>
 #include <sys/systm.h>
 #include <sys/buf.h>
 #include <sys/conf.h>
 #include <sys/kernel.h>
+#include <sys/queue.h>
 #include <sys/lock.h>
 #include <sys/malloc.h>
 #include <sys/mount.h>
@@ -104,9 +105,33 @@ static int journal_remove_vfs_journal(struct mount *mp,
                            const struct mountctl_remove_journal *info);
 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
 static void journal_thread(void *info);
-static void journal_write_record(struct journal *jo, const char *ctl,
-                           const char *buf, int bytes);
-static void journal_write(struct journal *jo, const char *buf, int bytes);
+
+static void *journal_reserve(struct journal *jo, 
+                           struct journal_rawrecbeg **rawpp, 
+                           int16_t streamid, int bytes);
+static void *journal_extend(struct journal *jo,
+                           struct journal_rawrecbeg **rawpp,
+                           int truncbytes, int bytes, int *newstreamrecp);
+static void journal_abort(struct journal *jo, 
+                           struct journal_rawrecbeg **rawpp);
+static void journal_commit(struct journal *jo, 
+                           struct journal_rawrecbeg **rawpp, 
+                           int bytes, int closeout);
+
+static void jrecord_init(struct journal *jo, 
+                           struct jrecord *jrec, int16_t streamid);
+static struct journal_subrecord *jrecord_push(
+                           struct jrecord *jrec, int16_t rectype);
+static void jrecord_pop(struct jrecord *jrec, struct journal_subrecord *parent);
+static struct journal_subrecord *jrecord_write(struct jrecord *jrec,
+                           int16_t rectype, int bytes);
+static void jrecord_data(struct jrecord *jrec, const void *buf, int bytes);
+static void jrecord_done(struct jrecord *jrec, int abortit);
+
+static void jrecord_write_path(struct jrecord *jrec, 
+                           int16_t rectype, struct namecache *ncp);
+static void jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat);
+
 
 static int journal_nmkdir(struct vop_nmkdir_args *ap);
 
@@ -117,7 +142,7 @@ static struct vnodeopv_entry_desc journal_vnodeop_entries[] = {
     { NULL, NULL }
 };
 
-static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structure");
+static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
 static MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
 
 int
@@ -199,13 +224,17 @@ journal_detach(struct mount *mp)
 }
 
 /*
- * Install a journal on a mount point
+ * Install a journal on a mount point.  Each journal has an associated worker
+ * thread which is responsible for buffering and spooling the data to the
+ * target.  A mount point may have multiple journals attached to it.  An
+ * initial start record is generated when the journal is associated.
  */
 static int
 journal_install_vfs_journal(struct mount *mp, struct file *fp, 
                            const struct mountctl_install_journal *info)
 {
     struct journal *jo;
+    struct jrecord jrec;
     int error = 0;
     int size;
 
@@ -216,7 +245,7 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
     /*
      * Memory FIFO size, round to nearest power of 2
      */
-    if (info->flags & MC_JOURNAL_MBSIZE_PROVIDED) {
+    if (info->membufsize) {
        if (info->membufsize < 65536)
            size = 65536;
        else if (info->membufsize > 128 * 1024 * 1024)
@@ -234,7 +263,7 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
      * Other parameters.  If not specified the starting transaction id
      * will be the current date.
      */
-    if (info->flags & MC_JOURNAL_TRANSID_PROVIDED) {
+    if (info->transid) {
        jo->transid = info->transid;
     } else {
        struct timespec ts;
@@ -252,6 +281,9 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
     if (jo->fifo.membase == NULL)
        error = ENOMEM;
 
+    /*
+     * Create the worker thread and generate the association record.
+     */
     if (error) {
        free(jo, M_JOURNAL);
     } else {
@@ -261,17 +293,26 @@ journal_install_vfs_journal(struct mount *mp, struct file *fp,
                        TDF_STOPREQ, -1, "journal %.*s", JIDMAX, jo->id);
        lwkt_setpri(&jo->thread, TDPRI_KERN_DAEMON);
        lwkt_schedule(&jo->thread);
-       journal_write_record(jo, "INSTALL", NULL, 0);
 
+       jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
+       jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
+       jrecord_done(&jrec, 0);
        TAILQ_INSERT_TAIL(&mp->mnt_jlist, jo, jentry);
     }
     return(error);
 }
 
+/*
+ * Disassociate a journal from a mount point and terminate its worker thread.
+ * A final termination record is written out before the file pointer is
+ * dropped.
+ */
 static int
-journal_remove_vfs_journal(struct mount *mp, const struct mountctl_remove_journal *info)
+journal_remove_vfs_journal(struct mount *mp, 
+                          const struct mountctl_remove_journal *info)
 {
     struct journal *jo;
+    struct jrecord jrec;
     int error;
 
     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
@@ -281,7 +322,11 @@ journal_remove_vfs_journal(struct mount *mp, const struct mountctl_remove_journa
     if (jo) {
        error = 0;
        TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
-       journal_write_record(jo, "REMOVE", NULL, 0); /* XXX sequencing */
+
+       jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
+       jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
+       jrecord_done(&jrec, 0);
+
        jo->flags |= MC_JOURNAL_STOP_REQ | (info->flags & MC_JOURNAL_STOP_IMM);
        wakeup(&jo->fifo);
        while (jo->flags & MC_JOURNAL_ACTIVE) {
@@ -305,34 +350,95 @@ journal_resync_vfs_journal(struct mount *mp, const void *ctl)
     return(EINVAL);
 }
 
+/*
+ * The per-journal worker thread is responsible for writing out the
+ * journal's FIFO to the target stream.
+ */
 static void
 journal_thread(void *info)
 {
     struct journal *jo = info;
+    struct journal_rawrecbeg *rawp;
     int bytes;
     int error;
+    int avail;
     int res;
 
     for (;;) {
-       bytes = (jo->fifo.windex - jo->fifo.rindex) & jo->fifo.mask;
-       if (bytes == 0 || (jo->flags & MC_JOURNAL_STOP_IMM)) {
+       /*
+        * Calculate the number of bytes available to write.  This buffer
+        * area may contain reserved records so we can't just write it out
+        * without further checks.
+        */
+       bytes = jo->fifo.windex - jo->fifo.rindex;
+
+       /*
+        * sleep if no bytes are available or if an incomplete record is
+        * encountered (it needs to be filled in before we can write it
+        * out), and skip any pad records that we encounter.
+        */
+       if (bytes == 0) {
            if (jo->flags & MC_JOURNAL_STOP_REQ)
                break;
-           tsleep(&jo->fifo, 0, "jfifo", 0); /* XXX add heartbeat */
+           tsleep(&jo->fifo, 0, "jfifo", hz);
+           continue;
+       }
+       rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
+       if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
+           tsleep(&jo->fifo, 0, "jpad", hz);
+           continue;
+       }
+       if (rawp->streamid == JREC_STREAMID_PAD) {
+           jo->fifo.rindex += (rawp->recsize + 15) & ~15;
+           KKASSERT(jo->fifo.windex - jo->fifo.rindex > 0);
+           continue;
+       }
+
+       /*
+        * Figure out how much we can write out, beware the buffer wrap
+        * case.
+        */
+       res = 0;
+       avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
+       while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
+           res += (rawp->recsize + 15) & ~15;
+           if (res >= avail) {
+               KKASSERT(res == avail);
+               break;
+           }
        }
-       if (bytes > jo->fifo.size - jo->fifo.windex)
-           bytes = jo->fifo.size - jo->fifo.windex;
-       error = fp_write(jo->fp, jo->fifo.membase + jo->fifo.rindex, bytes, &res);
+
+       /*
+        * Issue the write and deal with any errors or other conditions.
+        * For now assume blocking I/O.  Since we are record-aware the
+        * code cannot yet handle partial writes.
+        *
+        * XXX EWOULDBLOCK/NBIO
+        * XXX notification on failure
+        * XXX two-way acknowledgement stream in the return direction / xindex
+        */
+       bytes = res;
+       error = fp_write(jo->fp, 
+                       jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
+                       bytes, &res);
        if (error) {
            printf("journal_thread(%s) write, error %d\n", jo->id, error);
-           jo->fifo.rindex = jo->fifo.windex; /* XXX flag out-of-sync */
+           /* XXX */
        } else {
+           KKASSERT(res == bytes);
            printf("journal_thread(%s) write %d\n", jo->id, res);
-           jo->fifo.rindex = (jo->fifo.rindex + res) & jo->fifo.mask;
-           if (jo->flags & MC_JOURNAL_WWAIT) {
-               jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
-               wakeup(&jo->fifo.windex);
-           }
+       }
+
+       /*
+        * Advance rindex.  XXX for now also advance xindex, which will
+        * eventually be advanced when the target acknowledges the sequence
+        * space.
+        */
+       jo->fifo.rindex += bytes;
+       jo->fifo.xindex += bytes;
+       if (jo->flags & MC_JOURNAL_WWAIT) {
+           jo->flags &= ~MC_JOURNAL_WWAIT;     /* XXX hysteresis */
+           wakeup(&jo->fifo.windex);
        }
     }
     jo->flags &= ~MC_JOURNAL_ACTIVE;
@@ -340,48 +446,695 @@ journal_thread(void *info)
     wakeup(&jo->fifo.windex);
 }
 
-static 
-void 
-journal_write_record(struct journal *jo, const char *ctl, 
-                    const char *buf, int bytes)
+static __inline
+void
+journal_build_pad(struct journal_rawrecbeg *rawp, int recsize)
 {
-    char head[64];
-
-    if (jo->flags & MC_JOURNAL_STOP_REQ)
-       return;
-    snprintf(head, sizeof(head), "%016llx %s\n", jo->transid, ctl);
-    /* XXX locking (token) or cmpexgl space reservation */
-    ++jo->transid;     /* XXX embed nanotime, force monotonic */
-    journal_write(jo, head, strlen(head));
-    if (bytes)
-       journal_write(jo, buf, bytes);
+    struct journal_rawrecend *rendp;
+    
+    KKASSERT((recsize & 15) == 0 && recsize >= 16);
+
+    rawp->begmagic = JREC_BEGMAGIC;
+    rawp->streamid = JREC_STREAMID_PAD;
+    rawp->recsize = recsize;   /* must be 16-byte aligned */
+    rawp->seqno = 0;
+    /*
+     * WARNING, rendp may overlap rawp->seqno.  This is necessary to
+     * allow PAD records to fit in 16 bytes.  Use cpu_mb1() to
+     * hopefully cause the compiler to not make any assumptions.
+     */
+    cpu_mb1();
+    rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
+    rendp->endmagic = JREC_ENDMAGIC;
+    rendp->check = 0;
+    rendp->recsize = rawp->recsize;
 }
 
-static
+/*
+ * Wake up the worker thread if the FIFO is more then half full or if
+ * someone is waiting for space to be freed up.  Otherwise let the 
+ * heartbeat deal with it.  Being able to avoid waking up the worker
+ * is the key to the journal's cpu efficiency.
+ */
+static __inline
 void
-journal_write(struct journal *jo, const char *buf, int bytes)
+journal_commit_wakeup(struct journal *jo)
 {
     int avail;
 
-    while (bytes && (jo->flags & MC_JOURNAL_ACTIVE)) {
-       avail = (jo->fifo.windex - jo->fifo.rindex) & jo->fifo.mask;
-       avail = jo->fifo.size - avail - 1;
-       if (avail == 0) {
-           if (jo->flags & MC_JOURNAL_STOP_IMM)
-               break;
+    avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
+    KKASSERT(avail >= 0);
+    if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
+       wakeup(&jo->fifo);
+}
+
+/*
+ * Create a new BEGIN stream record with the specified streamid and the
+ * specified amount of payload space.  *rawpp will be set to point to the
+ * base of the new stream record and a pointer to the base of the payload
+ * space will be returned.  *rawpp does not need to be pre-NULLd prior to
+ * making this call.
+ *
+ * A stream can be extended, aborted, or committed by other API calls
+ * below.  This may result in a sequence of potentially disconnected
+ * stream records to be output to the journaling target.  The first record
+ * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
+ * while the last record on commit or abort will be marked JREC_STREAMCTL_END
+ * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
+ * up being the same as the first, in which case the bits are all set in
+ * the first record.
+ *
+ * The stream record is created in an incomplete state by setting the begin
+ * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
+ * flushing the fifo past our record until we have finished populating it.
+ * Other threads can reserve and operate on their own space without stalling
+ * but the stream output will stall until we have completed operations.  The
+ * memory FIFO is intended to be large enough to absorb such situations
+ * without stalling out other threads.
+ */
+static
+void *
+journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
+               int16_t streamid, int bytes)
+{
+    struct journal_rawrecbeg *rawp;
+    int avail;
+    int availtoend;
+    int req;
+
+    /*
+     * Add header and trailer overheads to the passed payload.  Note that
+     * the passed payload size need not be aligned in any way.
+     */
+    bytes += sizeof(struct journal_rawrecbeg);
+    bytes += sizeof(struct journal_rawrecend);
+
+    for (;;) {
+       /*
+        * First, check boundary conditions.  If the request would wrap around
+        * we have to skip past the ending block and return to the beginning
+        * of the FIFO's buffer.  Calculate 'req' which is the actual number
+        * of bytes being reserved, including wrap-around dead space.
+        *
+        * Note that availtoend is not truncated to avail and so cannot be
+        * used to determine whether the reservation is possible by itself.
+        * Also, since all fifo ops are 16-byte aligned, we can check
+        * the size before calculating the aligned size.
+        */
+       availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
+       if (bytes > availtoend) 
+           req = bytes + availtoend;   /* add pad to end */
+       else
+           req = bytes;
+
+       /*
+        * Next calculate the total available space and see if it is
+        * sufficient.  We cannot overwrite previously buffered data
+        * past xindex because otherwise we would not be able to restart
+        * a broken link at the target's last point of commit.
+        */
+       avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
+       KKASSERT(avail >= 0 && (avail & 15) == 0);
+
+       if (avail < req) {
+           /* XXX MC_JOURNAL_STOP_IMM */
            jo->flags |= MC_JOURNAL_WWAIT;
            tsleep(&jo->fifo.windex, 0, "jwrite", 0);
            continue;
        }
-       if (avail > jo->fifo.size - jo->fifo.windex)
-           avail = jo->fifo.size - jo->fifo.windex;
-       if (avail > bytes)
-           avail = bytes;
-       bcopy(buf, jo->fifo.membase + jo->fifo.windex, avail);
-       bytes -= avail;
-       jo->fifo.windex = (jo->fifo.windex + avail) & jo->fifo.mask;
-       tsleep(&jo->fifo, 0, "jfifo", 0);       /* XXX hysteresis */
+
+       /*
+        * Create a pad record for any dead space and create an incomplete
+        * record for the live space, then return a pointer to the
+        * contiguous buffer space that was requested.
+        *
+        * NOTE: The worker thread will not flush past an incomplete
+        * record, so the reserved space can be filled in at-will.  The
+        * journaling code must also be aware the reserved sections occuring
+        * after this one will also not be written out even if completed
+        * until this one is completed.
+        */
+       rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
+       if (req != bytes) {
+           journal_build_pad(rawp, req - bytes);
+           rawp = (void *)jo->fifo.membase;
+       }
+       rawp->begmagic = JREC_INCOMPLETEMAGIC;  /* updated by abort/commit */
+       rawp->recsize = bytes;                  /* (unaligned size) */
+       rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
+       rawp->seqno = 0;                        /* set by caller */
+
+       /*
+        * Issue a memory barrier to guarentee that the record data has been
+        * properly initialized before we advance the write index and return
+        * a pointer to the reserved record.  Otherwise the worker thread
+        * could accidently run past us.
+        *
+        * Note that stream records are always 16-byte aligned.
+        */
+       cpu_mb1();
+       jo->fifo.windex += (req + 15) & ~15;
+       *rawpp = rawp;
+       return(rawp + 1);
+    }
+    /* not reached */
+    *rawpp = NULL;
+    return(NULL);
+}
+
+/*
+ * Extend a previous reservation by the specified number of payload bytes.
+ * If it is not possible to extend the existing reservation due to either
+ * another thread having reserved space after us or due to a boundary
+ * condition, the current reservation will be committed and possibly
+ * truncated and a new reservation with the specified payload size will
+ * be created. *rawpp is set to the new reservation in this case but the
+ * caller cannot depend on a comparison with the old rawp to determine if
+ * this case occurs because we could end up using the same memory FIFO
+ * offset for the new stream record.
+ *
+ * In either case this function will return a pointer to the base of the
+ * extended payload space.
+ *
+ * If a new stream block is created the caller needs to recalculate payload
+ * byte counts, if the same stream block is used the caller needs to extend
+ * its current notion of the payload byte count.
+ */
+static void *
+journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp, 
+               int truncbytes, int bytes, int *newstreamrecp)
+{
+    struct journal_rawrecbeg *rawp;
+    int16_t streamid;
+    int availtoend;
+    int avail;
+    int osize;
+    int nsize;
+    int wbase;
+    void *rptr;
+
+    *newstreamrecp = 0;
+    rawp = *rawpp;
+    osize = (rawp->recsize + 15) & ~15;
+    nsize = (rawp->recsize + bytes + 15) & ~15;
+    wbase = (char *)rawp - jo->fifo.membase;
+
+    /*
+     * If the aligned record size does not change we can trivially extend
+     * the record.
+     */
+    if (nsize == osize) {
+       rawp->recsize += bytes;
+       return((char *)rawp + rawp->recsize - bytes);
+    }
+
+    /*
+     * If the fifo's write index hasn't been modified since we made the
+     * reservation and we do not hit any boundary conditions, we can 
+     * trivially extend the record.
+     */
+    if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
+       availtoend = jo->fifo.size - wbase;
+       avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
+       KKASSERT((availtoend & 15) == 0);
+       KKASSERT((avail & 15) == 0);
+       if (nsize <= avail && nsize <= availtoend) {
+           jo->fifo.windex += nsize - osize;
+           rawp->recsize += bytes;
+           return((char *)rawp + rawp->recsize - bytes);
+       }
+    }
+
+    /*
+     * It was not possible to extend the buffer.  Commit the current
+     * buffer and create a new one.  We manually clear the BEGIN mark that
+     * journal_reserve() creates (because this is a continuing record, not
+     * the start of a new stream).
+     */
+    streamid = rawp->streamid & JREC_STREAMID_MASK;
+    journal_commit(jo, rawpp, truncbytes, 0);
+    rptr = journal_reserve(jo, rawpp, streamid, bytes);
+    rawp = *rawpp;
+    rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
+    *newstreamrecp = 1;
+    return(rptr);
+}
+
+/*
+ * Abort a journal record.  If the transaction record represents a stream
+ * BEGIN and we can reverse the fifo's write index we can simply reverse
+ * index the entire record, as if it were never reserved in the first place.
+ *
+ * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
+ * with the payload truncated to 0 bytes.
+ */
+static void
+journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
+{
+    struct journal_rawrecbeg *rawp;
+    int osize;
+
+    rawp = *rawpp;
+    osize = (rawp->recsize + 15) & ~15;
+
+    if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
+       (jo->fifo.windex & jo->fifo.mask) == 
+        (char *)rawp - jo->fifo.membase + osize)
+    {
+       jo->fifo.windex -= osize;
+       *rawpp = NULL;
+    } else {
+       rawp->streamid |= JREC_STREAMCTL_ABORTED;
+       journal_commit(jo, rawpp, 0, 1);
+    }
+}
+
+/*
+ * Commit a journal record and potentially truncate it to the specified
+ * number of payload bytes.  If you do not want to truncate the record,
+ * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
+ * field includes header and trailer and will not be correct.  Note that
+ * passing 0 will truncate the entire data payload of the record.
+ *
+ * The logical stream is terminated by this function.
+ *
+ * If truncation occurs, and it is not possible to physically optimize the
+ * memory FIFO due to other threads having reserved space after ours,
+ * the remaining reserved space will be covered by a pad record.
+ */
+static void
+journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
+               int bytes, int closeout)
+{
+    struct journal_rawrecbeg *rawp;
+    struct journal_rawrecend *rendp;
+    int osize;
+    int nsize;
+
+    rawp = *rawpp;
+    *rawpp = NULL;
+
+    KKASSERT((char *)rawp >= jo->fifo.membase &&
+            (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
+    KKASSERT(((intptr_t)rawp & 15) == 0);
+
+    /*
+     * Truncate the record if requested.  If the FIFO write index as still
+     * at the end of our record we can optimally backindex it.  Otherwise
+     * we have to insert a pad record.
+     *
+     * We calculate osize which is the 16-byte-aligned original recsize.
+     * We calculate nsize which is the 16-byte-aligned new recsize.
+     *
+     * Due to alignment issues or in case the passed truncation bytes is
+     * the same as the original payload, windex will be equal to nindex.
+     */
+    if (bytes >= 0) {
+       KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
+       osize = (rawp->recsize + 15) & ~15;
+       rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
+                       sizeof(struct journal_rawrecend);
+       nsize = (rawp->recsize + 15) & ~15;
+       if (osize == nsize) {
+           /* do nothing */
+       } else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
+           /* we are able to backindex the fifo */
+           jo->fifo.windex -= osize - nsize;
+       } else {
+           /* we cannot backindex the fifo, emplace a pad in the dead space */
+           journal_build_pad((void *)((char *)rawp + osize), osize - nsize);
+       }
     }
+
+    /*
+     * Fill in the trailer.  Note that unlike pad records, the trailer will
+     * never overlap the header.
+     */
+    rendp = (void *)((char *)rawp + 
+           ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
+    rendp->endmagic = JREC_ENDMAGIC;
+    rendp->recsize = rawp->recsize;
+    rendp->check = 0;          /* XXX check word, disabled for now */
+
+    /*
+     * Fill in begmagic last.  This will allow the worker thread to proceed.
+     * Use a memory barrier to guarentee write ordering.  Mark the stream
+     * as terminated if closeout is set.  This is the typical case.
+     */
+    if (closeout)
+       rawp->streamid |= JREC_STREAMCTL_END;
+    cpu_mb1();                 /* memory barrier */
+    rawp->begmagic = JREC_BEGMAGIC;
+
+    journal_commit_wakeup(jo);
+}
+
+/************************************************************************
+ *                     TRANSACTION SUPPORT ROUTINES                    *
+ ************************************************************************
+ *
+ * JRECORD_*() - routines to create subrecord transactions and embed them
+ *              in the logical streams managed by the journal_*() routines.
+ */
+
+static int16_t sid = JREC_STREAMID_JMIN;
+
+/*
+ * Initialize the passed jrecord structure and start a new stream transaction
+ * by reserving an initial build space in the journal's memory FIFO.
+ */
+static void
+jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
+{
+    bzero(jrec, sizeof(*jrec));
+    jrec->jo = jo;
+    if (streamid < 0) {
+       streamid = sid++;       /* XXX need to track stream ids! */
+       if (sid == JREC_STREAMID_JMAX)
+           sid = JREC_STREAMID_JMIN;
+    }
+    jrec->streamid = streamid;
+    jrec->stream_residual = JREC_DEFAULTSIZE;
+    jrec->stream_reserved = jrec->stream_residual;
+    jrec->stream_ptr = 
+       journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
+}
+
+/*
+ * Push a recursive record type.  All pushes should have matching pops.
+ * The old parent is returned and the newly pushed record becomes the
+ * new parent.  Note that the old parent's pointer may already be invalid
+ * or may become invalid if jrecord_write() had to build a new stream
+ * record, so the caller should not mess with the returned pointer in
+ * any way other then to save it.
+ */
+static 
+struct journal_subrecord *
+jrecord_push(struct jrecord *jrec, int16_t rectype)
+{
+    struct journal_subrecord *save;
+
+    save = jrec->parent;
+    jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
+    jrec->last = NULL;
+    KKASSERT(jrec->parent != NULL);
+    ++jrec->pushcount;
+    ++jrec->pushptrgood;       /* cleared on flush */
+    return(save);
+}
+
+/*
+ * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
+ * on the last record written within the subtransaction.  If the last 
+ * record written is not accessible or if the subtransaction is empty,
+ * we must write out a pad record with JMASK_LAST set before popping.
+ *
+ * When popping a subtransaction the parent record's recsize field
+ * will be properly set.  If the parent pointer is no longer valid
+ * (which can occur if the data has already been flushed out to the
+ * stream), the protocol spec allows us to leave it 0.
+ *
+ * The saved parent pointer which we restore may or may not be valid,
+ * and if not valid may or may not be NULL, depending on the value
+ * of pushptrgood.
+ */
+static void
+jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
+{
+    struct journal_subrecord *last;
+
+    KKASSERT(jrec->pushcount > 0);
+    KKASSERT(jrec->residual == 0);
+
+    /*
+     * Set JMASK_LAST on the last record we wrote at the current
+     * level.  If last is NULL we either no longer have access to the
+     * record or the subtransaction was empty and we must write out a pad
+     * record.
+     */
+    if ((last = jrec->last) == NULL) {
+       jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
+       last = jrec->last;      /* reload after possible flush */
+    } else {
+       last->rectype |= JMASK_LAST;
+    }
+
+    /*
+     * pushptrgood tells us how many levels of parent record pointers
+     * are valid.  The jrec only stores the current parent record pointer
+     * (and it is only valid if pushptrgood != 0).  The higher level parent
+     * record pointers are saved by the routines calling jrecord_push() and
+     * jrecord_pop().  These pointers may become stale and we determine
+     * that fact by tracking the count of valid parent pointers with 
+     * pushptrgood.  Pointers become invalid when their related stream
+     * record gets pushed out.
+     *
+     * [parentA]
+     *   [node X]
+     *    [parentB]
+     *      [node Y]
+     *      [node Z]
+     *    (pop B)      see NOTE B
+     * (pop A)         see NOTE A
+     *
+     * NOTE B: This pop sets LAST in node Z if the node is still accessible,
+     *         else a PAD record is appended and LAST is set in that.
+     *
+     *         This pop sets the record size in parentB if parentB is still
+     *         accessible, else the record size is left 0 (the scanner must
+     *         deal with that).
+     *
+     *         This pop sets the new 'last' record to parentB, the pointer
+     *         to which may or may not still be accessible.
+     *
+     * NOTE A: This pop sets LAST in parentB if the node is still accessible,
+     *         else a PAD record is appended and LAST is set in that.
+     *
+     *         This pop sets the record size in parentA if parentA is still
+     *         accessible, else the record size is left 0 (the scanner must
+     *         deal with that).
+     *
+     *         This pop sets the new 'last' record to parentA, the pointer
+     *         to which may or may not still be accessible.
+     *
+     * Also note that the last record in the stream transaction, which in
+     * the above example is parentA, does not currently have the LAST bit
+     * set.
+     *
+     * The current parent becomes the last record relative to the
+     * saved parent passed into us.  It's validity is based on 
+     * whether pushptrgood is non-zero prior to decrementing.  The saved
+     * parent becomes the new parent, and its validity is based on whether
+     * pushptrgood is non-zero after decrementing.
+     *
+     * The old jrec->parent may be NULL if it is no longer accessible.
+     * If pushptrgood is non-zero, however, it is guarenteed to not
+     * be NULL (since no flush occured).
+     */
+    jrec->last = jrec->parent;
+    --jrec->pushcount;
+    if (jrec->pushptrgood) {
+       KKASSERT(jrec->last != NULL && last != NULL);
+       if (--jrec->pushptrgood == 0) {
+           jrec->parent = NULL;        /* 'save' contains garbage or NULL */
+       } else {
+           KKASSERT(save != NULL);
+           jrec->parent = save;        /* 'save' must not be NULL */
+       }
+
+       /*
+        * Set the record size in the old parent.  'last' still points to
+        * the original last record in the subtransaction being popped,
+        * jrec->last points to the old parent (which became the last
+        * record relative to the new parent being popped into).
+        */
+       jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
+    } else {
+       jrec->parent = NULL;
+       KKASSERT(jrec->last == NULL);
+    }
+}
+
+/*
+ * Write a leaf record out and return a pointer to its base.  The leaf
+ * record may contain potentially megabytes of data which is supplied
+ * in jrecord_data() calls.  The exact amount must be specified in this
+ * call.
+ */
+static
+struct journal_subrecord *
+jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
+{
+    struct journal_subrecord *last;
+    int pusheditout;
+
+    /*
+     * Try to catch some obvious errors.  Nesting records must specify a
+     * size of 0, and there should be no left-overs from previous operations
+     * (such as incomplete data writeouts).
+     */
+    KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
+    KKASSERT(jrec->residual == 0);
+
+    /*
+     * Check to see if the current stream record has enough room for
+     * the new subrecord header.  If it doesn't we extend the current
+     * stream record.
+     *
+     * This may have the side effect of pushing out the current stream record
+     * and creating a new one.  We must adjust our stream tracking fields
+     * accordingly.
+     */
+    if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
+       jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
+                               jrec->stream_reserved - jrec->stream_residual,
+                               JREC_DEFAULTSIZE, &pusheditout);
+       if (pusheditout) {
+           jrec->stream_reserved = JREC_DEFAULTSIZE;
+           jrec->stream_residual = JREC_DEFAULTSIZE;
+           jrec->parent = NULL;        /* no longer accessible */
+           jrec->pushptrgood = 0;      /* restored parents in pops no good */
+       } else {
+           jrec->stream_reserved += JREC_DEFAULTSIZE;
+           jrec->stream_residual += JREC_DEFAULTSIZE;
+       }
+    }
+    last = (void *)jrec->stream_ptr;
+    last->rectype = rectype;
+    last->reserved = 0;
+    last->recsize = sizeof(struct journal_subrecord) + bytes;
+    jrec->last = last;
+    jrec->residual = bytes;            /* remaining data to be posted */
+    jrec->residual_align = -bytes & 7; /* post-data alignment required */
+    return(last);
+}
+
+/*
+ * Write out the data associated with a leaf record.  Any number of calls
+ * to this routine may be made as long as the byte count adds up to the
+ * amount originally specified in jrecord_write().
+ *
+ * The act of writing out the leaf data may result in numerous stream records
+ * being pushed out.   Callers should be aware that even the associated
+ * subrecord header may become inaccessible due to stream record pushouts.
+ */
+static void
+jrecord_data(struct jrecord *jrec, const void *buf, int bytes)
+{
+    int pusheditout;
+    int extsize;
+
+    KKASSERT(bytes >= 0 && bytes <= jrec->residual);
+
+    /*
+     * Push out stream records as long as there is insufficient room to hold
+     * the remaining data.
+     */
+    while (jrec->stream_residual < bytes) {
+       /*
+        * Fill in any remaining space in the current stream record.
+        */
+       bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
+       buf = (const char *)buf + jrec->stream_residual;
+       bytes -= jrec->stream_residual;
+       /*jrec->stream_ptr += jrec->stream_residual;*/
+       jrec->stream_residual = 0;
+       jrec->residual -= jrec->stream_residual;
+
+       /*
+        * Try to extend the current stream record, but no more then 1/4
+        * the size of the FIFO.
+        */
+       extsize = jrec->jo->fifo.size >> 2;
+       if (extsize > bytes)
+           extsize = (bytes + 15) & ~15;
+
+       jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
+                               jrec->stream_reserved - jrec->stream_residual,
+                               extsize, &pusheditout);
+       if (pusheditout) {
+           jrec->stream_reserved = extsize;
+           jrec->stream_residual = extsize;
+           jrec->parent = NULL;        /* no longer accessible */
+           jrec->last = NULL;          /* no longer accessible */
+           jrec->pushptrgood = 0;      /* restored parents in pops no good */
+       } else {
+           jrec->stream_reserved += extsize;
+           jrec->stream_residual += extsize;
+       }
+    }
+
+    /*
+     * Push out any remaining bytes into the current stream record.
+     */
+    if (bytes) {
+       bcopy(buf, jrec->stream_ptr, bytes);
+       jrec->stream_ptr += bytes;
+       jrec->stream_residual -= bytes;
+       jrec->residual -= bytes;
+    }
+
+    /*
+     * Handle data alignment requirements for the subrecord.  Because the
+     * stream record's data space is more strictly aligned, it must already
+     * have sufficient space to hold any subrecord alignment slop.
+     */
+    if (jrec->residual == 0 && jrec->residual_align) {
+       KKASSERT(jrec->residual_align <= jrec->stream_residual);
+       bzero(jrec->stream_ptr, jrec->residual_align);
+       jrec->stream_ptr += jrec->residual_align;
+       jrec->stream_residual -= jrec->residual_align;
+       jrec->residual_align = 0;
+    }
+}
+
+/*
+ * We are finished with a transaction.  If abortit is not set then we must
+ * be at the top level with no residual subrecord data left to output.
+ * If abortit is set then we can be in any state.
+ *
+ * The stream record will be committed or aborted as specified and jrecord
+ * resources will be cleaned up.
+ */
+static void
+jrecord_done(struct jrecord *jrec, int abortit)
+{
+    KKASSERT(jrec->rawp != NULL);
+
+    if (abortit) {
+       journal_abort(jrec->jo, &jrec->rawp);
+    } else {
+       KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
+       journal_commit(jrec->jo, &jrec->rawp, 
+                       jrec->stream_reserved - jrec->stream_residual, 1);
+    }
+
+    /*
+     * jrec should not be used beyond this point without another init,
+     * but clean up some fields to ensure that we panic if it is.
+     *
+     * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
+     */
+    jrec->jo = NULL;
+    jrec->stream_ptr = NULL;
+}
+
+/************************************************************************
+ *                     LEAF RECORD SUPPORT ROUTINES                    *
+ ************************************************************************
+ *
+ * These routine create leaf subrecords representing common filesystem
+ * structures.
+ */
+
+static void
+jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
+{
+}
+
+static void
+jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
+{
 }
 
 /************************************************************************
@@ -392,10 +1145,34 @@ static
 int
 journal_nmkdir(struct vop_nmkdir_args *ap)
 {
+    struct mount *mp;
+    struct journal *jo;
+    struct jrecord jrec;
+    void *save;                /* warning, save pointers do not always remain valid */
     int error;
 
-    printf("JMKDIR %s\n", ap->a_ncp->nc_name);
     error = vop_journal_operate_ap(&ap->a_head);
+    mp = ap->a_head.a_ops->vv_mount;
+    if (error == 0) {
+       TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
+           jrecord_init(jo, &jrec, -1);
+           if (jo->flags & MC_JOURNAL_WANT_REVERSABLE) {
+               save = jrecord_push(&jrec, JTYPE_UNDO);
+               /* XXX undo operations */
+               jrecord_pop(&jrec, save);
+           }
+#if 0
+           if (jo->flags & MC_JOURNAL_WANT_AUDIT) {
+               jrecord_write_audit(&jrec);
+           }
+#endif
+           save = jrecord_push(&jrec, JTYPE_MKDIR);
+           jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
+           jrecord_write_vattr(&jrec, ap->a_vap);
+           jrecord_pop(&jrec, save);
+           jrecord_done(&jrec, 0);
+       }
+    }
     return (error);
 }
 
index 0d80492..bbd2aab 100644 (file)
@@ -25,7 +25,7 @@
  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/sys/kern_syscall.h,v 1.24 2004/12/30 07:01:52 cpressey Exp $
+ * $DragonFly: src/sys/sys/kern_syscall.h,v 1.25 2004/12/30 21:41:06 dillon Exp $
  */
 
 #ifndef _SYS_KERN_SYSCALL_H_
@@ -56,6 +56,7 @@ struct statfs;
 struct timeval;
 struct uio;
 struct vnode;
+struct file;
 
 /*
  * Prototypes for syscalls in kern/kern_descrip.c
index 29f9b2b..545651d 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  *
- * $DragonFly: src/sys/sys/mountctl.h,v 1.2 2004/12/29 02:40:03 dillon Exp $
+ * $DragonFly: src/sys/sys/mountctl.h,v 1.3 2004/12/30 21:41:06 dillon Exp $
  */
 
 /*
@@ -68,18 +68,12 @@ struct mountctl_install_journal {
        int     unused04;
 };
 
-#define MC_JOURNAL_REVERSABLE          0x00000001      /* reversable log */
-#define MC_JOURNAL_BINARY              0x00000002      /* use binary format */
-#define MC_JOURNAL_ACTIVE              0x00000004      /* journal is active */
-#define MC_JOURNAL_STREAM_TWO_WAY      0x00000008      /* trans id ack */
-#define MC_JOURNAL_STOP_REQ            0x00000010      /* stop request pend */
-#define MC_JOURNAL_STOP_IMM            0x00000020      /* do not flush */
+#define MC_JOURNAL_ACTIVE              0x00000001      /* journal is active */
+#define MC_JOURNAL_STOP_REQ            0x00000002      /* stop request pend */
+#define MC_JOURNAL_STOP_IMM            0x00000004      /* STOP+trash fifo */
 #define MC_JOURNAL_WWAIT               0x00000040      /* write stall */
-#define MC_JOURNAL_MBSIZE_PROVIDED     0x00000200      /* data space */
-#define MC_JOURNAL_SWSIZE_PROVIDED     0x00000400      /* data space */
-#define MC_JOURNAL_TRANSID_PROVIDED    0x00000800      /* restart transid */
-#define MC_JOURNAL_STALLWARN_PROVIDED  0x00001000      /* restart transid */
-#define MC_JOURNAL_STALLERROR_PROVIDED 0x00002000      /* restart transid */
+#define MC_JOURNAL_WANT_AUDIT          0x00010000      /* audit trail */
+#define MC_JOURNAL_WANT_REVERSABLE     0x00020000      /* reversable stream */
 
 struct mountctl_remove_journal {
        char    id[JIDMAX];
@@ -109,6 +103,221 @@ struct mountctl_journal_status {
 
 #define MC_JOURNAL_STATUS_NEXT         0x80000000      /* find next id */
 
+/*
+ * Physical file format (binary)
+ *
+ * All raw records are 128-bit aligned, but all record sizes are actual.
+ * This means that any scanning code must 16-byte-align the recsize field
+ * when calculating skips.  The top level raw record has a header and a
+ * trailer to allow both forwards and backwards scanning of the journal.
+ * The alignment requirement allows the worker thread FIFO reservation
+ * API to operate efficiently, amoung other things.
+ *
+ * Logical data stream records are usually no larger then the journal's
+ * in-memory FIFO, since the journal's transactional APIs return contiguous
+ * blocks of buffer space and since logical stream records are used to avoid
+ * stalls when concurrent blocking operations are being written to the journal.
+ * Programs can depend on a logical stream record being a 'reasonable' size.
+ *
+ * Multiple logical data streams may operate concurrently in the journal,
+ * reflecting the fact that the system may be executing multiple blocking
+ * operations on the filesystem all at the same time.  These logical data
+ * streams are short-lived transactional entities which use a 13 bit id
+ * plus a transaction start bit, end bit, and abort bit.
+ *
+ * Stream identifiers in the 0x00-0xFF range are special and not used for
+ * normal transactional commands. 
+ *
+ * Stream id 0x00 indicates that no other streams should be active at that
+ * point in the journal, which helps the journaling code detect corruption.
+ *
+ * Stream id 0x01 is used for pad.  Pads are used to align data on convenient
+ * boundaries and to deal with dead space.
+ *
+ * Stream id 0x02 indicates a discontinuity in the streamed data and typically
+ * contains information relating to the reason for the discontinuity.
+ * JTYPE_ASSOCIATE and JTYPE_DISASSOCIATE are usually emplaced in stream 0x02.
+ *
+ * Stream id 0x03 may be used to annotate the journal with text comments
+ * via mountctl commands.  This can be extremely useful to note situations
+ * that may help with later recovery or audit operations.
+ *
+ * Stream id 0x04-0x7F are reserved by DragonFly for future protocol expansion.
+ *
+ * Stream id 0x80-0xFF may be used for third-party protocol expansion.
+ *
+ * Stream id's 0x0100-0x1FFF typically represent short-lived transactions
+ * (i.e. an id may be reused once the previous use has completed).  The
+ * journaling system runs through these id's sequentially which means that
+ * the journaling code can handle up to 8192-256 = 7936 simultanious
+ * transactions at any given moment.
+ *
+ * The sequence number field is context-sensitive.  It is typically used by
+ * a journaling stream to provide an incrementing counter and/or timestamp
+ * so recovery utilities can determine if any data is missing.
+ *
+ * The check word in the trailer may be used to provide an integrity check
+ * on the journaled data.  A value of 0 always means that no check word
+ * has been calculated.
+ *
+ * The journal_rawrecbeg structure MUST be a multiple of 16 bytes.
+ * The journal_rawrecend structure MUST be a multiple of 8 bytes.
+ *
+ * NOTE: PAD RECORD SPECIAL CASE.  Pad records are 16 bytes and have the
+ * rawrecend structure overlayed on the sequence number field of the 
+ * rawrecbeg structure.  This is necessary because stream records are
+ * 16 byte aligned, not 24 byte aligned, and dead space is not allowed.
+ * So the pad record must fit into any dead space.
+ */
+struct journal_rawrecbeg {
+       u_int16_t begmagic;     /* recovery scan, endianess detection */
+       u_int16_t streamid;     /* start/stop bits and stream identifier */
+       int32_t recsize;        /* stream data block (incls beg & end) */
+       int64_t seqno;          /* sequence number or transaction id */
+       /* ADDITIONAL DATA */
+};
+
+struct journal_rawrecend {
+       u_int16_t endmagic;     /* recovery scan, endianess detection */
+       u_int16_t check;        /* check word or 0 */
+       int32_t recsize;        /* same as rawrecbeg->recsize, for rev scan */
+};
+
+/*
+ * Constants for stream record magic numbers.    The incomplete magic
+ * number code is used internally by the memory FIFO reservation API
+ * and worker thread, allowing a block of space in the journaling
+ * stream (aka a stream block) to be reserved and then populated without
+ * stalling other threads doing their own reservation and population.
+ */
+#define JREC_BEGMAGIC          0x1234
+#define JREC_ENDMAGIC          0xCDEF
+#define JREC_INCOMPLETEMAGIC   0xFFFF
+
+/*
+ * Stream ids are 14 bits.  The top 2 bits specify when a new logical
+ * stream is being created or an existing logical stream is being terminated.
+ * A single raw stream record will set both the BEGIN and END bits if the
+ * entire transaction is encapsulated in a single stream record.
+ */
+#define JREC_STREAMCTL_MASK    0xE000
+#define JREC_STREAMCTL_BEGIN   0x8000  /* start a new logical stream */
+#define JREC_STREAMCTL_END     0x4000  /* terminate a logical stream */
+#define JREC_STREAMCTL_ABORTED 0x2000
+
+#define JREC_STREAMID_MASK     0x1FFF
+#define JREC_STREAMID_SYNCPT   (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END|0x0000)
+#define JREC_STREAMID_PAD      (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END|0x0001)
+#define JREC_STREAMID_DISCONT  0x0002  /* discontinuity */
+#define JREC_STREAMID_ANNOTATE 0x0003  /* annotation */
+                               /* 0x0004-0x007F reserved by DragonFly */
+                               /* 0x0080-0x00FF for third party use */
+#define JREC_STREAMID_JMIN     0x0100  /* lowest allowed general id */
+#define JREC_STREAMID_JMAX     0x2000  /* (one past the highest allowed id) */
+
+#define JREC_DEFAULTSIZE       64      /* reasonable initial reservation */
+
+/*
+ * Each logical journaling stream typically represents a transaction...
+ * that is, a VFS operation.  The VFS operation is written out using 
+ * sub-records and may contain multiple, possibly nested sub-transactions.
+ * multiple sub-transactions occur when a VFS operation cannot be represented
+ * by a single command.  This is typically the case when a journal is 
+ * configured to be reversable because UNDO sequences almost always have to
+ * be specified in such cases.  For example, if you ftruncate() a file the
+ * journal might have to write out a sequence of WRITE records representing
+ * the lost data, otherwise the journal would not be reversable.
+ * Sub-transactions within a particular stream do not have their own sequence
+ * number field and thus may not be parallelized (the protocol is already
+ * complex enough!).
+ *
+ * In order to support streaming operation with a limited buffer the recsize
+ * field is allowed to be 0 for subrecords with the JMASK_NESTED bit set.
+ * If this case occurs a scanner can determine that the recursion has ended
+ * by detecting a nested subrecord with the JMASK_LAST bit set.  A scanner
+ * may also set the field to the proper value after the fact to make later
+ * operations more efficient. 
+ *
+ * Note that this bit must be properly set even if the recsize field is
+ * non-zero.  The recsize must always be properly specified for 'leaf'
+ * subrecords, however in order to allow subsystems to potentially allocate
+ * more data space then they use the protocol allows any 'dead' space to be
+ * filled with JLEAF_PAD records.
+ *
+ * The recsize field may indicate data well past the size of the current
+ * raw stream record.  That is, the scanner may have to glue together
+ * multiple stream records with the same stream id to fully decode the
+ * embedded subrecords.  In particular, a subrecord could very well represent
+ * hundreds of megabytes of data (e.g. if a program were to do a
+ * multi-megabyte write()) and be split up across thousands of raw streaming
+ * records, possibly interlaced with other unrelated streams from other
+ * unrelated processes.  
+ *
+ * If a large sub-transaction is aborted the logical stream may be
+ * terminated without writing out all the expected data.  When this occurs
+ * the stream's ending record must also have the JREC_STREAMCTL_ABORTED bit
+ * set.  However, scanners should still be robust enough to detect such
+ * overflows even if the aborted bit is not set and consider them data
+ * corruption.
+ * 
+ * Aborts may also occur in the normal course of operations, especially once
+ * the journaling API is integrated into the cache coherency API.  A normal
+ * abort is issued by emplacing a JLEAF_ABORT record within the transaction
+ * being aborted.  Such records must be the last record in the sub-transaction,
+ * so JLEAF_LAST is also usually set.  In a transaction with many 
+ * sub-transactions only those sub-transactions with an abort record are
+ * aborted, the rest remain valid.  Abort records are considered S.O.P. for
+ * two reasons:  First, limited memory buffer space may make it impossible
+ * to delete the portion of the stream being aborted (the data may have
+ * already been sent to the target).  Second, the journaling code will
+ * eventually be used to support a cache coherency layer which may have to
+ * abort operations as part of the cache coherency protocol.  Note that
+ * subrecord aborts are different from stream record aborts.  Stream record
+ * aborts are considered to be extrodinary situations while subrecord aborts
+ * are S.O.P.
+ */
+
+struct journal_subrecord {
+       int16_t rectype;        /* 2 control bits, 14 record type bits */
+       int16_t reserved;       /* future use */
+       int32_t recsize;        /* record size (mandatory if not NESTED) */
+       /* ADDITIONAL DATA */
+};
+
+#define JMASK_NESTED           0x8000  /* data is a nested recursion */
+#define JMASK_LAST             0x4000
+
+#define JLEAF_PAD              0x0000
+#define JLEAF_ABORT            0x0001
+#define JTYPE_ASSOCIATE                0x0002
+#define JTYPE_DISASSOCIATE     0x0003
+#define JTYPE_UNDO             (JMASK_NESTED|0x0004)
+#define JTYPE_AUDIT            (JMASK_NESTED|0x0005)
+
+#define JTYPE_MKDIR            (JMASK_NESTED|0x0006)
+
+/*
+ * Low level record types
+ */
+#define JLEAF_FILEDATA         0x0401
+#define JLEAF_PATH1            0x0402
+#define JLEAF_PATH2            0x0403
+#define JLEAF_PATH3            0x0404
+#define JLEAF_PATH4            0x0405
+#define JLEAF_UID              0x0406
+#define JLEAF_GID              0x0407
+#define JLEAF_MODES            0x0408
+#define JLEAF_FFLAGS           0x0409
+#define JLEAF_PID              0x040A
+#define JLEAF_PPID             0x040B
+#define JLEAF_COMM             0x040C
+#define JLEAF_RESERVED_0D      0x040D
+#define JLEAF_RESERVED_0E      0x040E
+#define JLEAF_RESERVED_0F      0x040F
+#define JLEAF_SYMLINKDATA      0x0410
+#define JLEAF_SEEKPOS          0x0411
+#define JLEAF_INUM             0x0412
+
 #if defined(_KERNEL) || defined(_KERNEL_STRUCTURES)
 
 /*
@@ -117,7 +326,7 @@ struct mountctl_journal_status {
 struct journal_memfifo {
        int     size;           /* size (power of two) */
        int     mask;           /* index mask (size - 1) */
-       int     rindex;         /* stream reader index */
+       int     rindex;         /* stream reader index (track fd writes) */
        int     xindex;         /* last acked / reader restart */
        int     windex;         /* stream writer index */
        char    *membase;       /* memory buffer representing the FIFO */
@@ -129,11 +338,32 @@ struct journal_memfifo {
 struct journal {
        TAILQ_ENTRY(journal) jentry;
        struct file     *fp;
-       char    id[JIDMAX];
-       int     flags;          /* journaling flags */
-       int64_t transid;
+       char            id[JIDMAX];
+       int             flags;          /* journaling flags */
+       int64_t         transid;
        struct journal_memfifo fifo;
        struct thread   thread;
 };
 
+/*
+ * The jrecord structure is used to build a journaling transaction.  Since
+ * a single journaling transaction might encompass very large buffers it 
+ * is possible for multiple transactions to be written out to the FIFO
+ * in parallel and in peacemeal.
+ */
+struct jrecord {
+       struct journal  *jo;
+       char            *stream_ptr;
+       int             stream_residual;
+       int             stream_reserved;
+       struct journal_rawrecbeg *rawp;
+       struct journal_subrecord *parent;
+       struct journal_subrecord *last;
+       int16_t         streamid;
+       int             pushcount;
+       int             pushptrgood;
+       int             residual;
+       int             residual_align;
+};
+
 #endif