The size of a nesting record may not be known (due to the virtual stream
[dragonfly.git] / sys / kern / vfs_journal.c
1 /*
2  * Copyright (c) 2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/vfs_journal.c,v 1.15 2005/07/05 00:14:27 dillon Exp $
35  */
36 /*
37  * Each mount point may have zero or more independantly configured journals
38  * attached to it.  Each journal is represented by a memory FIFO and worker
39  * thread.  Journal events are streamed through the FIFO to the thread,
40  * batched up (typically on one-second intervals), and written out by the
41  * thread. 
42  *
43  * Journal vnode ops are executed instead of mnt_vn_norm_ops when one or
44  * more journals have been installed on a mount point.  It becomes the
45  * responsibility of the journal op to call the underlying normal op as
46  * appropriate.
47  *
48  * The journaling protocol is intended to evolve into a two-way stream
49  * whereby transaction IDs can be acknowledged by the journaling target
50  * when the data has been committed to hard storage.  Both implicit and
51  * explicit acknowledgement schemes will be supported, depending on the
52  * sophistication of the journaling stream, plus resynchronization and
53  * restart when a journaling stream is interrupted.  This information will
54  * also be made available to journaling-aware filesystems to allow better
55  * management of their own physical storage synchronization mechanisms as
56  * well as to allow such filesystems to take direct advantage of the kernel's
57  * journaling layer so they don't have to roll their own.
58  *
59  * In addition, the worker thread will have access to much larger 
60  * spooling areas then the memory buffer is able to provide by e.g. 
61  * reserving swap space, in order to absorb potentially long interruptions
62  * of off-site journaling streams, and to prevent 'slow' off-site linkages
63  * from radically slowing down local filesystem operations.  
64  *
65  * Because of the non-trivial algorithms the journaling system will be
66  * required to support, use of a worker thread is mandatory.  Efficiencies
67  * are maintained by utilitizing the memory FIFO to batch transactions when
68  * possible, reducing the number of gratuitous thread switches and taking
69  * advantage of cpu caches through the use of shorter batched code paths
70  * rather then trying to do everything in the context of the process
71  * originating the filesystem op.  In the future the memory FIFO can be
72  * made per-cpu to remove BGL or other locking requirements.
73  */
74 #include <sys/param.h>
75 #include <sys/systm.h>
76 #include <sys/buf.h>
77 #include <sys/conf.h>
78 #include <sys/kernel.h>
79 #include <sys/queue.h>
80 #include <sys/lock.h>
81 #include <sys/malloc.h>
82 #include <sys/mount.h>
83 #include <sys/unistd.h>
84 #include <sys/vnode.h>
85 #include <sys/poll.h>
86 #include <sys/mountctl.h>
87 #include <sys/journal.h>
88 #include <sys/file.h>
89 #include <sys/proc.h>
90 #include <sys/msfbuf.h>
91
92 #include <machine/limits.h>
93
94 #include <vm/vm.h>
95 #include <vm/vm_object.h>
96 #include <vm/vm_page.h>
97 #include <vm/vm_pager.h>
98 #include <vm/vnode_pager.h>
99
100 #include <sys/file2.h>
101 #include <sys/thread2.h>
102
103 static int journal_attach(struct mount *mp);
104 static void journal_detach(struct mount *mp);
105 static int journal_install_vfs_journal(struct mount *mp, struct file *fp,
106                             const struct mountctl_install_journal *info);
107 static int journal_remove_vfs_journal(struct mount *mp,
108                             const struct mountctl_remove_journal *info);
109 static int journal_destroy(struct mount *mp, struct journal *jo, int flags);
110 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
111 static int journal_status_vfs_journal(struct mount *mp,
112                        const struct mountctl_status_journal *info,
113                        struct mountctl_journal_ret_status *rstat,
114                        int buflen, int *res);
115 static void journal_wthread(void *info);
116 static void journal_rthread(void *info);
117
118 static void *journal_reserve(struct journal *jo, 
119                             struct journal_rawrecbeg **rawpp, 
120                             int16_t streamid, int bytes);
121 static void *journal_extend(struct journal *jo,
122                             struct journal_rawrecbeg **rawpp,
123                             int truncbytes, int bytes, int *newstreamrecp);
124 static void journal_abort(struct journal *jo, 
125                             struct journal_rawrecbeg **rawpp);
126 static void journal_commit(struct journal *jo, 
127                             struct journal_rawrecbeg **rawpp, 
128                             int bytes, int closeout);
129
130 static void jrecord_init(struct journal *jo, 
131                             struct jrecord *jrec, int16_t streamid);
132 static struct journal_subrecord *jrecord_push(
133                             struct jrecord *jrec, int16_t rectype);
134 static void jrecord_pop(struct jrecord *jrec, struct journal_subrecord *parent);
135 static struct journal_subrecord *jrecord_write(struct jrecord *jrec,
136                             int16_t rectype, int bytes);
137 static void jrecord_data(struct jrecord *jrec, const void *buf, int bytes);
138 static void jrecord_done(struct jrecord *jrec, int abortit);
139
140 static int journal_setattr(struct vop_setattr_args *ap);
141 static int journal_write(struct vop_write_args *ap);
142 static int journal_fsync(struct vop_fsync_args *ap);
143 static int journal_putpages(struct vop_putpages_args *ap);
144 static int journal_setacl(struct vop_setacl_args *ap);
145 static int journal_setextattr(struct vop_setextattr_args *ap);
146 static int journal_ncreate(struct vop_ncreate_args *ap);
147 static int journal_nmknod(struct vop_nmknod_args *ap);
148 static int journal_nlink(struct vop_nlink_args *ap);
149 static int journal_nsymlink(struct vop_nsymlink_args *ap);
150 static int journal_nwhiteout(struct vop_nwhiteout_args *ap);
151 static int journal_nremove(struct vop_nremove_args *ap);
152 static int journal_nmkdir(struct vop_nmkdir_args *ap);
153 static int journal_nrmdir(struct vop_nrmdir_args *ap);
154 static int journal_nrename(struct vop_nrename_args *ap);
155
156 static struct vnodeopv_entry_desc journal_vnodeop_entries[] = {
157     { &vop_default_desc,                vop_journal_operate_ap },
158     { &vop_mountctl_desc,               (void *)journal_mountctl },
159     { &vop_setattr_desc,                (void *)journal_setattr },
160     { &vop_write_desc,                  (void *)journal_write },
161     { &vop_fsync_desc,                  (void *)journal_fsync },
162     { &vop_putpages_desc,               (void *)journal_putpages },
163     { &vop_setacl_desc,                 (void *)journal_setacl },
164     { &vop_setextattr_desc,             (void *)journal_setextattr },
165     { &vop_ncreate_desc,                (void *)journal_ncreate },
166     { &vop_nmknod_desc,                 (void *)journal_nmknod },
167     { &vop_nlink_desc,                  (void *)journal_nlink },
168     { &vop_nsymlink_desc,               (void *)journal_nsymlink },
169     { &vop_nwhiteout_desc,              (void *)journal_nwhiteout },
170     { &vop_nremove_desc,                (void *)journal_nremove },
171     { &vop_nmkdir_desc,                 (void *)journal_nmkdir },
172     { &vop_nrmdir_desc,                 (void *)journal_nrmdir },
173     { &vop_nrename_desc,                (void *)journal_nrename },
174     { NULL, NULL }
175 };
176
177 static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
178 static MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
179
180 int
181 journal_mountctl(struct vop_mountctl_args *ap)
182 {
183     struct mount *mp;
184     int error = 0;
185
186     mp = ap->a_head.a_ops->vv_mount;
187     KKASSERT(mp);
188
189     if (mp->mnt_vn_journal_ops == NULL) {
190         switch(ap->a_op) {
191         case MOUNTCTL_INSTALL_VFS_JOURNAL:
192             error = journal_attach(mp);
193             if (error == 0 && ap->a_ctllen != sizeof(struct mountctl_install_journal))
194                 error = EINVAL;
195             if (error == 0 && ap->a_fp == NULL)
196                 error = EBADF;
197             if (error == 0)
198                 error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
199             if (TAILQ_EMPTY(&mp->mnt_jlist))
200                 journal_detach(mp);
201             break;
202         case MOUNTCTL_REMOVE_VFS_JOURNAL:
203         case MOUNTCTL_RESYNC_VFS_JOURNAL:
204         case MOUNTCTL_STATUS_VFS_JOURNAL:
205             error = ENOENT;
206             break;
207         default:
208             error = EOPNOTSUPP;
209             break;
210         }
211     } else {
212         switch(ap->a_op) {
213         case MOUNTCTL_INSTALL_VFS_JOURNAL:
214             if (ap->a_ctllen != sizeof(struct mountctl_install_journal))
215                 error = EINVAL;
216             if (error == 0 && ap->a_fp == NULL)
217                 error = EBADF;
218             if (error == 0)
219                 error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
220             break;
221         case MOUNTCTL_REMOVE_VFS_JOURNAL:
222             if (ap->a_ctllen != sizeof(struct mountctl_remove_journal))
223                 error = EINVAL;
224             if (error == 0)
225                 error = journal_remove_vfs_journal(mp, ap->a_ctl);
226             if (TAILQ_EMPTY(&mp->mnt_jlist))
227                 journal_detach(mp);
228             break;
229         case MOUNTCTL_RESYNC_VFS_JOURNAL:
230             if (ap->a_ctllen != 0)
231                 error = EINVAL;
232             error = journal_resync_vfs_journal(mp, ap->a_ctl);
233             break;
234         case MOUNTCTL_STATUS_VFS_JOURNAL:
235             if (ap->a_ctllen != sizeof(struct mountctl_status_journal))
236                 error = EINVAL;
237             if (error == 0) {
238                 error = journal_status_vfs_journal(mp, ap->a_ctl, 
239                                         ap->a_buf, ap->a_buflen, ap->a_res);
240             }
241             break;
242         default:
243             error = EOPNOTSUPP;
244             break;
245         }
246     }
247     return (error);
248 }
249
250 /*
251  * High level mount point setup.  When a 
252  */
253 static int
254 journal_attach(struct mount *mp)
255 {
256     vfs_add_vnodeops(mp, &mp->mnt_vn_journal_ops, journal_vnodeop_entries);
257     return(0);
258 }
259
260 static void
261 journal_detach(struct mount *mp)
262 {
263     if (mp->mnt_vn_journal_ops)
264         vfs_rm_vnodeops(&mp->mnt_vn_journal_ops);
265 }
266
267 /*
268  * Install a journal on a mount point.  Each journal has an associated worker
269  * thread which is responsible for buffering and spooling the data to the
270  * target.  A mount point may have multiple journals attached to it.  An
271  * initial start record is generated when the journal is associated.
272  */
273 static int
274 journal_install_vfs_journal(struct mount *mp, struct file *fp, 
275                             const struct mountctl_install_journal *info)
276 {
277     struct journal *jo;
278     struct jrecord jrec;
279     int error = 0;
280     int size;
281
282     jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO);
283     bcopy(info->id, jo->id, sizeof(jo->id));
284     jo->flags = info->flags & ~(MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE |
285                                 MC_JOURNAL_STOP_REQ);
286
287     /*
288      * Memory FIFO size, round to nearest power of 2
289      */
290     if (info->membufsize) {
291         if (info->membufsize < 65536)
292             size = 65536;
293         else if (info->membufsize > 128 * 1024 * 1024)
294             size = 128 * 1024 * 1024;
295         else
296             size = (int)info->membufsize;
297     } else {
298         size = 1024 * 1024;
299     }
300     jo->fifo.size = 1;
301     while (jo->fifo.size < size)
302         jo->fifo.size <<= 1;
303
304     /*
305      * Other parameters.  If not specified the starting transaction id
306      * will be the current date.
307      */
308     if (info->transid) {
309         jo->transid = info->transid;
310     } else {
311         struct timespec ts;
312         getnanotime(&ts);
313         jo->transid = ((int64_t)ts.tv_sec << 30) | ts.tv_nsec;
314     }
315
316     jo->fp = fp;
317
318     /*
319      * Allocate the memory FIFO
320      */
321     jo->fifo.mask = jo->fifo.size - 1;
322     jo->fifo.membase = malloc(jo->fifo.size, M_JFIFO, M_WAITOK|M_ZERO|M_NULLOK);
323     if (jo->fifo.membase == NULL)
324         error = ENOMEM;
325
326     /*
327      * Create the worker thread and generate the association record.
328      */
329     if (error) {
330         free(jo, M_JOURNAL);
331     } else {
332         fhold(fp);
333         jo->flags |= MC_JOURNAL_WACTIVE;
334         lwkt_create(journal_wthread, jo, NULL, &jo->wthread,
335                         TDF_STOPREQ, -1, "journal w:%.*s", JIDMAX, jo->id);
336         lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON);
337         lwkt_schedule(&jo->wthread);
338
339         if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) {
340             jo->flags |= MC_JOURNAL_RACTIVE;
341             lwkt_create(journal_rthread, jo, NULL, &jo->rthread,
342                         TDF_STOPREQ, -1, "journal r:%.*s", JIDMAX, jo->id);
343             lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON);
344             lwkt_schedule(&jo->rthread);
345         }
346         jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
347         jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
348         jrecord_done(&jrec, 0);
349         TAILQ_INSERT_TAIL(&mp->mnt_jlist, jo, jentry);
350     }
351     return(error);
352 }
353
354 /*
355  * Disassociate a journal from a mount point and terminate its worker thread.
356  * A final termination record is written out before the file pointer is
357  * dropped.
358  */
359 static int
360 journal_remove_vfs_journal(struct mount *mp, 
361                            const struct mountctl_remove_journal *info)
362 {
363     struct journal *jo;
364     int error;
365
366     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
367         if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
368             break;
369     }
370     if (jo)
371         error = journal_destroy(mp, jo, info->flags);
372     else
373         error = EINVAL;
374     return (error);
375 }
376
377 /*
378  * Remove all journals associated with a mount point.  Usually called
379  * by the umount code.
380  */
381 void
382 journal_remove_all_journals(struct mount *mp, int flags)
383 {
384     struct journal *jo;
385
386     while ((jo = TAILQ_FIRST(&mp->mnt_jlist)) != NULL) {
387         journal_destroy(mp, jo, flags);
388     }
389 }
390
391 static int
392 journal_destroy(struct mount *mp, struct journal *jo, int flags)
393 {
394     struct jrecord jrec;
395
396     TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
397
398     jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
399     jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
400     jrecord_done(&jrec, 0);
401
402     jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
403     wakeup(&jo->fifo);
404     while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
405         tsleep(jo, 0, "jwait", 0);
406     }
407     lwkt_free_thread(&jo->wthread); /* XXX SMP */
408     if (jo->fp)
409         fdrop(jo->fp, curthread);
410     if (jo->fifo.membase)
411         free(jo->fifo.membase, M_JFIFO);
412     free(jo, M_JOURNAL);
413     return(0);
414 }
415
416 static int
417 journal_resync_vfs_journal(struct mount *mp, const void *ctl)
418 {
419     return(EINVAL);
420 }
421
422 static int
423 journal_status_vfs_journal(struct mount *mp, 
424                        const struct mountctl_status_journal *info,
425                        struct mountctl_journal_ret_status *rstat,
426                        int buflen, int *res)
427 {
428     struct journal *jo;
429     int error = 0;
430     int index;
431
432     index = 0;
433     *res = 0;
434     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
435         if (info->index == MC_JOURNAL_INDEX_ID) {
436             if (bcmp(jo->id, info->id, sizeof(jo->id)) != 0)
437                 continue;
438         } else if (info->index >= 0) {
439             if (info->index < index)
440                 continue;
441         } else if (info->index != MC_JOURNAL_INDEX_ALL) {
442             continue;
443         }
444         if (buflen < sizeof(*rstat)) {
445             if (*res)
446                 rstat[-1].flags |= MC_JOURNAL_STATUS_MORETOCOME;
447             else
448                 error = EINVAL;
449             break;
450         }
451         bzero(rstat, sizeof(*rstat));
452         rstat->recsize = sizeof(*rstat);
453         bcopy(jo->id, rstat->id, sizeof(jo->id));
454         rstat->index = index;
455         rstat->membufsize = jo->fifo.size;
456         rstat->membufused = jo->fifo.xindex - jo->fifo.rindex;
457         rstat->membufiopend = jo->fifo.windex - jo->fifo.rindex;
458         rstat->bytessent = jo->total_acked;
459         ++rstat;
460         ++index;
461         *res += sizeof(*rstat);
462         buflen -= sizeof(*rstat);
463     }
464     return(error);
465 }
466
467 /*
468  * The per-journal worker thread is responsible for writing out the
469  * journal's FIFO to the target stream.
470  */
471 static void
472 journal_wthread(void *info)
473 {
474     struct journal *jo = info;
475     struct journal_rawrecbeg *rawp;
476     int bytes;
477     int error;
478     int avail;
479     int res;
480
481     for (;;) {
482         /*
483          * Calculate the number of bytes available to write.  This buffer
484          * area may contain reserved records so we can't just write it out
485          * without further checks.
486          */
487         bytes = jo->fifo.windex - jo->fifo.rindex;
488
489         /*
490          * sleep if no bytes are available or if an incomplete record is
491          * encountered (it needs to be filled in before we can write it
492          * out), and skip any pad records that we encounter.
493          */
494         if (bytes == 0) {
495             if (jo->flags & MC_JOURNAL_STOP_REQ)
496                 break;
497             tsleep(&jo->fifo, 0, "jfifo", hz);
498             continue;
499         }
500
501         /*
502          * Sleep if we can not go any further due to hitting an incomplete
503          * record.  This case should occur rarely but may have to be better
504          * optimized XXX.
505          */
506         rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
507         if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
508             tsleep(&jo->fifo, 0, "jpad", hz);
509             continue;
510         }
511
512         /*
513          * Skip any pad records.  We do not write out pad records if we can
514          * help it. 
515          *
516          * If xindex is caught up to rindex it gets incremented along with
517          * rindex.  XXX SMP
518          */
519         if (rawp->streamid == JREC_STREAMID_PAD) {
520             if (jo->fifo.rindex == jo->fifo.xindex)
521                 jo->fifo.xindex += (rawp->recsize + 15) & ~15;
522             jo->fifo.rindex += (rawp->recsize + 15) & ~15;
523             jo->total_acked += bytes;
524             KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
525             continue;
526         }
527
528         /*
529          * 'bytes' is the amount of data that can potentially be written out.  
530          * Calculate 'res', the amount of data that can actually be written
531          * out.  res is bounded either by hitting the end of the physical
532          * memory buffer or by hitting an incomplete record.  Incomplete
533          * records often occur due to the way the space reservation model
534          * works.
535          */
536         res = 0;
537         avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
538         while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
539             res += (rawp->recsize + 15) & ~15;
540             if (res >= avail) {
541                 KKASSERT(res == avail);
542                 break;
543             }
544             rawp = (void *)((char *)rawp + ((rawp->recsize + 15) & ~15));
545         }
546
547         /*
548          * Issue the write and deal with any errors or other conditions.
549          * For now assume blocking I/O.  Since we are record-aware the
550          * code cannot yet handle partial writes.
551          *
552          * XXX EWOULDBLOCK/NBIO
553          * XXX notification on failure
554          * XXX permanent verses temporary failures
555          * XXX two-way acknowledgement stream in the return direction / xindex
556          */
557         bytes = res;
558         error = fp_write(jo->fp, 
559                         jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
560                         bytes, &res);
561         if (error) {
562             printf("journal_thread(%s) write, error %d\n", jo->id, error);
563             /* XXX */
564         } else {
565             KKASSERT(res == bytes);
566         }
567
568         /*
569          * Advance rindex.  If the journal stream is not full duplex we also
570          * advance xindex, otherwise the rjournal thread is responsible for
571          * advancing xindex.
572          */
573         jo->fifo.rindex += bytes;
574         if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0)
575             jo->fifo.xindex += bytes;
576         jo->total_acked += bytes;
577         KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
578         if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
579             if (jo->flags & MC_JOURNAL_WWAIT) {
580                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
581                 wakeup(&jo->fifo.windex);
582             }
583         }
584     }
585     jo->flags &= ~MC_JOURNAL_WACTIVE;
586     wakeup(jo);
587     wakeup(&jo->fifo.windex);
588 }
589
590 /*
591  * A second per-journal worker thread is created for two-way journaling
592  * streams to deal with the return acknowledgement stream.
593  */
594 static void
595 journal_rthread(void *info)
596 {
597     struct journal_rawrecbeg *rawp;
598     struct journal_ackrecord ack;
599     struct journal *jo = info;
600     int64_t transid;
601     int error;
602     int count;
603     int bytes;
604     int index;
605
606     transid = 0;
607     error = 0;
608
609     for (;;) {
610         /*
611          * We have been asked to stop
612          */
613         if (jo->flags & MC_JOURNAL_STOP_REQ)
614                 break;
615
616         /*
617          * If we have no active transaction id, get one from the return
618          * stream.
619          */
620         if (transid == 0) {
621             for (index = 0; index < sizeof(ack); index += count) {
622                 error = fp_read(jo->fp, &ack, sizeof(ack), &count);
623                 if (error)
624                     break;
625                 if (count == 0)
626                     tsleep(&jo->fifo.xindex, 0, "jread", hz);
627             }
628             if (error) {
629                 printf("read error %d on receive stream\n", error);
630                 break;
631             }
632             if (ack.rbeg.begmagic != JREC_BEGMAGIC ||
633                 ack.rend.endmagic != JREC_ENDMAGIC
634             ) {
635                 printf("bad begmagic or endmagic on receive stream\n");
636                 break;
637             }
638             transid = ack.rbeg.transid;
639         }
640
641         /*
642          * Calculate the number of unacknowledged bytes.  If there are no
643          * unacknowledged bytes then unsent data was acknowledged, report,
644          * sleep a bit, and loop in that case.  This should not happen 
645          * normally.  The ack record is thrown away.
646          */
647         bytes = jo->fifo.rindex - jo->fifo.xindex;
648
649         if (bytes == 0) {
650             printf("warning: unsent data acknowledged\n");
651             tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
652             transid = 0;
653             continue;
654         }
655
656         /*
657          * Since rindex has advanceted, the record pointed to by xindex
658          * must be a valid record.
659          */
660         rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
661         KKASSERT(rawp->begmagic == JREC_BEGMAGIC);
662         KKASSERT(rawp->recsize <= bytes);
663
664         /*
665          * The target can acknowledge several records at once.
666          */
667         if (rawp->transid < transid) {
668             printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
669             jo->fifo.xindex += (rawp->recsize + 15) & ~15;
670             if (jo->flags & MC_JOURNAL_WWAIT) {
671                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
672                 wakeup(&jo->fifo.windex);
673             }
674             continue;
675         }
676         if (rawp->transid == transid) {
677             printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
678             jo->fifo.xindex += (rawp->recsize + 15) & ~15;
679             if (jo->flags & MC_JOURNAL_WWAIT) {
680                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
681                 wakeup(&jo->fifo.windex);
682             }
683             transid = 0;
684             continue;
685         }
686         printf("warning: unsent data(2) acknowledged\n");
687         transid = 0;
688     }
689     jo->flags &= ~MC_JOURNAL_RACTIVE;
690     wakeup(jo);
691     wakeup(&jo->fifo.windex);
692 }
693
694 /*
695  * This builds a pad record which the journaling thread will skip over.  Pad
696  * records are required when we are unable to reserve sufficient stream space
697  * due to insufficient space at the end of the physical memory fifo.
698  *
699  * Even though the record is not transmitted, a normal transid must be 
700  * assigned to it so link recovery operations after a failure work properly.
701  */
702 static
703 void
704 journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid)
705 {
706     struct journal_rawrecend *rendp;
707     
708     KKASSERT((recsize & 15) == 0 && recsize >= 16);
709
710     rawp->streamid = JREC_STREAMID_PAD;
711     rawp->recsize = recsize;    /* must be 16-byte aligned */
712     rawp->transid = transid;
713     /*
714      * WARNING, rendp may overlap rawp->seqno.  This is necessary to
715      * allow PAD records to fit in 16 bytes.  Use cpu_ccfence() to
716      * hopefully cause the compiler to not make any assumptions.
717      */
718     rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
719     rendp->endmagic = JREC_ENDMAGIC;
720     rendp->check = 0;
721     rendp->recsize = rawp->recsize;
722
723     /*
724      * Set the begin magic last.  This is what will allow the journal
725      * thread to write the record out.  Use a store fence to prevent
726      * compiler and cpu reordering of the writes.
727      */
728     cpu_sfence();
729     rawp->begmagic = JREC_BEGMAGIC;
730 }
731
732 /*
733  * Wake up the worker thread if the FIFO is more then half full or if
734  * someone is waiting for space to be freed up.  Otherwise let the 
735  * heartbeat deal with it.  Being able to avoid waking up the worker
736  * is the key to the journal's cpu performance.
737  */
738 static __inline
739 void
740 journal_commit_wakeup(struct journal *jo)
741 {
742     int avail;
743
744     avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
745     KKASSERT(avail >= 0);
746     if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
747         wakeup(&jo->fifo);
748 }
749
750 /*
751  * Create a new BEGIN stream record with the specified streamid and the
752  * specified amount of payload space.  *rawpp will be set to point to the
753  * base of the new stream record and a pointer to the base of the payload
754  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
755  * making this call.  The raw record header will be partially initialized.
756  *
757  * A stream can be extended, aborted, or committed by other API calls
758  * below.  This may result in a sequence of potentially disconnected
759  * stream records to be output to the journaling target.  The first record
760  * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
761  * while the last record on commit or abort will be marked JREC_STREAMCTL_END
762  * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
763  * up being the same as the first, in which case the bits are all set in
764  * the first record.
765  *
766  * The stream record is created in an incomplete state by setting the begin
767  * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
768  * flushing the fifo past our record until we have finished populating it.
769  * Other threads can reserve and operate on their own space without stalling
770  * but the stream output will stall until we have completed operations.  The
771  * memory FIFO is intended to be large enough to absorb such situations
772  * without stalling out other threads.
773  */
774 static
775 void *
776 journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
777                 int16_t streamid, int bytes)
778 {
779     struct journal_rawrecbeg *rawp;
780     int avail;
781     int availtoend;
782     int req;
783
784     /*
785      * Add header and trailer overheads to the passed payload.  Note that
786      * the passed payload size need not be aligned in any way.
787      */
788     bytes += sizeof(struct journal_rawrecbeg);
789     bytes += sizeof(struct journal_rawrecend);
790
791     for (;;) {
792         /*
793          * First, check boundary conditions.  If the request would wrap around
794          * we have to skip past the ending block and return to the beginning
795          * of the FIFO's buffer.  Calculate 'req' which is the actual number
796          * of bytes being reserved, including wrap-around dead space.
797          *
798          * Neither 'bytes' or 'req' are aligned.
799          *
800          * Note that availtoend is not truncated to avail and so cannot be
801          * used to determine whether the reservation is possible by itself.
802          * Also, since all fifo ops are 16-byte aligned, we can check
803          * the size before calculating the aligned size.
804          */
805         availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
806         KKASSERT((availtoend & 15) == 0);
807         if (bytes > availtoend) 
808             req = bytes + availtoend;   /* add pad to end */
809         else
810             req = bytes;
811
812         /*
813          * Next calculate the total available space and see if it is
814          * sufficient.  We cannot overwrite previously buffered data
815          * past xindex because otherwise we would not be able to restart
816          * a broken link at the target's last point of commit.
817          */
818         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
819         KKASSERT(avail >= 0 && (avail & 15) == 0);
820
821         if (avail < req) {
822             /* XXX MC_JOURNAL_STOP_IMM */
823             jo->flags |= MC_JOURNAL_WWAIT;
824             tsleep(&jo->fifo.windex, 0, "jwrite", 0);
825             continue;
826         }
827
828         /*
829          * Create a pad record for any dead space and create an incomplete
830          * record for the live space, then return a pointer to the
831          * contiguous buffer space that was requested.
832          *
833          * NOTE: The worker thread will not flush past an incomplete
834          * record, so the reserved space can be filled in at-will.  The
835          * journaling code must also be aware the reserved sections occuring
836          * after this one will also not be written out even if completed
837          * until this one is completed.
838          *
839          * The transaction id must accomodate real and potential pad creation.
840          */
841         rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
842         if (req != bytes) {
843             journal_build_pad(rawp, availtoend, jo->transid);
844             ++jo->transid;
845             rawp = (void *)jo->fifo.membase;
846         }
847         rawp->begmagic = JREC_INCOMPLETEMAGIC;  /* updated by abort/commit */
848         rawp->recsize = bytes;                  /* (unaligned size) */
849         rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
850         rawp->transid = jo->transid;
851         jo->transid += 2;
852
853         /*
854          * Issue a memory barrier to guarentee that the record data has been
855          * properly initialized before we advance the write index and return
856          * a pointer to the reserved record.  Otherwise the worker thread
857          * could accidently run past us.
858          *
859          * Note that stream records are always 16-byte aligned.
860          */
861         cpu_sfence();
862         jo->fifo.windex += (req + 15) & ~15;
863         *rawpp = rawp;
864         return(rawp + 1);
865     }
866     /* not reached */
867     *rawpp = NULL;
868     return(NULL);
869 }
870
871 /*
872  * Attempt to extend the stream record by <bytes> worth of payload space.
873  *
874  * If it is possible to extend the existing stream record no truncation
875  * occurs and the record is extended as specified.  A pointer to the 
876  * truncation offset within the payload space is returned.
877  *
878  * If it is not possible to do this the existing stream record is truncated
879  * and committed, and a new stream record of size <bytes> is created.  A
880  * pointer to the base of the new stream record's payload space is returned.
881  *
882  * *rawpp is set to the new reservation in the case of a new record but
883  * the caller cannot depend on a comparison with the old rawp to determine if
884  * this case occurs because we could end up using the same memory FIFO
885  * offset for the new stream record.  Use *newstreamrecp instead.
886  */
887 static void *
888 journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp, 
889                 int truncbytes, int bytes, int *newstreamrecp)
890 {
891     struct journal_rawrecbeg *rawp;
892     int16_t streamid;
893     int availtoend;
894     int avail;
895     int osize;
896     int nsize;
897     int wbase;
898     void *rptr;
899
900     *newstreamrecp = 0;
901     rawp = *rawpp;
902     osize = (rawp->recsize + 15) & ~15;
903     nsize = (rawp->recsize + bytes + 15) & ~15;
904     wbase = (char *)rawp - jo->fifo.membase;
905
906     /*
907      * If the aligned record size does not change we can trivially adjust
908      * the record size.
909      */
910     if (nsize == osize) {
911         rawp->recsize += bytes;
912         return((char *)(rawp + 1) + truncbytes);
913     }
914
915     /*
916      * If the fifo's write index hasn't been modified since we made the
917      * reservation and we do not hit any boundary conditions, we can 
918      * trivially make the record smaller or larger.
919      */
920     if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
921         availtoend = jo->fifo.size - wbase;
922         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
923         KKASSERT((availtoend & 15) == 0);
924         KKASSERT((avail & 15) == 0);
925         if (nsize <= avail && nsize <= availtoend) {
926             jo->fifo.windex += nsize - osize;
927             rawp->recsize += bytes;
928             return((char *)(rawp + 1) + truncbytes);
929         }
930     }
931
932     /*
933      * It was not possible to extend the buffer.  Commit the current
934      * buffer and create a new one.  We manually clear the BEGIN mark that
935      * journal_reserve() creates (because this is a continuing record, not
936      * the start of a new stream).
937      */
938     streamid = rawp->streamid & JREC_STREAMID_MASK;
939     journal_commit(jo, rawpp, truncbytes, 0);
940     rptr = journal_reserve(jo, rawpp, streamid, bytes);
941     rawp = *rawpp;
942     rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
943     *newstreamrecp = 1;
944     return(rptr);
945 }
946
947 /*
948  * Abort a journal record.  If the transaction record represents a stream
949  * BEGIN and we can reverse the fifo's write index we can simply reverse
950  * index the entire record, as if it were never reserved in the first place.
951  *
952  * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
953  * with the payload truncated to 0 bytes.
954  */
955 static void
956 journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
957 {
958     struct journal_rawrecbeg *rawp;
959     int osize;
960
961     rawp = *rawpp;
962     osize = (rawp->recsize + 15) & ~15;
963
964     if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
965         (jo->fifo.windex & jo->fifo.mask) == 
966          (char *)rawp - jo->fifo.membase + osize)
967     {
968         jo->fifo.windex -= osize;
969         *rawpp = NULL;
970     } else {
971         rawp->streamid |= JREC_STREAMCTL_ABORTED;
972         journal_commit(jo, rawpp, 0, 1);
973     }
974 }
975
976 /*
977  * Commit a journal record and potentially truncate it to the specified
978  * number of payload bytes.  If you do not want to truncate the record,
979  * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
980  * field includes header and trailer and will not be correct.  Note that
981  * passing 0 will truncate the entire data payload of the record.
982  *
983  * The logical stream is terminated by this function.
984  *
985  * If truncation occurs, and it is not possible to physically optimize the
986  * memory FIFO due to other threads having reserved space after ours,
987  * the remaining reserved space will be covered by a pad record.
988  */
989 static void
990 journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
991                 int bytes, int closeout)
992 {
993     struct journal_rawrecbeg *rawp;
994     struct journal_rawrecend *rendp;
995     int osize;
996     int nsize;
997
998     rawp = *rawpp;
999     *rawpp = NULL;
1000
1001     KKASSERT((char *)rawp >= jo->fifo.membase &&
1002              (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
1003     KKASSERT(((intptr_t)rawp & 15) == 0);
1004
1005     /*
1006      * Truncate the record if necessary.  If the FIFO write index as still
1007      * at the end of our record we can optimally backindex it.  Otherwise
1008      * we have to insert a pad record to cover the dead space.
1009      *
1010      * We calculate osize which is the 16-byte-aligned original recsize.
1011      * We calculate nsize which is the 16-byte-aligned new recsize.
1012      *
1013      * Due to alignment issues or in case the passed truncation bytes is
1014      * the same as the original payload, nsize may be equal to osize even
1015      * if the committed bytes is less then the originally reserved bytes.
1016      */
1017     if (bytes >= 0) {
1018         KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
1019         osize = (rawp->recsize + 15) & ~15;
1020         rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
1021                         sizeof(struct journal_rawrecend);
1022         nsize = (rawp->recsize + 15) & ~15;
1023         KKASSERT(nsize <= osize);
1024         if (osize == nsize) {
1025             /* do nothing */
1026         } else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
1027             /* we are able to backindex the fifo */
1028             jo->fifo.windex -= osize - nsize;
1029         } else {
1030             /* we cannot backindex the fifo, emplace a pad in the dead space */
1031             journal_build_pad((void *)((char *)rawp + nsize), osize - nsize,
1032                                 rawp->transid + 1);
1033         }
1034     }
1035
1036     /*
1037      * Fill in the trailer.  Note that unlike pad records, the trailer will
1038      * never overlap the header.
1039      */
1040     rendp = (void *)((char *)rawp + 
1041             ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
1042     rendp->endmagic = JREC_ENDMAGIC;
1043     rendp->recsize = rawp->recsize;
1044     rendp->check = 0;           /* XXX check word, disabled for now */
1045
1046     /*
1047      * Fill in begmagic last.  This will allow the worker thread to proceed.
1048      * Use a memory barrier to guarentee write ordering.  Mark the stream
1049      * as terminated if closeout is set.  This is the typical case.
1050      */
1051     if (closeout)
1052         rawp->streamid |= JREC_STREAMCTL_END;
1053     cpu_sfence();               /* memory and compiler barrier */
1054     rawp->begmagic = JREC_BEGMAGIC;
1055
1056     journal_commit_wakeup(jo);
1057 }
1058
1059 /************************************************************************
1060  *                      TRANSACTION SUPPORT ROUTINES                    *
1061  ************************************************************************
1062  *
1063  * JRECORD_*() - routines to create subrecord transactions and embed them
1064  *               in the logical streams managed by the journal_*() routines.
1065  */
1066
1067 static int16_t sid = JREC_STREAMID_JMIN;
1068
1069 /*
1070  * Initialize the passed jrecord structure and start a new stream transaction
1071  * by reserving an initial build space in the journal's memory FIFO.
1072  */
1073 static void
1074 jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
1075 {
1076     bzero(jrec, sizeof(*jrec));
1077     jrec->jo = jo;
1078     if (streamid < 0) {
1079         streamid = sid++;       /* XXX need to track stream ids! */
1080         if (sid == JREC_STREAMID_JMAX)
1081             sid = JREC_STREAMID_JMIN;
1082     }
1083     jrec->streamid = streamid;
1084     jrec->stream_residual = JREC_DEFAULTSIZE;
1085     jrec->stream_reserved = jrec->stream_residual;
1086     jrec->stream_ptr = 
1087         journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
1088 }
1089
1090 /*
1091  * Push a recursive record type.  All pushes should have matching pops.
1092  * The old parent is returned and the newly pushed record becomes the
1093  * new parent.  Note that the old parent's pointer may already be invalid
1094  * or may become invalid if jrecord_write() had to build a new stream
1095  * record, so the caller should not mess with the returned pointer in
1096  * any way other then to save it.
1097  */
1098 static 
1099 struct journal_subrecord *
1100 jrecord_push(struct jrecord *jrec, int16_t rectype)
1101 {
1102     struct journal_subrecord *save;
1103
1104     save = jrec->parent;
1105     jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
1106     jrec->last = NULL;
1107     KKASSERT(jrec->parent != NULL);
1108     ++jrec->pushcount;
1109     ++jrec->pushptrgood;        /* cleared on flush */
1110     return(save);
1111 }
1112
1113 /*
1114  * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
1115  * on the last record written within the subtransaction.  If the last 
1116  * record written is not accessible or if the subtransaction is empty,
1117  * we must write out a pad record with JMASK_LAST set before popping.
1118  *
1119  * When popping a subtransaction the parent record's recsize field
1120  * will be properly set.  If the parent pointer is no longer valid
1121  * (which can occur if the data has already been flushed out to the
1122  * stream), the protocol spec allows us to leave it 0.
1123  *
1124  * The saved parent pointer which we restore may or may not be valid,
1125  * and if not valid may or may not be NULL, depending on the value
1126  * of pushptrgood.
1127  */
1128 static void
1129 jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
1130 {
1131     struct journal_subrecord *last;
1132
1133     KKASSERT(jrec->pushcount > 0);
1134     KKASSERT(jrec->residual == 0);
1135
1136     /*
1137      * Set JMASK_LAST on the last record we wrote at the current
1138      * level.  If last is NULL we either no longer have access to the
1139      * record or the subtransaction was empty and we must write out a pad
1140      * record.
1141      */
1142     if ((last = jrec->last) == NULL) {
1143         jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
1144         last = jrec->last;      /* reload after possible flush */
1145     } else {
1146         last->rectype |= JMASK_LAST;
1147     }
1148
1149     /*
1150      * pushptrgood tells us how many levels of parent record pointers
1151      * are valid.  The jrec only stores the current parent record pointer
1152      * (and it is only valid if pushptrgood != 0).  The higher level parent
1153      * record pointers are saved by the routines calling jrecord_push() and
1154      * jrecord_pop().  These pointers may become stale and we determine
1155      * that fact by tracking the count of valid parent pointers with 
1156      * pushptrgood.  Pointers become invalid when their related stream
1157      * record gets pushed out.
1158      *
1159      * If no pointer is available (the data has already been pushed out),
1160      * then no fixup of e.g. the length field is possible for non-leaf
1161      * nodes.  The protocol allows for this situation by placing a larger
1162      * burden on the program scanning the stream on the other end.
1163      *
1164      * [parentA]
1165      *    [node X]
1166      *    [parentB]
1167      *       [node Y]
1168      *       [node Z]
1169      *    (pop B)       see NOTE B
1170      * (pop A)          see NOTE A
1171      *
1172      * NOTE B:  This pop sets LAST in node Z if the node is still accessible,
1173      *          else a PAD record is appended and LAST is set in that.
1174      *
1175      *          This pop sets the record size in parentB if parentB is still
1176      *          accessible, else the record size is left 0 (the scanner must
1177      *          deal with that).
1178      *
1179      *          This pop sets the new 'last' record to parentB, the pointer
1180      *          to which may or may not still be accessible.
1181      *
1182      * NOTE A:  This pop sets LAST in parentB if the node is still accessible,
1183      *          else a PAD record is appended and LAST is set in that.
1184      *
1185      *          This pop sets the record size in parentA if parentA is still
1186      *          accessible, else the record size is left 0 (the scanner must
1187      *          deal with that).
1188      *
1189      *          This pop sets the new 'last' record to parentA, the pointer
1190      *          to which may or may not still be accessible.
1191      *
1192      * Also note that the last record in the stream transaction, which in
1193      * the above example is parentA, does not currently have the LAST bit
1194      * set.
1195      *
1196      * The current parent becomes the last record relative to the
1197      * saved parent passed into us.  It's validity is based on 
1198      * whether pushptrgood is non-zero prior to decrementing.  The saved
1199      * parent becomes the new parent, and its validity is based on whether
1200      * pushptrgood is non-zero after decrementing.
1201      *
1202      * The old jrec->parent may be NULL if it is no longer accessible.
1203      * If pushptrgood is non-zero, however, it is guarenteed to not
1204      * be NULL (since no flush occured).
1205      */
1206     jrec->last = jrec->parent;
1207     --jrec->pushcount;
1208     if (jrec->pushptrgood) {
1209         KKASSERT(jrec->last != NULL && last != NULL);
1210         if (--jrec->pushptrgood == 0) {
1211             jrec->parent = NULL;        /* 'save' contains garbage or NULL */
1212         } else {
1213             KKASSERT(save != NULL);
1214             jrec->parent = save;        /* 'save' must not be NULL */
1215         }
1216
1217         /*
1218          * Set the record size in the old parent.  'last' still points to
1219          * the original last record in the subtransaction being popped,
1220          * jrec->last points to the old parent (which became the last
1221          * record relative to the new parent being popped into).
1222          */
1223         jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
1224     } else {
1225         jrec->parent = NULL;
1226         KKASSERT(jrec->last == NULL);
1227     }
1228 }
1229
1230 /*
1231  * Write out a leaf record, including associated data.
1232  */
1233 static
1234 void
1235 jrecord_leaf(struct jrecord *jrec, int16_t rectype, void *ptr, int bytes)
1236 {
1237     jrecord_write(jrec, rectype, bytes);
1238     jrecord_data(jrec, ptr, bytes);
1239 }
1240
1241 /*
1242  * Write a leaf record out and return a pointer to its base.  The leaf
1243  * record may contain potentially megabytes of data which is supplied
1244  * in jrecord_data() calls.  The exact amount must be specified in this
1245  * call.
1246  *
1247  * THE RETURNED SUBRECORD POINTER IS ONLY VALID IMMEDIATELY AFTER THE
1248  * CALL AND MAY BECOME INVALID AT ANY TIME.  ONLY THE PUSH/POP CODE SHOULD
1249  * USE THE RETURN VALUE.
1250  */
1251 static
1252 struct journal_subrecord *
1253 jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
1254 {
1255     struct journal_subrecord *last;
1256     int pusheditout;
1257
1258     /*
1259      * Try to catch some obvious errors.  Nesting records must specify a
1260      * size of 0, and there should be no left-overs from previous operations
1261      * (such as incomplete data writeouts).
1262      */
1263     KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
1264     KKASSERT(jrec->residual == 0);
1265
1266     /*
1267      * Check to see if the current stream record has enough room for
1268      * the new subrecord header.  If it doesn't we extend the current
1269      * stream record.
1270      *
1271      * This may have the side effect of pushing out the current stream record
1272      * and creating a new one.  We must adjust our stream tracking fields
1273      * accordingly.
1274      */
1275     if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
1276         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1277                                 jrec->stream_reserved - jrec->stream_residual,
1278                                 JREC_DEFAULTSIZE, &pusheditout);
1279         if (pusheditout) {
1280             /*
1281              * If a pushout occured, the pushed out stream record was
1282              * truncated as specified and the new record is exactly the
1283              * extension size specified.
1284              */
1285             jrec->stream_reserved = JREC_DEFAULTSIZE;
1286             jrec->stream_residual = JREC_DEFAULTSIZE;
1287             jrec->parent = NULL;        /* no longer accessible */
1288             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1289         } else {
1290             /*
1291              * If no pushout occured the stream record is NOT truncated and
1292              * IS extended.
1293              */
1294             jrec->stream_reserved += JREC_DEFAULTSIZE;
1295             jrec->stream_residual += JREC_DEFAULTSIZE;
1296         }
1297     }
1298     last = (void *)jrec->stream_ptr;
1299     last->rectype = rectype;
1300     last->reserved = 0;
1301
1302     /*
1303      * We may not know the record size for recursive records and the 
1304      * header may become unavailable due to limited FIFO space.  Write
1305      * -1 to indicate this special case.
1306      */
1307     if ((rectype & JMASK_NESTED) && bytes == 0)
1308         last->recsize = -1;
1309     else
1310         last->recsize = sizeof(struct journal_subrecord) + bytes;
1311     jrec->last = last;
1312     jrec->residual = bytes;             /* remaining data to be posted */
1313     jrec->residual_align = -bytes & 7;  /* post-data alignment required */
1314     jrec->stream_ptr += sizeof(*last);  /* current write pointer */
1315     jrec->stream_residual -= sizeof(*last); /* space remaining in stream */
1316     return(last);
1317 }
1318
1319 /*
1320  * Write out the data associated with a leaf record.  Any number of calls
1321  * to this routine may be made as long as the byte count adds up to the
1322  * amount originally specified in jrecord_write().
1323  *
1324  * The act of writing out the leaf data may result in numerous stream records
1325  * being pushed out.   Callers should be aware that even the associated
1326  * subrecord header may become inaccessible due to stream record pushouts.
1327  */
1328 static void
1329 jrecord_data(struct jrecord *jrec, const void *buf, int bytes)
1330 {
1331     int pusheditout;
1332     int extsize;
1333
1334     KKASSERT(bytes >= 0 && bytes <= jrec->residual);
1335
1336     /*
1337      * Push out stream records as long as there is insufficient room to hold
1338      * the remaining data.
1339      */
1340     while (jrec->stream_residual < bytes) {
1341         /*
1342          * Fill in any remaining space in the current stream record.
1343          */
1344         bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
1345         buf = (const char *)buf + jrec->stream_residual;
1346         bytes -= jrec->stream_residual;
1347         /*jrec->stream_ptr += jrec->stream_residual;*/
1348         jrec->residual -= jrec->stream_residual;
1349         jrec->stream_residual = 0;
1350
1351         /*
1352          * Try to extend the current stream record, but no more then 1/4
1353          * the size of the FIFO.
1354          */
1355         extsize = jrec->jo->fifo.size >> 2;
1356         if (extsize > bytes)
1357             extsize = (bytes + 15) & ~15;
1358
1359         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1360                                 jrec->stream_reserved - jrec->stream_residual,
1361                                 extsize, &pusheditout);
1362         if (pusheditout) {
1363             jrec->stream_reserved = extsize;
1364             jrec->stream_residual = extsize;
1365             jrec->parent = NULL;        /* no longer accessible */
1366             jrec->last = NULL;          /* no longer accessible */
1367             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1368         } else {
1369             jrec->stream_reserved += extsize;
1370             jrec->stream_residual += extsize;
1371         }
1372     }
1373
1374     /*
1375      * Push out any remaining bytes into the current stream record.
1376      */
1377     if (bytes) {
1378         bcopy(buf, jrec->stream_ptr, bytes);
1379         jrec->stream_ptr += bytes;
1380         jrec->stream_residual -= bytes;
1381         jrec->residual -= bytes;
1382     }
1383
1384     /*
1385      * Handle data alignment requirements for the subrecord.  Because the
1386      * stream record's data space is more strictly aligned, it must already
1387      * have sufficient space to hold any subrecord alignment slop.
1388      */
1389     if (jrec->residual == 0 && jrec->residual_align) {
1390         KKASSERT(jrec->residual_align <= jrec->stream_residual);
1391         bzero(jrec->stream_ptr, jrec->residual_align);
1392         jrec->stream_ptr += jrec->residual_align;
1393         jrec->stream_residual -= jrec->residual_align;
1394         jrec->residual_align = 0;
1395     }
1396 }
1397
1398 /*
1399  * We are finished with the transaction.  This closes the transaction created
1400  * by jrecord_init().
1401  *
1402  * NOTE: If abortit is not set then we must be at the top level with no
1403  *       residual subrecord data left to output.
1404  *
1405  *       If abortit is set then we can be in any state, all pushes will be 
1406  *       popped and it is ok for there to be residual data.  This works 
1407  *       because the virtual stream itself is truncated.  Scanners must deal
1408  *       with this situation.
1409  *
1410  * The stream record will be committed or aborted as specified and jrecord
1411  * resources will be cleaned up.
1412  */
1413 static void
1414 jrecord_done(struct jrecord *jrec, int abortit)
1415 {
1416     KKASSERT(jrec->rawp != NULL);
1417
1418     if (abortit) {
1419         journal_abort(jrec->jo, &jrec->rawp);
1420     } else {
1421         KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
1422         journal_commit(jrec->jo, &jrec->rawp, 
1423                         jrec->stream_reserved - jrec->stream_residual, 1);
1424     }
1425
1426     /*
1427      * jrec should not be used beyond this point without another init,
1428      * but clean up some fields to ensure that we panic if it is.
1429      *
1430      * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
1431      */
1432     jrec->jo = NULL;
1433     jrec->stream_ptr = NULL;
1434 }
1435
1436 /************************************************************************
1437  *                      LOW LEVEL RECORD SUPPORT ROUTINES               *
1438  ************************************************************************
1439  *
1440  * These routine create low level recursive and leaf subrecords representing
1441  * common filesystem structures.
1442  */
1443
1444 /*
1445  * Write out a filename path relative to the base of the mount point.
1446  * rectype is typically JLEAF_PATH{1,2,3,4}.
1447  */
1448 static void
1449 jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
1450 {
1451     char buf[64];       /* local buffer if it fits, else malloced */
1452     char *base;
1453     int pathlen;
1454     int index;
1455     struct namecache *scan;
1456
1457     /*
1458      * Pass 1 - figure out the number of bytes required.  Include terminating
1459      *         \0 on last element and '/' separator on other elements.
1460      */
1461 again:
1462     pathlen = 0;
1463     for (scan = ncp; 
1464          scan && (scan->nc_flag & NCF_MOUNTPT) == 0; 
1465          scan = scan->nc_parent
1466     ) {
1467         pathlen += scan->nc_nlen + 1;
1468     }
1469
1470     if (pathlen <= sizeof(buf))
1471         base = buf;
1472     else
1473         base = malloc(pathlen, M_TEMP, M_INTWAIT);
1474
1475     /*
1476      * Pass 2 - generate the path buffer
1477      */
1478     index = pathlen;
1479     for (scan = ncp; 
1480          scan && (scan->nc_flag & NCF_MOUNTPT) == 0; 
1481          scan = scan->nc_parent
1482     ) {
1483         if (scan->nc_nlen >= index) {
1484             if (base != buf)
1485                 free(base, M_TEMP);
1486             goto again;
1487         }
1488         if (index == pathlen)
1489             base[--index] = 0;
1490         else
1491             base[--index] = '/';
1492         index -= scan->nc_nlen;
1493         bcopy(scan->nc_name, base + index, scan->nc_nlen);
1494     }
1495     jrecord_leaf(jrec, rectype, base + index, pathlen - index);
1496     if (base != buf)
1497         free(base, M_TEMP);
1498 }
1499
1500 /*
1501  * Write out a file attribute structure.  While somewhat inefficient, using
1502  * a recursive data structure is the most portable and extensible way.
1503  */
1504 static void
1505 jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
1506 {
1507     void *save;
1508
1509     save = jrecord_push(jrec, JTYPE_VATTR);
1510     if (vat->va_type != VNON)
1511         jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type));
1512     if (vat->va_mode != (mode_t)VNOVAL)
1513         jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode));
1514     if (vat->va_nlink != VNOVAL)
1515         jrecord_leaf(jrec, JLEAF_NLINK, &vat->va_nlink, sizeof(vat->va_nlink));
1516     if (vat->va_uid != VNOVAL)
1517         jrecord_leaf(jrec, JLEAF_UID, &vat->va_uid, sizeof(vat->va_uid));
1518     if (vat->va_gid != VNOVAL)
1519         jrecord_leaf(jrec, JLEAF_GID, &vat->va_gid, sizeof(vat->va_gid));
1520     if (vat->va_fsid != VNOVAL)
1521         jrecord_leaf(jrec, JLEAF_FSID, &vat->va_fsid, sizeof(vat->va_fsid));
1522     if (vat->va_fileid != VNOVAL)
1523         jrecord_leaf(jrec, JLEAF_INUM, &vat->va_fileid, sizeof(vat->va_fileid));
1524     if (vat->va_size != VNOVAL)
1525         jrecord_leaf(jrec, JLEAF_SIZE, &vat->va_size, sizeof(vat->va_size));
1526     if (vat->va_atime.tv_sec != VNOVAL)
1527         jrecord_leaf(jrec, JLEAF_ATIME, &vat->va_atime, sizeof(vat->va_atime));
1528     if (vat->va_mtime.tv_sec != VNOVAL)
1529         jrecord_leaf(jrec, JLEAF_MTIME, &vat->va_mtime, sizeof(vat->va_mtime));
1530     if (vat->va_ctime.tv_sec != VNOVAL)
1531         jrecord_leaf(jrec, JLEAF_CTIME, &vat->va_ctime, sizeof(vat->va_ctime));
1532     if (vat->va_gen != VNOVAL)
1533         jrecord_leaf(jrec, JLEAF_GEN, &vat->va_gen, sizeof(vat->va_gen));
1534     if (vat->va_flags != VNOVAL)
1535         jrecord_leaf(jrec, JLEAF_FLAGS, &vat->va_flags, sizeof(vat->va_flags));
1536     if (vat->va_rdev != VNOVAL)
1537         jrecord_leaf(jrec, JLEAF_UDEV, &vat->va_rdev, sizeof(vat->va_rdev));
1538 #if 0
1539     if (vat->va_filerev != VNOVAL)
1540         jrecord_leaf(jrec, JLEAF_FILEREV, &vat->va_filerev, sizeof(vat->va_filerev));
1541 #endif
1542     jrecord_pop(jrec, save);
1543 }
1544
1545 /*
1546  * Write out the creds used to issue a file operation.  If a process is
1547  * available write out additional tracking information related to the 
1548  * process.
1549  *
1550  * XXX additional tracking info
1551  * XXX tty line info
1552  */
1553 static void
1554 jrecord_write_cred(struct jrecord *jrec, struct thread *td, struct ucred *cred)
1555 {
1556     void *save;
1557     struct proc *p;
1558
1559     save = jrecord_push(jrec, JTYPE_CRED);
1560     jrecord_leaf(jrec, JLEAF_UID, &cred->cr_uid, sizeof(cred->cr_uid));
1561     jrecord_leaf(jrec, JLEAF_GID, &cred->cr_gid, sizeof(cred->cr_gid));
1562     if (td && (p = td->td_proc) != NULL) {
1563         jrecord_leaf(jrec, JLEAF_PID, &p->p_pid, sizeof(p->p_pid));
1564         jrecord_leaf(jrec, JLEAF_COMM, p->p_comm, sizeof(p->p_comm));
1565     }
1566     jrecord_pop(jrec, save);
1567 }
1568
1569 /*
1570  * Write out information required to identify a vnode
1571  *
1572  * XXX this needs work.  We should write out the inode number as well,
1573  * and in fact avoid writing out the file path for seqential writes
1574  * occuring within e.g. a certain period of time.
1575  */
1576 static void
1577 jrecord_write_vnode_ref(struct jrecord *jrec, struct vnode *vp)
1578 {
1579     struct namecache *ncp;
1580
1581     TAILQ_FOREACH(ncp, &vp->v_namecache, nc_vnode) {
1582         if ((ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1583             break;
1584     }
1585     if (ncp)
1586         jrecord_write_path(jrec, JLEAF_PATH_REF, ncp);
1587 }
1588
1589 #if 0
1590 /*
1591  * Write out the current contents of the file within the specified
1592  * range.  This is typically called from within an UNDO section.  A
1593  * locked vnode must be passed.
1594  */
1595 static int
1596 jrecord_write_filearea(struct jrecord *jrec, struct vnode *vp, 
1597                         off_t begoff, off_t endoff)
1598 {
1599 }
1600 #endif
1601
1602 /*
1603  * Write out the data represented by a pagelist
1604  */
1605 static void
1606 jrecord_write_pagelist(struct jrecord *jrec, int16_t rectype,
1607                         struct vm_page **pglist, int *rtvals, int pgcount,
1608                         off_t offset)
1609 {
1610     struct msf_buf *msf;
1611     int error;
1612     int b;
1613     int i;
1614
1615     i = 0;
1616     while (i < pgcount) {
1617         /*
1618          * Find the next valid section.  Skip any invalid elements
1619          */
1620         if (rtvals[i] != VM_PAGER_OK) {
1621             ++i;
1622             offset += PAGE_SIZE;
1623             continue;
1624         }
1625
1626         /*
1627          * Figure out how big the valid section is, capping I/O at what the
1628          * MSFBUF can represent.
1629          */
1630         b = i;
1631         while (i < pgcount && i - b != XIO_INTERNAL_PAGES && 
1632                rtvals[i] == VM_PAGER_OK
1633         ) {
1634             ++i;
1635         }
1636
1637         /*
1638          * And write it out.
1639          */
1640         if (i - b) {
1641             error = msf_map_pagelist(&msf, pglist + b, i - b, 0);
1642             if (error == 0) {
1643                 printf("RECORD PUTPAGES %d\n", msf_buf_bytes(msf));
1644                 jrecord_leaf(jrec, JLEAF_SEEKPOS, &offset, sizeof(offset));
1645                 jrecord_leaf(jrec, rectype, 
1646                              msf_buf_kva(msf), msf_buf_bytes(msf));
1647                 msf_buf_free(msf);
1648             } else {
1649                 printf("jrecord_write_pagelist: mapping failure\n");
1650             }
1651             offset += (off_t)(i - b) << PAGE_SHIFT;
1652         }
1653     }
1654 }
1655
1656 /*
1657  * Write out the data represented by a UIO.
1658  */
1659 struct jwuio_info {
1660     struct jrecord *jrec;
1661     int16_t rectype;
1662 };
1663
1664 static int jrecord_write_uio_callback(void *info, char *buf, int bytes);
1665
1666 static void
1667 jrecord_write_uio(struct jrecord *jrec, int16_t rectype, struct uio *uio)
1668 {
1669     struct jwuio_info info = { jrec, rectype };
1670     int error;
1671
1672     if (uio->uio_segflg != UIO_NOCOPY) {
1673         jrecord_leaf(jrec, JLEAF_SEEKPOS, &uio->uio_offset, 
1674                      sizeof(uio->uio_offset));
1675         error = msf_uio_iterate(uio, jrecord_write_uio_callback, &info);
1676         if (error)
1677             printf("XXX warning uio iterate failed %d\n", error);
1678     }
1679 }
1680
1681 static int
1682 jrecord_write_uio_callback(void *info_arg, char *buf, int bytes)
1683 {
1684     struct jwuio_info *info = info_arg;
1685
1686     jrecord_leaf(info->jrec, info->rectype, buf, bytes);
1687     return(0);
1688 }
1689
1690 /************************************************************************
1691  *                      JOURNAL VNOPS                                   *
1692  ************************************************************************
1693  *
1694  * These are function shims replacing the normal filesystem ops.  We become
1695  * responsible for calling the underlying filesystem ops.  We have the choice
1696  * of executing the underlying op first and then generating the journal entry,
1697  * or starting the journal entry, executing the underlying op, and then
1698  * either completing or aborting it.  
1699  *
1700  * The journal is supposed to be a high-level entity, which generally means
1701  * identifying files by name rather then by inode.  Supplying both allows
1702  * the journal to be used both for inode-number-compatible 'mirrors' and
1703  * for simple filesystem replication.
1704  *
1705  * Writes are particularly difficult to deal with because a single write may
1706  * represent a hundred megabyte buffer or more, and both writes and truncations
1707  * require the 'old' data to be written out as well as the new data if the
1708  * log is reversable.  Other issues:
1709  *
1710  * - How to deal with operations on unlinked files (no path available),
1711  *   but which may still be filesystem visible due to hard links.
1712  *
1713  * - How to deal with modifications made via a memory map.
1714  *
1715  * - Future cache coherency support will require cache coherency API calls
1716  *   both prior to and after the call to the underlying VFS.
1717  *
1718  * ALSO NOTE: We do not have to shim compatibility VOPs like MKDIR which have
1719  * new VFS equivalents (NMKDIR).
1720  */
1721
1722 /*
1723  * Journal vop_settattr { a_vp, a_vap, a_cred, a_td }
1724  */
1725 static
1726 int
1727 journal_setattr(struct vop_setattr_args *ap)
1728 {
1729     struct mount *mp;
1730     struct journal *jo;
1731     struct jrecord jrec;
1732     void *save;         /* warning, save pointers do not always remain valid */
1733     int error;
1734
1735     error = vop_journal_operate_ap(&ap->a_head);
1736     mp = ap->a_head.a_ops->vv_mount;
1737     if (error == 0) {
1738         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1739             jrecord_init(jo, &jrec, -1);
1740             save = jrecord_push(&jrec, JTYPE_SETATTR);
1741             jrecord_write_cred(&jrec, ap->a_td, ap->a_cred);
1742             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1743             jrecord_write_vattr(&jrec, ap->a_vap);
1744             jrecord_pop(&jrec, save);
1745             jrecord_done(&jrec, 0);
1746         }
1747     }
1748     return (error);
1749 }
1750
1751 /*
1752  * Journal vop_write { a_vp, a_uio, a_ioflag, a_cred }
1753  */
1754 static
1755 int
1756 journal_write(struct vop_write_args *ap)
1757 {
1758     struct mount *mp;
1759     struct journal *jo;
1760     struct jrecord jrec;
1761     struct uio uio_copy;
1762     struct iovec uio_one_iovec;
1763     void *save;         /* warning, save pointers do not always remain valid */
1764     int error;
1765
1766     /*
1767      * This is really nasty.  UIO's don't retain sufficient information to
1768      * be reusable once they've gone through the VOP chain.  The iovecs get
1769      * cleared, so we have to copy the UIO.
1770      *
1771      * XXX fix the UIO code to not destroy iov's during a scan so we can
1772      *     reuse the uio over and over again.
1773      *
1774      * XXX UNDO code needs to journal the old data prior to the write.
1775      */
1776     uio_copy = *ap->a_uio;
1777     if (uio_copy.uio_iovcnt == 1) {
1778         uio_one_iovec = ap->a_uio->uio_iov[0];
1779         uio_copy.uio_iov = &uio_one_iovec;
1780     } else {
1781         uio_copy.uio_iov = malloc(uio_copy.uio_iovcnt * sizeof(struct iovec),
1782                                     M_JOURNAL, M_WAITOK);
1783         bcopy(ap->a_uio->uio_iov, uio_copy.uio_iov, 
1784                 uio_copy.uio_iovcnt * sizeof(struct iovec));
1785     }
1786
1787     error = vop_journal_operate_ap(&ap->a_head);
1788
1789     /*
1790      * XXX bad hack to figure out the offset for O_APPEND writes (note: 
1791      * uio field state after the VFS operation).
1792      */
1793     uio_copy.uio_offset = ap->a_uio->uio_offset - 
1794                                 (uio_copy.uio_resid - ap->a_uio->uio_resid);
1795
1796     mp = ap->a_head.a_ops->vv_mount;
1797     if (error == 0) {
1798         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1799             jrecord_init(jo, &jrec, -1);
1800             save = jrecord_push(&jrec, JTYPE_WRITE);
1801             jrecord_write_cred(&jrec, NULL, ap->a_cred);
1802             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1803             jrecord_write_uio(&jrec, JLEAF_FILEDATA, &uio_copy);
1804             jrecord_pop(&jrec, save);
1805             jrecord_done(&jrec, 0);
1806         }
1807     }
1808
1809     if (uio_copy.uio_iov != &uio_one_iovec)
1810         free(uio_copy.uio_iov, M_JOURNAL);
1811
1812
1813     return (error);
1814 }
1815
1816 /*
1817  * Journal vop_fsync { a_vp, a_waitfor, a_td }
1818  */
1819 static
1820 int
1821 journal_fsync(struct vop_fsync_args *ap)
1822 {
1823     struct mount *mp;
1824     struct journal *jo;
1825     int error;
1826
1827     error = vop_journal_operate_ap(&ap->a_head);
1828     mp = ap->a_head.a_ops->vv_mount;
1829     if (error == 0) {
1830         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1831             /* XXX synchronize pending journal records */
1832         }
1833     }
1834     return (error);
1835 }
1836
1837 /*
1838  * Journal vop_putpages { a_vp, a_m, a_count, a_sync, a_rtvals, a_offset }
1839  *
1840  * note: a_count is in bytes.
1841  */
1842 static
1843 int
1844 journal_putpages(struct vop_putpages_args *ap)
1845 {
1846     struct mount *mp;
1847     struct journal *jo;
1848     struct jrecord jrec;
1849     void *save;         /* warning, save pointers do not always remain valid */
1850     int error;
1851
1852     error = vop_journal_operate_ap(&ap->a_head);
1853     mp = ap->a_head.a_ops->vv_mount;
1854     if (error == 0 && ap->a_count > 0) {
1855         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1856             jrecord_init(jo, &jrec, -1);
1857             save = jrecord_push(&jrec, JTYPE_PUTPAGES);
1858             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1859             jrecord_write_pagelist(&jrec, JLEAF_FILEDATA, 
1860                         ap->a_m, ap->a_rtvals, btoc(ap->a_count), ap->a_offset);
1861             jrecord_pop(&jrec, save);
1862             jrecord_done(&jrec, 0);
1863         }
1864     }
1865     return (error);
1866 }
1867
1868 /*
1869  * Journal vop_setacl { a_vp, a_type, a_aclp, a_cred, a_td }
1870  */
1871 static
1872 int
1873 journal_setacl(struct vop_setacl_args *ap)
1874 {
1875     struct mount *mp;
1876     struct journal *jo;
1877     struct jrecord jrec;
1878     void *save;         /* warning, save pointers do not always remain valid */
1879     int error;
1880
1881     error = vop_journal_operate_ap(&ap->a_head);
1882     mp = ap->a_head.a_ops->vv_mount;
1883     if (error == 0) {
1884         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1885             jrecord_init(jo, &jrec, -1);
1886             save = jrecord_push(&jrec, JTYPE_SETACL);
1887             jrecord_write_cred(&jrec, ap->a_td, ap->a_cred);
1888             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1889             /* XXX type, aclp */
1890             jrecord_pop(&jrec, save);
1891             jrecord_done(&jrec, 0);
1892         }
1893     }
1894     return (error);
1895 }
1896
1897 /*
1898  * Journal vop_setextattr { a_vp, a_name, a_uio, a_cred, a_td }
1899  */
1900 static
1901 int
1902 journal_setextattr(struct vop_setextattr_args *ap)
1903 {
1904     struct mount *mp;
1905     struct journal *jo;
1906     struct jrecord jrec;
1907     void *save;         /* warning, save pointers do not always remain valid */
1908     int error;
1909
1910     error = vop_journal_operate_ap(&ap->a_head);
1911     mp = ap->a_head.a_ops->vv_mount;
1912     if (error == 0) {
1913         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1914             jrecord_init(jo, &jrec, -1);
1915             save = jrecord_push(&jrec, JTYPE_SETEXTATTR);
1916             jrecord_write_cred(&jrec, ap->a_td, ap->a_cred);
1917             jrecord_write_vnode_ref(&jrec, ap->a_vp);
1918             jrecord_leaf(&jrec, JLEAF_ATTRNAME, ap->a_name, strlen(ap->a_name));
1919             jrecord_write_uio(&jrec, JLEAF_FILEDATA, ap->a_uio);
1920             jrecord_pop(&jrec, save);
1921             jrecord_done(&jrec, 0);
1922         }
1923     }
1924     return (error);
1925 }
1926
1927 /*
1928  * Journal vop_ncreate { a_ncp, a_vpp, a_cred, a_vap }
1929  */
1930 static
1931 int
1932 journal_ncreate(struct vop_ncreate_args *ap)
1933 {
1934     struct mount *mp;
1935     struct journal *jo;
1936     struct jrecord jrec;
1937     void *save;         /* warning, save pointers do not always remain valid */
1938     int error;
1939
1940     error = vop_journal_operate_ap(&ap->a_head);
1941     mp = ap->a_head.a_ops->vv_mount;
1942     if (error == 0) {
1943         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1944             jrecord_init(jo, &jrec, -1);
1945             save = jrecord_push(&jrec, JTYPE_CREATE);
1946             jrecord_write_cred(&jrec, NULL, ap->a_cred);
1947             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1948             if (*ap->a_vpp)
1949                 jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
1950             jrecord_write_vattr(&jrec, ap->a_vap);
1951             jrecord_pop(&jrec, save);
1952             jrecord_done(&jrec, 0);
1953         }
1954     }
1955     return (error);
1956 }
1957
1958 /*
1959  * Journal vop_nmknod { a_ncp, a_vpp, a_cred, a_vap }
1960  */
1961 static
1962 int
1963 journal_nmknod(struct vop_nmknod_args *ap)
1964 {
1965     struct mount *mp;
1966     struct journal *jo;
1967     struct jrecord jrec;
1968     void *save;         /* warning, save pointers do not always remain valid */
1969     int error;
1970
1971     error = vop_journal_operate_ap(&ap->a_head);
1972     mp = ap->a_head.a_ops->vv_mount;
1973     if (error == 0) {
1974         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1975             jrecord_init(jo, &jrec, -1);
1976             save = jrecord_push(&jrec, JTYPE_MKNOD);
1977             jrecord_write_cred(&jrec, NULL, ap->a_cred);
1978             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1979             jrecord_write_vattr(&jrec, ap->a_vap);
1980             if (*ap->a_vpp)
1981                 jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
1982             jrecord_pop(&jrec, save);
1983             jrecord_done(&jrec, 0);
1984         }
1985     }
1986     return (error);
1987 }
1988
1989 /*
1990  * Journal vop_nlink { a_ncp, a_vp, a_cred }
1991  */
1992 static
1993 int
1994 journal_nlink(struct vop_nlink_args *ap)
1995 {
1996     struct mount *mp;
1997     struct journal *jo;
1998     struct jrecord jrec;
1999     void *save;         /* warning, save pointers do not always remain valid */
2000     int error;
2001
2002     error = vop_journal_operate_ap(&ap->a_head);
2003     mp = ap->a_head.a_ops->vv_mount;
2004     if (error == 0) {
2005         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2006             jrecord_init(jo, &jrec, -1);
2007             save = jrecord_push(&jrec, JTYPE_LINK);
2008             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2009             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2010             jrecord_write_vnode_ref(&jrec, ap->a_vp);
2011             /* XXX PATH to VP and inode number */
2012             jrecord_pop(&jrec, save);
2013             jrecord_done(&jrec, 0);
2014         }
2015     }
2016     return (error);
2017 }
2018
2019 /*
2020  * Journal vop_symlink { a_ncp, a_vpp, a_cred, a_vap, a_target }
2021  */
2022 static
2023 int
2024 journal_nsymlink(struct vop_nsymlink_args *ap)
2025 {
2026     struct mount *mp;
2027     struct journal *jo;
2028     struct jrecord jrec;
2029     void *save;         /* warning, save pointers do not always remain valid */
2030     int error;
2031
2032     error = vop_journal_operate_ap(&ap->a_head);
2033     mp = ap->a_head.a_ops->vv_mount;
2034     if (error == 0) {
2035         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2036             jrecord_init(jo, &jrec, -1);
2037             save = jrecord_push(&jrec, JTYPE_SYMLINK);
2038             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2039             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2040             jrecord_leaf(&jrec, JLEAF_SYMLINKDATA,
2041                         ap->a_target, strlen(ap->a_target));
2042             if (*ap->a_vpp)
2043                 jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
2044             jrecord_pop(&jrec, save);
2045             jrecord_done(&jrec, 0);
2046         }
2047     }
2048     return (error);
2049 }
2050
2051 /*
2052  * Journal vop_nwhiteout { a_ncp, a_cred, a_flags }
2053  */
2054 static
2055 int
2056 journal_nwhiteout(struct vop_nwhiteout_args *ap)
2057 {
2058     struct mount *mp;
2059     struct journal *jo;
2060     struct jrecord jrec;
2061     void *save;         /* warning, save pointers do not always remain valid */
2062     int error;
2063
2064     error = vop_journal_operate_ap(&ap->a_head);
2065     mp = ap->a_head.a_ops->vv_mount;
2066     if (error == 0) {
2067         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2068             jrecord_init(jo, &jrec, -1);
2069             save = jrecord_push(&jrec, JTYPE_WHITEOUT);
2070             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2071             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2072             jrecord_pop(&jrec, save);
2073             jrecord_done(&jrec, 0);
2074         }
2075     }
2076     return (error);
2077 }
2078
2079 /*
2080  * Journal vop_nremove { a_ncp, a_cred }
2081  */
2082 static
2083 int
2084 journal_nremove(struct vop_nremove_args *ap)
2085 {
2086     struct mount *mp;
2087     struct journal *jo;
2088     struct jrecord jrec;
2089     void *save;         /* warning, save pointers do not always remain valid */
2090     int error;
2091
2092     error = vop_journal_operate_ap(&ap->a_head);
2093     mp = ap->a_head.a_ops->vv_mount;
2094     if (error == 0) {
2095         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2096             jrecord_init(jo, &jrec, -1);
2097             save = jrecord_push(&jrec, JTYPE_REMOVE);
2098             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2099             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2100             jrecord_pop(&jrec, save);
2101             jrecord_done(&jrec, 0);
2102         }
2103     }
2104     return (error);
2105 }
2106
2107 /*
2108  * Journal vop_nmkdir { a_ncp, a_vpp, a_cred, a_vap }
2109  */
2110 static
2111 int
2112 journal_nmkdir(struct vop_nmkdir_args *ap)
2113 {
2114     struct mount *mp;
2115     struct journal *jo;
2116     struct jrecord jrec;
2117     void *save;         /* warning, save pointers do not always remain valid */
2118     int error;
2119
2120     error = vop_journal_operate_ap(&ap->a_head);
2121     mp = ap->a_head.a_ops->vv_mount;
2122     if (error == 0) {
2123         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2124             jrecord_init(jo, &jrec, -1);
2125             if (jo->flags & MC_JOURNAL_WANT_REVERSABLE) {
2126                 save = jrecord_push(&jrec, JTYPE_UNDO);
2127                 /* XXX undo operations */
2128                 jrecord_pop(&jrec, save);
2129             }
2130 #if 0
2131             if (jo->flags & MC_JOURNAL_WANT_AUDIT) {
2132                 jrecord_write_audit(&jrec);
2133             }
2134 #endif
2135             save = jrecord_push(&jrec, JTYPE_MKDIR);
2136             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2137             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2138             jrecord_write_vattr(&jrec, ap->a_vap);
2139             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2140             if (*ap->a_vpp)
2141                 jrecord_write_vnode_ref(&jrec, *ap->a_vpp);
2142             jrecord_pop(&jrec, save);
2143             jrecord_done(&jrec, 0);
2144         }
2145     }
2146     return (error);
2147 }
2148
2149 /*
2150  * Journal vop_nrmdir { a_ncp, a_cred }
2151  */
2152 static
2153 int
2154 journal_nrmdir(struct vop_nrmdir_args *ap)
2155 {
2156     struct mount *mp;
2157     struct journal *jo;
2158     struct jrecord jrec;
2159     void *save;         /* warning, save pointers do not always remain valid */
2160     int error;
2161
2162     error = vop_journal_operate_ap(&ap->a_head);
2163     mp = ap->a_head.a_ops->vv_mount;
2164     if (error == 0) {
2165         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2166             jrecord_init(jo, &jrec, -1);
2167             save = jrecord_push(&jrec, JTYPE_RMDIR);
2168             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2169             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
2170             jrecord_pop(&jrec, save);
2171             jrecord_done(&jrec, 0);
2172         }
2173     }
2174     return (error);
2175 }
2176
2177 /*
2178  * Journal vop_nrename { a_fncp, a_tncp, a_cred }
2179  */
2180 static
2181 int
2182 journal_nrename(struct vop_nrename_args *ap)
2183 {
2184     struct mount *mp;
2185     struct journal *jo;
2186     struct jrecord jrec;
2187     void *save;         /* warning, save pointers do not always remain valid */
2188     int error;
2189
2190     error = vop_journal_operate_ap(&ap->a_head);
2191     mp = ap->a_head.a_ops->vv_mount;
2192     if (error == 0) {
2193         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2194             jrecord_init(jo, &jrec, -1);
2195             save = jrecord_push(&jrec, JTYPE_RENAME);
2196             jrecord_write_cred(&jrec, NULL, ap->a_cred);
2197             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_fncp);
2198             jrecord_write_path(&jrec, JLEAF_PATH2, ap->a_tncp);
2199             jrecord_pop(&jrec, save);
2200             jrecord_done(&jrec, 0);
2201         }
2202     }
2203     return (error);
2204 }
2205