Scrap DEC Alpha support.
[dragonfly.git] / sys / kern / vfs_journal.c
1 /*
2  * Copyright (c) 2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/vfs_journal.c,v 1.6 2005/01/09 03:04:51 dillon Exp $
35  */
36 /*
37  * Each mount point may have zero or more independantly configured journals
38  * attached to it.  Each journal is represented by a memory FIFO and worker
39  * thread.  Journal events are streamed through the FIFO to the thread,
40  * batched up (typically on one-second intervals), and written out by the
41  * thread. 
42  *
43  * Journal vnode ops are executed instead of mnt_vn_norm_ops when one or
44  * more journals have been installed on a mount point.  It becomes the
45  * responsibility of the journal op to call the underlying normal op as
46  * appropriate.
47  *
48  * The journaling protocol is intended to evolve into a two-way stream
49  * whereby transaction IDs can be acknowledged by the journaling target
50  * when the data has been committed to hard storage.  Both implicit and
51  * explicit acknowledgement schemes will be supported, depending on the
52  * sophistication of the journaling stream, plus resynchronization and
53  * restart when a journaling stream is interrupted.  This information will
54  * also be made available to journaling-aware filesystems to allow better
55  * management of their own physical storage synchronization mechanisms as
56  * well as to allow such filesystems to take direct advantage of the kernel's
57  * journaling layer so they don't have to roll their own.
58  *
59  * In addition, the worker thread will have access to much larger 
60  * spooling areas then the memory buffer is able to provide by e.g. 
61  * reserving swap space, in order to absorb potentially long interruptions
62  * of off-site journaling streams, and to prevent 'slow' off-site linkages
63  * from radically slowing down local filesystem operations.  
64  *
65  * Because of the non-trivial algorithms the journaling system will be
66  * required to support, use of a worker thread is mandatory.  Efficiencies
67  * are maintained by utilitizing the memory FIFO to batch transactions when
68  * possible, reducing the number of gratuitous thread switches and taking
69  * advantage of cpu caches through the use of shorter batched code paths
70  * rather then trying to do everything in the context of the process
71  * originating the filesystem op.  In the future the memory FIFO can be
72  * made per-cpu to remove BGL or other locking requirements.
73  */
74 #include <sys/param.h>
75 #include <sys/systm.h>
76 #include <sys/buf.h>
77 #include <sys/conf.h>
78 #include <sys/kernel.h>
79 #include <sys/queue.h>
80 #include <sys/lock.h>
81 #include <sys/malloc.h>
82 #include <sys/mount.h>
83 #include <sys/unistd.h>
84 #include <sys/vnode.h>
85 #include <sys/poll.h>
86 #include <sys/mountctl.h>
87 #include <sys/file.h>
88
89 #include <machine/limits.h>
90
91 #include <vm/vm.h>
92 #include <vm/vm_object.h>
93 #include <vm/vm_page.h>
94 #include <vm/vm_pager.h>
95 #include <vm/vnode_pager.h>
96
97 #include <sys/file2.h>
98 #include <sys/thread2.h>
99
100 static int journal_attach(struct mount *mp);
101 static void journal_detach(struct mount *mp);
102 static int journal_install_vfs_journal(struct mount *mp, struct file *fp,
103                             const struct mountctl_install_journal *info);
104 static int journal_remove_vfs_journal(struct mount *mp,
105                             const struct mountctl_remove_journal *info);
106 static int journal_resync_vfs_journal(struct mount *mp, const void *ctl);
107 static int journal_status_vfs_journal(struct mount *mp,
108                        const struct mountctl_status_journal *info,
109                        struct mountctl_journal_ret_status *rstat,
110                        int buflen, int *res);
111 static void journal_thread(void *info);
112
113 static void *journal_reserve(struct journal *jo, 
114                             struct journal_rawrecbeg **rawpp, 
115                             int16_t streamid, int bytes);
116 static void *journal_extend(struct journal *jo,
117                             struct journal_rawrecbeg **rawpp,
118                             int truncbytes, int bytes, int *newstreamrecp);
119 static void journal_abort(struct journal *jo, 
120                             struct journal_rawrecbeg **rawpp);
121 static void journal_commit(struct journal *jo, 
122                             struct journal_rawrecbeg **rawpp, 
123                             int bytes, int closeout);
124
125 static void jrecord_init(struct journal *jo, 
126                             struct jrecord *jrec, int16_t streamid);
127 static struct journal_subrecord *jrecord_push(
128                             struct jrecord *jrec, int16_t rectype);
129 static void jrecord_pop(struct jrecord *jrec, struct journal_subrecord *parent);
130 static struct journal_subrecord *jrecord_write(struct jrecord *jrec,
131                             int16_t rectype, int bytes);
132 static void jrecord_data(struct jrecord *jrec, const void *buf, int bytes);
133 static void jrecord_done(struct jrecord *jrec, int abortit);
134
135 static void jrecord_write_path(struct jrecord *jrec, 
136                             int16_t rectype, struct namecache *ncp);
137 static void jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat);
138
139
140 static int journal_setattr(struct vop_setattr_args *ap);
141 static int journal_write(struct vop_write_args *ap);
142 static int journal_fsync(struct vop_fsync_args *ap);
143 static int journal_putpages(struct vop_putpages_args *ap);
144 static int journal_setacl(struct vop_setacl_args *ap);
145 static int journal_setextattr(struct vop_setextattr_args *ap);
146 static int journal_ncreate(struct vop_ncreate_args *ap);
147 static int journal_nmknod(struct vop_nmknod_args *ap);
148 static int journal_nlink(struct vop_nlink_args *ap);
149 static int journal_nsymlink(struct vop_nsymlink_args *ap);
150 static int journal_nwhiteout(struct vop_nwhiteout_args *ap);
151 static int journal_nremove(struct vop_nremove_args *ap);
152 static int journal_nmkdir(struct vop_nmkdir_args *ap);
153 static int journal_nrmdir(struct vop_nrmdir_args *ap);
154 static int journal_nrename(struct vop_nrename_args *ap);
155
156 static struct vnodeopv_entry_desc journal_vnodeop_entries[] = {
157     { &vop_default_desc,                vop_journal_operate_ap },
158     { &vop_mountctl_desc,               (void *)journal_mountctl },
159     { &vop_setattr_desc,                (void *)journal_setattr },
160     { &vop_write_desc,                  (void *)journal_write },
161     { &vop_fsync_desc,                  (void *)journal_fsync },
162     { &vop_putpages_desc,               (void *)journal_putpages },
163     { &vop_setacl_desc,                 (void *)journal_setacl },
164     { &vop_setextattr_desc,             (void *)journal_setextattr },
165     { &vop_ncreate_desc,                (void *)journal_ncreate },
166     { &vop_nmknod_desc,                 (void *)journal_nmknod },
167     { &vop_nlink_desc,                  (void *)journal_nlink },
168     { &vop_nsymlink_desc,               (void *)journal_nsymlink },
169     { &vop_nwhiteout_desc,              (void *)journal_nwhiteout },
170     { &vop_nremove_desc,                (void *)journal_nremove },
171     { &vop_nmkdir_desc,                 (void *)journal_nmkdir },
172     { &vop_nrmdir_desc,                 (void *)journal_nrmdir },
173     { &vop_nrename_desc,                (void *)journal_nrename },
174     { NULL, NULL }
175 };
176
177 static MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
178 static MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
179
180 int
181 journal_mountctl(struct vop_mountctl_args *ap)
182 {
183     struct mount *mp;
184     int error = 0;
185
186     mp = ap->a_head.a_ops->vv_mount;
187     KKASSERT(mp);
188
189     if (mp->mnt_vn_journal_ops == NULL) {
190         switch(ap->a_op) {
191         case MOUNTCTL_INSTALL_VFS_JOURNAL:
192             error = journal_attach(mp);
193             if (error == 0 && ap->a_ctllen != sizeof(struct mountctl_install_journal))
194                 error = EINVAL;
195             if (error == 0 && ap->a_fp == NULL)
196                 error = EBADF;
197             if (error == 0)
198                 error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
199             if (TAILQ_EMPTY(&mp->mnt_jlist))
200                 journal_detach(mp);
201             break;
202         case MOUNTCTL_REMOVE_VFS_JOURNAL:
203         case MOUNTCTL_RESYNC_VFS_JOURNAL:
204         case MOUNTCTL_STATUS_VFS_JOURNAL:
205             error = ENOENT;
206             break;
207         default:
208             error = EOPNOTSUPP;
209             break;
210         }
211     } else {
212         switch(ap->a_op) {
213         case MOUNTCTL_INSTALL_VFS_JOURNAL:
214             if (ap->a_ctllen != sizeof(struct mountctl_install_journal))
215                 error = EINVAL;
216             if (error == 0 && ap->a_fp == NULL)
217                 error = EBADF;
218             if (error == 0)
219                 error = journal_install_vfs_journal(mp, ap->a_fp, ap->a_ctl);
220             break;
221         case MOUNTCTL_REMOVE_VFS_JOURNAL:
222             if (ap->a_ctllen != sizeof(struct mountctl_remove_journal))
223                 error = EINVAL;
224             if (error == 0)
225                 error = journal_remove_vfs_journal(mp, ap->a_ctl);
226             if (TAILQ_EMPTY(&mp->mnt_jlist))
227                 journal_detach(mp);
228             break;
229         case MOUNTCTL_RESYNC_VFS_JOURNAL:
230             if (ap->a_ctllen != 0)
231                 error = EINVAL;
232             error = journal_resync_vfs_journal(mp, ap->a_ctl);
233             break;
234         case MOUNTCTL_STATUS_VFS_JOURNAL:
235             if (ap->a_ctllen != sizeof(struct mountctl_status_journal))
236                 error = EINVAL;
237             if (error == 0) {
238                 error = journal_status_vfs_journal(mp, ap->a_ctl, 
239                                         ap->a_buf, ap->a_buflen, ap->a_res);
240             }
241             break;
242         default:
243             error = EOPNOTSUPP;
244             break;
245         }
246     }
247     return (error);
248 }
249
250 /*
251  * High level mount point setup.  When a 
252  */
253 static int
254 journal_attach(struct mount *mp)
255 {
256     vfs_add_vnodeops(mp, &mp->mnt_vn_journal_ops, journal_vnodeop_entries);
257     return(0);
258 }
259
260 static void
261 journal_detach(struct mount *mp)
262 {
263     if (mp->mnt_vn_journal_ops)
264         vfs_rm_vnodeops(&mp->mnt_vn_journal_ops);
265 }
266
267 /*
268  * Install a journal on a mount point.  Each journal has an associated worker
269  * thread which is responsible for buffering and spooling the data to the
270  * target.  A mount point may have multiple journals attached to it.  An
271  * initial start record is generated when the journal is associated.
272  */
273 static int
274 journal_install_vfs_journal(struct mount *mp, struct file *fp, 
275                             const struct mountctl_install_journal *info)
276 {
277     struct journal *jo;
278     struct jrecord jrec;
279     int error = 0;
280     int size;
281
282     jo = malloc(sizeof(struct journal), M_JOURNAL, M_WAITOK|M_ZERO);
283     bcopy(info->id, jo->id, sizeof(jo->id));
284     jo->flags = info->flags & ~(MC_JOURNAL_ACTIVE | MC_JOURNAL_STOP_REQ);
285
286     /*
287      * Memory FIFO size, round to nearest power of 2
288      */
289     if (info->membufsize) {
290         if (info->membufsize < 65536)
291             size = 65536;
292         else if (info->membufsize > 128 * 1024 * 1024)
293             size = 128 * 1024 * 1024;
294         else
295             size = (int)info->membufsize;
296     } else {
297         size = 1024 * 1024;
298     }
299     jo->fifo.size = 1;
300     while (jo->fifo.size < size)
301         jo->fifo.size <<= 1;
302
303     /*
304      * Other parameters.  If not specified the starting transaction id
305      * will be the current date.
306      */
307     if (info->transid) {
308         jo->transid = info->transid;
309     } else {
310         struct timespec ts;
311         getnanotime(&ts);
312         jo->transid = ((int64_t)ts.tv_sec << 30) | ts.tv_nsec;
313     }
314
315     jo->fp = fp;
316
317     /*
318      * Allocate the memory FIFO
319      */
320     jo->fifo.mask = jo->fifo.size - 1;
321     jo->fifo.membase = malloc(jo->fifo.size, M_JFIFO, M_WAITOK|M_ZERO|M_NULLOK);
322     if (jo->fifo.membase == NULL)
323         error = ENOMEM;
324
325     /*
326      * Create the worker thread and generate the association record.
327      */
328     if (error) {
329         free(jo, M_JOURNAL);
330     } else {
331         fhold(fp);
332         jo->flags |= MC_JOURNAL_ACTIVE;
333         lwkt_create(journal_thread, jo, NULL, &jo->thread,
334                         TDF_STOPREQ, -1, "journal %.*s", JIDMAX, jo->id);
335         lwkt_setpri(&jo->thread, TDPRI_KERN_DAEMON);
336         lwkt_schedule(&jo->thread);
337
338         jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
339         jrecord_write(&jrec, JTYPE_ASSOCIATE, 0);
340         jrecord_done(&jrec, 0);
341         TAILQ_INSERT_TAIL(&mp->mnt_jlist, jo, jentry);
342     }
343     return(error);
344 }
345
346 /*
347  * Disassociate a journal from a mount point and terminate its worker thread.
348  * A final termination record is written out before the file pointer is
349  * dropped.
350  */
351 static int
352 journal_remove_vfs_journal(struct mount *mp, 
353                            const struct mountctl_remove_journal *info)
354 {
355     struct journal *jo;
356     struct jrecord jrec;
357     int error;
358
359     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
360         if (bcmp(jo->id, info->id, sizeof(jo->id)) == 0)
361             break;
362     }
363     if (jo) {
364         error = 0;
365         TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
366
367         jrecord_init(jo, &jrec, JREC_STREAMID_DISCONT);
368         jrecord_write(&jrec, JTYPE_DISASSOCIATE, 0);
369         jrecord_done(&jrec, 0);
370
371         jo->flags |= MC_JOURNAL_STOP_REQ | (info->flags & MC_JOURNAL_STOP_IMM);
372         wakeup(&jo->fifo);
373         while (jo->flags & MC_JOURNAL_ACTIVE) {
374             tsleep(jo, 0, "jwait", 0);
375         }
376         lwkt_free_thread(&jo->thread); /* XXX SMP */
377         if (jo->fp)
378             fdrop(jo->fp, curthread);
379         if (jo->fifo.membase)
380             free(jo->fifo.membase, M_JFIFO);
381         free(jo, M_JOURNAL);
382     } else {
383         error = EINVAL;
384     }
385     return (error);
386 }
387
388 static int
389 journal_resync_vfs_journal(struct mount *mp, const void *ctl)
390 {
391     return(EINVAL);
392 }
393
394 static int
395 journal_status_vfs_journal(struct mount *mp, 
396                        const struct mountctl_status_journal *info,
397                        struct mountctl_journal_ret_status *rstat,
398                        int buflen, int *res)
399 {
400     struct journal *jo;
401     int error = 0;
402     int index;
403
404     index = 0;
405     *res = 0;
406     TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
407         if (info->index == MC_JOURNAL_INDEX_ID) {
408             if (bcmp(jo->id, info->id, sizeof(jo->id)) != 0)
409                 continue;
410         } else if (info->index >= 0) {
411             if (info->index < index)
412                 continue;
413         } else if (info->index != MC_JOURNAL_INDEX_ALL) {
414             continue;
415         }
416         if (buflen < sizeof(*rstat)) {
417             if (*res)
418                 rstat[-1].flags |= MC_JOURNAL_STATUS_MORETOCOME;
419             else
420                 error = EINVAL;
421             break;
422         }
423         bzero(rstat, sizeof(*rstat));
424         rstat->recsize = sizeof(*rstat);
425         bcopy(jo->id, rstat->id, sizeof(jo->id));
426         rstat->index = index;
427         rstat->membufsize = jo->fifo.size;
428         rstat->membufused = jo->fifo.xindex - jo->fifo.rindex;
429         rstat->membufiopend = jo->fifo.windex - jo->fifo.rindex;
430         rstat->bytessent = jo->total_acked;
431         ++rstat;
432         ++index;
433         *res += sizeof(*rstat);
434         buflen -= sizeof(*rstat);
435     }
436     return(error);
437 }
438 /*
439  * The per-journal worker thread is responsible for writing out the
440  * journal's FIFO to the target stream.
441  */
442 static void
443 journal_thread(void *info)
444 {
445     struct journal *jo = info;
446     struct journal_rawrecbeg *rawp;
447     int bytes;
448     int error;
449     int avail;
450     int res;
451
452     for (;;) {
453         /*
454          * Calculate the number of bytes available to write.  This buffer
455          * area may contain reserved records so we can't just write it out
456          * without further checks.
457          */
458         bytes = jo->fifo.windex - jo->fifo.rindex;
459
460         /*
461          * sleep if no bytes are available or if an incomplete record is
462          * encountered (it needs to be filled in before we can write it
463          * out), and skip any pad records that we encounter.
464          */
465         if (bytes == 0) {
466             if (jo->flags & MC_JOURNAL_STOP_REQ)
467                 break;
468             tsleep(&jo->fifo, 0, "jfifo", hz);
469             continue;
470         }
471         rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
472         if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
473             tsleep(&jo->fifo, 0, "jpad", hz);
474             continue;
475         }
476         if (rawp->streamid == JREC_STREAMID_PAD) {
477             jo->fifo.rindex += (rawp->recsize + 15) & ~15;
478             KKASSERT(jo->fifo.windex - jo->fifo.rindex > 0);
479             continue;
480         }
481
482         /*
483          * Figure out how much we can write out, beware the buffer wrap
484          * case.
485          */
486         res = 0;
487         avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
488         while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
489             res += (rawp->recsize + 15) & ~15;
490             if (res >= avail) {
491                 KKASSERT(res == avail);
492                 break;
493             }
494         }
495
496         /*
497          * Issue the write and deal with any errors or other conditions.
498          * For now assume blocking I/O.  Since we are record-aware the
499          * code cannot yet handle partial writes.
500          *
501          * XXX EWOULDBLOCK/NBIO
502          * XXX notification on failure
503          * XXX two-way acknowledgement stream in the return direction / xindex
504          */
505         printf("write @%d,%d\n", jo->fifo.rindex & jo->fifo.mask, bytes);
506         bytes = res;
507         error = fp_write(jo->fp, 
508                         jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
509                         bytes, &res);
510         if (error) {
511             printf("journal_thread(%s) write, error %d\n", jo->id, error);
512             /* XXX */
513         } else {
514             KKASSERT(res == bytes);
515             printf("journal_thread(%s) write %d\n", jo->id, res);
516         }
517
518         /*
519          * Advance rindex.  XXX for now also advance xindex, which will
520          * eventually be advanced when the target acknowledges the sequence
521          * space.
522          */
523         jo->fifo.rindex += bytes;
524         jo->fifo.xindex += bytes;
525         jo->total_acked += bytes;
526         if (jo->flags & MC_JOURNAL_WWAIT) {
527             jo->flags &= ~MC_JOURNAL_WWAIT;     /* XXX hysteresis */
528             wakeup(&jo->fifo.windex);
529         }
530     }
531     jo->flags &= ~MC_JOURNAL_ACTIVE;
532     wakeup(jo);
533     wakeup(&jo->fifo.windex);
534 }
535
536 static __inline
537 void
538 journal_build_pad(struct journal_rawrecbeg *rawp, int recsize)
539 {
540     struct journal_rawrecend *rendp;
541     
542     KKASSERT((recsize & 15) == 0 && recsize >= 16);
543
544     rawp->begmagic = JREC_BEGMAGIC;
545     rawp->streamid = JREC_STREAMID_PAD;
546     rawp->recsize = recsize;    /* must be 16-byte aligned */
547     rawp->seqno = 0;
548     /*
549      * WARNING, rendp may overlap rawp->seqno.  This is necessary to
550      * allow PAD records to fit in 16 bytes.  Use cpu_mb1() to
551      * hopefully cause the compiler to not make any assumptions.
552      */
553     cpu_mb1();
554     rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
555     rendp->endmagic = JREC_ENDMAGIC;
556     rendp->check = 0;
557     rendp->recsize = rawp->recsize;
558 }
559
560 /*
561  * Wake up the worker thread if the FIFO is more then half full or if
562  * someone is waiting for space to be freed up.  Otherwise let the 
563  * heartbeat deal with it.  Being able to avoid waking up the worker
564  * is the key to the journal's cpu efficiency.
565  */
566 static __inline
567 void
568 journal_commit_wakeup(struct journal *jo)
569 {
570     int avail;
571
572     avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
573     KKASSERT(avail >= 0);
574     if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
575         wakeup(&jo->fifo);
576 }
577
578 /*
579  * Create a new BEGIN stream record with the specified streamid and the
580  * specified amount of payload space.  *rawpp will be set to point to the
581  * base of the new stream record and a pointer to the base of the payload
582  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
583  * making this call.
584  *
585  * A stream can be extended, aborted, or committed by other API calls
586  * below.  This may result in a sequence of potentially disconnected
587  * stream records to be output to the journaling target.  The first record
588  * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
589  * while the last record on commit or abort will be marked JREC_STREAMCTL_END
590  * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
591  * up being the same as the first, in which case the bits are all set in
592  * the first record.
593  *
594  * The stream record is created in an incomplete state by setting the begin
595  * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
596  * flushing the fifo past our record until we have finished populating it.
597  * Other threads can reserve and operate on their own space without stalling
598  * but the stream output will stall until we have completed operations.  The
599  * memory FIFO is intended to be large enough to absorb such situations
600  * without stalling out other threads.
601  */
602 static
603 void *
604 journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
605                 int16_t streamid, int bytes)
606 {
607     struct journal_rawrecbeg *rawp;
608     int avail;
609     int availtoend;
610     int req;
611
612     /*
613      * Add header and trailer overheads to the passed payload.  Note that
614      * the passed payload size need not be aligned in any way.
615      */
616     bytes += sizeof(struct journal_rawrecbeg);
617     bytes += sizeof(struct journal_rawrecend);
618
619     for (;;) {
620         /*
621          * First, check boundary conditions.  If the request would wrap around
622          * we have to skip past the ending block and return to the beginning
623          * of the FIFO's buffer.  Calculate 'req' which is the actual number
624          * of bytes being reserved, including wrap-around dead space.
625          *
626          * Note that availtoend is not truncated to avail and so cannot be
627          * used to determine whether the reservation is possible by itself.
628          * Also, since all fifo ops are 16-byte aligned, we can check
629          * the size before calculating the aligned size.
630          */
631         availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
632         if (bytes > availtoend) 
633             req = bytes + availtoend;   /* add pad to end */
634         else
635             req = bytes;
636
637         /*
638          * Next calculate the total available space and see if it is
639          * sufficient.  We cannot overwrite previously buffered data
640          * past xindex because otherwise we would not be able to restart
641          * a broken link at the target's last point of commit.
642          */
643         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
644         KKASSERT(avail >= 0 && (avail & 15) == 0);
645
646         if (avail < req) {
647             /* XXX MC_JOURNAL_STOP_IMM */
648             jo->flags |= MC_JOURNAL_WWAIT;
649             tsleep(&jo->fifo.windex, 0, "jwrite", 0);
650             continue;
651         }
652
653         /*
654          * Create a pad record for any dead space and create an incomplete
655          * record for the live space, then return a pointer to the
656          * contiguous buffer space that was requested.
657          *
658          * NOTE: The worker thread will not flush past an incomplete
659          * record, so the reserved space can be filled in at-will.  The
660          * journaling code must also be aware the reserved sections occuring
661          * after this one will also not be written out even if completed
662          * until this one is completed.
663          */
664         rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
665         if (req != bytes) {
666             journal_build_pad(rawp, req - bytes);
667             rawp = (void *)jo->fifo.membase;
668         }
669         rawp->begmagic = JREC_INCOMPLETEMAGIC;  /* updated by abort/commit */
670         rawp->recsize = bytes;                  /* (unaligned size) */
671         rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
672         rawp->seqno = 0;                        /* set by caller */
673
674         /*
675          * Issue a memory barrier to guarentee that the record data has been
676          * properly initialized before we advance the write index and return
677          * a pointer to the reserved record.  Otherwise the worker thread
678          * could accidently run past us.
679          *
680          * Note that stream records are always 16-byte aligned.
681          */
682         cpu_mb1();
683         jo->fifo.windex += (req + 15) & ~15;
684         *rawpp = rawp;
685         return(rawp + 1);
686     }
687     /* not reached */
688     *rawpp = NULL;
689     return(NULL);
690 }
691
692 /*
693  * Extend a previous reservation by the specified number of payload bytes.
694  * If it is not possible to extend the existing reservation due to either
695  * another thread having reserved space after us or due to a boundary
696  * condition, the current reservation will be committed and possibly
697  * truncated and a new reservation with the specified payload size will
698  * be created. *rawpp is set to the new reservation in this case but the
699  * caller cannot depend on a comparison with the old rawp to determine if
700  * this case occurs because we could end up using the same memory FIFO
701  * offset for the new stream record.
702  *
703  * In either case this function will return a pointer to the base of the
704  * extended payload space.
705  *
706  * If a new stream block is created the caller needs to recalculate payload
707  * byte counts, if the same stream block is used the caller needs to extend
708  * its current notion of the payload byte count.
709  */
710 static void *
711 journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp, 
712                 int truncbytes, int bytes, int *newstreamrecp)
713 {
714     struct journal_rawrecbeg *rawp;
715     int16_t streamid;
716     int availtoend;
717     int avail;
718     int osize;
719     int nsize;
720     int wbase;
721     void *rptr;
722
723     *newstreamrecp = 0;
724     rawp = *rawpp;
725     osize = (rawp->recsize + 15) & ~15;
726     nsize = (rawp->recsize + bytes + 15) & ~15;
727     wbase = (char *)rawp - jo->fifo.membase;
728
729     /*
730      * If the aligned record size does not change we can trivially extend
731      * the record.
732      */
733     if (nsize == osize) {
734         rawp->recsize += bytes;
735         return((char *)rawp + rawp->recsize - bytes);
736     }
737
738     /*
739      * If the fifo's write index hasn't been modified since we made the
740      * reservation and we do not hit any boundary conditions, we can 
741      * trivially extend the record.
742      */
743     if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
744         availtoend = jo->fifo.size - wbase;
745         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
746         KKASSERT((availtoend & 15) == 0);
747         KKASSERT((avail & 15) == 0);
748         if (nsize <= avail && nsize <= availtoend) {
749             jo->fifo.windex += nsize - osize;
750             rawp->recsize += bytes;
751             return((char *)rawp + rawp->recsize - bytes);
752         }
753     }
754
755     /*
756      * It was not possible to extend the buffer.  Commit the current
757      * buffer and create a new one.  We manually clear the BEGIN mark that
758      * journal_reserve() creates (because this is a continuing record, not
759      * the start of a new stream).
760      */
761     streamid = rawp->streamid & JREC_STREAMID_MASK;
762     journal_commit(jo, rawpp, truncbytes, 0);
763     rptr = journal_reserve(jo, rawpp, streamid, bytes);
764     rawp = *rawpp;
765     rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
766     *newstreamrecp = 1;
767     return(rptr);
768 }
769
770 /*
771  * Abort a journal record.  If the transaction record represents a stream
772  * BEGIN and we can reverse the fifo's write index we can simply reverse
773  * index the entire record, as if it were never reserved in the first place.
774  *
775  * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
776  * with the payload truncated to 0 bytes.
777  */
778 static void
779 journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
780 {
781     struct journal_rawrecbeg *rawp;
782     int osize;
783
784     rawp = *rawpp;
785     osize = (rawp->recsize + 15) & ~15;
786
787     if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
788         (jo->fifo.windex & jo->fifo.mask) == 
789          (char *)rawp - jo->fifo.membase + osize)
790     {
791         jo->fifo.windex -= osize;
792         *rawpp = NULL;
793     } else {
794         rawp->streamid |= JREC_STREAMCTL_ABORTED;
795         journal_commit(jo, rawpp, 0, 1);
796     }
797 }
798
799 /*
800  * Commit a journal record and potentially truncate it to the specified
801  * number of payload bytes.  If you do not want to truncate the record,
802  * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
803  * field includes header and trailer and will not be correct.  Note that
804  * passing 0 will truncate the entire data payload of the record.
805  *
806  * The logical stream is terminated by this function.
807  *
808  * If truncation occurs, and it is not possible to physically optimize the
809  * memory FIFO due to other threads having reserved space after ours,
810  * the remaining reserved space will be covered by a pad record.
811  */
812 static void
813 journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
814                 int bytes, int closeout)
815 {
816     struct journal_rawrecbeg *rawp;
817     struct journal_rawrecend *rendp;
818     int osize;
819     int nsize;
820
821     rawp = *rawpp;
822     *rawpp = NULL;
823
824     KKASSERT((char *)rawp >= jo->fifo.membase &&
825              (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
826     KKASSERT(((intptr_t)rawp & 15) == 0);
827
828     /*
829      * Truncate the record if requested.  If the FIFO write index as still
830      * at the end of our record we can optimally backindex it.  Otherwise
831      * we have to insert a pad record.
832      *
833      * We calculate osize which is the 16-byte-aligned original recsize.
834      * We calculate nsize which is the 16-byte-aligned new recsize.
835      *
836      * Due to alignment issues or in case the passed truncation bytes is
837      * the same as the original payload, windex will be equal to nindex.
838      */
839     if (bytes >= 0) {
840         KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
841         osize = (rawp->recsize + 15) & ~15;
842         rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
843                         sizeof(struct journal_rawrecend);
844         nsize = (rawp->recsize + 15) & ~15;
845         if (osize == nsize) {
846             /* do nothing */
847         } else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
848             /* we are able to backindex the fifo */
849             jo->fifo.windex -= osize - nsize;
850         } else {
851             /* we cannot backindex the fifo, emplace a pad in the dead space */
852             journal_build_pad((void *)((char *)rawp + osize), osize - nsize);
853         }
854     }
855
856     /*
857      * Fill in the trailer.  Note that unlike pad records, the trailer will
858      * never overlap the header.
859      */
860     rendp = (void *)((char *)rawp + 
861             ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
862     rendp->endmagic = JREC_ENDMAGIC;
863     rendp->recsize = rawp->recsize;
864     rendp->check = 0;           /* XXX check word, disabled for now */
865
866     /*
867      * Fill in begmagic last.  This will allow the worker thread to proceed.
868      * Use a memory barrier to guarentee write ordering.  Mark the stream
869      * as terminated if closeout is set.  This is the typical case.
870      */
871     if (closeout)
872         rawp->streamid |= JREC_STREAMCTL_END;
873     cpu_mb1();                  /* memory barrier */
874     rawp->begmagic = JREC_BEGMAGIC;
875
876     journal_commit_wakeup(jo);
877 }
878
879 /************************************************************************
880  *                      TRANSACTION SUPPORT ROUTINES                    *
881  ************************************************************************
882  *
883  * JRECORD_*() - routines to create subrecord transactions and embed them
884  *               in the logical streams managed by the journal_*() routines.
885  */
886
887 static int16_t sid = JREC_STREAMID_JMIN;
888
889 /*
890  * Initialize the passed jrecord structure and start a new stream transaction
891  * by reserving an initial build space in the journal's memory FIFO.
892  */
893 static void
894 jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
895 {
896     bzero(jrec, sizeof(*jrec));
897     jrec->jo = jo;
898     if (streamid < 0) {
899         streamid = sid++;       /* XXX need to track stream ids! */
900         if (sid == JREC_STREAMID_JMAX)
901             sid = JREC_STREAMID_JMIN;
902     }
903     jrec->streamid = streamid;
904     jrec->stream_residual = JREC_DEFAULTSIZE;
905     jrec->stream_reserved = jrec->stream_residual;
906     jrec->stream_ptr = 
907         journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
908 }
909
910 /*
911  * Push a recursive record type.  All pushes should have matching pops.
912  * The old parent is returned and the newly pushed record becomes the
913  * new parent.  Note that the old parent's pointer may already be invalid
914  * or may become invalid if jrecord_write() had to build a new stream
915  * record, so the caller should not mess with the returned pointer in
916  * any way other then to save it.
917  */
918 static 
919 struct journal_subrecord *
920 jrecord_push(struct jrecord *jrec, int16_t rectype)
921 {
922     struct journal_subrecord *save;
923
924     save = jrec->parent;
925     jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
926     jrec->last = NULL;
927     KKASSERT(jrec->parent != NULL);
928     ++jrec->pushcount;
929     ++jrec->pushptrgood;        /* cleared on flush */
930     return(save);
931 }
932
933 /*
934  * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
935  * on the last record written within the subtransaction.  If the last 
936  * record written is not accessible or if the subtransaction is empty,
937  * we must write out a pad record with JMASK_LAST set before popping.
938  *
939  * When popping a subtransaction the parent record's recsize field
940  * will be properly set.  If the parent pointer is no longer valid
941  * (which can occur if the data has already been flushed out to the
942  * stream), the protocol spec allows us to leave it 0.
943  *
944  * The saved parent pointer which we restore may or may not be valid,
945  * and if not valid may or may not be NULL, depending on the value
946  * of pushptrgood.
947  */
948 static void
949 jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
950 {
951     struct journal_subrecord *last;
952
953     KKASSERT(jrec->pushcount > 0);
954     KKASSERT(jrec->residual == 0);
955
956     /*
957      * Set JMASK_LAST on the last record we wrote at the current
958      * level.  If last is NULL we either no longer have access to the
959      * record or the subtransaction was empty and we must write out a pad
960      * record.
961      */
962     if ((last = jrec->last) == NULL) {
963         jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
964         last = jrec->last;      /* reload after possible flush */
965     } else {
966         last->rectype |= JMASK_LAST;
967     }
968
969     /*
970      * pushptrgood tells us how many levels of parent record pointers
971      * are valid.  The jrec only stores the current parent record pointer
972      * (and it is only valid if pushptrgood != 0).  The higher level parent
973      * record pointers are saved by the routines calling jrecord_push() and
974      * jrecord_pop().  These pointers may become stale and we determine
975      * that fact by tracking the count of valid parent pointers with 
976      * pushptrgood.  Pointers become invalid when their related stream
977      * record gets pushed out.
978      *
979      * [parentA]
980      *    [node X]
981      *    [parentB]
982      *       [node Y]
983      *       [node Z]
984      *    (pop B)       see NOTE B
985      * (pop A)          see NOTE A
986      *
987      * NOTE B:  This pop sets LAST in node Z if the node is still accessible,
988      *          else a PAD record is appended and LAST is set in that.
989      *
990      *          This pop sets the record size in parentB if parentB is still
991      *          accessible, else the record size is left 0 (the scanner must
992      *          deal with that).
993      *
994      *          This pop sets the new 'last' record to parentB, the pointer
995      *          to which may or may not still be accessible.
996      *
997      * NOTE A:  This pop sets LAST in parentB if the node is still accessible,
998      *          else a PAD record is appended and LAST is set in that.
999      *
1000      *          This pop sets the record size in parentA if parentA is still
1001      *          accessible, else the record size is left 0 (the scanner must
1002      *          deal with that).
1003      *
1004      *          This pop sets the new 'last' record to parentA, the pointer
1005      *          to which may or may not still be accessible.
1006      *
1007      * Also note that the last record in the stream transaction, which in
1008      * the above example is parentA, does not currently have the LAST bit
1009      * set.
1010      *
1011      * The current parent becomes the last record relative to the
1012      * saved parent passed into us.  It's validity is based on 
1013      * whether pushptrgood is non-zero prior to decrementing.  The saved
1014      * parent becomes the new parent, and its validity is based on whether
1015      * pushptrgood is non-zero after decrementing.
1016      *
1017      * The old jrec->parent may be NULL if it is no longer accessible.
1018      * If pushptrgood is non-zero, however, it is guarenteed to not
1019      * be NULL (since no flush occured).
1020      */
1021     jrec->last = jrec->parent;
1022     --jrec->pushcount;
1023     if (jrec->pushptrgood) {
1024         KKASSERT(jrec->last != NULL && last != NULL);
1025         if (--jrec->pushptrgood == 0) {
1026             jrec->parent = NULL;        /* 'save' contains garbage or NULL */
1027         } else {
1028             KKASSERT(save != NULL);
1029             jrec->parent = save;        /* 'save' must not be NULL */
1030         }
1031
1032         /*
1033          * Set the record size in the old parent.  'last' still points to
1034          * the original last record in the subtransaction being popped,
1035          * jrec->last points to the old parent (which became the last
1036          * record relative to the new parent being popped into).
1037          */
1038         jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
1039     } else {
1040         jrec->parent = NULL;
1041         KKASSERT(jrec->last == NULL);
1042     }
1043 }
1044
1045 /*
1046  * Write a leaf record out and return a pointer to its base.  The leaf
1047  * record may contain potentially megabytes of data which is supplied
1048  * in jrecord_data() calls.  The exact amount must be specified in this
1049  * call.
1050  */
1051 static
1052 struct journal_subrecord *
1053 jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
1054 {
1055     struct journal_subrecord *last;
1056     int pusheditout;
1057
1058     /*
1059      * Try to catch some obvious errors.  Nesting records must specify a
1060      * size of 0, and there should be no left-overs from previous operations
1061      * (such as incomplete data writeouts).
1062      */
1063     KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
1064     KKASSERT(jrec->residual == 0);
1065
1066     /*
1067      * Check to see if the current stream record has enough room for
1068      * the new subrecord header.  If it doesn't we extend the current
1069      * stream record.
1070      *
1071      * This may have the side effect of pushing out the current stream record
1072      * and creating a new one.  We must adjust our stream tracking fields
1073      * accordingly.
1074      */
1075     if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
1076         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1077                                 jrec->stream_reserved - jrec->stream_residual,
1078                                 JREC_DEFAULTSIZE, &pusheditout);
1079         if (pusheditout) {
1080             jrec->stream_reserved = JREC_DEFAULTSIZE;
1081             jrec->stream_residual = JREC_DEFAULTSIZE;
1082             jrec->parent = NULL;        /* no longer accessible */
1083             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1084         } else {
1085             jrec->stream_reserved += JREC_DEFAULTSIZE;
1086             jrec->stream_residual += JREC_DEFAULTSIZE;
1087         }
1088     }
1089     last = (void *)jrec->stream_ptr;
1090     last->rectype = rectype;
1091     last->reserved = 0;
1092     last->recsize = sizeof(struct journal_subrecord) + bytes;
1093     jrec->last = last;
1094     jrec->residual = bytes;             /* remaining data to be posted */
1095     jrec->residual_align = -bytes & 7;  /* post-data alignment required */
1096     return(last);
1097 }
1098
1099 /*
1100  * Write out the data associated with a leaf record.  Any number of calls
1101  * to this routine may be made as long as the byte count adds up to the
1102  * amount originally specified in jrecord_write().
1103  *
1104  * The act of writing out the leaf data may result in numerous stream records
1105  * being pushed out.   Callers should be aware that even the associated
1106  * subrecord header may become inaccessible due to stream record pushouts.
1107  */
1108 static void
1109 jrecord_data(struct jrecord *jrec, const void *buf, int bytes)
1110 {
1111     int pusheditout;
1112     int extsize;
1113
1114     KKASSERT(bytes >= 0 && bytes <= jrec->residual);
1115
1116     /*
1117      * Push out stream records as long as there is insufficient room to hold
1118      * the remaining data.
1119      */
1120     while (jrec->stream_residual < bytes) {
1121         /*
1122          * Fill in any remaining space in the current stream record.
1123          */
1124         bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
1125         buf = (const char *)buf + jrec->stream_residual;
1126         bytes -= jrec->stream_residual;
1127         /*jrec->stream_ptr += jrec->stream_residual;*/
1128         jrec->stream_residual = 0;
1129         jrec->residual -= jrec->stream_residual;
1130
1131         /*
1132          * Try to extend the current stream record, but no more then 1/4
1133          * the size of the FIFO.
1134          */
1135         extsize = jrec->jo->fifo.size >> 2;
1136         if (extsize > bytes)
1137             extsize = (bytes + 15) & ~15;
1138
1139         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1140                                 jrec->stream_reserved - jrec->stream_residual,
1141                                 extsize, &pusheditout);
1142         if (pusheditout) {
1143             jrec->stream_reserved = extsize;
1144             jrec->stream_residual = extsize;
1145             jrec->parent = NULL;        /* no longer accessible */
1146             jrec->last = NULL;          /* no longer accessible */
1147             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1148         } else {
1149             jrec->stream_reserved += extsize;
1150             jrec->stream_residual += extsize;
1151         }
1152     }
1153
1154     /*
1155      * Push out any remaining bytes into the current stream record.
1156      */
1157     if (bytes) {
1158         bcopy(buf, jrec->stream_ptr, bytes);
1159         jrec->stream_ptr += bytes;
1160         jrec->stream_residual -= bytes;
1161         jrec->residual -= bytes;
1162     }
1163
1164     /*
1165      * Handle data alignment requirements for the subrecord.  Because the
1166      * stream record's data space is more strictly aligned, it must already
1167      * have sufficient space to hold any subrecord alignment slop.
1168      */
1169     if (jrec->residual == 0 && jrec->residual_align) {
1170         KKASSERT(jrec->residual_align <= jrec->stream_residual);
1171         bzero(jrec->stream_ptr, jrec->residual_align);
1172         jrec->stream_ptr += jrec->residual_align;
1173         jrec->stream_residual -= jrec->residual_align;
1174         jrec->residual_align = 0;
1175     }
1176 }
1177
1178 /*
1179  * We are finished with a transaction.  If abortit is not set then we must
1180  * be at the top level with no residual subrecord data left to output.
1181  * If abortit is set then we can be in any state.
1182  *
1183  * The stream record will be committed or aborted as specified and jrecord
1184  * resources will be cleaned up.
1185  */
1186 static void
1187 jrecord_done(struct jrecord *jrec, int abortit)
1188 {
1189     KKASSERT(jrec->rawp != NULL);
1190
1191     if (abortit) {
1192         journal_abort(jrec->jo, &jrec->rawp);
1193     } else {
1194         KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
1195         journal_commit(jrec->jo, &jrec->rawp, 
1196                         jrec->stream_reserved - jrec->stream_residual, 1);
1197     }
1198
1199     /*
1200      * jrec should not be used beyond this point without another init,
1201      * but clean up some fields to ensure that we panic if it is.
1202      *
1203      * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
1204      */
1205     jrec->jo = NULL;
1206     jrec->stream_ptr = NULL;
1207 }
1208
1209 /************************************************************************
1210  *                      LEAF RECORD SUPPORT ROUTINES                    *
1211  ************************************************************************
1212  *
1213  * These routine create leaf subrecords representing common filesystem
1214  * structures.
1215  */
1216
1217 static void
1218 jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
1219 {
1220 }
1221
1222 static void
1223 jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
1224 {
1225 }
1226
1227 /************************************************************************
1228  *                      JOURNAL VNOPS                                   *
1229  ************************************************************************
1230  *
1231  * These are function shims replacing the normal filesystem ops.  We become
1232  * responsible for calling the underlying filesystem ops.  We have the choice
1233  * of executing the underlying op first and then generating the journal entry,
1234  * or starting the journal entry, executing the underlying op, and then
1235  * either completing or aborting it.  
1236  *
1237  * The journal is supposed to be a high-level entity, which generally means
1238  * identifying files by name rather then by inode.  Supplying both allows
1239  * the journal to be used both for inode-number-compatible 'mirrors' and
1240  * for simple filesystem replication.
1241  *
1242  * Writes are particularly difficult to deal with because a single write may
1243  * represent a hundred megabyte buffer or more, and both writes and truncations
1244  * require the 'old' data to be written out as well as the new data if the
1245  * log is reversable.  Other issues:
1246  *
1247  * - How to deal with operations on unlinked files (no path available),
1248  *   but which may still be filesystem visible due to hard links.
1249  *
1250  * - How to deal with modifications made via a memory map.
1251  *
1252  * - Future cache coherency support will require cache coherency API calls
1253  *   both prior to and after the call to the underlying VFS.
1254  *
1255  * ALSO NOTE: We do not have to shim compatibility VOPs like MKDIR which have
1256  * new VFS equivalents (NMKDIR).
1257  */
1258
1259 static
1260 int
1261 journal_setattr(struct vop_setattr_args *ap)
1262 {
1263     struct mount *mp;
1264     struct journal *jo;
1265     struct jrecord jrec;
1266     void *save;         /* warning, save pointers do not always remain valid */
1267     int error;
1268
1269     error = vop_journal_operate_ap(&ap->a_head);
1270     mp = ap->a_head.a_ops->vv_mount;
1271     if (error == 0) {
1272         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1273             jrecord_init(jo, &jrec, -1);
1274             save = jrecord_push(&jrec, JTYPE_SETATTR);
1275             jrecord_pop(&jrec, save);
1276             jrecord_done(&jrec, 0);
1277         }
1278     }
1279     return (error);
1280 }
1281
1282 static
1283 int
1284 journal_write(struct vop_write_args *ap)
1285 {
1286     struct mount *mp;
1287     struct journal *jo;
1288     struct jrecord jrec;
1289     void *save;         /* warning, save pointers do not always remain valid */
1290     int error;
1291
1292     error = vop_journal_operate_ap(&ap->a_head);
1293     mp = ap->a_head.a_ops->vv_mount;
1294     if (error == 0) {
1295         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1296             jrecord_init(jo, &jrec, -1);
1297             save = jrecord_push(&jrec, JTYPE_WRITE);
1298             jrecord_pop(&jrec, save);
1299             jrecord_done(&jrec, 0);
1300         }
1301     }
1302     return (error);
1303 }
1304
1305 static
1306 int
1307 journal_fsync(struct vop_fsync_args *ap)
1308 {
1309     struct mount *mp;
1310     struct journal *jo;
1311     int error;
1312
1313     error = vop_journal_operate_ap(&ap->a_head);
1314     mp = ap->a_head.a_ops->vv_mount;
1315     if (error == 0) {
1316         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1317             /* XXX synchronize pending journal records */
1318         }
1319     }
1320     return (error);
1321 }
1322
1323 static
1324 int
1325 journal_putpages(struct vop_putpages_args *ap)
1326 {
1327     struct mount *mp;
1328     struct journal *jo;
1329     struct jrecord jrec;
1330     void *save;         /* warning, save pointers do not always remain valid */
1331     int error;
1332
1333     error = vop_journal_operate_ap(&ap->a_head);
1334     mp = ap->a_head.a_ops->vv_mount;
1335     if (error == 0) {
1336         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1337             jrecord_init(jo, &jrec, -1);
1338             save = jrecord_push(&jrec, JTYPE_PUTPAGES);
1339             jrecord_pop(&jrec, save);
1340             jrecord_done(&jrec, 0);
1341         }
1342     }
1343     return (error);
1344 }
1345
1346 static
1347 int
1348 journal_setacl(struct vop_setacl_args *ap)
1349 {
1350     struct mount *mp;
1351     struct journal *jo;
1352     struct jrecord jrec;
1353     void *save;         /* warning, save pointers do not always remain valid */
1354     int error;
1355
1356     error = vop_journal_operate_ap(&ap->a_head);
1357     mp = ap->a_head.a_ops->vv_mount;
1358     if (error == 0) {
1359         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1360             jrecord_init(jo, &jrec, -1);
1361             save = jrecord_push(&jrec, JTYPE_SETACL);
1362             jrecord_pop(&jrec, save);
1363             jrecord_done(&jrec, 0);
1364         }
1365     }
1366     return (error);
1367 }
1368
1369 static
1370 int
1371 journal_setextattr(struct vop_setextattr_args *ap)
1372 {
1373     struct mount *mp;
1374     struct journal *jo;
1375     struct jrecord jrec;
1376     void *save;         /* warning, save pointers do not always remain valid */
1377     int error;
1378
1379     error = vop_journal_operate_ap(&ap->a_head);
1380     mp = ap->a_head.a_ops->vv_mount;
1381     if (error == 0) {
1382         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1383             jrecord_init(jo, &jrec, -1);
1384             save = jrecord_push(&jrec, JTYPE_SETEXTATTR);
1385             jrecord_pop(&jrec, save);
1386             jrecord_done(&jrec, 0);
1387         }
1388     }
1389     return (error);
1390 }
1391
1392 static
1393 int
1394 journal_ncreate(struct vop_ncreate_args *ap)
1395 {
1396     struct mount *mp;
1397     struct journal *jo;
1398     struct jrecord jrec;
1399     void *save;         /* warning, save pointers do not always remain valid */
1400     int error;
1401
1402     error = vop_journal_operate_ap(&ap->a_head);
1403     mp = ap->a_head.a_ops->vv_mount;
1404     if (error == 0) {
1405         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1406             jrecord_init(jo, &jrec, -1);
1407             save = jrecord_push(&jrec, JTYPE_CREATE);
1408             jrecord_pop(&jrec, save);
1409             jrecord_done(&jrec, 0);
1410         }
1411     }
1412     return (error);
1413 }
1414
1415 static
1416 int
1417 journal_nmknod(struct vop_nmknod_args *ap)
1418 {
1419     struct mount *mp;
1420     struct journal *jo;
1421     struct jrecord jrec;
1422     void *save;         /* warning, save pointers do not always remain valid */
1423     int error;
1424
1425     error = vop_journal_operate_ap(&ap->a_head);
1426     mp = ap->a_head.a_ops->vv_mount;
1427     if (error == 0) {
1428         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1429             jrecord_init(jo, &jrec, -1);
1430             save = jrecord_push(&jrec, JTYPE_MKNOD);
1431             jrecord_pop(&jrec, save);
1432             jrecord_done(&jrec, 0);
1433         }
1434     }
1435     return (error);
1436 }
1437
1438 static
1439 int
1440 journal_nlink(struct vop_nlink_args *ap)
1441 {
1442     struct mount *mp;
1443     struct journal *jo;
1444     struct jrecord jrec;
1445     void *save;         /* warning, save pointers do not always remain valid */
1446     int error;
1447
1448     error = vop_journal_operate_ap(&ap->a_head);
1449     mp = ap->a_head.a_ops->vv_mount;
1450     if (error == 0) {
1451         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1452             jrecord_init(jo, &jrec, -1);
1453             save = jrecord_push(&jrec, JTYPE_LINK);
1454             jrecord_pop(&jrec, save);
1455             jrecord_done(&jrec, 0);
1456         }
1457     }
1458     return (error);
1459 }
1460
1461 static
1462 int
1463 journal_nsymlink(struct vop_nsymlink_args *ap)
1464 {
1465     struct mount *mp;
1466     struct journal *jo;
1467     struct jrecord jrec;
1468     void *save;         /* warning, save pointers do not always remain valid */
1469     int error;
1470
1471     error = vop_journal_operate_ap(&ap->a_head);
1472     mp = ap->a_head.a_ops->vv_mount;
1473     if (error == 0) {
1474         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1475             jrecord_init(jo, &jrec, -1);
1476             save = jrecord_push(&jrec, JTYPE_SYMLINK);
1477             jrecord_pop(&jrec, save);
1478             jrecord_done(&jrec, 0);
1479         }
1480     }
1481     return (error);
1482 }
1483
1484 static
1485 int
1486 journal_nwhiteout(struct vop_nwhiteout_args *ap)
1487 {
1488     struct mount *mp;
1489     struct journal *jo;
1490     struct jrecord jrec;
1491     void *save;         /* warning, save pointers do not always remain valid */
1492     int error;
1493
1494     error = vop_journal_operate_ap(&ap->a_head);
1495     mp = ap->a_head.a_ops->vv_mount;
1496     if (error == 0) {
1497         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1498             jrecord_init(jo, &jrec, -1);
1499             save = jrecord_push(&jrec, JTYPE_WHITEOUT);
1500             jrecord_pop(&jrec, save);
1501             jrecord_done(&jrec, 0);
1502         }
1503     }
1504     return (error);
1505 }
1506
1507 static
1508 int
1509 journal_nremove(struct vop_nremove_args *ap)
1510 {
1511     struct mount *mp;
1512     struct journal *jo;
1513     struct jrecord jrec;
1514     void *save;         /* warning, save pointers do not always remain valid */
1515     int error;
1516
1517     error = vop_journal_operate_ap(&ap->a_head);
1518     mp = ap->a_head.a_ops->vv_mount;
1519     if (error == 0) {
1520         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1521             jrecord_init(jo, &jrec, -1);
1522             save = jrecord_push(&jrec, JTYPE_REMOVE);
1523             jrecord_pop(&jrec, save);
1524             jrecord_done(&jrec, 0);
1525         }
1526     }
1527     return (error);
1528 }
1529
1530 static
1531 int
1532 journal_nmkdir(struct vop_nmkdir_args *ap)
1533 {
1534     struct mount *mp;
1535     struct journal *jo;
1536     struct jrecord jrec;
1537     void *save;         /* warning, save pointers do not always remain valid */
1538     int error;
1539
1540     error = vop_journal_operate_ap(&ap->a_head);
1541     mp = ap->a_head.a_ops->vv_mount;
1542     if (error == 0) {
1543         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1544             jrecord_init(jo, &jrec, -1);
1545             if (jo->flags & MC_JOURNAL_WANT_REVERSABLE) {
1546                 save = jrecord_push(&jrec, JTYPE_UNDO);
1547                 /* XXX undo operations */
1548                 jrecord_pop(&jrec, save);
1549             }
1550 #if 0
1551             if (jo->flags & MC_JOURNAL_WANT_AUDIT) {
1552                 jrecord_write_audit(&jrec);
1553             }
1554 #endif
1555             save = jrecord_push(&jrec, JTYPE_MKDIR);
1556             jrecord_write_path(&jrec, JLEAF_PATH1, ap->a_ncp);
1557             jrecord_write_vattr(&jrec, ap->a_vap);
1558             jrecord_pop(&jrec, save);
1559             jrecord_done(&jrec, 0);
1560         }
1561     }
1562     return (error);
1563 }
1564
1565
1566 static
1567 int
1568 journal_nrmdir(struct vop_nrmdir_args *ap)
1569 {
1570     struct mount *mp;
1571     struct journal *jo;
1572     struct jrecord jrec;
1573     void *save;         /* warning, save pointers do not always remain valid */
1574     int error;
1575
1576     error = vop_journal_operate_ap(&ap->a_head);
1577     mp = ap->a_head.a_ops->vv_mount;
1578     if (error == 0) {
1579         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1580             jrecord_init(jo, &jrec, -1);
1581             save = jrecord_push(&jrec, JTYPE_RMDIR);
1582             jrecord_pop(&jrec, save);
1583             jrecord_done(&jrec, 0);
1584         }
1585     }
1586     return (error);
1587 }
1588
1589 static
1590 int
1591 journal_nrename(struct vop_nrename_args *ap)
1592 {
1593     struct mount *mp;
1594     struct journal *jo;
1595     struct jrecord jrec;
1596     void *save;         /* warning, save pointers do not always remain valid */
1597     int error;
1598
1599     error = vop_journal_operate_ap(&ap->a_head);
1600     mp = ap->a_head.a_ops->vv_mount;
1601     if (error == 0) {
1602         TAILQ_FOREACH(jo, &mp->mnt_jlist, jentry) {
1603             jrecord_init(jo, &jrec, -1);
1604             save = jrecord_push(&jrec, JTYPE_RENAME);
1605             jrecord_pop(&jrec, save);
1606             jrecord_done(&jrec, 0);
1607         }
1608     }
1609     return (error);
1610 }
1611