Recode the streamid selector. The streamid was faked before. Do it for
[dragonfly.git] / sys / kern / vfs_journal.c
1 /*
2  * Copyright (c) 2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/vfs_journal.c,v 1.26 2006/05/07 00:24:56 dillon Exp $
35  */
36 /*
37  * Each mount point may have zero or more independantly configured journals
38  * attached to it.  Each journal is represented by a memory FIFO and worker
39  * thread.  Journal events are streamed through the FIFO to the thread,
40  * batched up (typically on one-second intervals), and written out by the
41  * thread. 
42  *
43  * Journal vnode ops are executed instead of mnt_vn_norm_ops when one or
44  * more journals have been installed on a mount point.  It becomes the
45  * responsibility of the journal op to call the underlying normal op as
46  * appropriate.
47  *
48  * The journaling protocol is intended to evolve into a two-way stream
49  * whereby transaction IDs can be acknowledged by the journaling target
50  * when the data has been committed to hard storage.  Both implicit and
51  * explicit acknowledgement schemes will be supported, depending on the
52  * sophistication of the journaling stream, plus resynchronization and
53  * restart when a journaling stream is interrupted.  This information will
54  * also be made available to journaling-aware filesystems to allow better
55  * management of their own physical storage synchronization mechanisms as
56  * well as to allow such filesystems to take direct advantage of the kernel's
57  * journaling layer so they don't have to roll their own.
58  *
59  * In addition, the worker thread will have access to much larger 
60  * spooling areas then the memory buffer is able to provide by e.g. 
61  * reserving swap space, in order to absorb potentially long interruptions
62  * of off-site journaling streams, and to prevent 'slow' off-site linkages
63  * from radically slowing down local filesystem operations.  
64  *
65  * Because of the non-trivial algorithms the journaling system will be
66  * required to support, use of a worker thread is mandatory.  Efficiencies
67  * are maintained by utilitizing the memory FIFO to batch transactions when
68  * possible, reducing the number of gratuitous thread switches and taking
69  * advantage of cpu caches through the use of shorter batched code paths
70  * rather then trying to do everything in the context of the process
71  * originating the filesystem op.  In the future the memory FIFO can be
72  * made per-cpu to remove BGL or other locking requirements.
73  */
74 #include <sys/param.h>
75 #include <sys/systm.h>
76 #include <sys/buf.h>
77 #include <sys/conf.h>
78 #include <sys/kernel.h>
79 #include <sys/queue.h>
80 #include <sys/lock.h>
81 #include <sys/malloc.h>
82 #include <sys/mount.h>
83 #include <sys/unistd.h>
84 #include <sys/vnode.h>
85 #include <sys/poll.h>
86 #include <sys/mountctl.h>
87 #include <sys/journal.h>
88 #include <sys/file.h>
89 #include <sys/proc.h>
90 #include <sys/msfbuf.h>
91 #include <sys/socket.h>
92 #include <sys/socketvar.h>
93
94 #include <machine/limits.h>
95
96 #include <vm/vm.h>
97 #include <vm/vm_object.h>
98 #include <vm/vm_page.h>
99 #include <vm/vm_pager.h>
100 #include <vm/vnode_pager.h>
101
102 #include <sys/file2.h>
103 #include <sys/thread2.h>
104
105 static int journal_attach(struct mount *mp);
106 static void journal_detach(struct mount *mp);
107 static int journal_install_vfs_journal(struct mount *mp, struct file *fp,
108                             const struct mountctl_install_journal *info);
109 static int journal_restart_vfs_journal(struct mount *mp, struct file *fp,
110                             const struct mountctl_restart_journal *info);
111 static int journal_remove_vfs_journal(struct mount *mp,
112                             const struct mountctl_remove_journal *info);
113 static int journal_restart(struct mount *mp, struct file *fp,
114                             struct journal *jo, int flags);
115 static int journal_destroy(struct mount *mp, struct journal *jo, int flags);
116 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
117 static int journal_status_vfs_journal(struct mount *mp,
118                        const struct mountctl_status_journal *info,
119                        struct mountctl_journal_ret_status *rstat,
120                        int buflen, int *res);
121 static void journal_create_threads(struct journal *jo);
122 static void journal_destroy_threads(struct journal *jo, int flags);
123 static void journal_wthread(void *info);
124 static void journal_rthread(void *info);
125
126 static void *journal_reserve(struct journal *jo, 
127                             struct journal_rawrecbeg **rawpp, 
128                             int16_t streamid, int bytes);
129 static void *journal_extend(struct journal *jo,
130                             struct journal_rawrecbeg **rawpp,
131                             int truncbytes, int bytes, int *newstreamrecp);
132 static void journal_abort(struct journal *jo, 
133                             struct journal_rawrecbeg **rawpp);
134 static void journal_commit(struct journal *jo, 
135                             struct journal_rawrecbeg **rawpp, 
136                             int bytes, int closeout);
137
138 static void jrecord_init(struct journal *jo, 
139                             struct jrecord *jrec, int16_t streamid);
140 static struct journal_subrecord *jrecord_push(
141                             struct jrecord *jrec, int16_t rectype);
142 static void jrecord_pop(struct jrecord *jrec, struct journal_subrecord *parent);
143 static struct journal_subrecord *jrecord_write(struct jrecord *jrec,
144                             int16_t rectype, int bytes);
145 static void jrecord_data(struct jrecord *jrec, const void *buf, int bytes);
146 static void jrecord_done(struct jrecord *jrec, int abortit);
147 static void jrecord_undo_file(struct jrecord *jrec, struct vnode *vp, 
148                             int jrflags, off_t off, off_t bytes);
149
150 static int journal_setattr(struct vop_setattr_args *ap);
151 static int journal_write(struct vop_write_args *ap);
152 static int journal_fsync(struct vop_fsync_args *ap);
153 static int journal_putpages(struct vop_putpages_args *ap);
154 static int journal_setacl(struct vop_setacl_args *ap);
155 static int journal_setextattr(struct vop_setextattr_args *ap);
156 static int journal_ncreate(struct vop_ncreate_args *ap);
157 static int journal_nmknod(struct vop_nmknod_args *ap);
158 static int journal_nlink(struct vop_nlink_args *ap);
159 static int journal_nsymlink(struct vop_nsymlink_args *ap);
160 static int journal_nwhiteout(struct vop_nwhiteout_args *ap);
161 static int journal_nremove(struct vop_nremove_args *ap);
162 static int journal_nmkdir(struct vop_nmkdir_args *ap);
163 static int journal_nrmdir(struct vop_nrmdir_args *ap);
164 static int journal_nrename(struct vop_nrename_args *ap);
165
166 #define JRUNDO_SIZE     0x00000001
167 #define JRUNDO_UID      0x00000002
168 #define JRUNDO_GID      0x00000004
169 #define JRUNDO_FSID     0x00000008
170 #define JRUNDO_MODES    0x00000010
171 #define JRUNDO_INUM     0x00000020
172 #define JRUNDO_ATIME    0x00000040
173 #define JRUNDO_MTIME    0x00000080
174 #define JRUNDO_CTIME    0x00000100
175 #define JRUNDO_GEN      0x00000200
176 #define JRUNDO_FLAGS    0x00000400
177 #define JRUNDO_UDEV     0x00000800
178 #define JRUNDO_NLINK    0x00001000
179 #define JRUNDO_FILEDATA 0x00010000
180 #define JRUNDO_GETVP    0x00020000
181 #define JRUNDO_CONDLINK 0x00040000      /* write file data if link count 1 */
182 #define JRUNDO_VATTR    (JRUNDO_SIZE|JRUNDO_UID|JRUNDO_GID|JRUNDO_FSID|\
183                          JRUNDO_MODES|JRUNDO_INUM|JRUNDO_ATIME|JRUNDO_MTIME|\
184                          JRUNDO_CTIME|JRUNDO_GEN|JRUNDO_FLAGS|JRUNDO_UDEV|\
185                          JRUNDO_NLINK)
186 #define JRUNDO_ALL      (JRUNDO_VATTR|JRUNDO_FILEDATA)
187
188 static struct vnodeopv_entry_desc journal_vnodeop_entries[] = {
189     { &vop_default_desc,                vop_journal_operate_ap },
190     { &vop_mountctl_desc,               (void *)journal_mountctl },
191     { &vop_setattr_desc,                (void *)journal_setattr },
192     { &vop_write_desc,                  (void *)journal_write },
193     { &vop_fsync_desc,                  (void *)journal_fsync },
194     { &vop_putpages_desc,               (void *)journal_putpages },
195     { &vop_setacl_desc,                 (void *)journal_setacl },
196     { &vop_setextattr_desc,             (void *)journal_setextattr },
197     { &vop_ncreate_desc,                (void *)journal_ncreate },
198     { &vop_nmknod_desc,                 (void *)journal_nmknod },
199     { &vop_nlink_desc,                  (void *)journal_nlink },
200     { &vop_nsymlink_desc,               (void *)journal_nsymlink },
201     { &vop_nwhiteout_desc,              (void *)journal_nwhiteout },
202     { &vop_nremove_desc,                (void *)journal_nremove },
203     { &vop_nmkdir_desc,                 (void *)journal_nmkdir },
204     { &vop_nrmdir_desc,                 (void *)journal_nrmdir },
205     { &vop_nrename_desc,                (void *)journal_nrename },
206     { NULL, NULL }
207 };
208
209 static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
210 static MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
211
212 int
213 journal_mountctl(struct vop_mountctl_args *ap)
214 {
215     struct mount *mp;
216     int error = 0;
217
218     mp = ap->a_head.a_ops->vv_mount;
219     KKASSERT(mp);
220
221     if (mp->mnt_vn_journal_ops == NULL) {
222         switch(ap->a_op) {
223         case MOUNTCTL_INSTALL_VFS_JOURNAL:
224             error = journal_attach(mp);
225             if (error == 0 && ap->a_ctllen != sizeof(struct mountctl_install_journal))
226                 error = EINVAL;
227             if (error == 0 && ap->a_fp == NULL)
228                 error = EBADF;
229             if (error == 0)
230                 error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
231             if (TAILQ_EMPTY(&mp->mnt_jlist))
232                 journal_detach(mp);
233             break;
234         case MOUNTCTL_RESTART_VFS_JOURNAL:
235         case MOUNTCTL_REMOVE_VFS_JOURNAL:
236         case MOUNTCTL_RESYNC_VFS_JOURNAL:
237         case MOUNTCTL_STATUS_VFS_JOURNAL:
238             error = ENOENT;
239             break;
240         default:
241             error = EOPNOTSUPP;
242             break;
243         }
244     } else {
245         switch(ap->a_op) {
246         case MOUNTCTL_INSTALL_VFS_JOURNAL:
247             if (ap->a_ctllen != sizeof(struct mountctl_install_journal))
248                 error = EINVAL;
249             if (error == 0 && ap->a_fp == NULL)
250                 error = EBADF;
251             if (error == 0)
252                 error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
253             break;
254         case MOUNTCTL_RESTART_VFS_JOURNAL:
255             if (ap->a_ctllen != sizeof(struct mountctl_restart_journal))
256                 error = EINVAL;
257             if (error == 0 && ap->a_fp == NULL)
258                 error = EBADF;
259             if (error == 0)
260                 error = journal_restart_vfs_journal(mp, ap->a_fp, ap->a_ctl);
261             break;
262         case MOUNTCTL_REMOVE_VFS_JOURNAL:
263             if (ap->a_ctllen != sizeof(struct mountctl_remove_journal))
264                 error = EINVAL;
265             if (error == 0)
266                 error = journal_remove_vfs_journal(mp, ap->a_ctl);
267             if (TAILQ_EMPTY(&mp->mnt_jlist))
268                 journal_detach(mp);
269             break;
270         case MOUNTCTL_RESYNC_VFS_JOURNAL:
271             if (ap->a_ctllen != 0)
272                 error = EINVAL;
273             error = journal_resync_vfs_journal(mp, ap->a_ctl);
274             break;
275         case MOUNTCTL_STATUS_VFS_JOURNAL:
276             if (ap->a_ctllen != sizeof(struct mountctl_status_journal))
277                 error = EINVAL;
278             if (error == 0) {
279                 error = journal_status_vfs_journal(mp, ap->a_ctl, 
280                                         ap->a_buf, ap->a_buflen, ap->a_res);
281             }
282             break;
283         default:
284             error = EOPNOTSUPP;
285             break;
286         }
287     }
288     return (error);
289 }
290
291 /*
292  * High level mount point setup.  When a 
293  */
294 static int
295 journal_attach(struct mount *mp)
296 {
297     KKASSERT(mp->mnt_jbitmap == NULL);
298     vfs_add_vnodeops(mp, &mp->mnt_vn_journal_ops, 
299                      journal_vnodeop_entries, 0);
300     mp->mnt_jbitmap = malloc(JREC_STREAMID_JMAX/8, M_JOURNAL, M_WAITOK|M_ZERO);
301     mp->mnt_streamid = JREC_STREAMID_JMIN;
302     return(0);
303 }
304
305 static void
306 journal_detach(struct mount *mp)
307 {
308     KKASSERT(mp->mnt_jbitmap != NULL);
309     if (mp->mnt_vn_journal_ops)
310         vfs_rm_vnodeops(&mp->mnt_vn_journal_ops);
311     free(mp->mnt_jbitmap, M_JOURNAL);
312     mp->mnt_jbitmap = NULL;
313 }
314
315 /*
316  * Install a journal on a mount point.  Each journal has an associated worker
317  * thread which is responsible for buffering and spooling the data to the
318  * target.  A mount point may have multiple journals attached to it.  An
319  * initial start record is generated when the journal is associated.
320  */
321 static int
322 journal_install_vfs_journal(struct mount *mp, struct file *fp, 
323                             const struct mountctl_install_journal *info)
324 {
325     struct journal *jo;
326     struct jrecord jrec;
327     int error = 0;
328     int size;
329
330     jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO);
331     bcopy(info->id, jo->id, sizeof(jo->id));
332     jo->flags = info->flags & ~(MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE |
333                                 MC_JOURNAL_STOP_REQ);
334
335     /*
336      * Memory FIFO size, round to nearest power of 2
337      */
338     if (info->membufsize) {
339         if (info->membufsize < 65536)
340             size = 65536;
341         else if (info->membufsize > 128 * 1024 * 1024)
342             size = 128 * 1024 * 1024;
343         else
344             size = (int)info->membufsize;
345     } else {
346         size = 1024 * 1024;
347     }
348     jo->fifo.size = 1;
349     while (jo->fifo.size < size)
350         jo->fifo.size <<= 1;
351
352     /*
353      * Other parameters.  If not specified the starting transaction id
354      * will be the current date.
355      */
356     if (info->transid) {
357         jo->transid = info->transid;
358     } else {
359         struct timespec ts;
360         getnanotime(&ts);
361         jo->transid = ((int64_t)ts.tv_sec << 30) | ts.tv_nsec;
362     }
363
364     jo->fp = fp;
365
366     /*
367      * Allocate the memory FIFO
368      */
369     jo->fifo.mask = jo->fifo.size - 1;
370     jo->fifo.membase = malloc(jo->fifo.size, M_JFIFO, M_WAITOK|M_ZERO|M_NULLOK);
371     if (jo->fifo.membase == NULL)
372         error = ENOMEM;
373
374     /*
375      * Create the worker threads and generate the association record.
376      */
377     if (error) {
378         free(jo, M_JOURNAL);
379     } else {
380         fhold(fp);
381         journal_create_threads(jo);
382         jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
383         jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
384         jrecord_done(&jrec, 0);
385         TAILQ_INSERT_TAIL(&mp->mnt_jlist, jo, jentry);
386     }
387     return(error);
388 }
389
390 /*
391  * Restart a journal with a new descriptor.   The existing reader and writer
392  * threads are terminated and a new descriptor is associated with the
393  * journal.  The FIFO rindex is reset to xindex and the threads are then
394  * restarted.
395  */
396 static int
397 journal_restart_vfs_journal(struct mount *mp, struct file *fp,
398                            const struct mountctl_restart_journal *info)
399 {
400     struct journal *jo;
401     int error;
402
403     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
404         if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
405             break;
406     }
407     if (jo)
408         error = journal_restart(mp, fp, jo, info->flags);
409     else
410         error = EINVAL;
411     return (error);
412 }
413
414 static int
415 journal_restart(struct mount *mp, struct file *fp, 
416                 struct journal *jo, int flags)
417 {
418     /*
419      * XXX lock the jo
420      */
421
422 #if 0
423     /*
424      * Record the fact that we are doing a restart in the journal.
425      * XXX it isn't safe to do this if the journal is being restarted
426      * because it was locked up and the writer thread has already exited.
427      */
428     jrecord_init(jo, &jrec, JREC_STREAMID_RESTART);
429     jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
430     jrecord_done(&jrec, 0);
431 #endif
432
433     /*
434      * Stop the reader and writer threads and clean up the current 
435      * descriptor.
436      */
437     printf("RESTART WITH FP %p KILLING %p\n", fp, jo->fp);
438     journal_destroy_threads(jo, flags);
439
440     if (jo->fp)
441         fdrop(jo->fp);
442
443     /*
444      * Associate the new descriptor, reset the FIFO index, and recreate
445      * the threads.
446      */
447     fhold(fp);
448     jo->fp = fp;
449     jo->fifo.rindex = jo->fifo.xindex;
450     journal_create_threads(jo);
451
452     return(0);
453 }
454
455 /*
456  * Disassociate a journal from a mount point and terminate its worker thread.
457  * A final termination record is written out before the file pointer is
458  * dropped.
459  */
460 static int
461 journal_remove_vfs_journal(struct mount *mp, 
462                            const struct mountctl_remove_journal *info)
463 {
464     struct journal *jo;
465     int error;
466
467     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
468         if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
469             break;
470     }
471     if (jo)
472         error = journal_destroy(mp, jo, info->flags);
473     else
474         error = EINVAL;
475     return (error);
476 }
477
478 /*
479  * Remove all journals associated with a mount point.  Usually called
480  * by the umount code.
481  */
482 void
483 journal_remove_all_journals(struct mount *mp, int flags)
484 {
485     struct journal *jo;
486
487     while ((jo = TAILQ_FIRST(&mp->mnt_jlist)) != NULL) {
488         journal_destroy(mp, jo, flags);
489     }
490 }
491
492 static int
493 journal_destroy(struct mount *mp, struct journal *jo, int flags)
494 {
495     struct jrecord jrec;
496
497     TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
498
499     jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
500     jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
501     jrecord_done(&jrec, 0);
502
503     journal_destroy_threads(jo, flags);
504
505     if (jo->fp)
506         fdrop(jo->fp);
507     if (jo->fifo.membase)
508         free(jo->fifo.membase, M_JFIFO);
509     free(jo, M_JOURNAL);
510
511     return(0);
512 }
513
514 static int
515 journal_resync_vfs_journal(struct mount *mp, const void *ctl)
516 {
517     return(EINVAL);
518 }
519
520 static int
521 journal_status_vfs_journal(struct mount *mp, 
522                        const struct mountctl_status_journal *info,
523                        struct mountctl_journal_ret_status *rstat,
524                        int buflen, int *res)
525 {
526     struct journal *jo;
527     int error = 0;
528     int index;
529
530     index = 0;
531     *res = 0;
532     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
533         if (info->index == MC_JOURNAL_INDEX_ID) {
534             if (bcmp(jo->id, info->id, sizeof(jo->id)) != 0)
535                 continue;
536         } else if (info->index >= 0) {
537             if (info->index < index)
538                 continue;
539         } else if (info->index != MC_JOURNAL_INDEX_ALL) {
540             continue;
541         }
542         if (buflen < sizeof(*rstat)) {
543             if (*res)
544                 rstat[-1].flags |= MC_JOURNAL_STATUS_MORETOCOME;
545             else
546                 error = EINVAL;
547             break;
548         }
549         bzero(rstat, sizeof(*rstat));
550         rstat->recsize = sizeof(*rstat);
551         bcopy(jo->id, rstat->id, sizeof(jo->id));
552         rstat->index = index;
553         rstat->membufsize = jo->fifo.size;
554         rstat->membufused = jo->fifo.windex - jo->fifo.xindex;
555         rstat->membufunacked = jo->fifo.rindex - jo->fifo.xindex;
556         rstat->bytessent = jo->total_acked;
557         rstat->fifostalls = jo->fifostalls;
558         ++rstat;
559         ++index;
560         *res += sizeof(*rstat);
561         buflen -= sizeof(*rstat);
562     }
563     return(error);
564 }
565
566 static void
567 journal_create_threads(struct journal *jo)
568 {
569         jo->flags &= ~(MC_JOURNAL_STOP_REQ | MC_JOURNAL_STOP_IMM);
570         jo->flags |= MC_JOURNAL_WACTIVE;
571         lwkt_create(journal_wthread, jo, NULL, &jo->wthread,
572                         TDF_STOPREQ, -1, "journal w:%.*s", JIDMAX, jo->id);
573         lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON);
574         lwkt_schedule(&jo->wthread);
575
576         if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) {
577             jo->flags |= MC_JOURNAL_RACTIVE;
578             lwkt_create(journal_rthread, jo, NULL, &jo->rthread,
579                         TDF_STOPREQ, -1, "journal r:%.*s", JIDMAX, jo->id);
580             lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON);
581             lwkt_schedule(&jo->rthread);
582         }
583 }
584
585 static void
586 journal_destroy_threads(struct journal *jo, int flags)
587 {
588     int wcount;
589
590     jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
591     wakeup(&jo->fifo);
592     wcount = 0;
593     while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
594         tsleep(jo, 0, "jwait", hz);
595         if (++wcount % 10 == 0) {
596             printf("Warning: journal %s waiting for descriptors to close\n",
597                 jo->id);
598         }
599     }
600
601     /*
602      * XXX SMP - threads should move to cpu requesting the restart or
603      * termination before finishing up to properly interlock.
604      */
605     tsleep(jo, 0, "jwait", hz);
606     lwkt_free_thread(&jo->wthread);
607     if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX)
608         lwkt_free_thread(&jo->rthread);
609 }
610
611 /*
612  * The per-journal worker thread is responsible for writing out the
613  * journal's FIFO to the target stream.
614  */
615 static void
616 journal_wthread(void *info)
617 {
618     struct journal *jo = info;
619     struct journal_rawrecbeg *rawp;
620     int bytes;
621     int error;
622     int avail;
623     int res;
624
625     for (;;) {
626         /*
627          * Calculate the number of bytes available to write.  This buffer
628          * area may contain reserved records so we can't just write it out
629          * without further checks.
630          */
631         bytes = jo->fifo.windex - jo->fifo.rindex;
632
633         /*
634          * sleep if no bytes are available or if an incomplete record is
635          * encountered (it needs to be filled in before we can write it
636          * out), and skip any pad records that we encounter.
637          */
638         if (bytes == 0) {
639             if (jo->flags & MC_JOURNAL_STOP_REQ)
640                 break;
641             tsleep(&jo->fifo, 0, "jfifo", hz);
642             continue;
643         }
644
645         /*
646          * Sleep if we can not go any further due to hitting an incomplete
647          * record.  This case should occur rarely but may have to be better
648          * optimized XXX.
649          */
650         rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
651         if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
652             tsleep(&jo->fifo, 0, "jpad", hz);
653             continue;
654         }
655
656         /*
657          * Skip any pad records.  We do not write out pad records if we can
658          * help it. 
659          */
660         if (rawp->streamid == JREC_STREAMID_PAD) {
661             if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
662                 if (jo->fifo.rindex == jo->fifo.xindex) {
663                     jo->fifo.xindex += (rawp->recsize + 15) & ~15;
664                     jo->total_acked += (rawp->recsize + 15) & ~15;
665                 }
666             }
667             jo->fifo.rindex += (rawp->recsize + 15) & ~15;
668             jo->total_acked += bytes;
669             KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
670             continue;
671         }
672
673         /*
674          * 'bytes' is the amount of data that can potentially be written out.  
675          * Calculate 'res', the amount of data that can actually be written
676          * out.  res is bounded either by hitting the end of the physical
677          * memory buffer or by hitting an incomplete record.  Incomplete
678          * records often occur due to the way the space reservation model
679          * works.
680          */
681         res = 0;
682         avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
683         while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
684             res += (rawp->recsize + 15) & ~15;
685             if (res >= avail) {
686                 KKASSERT(res == avail);
687                 break;
688             }
689             rawp = (void *)((char *)rawp + ((rawp->recsize + 15) & ~15));
690         }
691
692         /*
693          * Issue the write and deal with any errors or other conditions.
694          * For now assume blocking I/O.  Since we are record-aware the
695          * code cannot yet handle partial writes.
696          *
697          * We bump rindex prior to issuing the write to avoid racing
698          * the acknowledgement coming back (which could prevent the ack
699          * from bumping xindex).  Restarts are always based on xindex so
700          * we do not try to undo the rindex if an error occurs.
701          *
702          * XXX EWOULDBLOCK/NBIO
703          * XXX notification on failure
704          * XXX permanent verses temporary failures
705          * XXX two-way acknowledgement stream in the return direction / xindex
706          */
707         bytes = res;
708         jo->fifo.rindex += bytes;
709         error = fp_write(jo->fp, 
710                         jo->fifo.membase + ((jo->fifo.rindex - bytes) & jo->fifo.mask),
711                         bytes, &res);
712         if (error) {
713             printf("journal_thread(%s) write, error %d\n", jo->id, error);
714             /* XXX */
715         } else {
716             KKASSERT(res == bytes);
717         }
718
719         /*
720          * Advance rindex.  If the journal stream is not full duplex we also
721          * advance xindex, otherwise the rjournal thread is responsible for
722          * advancing xindex.
723          */
724         if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
725             jo->fifo.xindex += bytes;
726             jo->total_acked += bytes;
727         }
728         KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
729         if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
730             if (jo->flags & MC_JOURNAL_WWAIT) {
731                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
732                 wakeup(&jo->fifo.windex);
733             }
734         }
735     }
736     fp_shutdown(jo->fp, SHUT_WR);
737     jo->flags &= ~MC_JOURNAL_WACTIVE;
738     wakeup(jo);
739     wakeup(&jo->fifo.windex);
740 }
741
742 /*
743  * A second per-journal worker thread is created for two-way journaling
744  * streams to deal with the return acknowledgement stream.
745  */
746 static void
747 journal_rthread(void *info)
748 {
749     struct journal_rawrecbeg *rawp;
750     struct journal_ackrecord ack;
751     struct journal *jo = info;
752     int64_t transid;
753     int error;
754     int count;
755     int bytes;
756
757     transid = 0;
758     error = 0;
759
760     for (;;) {
761         /*
762          * We have been asked to stop
763          */
764         if (jo->flags & MC_JOURNAL_STOP_REQ)
765                 break;
766
767         /*
768          * If we have no active transaction id, get one from the return
769          * stream.
770          */
771         if (transid == 0) {
772             error = fp_read(jo->fp, &ack, sizeof(ack), &count, 1);
773 #if 0
774             printf("fp_read ack error %d count %d\n", error, count);
775 #endif
776             if (error || count != sizeof(ack))
777                 break;
778             if (error) {
779                 printf("read error %d on receive stream\n", error);
780                 break;
781             }
782             if (ack.rbeg.begmagic != JREC_BEGMAGIC ||
783                 ack.rend.endmagic != JREC_ENDMAGIC
784             ) {
785                 printf("bad begmagic or endmagic on receive stream\n");
786                 break;
787             }
788             transid = ack.rbeg.transid;
789         }
790
791         /*
792          * Calculate the number of unacknowledged bytes.  If there are no
793          * unacknowledged bytes then unsent data was acknowledged, report,
794          * sleep a bit, and loop in that case.  This should not happen 
795          * normally.  The ack record is thrown away.
796          */
797         bytes = jo->fifo.rindex - jo->fifo.xindex;
798
799         if (bytes == 0) {
800             printf("warning: unsent data acknowledged transid %08llx\n", transid);
801             tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
802             transid = 0;
803             continue;
804         }
805
806         /*
807          * Since rindex has advanced, the record pointed to by xindex
808          * must be a valid record.
809          */
810         rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
811         KKASSERT(rawp->begmagic == JREC_BEGMAGIC);
812         KKASSERT(rawp->recsize <= bytes);
813
814         /*
815          * The target can acknowledge several records at once.
816          */
817         if (rawp->transid < transid) {
818 #if 1
819             printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
820 #endif
821             jo->fifo.xindex += (rawp->recsize + 15) & ~15;
822             jo->total_acked += (rawp->recsize + 15) & ~15;
823             if (jo->flags & MC_JOURNAL_WWAIT) {
824                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
825                 wakeup(&jo->fifo.windex);
826             }
827             continue;
828         }
829         if (rawp->transid == transid) {
830 #if 1
831             printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
832 #endif
833             jo->fifo.xindex += (rawp->recsize + 15) & ~15;
834             jo->total_acked += (rawp->recsize + 15) & ~15;
835             if (jo->flags & MC_JOURNAL_WWAIT) {
836                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
837                 wakeup(&jo->fifo.windex);
838             }
839             transid = 0;
840             continue;
841         }
842         printf("warning: unsent data(2) acknowledged transid %08llx\n", transid);
843         transid = 0;
844     }
845     jo->flags &= ~MC_JOURNAL_RACTIVE;
846     wakeup(jo);
847     wakeup(&jo->fifo.windex);
848 }
849
850 /*
851  * This builds a pad record which the journaling thread will skip over.  Pad
852  * records are required when we are unable to reserve sufficient stream space
853  * due to insufficient space at the end of the physical memory fifo.
854  *
855  * Even though the record is not transmitted, a normal transid must be 
856  * assigned to it so link recovery operations after a failure work properly.
857  */
858 static
859 void
860 journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid)
861 {
862     struct journal_rawrecend *rendp;
863     
864     KKASSERT((recsize & 15) == 0 && recsize >= 16);
865
866     rawp->streamid = JREC_STREAMID_PAD;
867     rawp->recsize = recsize;    /* must be 16-byte aligned */
868     rawp->transid = transid;
869     /*
870      * WARNING, rendp may overlap rawp->transid.  This is necessary to
871      * allow PAD records to fit in 16 bytes.  Use cpu_ccfence() to
872      * hopefully cause the compiler to not make any assumptions.
873      */
874     rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
875     rendp->endmagic = JREC_ENDMAGIC;
876     rendp->check = 0;
877     rendp->recsize = rawp->recsize;
878
879     /*
880      * Set the begin magic last.  This is what will allow the journal
881      * thread to write the record out.  Use a store fence to prevent
882      * compiler and cpu reordering of the writes.
883      */
884     cpu_sfence();
885     rawp->begmagic = JREC_BEGMAGIC;
886 }
887
888 /*
889  * Wake up the worker thread if the FIFO is more then half full or if
890  * someone is waiting for space to be freed up.  Otherwise let the 
891  * heartbeat deal with it.  Being able to avoid waking up the worker
892  * is the key to the journal's cpu performance.
893  */
894 static __inline
895 void
896 journal_commit_wakeup(struct journal *jo)
897 {
898     int avail;
899
900     avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
901     KKASSERT(avail >= 0);
902     if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
903         wakeup(&jo->fifo);
904 }
905
906 /*
907  * Create a new BEGIN stream record with the specified streamid and the
908  * specified amount of payload space.  *rawpp will be set to point to the
909  * base of the new stream record and a pointer to the base of the payload
910  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
911  * making this call.  The raw record header will be partially initialized.
912  *
913  * A stream can be extended, aborted, or committed by other API calls
914  * below.  This may result in a sequence of potentially disconnected
915  * stream records to be output to the journaling target.  The first record
916  * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
917  * while the last record on commit or abort will be marked JREC_STREAMCTL_END
918  * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
919  * up being the same as the first, in which case the bits are all set in
920  * the first record.
921  *
922  * The stream record is created in an incomplete state by setting the begin
923  * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
924  * flushing the fifo past our record until we have finished populating it.
925  * Other threads can reserve and operate on their own space without stalling
926  * but the stream output will stall until we have completed operations.  The
927  * memory FIFO is intended to be large enough to absorb such situations
928  * without stalling out other threads.
929  */
930 static
931 void *
932 journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
933                 int16_t streamid, int bytes)
934 {
935     struct journal_rawrecbeg *rawp;
936     int avail;
937     int availtoend;
938     int req;
939
940     /*
941      * Add header and trailer overheads to the passed payload.  Note that
942      * the passed payload size need not be aligned in any way.
943      */
944     bytes += sizeof(struct journal_rawrecbeg);
945     bytes += sizeof(struct journal_rawrecend);
946
947     for (;;) {
948         /*
949          * First, check boundary conditions.  If the request would wrap around
950          * we have to skip past the ending block and return to the beginning
951          * of the FIFO's buffer.  Calculate 'req' which is the actual number
952          * of bytes being reserved, including wrap-around dead space.
953          *
954          * Neither 'bytes' or 'req' are aligned.
955          *
956          * Note that availtoend is not truncated to avail and so cannot be
957          * used to determine whether the reservation is possible by itself.
958          * Also, since all fifo ops are 16-byte aligned, we can check
959          * the size before calculating the aligned size.
960          */
961         availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
962         KKASSERT((availtoend & 15) == 0);
963         if (bytes > availtoend) 
964             req = bytes + availtoend;   /* add pad to end */
965         else
966             req = bytes;
967
968         /*
969          * Next calculate the total available space and see if it is
970          * sufficient.  We cannot overwrite previously buffered data
971          * past xindex because otherwise we would not be able to restart
972          * a broken link at the target's last point of commit.
973          */
974         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
975         KKASSERT(avail >= 0 && (avail & 15) == 0);
976
977         if (avail < req) {
978             /* XXX MC_JOURNAL_STOP_IMM */
979             jo->flags |= MC_JOURNAL_WWAIT;
980             ++jo->fifostalls;
981             tsleep(&jo->fifo.windex, 0, "jwrite", 0);
982             continue;
983         }
984
985         /*
986          * Create a pad record for any dead space and create an incomplete
987          * record for the live space, then return a pointer to the
988          * contiguous buffer space that was requested.
989          *
990          * NOTE: The worker thread will not flush past an incomplete
991          * record, so the reserved space can be filled in at-will.  The
992          * journaling code must also be aware the reserved sections occuring
993          * after this one will also not be written out even if completed
994          * until this one is completed.
995          *
996          * The transaction id must accomodate real and potential pad creation.
997          */
998         rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
999         if (req != bytes) {
1000             journal_build_pad(rawp, availtoend, jo->transid);
1001             ++jo->transid;
1002             rawp = (void *)jo->fifo.membase;
1003         }
1004         rawp->begmagic = JREC_INCOMPLETEMAGIC;  /* updated by abort/commit */
1005         rawp->recsize = bytes;                  /* (unaligned size) */
1006         rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
1007         rawp->transid = jo->transid;
1008         jo->transid += 2;
1009
1010         /*
1011          * Issue a memory barrier to guarentee that the record data has been
1012          * properly initialized before we advance the write index and return
1013          * a pointer to the reserved record.  Otherwise the worker thread
1014          * could accidently run past us.
1015          *
1016          * Note that stream records are always 16-byte aligned.
1017          */
1018         cpu_sfence();
1019         jo->fifo.windex += (req + 15) & ~15;
1020         *rawpp = rawp;
1021         return(rawp + 1);
1022     }
1023     /* not reached */
1024     *rawpp = NULL;
1025     return(NULL);
1026 }
1027
1028 /*
1029  * Attempt to extend the stream record by <bytes> worth of payload space.
1030  *
1031  * If it is possible to extend the existing stream record no truncation
1032  * occurs and the record is extended as specified.  A pointer to the 
1033  * truncation offset within the payload space is returned.
1034  *
1035  * If it is not possible to do this the existing stream record is truncated
1036  * and committed, and a new stream record of size <bytes> is created.  A
1037  * pointer to the base of the new stream record's payload space is returned.
1038  *
1039  * *rawpp is set to the new reservation in the case of a new record but
1040  * the caller cannot depend on a comparison with the old rawp to determine if
1041  * this case occurs because we could end up using the same memory FIFO
1042  * offset for the new stream record.  Use *newstreamrecp instead.
1043  */
1044 static void *
1045 journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp, 
1046                 int truncbytes, int bytes, int *newstreamrecp)
1047 {
1048     struct journal_rawrecbeg *rawp;
1049     int16_t streamid;
1050     int availtoend;
1051     int avail;
1052     int osize;
1053     int nsize;
1054     int wbase;
1055     void *rptr;
1056
1057     *newstreamrecp = 0;
1058     rawp = *rawpp;
1059     osize = (rawp->recsize + 15) & ~15;
1060     nsize = (rawp->recsize + bytes + 15) & ~15;
1061     wbase = (char *)rawp - jo->fifo.membase;
1062
1063     /*
1064      * If the aligned record size does not change we can trivially adjust
1065      * the record size.
1066      */
1067     if (nsize == osize) {
1068         rawp->recsize += bytes;
1069         return((char *)(rawp + 1) + truncbytes);
1070     }
1071
1072     /*
1073      * If the fifo's write index hasn't been modified since we made the
1074      * reservation and we do not hit any boundary conditions, we can 
1075      * trivially make the record smaller or larger.
1076      */
1077     if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
1078         availtoend = jo->fifo.size - wbase;
1079         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
1080         KKASSERT((availtoend & 15) == 0);
1081         KKASSERT((avail & 15) == 0);
1082         if (nsize <= avail && nsize <= availtoend) {
1083             jo->fifo.windex += nsize - osize;
1084             rawp->recsize += bytes;
1085             return((char *)(rawp + 1) + truncbytes);
1086         }
1087     }
1088
1089     /*
1090      * It was not possible to extend the buffer.  Commit the current
1091      * buffer and create a new one.  We manually clear the BEGIN mark that
1092      * journal_reserve() creates (because this is a continuing record, not
1093      * the start of a new stream).
1094      */
1095     streamid = rawp->streamid & JREC_STREAMID_MASK;
1096     journal_commit(jo, rawpp, truncbytes, 0);
1097     rptr = journal_reserve(jo, rawpp, streamid, bytes);
1098     rawp = *rawpp;
1099     rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
1100     *newstreamrecp = 1;
1101     return(rptr);
1102 }
1103
1104 /*
1105  * Abort a journal record.  If the transaction record represents a stream
1106  * BEGIN and we can reverse the fifo's write index we can simply reverse
1107  * index the entire record, as if it were never reserved in the first place.
1108  *
1109  * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
1110  * with the payload truncated to 0 bytes.
1111  */
1112 static void
1113 journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
1114 {
1115     struct journal_rawrecbeg *rawp;
1116     int osize;
1117
1118     rawp = *rawpp;
1119     osize = (rawp->recsize + 15) & ~15;
1120
1121     if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
1122         (jo->fifo.windex & jo->fifo.mask) == 
1123          (char *)rawp - jo->fifo.membase + osize)
1124     {
1125         jo->fifo.windex -= osize;
1126         *rawpp = NULL;
1127     } else {
1128         rawp->streamid |= JREC_STREAMCTL_ABORTED;
1129         journal_commit(jo, rawpp, 0, 1);
1130     }
1131 }
1132
1133 /*
1134  * Commit a journal record and potentially truncate it to the specified
1135  * number of payload bytes.  If you do not want to truncate the record,
1136  * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
1137  * field includes header and trailer and will not be correct.  Note that
1138  * passing 0 will truncate the entire data payload of the record.
1139  *
1140  * The logical stream is terminated by this function.
1141  *
1142  * If truncation occurs, and it is not possible to physically optimize the
1143  * memory FIFO due to other threads having reserved space after ours,
1144  * the remaining reserved space will be covered by a pad record.
1145  */
1146 static void
1147 journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
1148                 int bytes, int closeout)
1149 {
1150     struct journal_rawrecbeg *rawp;
1151     struct journal_rawrecend *rendp;
1152     int osize;
1153     int nsize;
1154
1155     rawp = *rawpp;
1156     *rawpp = NULL;
1157
1158     KKASSERT((char *)rawp >= jo->fifo.membase &&
1159              (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
1160     KKASSERT(((intptr_t)rawp & 15) == 0);
1161
1162     /*
1163      * Truncate the record if necessary.  If the FIFO write index as still
1164      * at the end of our record we can optimally backindex it.  Otherwise
1165      * we have to insert a pad record to cover the dead space.
1166      *
1167      * We calculate osize which is the 16-byte-aligned original recsize.
1168      * We calculate nsize which is the 16-byte-aligned new recsize.
1169      *
1170      * Due to alignment issues or in case the passed truncation bytes is
1171      * the same as the original payload, nsize may be equal to osize even
1172      * if the committed bytes is less then the originally reserved bytes.
1173      */
1174     if (bytes >= 0) {
1175         KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
1176         osize = (rawp->recsize + 15) & ~15;
1177         rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
1178                         sizeof(struct journal_rawrecend);
1179         nsize = (rawp->recsize + 15) & ~15;
1180         KKASSERT(nsize <= osize);
1181         if (osize == nsize) {
1182             /* do nothing */
1183         } else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
1184             /* we are able to backindex the fifo */
1185             jo->fifo.windex -= osize - nsize;
1186         } else {
1187             /* we cannot backindex the fifo, emplace a pad in the dead space */
1188             journal_build_pad((void *)((char *)rawp + nsize), osize - nsize,
1189                                 rawp->transid + 1);
1190         }
1191     }
1192
1193     /*
1194      * Fill in the trailer.  Note that unlike pad records, the trailer will
1195      * never overlap the header.
1196      */
1197     rendp = (void *)((char *)rawp + 
1198             ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
1199     rendp->endmagic = JREC_ENDMAGIC;
1200     rendp->recsize = rawp->recsize;
1201     rendp->check = 0;           /* XXX check word, disabled for now */
1202
1203     /*
1204      * Fill in begmagic last.  This will allow the worker thread to proceed.
1205      * Use a memory barrier to guarentee write ordering.  Mark the stream
1206      * as terminated if closeout is set.  This is the typical case.
1207      */
1208     if (closeout)
1209         rawp->streamid |= JREC_STREAMCTL_END;
1210     cpu_sfence();               /* memory and compiler barrier */
1211     rawp->begmagic = JREC_BEGMAGIC;
1212
1213     journal_commit_wakeup(jo);
1214 }
1215
1216 /************************************************************************
1217  *                      PARALLEL TRANSACTION SUPPORT ROUTINES           *
1218  ************************************************************************
1219  *
1220  * JRECLIST_*() - routines which create and iterate over jrecord structures,
1221  *                because a mount point may have multiple attached journals.
1222  */
1223
1224 /*
1225  * Initialize the passed jrecord_list and create a jrecord for each 
1226  * journal we need to write to.  Unnecessary mallocs are avoided by
1227  * using the passed jrecord structure as the first jrecord in the list.
1228  * A starting transaction is pushed for each jrecord.
1229  *
1230  * Returns non-zero if any of the journals require undo records.
1231  */
1232 static
1233 int
1234 jreclist_init(struct mount *mp, struct jrecord_list *jreclist, 
1235               struct jrecord *jreccache, int16_t rectype)
1236 {
1237     struct journal *jo;
1238     struct jrecord *jrec;
1239     int wantrev;
1240     int count;
1241     int16_t streamid;
1242
1243     TAILQ_INIT(&jreclist->list);
1244
1245     /*
1246      * Select the stream ID to use for the transaction.  We must select
1247      * a stream ID that is not currently in use by some other parallel
1248      * transaction.  
1249      *
1250      * Don't bother calculating the next streamid when reassigning
1251      * mnt_streamid, since parallel transactions are fairly rare.  This
1252      * also allows someone observing the raw records to clearly see
1253      * when parallel transactions occur.
1254      */
1255     streamid = mp->mnt_streamid;
1256     count = 0;
1257     while (mp->mnt_jbitmap[streamid >> 3] & (1 << (streamid & 7))) {
1258         if (++streamid == JREC_STREAMID_JMAX)
1259                 streamid = JREC_STREAMID_JMIN;
1260         if (++count == JREC_STREAMID_JMAX - JREC_STREAMID_JMIN) {
1261                 printf("jreclist_init: all streamid's in use! sleeping\n");
1262                 tsleep(jreclist, 0, "jsidfl", hz * 10);
1263                 count = 0;
1264         }
1265     }
1266     mp->mnt_jbitmap[streamid >> 3] |= 1 << (streamid & 7);
1267     mp->mnt_streamid = streamid;
1268     jreclist->streamid = streamid;
1269
1270     /*
1271      * Now initialize a stream on each journal.
1272      */
1273     count = 0;
1274     wantrev = 0;
1275     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1276         if (count == 0)
1277             jrec = jreccache;
1278         else
1279             jrec = malloc(sizeof(*jrec), M_JOURNAL, M_WAITOK);
1280         jrecord_init(jo, jrec, streamid);
1281         jrec->user_save = jrecord_push(jrec, rectype);
1282         TAILQ_INSERT_TAIL(&jreclist->list, jrec, user_entry);
1283         if (jo->flags & MC_JOURNAL_WANT_REVERSABLE)
1284             wantrev = 1;
1285         ++count;
1286     }
1287     return(wantrev);
1288 }
1289
1290 /*
1291  * Terminate the journaled transactions started by jreclist_init().  If
1292  * an error occured, the transaction records will be aborted.
1293  */
1294 static
1295 void
1296 jreclist_done(struct mount *mp, struct jrecord_list *jreclist, int error)
1297 {
1298     struct jrecord *jrec;
1299     int count;
1300
1301     /*
1302      * Cleanup the jrecord state on each journal.
1303      */
1304     TAILQ_FOREACH(jrec, &jreclist->list, user_entry) {
1305         jrecord_pop(jrec, jrec->user_save);
1306         jrecord_done(jrec, error);
1307     }
1308
1309     /*
1310      * Free allocated jrec's (the first is always supplied)
1311      */
1312     count = 0;
1313     while ((jrec = TAILQ_FIRST(&jreclist->list)) != NULL) {
1314         TAILQ_REMOVE(&jreclist->list, jrec, user_entry);
1315         if (count)
1316             free(jrec, M_JOURNAL);
1317         ++count;
1318     }
1319
1320     /*
1321      * Clear the streamid so it can be reused.
1322      */
1323     mp->mnt_jbitmap[jreclist->streamid >> 3] &= ~(1 << (jreclist->streamid & 7));
1324 }
1325
1326 /*
1327  * This procedure writes out UNDO records for available reversable
1328  * journals.
1329  *
1330  * XXX could use improvement.  There is no need to re-read the file
1331  * for each journal.
1332  */
1333 static
1334 void
1335 jreclist_undo_file(struct jrecord_list *jreclist, struct vnode *vp, 
1336                    int jrflags, off_t off, off_t bytes)
1337 {
1338     struct jrecord *jrec;
1339     int error;
1340
1341     error = 0;
1342     if (jrflags & JRUNDO_GETVP)
1343         error = vget(vp, LK_SHARED);
1344     if (error == 0) {
1345         TAILQ_FOREACH(jrec, &jreclist->list, user_entry) {
1346             if (jrec->jo->flags & MC_JOURNAL_WANT_REVERSABLE) {
1347                 jrecord_undo_file(jrec, vp, jrflags, off, bytes);
1348             }
1349         }
1350     }
1351     if (error == 0 && jrflags & JRUNDO_GETVP)
1352         vput(vp);
1353 }
1354
1355 /************************************************************************
1356  *                      TRANSACTION SUPPORT ROUTINES                    *
1357  ************************************************************************
1358  *
1359  * JRECORD_*() - routines to create subrecord transactions and embed them
1360  *               in the logical streams managed by the journal_*() routines.
1361  */
1362
1363 /*
1364  * Initialize the passed jrecord structure and start a new stream transaction
1365  * by reserving an initial build space in the journal's memory FIFO.
1366  */
1367 static void
1368 jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
1369 {
1370     bzero(jrec, sizeof(*jrec));
1371     jrec->jo = jo;
1372     jrec->streamid = streamid;
1373     jrec->stream_residual = JREC_DEFAULTSIZE;
1374     jrec->stream_reserved = jrec->stream_residual;
1375     jrec->stream_ptr = 
1376         journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
1377 }
1378
1379 /*
1380  * Push a recursive record type.  All pushes should have matching pops.
1381  * The old parent is returned and the newly pushed record becomes the
1382  * new parent.  Note that the old parent's pointer may already be invalid
1383  * or may become invalid if jrecord_write() had to build a new stream
1384  * record, so the caller should not mess with the returned pointer in
1385  * any way other then to save it.
1386  */
1387 static 
1388 struct journal_subrecord *
1389 jrecord_push(struct jrecord *jrec, int16_t rectype)
1390 {
1391     struct journal_subrecord *save;
1392
1393     save = jrec->parent;
1394     jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
1395     jrec->last = NULL;
1396     KKASSERT(jrec->parent != NULL);
1397     ++jrec->pushcount;
1398     ++jrec->pushptrgood;        /* cleared on flush */
1399     return(save);
1400 }
1401
1402 /*
1403  * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
1404  * on the last record written within the subtransaction.  If the last 
1405  * record written is not accessible or if the subtransaction is empty,
1406  * we must write out a pad record with JMASK_LAST set before popping.
1407  *
1408  * When popping a subtransaction the parent record's recsize field
1409  * will be properly set.  If the parent pointer is no longer valid
1410  * (which can occur if the data has already been flushed out to the
1411  * stream), the protocol spec allows us to leave it 0.
1412  *
1413  * The saved parent pointer which we restore may or may not be valid,
1414  * and if not valid may or may not be NULL, depending on the value
1415  * of pushptrgood.
1416  */
1417 static void
1418 jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
1419 {
1420     struct journal_subrecord *last;
1421
1422     KKASSERT(jrec->pushcount > 0);
1423     KKASSERT(jrec->residual == 0);
1424
1425     /*
1426      * Set JMASK_LAST on the last record we wrote at the current
1427      * level.  If last is NULL we either no longer have access to the
1428      * record or the subtransaction was empty and we must write out a pad
1429      * record.
1430      */
1431     if ((last = jrec->last) == NULL) {
1432         jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
1433         last = jrec->last;      /* reload after possible flush */
1434     } else {
1435         last->rectype |= JMASK_LAST;
1436     }
1437
1438     /*
1439      * pushptrgood tells us how many levels of parent record pointers
1440      * are valid.  The jrec only stores the current parent record pointer
1441      * (and it is only valid if pushptrgood != 0).  The higher level parent
1442      * record pointers are saved by the routines calling jrecord_push() and
1443      * jrecord_pop().  These pointers may become stale and we determine
1444      * that fact by tracking the count of valid parent pointers with 
1445      * pushptrgood.  Pointers become invalid when their related stream
1446      * record gets pushed out.
1447      *
1448      * If no pointer is available (the data has already been pushed out),
1449      * then no fixup of e.g. the length field is possible for non-leaf
1450      * nodes.  The protocol allows for this situation by placing a larger
1451      * burden on the program scanning the stream on the other end.
1452      *
1453      * [parentA]
1454      *    [node X]
1455      *    [parentB]
1456      *       [node Y]
1457      *       [node Z]
1458      *    (pop B)       see NOTE B
1459      * (pop A)          see NOTE A
1460      *
1461      * NOTE B:  This pop sets LAST in node Z if the node is still accessible,
1462      *          else a PAD record is appended and LAST is set in that.
1463      *
1464      *          This pop sets the record size in parentB if parentB is still
1465      *          accessible, else the record size is left 0 (the scanner must
1466      *          deal with that).
1467      *
1468      *          This pop sets the new 'last' record to parentB, the pointer
1469      *          to which may or may not still be accessible.
1470      *
1471      * NOTE A:  This pop sets LAST in parentB if the node is still accessible,
1472      *          else a PAD record is appended and LAST is set in that.
1473      *
1474      *          This pop sets the record size in parentA if parentA is still
1475      *          accessible, else the record size is left 0 (the scanner must
1476      *          deal with that).
1477      *
1478      *          This pop sets the new 'last' record to parentA, the pointer
1479      *          to which may or may not still be accessible.
1480      *
1481      * Also note that the last record in the stream transaction, which in
1482      * the above example is parentA, does not currently have the LAST bit
1483      * set.
1484      *
1485      * The current parent becomes the last record relative to the
1486      * saved parent passed into us.  It's validity is based on 
1487      * whether pushptrgood is non-zero prior to decrementing.  The saved
1488      * parent becomes the new parent, and its validity is based on whether
1489      * pushptrgood is non-zero after decrementing.
1490      *
1491      * The old jrec->parent may be NULL if it is no longer accessible.
1492      * If pushptrgood is non-zero, however, it is guarenteed to not
1493      * be NULL (since no flush occured).
1494      */
1495     jrec->last = jrec->parent;
1496     --jrec->pushcount;
1497     if (jrec->pushptrgood) {
1498         KKASSERT(jrec->last != NULL && last != NULL);
1499         if (--jrec->pushptrgood == 0) {
1500             jrec->parent = NULL;        /* 'save' contains garbage or NULL */
1501         } else {
1502             KKASSERT(save != NULL);
1503             jrec->parent = save;        /* 'save' must not be NULL */
1504         }
1505
1506         /*
1507          * Set the record size in the old parent.  'last' still points to
1508          * the original last record in the subtransaction being popped,
1509          * jrec->last points to the old parent (which became the last
1510          * record relative to the new parent being popped into).
1511          */
1512         jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
1513     } else {
1514         jrec->parent = NULL;
1515         KKASSERT(jrec->last == NULL);
1516     }
1517 }
1518
1519 /*
1520  * Write out a leaf record, including associated data.
1521  */
1522 static
1523 void
1524 jrecord_leaf(struct jrecord *jrec, int16_t rectype, void *ptr, int bytes)
1525 {
1526     jrecord_write(jrec, rectype, bytes);
1527     jrecord_data(jrec, ptr, bytes);
1528 }
1529
1530 /*
1531  * Write a leaf record out and return a pointer to its base.  The leaf
1532  * record may contain potentially megabytes of data which is supplied
1533  * in jrecord_data() calls.  The exact amount must be specified in this
1534  * call.
1535  *
1536  * THE RETURNED SUBRECORD POINTER IS ONLY VALID IMMEDIATELY AFTER THE
1537  * CALL AND MAY BECOME INVALID AT ANY TIME.  ONLY THE PUSH/POP CODE SHOULD
1538  * USE THE RETURN VALUE.
1539  */
1540 static
1541 struct journal_subrecord *
1542 jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
1543 {
1544     struct journal_subrecord *last;
1545     int pusheditout;
1546
1547     /*
1548      * Try to catch some obvious errors.  Nesting records must specify a
1549      * size of 0, and there should be no left-overs from previous operations
1550      * (such as incomplete data writeouts).
1551      */
1552     KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
1553     KKASSERT(jrec->residual == 0);
1554
1555     /*
1556      * Check to see if the current stream record has enough room for
1557      * the new subrecord header.  If it doesn't we extend the current
1558      * stream record.
1559      *
1560      * This may have the side effect of pushing out the current stream record
1561      * and creating a new one.  We must adjust our stream tracking fields
1562      * accordingly.
1563      */
1564     if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
1565         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1566                                 jrec->stream_reserved - jrec->stream_residual,
1567                                 JREC_DEFAULTSIZE, &pusheditout);
1568         if (pusheditout) {
1569             /*
1570              * If a pushout occured, the pushed out stream record was
1571              * truncated as specified and the new record is exactly the
1572              * extension size specified.
1573              */
1574             jrec->stream_reserved = JREC_DEFAULTSIZE;
1575             jrec->stream_residual = JREC_DEFAULTSIZE;
1576             jrec->parent = NULL;        /* no longer accessible */
1577             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1578         } else {
1579             /*
1580              * If no pushout occured the stream record is NOT truncated and
1581              * IS extended.
1582              */
1583             jrec->stream_reserved += JREC_DEFAULTSIZE;
1584             jrec->stream_residual += JREC_DEFAULTSIZE;
1585         }
1586     }
1587     last = (void *)jrec->stream_ptr;
1588     last->rectype = rectype;
1589     last->reserved = 0;
1590
1591     /*
1592      * We may not know the record size for recursive records and the 
1593      * header may become unavailable due to limited FIFO space.  Write
1594      * -1 to indicate this special case.
1595      */
1596     if ((rectype & JMASK_NESTED) && bytes == 0)
1597         last->recsize = -1;
1598     else
1599         last->recsize = sizeof(struct journal_subrecord) + bytes;
1600     jrec->last = last;
1601     jrec->residual = bytes;             /* remaining data to be posted */
1602     jrec->residual_align = -bytes & 7;  /* post-data alignment required */
1603     jrec->stream_ptr += sizeof(*last);  /* current write pointer */
1604     jrec->stream_residual -= sizeof(*last); /* space remaining in stream */
1605     return(last);
1606 }
1607
1608 /*
1609  * Write out the data associated with a leaf record.  Any number of calls
1610  * to this routine may be made as long as the byte count adds up to the
1611  * amount originally specified in jrecord_write().
1612  *
1613  * The act of writing out the leaf data may result in numerous stream records
1614  * being pushed out.   Callers should be aware that even the associated
1615  * subrecord header may become inaccessible due to stream record pushouts.
1616  */
1617 static void
1618 jrecord_data(struct jrecord *jrec, const void *buf, int bytes)
1619 {
1620     int pusheditout;
1621     int extsize;
1622
1623     KKASSERT(bytes >= 0 && bytes <= jrec->residual);
1624
1625     /*
1626      * Push out stream records as long as there is insufficient room to hold
1627      * the remaining data.
1628      */
1629     while (jrec->stream_residual < bytes) {
1630         /*
1631          * Fill in any remaining space in the current stream record.
1632          */
1633         bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
1634         buf = (const char *)buf + jrec->stream_residual;
1635         bytes -= jrec->stream_residual;
1636         /*jrec->stream_ptr += jrec->stream_residual;*/
1637         jrec->residual -= jrec->stream_residual;
1638         jrec->stream_residual = 0;
1639
1640         /*
1641          * Try to extend the current stream record, but no more then 1/4
1642          * the size of the FIFO.
1643          */
1644         extsize = jrec->jo->fifo.size >> 2;
1645         if (extsize > bytes)
1646             extsize = (bytes + 15) & ~15;
1647
1648         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1649                                 jrec->stream_reserved - jrec->stream_residual,
1650                                 extsize, &pusheditout);
1651         if (pusheditout) {
1652             jrec->stream_reserved = extsize;
1653             jrec->stream_residual = extsize;
1654             jrec->parent = NULL;        /* no longer accessible */
1655             jrec->last = NULL;          /* no longer accessible */
1656             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1657         } else {
1658             jrec->stream_reserved += extsize;
1659             jrec->stream_residual += extsize;
1660         }
1661     }
1662
1663     /*
1664      * Push out any remaining bytes into the current stream record.
1665      */
1666     if (bytes) {
1667         bcopy(buf, jrec->stream_ptr, bytes);
1668         jrec->stream_ptr += bytes;
1669         jrec->stream_residual -= bytes;
1670         jrec->residual -= bytes;
1671     }
1672
1673     /*
1674      * Handle data alignment requirements for the subrecord.  Because the
1675      * stream record's data space is more strictly aligned, it must already
1676      * have sufficient space to hold any subrecord alignment slop.
1677      */
1678     if (jrec->residual == 0 && jrec->residual_align) {
1679         KKASSERT(jrec->residual_align <= jrec->stream_residual);
1680         bzero(jrec->stream_ptr, jrec->residual_align);
1681         jrec->stream_ptr += jrec->residual_align;
1682         jrec->stream_residual -= jrec->residual_align;
1683         jrec->residual_align = 0;
1684     }
1685 }
1686
1687 /*
1688  * We are finished with the transaction.  This closes the transaction created
1689  * by jrecord_init().
1690  *
1691  * NOTE: If abortit is not set then we must be at the top level with no
1692  *       residual subrecord data left to output.
1693  *
1694  *       If abortit is set then we can be in any state, all pushes will be 
1695  *       popped and it is ok for there to be residual data.  This works 
1696  *       because the virtual stream itself is truncated.  Scanners must deal
1697  *       with this situation.
1698  *
1699  * The stream record will be committed or aborted as specified and jrecord
1700  * resources will be cleaned up.
1701  */
1702 static void
1703 jrecord_done(struct jrecord *jrec, int abortit)
1704 {
1705     KKASSERT(jrec->rawp != NULL);
1706
1707     if (abortit) {
1708         journal_abort(jrec->jo, &jrec->rawp);
1709     } else {
1710         KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
1711         journal_commit(jrec->jo, &jrec->rawp, 
1712                         jrec->stream_reserved - jrec->stream_residual, 1);
1713     }
1714
1715     /*
1716      * jrec should not be used beyond this point without another init,
1717      * but clean up some fields to ensure that we panic if it is.
1718      *
1719      * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
1720      */
1721     jrec->jo = NULL;
1722     jrec->stream_ptr = NULL;
1723 }
1724
1725 /************************************************************************
1726  *                      LOW LEVEL RECORD SUPPORT ROUTINES               *
1727  ************************************************************************
1728  *
1729  * These routine create low level recursive and leaf subrecords representing
1730  * common filesystem structures.
1731  */
1732
1733 /*
1734  * Write out a filename path relative to the base of the mount point.
1735  * rectype is typically JLEAF_PATH{1,2,3,4}.
1736  */
1737 static void
1738 jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
1739 {
1740     char buf[64];       /* local buffer if it fits, else malloced */
1741     char *base;
1742     int pathlen;
1743     int index;
1744     struct namecache *scan;
1745
1746     /*
1747      * Pass 1 - figure out the number of bytes required.  Include terminating
1748      *         \0 on last element and '/' separator on other elements.
1749      */
1750 again:
1751     pathlen = 0;
1752     for (scan = ncp; 
1753          scan && (scan->nc_flag & NCF_MOUNTPT) == 0; 
1754          scan = scan->nc_parent
1755     ) {
1756         pathlen += scan->nc_nlen + 1;
1757     }
1758
1759     if (pathlen <= sizeof(buf))
1760         base = buf;
1761     else
1762         base = malloc(pathlen, M_TEMP, M_INTWAIT);
1763
1764     /*
1765      * Pass 2 - generate the path buffer
1766      */
1767     index = pathlen;
1768     for (scan = ncp; 
1769          scan && (scan->nc_flag & NCF_MOUNTPT) == 0; 
1770          scan = scan->nc_parent
1771     ) {
1772         if (scan->nc_nlen >= index) {
1773             if (base != buf)
1774                 free(base, M_TEMP);
1775             goto again;
1776         }
1777         if (index == pathlen)
1778             base[--index] = 0;
1779         else
1780             base[--index] = '/';
1781         index -= scan->nc_nlen;
1782         bcopy(scan->nc_name, base + index, scan->nc_nlen);
1783     }
1784     jrecord_leaf(jrec, rectype, base + index, pathlen - index);
1785     if (base != buf)
1786         free(base, M_TEMP);
1787 }
1788
1789 /*
1790  * Write out a file attribute structure.  While somewhat inefficient, using
1791  * a recursive data structure is the most portable and extensible way.
1792  */
1793 static void
1794 jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
1795 {
1796     void *save;
1797
1798     save = jrecord_push(jrec, JTYPE_VATTR);
1799     if (vat->va_type != VNON)
1800         jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type));
1801     if (vat->va_mode != (mode_t)VNOVAL)
1802         jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode));
1803     if (vat->va_nlink != VNOVAL)
1804         jrecord_leaf(jrec, JLEAF_NLINK, &vat->va_nlink, sizeof(vat->va_nlink));
1805     if (vat->va_uid != VNOVAL)
1806         jrecord_leaf(jrec, JLEAF_UID, &vat->va_uid, sizeof(vat->va_uid));
1807     if (vat->va_gid != VNOVAL)
1808         jrecord_leaf(jrec, JLEAF_GID, &vat->va_gid, sizeof(vat->va_gid));
1809     if (vat->va_fsid != VNOVAL)
1810         jrecord_leaf(jrec, JLEAF_FSID, &vat->va_fsid, sizeof(vat->va_fsid));
1811     if (vat->va_fileid != VNOVAL)
1812         jrecord_leaf(jrec, JLEAF_INUM, &vat->va_fileid, sizeof(vat->va_fileid));
1813     if (vat->va_size != VNOVAL)
1814         jrecord_leaf(jrec, JLEAF_SIZE, &vat->va_size, sizeof(vat->va_size));
1815     if (vat->va_atime.tv_sec != VNOVAL)
1816         jrecord_leaf(jrec, JLEAF_ATIME, &vat->va_atime, sizeof(vat->va_atime));
1817     if (vat->va_mtime.tv_sec != VNOVAL)
1818         jrecord_leaf(jrec, JLEAF_MTIME, &vat->va_mtime, sizeof(vat->va_mtime));
1819     if (vat->va_ctime.tv_sec != VNOVAL)
1820         jrecord_leaf(jrec, JLEAF_CTIME, &vat->va_ctime, sizeof(vat->va_ctime));
1821     if (vat->va_gen != VNOVAL)
1822         jrecord_leaf(jrec, JLEAF_GEN, &vat->va_gen, sizeof(vat->va_gen));
1823     if (vat->va_flags != VNOVAL)
1824         jrecord_leaf(jrec, JLEAF_FLAGS, &vat->va_flags, sizeof(vat->va_flags));
1825     if (vat->va_rdev != VNOVAL)
1826         jrecord_leaf(jrec, JLEAF_UDEV, &vat->va_rdev, sizeof(vat->va_rdev));
1827 #if 0
1828     if (vat->va_filerev != VNOVAL)
1829         jrecord_leaf(jrec, JLEAF_FILEREV, &vat->va_filerev, sizeof(vat->va_filerev));
1830 #endif
1831     jrecord_pop(jrec, save);
1832 }
1833
1834 /*
1835  * Write out the creds used to issue a file operation.  If a process is
1836  * available write out additional tracking information related to the 
1837  * process.
1838  *
1839  * XXX additional tracking info
1840  * XXX tty line info
1841  */
1842 static void
1843 jrecord_write_cred(struct jrecord *jrec, struct thread *td, struct ucred *cred)
1844 {
1845     void *save;
1846     struct proc *p;
1847
1848     save = jrecord_push(jrec, JTYPE_CRED);
1849     jrecord_leaf(jrec, JLEAF_UID, &cred->cr_uid, sizeof(cred->cr_uid));
1850     jrecord_leaf(jrec, JLEAF_GID, &cred->cr_gid, sizeof(cred->cr_gid));
1851     if (td && (p = td->td_proc) != NULL) {
1852         jrecord_leaf(jrec, JLEAF_PID, &p->p_pid, sizeof(p->p_pid));
1853         jrecord_leaf(jrec, JLEAF_COMM, p->p_comm, sizeof(p->p_comm));
1854     }
1855     jrecord_pop(jrec, save);
1856 }
1857
1858 /*
1859  * Write out information required to identify a vnode
1860  *
1861  * XXX this needs work.  We should write out the inode number as well,
1862  * and in fact avoid writing out the file path for seqential writes
1863  * occuring within e.g. a certain period of time.
1864  */
1865 static void
1866 jrecord_write_vnode_ref(struct jrecord *jrec, struct vnode *vp)
1867 {
1868     struct namecache *ncp;
1869
1870     TAILQ_FOREACH(ncp, &vp->v_namecache, nc_vnode) {
1871         if ((ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1872             break;
1873     }
1874     if (ncp)
1875         jrecord_write_path(jrec, JLEAF_PATH_REF, ncp);
1876 }
1877
1878 static void
1879 jrecord_write_vnode_link(struct jrecord *jrec, struct vnode *vp, 
1880                          struct namecache *notncp)
1881 {
1882     struct namecache *ncp;
1883
1884     TAILQ_FOREACH(ncp, &vp->v_namecache, nc_vnode) {
1885         if (ncp == notncp)
1886             continue;
1887         if ((ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1888             break;
1889     }
1890     if (ncp)
1891         jrecord_write_path(jrec, JLEAF_PATH_REF, ncp);
1892 }
1893
1894 #if 0
1895 /*
1896  * Write out the current contents of the file within the specified
1897  * range.  This is typically called from within an UNDO section.  A
1898  * locked vnode must be passed.
1899  */
1900 static int
1901 jrecord_write_filearea(struct jrecord *jrec, struct vnode *vp, 
1902                         off_t begoff, off_t endoff)
1903 {
1904 }
1905 #endif
1906
1907 /*
1908  * Write out the data represented by a pagelist
1909  */
1910 static void
1911 jrecord_write_pagelist(struct jrecord *jrec, int16_t rectype,
1912                         struct vm_page **pglist, int *rtvals, int pgcount,
1913                         off_t offset)
1914 {
1915     struct msf_buf *msf;
1916     int error;
1917     int b;
1918     int i;
1919
1920     i = 0;
1921     while (i < pgcount) {
1922         /*
1923          * Find the next valid section.  Skip any invalid elements
1924          */
1925         if (rtvals[i] != VM_PAGER_OK) {
1926             ++i;
1927             offset += PAGE_SIZE;
1928             continue;
1929         }
1930
1931         /*
1932          * Figure out how big the valid section is, capping I/O at what the
1933          * MSFBUF can represent.
1934          */
1935         b = i;
1936         while (i < pgcount && i - b != XIO_INTERNAL_PAGES && 
1937                rtvals[i] == VM_PAGER_OK
1938         ) {
1939             ++i;
1940         }
1941
1942         /*
1943          * And write it out.
1944          */
1945         if (i - b) {
1946             error = msf_map_pagelist(&msf, pglist + b, i - b, 0);
1947             if (error == 0) {
1948                 printf("RECORD PUTPAGES %d\n", msf_buf_bytes(msf));
1949                 jrecord_leaf(jrec, JLEAF_SEEKPOS, &offset, sizeof(offset));
1950                 jrecord_leaf(jrec, rectype, 
1951                              msf_buf_kva(msf), msf_buf_bytes(msf));
1952                 msf_buf_free(msf);
1953             } else {
1954                 printf("jrecord_write_pagelist: mapping failure\n");
1955             }
1956             offset += (off_t)(i - b) << PAGE_SHIFT;
1957         }
1958     }
1959 }
1960
1961 /*
1962  * Write out the data represented by a UIO.
1963  */
1964 struct jwuio_info {
1965     struct jrecord *jrec;
1966     int16_t rectype;
1967 };
1968
1969 static int jrecord_write_uio_callback(void *info, char *buf, int bytes);
1970
1971 static void
1972 jrecord_write_uio(struct jrecord *jrec, int16_t rectype, struct uio *uio)
1973 {
1974     struct jwuio_info info = { jrec, rectype };
1975     int error;
1976
1977     if (uio->uio_segflg != UIO_NOCOPY) {
1978         jrecord_leaf(jrec, JLEAF_SEEKPOS, &uio->uio_offset, 
1979                      sizeof(uio->uio_offset));
1980         error = msf_uio_iterate(uio, jrecord_write_uio_callback, &info);
1981         if (error)
1982             printf("XXX warning uio iterate failed %d\n", error);
1983     }
1984 }
1985
1986 static int
1987 jrecord_write_uio_callback(void *info_arg, char *buf, int bytes)
1988 {
1989     struct jwuio_info *info = info_arg;
1990
1991     jrecord_leaf(info->jrec, info->rectype, buf, bytes);
1992     return(0);
1993 }
1994
1995 static void
1996 jrecord_file_data(struct jrecord *jrec, struct vnode *vp, 
1997                   off_t off, off_t bytes)
1998 {
1999     const int bufsize = 8192;
2000     char *buf;
2001     int error;
2002     int n;
2003
2004     buf = malloc(bufsize, M_JOURNAL, M_WAITOK);
2005     jrecord_leaf(jrec, JLEAF_SEEKPOS, &off, sizeof(off));
2006     while (bytes) {
2007         n = (bytes > bufsize) ? bufsize : (int)bytes;
2008         error = vn_rdwr(UIO_READ, vp, buf, n, off, UIO_SYSSPACE, IO_NODELOCKED,
2009                         proc0.p_ucred, NULL);
2010         if (error) {
2011             jrecord_leaf(jrec, JLEAF_ERROR, &error, sizeof(error));
2012             break;
2013         }
2014         jrecord_leaf(jrec, JLEAF_FILEDATA, buf, n);
2015         bytes -= n;
2016         off += n;
2017     }
2018     free(buf, M_JOURNAL);
2019 }
2020
2021 /************************************************************************
2022  *                      LOW LEVEL UNDO SUPPORT ROUTINE                  *
2023  ************************************************************************
2024  *
2025  * This function is used to support UNDO records.  It will generate an
2026  * appropriate record with the requested portion of the file data.  Note
2027  * that file data is only recorded if JRUNDO_FILEDATA is passed.  If bytes
2028  * is -1, it will be set to the size of the file.
2029  */
2030 static void
2031 jrecord_undo_file(struct jrecord *jrec, struct vnode *vp, int jrflags, 
2032                   off_t off, off_t bytes)
2033 {
2034     struct vattr attr;
2035     void *save1; /* warning, save pointers do not always remain valid */
2036     void *save2;
2037     int error;
2038
2039     /*
2040      * Setup.  Start the UNDO record, obtain a shared lock on the vnode,
2041      * and retrieve attribute info.
2042      */
2043     save1 = jrecord_push(jrec, JTYPE_UNDO);
2044     error = VOP_GETATTR(vp, &attr);
2045     if (error)
2046         goto done;
2047
2048     /*
2049      * Generate UNDO records as requested.
2050      */
2051     if (jrflags & JRUNDO_VATTR) {
2052         save2 = jrecord_push(jrec, JTYPE_VATTR);
2053         jrecord_leaf(jrec, JLEAF_VTYPE, &attr.va_type, sizeof(attr.va_type));
2054         if ((jrflags & JRUNDO_NLINK) && attr.va_nlink != VNOVAL)
2055             jrecord_leaf(jrec, JLEAF_NLINK, &attr.va_nlink, sizeof(attr.va_nlink));
2056         if ((jrflags & JRUNDO_SIZE) && attr.va_size != VNOVAL)
2057             jrecord_leaf(jrec, JLEAF_SIZE, &attr.va_size, sizeof(attr.va_size));
2058         if ((jrflags & JRUNDO_UID) && attr.va_uid != VNOVAL)
2059             jrecord_leaf(jrec, JLEAF_UID, &attr.va_uid, sizeof(attr.va_uid));
2060         if ((jrflags & JRUNDO_GID) && attr.va_gid != VNOVAL)
2061             jrecord_leaf(jrec, JLEAF_GID, &attr.va_gid, sizeof(attr.va_gid));
2062         if ((jrflags & JRUNDO_FSID) && attr.va_fsid != VNOVAL)
2063             jrecord_leaf(jrec, JLEAF_FSID, &attr.va_fsid, sizeof(attr.va_fsid));
2064         if ((jrflags & JRUNDO_MODES) && attr.va_mode != (mode_t)VNOVAL)
2065             jrecord_leaf(jrec, JLEAF_MODES, &attr.va_mode, sizeof(attr.va_mode));
2066         if ((jrflags & JRUNDO_INUM) && attr.va_fileid != VNOVAL)
2067             jrecord_leaf(jrec, JLEAF_INUM, &attr.va_fileid, sizeof(attr.va_fileid));
2068         if ((jrflags & JRUNDO_ATIME) && attr.va_atime.tv_sec != VNOVAL)
2069             jrecord_leaf(jrec, JLEAF_ATIME, &attr.va_atime, sizeof(attr.va_atime));
2070         if ((jrflags & JRUNDO_MTIME) && attr.va_mtime.tv_sec != VNOVAL)
2071             jrecord_leaf(jrec, JLEAF_MTIME, &attr.va_mtime, sizeof(attr.va_mtime));
2072         if ((jrflags & JRUNDO_CTIME) && attr.va_ctime.tv_sec != VNOVAL)
2073             jrecord_leaf(jrec, JLEAF_CTIME, &attr.va_ctime, sizeof(attr.va_ctime));
2074         if ((jrflags & JRUNDO_GEN) && attr.va_gen != VNOVAL)
2075             jrecord_leaf(jrec, JLEAF_GEN, &attr.va_gen, sizeof(attr.va_gen));
2076         if ((jrflags & JRUNDO_FLAGS) && attr.va_flags != VNOVAL)
2077             jrecord_leaf(jrec, JLEAF_FLAGS, &attr.va_flags, sizeof(attr.va_flags));
2078         if ((jrflags & JRUNDO_UDEV) && attr.va_rdev != VNOVAL)
2079             jrecord_leaf(jrec, JLEAF_UDEV, &attr.va_rdev, sizeof(attr.va_rdev));
2080         jrecord_pop(jrec, save2);
2081     }
2082
2083     /*
2084      * Output the file data being overwritten by reading the file and
2085      * writing it out to the journal prior to the write operation.  We
2086      * do not need to write out data past the current file EOF.
2087      *
2088      * XXX support JRUNDO_CONDLINK - do not write out file data for files
2089      * with a link count > 1.  The undo code needs to locate the inode and
2090      * regenerate the hardlink.
2091      */
2092     if ((jrflags & JRUNDO_FILEDATA) && attr.va_type == VREG) {
2093         if (attr.va_size != VNOVAL) {
2094             if (bytes == -1)
2095                 bytes = attr.va_size - off;
2096             if (off + bytes > attr.va_size)
2097                 bytes = attr.va_size - off;
2098             if (bytes > 0)
2099                 jrecord_file_data(jrec, vp, off, bytes);
2100         } else {
2101             error = EINVAL;
2102         }
2103     }
2104     if ((jrflags & JRUNDO_FILEDATA) && attr.va_type == VLNK) {
2105         struct iovec aiov;
2106         struct uio auio;
2107         char *buf;
2108
2109         buf = malloc(PATH_MAX, M_JOURNAL, M_WAITOK);
2110         aiov.iov_base = buf;
2111         aiov.iov_len = PATH_MAX;
2112         auio.uio_iov = &aiov;
2113         auio.uio_iovcnt = 1;
2114         auio.uio_offset = 0;
2115         auio.uio_rw = UIO_READ;
2116         auio.uio_segflg = UIO_SYSSPACE;
2117         auio.uio_td = curthread;
2118         auio.uio_resid = PATH_MAX;
2119         error = VOP_READLINK(vp, &auio, proc0.p_ucred);
2120         if (error == 0) {
2121                 jrecord_leaf(jrec, JLEAF_SYMLINKDATA, buf, 
2122                                 PATH_MAX - auio.uio_resid);
2123         }
2124         free(buf, M_JOURNAL);
2125     }
2126 done:
2127     if (error)
2128         jrecord_leaf(jrec, JLEAF_ERROR, &error, sizeof(error));
2129     jrecord_pop(jrec, save1);
2130 }
2131
2132 /************************************************************************
2133  *                      JOURNAL VNOPS                                   *
2134  ************************************************************************
2135  *
2136  * These are function shims replacing the normal filesystem ops.  We become
2137  * responsible for calling the underlying filesystem ops.  We have the choice
2138  * of executing the underlying op first and then generating the journal entry,
2139  * or starting the journal entry, executing the underlying op, and then
2140  * either completing or aborting it.  
2141  *
2142  * The journal is supposed to be a high-level entity, which generally means
2143  * identifying files by name rather then by inode.  Supplying both allows
2144  * the journal to be used both for inode-number-compatible 'mirrors' and
2145  * for simple filesystem replication.
2146  *
2147  * Writes are particularly difficult to deal with because a single write may
2148  * represent a hundred megabyte buffer or more, and both writes and truncations
2149  * require the 'old' data to be written out as well as the new data if the
2150  * log is reversable.  Other issues:
2151  *
2152  * - How to deal with operations on unlinked files (no path available),
2153  *   but which may still be filesystem visible due to hard links.
2154  *
2155  * - How to deal with modifications made via a memory map.
2156  *
2157  * - Future cache coherency support will require cache coherency API calls
2158  *   both prior to and after the call to the underlying VFS.
2159  *
2160  * ALSO NOTE: We do not have to shim compatibility VOPs like MKDIR which have
2161  * new VFS equivalents (NMKDIR).
2162  */
2163
2164 /*
2165  * Journal vop_settattr { a_vp, a_vap, a_cred, a_td }
2166  */
2167 static
2168 int
2169 journal_setattr(struct vop_setattr_args *ap)
2170 {
2171     struct jrecord_list jreclist;
2172     struct jrecord jreccache;
2173     struct jrecord *jrec;
2174     struct mount *mp;
2175     void *save;
2176     int error;
2177
2178     mp = ap->a_head.a_ops->vv_mount;
2179     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_SETATTR)) {
2180         jreclist_undo_file(&jreclist, ap->a_vp, JRUNDO_VATTR, 0, 0);
2181     }
2182     error = vop_journal_operate_ap(&ap->a_head);
2183     if (error == 0) {
2184         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2185             jrecord_write_cred(jrec, curthread, ap->a_cred);
2186             jrecord_write_vnode_ref(jrec, ap->a_vp);
2187             save = jrecord_push(jrec, JTYPE_REDO);
2188             jrecord_write_vattr(jrec, ap->a_vap);
2189             jrecord_pop(jrec, save);
2190         }
2191     }
2192     jreclist_done(mp, &jreclist, error);
2193     return (error);
2194 }
2195
2196 /*
2197  * Journal vop_write { a_vp, a_uio, a_ioflag, a_cred }
2198  */
2199 static
2200 int
2201 journal_write(struct vop_write_args *ap)
2202 {
2203     struct jrecord_list jreclist;
2204     struct jrecord jreccache;
2205     struct jrecord *jrec;
2206     struct mount *mp;
2207     struct uio uio_copy;
2208     struct iovec uio_one_iovec;
2209     void *save;
2210     int error;
2211
2212     /*
2213      * This is really nasty.  UIO's don't retain sufficient information to
2214      * be reusable once they've gone through the VOP chain.  The iovecs get
2215      * cleared, so we have to copy the UIO.
2216      *
2217      * XXX fix the UIO code to not destroy iov's during a scan so we can
2218      *     reuse the uio over and over again.
2219      *
2220      * XXX UNDO code needs to journal the old data prior to the write.
2221      */
2222     uio_copy = *ap->a_uio;
2223     if (uio_copy.uio_iovcnt == 1) {
2224         uio_one_iovec = ap->a_uio->uio_iov[0];
2225         uio_copy.uio_iov = &uio_one_iovec;
2226     } else {
2227         uio_copy.uio_iov = malloc(uio_copy.uio_iovcnt * sizeof(struct iovec),
2228                                     M_JOURNAL, M_WAITOK);
2229         bcopy(ap->a_uio->uio_iov, uio_copy.uio_iov, 
2230                 uio_copy.uio_iovcnt * sizeof(struct iovec));
2231     }
2232
2233     /*
2234      * Write out undo data.  Note that uio_offset is incorrect if
2235      * IO_APPEND is set, but fortunately we have no undo file data to
2236      * write out in that case.
2237      */
2238     mp = ap->a_head.a_ops->vv_mount;
2239     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_WRITE)) {
2240         if (ap->a_ioflag & IO_APPEND) {
2241             jreclist_undo_file(&jreclist, ap->a_vp, JRUNDO_SIZE|JRUNDO_MTIME, 0, 0);
2242         } else {
2243             jreclist_undo_file(&jreclist, ap->a_vp, 
2244                                JRUNDO_FILEDATA|JRUNDO_SIZE|JRUNDO_MTIME, 
2245                                uio_copy.uio_offset, uio_copy.uio_resid);
2246         }
2247     }
2248     error = vop_journal_operate_ap(&ap->a_head);
2249
2250     /*
2251      * XXX bad hack to figure out the offset for O_APPEND writes (note: 
2252      * uio field state after the VFS operation).
2253      */
2254     uio_copy.uio_offset = ap->a_uio->uio_offset - 
2255                           (uio_copy.uio_resid - ap->a_uio->uio_resid);
2256
2257     /*
2258      * Output the write data to the journal.
2259      */
2260     if (error == 0) {
2261         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2262             jrecord_write_cred(jrec, NULL, ap->a_cred);
2263             jrecord_write_vnode_ref(jrec, ap->a_vp);
2264             save = jrecord_push(jrec, JTYPE_REDO);
2265             jrecord_write_uio(jrec, JLEAF_FILEDATA, &uio_copy);
2266             jrecord_pop(jrec, save);
2267         }
2268     }
2269     jreclist_done(mp, &jreclist, error);
2270
2271     if (uio_copy.uio_iov != &uio_one_iovec)
2272         free(uio_copy.uio_iov, M_JOURNAL);
2273     return (error);
2274 }
2275
2276 /*
2277  * Journal vop_fsync { a_vp, a_waitfor, a_td }
2278  */
2279 static
2280 int
2281 journal_fsync(struct vop_fsync_args *ap)
2282 {
2283 #if 0
2284     struct mount *mp;
2285     struct journal *jo;
2286 #endif
2287     int error;
2288
2289     error = vop_journal_operate_ap(&ap->a_head);
2290 #if 0
2291     mp = ap->a_head.a_ops->vv_mount;
2292     if (error == 0) {
2293         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
2294             /* XXX synchronize pending journal records */
2295         }
2296     }
2297 #endif
2298     return (error);
2299 }
2300
2301 /*
2302  * Journal vop_putpages { a_vp, a_m, a_count, a_sync, a_rtvals, a_offset }
2303  *
2304  * note: a_count is in bytes.
2305  */
2306 static
2307 int
2308 journal_putpages(struct vop_putpages_args *ap)
2309 {
2310     struct jrecord_list jreclist;
2311     struct jrecord jreccache;
2312     struct jrecord *jrec;
2313     struct mount *mp;
2314     void *save;
2315     int error;
2316
2317     mp = ap->a_head.a_ops->vv_mount;
2318     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_PUTPAGES) && 
2319         ap->a_count > 0
2320     ) {
2321         jreclist_undo_file(&jreclist, ap->a_vp, 
2322                            JRUNDO_FILEDATA|JRUNDO_SIZE|JRUNDO_MTIME, 
2323                            ap->a_offset, btoc(ap->a_count));
2324     }
2325     error = vop_journal_operate_ap(&ap->a_head);
2326     if (error == 0 && ap->a_count > 0) {
2327         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2328             jrecord_write_vnode_ref(jrec, ap->a_vp);
2329             save = jrecord_push(jrec, JTYPE_REDO);
2330             jrecord_write_pagelist(jrec, JLEAF_FILEDATA, ap->a_m, ap->a_rtvals, 
2331                                    btoc(ap->a_count), ap->a_offset);
2332             jrecord_pop(jrec, save);
2333         }
2334     }
2335     jreclist_done(mp, &jreclist, error);
2336     return (error);
2337 }
2338
2339 /*
2340  * Journal vop_setacl { a_vp, a_type, a_aclp, a_cred, a_td }
2341  */
2342 static
2343 int
2344 journal_setacl(struct vop_setacl_args *ap)
2345 {
2346     struct jrecord_list jreclist;
2347     struct jrecord jreccache;
2348     struct jrecord *jrec;
2349     struct mount *mp;
2350     int error;
2351
2352     mp = ap->a_head.a_ops->vv_mount;
2353     jreclist_init(mp, &jreclist, &jreccache, JTYPE_SETACL);
2354     error = vop_journal_operate_ap(&ap->a_head);
2355     if (error == 0) {
2356         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2357 #if 0
2358             if ((jo->flags & MC_JOURNAL_WANT_REVERSABLE))
2359                 jrecord_undo_file(jrec, ap->a_vp, JRUNDO_XXX, 0, 0);
2360 #endif
2361             jrecord_write_cred(jrec, curthread, ap->a_cred);
2362             jrecord_write_vnode_ref(jrec, ap->a_vp);
2363 #if 0
2364             save = jrecord_push(jrec, JTYPE_REDO);
2365             /* XXX type, aclp */
2366             jrecord_pop(jrec, save);
2367 #endif
2368         }
2369     }
2370     jreclist_done(mp, &jreclist, error);
2371     return (error);
2372 }
2373
2374 /*
2375  * Journal vop_setextattr { a_vp, a_name, a_uio, a_cred, a_td }
2376  */
2377 static
2378 int
2379 journal_setextattr(struct vop_setextattr_args *ap)
2380 {
2381     struct jrecord_list jreclist;
2382     struct jrecord jreccache;
2383     struct jrecord *jrec;
2384     struct mount *mp;
2385     void *save;
2386     int error;
2387
2388     mp = ap->a_head.a_ops->vv_mount;
2389     jreclist_init(mp, &jreclist, &jreccache, JTYPE_SETEXTATTR);
2390     error = vop_journal_operate_ap(&ap->a_head);
2391     if (error == 0) {
2392         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2393 #if 0
2394             if ((jo->flags & MC_JOURNAL_WANT_REVERSABLE))
2395                 jrecord_undo_file(jrec, ap->a_vp, JRUNDO_XXX, 0, 0);
2396 #endif
2397             jrecord_write_cred(jrec, curthread, ap->a_cred);
2398             jrecord_write_vnode_ref(jrec, ap->a_vp);
2399             jrecord_leaf(jrec, JLEAF_ATTRNAME, ap->a_name, strlen(ap->a_name));
2400             save = jrecord_push(jrec, JTYPE_REDO);
2401             jrecord_write_uio(jrec, JLEAF_FILEDATA, ap->a_uio);
2402             jrecord_pop(jrec, save);
2403         }
2404     }
2405     jreclist_done(mp, &jreclist, error);
2406     return (error);
2407 }
2408
2409 /*
2410  * Journal vop_ncreate { a_ncp, a_vpp, a_cred, a_vap }
2411  */
2412 static
2413 int
2414 journal_ncreate(struct vop_ncreate_args *ap)
2415 {
2416     struct jrecord_list jreclist;
2417     struct jrecord jreccache;
2418     struct jrecord *jrec;
2419     struct mount *mp;
2420     void *save;
2421     int error;
2422
2423     mp = ap->a_head.a_ops->vv_mount;
2424     jreclist_init(mp, &jreclist, &jreccache, JTYPE_CREATE);
2425     error = vop_journal_operate_ap(&ap->a_head);
2426     if (error == 0) {
2427         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2428             jrecord_write_cred(jrec, NULL, ap->a_cred);
2429             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2430             if (*ap->a_vpp)
2431                 jrecord_write_vnode_ref(jrec, *ap->a_vpp);
2432             save = jrecord_push(jrec, JTYPE_REDO);
2433             jrecord_write_vattr(jrec, ap->a_vap);
2434             jrecord_pop(jrec, save);
2435         }
2436     }
2437     jreclist_done(mp, &jreclist, error);
2438     return (error);
2439 }
2440
2441 /*
2442  * Journal vop_nmknod { a_ncp, a_vpp, a_cred, a_vap }
2443  */
2444 static
2445 int
2446 journal_nmknod(struct vop_nmknod_args *ap)
2447 {
2448     struct jrecord_list jreclist;
2449     struct jrecord jreccache;
2450     struct jrecord *jrec;
2451     struct mount *mp;
2452     void *save;
2453     int error;
2454
2455     mp = ap->a_head.a_ops->vv_mount;
2456     jreclist_init(mp, &jreclist, &jreccache, JTYPE_MKNOD);
2457     error = vop_journal_operate_ap(&ap->a_head);
2458     if (error == 0) {
2459         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2460             jrecord_write_cred(jrec, NULL, ap->a_cred);
2461             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2462             save = jrecord_push(jrec, JTYPE_REDO);
2463             jrecord_write_vattr(jrec, ap->a_vap);
2464             jrecord_pop(jrec, save);
2465             if (*ap->a_vpp)
2466                 jrecord_write_vnode_ref(jrec, *ap->a_vpp);
2467         }
2468     }
2469     jreclist_done(mp, &jreclist, error);
2470     return (error);
2471 }
2472
2473 /*
2474  * Journal vop_nlink { a_ncp, a_vp, a_cred }
2475  */
2476 static
2477 int
2478 journal_nlink(struct vop_nlink_args *ap)
2479 {
2480     struct jrecord_list jreclist;
2481     struct jrecord jreccache;
2482     struct jrecord *jrec;
2483     struct mount *mp;
2484     void *save;
2485     int error;
2486
2487     mp = ap->a_head.a_ops->vv_mount;
2488     jreclist_init(mp, &jreclist, &jreccache, JTYPE_LINK);
2489     error = vop_journal_operate_ap(&ap->a_head);
2490     if (error == 0) {
2491         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2492             jrecord_write_cred(jrec, NULL, ap->a_cred);
2493             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2494             /* XXX PATH to VP and inode number */
2495             /* XXX this call may not record the correct path when
2496              * multiple paths are available */
2497             save = jrecord_push(jrec, JTYPE_REDO);
2498             jrecord_write_vnode_link(jrec, ap->a_vp, ap->a_ncp);
2499             jrecord_pop(jrec, save);
2500         }
2501     }
2502     jreclist_done(mp, &jreclist, error);
2503     return (error);
2504 }
2505
2506 /*
2507  * Journal vop_symlink { a_ncp, a_vpp, a_cred, a_vap, a_target }
2508  */
2509 static
2510 int
2511 journal_nsymlink(struct vop_nsymlink_args *ap)
2512 {
2513     struct jrecord_list jreclist;
2514     struct jrecord jreccache;
2515     struct jrecord *jrec;
2516     struct mount *mp;
2517     void *save;
2518     int error;
2519
2520     mp = ap->a_head.a_ops->vv_mount;
2521     jreclist_init(mp, &jreclist, &jreccache, JTYPE_SYMLINK);
2522     error = vop_journal_operate_ap(&ap->a_head);
2523     if (error == 0) {
2524         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2525             jrecord_write_cred(jrec, NULL, ap->a_cred);
2526             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2527             save = jrecord_push(jrec, JTYPE_REDO);
2528             jrecord_leaf(jrec, JLEAF_SYMLINKDATA,
2529                         ap->a_target, strlen(ap->a_target));
2530             jrecord_pop(jrec, save);
2531             if (*ap->a_vpp)
2532                 jrecord_write_vnode_ref(jrec, *ap->a_vpp);
2533         }
2534     }
2535     jreclist_done(mp, &jreclist, error);
2536     return (error);
2537 }
2538
2539 /*
2540  * Journal vop_nwhiteout { a_ncp, a_cred, a_flags }
2541  */
2542 static
2543 int
2544 journal_nwhiteout(struct vop_nwhiteout_args *ap)
2545 {
2546     struct jrecord_list jreclist;
2547     struct jrecord jreccache;
2548     struct jrecord *jrec;
2549     struct mount *mp;
2550     int error;
2551
2552     mp = ap->a_head.a_ops->vv_mount;
2553     jreclist_init(mp, &jreclist, &jreccache, JTYPE_WHITEOUT);
2554     error = vop_journal_operate_ap(&ap->a_head);
2555     if (error == 0) {
2556         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2557             jrecord_write_cred(jrec, NULL, ap->a_cred);
2558             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2559         }
2560     }
2561     jreclist_done(mp, &jreclist, error);
2562     return (error);
2563 }
2564
2565 /*
2566  * Journal vop_nremove { a_ncp, a_cred }
2567  */
2568 static
2569 int
2570 journal_nremove(struct vop_nremove_args *ap)
2571 {
2572     struct jrecord_list jreclist;
2573     struct jrecord jreccache;
2574     struct jrecord *jrec;
2575     struct mount *mp;
2576     int error;
2577
2578     mp = ap->a_head.a_ops->vv_mount;
2579     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_REMOVE) &&
2580         ap->a_ncp->nc_vp
2581     ) {
2582         jreclist_undo_file(&jreclist, ap->a_ncp->nc_vp, 
2583                            JRUNDO_ALL|JRUNDO_GETVP|JRUNDO_CONDLINK, 0, -1);
2584     }
2585     error = vop_journal_operate_ap(&ap->a_head);
2586     if (error == 0) {
2587         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2588             jrecord_write_cred(jrec, NULL, ap->a_cred);
2589             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2590         }
2591     }
2592     jreclist_done(mp, &jreclist, error);
2593     return (error);
2594 }
2595
2596 /*
2597  * Journal vop_nmkdir { a_ncp, a_vpp, a_cred, a_vap }
2598  */
2599 static
2600 int
2601 journal_nmkdir(struct vop_nmkdir_args *ap)
2602 {
2603     struct jrecord_list jreclist;
2604     struct jrecord jreccache;
2605     struct jrecord *jrec;
2606     struct mount *mp;
2607     int error;
2608
2609     mp = ap->a_head.a_ops->vv_mount;
2610     jreclist_init(mp, &jreclist, &jreccache, JTYPE_MKDIR);
2611     error = vop_journal_operate_ap(&ap->a_head);
2612     if (error == 0) {
2613         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2614 #if 0
2615             if (jo->flags & MC_JOURNAL_WANT_AUDIT) {
2616                 jrecord_write_audit(jrec);
2617             }
2618 #endif
2619             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2620             jrecord_write_cred(jrec, NULL, ap->a_cred);
2621             jrecord_write_vattr(jrec, ap->a_vap);
2622             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2623             if (*ap->a_vpp)
2624                 jrecord_write_vnode_ref(jrec, *ap->a_vpp);
2625         }
2626     }
2627     jreclist_done(mp, &jreclist, error);
2628     return (error);
2629 }
2630
2631 /*
2632  * Journal vop_nrmdir { a_ncp, a_cred }
2633  */
2634 static
2635 int
2636 journal_nrmdir(struct vop_nrmdir_args *ap)
2637 {
2638     struct jrecord_list jreclist;
2639     struct jrecord jreccache;
2640     struct jrecord *jrec;
2641     struct mount *mp;
2642     int error;
2643
2644     mp = ap->a_head.a_ops->vv_mount;
2645     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_RMDIR)) {
2646         jreclist_undo_file(&jreclist, ap->a_ncp->nc_vp,
2647                            JRUNDO_VATTR|JRUNDO_GETVP, 0, 0);
2648     }
2649     error = vop_journal_operate_ap(&ap->a_head);
2650     if (error == 0) {
2651         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2652             jrecord_write_cred(jrec, NULL, ap->a_cred);
2653             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_ncp);
2654         }
2655     }
2656     jreclist_done(mp, &jreclist, error);
2657     return (error);
2658 }
2659
2660 /*
2661  * Journal vop_nrename { a_fncp, a_tncp, a_cred }
2662  */
2663 static
2664 int
2665 journal_nrename(struct vop_nrename_args *ap)
2666 {
2667     struct jrecord_list jreclist;
2668     struct jrecord jreccache;
2669     struct jrecord *jrec;
2670     struct mount *mp;
2671     int error;
2672
2673     mp = ap->a_head.a_ops->vv_mount;
2674     if (jreclist_init(mp, &jreclist, &jreccache, JTYPE_RENAME) &&
2675         ap->a_tncp->nc_vp
2676     ) {
2677         jreclist_undo_file(&jreclist, ap->a_tncp->nc_vp, 
2678                            JRUNDO_ALL|JRUNDO_GETVP|JRUNDO_CONDLINK, 0, -1);
2679     }
2680     error = vop_journal_operate_ap(&ap->a_head);
2681     if (error == 0) {
2682         TAILQ_FOREACH(jrec, &jreclist.list, user_entry) {
2683             jrecord_write_cred(jrec, NULL, ap->a_cred);
2684             jrecord_write_path(jrec, JLEAF_PATH1, ap->a_fncp);
2685             jrecord_write_path(jrec, JLEAF_PATH2, ap->a_tncp);
2686         }
2687     }
2688     jreclist_done(mp, &jreclist, error);
2689     return (error);
2690 }
2691