Merge from vendor branch GCC:
[dragonfly.git] / sys / kern / vfs_journal.c
1 /*
2  * Copyright (c) 2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/vfs_journal.c,v 1.13 2005/06/03 23:57:32 dillon Exp $
35  */
36 /*
37  * Each mount point may have zero or more independantly configured journals
38  * attached to it.  Each journal is represented by a memory FIFO and worker
39  * thread.  Journal events are streamed through the FIFO to the thread,
40  * batched up (typically on one-second intervals), and written out by the
41  * thread. 
42  *
43  * Journal vnode ops are executed instead of mnt_vn_norm_ops when one or
44  * more journals have been installed on a mount point.  It becomes the
45  * responsibility of the journal op to call the underlying normal op as
46  * appropriate.
47  *
48  * The journaling protocol is intended to evolve into a two-way stream
49  * whereby transaction IDs can be acknowledged by the journaling target
50  * when the data has been committed to hard storage.  Both implicit and
51  * explicit acknowledgement schemes will be supported, depending on the
52  * sophistication of the journaling stream, plus resynchronization and
53  * restart when a journaling stream is interrupted.  This information will
54  * also be made available to journaling-aware filesystems to allow better
55  * management of their own physical storage synchronization mechanisms as
56  * well as to allow such filesystems to take direct advantage of the kernel's
57  * journaling layer so they don't have to roll their own.
58  *
59  * In addition, the worker thread will have access to much larger 
60  * spooling areas then the memory buffer is able to provide by e.g. 
61  * reserving swap space, in order to absorb potentially long interruptions
62  * of off-site journaling streams, and to prevent 'slow' off-site linkages
63  * from radically slowing down local filesystem operations.  
64  *
65  * Because of the non-trivial algorithms the journaling system will be
66  * required to support, use of a worker thread is mandatory.  Efficiencies
67  * are maintained by utilitizing the memory FIFO to batch transactions when
68  * possible, reducing the number of gratuitous thread switches and taking
69  * advantage of cpu caches through the use of shorter batched code paths
70  * rather then trying to do everything in the context of the process
71  * originating the filesystem op.  In the future the memory FIFO can be
72  * made per-cpu to remove BGL or other locking requirements.
73  */
74 #include <sys/param.h>
75 #include <sys/systm.h>
76 #include <sys/buf.h>
77 #include <sys/conf.h>
78 #include <sys/kernel.h>
79 #include <sys/queue.h>
80 #include <sys/lock.h>
81 #include <sys/malloc.h>
82 #include <sys/mount.h>
83 #include <sys/unistd.h>
84 #include <sys/vnode.h>
85 #include <sys/poll.h>
86 #include <sys/mountctl.h>
87 #include <sys/journal.h>
88 #include <sys/file.h>
89 #include <sys/proc.h>
90 #include <sys/msfbuf.h>
91
92 #include <machine/limits.h>
93
94 #include <vm/vm.h>
95 #include <vm/vm_object.h>
96 #include <vm/vm_page.h>
97 #include <vm/vm_pager.h>
98 #include <vm/vnode_pager.h>
99
100 #include <sys/file2.h>
101 #include <sys/thread2.h>
102
103 static int journal_attach(struct mount *mp);
104 static void journal_detach(struct mount *mp);
105 static int journal_install_vfs_journal(struct mount *mp, struct file *fp,
106                             const struct mountctl_install_journal *info);
107 static int journal_remove_vfs_journal(struct mount *mp,
108                             const struct mountctl_remove_journal *info);
109 static int journal_destroy(struct mount *mp, struct journal *jo, int flags);
110 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
111 static int journal_status_vfs_journal(struct mount *mp,
112                        const struct mountctl_status_journal *info,
113                        struct mountctl_journal_ret_status *rstat,
114                        int buflen, int *res);
115 static void journal_wthread(void *info);
116 static void journal_rthread(void *info);
117
118 static void *journal_reserve(struct journal *jo, 
119                             struct journal_rawrecbeg **rawpp, 
120                             int16_t streamid, int bytes);
121 static void *journal_extend(struct journal *jo,
122                             struct journal_rawrecbeg **rawpp,
123                             int truncbytes, int bytes, int *newstreamrecp);
124 static void journal_abort(struct journal *jo, 
125                             struct journal_rawrecbeg **rawpp);
126 static void journal_commit(struct journal *jo, 
127                             struct journal_rawrecbeg **rawpp, 
128                             int bytes, int closeout);
129
130 static void jrecord_init(struct journal *jo, 
131                             struct jrecord *jrec, int16_t streamid);
132 static struct journal_subrecord *jrecord_push(
133                             struct jrecord *jrec, int16_t rectype);
134 static void jrecord_pop(struct jrecord *jrec, struct journal_subrecord *parent);
135 static struct journal_subrecord *jrecord_write(struct jrecord *jrec,
136                             int16_t rectype, int bytes);
137 static void jrecord_data(struct jrecord *jrec, const void *buf, int bytes);
138 static void jrecord_done(struct jrecord *jrec, int abortit);
139
140 static int journal_setattr(struct vop_setattr_args *ap);
141 static int journal_write(struct vop_write_args *ap);
142 static int journal_fsync(struct vop_fsync_args *ap);
143 static int journal_putpages(struct vop_putpages_args *ap);
144 static int journal_setacl(struct vop_setacl_args *ap);
145 static int journal_setextattr(struct vop_setextattr_args *ap);
146 static int journal_ncreate(struct vop_ncreate_args *ap);
147 static int journal_nmknod(struct vop_nmknod_args *ap);
148 static int journal_nlink(struct vop_nlink_args *ap);
149 static int journal_nsymlink(struct vop_nsymlink_args *ap);
150 static int journal_nwhiteout(struct vop_nwhiteout_args *ap);
151 static int journal_nremove(struct vop_nremove_args *ap);
152 static int journal_nmkdir(struct vop_nmkdir_args *ap);
153 static int journal_nrmdir(struct vop_nrmdir_args *ap);
154 static int journal_nrename(struct vop_nrename_args *ap);
155
156 static struct vnodeopv_entry_desc journal_vnodeop_entries[] = {
157     { &vop_default_desc,                vop_journal_operate_ap },
158     { &vop_mountctl_desc,               (void *)journal_mountctl },
159     { &vop_setattr_desc,                (void *)journal_setattr },
160     { &vop_write_desc,                  (void *)journal_write },
161     { &vop_fsync_desc,                  (void *)journal_fsync },
162     { &vop_putpages_desc,               (void *)journal_putpages },
163     { &vop_setacl_desc,                 (void *)journal_setacl },
164     { &vop_setextattr_desc,             (void *)journal_setextattr },
165     { &vop_ncreate_desc,                (void *)journal_ncreate },
166     { &vop_nmknod_desc,                 (void *)journal_nmknod },
167     { &vop_nlink_desc,                  (void *)journal_nlink },
168     { &vop_nsymlink_desc,               (void *)journal_nsymlink },
169     { &vop_nwhiteout_desc,              (void *)journal_nwhiteout },
170     { &vop_nremove_desc,                (void *)journal_nremove },
171     { &vop_nmkdir_desc,                 (void *)journal_nmkdir },
172     { &vop_nrmdir_desc,                 (void *)journal_nrmdir },
173     { &vop_nrename_desc,                (void *)journal_nrename },
174     { NULL, NULL }
175 };
176
177 static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
178 static MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
179
180 int
181 journal_mountctl(struct vop_mountctl_args *ap)
182 {
183     struct mount *mp;
184     int error = 0;
185
186     mp = ap->a_head.a_ops->vv_mount;
187     KKASSERT(mp);
188
189     if (mp->mnt_vn_journal_ops == NULL) {
190         switch(ap->a_op) {
191         case MOUNTCTL_INSTALL_VFS_JOURNAL:
192             error = journal_attach(mp);
193             if (error == 0 && ap->a_ctllen != sizeof(struct mountctl_install_journal))
194                 error = EINVAL;
195             if (error == 0 && ap->a_fp == NULL)
196                 error = EBADF;
197             if (error == 0)
198                 error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
199             if (TAILQ_EMPTY(&mp->mnt_jlist))
200                 journal_detach(mp);
201             break;
202         case MOUNTCTL_REMOVE_VFS_JOURNAL:
203         case MOUNTCTL_RESYNC_VFS_JOURNAL:
204         case MOUNTCTL_STATUS_VFS_JOURNAL:
205             error = ENOENT;
206             break;
207         default:
208             error = EOPNOTSUPP;
209             break;
210         }
211     } else {
212         switch(ap->a_op) {
213         case MOUNTCTL_INSTALL_VFS_JOURNAL:
214             if (ap->a_ctllen != sizeof(struct mountctl_install_journal))
215                 error = EINVAL;
216             if (error == 0 && ap->a_fp == NULL)
217                 error = EBADF;
218             if (error == 0)
219                 error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
220             break;
221         case MOUNTCTL_REMOVE_VFS_JOURNAL:
222             if (ap->a_ctllen != sizeof(struct mountctl_remove_journal))
223                 error = EINVAL;
224             if (error == 0)
225                 error = journal_remove_vfs_journal(mp, ap->a_ctl);
226             if (TAILQ_EMPTY(&mp->mnt_jlist))
227                 journal_detach(mp);
228             break;
229         case MOUNTCTL_RESYNC_VFS_JOURNAL:
230             if (ap->a_ctllen != 0)
231                 error = EINVAL;
232             error = journal_resync_vfs_journal(mp, ap->a_ctl);
233             break;
234         case MOUNTCTL_STATUS_VFS_JOURNAL:
235             if (ap->a_ctllen != sizeof(struct mountctl_status_journal))
236                 error = EINVAL;
237             if (error == 0) {
238                 error = journal_status_vfs_journal(mp, ap->a_ctl, 
239                                         ap->a_buf, ap->a_buflen, ap->a_res);
240             }
241             break;
242         default:
243             error = EOPNOTSUPP;
244             break;
245         }
246     }
247     return (error);
248 }
249
250 /*
251  * High level mount point setup.  When a 
252  */
253 static int
254 journal_attach(struct mount *mp)
255 {
256     vfs_add_vnodeops(mp, &mp->mnt_vn_journal_ops, journal_vnodeop_entries);
257     return(0);
258 }
259
260 static void
261 journal_detach(struct mount *mp)
262 {
263     if (mp->mnt_vn_journal_ops)
264         vfs_rm_vnodeops(&mp->mnt_vn_journal_ops);
265 }
266
267 /*
268  * Install a journal on a mount point.  Each journal has an associated worker
269  * thread which is responsible for buffering and spooling the data to the
270  * target.  A mount point may have multiple journals attached to it.  An
271  * initial start record is generated when the journal is associated.
272  */
273 static int
274 journal_install_vfs_journal(struct mount *mp, struct file *fp, 
275                             const struct mountctl_install_journal *info)
276 {
277     struct journal *jo;
278     struct jrecord jrec;
279     int error = 0;
280     int size;
281
282     jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO);
283     bcopy(info->id, jo->id, sizeof(jo->id));
284     jo->flags = info->flags & ~(MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE |
285                                 MC_JOURNAL_STOP_REQ);
286
287     /*
288      * Memory FIFO size, round to nearest power of 2
289      */
290     if (info->membufsize) {
291         if (info->membufsize < 65536)
292             size = 65536;
293         else if (info->membufsize > 128 * 1024 * 1024)
294             size = 128 * 1024 * 1024;
295         else
296             size = (int)info->membufsize;
297     } else {
298         size = 1024 * 1024;
299     }
300     jo->fifo.size = 1;
301     while (jo->fifo.size < size)
302         jo->fifo.size <<= 1;
303
304     /*
305      * Other parameters.  If not specified the starting transaction id
306      * will be the current date.
307      */
308     if (info->transid) {
309         jo->transid = info->transid;
310     } else {
311         struct timespec ts;
312         getnanotime(&ts);
313         jo->transid = ((int64_t)ts.tv_sec << 30) | ts.tv_nsec;
314     }
315
316     jo->fp = fp;
317
318     /*
319      * Allocate the memory FIFO
320      */
321     jo->fifo.mask = jo->fifo.size - 1;
322     jo->fifo.membase = malloc(jo->fifo.size, M_JFIFO, M_WAITOK|M_ZERO|M_NULLOK);
323     if (jo->fifo.membase == NULL)
324         error = ENOMEM;
325
326     /*
327      * Create the worker thread and generate the association record.
328      */
329     if (error) {
330         free(jo, M_JOURNAL);
331     } else {
332         fhold(fp);
333         jo->flags |= MC_JOURNAL_WACTIVE;
334         lwkt_create(journal_wthread, jo, NULL, &jo->wthread,
335                         TDF_STOPREQ, -1, "journal w:%.*s", JIDMAX, jo->id);
336         lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON);
337         lwkt_schedule(&jo->wthread);
338
339         if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) {
340             jo->flags |= MC_JOURNAL_RACTIVE;
341             lwkt_create(journal_rthread, jo, NULL, &jo->rthread,
342                         TDF_STOPREQ, -1, "journal r:%.*s", JIDMAX, jo->id);
343             lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON);
344             lwkt_schedule(&jo->rthread);
345         }
346         jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
347         jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
348         jrecord_done(&jrec, 0);
349         TAILQ_INSERT_TAIL(&mp->mnt_jlist, jo, jentry);
350     }
351     return(error);
352 }
353
354 /*
355  * Disassociate a journal from a mount point and terminate its worker thread.
356  * A final termination record is written out before the file pointer is
357  * dropped.
358  */
359 static int
360 journal_remove_vfs_journal(struct mount *mp, 
361                            const struct mountctl_remove_journal *info)
362 {
363     struct journal *jo;
364     int error;
365
366     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
367         if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
368             break;
369     }
370     if (jo)
371         error = journal_destroy(mp, jo, info->flags);
372     else
373         error = EINVAL;
374     return (error);
375 }
376
377 /*
378  * Remove all journals associated with a mount point.  Usually called
379  * by the umount code.
380  */
381 void
382 journal_remove_all_journals(struct mount *mp, int flags)
383 {
384     struct journal *jo;
385
386     while ((jo = TAILQ_FIRST(&mp->mnt_jlist)) != NULL) {
387         journal_destroy(mp, jo, flags);
388     }
389 }
390
391 static int
392 journal_destroy(struct mount *mp, struct journal *jo, int flags)
393 {
394     struct jrecord jrec;
395
396     TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
397
398     jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
399     jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
400     jrecord_done(&jrec, 0);
401
402     jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
403     wakeup(&jo->fifo);
404     while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
405         tsleep(jo, 0, "jwait", 0);
406     }
407     lwkt_free_thread(&jo->wthread); /* XXX SMP */
408     if (jo->fp)
409         fdrop(jo->fp, curthread);
410     if (jo->fifo.membase)
411         free(jo->fifo.membase, M_JFIFO);
412     free(jo, M_JOURNAL);
413     return(0);
414 }
415
416 static int
417 journal_resync_vfs_journal(struct mount *mp, const void *ctl)
418 {
419     return(EINVAL);
420 }
421
422 static int
423 journal_status_vfs_journal(struct mount *mp, 
424                        const struct mountctl_status_journal *info,
425                        struct mountctl_journal_ret_status *rstat,
426                        int buflen, int *res)
427 {
428     struct journal *jo;
429     int error = 0;
430     int index;
431
432     index = 0;
433     *res = 0;
434     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
435         if (info->index == MC_JOURNAL_INDEX_ID) {
436             if (bcmp(jo->id, info->id, sizeof(jo->id)) != 0)
437                 continue;
438         } else if (info->index >= 0) {
439             if (info->index < index)
440                 continue;
441         } else if (info->index != MC_JOURNAL_INDEX_ALL) {
442             continue;
443         }
444         if (buflen < sizeof(*rstat)) {
445             if (*res)
446                 rstat[-1].flags |= MC_JOURNAL_STATUS_MORETOCOME;
447             else
448                 error = EINVAL;
449             break;
450         }
451         bzero(rstat, sizeof(*rstat));
452         rstat->recsize = sizeof(*rstat);
453         bcopy(jo->id, rstat->id, sizeof(jo->id));
454         rstat->index = index;
455         rstat->membufsize = jo->fifo.size;
456         rstat->membufused = jo->fifo.xindex - jo->fifo.rindex;
457         rstat->membufiopend = jo->fifo.windex - jo->fifo.rindex;
458         rstat->bytessent = jo->total_acked;
459         ++rstat;
460         ++index;
461         *res += sizeof(*rstat);
462         buflen -= sizeof(*rstat);
463     }
464     return(error);
465 }
466
467 /*
468  * The per-journal worker thread is responsible for writing out the
469  * journal's FIFO to the target stream.
470  */
471 static void
472 journal_wthread(void *info)
473 {
474     struct journal *jo = info;
475     struct journal_rawrecbeg *rawp;
476     int bytes;
477     int error;
478     int avail;
479     int res;
480
481     for (;;) {
482         /*
483          * Calculate the number of bytes available to write.  This buffer
484          * area may contain reserved records so we can't just write it out
485          * without further checks.
486          */
487         bytes = jo->fifo.windex - jo->fifo.rindex;
488
489         /*
490          * sleep if no bytes are available or if an incomplete record is
491          * encountered (it needs to be filled in before we can write it
492          * out), and skip any pad records that we encounter.
493          */
494         if (bytes == 0) {
495             if (jo->flags & MC_JOURNAL_STOP_REQ)
496                 break;
497             tsleep(&jo->fifo, 0, "jfifo", hz);
498             continue;
499         }
500
501         /*
502          * Sleep if we can not go any further due to hitting an incomplete
503          * record.  This case should occur rarely but may have to be better
504          * optimized XXX.
505          */
506         rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
507         if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
508             tsleep(&jo->fifo, 0, "jpad", hz);
509             continue;
510         }
511
512         /*
513          * Skip any pad records.  We do not write out pad records if we can
514          * help it. 
515          *
516          * If xindex is caught up to rindex it gets incremented along with
517          * rindex.  XXX SMP
518          */
519         if (rawp->streamid == JREC_STREAMID_PAD) {
520             if (jo->fifo.rindex == jo->fifo.xindex)
521                 jo->fifo.xindex += (rawp->recsize + 15) & ~15;
522             jo->fifo.rindex += (rawp->recsize + 15) & ~15;
523             jo->total_acked += bytes;
524             KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
525             continue;
526         }
527
528         /*
529          * 'bytes' is the amount of data that can potentially be written out.  
530          * Calculate 'res', the amount of data that can actually be written
531          * out.  res is bounded either by hitting the end of the physical
532          * memory buffer or by hitting an incomplete record.  Incomplete
533          * records often occur due to the way the space reservation model
534          * works.
535          */
536         res = 0;
537         avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
538         while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
539             res += (rawp->recsize + 15) & ~15;
540             if (res >= avail) {
541                 KKASSERT(res == avail);
542                 break;
543             }
544             rawp = (void *)((char *)rawp + ((rawp->recsize + 15) & ~15));
545         }
546
547         /*
548          * Issue the write and deal with any errors or other conditions.
549          * For now assume blocking I/O.  Since we are record-aware the
550          * code cannot yet handle partial writes.
551          *
552          * XXX EWOULDBLOCK/NBIO
553          * XXX notification on failure
554          * XXX permanent verses temporary failures
555          * XXX two-way acknowledgement stream in the return direction / xindex
556          */
557         bytes = res;
558         error = fp_write(jo->fp, 
559                         jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
560                         bytes, &res);
561         if (error) {
562             printf("journal_thread(%s) write, error %d\n", jo->id, error);
563             /* XXX */
564         } else {
565             KKASSERT(res == bytes);
566         }
567
568         /*
569          * Advance rindex.  If the journal stream is not full duplex we also
570          * advance xindex, otherwise the rjournal thread is responsible for
571          * advancing xindex.
572          */
573         jo->fifo.rindex += bytes;
574         if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0)
575             jo->fifo.xindex += bytes;
576         jo->total_acked += bytes;
577         KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
578         if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
579             if (jo->flags & MC_JOURNAL_WWAIT) {
580                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
581                 wakeup(&jo->fifo.windex);
582             }
583         }
584     }
585     jo->flags &= ~MC_JOURNAL_WACTIVE;
586     wakeup(jo);
587     wakeup(&jo->fifo.windex);
588 }
589
590 /*
591  * A second per-journal worker thread is created for two-way journaling
592  * streams to deal with the return acknowledgement stream.
593  */
594 static void
595 journal_rthread(void *info)
596 {
597     struct journal_rawrecbeg *rawp;
598     struct journal_ackrecord ack;
599     struct journal *jo = info;
600     int64_t transid;
601     int error;
602     int count;
603     int bytes;
604     int index;
605
606     transid = 0;
607     error = 0;
608
609     for (;;) {
610         /*
611          * We have been asked to stop
612          */
613         if (jo->flags & MC_JOURNAL_STOP_REQ)
614                 break;
615
616         /*
617          * If we have no active transaction id, get one from the return
618          * stream.
619          */
620         if (transid == 0) {
621             for (index = 0; index < sizeof(ack); index += count) {
622                 error = fp_read(jo->fp, &ack, sizeof(ack), &count);
623                 if (error)
624                     break;
625                 if (count == 0)
626                     tsleep(&jo->fifo.xindex, 0, "jread", hz);
627             }
628             if (error) {
629                 printf("read error %d on receive stream\n", error);
630                 break;
631             }
632             if (ack.rbeg.begmagic != JREC_BEGMAGIC ||
633                 ack.rend.endmagic != JREC_ENDMAGIC
634             ) {
635                 printf("bad begmagic or endmagic on receive stream\n");
636                 break;
637             }
638             transid = ack.rbeg.transid;
639         }
640
641         /*
642          * Calculate the number of unacknowledged bytes.  If there are no
643          * unacknowledged bytes then unsent data was acknowledged, report,
644          * sleep a bit, and loop in that case.  This should not happen 
645          * normally.  The ack record is thrown away.
646          */
647         bytes = jo->fifo.rindex - jo->fifo.xindex;
648
649         if (bytes == 0) {
650             printf("warning: unsent data acknowledged\n");
651             tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
652             transid = 0;
653             continue;
654         }
655
656         /*
657          * Since rindex has advanceted, the record pointed to by xindex
658          * must be a valid record.
659          */
660         rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
661         KKASSERT(rawp->begmagic == JREC_BEGMAGIC);
662         KKASSERT(rawp->recsize <= bytes);
663
664         /*
665          * The target can acknowledge several records at once.
666          */
667         if (rawp->transid < transid) {
668             printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
669             jo->fifo.xindex += (rawp->recsize + 15) & ~15;
670             if (jo->flags & MC_JOURNAL_WWAIT) {
671                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
672                 wakeup(&jo->fifo.windex);
673             }
674             continue;
675         }
676         if (rawp->transid == transid) {
677             printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
678             jo->fifo.xindex += (rawp->recsize + 15) & ~15;
679             if (jo->flags & MC_JOURNAL_WWAIT) {
680                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
681                 wakeup(&jo->fifo.windex);
682             }
683             transid = 0;
684             continue;
685         }
686         printf("warning: unsent data(2) acknowledged\n");
687         transid = 0;
688     }
689     jo->flags &= ~MC_JOURNAL_RACTIVE;
690     wakeup(jo);
691     wakeup(&jo->fifo.windex);
692 }
693
694 /*
695  * This builds a pad record which the journaling thread will skip over.  Pad
696  * records are required when we are unable to reserve sufficient stream space
697  * due to insufficient space at the end of the physical memory fifo.
698  *
699  * Even though the record is not transmitted, a normal transid must be 
700  * assigned to it so link recovery operations after a failure work properly.
701  */
702 static
703 void
704 journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid)
705 {
706     struct journal_rawrecend *rendp;
707     
708     KKASSERT((recsize & 15) == 0 && recsize >= 16);
709
710     rawp->streamid = JREC_STREAMID_PAD;
711     rawp->recsize = recsize;    /* must be 16-byte aligned */
712     rawp->transid = transid;
713     /*
714      * WARNING, rendp may overlap rawp->seqno.  This is necessary to
715      * allow PAD records to fit in 16 bytes.  Use cpu_ccfence() to
716      * hopefully cause the compiler to not make any assumptions.
717      */
718     rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
719     rendp->endmagic = JREC_ENDMAGIC;
720     rendp->check = 0;
721     rendp->recsize = rawp->recsize;
722
723     /*
724      * Set the begin magic last.  This is what will allow the journal
725      * thread to write the record out.  Use a store fence to prevent
726      * compiler and cpu reordering of the writes.
727      */
728     cpu_sfence();
729     rawp->begmagic = JREC_BEGMAGIC;
730 }
731
732 /*
733  * Wake up the worker thread if the FIFO is more then half full or if
734  * someone is waiting for space to be freed up.  Otherwise let the 
735  * heartbeat deal with it.  Being able to avoid waking up the worker
736  * is the key to the journal's cpu performance.
737  */
738 static __inline
739 void
740 journal_commit_wakeup(struct journal *jo)
741 {
742     int avail;
743
744     avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
745     KKASSERT(avail >= 0);
746     if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
747         wakeup(&jo->fifo);
748 }
749
750 /*
751  * Create a new BEGIN stream record with the specified streamid and the
752  * specified amount of payload space.  *rawpp will be set to point to the
753  * base of the new stream record and a pointer to the base of the payload
754  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
755  * making this call.  The raw record header will be partially initialized.
756  *
757  * A stream can be extended, aborted, or committed by other API calls
758  * below.  This may result in a sequence of potentially disconnected
759  * stream records to be output to the journaling target.  The first record
760  * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
761  * while the last record on commit or abort will be marked JREC_STREAMCTL_END
762  * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
763  * up being the same as the first, in which case the bits are all set in
764  * the first record.
765  *
766  * The stream record is created in an incomplete state by setting the begin
767  * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
768  * flushing the fifo past our record until we have finished populating it.
769  * Other threads can reserve and operate on their own space without stalling
770  * but the stream output will stall until we have completed operations.  The
771  * memory FIFO is intended to be large enough to absorb such situations
772  * without stalling out other threads.
773  */
774 static
775 void *
776 journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
777                 int16_t streamid, int bytes)
778 {
779     struct journal_rawrecbeg *rawp;
780     int avail;
781     int availtoend;
782     int req;
783
784     /*
785      * Add header and trailer overheads to the passed payload.  Note that
786      * the passed payload size need not be aligned in any way.
787      */
788     bytes += sizeof(struct journal_rawrecbeg);
789     bytes += sizeof(struct journal_rawrecend);
790
791     for (;;) {
792         /*
793          * First, check boundary conditions.  If the request would wrap around
794          * we have to skip past the ending block and return to the beginning
795          * of the FIFO's buffer.  Calculate 'req' which is the actual number
796          * of bytes being reserved, including wrap-around dead space.
797          *
798          * Neither 'bytes' or 'req' are aligned.
799          *
800          * Note that availtoend is not truncated to avail and so cannot be
801          * used to determine whether the reservation is possible by itself.
802          * Also, since all fifo ops are 16-byte aligned, we can check
803          * the size before calculating the aligned size.
804          */
805         availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
806         KKASSERT((availtoend & 15) == 0);
807         if (bytes > availtoend) 
808             req = bytes + availtoend;   /* add pad to end */
809         else
810             req = bytes;
811
812         /*
813          * Next calculate the total available space and see if it is
814          * sufficient.  We cannot overwrite previously buffered data
815          * past xindex because otherwise we would not be able to restart
816          * a broken link at the target's last point of commit.
817          */
818         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
819         KKASSERT(avail >= 0 && (avail & 15) == 0);
820
821         if (avail < req) {
822             /* XXX MC_JOURNAL_STOP_IMM */
823             jo->flags |= MC_JOURNAL_WWAIT;
824             tsleep(&jo->fifo.windex, 0, "jwrite", 0);
825             continue;
826         }
827
828         /*
829          * Create a pad record for any dead space and create an incomplete
830          * record for the live space, then return a pointer to the
831          * contiguous buffer space that was requested.
832          *
833          * NOTE: The worker thread will not flush past an incomplete
834          * record, so the reserved space can be filled in at-will.  The
835          * journaling code must also be aware the reserved sections occuring
836          * after this one will also not be written out even if completed
837          * until this one is completed.
838          *
839          * The transaction id must accomodate real and potential pad creation.
840          */
841         rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
842         if (req != bytes) {
843             journal_build_pad(rawp, availtoend, jo->transid);
844             ++jo->transid;
845             rawp = (void *)jo->fifo.membase;
846         }
847         rawp->begmagic = JREC_INCOMPLETEMAGIC;  /* updated by abort/commit */
848         rawp->recsize = bytes;                  /* (unaligned size) */
849         rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
850         rawp->transid = jo->transid;
851         jo->transid += 2;
852
853         /*
854          * Issue a memory barrier to guarentee that the record data has been
855          * properly initialized before we advance the write index and return
856          * a pointer to the reserved record.  Otherwise the worker thread
857          * could accidently run past us.
858          *
859          * Note that stream records are always 16-byte aligned.
860          */
861         cpu_sfence();
862         jo->fifo.windex += (req + 15) & ~15;
863         *rawpp = rawp;
864         return(rawp + 1);
865     }
866     /* not reached */
867     *rawpp = NULL;
868     return(NULL);
869 }
870
871 /*
872  * Attempt to extend the stream record by <bytes> worth of payload space.
873  *
874  * If it is possible to extend the existing stream record no truncation
875  * occurs and the record is extended as specified.  A pointer to the 
876  * truncation offset within the payload space is returned.
877  *
878  * If it is not possible to do this the existing stream record is truncated
879  * and committed, and a new stream record of size <bytes> is created.  A
880  * pointer to the base of the new stream record's payload space is returned.
881  *
882  * *rawpp is set to the new reservation in the case of a new record but
883  * the caller cannot depend on a comparison with the old rawp to determine if
884  * this case occurs because we could end up using the same memory FIFO
885  * offset for the new stream record.  Use *newstreamrecp instead.
886  */
887 static void *
888 journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp, 
889                 int truncbytes, int bytes, int *newstreamrecp)
890 {
891     struct journal_rawrecbeg *rawp;
892     int16_t streamid;
893     int availtoend;
894     int avail;
895     int osize;
896     int nsize;
897     int wbase;
898     void *rptr;
899
900     *newstreamrecp = 0;
901     rawp = *rawpp;
902     osize = (rawp->recsize + 15) & ~15;
903     nsize = (rawp->recsize + bytes + 15) & ~15;
904     wbase = (char *)rawp - jo->fifo.membase;
905
906     /*
907      * If the aligned record size does not change we can trivially adjust
908      * the record size.
909      */
910     if (nsize == osize) {
911         rawp->recsize += bytes;
912         return((char *)(rawp + 1) + truncbytes);
913     }
914
915     /*
916      * If the fifo's write index hasn't been modified since we made the
917      * reservation and we do not hit any boundary conditions, we can 
918      * trivially make the record smaller or larger.
919      */
920     if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
921         availtoend = jo->fifo.size - wbase;
922         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
923         KKASSERT((availtoend & 15) == 0);
924         KKASSERT((avail & 15) == 0);
925         if (nsize <= avail && nsize <= availtoend) {
926             jo->fifo.windex += nsize - osize;
927             rawp->recsize += bytes;
928             return((char *)(rawp + 1) + truncbytes);
929         }
930     }
931
932     /*
933      * It was not possible to extend the buffer.  Commit the current
934      * buffer and create a new one.  We manually clear the BEGIN mark that
935      * journal_reserve() creates (because this is a continuing record, not
936      * the start of a new stream).
937      */
938     streamid = rawp->streamid & JREC_STREAMID_MASK;
939     journal_commit(jo, rawpp, truncbytes, 0);
940     rptr = journal_reserve(jo, rawpp, streamid, bytes);
941     rawp = *rawpp;
942     rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
943     *newstreamrecp = 1;
944     return(rptr);
945 }
946
947 /*
948  * Abort a journal record.  If the transaction record represents a stream
949  * BEGIN and we can reverse the fifo's write index we can simply reverse
950  * index the entire record, as if it were never reserved in the first place.
951  *
952  * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
953  * with the payload truncated to 0 bytes.
954  */
955 static void
956 journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
957 {
958     struct journal_rawrecbeg *rawp;
959     int osize;
960
961     rawp = *rawpp;
962     osize = (rawp->recsize + 15) & ~15;
963
964     if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
965         (jo->fifo.windex & jo->fifo.mask) == 
966          (char *)rawp - jo->fifo.membase + osize)
967     {
968         jo->fifo.windex -= osize;
969         *rawpp = NULL;
970     } else {
971         rawp->streamid |= JREC_STREAMCTL_ABORTED;
972         journal_commit(jo, rawpp, 0, 1);
973     }
974 }
975
976 /*
977  * Commit a journal record and potentially truncate it to the specified
978  * number of payload bytes.  If you do not want to truncate the record,
979  * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
980  * field includes header and trailer and will not be correct.  Note that
981  * passing 0 will truncate the entire data payload of the record.
982  *
983  * The logical stream is terminated by this function.
984  *
985  * If truncation occurs, and it is not possible to physically optimize the
986  * memory FIFO due to other threads having reserved space after ours,
987  * the remaining reserved space will be covered by a pad record.
988  */
989 static void
990 journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
991                 int bytes, int closeout)
992 {
993     struct journal_rawrecbeg *rawp;
994     struct journal_rawrecend *rendp;
995     int osize;
996     int nsize;
997
998     rawp = *rawpp;
999     *rawpp = NULL;
1000
1001     KKASSERT((char *)rawp >= jo->fifo.membase &&
1002              (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
1003     KKASSERT(((intptr_t)rawp & 15) == 0);
1004
1005     /*
1006      * Truncate the record if necessary.  If the FIFO write index as still
1007      * at the end of our record we can optimally backindex it.  Otherwise
1008      * we have to insert a pad record to cover the dead space.
1009      *
1010      * We calculate osize which is the 16-byte-aligned original recsize.
1011      * We calculate nsize which is the 16-byte-aligned new recsize.
1012      *
1013      * Due to alignment issues or in case the passed truncation bytes is
1014      * the same as the original payload, nsize may be equal to osize even
1015      * if the committed bytes is less then the originally reserved bytes.
1016      */
1017     if (bytes >= 0) {
1018         KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
1019         osize = (rawp->recsize + 15) & ~15;
1020         rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
1021                         sizeof(struct journal_rawrecend);
1022         nsize = (rawp->recsize + 15) & ~15;
1023         KKASSERT(nsize <= osize);
1024         if (osize == nsize) {
1025             /* do nothing */
1026         } else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
1027             /* we are able to backindex the fifo */
1028             jo->fifo.windex -= osize - nsize;
1029         } else {
1030             /* we cannot backindex the fifo, emplace a pad in the dead space */
1031             journal_build_pad((void *)((char *)rawp + nsize), osize - nsize,
1032                                 rawp->transid + 1);
1033         }
1034     }
1035
1036     /*
1037      * Fill in the trailer.  Note that unlike pad records, the trailer will
1038      * never overlap the header.
1039      */
1040     rendp = (void *)((char *)rawp + 
1041             ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
1042     rendp->endmagic = JREC_ENDMAGIC;
1043     rendp->recsize = rawp->recsize;
1044     rendp->check = 0;           /* XXX check word, disabled for now */
1045
1046     /*
1047      * Fill in begmagic last.  This will allow the worker thread to proceed.
1048      * Use a memory barrier to guarentee write ordering.  Mark the stream
1049      * as terminated if closeout is set.  This is the typical case.
1050      */
1051     if (closeout)
1052         rawp->streamid |= JREC_STREAMCTL_END;
1053     cpu_sfence();               /* memory and compiler barrier */
1054     rawp->begmagic = JREC_BEGMAGIC;
1055
1056     journal_commit_wakeup(jo);
1057 }
1058
1059 /************************************************************************
1060  *                      TRANSACTION SUPPORT ROUTINES                    *
1061  ************************************************************************
1062  *
1063  * JRECORD_*() - routines to create subrecord transactions and embed them
1064  *               in the logical streams managed by the journal_*() routines.
1065  */
1066
1067 static int16_t sid = JREC_STREAMID_JMIN;
1068
1069 /*
1070  * Initialize the passed jrecord structure and start a new stream transaction
1071  * by reserving an initial build space in the journal's memory FIFO.
1072  */
1073 static void
1074 jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
1075 {
1076     bzero(jrec, sizeof(*jrec));
1077     jrec->jo = jo;
1078     if (streamid < 0) {
1079         streamid = sid++;       /* XXX need to track stream ids! */
1080         if (sid == JREC_STREAMID_JMAX)
1081             sid = JREC_STREAMID_JMIN;
1082     }
1083     jrec->streamid = streamid;
1084     jrec->stream_residual = JREC_DEFAULTSIZE;
1085     jrec->stream_reserved = jrec->stream_residual;
1086     jrec->stream_ptr = 
1087         journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
1088 }
1089
1090 /*
1091  * Push a recursive record type.  All pushes should have matching pops.
1092  * The old parent is returned and the newly pushed record becomes the
1093  * new parent.  Note that the old parent's pointer may already be invalid
1094  * or may become invalid if jrecord_write() had to build a new stream
1095  * record, so the caller should not mess with the returned pointer in
1096  * any way other then to save it.
1097  */
1098 static 
1099 struct journal_subrecord *
1100 jrecord_push(struct jrecord *jrec, int16_t rectype)
1101 {
1102     struct journal_subrecord *save;
1103
1104     save = jrec->parent;
1105     jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
1106     jrec->last = NULL;
1107     KKASSERT(jrec->parent != NULL);
1108     ++jrec->pushcount;
1109     ++jrec->pushptrgood;        /* cleared on flush */
1110     return(save);
1111 }
1112
1113 /*
1114  * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
1115  * on the last record written within the subtransaction.  If the last 
1116  * record written is not accessible or if the subtransaction is empty,
1117  * we must write out a pad record with JMASK_LAST set before popping.
1118  *
1119  * When popping a subtransaction the parent record's recsize field
1120  * will be properly set.  If the parent pointer is no longer valid
1121  * (which can occur if the data has already been flushed out to the
1122  * stream), the protocol spec allows us to leave it 0.
1123  *
1124  * The saved parent pointer which we restore may or may not be valid,
1125  * and if not valid may or may not be NULL, depending on the value
1126  * of pushptrgood.
1127  */
1128 static void
1129 jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
1130 {
1131     struct journal_subrecord *last;
1132
1133     KKASSERT(jrec->pushcount > 0);
1134     KKASSERT(jrec->residual == 0);
1135
1136     /*
1137      * Set JMASK_LAST on the last record we wrote at the current
1138      * level.  If last is NULL we either no longer have access to the
1139      * record or the subtransaction was empty and we must write out a pad
1140      * record.
1141      */
1142     if ((last = jrec->last) == NULL) {
1143         jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
1144         last = jrec->last;      /* reload after possible flush */
1145     } else {
1146         last->rectype |= JMASK_LAST;
1147     }
1148
1149     /*
1150      * pushptrgood tells us how many levels of parent record pointers
1151      * are valid.  The jrec only stores the current parent record pointer
1152      * (and it is only valid if pushptrgood != 0).  The higher level parent
1153      * record pointers are saved by the routines calling jrecord_push() and
1154      * jrecord_pop().  These pointers may become stale and we determine
1155      * that fact by tracking the count of valid parent pointers with 
1156      * pushptrgood.  Pointers become invalid when their related stream
1157      * record gets pushed out.
1158      *
1159      * If no pointer is available (the data has already been pushed out),
1160      * then no fixup of e.g. the length field is possible for non-leaf
1161      * nodes.  The protocol allows for this situation by placing a larger
1162      * burden on the program scanning the stream on the other end.
1163      *
1164      * [parentA]
1165      *    [node X]
1166      *    [parentB]
1167      *       [node Y]
1168      *       [node Z]
1169      *    (pop B)       see NOTE B
1170      * (pop A)          see NOTE A
1171      *
1172      * NOTE B:  This pop sets LAST in node Z if the node is still accessible,
1173      *          else a PAD record is appended and LAST is set in that.
1174      *
1175      *          This pop sets the record size in parentB if parentB is still
1176      *          accessible, else the record size is left 0 (the scanner must
1177      *          deal with that).
1178      *
1179      *          This pop sets the new 'last' record to parentB, the pointer
1180      *          to which may or may not still be accessible.
1181      *
1182      * NOTE A:  This pop sets LAST in parentB if the node is still accessible,
1183      *          else a PAD record is appended and LAST is set in that.
1184      *
1185      *          This pop sets the record size in parentA if parentA is still
1186      *          accessible, else the record size is left 0 (the scanner must
1187      *          deal with that).
1188      *
1189      *          This pop sets the new 'last' record to parentA, the pointer
1190      *          to which may or may not still be accessible.
1191      *
1192      * Also note that the last record in the stream transaction, which in
1193      * the above example is parentA, does not currently have the LAST bit
1194      * set.
1195      *
1196      * The current parent becomes the last record relative to the
1197      * saved parent passed into us.  It's validity is based on 
1198      * whether pushptrgood is non-zero prior to decrementing.  The saved
1199      * parent becomes the new parent, and its validity is based on whether
1200      * pushptrgood is non-zero after decrementing.
1201      *
1202      * The old jrec->parent may be NULL if it is no longer accessible.
1203      * If pushptrgood is non-zero, however, it is guarenteed to not
1204      * be NULL (since no flush occured).
1205      */
1206     jrec->last = jrec->parent;
1207     --jrec->pushcount;
1208     if (jrec->pushptrgood) {
1209         KKASSERT(jrec->last != NULL && last != NULL);
1210         if (--jrec->pushptrgood == 0) {
1211             jrec->parent = NULL;        /* 'save' contains garbage or NULL */
1212         } else {
1213             KKASSERT(save != NULL);
1214             jrec->parent = save;        /* 'save' must not be NULL */
1215         }
1216
1217         /*
1218          * Set the record size in the old parent.  'last' still points to
1219          * the original last record in the subtransaction being popped,
1220          * jrec->last points to the old parent (which became the last
1221          * record relative to the new parent being popped into).
1222          */
1223         jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
1224     } else {
1225         jrec->parent = NULL;
1226         KKASSERT(jrec->last == NULL);
1227     }
1228 }
1229
1230 /*
1231  * Write out a leaf record, including associated data.
1232  */
1233 static
1234 void
1235 jrecord_leaf(struct jrecord *jrec, int16_t rectype, void *ptr, int bytes)
1236 {
1237     jrecord_write(jrec, rectype, bytes);
1238     jrecord_data(jrec, ptr, bytes);
1239 }
1240
1241 /*
1242  * Write a leaf record out and return a pointer to its base.  The leaf
1243  * record may contain potentially megabytes of data which is supplied
1244  * in jrecord_data() calls.  The exact amount must be specified in this
1245  * call.
1246  *
1247  * THE RETURNED SUBRECORD POINTER IS ONLY VALID IMMEDIATELY AFTER THE
1248  * CALL AND MAY BECOME INVALID AT ANY TIME.  ONLY THE PUSH/POP CODE SHOULD
1249  * USE THE RETURN VALUE.
1250  */
1251 static
1252 struct journal_subrecord *
1253 jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
1254 {
1255     struct journal_subrecord *last;
1256     int pusheditout;
1257
1258     /*
1259      * Try to catch some obvious errors.  Nesting records must specify a
1260      * size of 0, and there should be no left-overs from previous operations
1261      * (such as incomplete data writeouts).
1262      */
1263     KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
1264     KKASSERT(jrec->residual == 0);
1265
1266     /*
1267      * Check to see if the current stream record has enough room for
1268      * the new subrecord header.  If it doesn't we extend the current
1269      * stream record.
1270      *
1271      * This may have the side effect of pushing out the current stream record
1272      * and creating a new one.  We must adjust our stream tracking fields
1273      * accordingly.
1274      */
1275     if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
1276         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1277                                 jrec->stream_reserved - jrec->stream_residual,
1278                                 JREC_DEFAULTSIZE, &pusheditout);
1279         if (pusheditout) {
1280             /*
1281              * If a pushout occured, the pushed out stream record was
1282              * truncated as specified and the new record is exactly the
1283              * extension size specified.
1284              */
1285             jrec->stream_reserved = JREC_DEFAULTSIZE;
1286             jrec->stream_residual = JREC_DEFAULTSIZE;
1287             jrec->parent = NULL;        /* no longer accessible */
1288             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1289         } else {
1290             /*
1291              * If no pushout occured the stream record is NOT truncated and
1292              * IS extended.
1293              */
1294             jrec->stream_reserved += JREC_DEFAULTSIZE;
1295             jrec->stream_residual += JREC_DEFAULTSIZE;
1296         }
1297     }
1298     last = (void *)jrec->stream_ptr;
1299     last->rectype = rectype;
1300     last->reserved = 0;
1301     last->recsize = sizeof(struct journal_subrecord) + bytes;
1302     jrec->last = last;
1303     jrec->residual = bytes;             /* remaining data to be posted */
1304     jrec->residual_align = -bytes & 7;  /* post-data alignment required */
1305     jrec->stream_ptr += sizeof(*last);  /* current write pointer */
1306     jrec->stream_residual -= sizeof(*last); /* space remaining in stream */
1307     return(last);
1308 }
1309
1310 /*
1311  * Write out the data associated with a leaf record.  Any number of calls
1312  * to this routine may be made as long as the byte count adds up to the
1313  * amount originally specified in jrecord_write().
1314  *
1315  * The act of writing out the leaf data may result in numerous stream records
1316  * being pushed out.   Callers should be aware that even the associated
1317  * subrecord header may become inaccessible due to stream record pushouts.
1318  */
1319 static void
1320 jrecord_data(struct jrecord *jrec, const void *buf, int bytes)
1321 {
1322     int pusheditout;
1323     int extsize;
1324
1325     KKASSERT(bytes >= 0 && bytes <= jrec->residual);
1326
1327     /*
1328      * Push out stream records as long as there is insufficient room to hold
1329      * the remaining data.
1330      */
1331     while (jrec->stream_residual < bytes) {
1332         /*
1333          * Fill in any remaining space in the current stream record.
1334          */
1335         bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
1336         buf = (const char *)buf + jrec->stream_residual;
1337         bytes -= jrec->stream_residual;
1338         /*jrec->stream_ptr += jrec->stream_residual;*/
1339         jrec->residual -= jrec->stream_residual;
1340         jrec->stream_residual = 0;
1341
1342         /*
1343          * Try to extend the current stream record, but no more then 1/4
1344          * the size of the FIFO.
1345          */
1346         extsize = jrec->jo->fifo.size >> 2;
1347         if (extsize > bytes)
1348             extsize = (bytes + 15) & ~15;
1349
1350         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1351                                 jrec->stream_reserved - jrec->stream_residual,
1352                                 extsize, &pusheditout);
1353         if (pusheditout) {
1354             jrec->stream_reserved = extsize;
1355             jrec->stream_residual = extsize;
1356             jrec->parent = NULL;        /* no longer accessible */
1357             jrec->last = NULL;          /* no longer accessible */
1358             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1359         } else {
1360             jrec->stream_reserved += extsize;
1361             jrec->stream_residual += extsize;
1362         }
1363     }
1364
1365     /*
1366      * Push out any remaining bytes into the current stream record.
1367      */
1368     if (bytes) {
1369         bcopy(buf, jrec->stream_ptr, bytes);
1370         jrec->stream_ptr += bytes;
1371         jrec->stream_residual -= bytes;
1372         jrec->residual -= bytes;
1373     }
1374
1375     /*
1376      * Handle data alignment requirements for the subrecord.  Because the
1377      * stream record's data space is more strictly aligned, it must already
1378      * have sufficient space to hold any subrecord alignment slop.
1379      */
1380     if (jrec->residual == 0 && jrec->residual_align) {
1381         KKASSERT(jrec->residual_align <= jrec->stream_residual);
1382         bzero(jrec->stream_ptr, jrec->residual_align);
1383         jrec->stream_ptr += jrec->residual_align;
1384         jrec->stream_residual -= jrec->residual_align;
1385         jrec->residual_align = 0;
1386     }
1387 }
1388
1389 /*
1390  * We are finished with the transaction.  This closes the transaction created
1391  * by jrecord_init().
1392  *
1393  * NOTE: If abortit is not set then we must be at the top level with no
1394  *       residual subrecord data left to output.
1395  *
1396  *       If abortit is set then we can be in any state, all pushes will be 
1397  *       popped and it is ok for there to be residual data.  This works 
1398  *       because the virtual stream itself is truncated.  Scanners must deal
1399  *       with this situation.
1400  *
1401  * The stream record will be committed or aborted as specified and jrecord
1402  * resources will be cleaned up.
1403  */
1404 static void
1405 jrecord_done(struct jrecord *jrec, int abortit)
1406 {
1407     KKASSERT(jrec->rawp != NULL);
1408
1409     if (abortit) {
1410         journal_abort(jrec->jo, &jrec->rawp);
1411     } else {
1412         KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
1413         journal_commit(jrec->jo, &jrec->rawp, 
1414                         jrec->stream_reserved - jrec->stream_residual, 1);
1415     }
1416
1417     /*
1418      * jrec should not be used beyond this point without another init,
1419      * but clean up some fields to ensure that we panic if it is.
1420      *
1421      * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
1422      */
1423     jrec->jo = NULL;
1424     jrec->stream_ptr = NULL;
1425 }
1426
1427 /************************************************************************
1428  *                      LOW LEVEL RECORD SUPPORT ROUTINES               *
1429  ************************************************************************
1430  *
1431  * These routine create low level recursive and leaf subrecords representing
1432  * common filesystem structures.
1433  */
1434
1435 /*
1436  * Write out a filename path relative to the base of the mount point.
1437  * rectype is typically JLEAF_PATH{1,2,3,4}.
1438  */
1439 static void
1440 jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
1441 {
1442     char buf[64];       /* local buffer if it fits, else malloced */
1443     char *base;
1444     int pathlen;
1445     int index;
1446     struct namecache *scan;
1447
1448     /*
1449      * Pass 1 - figure out the number of bytes required.  Include terminating
1450      *         \0 on last element and '/' separator on other elements.
1451      */
1452 again:
1453     pathlen = 0;
1454     for (scan = ncp; 
1455          scan && (scan->nc_flag & NCF_MOUNTPT) == 0; 
1456          scan = scan->nc_parent
1457     ) {
1458         pathlen += scan->nc_nlen + 1;
1459     }
1460
1461     if (pathlen <= sizeof(buf))
1462         base = buf;
1463     else
1464         base = malloc(pathlen, M_TEMP, M_INTWAIT);
1465
1466     /*
1467      * Pass 2 - generate the path buffer
1468      */
1469     index = pathlen;
1470     for (scan = ncp; 
1471          scan && (scan->nc_flag & NCF_MOUNTPT) == 0; 
1472          scan = scan->nc_parent
1473     ) {
1474         if (scan->nc_nlen >= index) {
1475             if (base != buf)
1476                 free(base, M_TEMP);
1477             goto again;
1478         }
1479         if (index == pathlen)
1480             base[--index] = 0;
1481         else
1482             base[--index] = '/';
1483         index -= scan->nc_nlen;
1484         bcopy(scan->nc_name, base + index, scan->nc_nlen);
1485     }
1486     jrecord_leaf(jrec, rectype, base + index, pathlen - index);
1487     if (base != buf)
1488         free(base, M_TEMP);
1489 }
1490
1491 /*
1492  * Write out a file attribute structure.  While somewhat inefficient, using
1493  * a recursive data structure is the most portable and extensible way.
1494  */
1495 static void
1496 jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
1497 {
1498     void *save;
1499
1500     save = jrecord_push(jrec, JTYPE_VATTR);
1501     if (vat->va_type != VNON)
1502         jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type));
1503     if (vat->va_uid != VNOVAL)
1504         jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode));
1505     if (vat->va_nlink != VNOVAL)
1506         jrecord_leaf(jrec, JLEAF_NLINK, &vat->va_nlink, sizeof(vat->va_nlink));
1507     if (vat->va_uid != VNOVAL)
1508         jrecord_leaf(jrec, JLEAF_UID, &vat->va_uid, sizeof(vat->va_uid));
1509     if (vat->va_gid != VNOVAL)
1510         jrecord_leaf(jrec, JLEAF_GID, &vat->va_gid, sizeof(vat->va_gid));
1511     if (vat->va_fsid != VNOVAL)
1512         jrecord_leaf(jrec, JLEAF_FSID, &vat->va_fsid, sizeof(vat->va_fsid));
1513     if (vat->va_fileid != VNOVAL)
1514         jrecord_leaf(jrec, JLEAF_INUM, &vat->va_fileid, sizeof(vat->va_fileid));
1515     if (vat->va_size != VNOVAL)
1516         jrecord_leaf(jrec, JLEAF_SIZE, &vat->va_size, sizeof(vat->va_size));
1517     if (vat->va_atime.tv_sec != VNOVAL)
1518         jrecord_leaf(jrec, JLEAF_ATIME, &vat->va_atime, sizeof(vat->va_atime));
1519     if (vat->va_mtime.tv_sec != VNOVAL)
1520         jrecord_leaf(jrec, JLEAF_MTIME, &vat->va_mtime, sizeof(vat->va_mtime));
1521     if (vat->va_ctime.tv_sec != VNOVAL)
1522         jrecord_leaf(jrec, JLEAF_CTIME, &vat->va_ctime, sizeof(vat->va_ctime));
1523     if (vat->va_gen != VNOVAL)
1524         jrecord_leaf(jrec, JLEAF_GEN, &vat->va_gen, sizeof(vat->va_gen));
1525     if (vat->va_flags != VNOVAL)
1526         jrecord_leaf(jrec, JLEAF_FLAGS, &vat->va_flags, sizeof(vat->va_flags));
1527     if (vat->va_rdev != VNOVAL)
1528         jrecord_leaf(jrec, JLEAF_UDEV, &vat->va_rdev, sizeof(vat->va_rdev));
1529 #if 0
1530     if (vat->va_filerev != VNOVAL)
1531         jrecord_leaf(jrec, JLEAF_FILEREV, &vat->va_filerev, sizeof(vat->va_filerev));
1532 #endif
1533     jrecord_pop(jrec, save);
1534 }
1535
1536 /*
1537  * Write out the creds used to issue a file operation.  If a process is
1538  * available write out additional tracking information related to the 
1539  * process.
1540  *
1541  * XXX additional tracking info
1542  * XXX tty line info
1543  */
1544 static void
1545 jrecord_write_cred(struct jrecord *jrec, struct thread *td, struct ucred *cred)
1546 {
1547     void *save;
1548     struct proc *p;
1549
1550     save = jrecord_push(jrec, JTYPE_CRED);
1551     jrecord_leaf(jrec, JLEAF_UID, &cred->cr_uid, sizeof(cred->cr_uid));
1552     jrecord_leaf(jrec, JLEAF_GID, &cred->cr_gid, sizeof(cred->cr_gid));
1553     if (td && (p = td->td_proc) != NULL) {
1554         jrecord_leaf(jrec, JLEAF_PID, &p->p_pid, sizeof(p->p_pid));
1555         jrecord_leaf(jrec, JLEAF_COMM, p->p_comm, sizeof(p->p_comm));
1556     }
1557     jrecord_pop(jrec, save);
1558 }
1559
1560 /*
1561  * Write out information required to identify a vnode
1562  *
1563  * XXX this needs work.  We should write out the inode number as well,
1564  * and in fact avoid writing out the file path for seqential writes
1565  * occuring within e.g. a certain period of time.
1566  */
1567 static void
1568 jrecord_write_vnode_ref(struct jrecord *jrec, struct vnode *vp)
1569 {
1570     struct namecache *ncp;
1571
1572     TAILQ_FOREACH(ncp, &vp->v_namecache, nc_vnode) {
1573         if ((ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1574             break;
1575     }
1576     if (ncp)
1577         jrecord_write_path(jrec, JLEAF_PATH_REF, ncp);
1578 }
1579
1580 #if 0
1581 /*
1582  * Write out the current contents of the file within the specified
1583  * range.  This is typically called from within an UNDO section.  A
1584  * locked vnode must be passed.
1585  */
1586 static int
1587 jrecord_write_filearea(struct jrecord *jrec, struct vnode *vp, 
1588                         off_t begoff, off_t endoff)
1589 {
1590 }
1591 #endif
1592
1593 /*
1594  * Write out the data represented by a pagelist
1595  */
1596 static void
1597 jrecord_write_pagelist(struct jrecord *jrec, int16_t rectype,
1598                         struct vm_page **pglist, int *rtvals, int pgcount,
1599                         off_t offset)
1600 {
1601     struct msf_buf *msf;
1602     int error;
1603     int b;
1604     int i;
1605
1606     i = 0;
1607     while (i < pgcount) {
1608         /*
1609          * Find the next valid section.  Skip any invalid elements
1610          */
1611         if (rtvals[i] != VM_PAGER_OK) {
1612             ++i;
1613             offset += PAGE_SIZE;
1614             continue;
1615         }
1616
1617         /*
1618          * Figure out how big the valid section is, capping I/O at what the
1619          * MSFBUF can represent.
1620          */
1621         b = i;
1622         while (i < pgcount && i - b != XIO_INTERNAL_PAGES && 
1623                rtvals[i] == VM_PAGER_OK
1624         ) {
1625             ++i;
1626         }
1627
1628         /*
1629          * And write it out.
1630          */
1631         if (i - b) {
1632             error = msf_map_pagelist(&msf, pglist + b, i - b, 0);
1633             if (error == 0) {
1634                 printf("RECORD PUTPAGES %d\n", msf_buf_bytes(msf));
1635                 jrecord_leaf(jrec, JLEAF_SEEKPOS, &offset, sizeof(offset));
1636                 jrecord_leaf(jrec, rectype, 
1637                              msf_buf_kva(msf), msf_buf_bytes(msf));
1638                 msf_buf_free(msf);
1639             } else {
1640                 printf("jrecord_write_pagelist: mapping failure\n");
1641             }
1642             offset += (off_t)(i - b) << PAGE_SHIFT;
1643         }
1644     }
1645 }
1646
1647 /*
1648  * Write out the data represented by a UIO.
1649  */
1650 struct jwuio_info {
1651     struct jrecord *jrec;
1652     int16_t rectype;
1653 };
1654
1655 static int jrecord_write_uio_callback(void *info, char *buf, int bytes);
1656
1657 static void
1658 jrecord_write_uio(struct jrecord *jrec, int16_t rectype, struct uio *uio)
1659 {
1660     struct jwuio_info info = { jrec, rectype };
1661     int error;
1662
1663     if (uio->uio_segflg != UIO_NOCOPY) {
1664         jrecord_leaf(jrec, JLEAF_SEEKPOS, &uio->uio_offset, 
1665                      sizeof(uio->uio_offset));
1666         error = msf_uio_iterate(uio, jrecord_write_uio_callback, &info);
1667         if (error)
1668             printf("XXX warning uio iterate failed %d\n", error);
1669     }
1670 }
1671
1672 static int
1673 jrecord_write_uio_callback(void *info_arg, char *buf, int bytes)
1674 {
1675     struct jwuio_info *info = info_arg;
1676
1677     jrecord_leaf(info->jrec, info->rectype, buf, bytes);
1678     return(0);
1679 }
1680
1681 /************************************************************************
1682  *                      JOURNAL VNOPS                                   *
1683  ************************************************************************
1684  *
1685  * These are function shims replacing the normal filesystem ops.  We become
1686  * responsible for calling the underlying filesystem ops.  We have the choice
1687  * of executing the underlying op first and then generating the journal entry,
1688  * or starting the journal entry, executing the underlying op, and then
1689  * either completing or aborting it.  
1690  *
1691  * The journal is supposed to be a high-level entity, which generally means
1692  * identifying files by name rather then by inode.  Supplying both allows
1693  * the journal to be used both for inode-number-compatible 'mirrors' and
1694  * for simple filesystem replication.
1695  *
1696  * Writes are particularly difficult to deal with because a single write may
1697  * represent a hundred megabyte buffer or more, and both writes and truncations
1698  * require the 'old' data to be written out as well as the new data if the
1699  * log is reversable.  Other issues:
1700  *
1701  * - How to deal with operations on unlinked files (no path available),
1702  *   but which may still be filesystem visible due to hard links.
1703  *
1704  * - How to deal with modifications made via a memory map.
1705  *
1706  * - Future cache coherency support will require cache coherency API calls
1707  *   both prior to and after the call to the underlying VFS.
1708  *
1709  * ALSO NOTE: We do not have to shim compatibility VOPs like MKDIR which have
1710  * new VFS equivalents (NMKDIR).
1711  */
1712
1713 /*
1714  * Journal vop_settattr { a_vp, a_vap, a_cred, a_td }
1715  */
1716 static
1717 int
1718 journal_setattr(struct vop_setattr_args *ap)
1719 {
1720     struct mount *mp;
1721     struct journal *jo;
1722     struct jrecord jrec;
1723     void *save;         /* warning, save pointers do not always remain valid */
1724     int error;
1725
1726     error = vop_journal_operate_ap(&ap->a_head);
1727     mp = ap->a_head.a_ops->vv_mount;
1728     if (error == 0) {
1729         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1730             jrecord_init(jo, &jrec, -1);
1731             save = jrecord_push(&jrec, JTYPE_SETATTR);
1732             jrecord_write_cred(&jrec, ap->a_td, ap->a_cred);
1733             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1734             jrecord_write_vattr(&jrec, ap->a_vap);
1735             jrecord_pop(&jrec, save);
1736             jrecord_done(&jrec, 0);
1737         }
1738     }
1739     return (error);
1740 }
1741
1742 /*
1743  * Journal vop_write { a_vp, a_uio, a_ioflag, a_cred }
1744  */
1745 static
1746 int
1747 journal_write(struct vop_write_args *ap)
1748 {
1749     struct mount *mp;
1750     struct journal *jo;
1751     struct jrecord jrec;
1752     struct uio uio_copy;
1753     struct iovec uio_one_iovec;
1754     void *save;         /* warning, save pointers do not always remain valid */
1755     int error;
1756
1757     /*
1758      * This is really nasty.  UIO's don't retain sufficient information to
1759      * be reusable once they've gone through the VOP chain.  The iovecs get
1760      * cleared, so we have to copy the UIO.
1761      *
1762      * XXX fix the UIO code to not destroy iov's during a scan so we can
1763      *     reuse the uio over and over again.
1764      */
1765     uio_copy = *ap->a_uio;
1766     if (uio_copy.uio_iovcnt == 1) {
1767         uio_one_iovec = ap->a_uio->uio_iov[0];
1768         uio_copy.uio_iov = &uio_one_iovec;
1769     } else {
1770         uio_copy.uio_iov = malloc(uio_copy.uio_iovcnt * sizeof(struct iovec),
1771                                     M_JOURNAL, M_WAITOK);
1772         bcopy(ap->a_uio->uio_iov, uio_copy.uio_iov, 
1773                 uio_copy.uio_iovcnt * sizeof(struct iovec));
1774     }
1775
1776     error = vop_journal_operate_ap(&ap->a_head);
1777     mp = ap->a_head.a_ops->vv_mount;
1778     if (error == 0) {
1779         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1780             jrecord_init(jo, &jrec, -1);
1781             save = jrecord_push(&jrec, JTYPE_WRITE);
1782             jrecord_write_cred(&jrec, NULL, ap->a_cred);
1783             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1784             jrecord_write_uio(&jrec, JLEAF_FILEDATA, &uio_copy);
1785             jrecord_pop(&jrec, save);
1786             jrecord_done(&jrec, 0);
1787         }
1788     }
1789
1790     if (uio_copy.uio_iov != &uio_one_iovec)
1791         free(uio_copy.uio_iov, M_JOURNAL);
1792
1793
1794     return (error);
1795 }
1796
1797 /*
1798  * Journal vop_fsync { a_vp, a_waitfor, a_td }
1799  */
1800 static
1801 int
1802 journal_fsync(struct vop_fsync_args *ap)
1803 {
1804     struct mount *mp;
1805     struct journal *jo;
1806     int error;
1807
1808     error = vop_journal_operate_ap(&ap->a_head);
1809     mp = ap->a_head.a_ops->vv_mount;
1810     if (error == 0) {
1811         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1812             /* XXX synchronize pending journal records */
1813         }
1814     }
1815     return (error);
1816 }
1817
1818 /*
1819  * Journal vop_putpages { a_vp, a_m, a_count, a_sync, a_rtvals, a_offset }
1820  *
1821  * note: a_count is in bytes.
1822  */
1823 static
1824 int
1825 journal_putpages(struct vop_putpages_args *ap)
1826 {
1827     struct mount *mp;
1828     struct journal *jo;
1829     struct jrecord jrec;
1830     void *save;         /* warning, save pointers do not always remain valid */
1831     int error;
1832
1833     error = vop_journal_operate_ap(&ap->a_head);
1834     mp = ap->a_head.a_ops->vv_mount;
1835     if (error == 0 && ap->a_count > 0) {
1836         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1837             jrecord_init(jo, &jrec, -1);
1838             save = jrecord_push(&jrec, JTYPE_PUTPAGES);
1839             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1840             jrecord_write_pagelist(&jrec, JLEAF_FILEDATA, 
1841                         ap->a_m, ap->a_rtvals, btoc(ap->a_count), ap->a_offset);
1842             jrecord_pop(&jrec, save);
1843             jrecord_done(&jrec, 0);
1844         }
1845     }
1846     return (error);
1847 }
1848
1849 /*
1850  * Journal vop_setacl { a_vp, a_type, a_aclp, a_cred, a_td }
1851  */
1852 static
1853 int
1854 journal_setacl(struct vop_setacl_args *ap)
1855 {
1856     struct mount *mp;
1857     struct journal *jo;
1858     struct jrecord jrec;
1859     void *save;         /* warning, save pointers do not always remain valid */
1860     int error;
1861
1862     error = vop_journal_operate_ap(&ap->a_head);
1863     mp = ap->a_head.a_ops->vv_mount;
1864     if (error == 0) {
1865         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1866             jrecord_init(jo, &jrec, -1);
1867             save = jrecord_push(&jrec, JTYPE_SETACL);
1868             jrecord_write_cred(&jrec, ap->a_td, ap->a_cred);
1869             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1870             /* XXX type, aclp */
1871             jrecord_pop(&jrec, save);
1872             jrecord_done(&jrec, 0);
1873         }
1874     }
1875     return (error);
1876 }
1877
1878 /*
1879  * Journal vop_setextattr { a_vp, a_name, a_uio, a_cred, a_td }
1880  */
1881 static
1882 int
1883 journal_setextattr(struct vop_setextattr_args *ap)
1884 {
1885     struct mount *mp;
1886     struct journal *jo;
1887     struct jrecord jrec;
1888     void *save;         /* warning, save pointers do not always remain valid */
1889     int error;
1890
1891     error = vop_journal_operate_ap(&ap->a_head);
1892     mp = ap->a_head.a_ops->vv_mount;
1893     if (error == 0) {
1894         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1895             jrecord_init(jo, &jrec, -1);
1896             save = jrecord_push(&jrec, JTYPE_SETEXTATTR);
1897             jrecord_write_cred(&jrec, ap->a_td, ap->a_cred);
1898             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1899             jrecord_leaf(&jrec, JLEAF_ATTRNAME, ap->a_name, strlen(ap->a_name));
1900             jrecord_write_uio(&jrec, JLEAF_FILEDATA, ap->a_uio);
1901             jrecord_pop(&jrec, save);
1902             jrecord_done(&jrec, 0);
1903         }
1904     }
1905     return (error);
1906 }
1907
1908 /*
1909  * Journal vop_ncreate { a_ncp, a_vpp, a_cred, a_vap }
1910  */
1911 static
1912 int
1913 journal_ncreate(struct vop_ncreate_args *ap)
1914 {
1915     struct mount *mp;
1916     struct journal *jo;
1917     struct jrecord jrec;
1918     void *save;         /* warning, save pointers do not always remain valid */
1919     int error;
1920
1921     error = vop_journal_operate_ap(&ap->a_head);
1922     mp = ap->a_head.a_ops->vv_mount;
1923     if (error == 0) {
1924         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1925             jrecord_init(jo, &jrec, -1);
1926             save = jrecord_push(&jrec, JTYPE_CREATE);
1927             jrecord_write_cred(&jrec, NULL, ap->a_cred);
1928             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1929             if (*ap->a_vpp)
1930                 jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
1931             jrecord_pop(&jrec, save);
1932             jrecord_done(&jrec, 0);
1933         }
1934     }
1935     return (error);
1936 }
1937
1938 /*
1939  * Journal vop_nmknod { a_ncp, a_vpp, a_cred, a_vap }
1940  */
1941 static
1942 int
1943 journal_nmknod(struct vop_nmknod_args *ap)
1944 {
1945     struct mount *mp;
1946     struct journal *jo;
1947     struct jrecord jrec;
1948     void *save;         /* warning, save pointers do not always remain valid */
1949     int error;
1950
1951     error = vop_journal_operate_ap(&ap->a_head);
1952     mp = ap->a_head.a_ops->vv_mount;
1953     if (error == 0) {
1954         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1955             jrecord_init(jo, &jrec, -1);
1956             save = jrecord_push(&jrec, JTYPE_MKNOD);
1957             jrecord_write_cred(&jrec, NULL, ap->a_cred);
1958             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1959             jrecord_write_vattr(&jrec, ap->a_vap);
1960             if (*ap->a_vpp)
1961                 jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
1962             jrecord_pop(&jrec, save);
1963             jrecord_done(&jrec, 0);
1964         }
1965     }
1966     return (error);
1967 }
1968
1969 /*
1970  * Journal vop_nlink { a_ncp, a_vp, a_cred }
1971  */
1972 static
1973 int
1974 journal_nlink(struct vop_nlink_args *ap)
1975 {
1976     struct mount *mp;
1977     struct journal *jo;
1978     struct jrecord jrec;
1979     void *save;         /* warning, save pointers do not always remain valid */
1980     int error;
1981
1982     error = vop_journal_operate_ap(&ap->a_head);
1983     mp = ap->a_head.a_ops->vv_mount;
1984     if (error == 0) {
1985         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1986             jrecord_init(jo, &jrec, -1);
1987             save = jrecord_push(&jrec, JTYPE_LINK);
1988             jrecord_write_cred(&jrec, NULL, ap->a_cred);
1989             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1990             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1991             /* XXX PATH to VP and inode number */
1992             jrecord_pop(&jrec, save);
1993             jrecord_done(&jrec, 0);
1994         }
1995     }
1996     return (error);
1997 }
1998
1999 /*
2000  * Journal vop_symlink { a_ncp, a_vpp, a_cred, a_vap, a_target }
2001  */
2002 static
2003 int
2004 journal_nsymlink(struct vop_nsymlink_args *ap)
2005 {
2006     struct mount *mp;
2007     struct journal *jo;
2008     struct jrecord jrec;
2009     void *save;         /* warning, save pointers do not always remain valid */
2010     int error;
2011
2012     error = vop_journal_operate_ap(&ap->a_head);
2013     mp = ap->a_head.a_ops->vv_mount;
2014     if (error == 0) {
2015         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2016             jrecord_init(jo, &jrec, -1);
2017             save = jrecord_push(&jrec, JTYPE_SYMLINK);
2018             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2019             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2020             jrecord_leaf(&jrec, JLEAF_SYMLINKDATA,
2021                         ap->a_target, strlen(ap->a_target));
2022             if (*ap->a_vpp)
2023                 jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
2024             jrecord_pop(&jrec, save);
2025             jrecord_done(&jrec, 0);
2026         }
2027     }
2028     return (error);
2029 }
2030
2031 /*
2032  * Journal vop_nwhiteout { a_ncp, a_cred, a_flags }
2033  */
2034 static
2035 int
2036 journal_nwhiteout(struct vop_nwhiteout_args *ap)
2037 {
2038     struct mount *mp;
2039     struct journal *jo;
2040     struct jrecord jrec;
2041     void *save;         /* warning, save pointers do not always remain valid */
2042     int error;
2043
2044     error = vop_journal_operate_ap(&ap->a_head);
2045     mp = ap->a_head.a_ops->vv_mount;
2046     if (error == 0) {
2047         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2048             jrecord_init(jo, &jrec, -1);
2049             save = jrecord_push(&jrec, JTYPE_WHITEOUT);
2050             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2051             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2052             jrecord_pop(&jrec, save);
2053             jrecord_done(&jrec, 0);
2054         }
2055     }
2056     return (error);
2057 }
2058
2059 /*
2060  * Journal vop_nremove { a_ncp, a_cred }
2061  */
2062 static
2063 int
2064 journal_nremove(struct vop_nremove_args *ap)
2065 {
2066     struct mount *mp;
2067     struct journal *jo;
2068     struct jrecord jrec;
2069     void *save;         /* warning, save pointers do not always remain valid */
2070     int error;
2071
2072     error = vop_journal_operate_ap(&ap->a_head);
2073     mp = ap->a_head.a_ops->vv_mount;
2074     if (error == 0) {
2075         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2076             jrecord_init(jo, &jrec, -1);
2077             save = jrecord_push(&jrec, JTYPE_REMOVE);
2078             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2079             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2080             jrecord_pop(&jrec, save);
2081             jrecord_done(&jrec, 0);
2082         }
2083     }
2084     return (error);
2085 }
2086
2087 /*
2088  * Journal vop_nmkdir { a_ncp, a_vpp, a_cred, a_vap }
2089  */
2090 static
2091 int
2092 journal_nmkdir(struct vop_nmkdir_args *ap)
2093 {
2094     struct mount *mp;
2095     struct journal *jo;
2096     struct jrecord jrec;
2097     void *save;         /* warning, save pointers do not always remain valid */
2098     int error;
2099
2100     error = vop_journal_operate_ap(&ap->a_head);
2101     mp = ap->a_head.a_ops->vv_mount;
2102     if (error == 0) {
2103         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2104             jrecord_init(jo, &jrec, -1);
2105             if (jo->flags & MC_JOURNAL_WANT_REVERSABLE) {
2106                 save = jrecord_push(&jrec, JTYPE_UNDO);
2107                 /* XXX undo operations */
2108                 jrecord_pop(&jrec, save);
2109             }
2110 #if 0
2111             if (jo->flags & MC_JOURNAL_WANT_AUDIT) {
2112                 jrecord_write_audit(&jrec);
2113             }
2114 #endif
2115             save = jrecord_push(&jrec, JTYPE_MKDIR);
2116             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2117             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2118             jrecord_write_vattr(&jrec, ap->a_vap);
2119             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2120             if (*ap->a_vpp)
2121                 jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
2122             jrecord_pop(&jrec, save);
2123             jrecord_done(&jrec, 0);
2124         }
2125     }
2126     return (error);
2127 }
2128
2129 /*
2130  * Journal vop_nrmdir { a_ncp, a_cred }
2131  */
2132 static
2133 int
2134 journal_nrmdir(struct vop_nrmdir_args *ap)
2135 {
2136     struct mount *mp;
2137     struct journal *jo;
2138     struct jrecord jrec;
2139     void *save;         /* warning, save pointers do not always remain valid */
2140     int error;
2141
2142     error = vop_journal_operate_ap(&ap->a_head);
2143     mp = ap->a_head.a_ops->vv_mount;
2144     if (error == 0) {
2145         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2146             jrecord_init(jo, &jrec, -1);
2147             save = jrecord_push(&jrec, JTYPE_RMDIR);
2148             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2149             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2150             jrecord_pop(&jrec, save);
2151             jrecord_done(&jrec, 0);
2152         }
2153     }
2154     return (error);
2155 }
2156
2157 /*
2158  * Journal vop_nrename { a_fncp, a_tncp, a_cred }
2159  */
2160 static
2161 int
2162 journal_nrename(struct vop_nrename_args *ap)
2163 {
2164     struct mount *mp;
2165     struct journal *jo;
2166     struct jrecord jrec;
2167     void *save;         /* warning, save pointers do not always remain valid */
2168     int error;
2169
2170     error = vop_journal_operate_ap(&ap->a_head);
2171     mp = ap->a_head.a_ops->vv_mount;
2172     if (error == 0) {
2173         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2174             jrecord_init(jo, &jrec, -1);
2175             save = jrecord_push(&jrec, JTYPE_RENAME);
2176             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2177             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_fncp);
2178             jrecord_write_path(&jrec, JLEAF_PATH2, ap->a_tncp);
2179             jrecord_pop(&jrec, save);
2180             jrecord_done(&jrec, 0);
2181         }
2182     }
2183     return (error);
2184 }
2185