Merge branch 'vendor/BINUTILS225'
[dragonfly.git] / sys / kern / vfs_journal.c
1 /*
2  * Copyright (c) 2004-2006 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  *
34  * $DragonFly: src/sys/kern/vfs_journal.c,v 1.33 2007/05/09 00:53:34 dillon Exp $
35  */
36 /*
37  * The journaling protocol is intended to evolve into a two-way stream
38  * whereby transaction IDs can be acknowledged by the journaling target
39  * when the data has been committed to hard storage.  Both implicit and
40  * explicit acknowledgement schemes will be supported, depending on the
41  * sophistication of the journaling stream, plus resynchronization and
42  * restart when a journaling stream is interrupted.  This information will
43  * also be made available to journaling-aware filesystems to allow better
44  * management of their own physical storage synchronization mechanisms as
45  * well as to allow such filesystems to take direct advantage of the kernel's
46  * journaling layer so they don't have to roll their own.
47  *
48  * In addition, the worker thread will have access to much larger 
49  * spooling areas then the memory buffer is able to provide by e.g. 
50  * reserving swap space, in order to absorb potentially long interruptions
51  * of off-site journaling streams, and to prevent 'slow' off-site linkages
52  * from radically slowing down local filesystem operations.  
53  *
54  * Because of the non-trivial algorithms the journaling system will be
55  * required to support, use of a worker thread is mandatory.  Efficiencies
56  * are maintained by utilitizing the memory FIFO to batch transactions when
57  * possible, reducing the number of gratuitous thread switches and taking
58  * advantage of cpu caches through the use of shorter batched code paths
59  * rather then trying to do everything in the context of the process
60  * originating the filesystem op.  In the future the memory FIFO can be
61  * made per-cpu to remove BGL or other locking requirements.
62  */
63 #include <sys/param.h>
64 #include <sys/systm.h>
65 #include <sys/buf.h>
66 #include <sys/conf.h>
67 #include <sys/kernel.h>
68 #include <sys/queue.h>
69 #include <sys/lock.h>
70 #include <sys/malloc.h>
71 #include <sys/mount.h>
72 #include <sys/unistd.h>
73 #include <sys/vnode.h>
74 #include <sys/poll.h>
75 #include <sys/mountctl.h>
76 #include <sys/journal.h>
77 #include <sys/file.h>
78 #include <sys/proc.h>
79 #include <sys/xio.h>
80 #include <sys/socket.h>
81 #include <sys/socketvar.h>
82
83 #include <machine/limits.h>
84
85 #include <vm/vm.h>
86 #include <vm/vm_object.h>
87 #include <vm/vm_page.h>
88 #include <vm/vm_pager.h>
89 #include <vm/vnode_pager.h>
90
91 #include <sys/file2.h>
92 #include <sys/thread2.h>
93 #include <sys/mplock2.h>
94 #include <sys/spinlock2.h>
95
96 static void journal_wthread(void *info);
97 static void journal_rthread(void *info);
98
99 static void *journal_reserve(struct journal *jo,
100                         struct journal_rawrecbeg **rawpp,
101                         int16_t streamid, int bytes);
102 static void *journal_extend(struct journal *jo,
103                         struct journal_rawrecbeg **rawpp,
104                         int truncbytes, int bytes, int *newstreamrecp);
105 static void journal_abort(struct journal *jo,
106                         struct journal_rawrecbeg **rawpp);
107 static void journal_commit(struct journal *jo,
108                         struct journal_rawrecbeg **rawpp,
109                         int bytes, int closeout);
110 static void jrecord_data(struct jrecord *jrec,
111                         void *buf, int bytes, int dtype);
112
113
114 MALLOC_DEFINE(M_JOURNAL, "journal", "Journaling structures");
115 MALLOC_DEFINE(M_JFIFO, "journal-fifo", "Journal FIFO");
116
117 void
118 journal_create_threads(struct journal *jo)
119 {
120         jo->flags &= ~(MC_JOURNAL_STOP_REQ | MC_JOURNAL_STOP_IMM);
121         jo->flags |= MC_JOURNAL_WACTIVE;
122         lwkt_create(journal_wthread, jo, NULL, &jo->wthread,
123                     TDF_NOSTART, -1,
124                     "journal w:%.*s", JIDMAX, jo->id);
125         lwkt_setpri(&jo->wthread, TDPRI_KERN_DAEMON);
126         lwkt_schedule(&jo->wthread);
127
128         if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) {
129             jo->flags |= MC_JOURNAL_RACTIVE;
130             lwkt_create(journal_rthread, jo, NULL, &jo->rthread,
131                         TDF_NOSTART, -1,
132                         "journal r:%.*s", JIDMAX, jo->id);
133             lwkt_setpri(&jo->rthread, TDPRI_KERN_DAEMON);
134             lwkt_schedule(&jo->rthread);
135         }
136 }
137
138 void
139 journal_destroy_threads(struct journal *jo, int flags)
140 {
141     int wcount;
142
143     jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
144     wakeup(&jo->fifo);
145     wcount = 0;
146     while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
147         tsleep(jo, 0, "jwait", hz);
148         if (++wcount % 10 == 0) {
149             kprintf("Warning: journal %s waiting for descriptors to close\n",
150                 jo->id);
151         }
152     }
153
154     /*
155      * XXX SMP - threads should move to cpu requesting the restart or
156      * termination before finishing up to properly interlock.
157      */
158     tsleep(jo, 0, "jwait", hz);
159     lwkt_free_thread(&jo->wthread);
160     if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX)
161         lwkt_free_thread(&jo->rthread);
162 }
163
164 /*
165  * The per-journal worker thread is responsible for writing out the
166  * journal's FIFO to the target stream.
167  */
168 static void
169 journal_wthread(void *info)
170 {
171     struct journal *jo = info;
172     struct journal_rawrecbeg *rawp;
173     int error;
174     size_t avail;
175     size_t bytes;
176     size_t res;
177
178     /* not MPSAFE yet */
179     get_mplock();
180
181     for (;;) {
182         /*
183          * Calculate the number of bytes available to write.  This buffer
184          * area may contain reserved records so we can't just write it out
185          * without further checks.
186          */
187         bytes = jo->fifo.windex - jo->fifo.rindex;
188
189         /*
190          * sleep if no bytes are available or if an incomplete record is
191          * encountered (it needs to be filled in before we can write it
192          * out), and skip any pad records that we encounter.
193          */
194         if (bytes == 0) {
195             if (jo->flags & MC_JOURNAL_STOP_REQ)
196                 break;
197             tsleep(&jo->fifo, 0, "jfifo", hz);
198             continue;
199         }
200
201         /*
202          * Sleep if we can not go any further due to hitting an incomplete
203          * record.  This case should occur rarely but may have to be better
204          * optimized XXX.
205          */
206         rawp = (void *)(jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask));
207         if (rawp->begmagic == JREC_INCOMPLETEMAGIC) {
208             tsleep(&jo->fifo, 0, "jpad", hz);
209             continue;
210         }
211
212         /*
213          * Skip any pad records.  We do not write out pad records if we can
214          * help it. 
215          */
216         if (rawp->streamid == JREC_STREAMID_PAD) {
217             if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
218                 if (jo->fifo.rindex == jo->fifo.xindex) {
219                     jo->fifo.xindex += (rawp->recsize + 15) & ~15;
220                     jo->total_acked += (rawp->recsize + 15) & ~15;
221                 }
222             }
223             jo->fifo.rindex += (rawp->recsize + 15) & ~15;
224             jo->total_acked += bytes;
225             KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
226             continue;
227         }
228
229         /*
230          * 'bytes' is the amount of data that can potentially be written out.  
231          * Calculate 'res', the amount of data that can actually be written
232          * out.  res is bounded either by hitting the end of the physical
233          * memory buffer or by hitting an incomplete record.  Incomplete
234          * records often occur due to the way the space reservation model
235          * works.
236          */
237         res = 0;
238         avail = jo->fifo.size - (jo->fifo.rindex & jo->fifo.mask);
239         while (res < bytes && rawp->begmagic == JREC_BEGMAGIC) {
240             res += (rawp->recsize + 15) & ~15;
241             if (res >= avail) {
242                 KKASSERT(res == avail);
243                 break;
244             }
245             rawp = (void *)((char *)rawp + ((rawp->recsize + 15) & ~15));
246         }
247
248         /*
249          * Issue the write and deal with any errors or other conditions.
250          * For now assume blocking I/O.  Since we are record-aware the
251          * code cannot yet handle partial writes.
252          *
253          * We bump rindex prior to issuing the write to avoid racing
254          * the acknowledgement coming back (which could prevent the ack
255          * from bumping xindex).  Restarts are always based on xindex so
256          * we do not try to undo the rindex if an error occurs.
257          *
258          * XXX EWOULDBLOCK/NBIO
259          * XXX notification on failure
260          * XXX permanent verses temporary failures
261          * XXX two-way acknowledgement stream in the return direction / xindex
262          */
263         bytes = res;
264         jo->fifo.rindex += bytes;
265         error = fp_write(jo->fp, 
266                         jo->fifo.membase +
267                          ((jo->fifo.rindex - bytes) & jo->fifo.mask),
268                         bytes, &res, UIO_SYSSPACE);
269         if (error) {
270             kprintf("journal_thread(%s) write, error %d\n", jo->id, error);
271             /* XXX */
272         } else {
273             KKASSERT(res == bytes);
274         }
275
276         /*
277          * Advance rindex.  If the journal stream is not full duplex we also
278          * advance xindex, otherwise the rjournal thread is responsible for
279          * advancing xindex.
280          */
281         if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
282             jo->fifo.xindex += bytes;
283             jo->total_acked += bytes;
284         }
285         KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
286         if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
287             if (jo->flags & MC_JOURNAL_WWAIT) {
288                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
289                 wakeup(&jo->fifo.windex);
290             }
291         }
292     }
293     fp_shutdown(jo->fp, SHUT_WR);
294     jo->flags &= ~MC_JOURNAL_WACTIVE;
295     wakeup(jo);
296     wakeup(&jo->fifo.windex);
297     rel_mplock();
298 }
299
300 /*
301  * A second per-journal worker thread is created for two-way journaling
302  * streams to deal with the return acknowledgement stream.
303  */
304 static void
305 journal_rthread(void *info)
306 {
307     struct journal_rawrecbeg *rawp;
308     struct journal_ackrecord ack;
309     struct journal *jo = info;
310     int64_t transid;
311     int error;
312     size_t count;
313     size_t bytes;
314
315     transid = 0;
316     error = 0;
317
318     /* not MPSAFE yet */
319     get_mplock();
320
321     for (;;) {
322         /*
323          * We have been asked to stop
324          */
325         if (jo->flags & MC_JOURNAL_STOP_REQ)
326                 break;
327
328         /*
329          * If we have no active transaction id, get one from the return
330          * stream.
331          */
332         if (transid == 0) {
333             error = fp_read(jo->fp, &ack, sizeof(ack), &count, 
334                             1, UIO_SYSSPACE);
335 #if 0
336             kprintf("fp_read ack error %d count %d\n", error, count);
337 #endif
338             if (error || count != sizeof(ack))
339                 break;
340             if (error) {
341                 kprintf("read error %d on receive stream\n", error);
342                 break;
343             }
344             if (ack.rbeg.begmagic != JREC_BEGMAGIC ||
345                 ack.rend.endmagic != JREC_ENDMAGIC
346             ) {
347                 kprintf("bad begmagic or endmagic on receive stream\n");
348                 break;
349             }
350             transid = ack.rbeg.transid;
351         }
352
353         /*
354          * Calculate the number of unacknowledged bytes.  If there are no
355          * unacknowledged bytes then unsent data was acknowledged, report,
356          * sleep a bit, and loop in that case.  This should not happen 
357          * normally.  The ack record is thrown away.
358          */
359         bytes = jo->fifo.rindex - jo->fifo.xindex;
360
361         if (bytes == 0) {
362             kprintf("warning: unsent data acknowledged transid %08llx\n",
363                     (long long)transid);
364             tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
365             transid = 0;
366             continue;
367         }
368
369         /*
370          * Since rindex has advanced, the record pointed to by xindex
371          * must be a valid record.
372          */
373         rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
374         KKASSERT(rawp->begmagic == JREC_BEGMAGIC);
375         KKASSERT(rawp->recsize <= bytes);
376
377         /*
378          * The target can acknowledge several records at once.
379          */
380         if (rawp->transid < transid) {
381 #if 1
382             kprintf("ackskip %08llx/%08llx\n",
383                     (long long)rawp->transid,
384                     (long long)transid);
385 #endif
386             jo->fifo.xindex += (rawp->recsize + 15) & ~15;
387             jo->total_acked += (rawp->recsize + 15) & ~15;
388             if (jo->flags & MC_JOURNAL_WWAIT) {
389                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
390                 wakeup(&jo->fifo.windex);
391             }
392             continue;
393         }
394         if (rawp->transid == transid) {
395 #if 1
396             kprintf("ackskip %08llx/%08llx\n",
397                     (long long)rawp->transid,
398                     (long long)transid);
399 #endif
400             jo->fifo.xindex += (rawp->recsize + 15) & ~15;
401             jo->total_acked += (rawp->recsize + 15) & ~15;
402             if (jo->flags & MC_JOURNAL_WWAIT) {
403                 jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
404                 wakeup(&jo->fifo.windex);
405             }
406             transid = 0;
407             continue;
408         }
409         kprintf("warning: unsent data(2) acknowledged transid %08llx\n",
410                 (long long)transid);
411         transid = 0;
412     }
413     jo->flags &= ~MC_JOURNAL_RACTIVE;
414     wakeup(jo);
415     wakeup(&jo->fifo.windex);
416     rel_mplock();
417 }
418
419 /*
420  * This builds a pad record which the journaling thread will skip over.  Pad
421  * records are required when we are unable to reserve sufficient stream space
422  * due to insufficient space at the end of the physical memory fifo.
423  *
424  * Even though the record is not transmitted, a normal transid must be 
425  * assigned to it so link recovery operations after a failure work properly.
426  */
427 static
428 void
429 journal_build_pad(struct journal_rawrecbeg *rawp, int recsize, int64_t transid)
430 {
431     struct journal_rawrecend *rendp;
432     
433     KKASSERT((recsize & 15) == 0 && recsize >= 16);
434
435     rawp->streamid = JREC_STREAMID_PAD;
436     rawp->recsize = recsize;    /* must be 16-byte aligned */
437     rawp->transid = transid;
438     /*
439      * WARNING, rendp may overlap rawp->transid.  This is necessary to
440      * allow PAD records to fit in 16 bytes.  Use cpu_ccfence() to
441      * hopefully cause the compiler to not make any assumptions.
442      */
443     rendp = (void *)((char *)rawp + rawp->recsize - sizeof(*rendp));
444     rendp->endmagic = JREC_ENDMAGIC;
445     rendp->check = 0;
446     rendp->recsize = rawp->recsize;
447
448     /*
449      * Set the begin magic last.  This is what will allow the journal
450      * thread to write the record out.  Use a store fence to prevent
451      * compiler and cpu reordering of the writes.
452      */
453     cpu_sfence();
454     rawp->begmagic = JREC_BEGMAGIC;
455 }
456
457 /*
458  * Wake up the worker thread if the FIFO is more then half full or if
459  * someone is waiting for space to be freed up.  Otherwise let the 
460  * heartbeat deal with it.  Being able to avoid waking up the worker
461  * is the key to the journal's cpu performance.
462  */
463 static __inline
464 void
465 journal_commit_wakeup(struct journal *jo)
466 {
467     int avail;
468
469     avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
470     KKASSERT(avail >= 0);
471     if ((avail < (jo->fifo.size >> 1)) || (jo->flags & MC_JOURNAL_WWAIT))
472         wakeup(&jo->fifo);
473 }
474
475 /*
476  * Create a new BEGIN stream record with the specified streamid and the
477  * specified amount of payload space.  *rawpp will be set to point to the
478  * base of the new stream record and a pointer to the base of the payload
479  * space will be returned.  *rawpp does not need to be pre-NULLd prior to
480  * making this call.  The raw record header will be partially initialized.
481  *
482  * A stream can be extended, aborted, or committed by other API calls
483  * below.  This may result in a sequence of potentially disconnected
484  * stream records to be output to the journaling target.  The first record
485  * (the one created by this function) will be marked JREC_STREAMCTL_BEGIN,
486  * while the last record on commit or abort will be marked JREC_STREAMCTL_END
487  * (and possibly also JREC_STREAMCTL_ABORTED).  The last record could wind
488  * up being the same as the first, in which case the bits are all set in
489  * the first record.
490  *
491  * The stream record is created in an incomplete state by setting the begin
492  * magic to JREC_INCOMPLETEMAGIC.  This prevents the worker thread from
493  * flushing the fifo past our record until we have finished populating it.
494  * Other threads can reserve and operate on their own space without stalling
495  * but the stream output will stall until we have completed operations.  The
496  * memory FIFO is intended to be large enough to absorb such situations
497  * without stalling out other threads.
498  */
499 static
500 void *
501 journal_reserve(struct journal *jo, struct journal_rawrecbeg **rawpp,
502                 int16_t streamid, int bytes)
503 {
504     struct journal_rawrecbeg *rawp;
505     int avail;
506     int availtoend;
507     int req;
508
509     /*
510      * Add header and trailer overheads to the passed payload.  Note that
511      * the passed payload size need not be aligned in any way.
512      */
513     bytes += sizeof(struct journal_rawrecbeg);
514     bytes += sizeof(struct journal_rawrecend);
515
516     for (;;) {
517         /*
518          * First, check boundary conditions.  If the request would wrap around
519          * we have to skip past the ending block and return to the beginning
520          * of the FIFO's buffer.  Calculate 'req' which is the actual number
521          * of bytes being reserved, including wrap-around dead space.
522          *
523          * Neither 'bytes' or 'req' are aligned.
524          *
525          * Note that availtoend is not truncated to avail and so cannot be
526          * used to determine whether the reservation is possible by itself.
527          * Also, since all fifo ops are 16-byte aligned, we can check
528          * the size before calculating the aligned size.
529          */
530         availtoend = jo->fifo.size - (jo->fifo.windex & jo->fifo.mask);
531         KKASSERT((availtoend & 15) == 0);
532         if (bytes > availtoend) 
533             req = bytes + availtoend;   /* add pad to end */
534         else
535             req = bytes;
536
537         /*
538          * Next calculate the total available space and see if it is
539          * sufficient.  We cannot overwrite previously buffered data
540          * past xindex because otherwise we would not be able to restart
541          * a broken link at the target's last point of commit.
542          */
543         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex);
544         KKASSERT(avail >= 0 && (avail & 15) == 0);
545
546         if (avail < req) {
547             /* XXX MC_JOURNAL_STOP_IMM */
548             jo->flags |= MC_JOURNAL_WWAIT;
549             ++jo->fifostalls;
550             tsleep(&jo->fifo.windex, 0, "jwrite", 0);
551             continue;
552         }
553
554         /*
555          * Create a pad record for any dead space and create an incomplete
556          * record for the live space, then return a pointer to the
557          * contiguous buffer space that was requested.
558          *
559          * NOTE: The worker thread will not flush past an incomplete
560          * record, so the reserved space can be filled in at-will.  The
561          * journaling code must also be aware the reserved sections occuring
562          * after this one will also not be written out even if completed
563          * until this one is completed.
564          *
565          * The transaction id must accomodate real and potential pad creation.
566          */
567         rawp = (void *)(jo->fifo.membase + (jo->fifo.windex & jo->fifo.mask));
568         if (req != bytes) {
569             journal_build_pad(rawp, availtoend, jo->transid);
570             ++jo->transid;
571             rawp = (void *)jo->fifo.membase;
572         }
573         rawp->begmagic = JREC_INCOMPLETEMAGIC;  /* updated by abort/commit */
574         rawp->recsize = bytes;                  /* (unaligned size) */
575         rawp->streamid = streamid | JREC_STREAMCTL_BEGIN;
576         rawp->transid = jo->transid;
577         jo->transid += 2;
578
579         /*
580          * Issue a memory barrier to guarentee that the record data has been
581          * properly initialized before we advance the write index and return
582          * a pointer to the reserved record.  Otherwise the worker thread
583          * could accidently run past us.
584          *
585          * Note that stream records are always 16-byte aligned.
586          */
587         cpu_sfence();
588         jo->fifo.windex += (req + 15) & ~15;
589         *rawpp = rawp;
590         return(rawp + 1);
591     }
592     /* not reached */
593     *rawpp = NULL;
594     return(NULL);
595 }
596
597 /*
598  * Attempt to extend the stream record by <bytes> worth of payload space.
599  *
600  * If it is possible to extend the existing stream record no truncation
601  * occurs and the record is extended as specified.  A pointer to the 
602  * truncation offset within the payload space is returned.
603  *
604  * If it is not possible to do this the existing stream record is truncated
605  * and committed, and a new stream record of size <bytes> is created.  A
606  * pointer to the base of the new stream record's payload space is returned.
607  *
608  * *rawpp is set to the new reservation in the case of a new record but
609  * the caller cannot depend on a comparison with the old rawp to determine if
610  * this case occurs because we could end up using the same memory FIFO
611  * offset for the new stream record.  Use *newstreamrecp instead.
612  */
613 static void *
614 journal_extend(struct journal *jo, struct journal_rawrecbeg **rawpp, 
615                 int truncbytes, int bytes, int *newstreamrecp)
616 {
617     struct journal_rawrecbeg *rawp;
618     int16_t streamid;
619     int availtoend;
620     int avail;
621     int osize;
622     int nsize;
623     int wbase;
624     void *rptr;
625
626     *newstreamrecp = 0;
627     rawp = *rawpp;
628     osize = (rawp->recsize + 15) & ~15;
629     nsize = (rawp->recsize + bytes + 15) & ~15;
630     wbase = (char *)rawp - jo->fifo.membase;
631
632     /*
633      * If the aligned record size does not change we can trivially adjust
634      * the record size.
635      */
636     if (nsize == osize) {
637         rawp->recsize += bytes;
638         return((char *)(rawp + 1) + truncbytes);
639     }
640
641     /*
642      * If the fifo's write index hasn't been modified since we made the
643      * reservation and we do not hit any boundary conditions, we can 
644      * trivially make the record smaller or larger.
645      */
646     if ((jo->fifo.windex & jo->fifo.mask) == wbase + osize) {
647         availtoend = jo->fifo.size - wbase;
648         avail = jo->fifo.size - (jo->fifo.windex - jo->fifo.xindex) + osize;
649         KKASSERT((availtoend & 15) == 0);
650         KKASSERT((avail & 15) == 0);
651         if (nsize <= avail && nsize <= availtoend) {
652             jo->fifo.windex += nsize - osize;
653             rawp->recsize += bytes;
654             return((char *)(rawp + 1) + truncbytes);
655         }
656     }
657
658     /*
659      * It was not possible to extend the buffer.  Commit the current
660      * buffer and create a new one.  We manually clear the BEGIN mark that
661      * journal_reserve() creates (because this is a continuing record, not
662      * the start of a new stream).
663      */
664     streamid = rawp->streamid & JREC_STREAMID_MASK;
665     journal_commit(jo, rawpp, truncbytes, 0);
666     rptr = journal_reserve(jo, rawpp, streamid, bytes);
667     rawp = *rawpp;
668     rawp->streamid &= ~JREC_STREAMCTL_BEGIN;
669     *newstreamrecp = 1;
670     return(rptr);
671 }
672
673 /*
674  * Abort a journal record.  If the transaction record represents a stream
675  * BEGIN and we can reverse the fifo's write index we can simply reverse
676  * index the entire record, as if it were never reserved in the first place.
677  *
678  * Otherwise we set the JREC_STREAMCTL_ABORTED bit and commit the record
679  * with the payload truncated to 0 bytes.
680  */
681 static void
682 journal_abort(struct journal *jo, struct journal_rawrecbeg **rawpp)
683 {
684     struct journal_rawrecbeg *rawp;
685     int osize;
686
687     rawp = *rawpp;
688     osize = (rawp->recsize + 15) & ~15;
689
690     if ((rawp->streamid & JREC_STREAMCTL_BEGIN) &&
691         (jo->fifo.windex & jo->fifo.mask) == 
692          (char *)rawp - jo->fifo.membase + osize)
693     {
694         jo->fifo.windex -= osize;
695         *rawpp = NULL;
696     } else {
697         rawp->streamid |= JREC_STREAMCTL_ABORTED;
698         journal_commit(jo, rawpp, 0, 1);
699     }
700 }
701
702 /*
703  * Commit a journal record and potentially truncate it to the specified
704  * number of payload bytes.  If you do not want to truncate the record,
705  * simply pass -1 for the bytes parameter.  Do not pass rawp->recsize, that
706  * field includes header and trailer and will not be correct.  Note that
707  * passing 0 will truncate the entire data payload of the record.
708  *
709  * The logical stream is terminated by this function.
710  *
711  * If truncation occurs, and it is not possible to physically optimize the
712  * memory FIFO due to other threads having reserved space after ours,
713  * the remaining reserved space will be covered by a pad record.
714  */
715 static void
716 journal_commit(struct journal *jo, struct journal_rawrecbeg **rawpp,
717                 int bytes, int closeout)
718 {
719     struct journal_rawrecbeg *rawp;
720     struct journal_rawrecend *rendp;
721     int osize;
722     int nsize;
723
724     rawp = *rawpp;
725     *rawpp = NULL;
726
727     KKASSERT((char *)rawp >= jo->fifo.membase &&
728              (char *)rawp + rawp->recsize <= jo->fifo.membase + jo->fifo.size);
729     KKASSERT(((intptr_t)rawp & 15) == 0);
730
731     /*
732      * Truncate the record if necessary.  If the FIFO write index as still
733      * at the end of our record we can optimally backindex it.  Otherwise
734      * we have to insert a pad record to cover the dead space.
735      *
736      * We calculate osize which is the 16-byte-aligned original recsize.
737      * We calculate nsize which is the 16-byte-aligned new recsize.
738      *
739      * Due to alignment issues or in case the passed truncation bytes is
740      * the same as the original payload, nsize may be equal to osize even
741      * if the committed bytes is less then the originally reserved bytes.
742      */
743     if (bytes >= 0) {
744         KKASSERT(bytes >= 0 && bytes <= rawp->recsize - sizeof(struct journal_rawrecbeg) - sizeof(struct journal_rawrecend));
745         osize = (rawp->recsize + 15) & ~15;
746         rawp->recsize = bytes + sizeof(struct journal_rawrecbeg) +
747                         sizeof(struct journal_rawrecend);
748         nsize = (rawp->recsize + 15) & ~15;
749         KKASSERT(nsize <= osize);
750         if (osize == nsize) {
751             /* do nothing */
752         } else if ((jo->fifo.windex & jo->fifo.mask) == (char *)rawp - jo->fifo.membase + osize) {
753             /* we are able to backindex the fifo */
754             jo->fifo.windex -= osize - nsize;
755         } else {
756             /* we cannot backindex the fifo, emplace a pad in the dead space */
757             journal_build_pad((void *)((char *)rawp + nsize), osize - nsize,
758                                 rawp->transid + 1);
759         }
760     }
761
762     /*
763      * Fill in the trailer.  Note that unlike pad records, the trailer will
764      * never overlap the header.
765      */
766     rendp = (void *)((char *)rawp + 
767             ((rawp->recsize + 15) & ~15) - sizeof(*rendp));
768     rendp->endmagic = JREC_ENDMAGIC;
769     rendp->recsize = rawp->recsize;
770     rendp->check = 0;           /* XXX check word, disabled for now */
771
772     /*
773      * Fill in begmagic last.  This will allow the worker thread to proceed.
774      * Use a memory barrier to guarentee write ordering.  Mark the stream
775      * as terminated if closeout is set.  This is the typical case.
776      */
777     if (closeout)
778         rawp->streamid |= JREC_STREAMCTL_END;
779     cpu_sfence();               /* memory and compiler barrier */
780     rawp->begmagic = JREC_BEGMAGIC;
781
782     journal_commit_wakeup(jo);
783 }
784
785 /************************************************************************
786  *                      TRANSACTION SUPPORT ROUTINES                    *
787  ************************************************************************
788  *
789  * JRECORD_*() - routines to create subrecord transactions and embed them
790  *               in the logical streams managed by the journal_*() routines.
791  */
792
793 /*
794  * Initialize the passed jrecord structure and start a new stream transaction
795  * by reserving an initial build space in the journal's memory FIFO.
796  */
797 void
798 jrecord_init(struct journal *jo, struct jrecord *jrec, int16_t streamid)
799 {
800     bzero(jrec, sizeof(*jrec));
801     jrec->jo = jo;
802     jrec->streamid = streamid;
803     jrec->stream_residual = JREC_DEFAULTSIZE;
804     jrec->stream_reserved = jrec->stream_residual;
805     jrec->stream_ptr = 
806         journal_reserve(jo, &jrec->rawp, streamid, jrec->stream_reserved);
807 }
808
809 /*
810  * Push a recursive record type.  All pushes should have matching pops.
811  * The old parent is returned and the newly pushed record becomes the
812  * new parent.  Note that the old parent's pointer may already be invalid
813  * or may become invalid if jrecord_write() had to build a new stream
814  * record, so the caller should not mess with the returned pointer in
815  * any way other then to save it.
816  */
817 struct journal_subrecord *
818 jrecord_push(struct jrecord *jrec, int16_t rectype)
819 {
820     struct journal_subrecord *save;
821
822     save = jrec->parent;
823     jrec->parent = jrecord_write(jrec, rectype|JMASK_NESTED, 0);
824     jrec->last = NULL;
825     KKASSERT(jrec->parent != NULL);
826     ++jrec->pushcount;
827     ++jrec->pushptrgood;        /* cleared on flush */
828     return(save);
829 }
830
831 /*
832  * Pop a previously pushed sub-transaction.  We must set JMASK_LAST
833  * on the last record written within the subtransaction.  If the last 
834  * record written is not accessible or if the subtransaction is empty,
835  * we must write out a pad record with JMASK_LAST set before popping.
836  *
837  * When popping a subtransaction the parent record's recsize field
838  * will be properly set.  If the parent pointer is no longer valid
839  * (which can occur if the data has already been flushed out to the
840  * stream), the protocol spec allows us to leave it 0.
841  *
842  * The saved parent pointer which we restore may or may not be valid,
843  * and if not valid may or may not be NULL, depending on the value
844  * of pushptrgood.
845  */
846 void
847 jrecord_pop(struct jrecord *jrec, struct journal_subrecord *save)
848 {
849     struct journal_subrecord *last;
850
851     KKASSERT(jrec->pushcount > 0);
852     KKASSERT(jrec->residual == 0);
853
854     /*
855      * Set JMASK_LAST on the last record we wrote at the current
856      * level.  If last is NULL we either no longer have access to the
857      * record or the subtransaction was empty and we must write out a pad
858      * record.
859      */
860     if ((last = jrec->last) == NULL) {
861         jrecord_write(jrec, JLEAF_PAD|JMASK_LAST, 0);
862         last = jrec->last;      /* reload after possible flush */
863     } else {
864         last->rectype |= JMASK_LAST;
865     }
866
867     /*
868      * pushptrgood tells us how many levels of parent record pointers
869      * are valid.  The jrec only stores the current parent record pointer
870      * (and it is only valid if pushptrgood != 0).  The higher level parent
871      * record pointers are saved by the routines calling jrecord_push() and
872      * jrecord_pop().  These pointers may become stale and we determine
873      * that fact by tracking the count of valid parent pointers with 
874      * pushptrgood.  Pointers become invalid when their related stream
875      * record gets pushed out.
876      *
877      * If no pointer is available (the data has already been pushed out),
878      * then no fixup of e.g. the length field is possible for non-leaf
879      * nodes.  The protocol allows for this situation by placing a larger
880      * burden on the program scanning the stream on the other end.
881      *
882      * [parentA]
883      *    [node X]
884      *    [parentB]
885      *       [node Y]
886      *       [node Z]
887      *    (pop B)       see NOTE B
888      * (pop A)          see NOTE A
889      *
890      * NOTE B:  This pop sets LAST in node Z if the node is still accessible,
891      *          else a PAD record is appended and LAST is set in that.
892      *
893      *          This pop sets the record size in parentB if parentB is still
894      *          accessible, else the record size is left 0 (the scanner must
895      *          deal with that).
896      *
897      *          This pop sets the new 'last' record to parentB, the pointer
898      *          to which may or may not still be accessible.
899      *
900      * NOTE A:  This pop sets LAST in parentB if the node is still accessible,
901      *          else a PAD record is appended and LAST is set in that.
902      *
903      *          This pop sets the record size in parentA if parentA is still
904      *          accessible, else the record size is left 0 (the scanner must
905      *          deal with that).
906      *
907      *          This pop sets the new 'last' record to parentA, the pointer
908      *          to which may or may not still be accessible.
909      *
910      * Also note that the last record in the stream transaction, which in
911      * the above example is parentA, does not currently have the LAST bit
912      * set.
913      *
914      * The current parent becomes the last record relative to the
915      * saved parent passed into us.  It's validity is based on 
916      * whether pushptrgood is non-zero prior to decrementing.  The saved
917      * parent becomes the new parent, and its validity is based on whether
918      * pushptrgood is non-zero after decrementing.
919      *
920      * The old jrec->parent may be NULL if it is no longer accessible.
921      * If pushptrgood is non-zero, however, it is guarenteed to not
922      * be NULL (since no flush occured).
923      */
924     jrec->last = jrec->parent;
925     --jrec->pushcount;
926     if (jrec->pushptrgood) {
927         KKASSERT(jrec->last != NULL && last != NULL);
928         if (--jrec->pushptrgood == 0) {
929             jrec->parent = NULL;        /* 'save' contains garbage or NULL */
930         } else {
931             KKASSERT(save != NULL);
932             jrec->parent = save;        /* 'save' must not be NULL */
933         }
934
935         /*
936          * Set the record size in the old parent.  'last' still points to
937          * the original last record in the subtransaction being popped,
938          * jrec->last points to the old parent (which became the last
939          * record relative to the new parent being popped into).
940          */
941         jrec->last->recsize = (char *)last + last->recsize - (char *)jrec->last;
942     } else {
943         jrec->parent = NULL;
944         KKASSERT(jrec->last == NULL);
945     }
946 }
947
948 /*
949  * Write out a leaf record, including associated data.
950  */
951 void
952 jrecord_leaf(struct jrecord *jrec, int16_t rectype, void *ptr, int bytes)
953 {
954     jrecord_write(jrec, rectype, bytes);
955     jrecord_data(jrec, ptr, bytes, JDATA_KERN);
956 }
957
958 void
959 jrecord_leaf_uio(struct jrecord *jrec, int16_t rectype,
960                  struct uio *uio)
961 {
962     struct iovec *iov;
963     int i;
964
965     for (i = 0; i < uio->uio_iovcnt; ++i) {
966         iov = &uio->uio_iov[i];
967         if (iov->iov_len == 0)
968             continue;
969         if (uio->uio_segflg == UIO_SYSSPACE) {
970             jrecord_write(jrec, rectype, iov->iov_len);
971             jrecord_data(jrec, iov->iov_base, iov->iov_len, JDATA_KERN);
972         } else { /* UIO_USERSPACE */
973             jrecord_write(jrec, rectype, iov->iov_len);
974             jrecord_data(jrec, iov->iov_base, iov->iov_len, JDATA_USER);
975         }
976     }
977 }
978
979 void
980 jrecord_leaf_xio(struct jrecord *jrec, int16_t rectype, xio_t xio)
981 {
982     int bytes = xio->xio_npages * PAGE_SIZE;
983
984     jrecord_write(jrec, rectype, bytes);
985     jrecord_data(jrec, xio, bytes, JDATA_XIO);
986 }
987
988 /*
989  * Write a leaf record out and return a pointer to its base.  The leaf
990  * record may contain potentially megabytes of data which is supplied
991  * in jrecord_data() calls.  The exact amount must be specified in this
992  * call.
993  *
994  * THE RETURNED SUBRECORD POINTER IS ONLY VALID IMMEDIATELY AFTER THE
995  * CALL AND MAY BECOME INVALID AT ANY TIME.  ONLY THE PUSH/POP CODE SHOULD
996  * USE THE RETURN VALUE.
997  */
998 struct journal_subrecord *
999 jrecord_write(struct jrecord *jrec, int16_t rectype, int bytes)
1000 {
1001     struct journal_subrecord *last;
1002     int pusheditout;
1003
1004     /*
1005      * Try to catch some obvious errors.  Nesting records must specify a
1006      * size of 0, and there should be no left-overs from previous operations
1007      * (such as incomplete data writeouts).
1008      */
1009     KKASSERT(bytes == 0 || (rectype & JMASK_NESTED) == 0);
1010     KKASSERT(jrec->residual == 0);
1011
1012     /*
1013      * Check to see if the current stream record has enough room for
1014      * the new subrecord header.  If it doesn't we extend the current
1015      * stream record.
1016      *
1017      * This may have the side effect of pushing out the current stream record
1018      * and creating a new one.  We must adjust our stream tracking fields
1019      * accordingly.
1020      */
1021     if (jrec->stream_residual < sizeof(struct journal_subrecord)) {
1022         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1023                                 jrec->stream_reserved - jrec->stream_residual,
1024                                 JREC_DEFAULTSIZE, &pusheditout);
1025         if (pusheditout) {
1026             /*
1027              * If a pushout occured, the pushed out stream record was
1028              * truncated as specified and the new record is exactly the
1029              * extension size specified.
1030              */
1031             jrec->stream_reserved = JREC_DEFAULTSIZE;
1032             jrec->stream_residual = JREC_DEFAULTSIZE;
1033             jrec->parent = NULL;        /* no longer accessible */
1034             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1035         } else {
1036             /*
1037              * If no pushout occured the stream record is NOT truncated and
1038              * IS extended.
1039              */
1040             jrec->stream_reserved += JREC_DEFAULTSIZE;
1041             jrec->stream_residual += JREC_DEFAULTSIZE;
1042         }
1043     }
1044     last = (void *)jrec->stream_ptr;
1045     last->rectype = rectype;
1046     last->reserved = 0;
1047
1048     /*
1049      * We may not know the record size for recursive records and the 
1050      * header may become unavailable due to limited FIFO space.  Write
1051      * -1 to indicate this special case.
1052      */
1053     if ((rectype & JMASK_NESTED) && bytes == 0)
1054         last->recsize = -1;
1055     else
1056         last->recsize = sizeof(struct journal_subrecord) + bytes;
1057     jrec->last = last;
1058     jrec->residual = bytes;             /* remaining data to be posted */
1059     jrec->residual_align = -bytes & 7;  /* post-data alignment required */
1060     jrec->stream_ptr += sizeof(*last);  /* current write pointer */
1061     jrec->stream_residual -= sizeof(*last); /* space remaining in stream */
1062     return(last);
1063 }
1064
1065 /*
1066  * Write out the data associated with a leaf record.  Any number of calls
1067  * to this routine may be made as long as the byte count adds up to the
1068  * amount originally specified in jrecord_write().
1069  *
1070  * The act of writing out the leaf data may result in numerous stream records
1071  * being pushed out.   Callers should be aware that even the associated
1072  * subrecord header may become inaccessible due to stream record pushouts.
1073  */
1074 static void
1075 jrecord_data(struct jrecord *jrec, void *buf, int bytes, int dtype)
1076 {
1077     int pusheditout;
1078     int extsize;
1079     int xio_offset = 0;
1080
1081     KKASSERT(bytes >= 0 && bytes <= jrec->residual);
1082
1083     /*
1084      * Push out stream records as long as there is insufficient room to hold
1085      * the remaining data.
1086      */
1087     while (jrec->stream_residual < bytes) {
1088         /*
1089          * Fill in any remaining space in the current stream record.
1090          */
1091         switch (dtype) {
1092         case JDATA_KERN:
1093             bcopy(buf, jrec->stream_ptr, jrec->stream_residual);
1094             break;
1095         case JDATA_USER:
1096             copyin(buf, jrec->stream_ptr, jrec->stream_residual);
1097             break;
1098         case JDATA_XIO:
1099             xio_copy_xtok((xio_t)buf, xio_offset, jrec->stream_ptr,
1100                           jrec->stream_residual);
1101             xio_offset += jrec->stream_residual;
1102             break;
1103         }
1104         if (dtype != JDATA_XIO)
1105             buf = (char *)buf + jrec->stream_residual;
1106         bytes -= jrec->stream_residual;
1107         /*jrec->stream_ptr += jrec->stream_residual;*/
1108         jrec->residual -= jrec->stream_residual;
1109         jrec->stream_residual = 0;
1110
1111         /*
1112          * Try to extend the current stream record, but no more then 1/4
1113          * the size of the FIFO.
1114          */
1115         extsize = jrec->jo->fifo.size >> 2;
1116         if (extsize > bytes)
1117             extsize = (bytes + 15) & ~15;
1118
1119         jrec->stream_ptr = journal_extend(jrec->jo, &jrec->rawp,
1120                                 jrec->stream_reserved - jrec->stream_residual,
1121                                 extsize, &pusheditout);
1122         if (pusheditout) {
1123             jrec->stream_reserved = extsize;
1124             jrec->stream_residual = extsize;
1125             jrec->parent = NULL;        /* no longer accessible */
1126             jrec->last = NULL;          /* no longer accessible */
1127             jrec->pushptrgood = 0;      /* restored parents in pops no good */
1128         } else {
1129             jrec->stream_reserved += extsize;
1130             jrec->stream_residual += extsize;
1131         }
1132     }
1133
1134     /*
1135      * Push out any remaining bytes into the current stream record.
1136      */
1137     if (bytes) {
1138         switch (dtype) {
1139         case JDATA_KERN:
1140             bcopy(buf, jrec->stream_ptr, bytes);
1141             break;
1142         case JDATA_USER:
1143             copyin(buf, jrec->stream_ptr, bytes);
1144             break;
1145         case JDATA_XIO:
1146             xio_copy_xtok((xio_t)buf, xio_offset, jrec->stream_ptr, bytes);
1147             break;
1148         }
1149         jrec->stream_ptr += bytes;
1150         jrec->stream_residual -= bytes;
1151         jrec->residual -= bytes;
1152     }
1153
1154     /*
1155      * Handle data alignment requirements for the subrecord.  Because the
1156      * stream record's data space is more strictly aligned, it must already
1157      * have sufficient space to hold any subrecord alignment slop.
1158      */
1159     if (jrec->residual == 0 && jrec->residual_align) {
1160         KKASSERT(jrec->residual_align <= jrec->stream_residual);
1161         bzero(jrec->stream_ptr, jrec->residual_align);
1162         jrec->stream_ptr += jrec->residual_align;
1163         jrec->stream_residual -= jrec->residual_align;
1164         jrec->residual_align = 0;
1165     }
1166 }
1167
1168 /*
1169  * We are finished with the transaction.  This closes the transaction created
1170  * by jrecord_init().
1171  *
1172  * NOTE: If abortit is not set then we must be at the top level with no
1173  *       residual subrecord data left to output.
1174  *
1175  *       If abortit is set then we can be in any state, all pushes will be 
1176  *       popped and it is ok for there to be residual data.  This works 
1177  *       because the virtual stream itself is truncated.  Scanners must deal
1178  *       with this situation.
1179  *
1180  * The stream record will be committed or aborted as specified and jrecord
1181  * resources will be cleaned up.
1182  */
1183 void
1184 jrecord_done(struct jrecord *jrec, int abortit)
1185 {
1186     KKASSERT(jrec->rawp != NULL);
1187
1188     if (abortit) {
1189         journal_abort(jrec->jo, &jrec->rawp);
1190     } else {
1191         KKASSERT(jrec->pushcount == 0 && jrec->residual == 0);
1192         journal_commit(jrec->jo, &jrec->rawp, 
1193                         jrec->stream_reserved - jrec->stream_residual, 1);
1194     }
1195
1196     /*
1197      * jrec should not be used beyond this point without another init,
1198      * but clean up some fields to ensure that we panic if it is.
1199      *
1200      * Note that jrec->rawp is NULLd out by journal_abort/journal_commit.
1201      */
1202     jrec->jo = NULL;
1203     jrec->stream_ptr = NULL;
1204 }
1205
1206 /************************************************************************
1207  *                      LOW LEVEL RECORD SUPPORT ROUTINES               *
1208  ************************************************************************
1209  *
1210  * These routine create low level recursive and leaf subrecords representing
1211  * common filesystem structures.
1212  */
1213
1214 /*
1215  * Write out a filename path relative to the base of the mount point.
1216  * rectype is typically JLEAF_PATH{1,2,3,4}.
1217  */
1218 void
1219 jrecord_write_path(struct jrecord *jrec, int16_t rectype, struct namecache *ncp)
1220 {
1221     char buf[64];       /* local buffer if it fits, else malloced */
1222     char *base;
1223     int pathlen;
1224     int index;
1225     struct namecache *scan;
1226
1227     /*
1228      * Pass 1 - figure out the number of bytes required.  Include terminating
1229      *         \0 on last element and '/' separator on other elements.
1230      *
1231      * The namecache topology terminates at the root of the filesystem
1232      * (the normal lookup code would then continue by using the mount
1233      * structure to figure out what it was mounted on).
1234      */
1235 again:
1236     pathlen = 0;
1237     for (scan = ncp; scan; scan = scan->nc_parent) {
1238         if (scan->nc_nlen > 0)
1239             pathlen += scan->nc_nlen + 1;
1240     }
1241
1242     if (pathlen <= sizeof(buf))
1243         base = buf;
1244     else
1245         base = kmalloc(pathlen, M_TEMP, M_INTWAIT);
1246
1247     /*
1248      * Pass 2 - generate the path buffer
1249      */
1250     index = pathlen;
1251     for (scan = ncp; scan; scan = scan->nc_parent) {
1252         if (scan->nc_nlen == 0)
1253             continue;
1254         if (scan->nc_nlen >= index) {
1255             if (base != buf)
1256                 kfree(base, M_TEMP);
1257             goto again;
1258         }
1259         if (index == pathlen)
1260             base[--index] = 0;
1261         else
1262             base[--index] = '/';
1263         index -= scan->nc_nlen;
1264         bcopy(scan->nc_name, base + index, scan->nc_nlen);
1265     }
1266     jrecord_leaf(jrec, rectype, base + index, pathlen - index);
1267     if (base != buf)
1268         kfree(base, M_TEMP);
1269 }
1270
1271 /*
1272  * Write out a file attribute structure.  While somewhat inefficient, using
1273  * a recursive data structure is the most portable and extensible way.
1274  */
1275 void
1276 jrecord_write_vattr(struct jrecord *jrec, struct vattr *vat)
1277 {
1278     void *save;
1279
1280     save = jrecord_push(jrec, JTYPE_VATTR);
1281     if (vat->va_type != VNON)
1282         jrecord_leaf(jrec, JLEAF_VTYPE, &vat->va_type, sizeof(vat->va_type));
1283     if (vat->va_mode != (mode_t)VNOVAL)
1284         jrecord_leaf(jrec, JLEAF_MODES, &vat->va_mode, sizeof(vat->va_mode));
1285     if (vat->va_nlink != VNOVAL)
1286         jrecord_leaf(jrec, JLEAF_NLINK, &vat->va_nlink, sizeof(vat->va_nlink));
1287     if (vat->va_uid != VNOVAL)
1288         jrecord_leaf(jrec, JLEAF_UID, &vat->va_uid, sizeof(vat->va_uid));
1289     if (vat->va_gid != VNOVAL)
1290         jrecord_leaf(jrec, JLEAF_GID, &vat->va_gid, sizeof(vat->va_gid));
1291     if (vat->va_fsid != VNOVAL)
1292         jrecord_leaf(jrec, JLEAF_FSID, &vat->va_fsid, sizeof(vat->va_fsid));
1293     if (vat->va_fileid != VNOVAL)
1294         jrecord_leaf(jrec, JLEAF_INUM, &vat->va_fileid, sizeof(vat->va_fileid));
1295     if (vat->va_size != VNOVAL)
1296         jrecord_leaf(jrec, JLEAF_SIZE, &vat->va_size, sizeof(vat->va_size));
1297     if (vat->va_atime.tv_sec != VNOVAL)
1298         jrecord_leaf(jrec, JLEAF_ATIME, &vat->va_atime, sizeof(vat->va_atime));
1299     if (vat->va_mtime.tv_sec != VNOVAL)
1300         jrecord_leaf(jrec, JLEAF_MTIME, &vat->va_mtime, sizeof(vat->va_mtime));
1301     if (vat->va_ctime.tv_sec != VNOVAL)
1302         jrecord_leaf(jrec, JLEAF_CTIME, &vat->va_ctime, sizeof(vat->va_ctime));
1303     if (vat->va_gen != VNOVAL)
1304         jrecord_leaf(jrec, JLEAF_GEN, &vat->va_gen, sizeof(vat->va_gen));
1305     if (vat->va_flags != VNOVAL)
1306         jrecord_leaf(jrec, JLEAF_FLAGS, &vat->va_flags, sizeof(vat->va_flags));
1307     if (vat->va_rmajor != VNOVAL) {
1308         udev_t rdev = makeudev(vat->va_rmajor, vat->va_rminor);
1309         jrecord_leaf(jrec, JLEAF_UDEV, &rdev, sizeof(rdev));
1310         jrecord_leaf(jrec, JLEAF_UMAJOR, &vat->va_rmajor, sizeof(vat->va_rmajor));
1311         jrecord_leaf(jrec, JLEAF_UMINOR, &vat->va_rminor, sizeof(vat->va_rminor));
1312     }
1313 #if 0
1314     if (vat->va_filerev != VNOVAL)
1315         jrecord_leaf(jrec, JLEAF_FILEREV, &vat->va_filerev, sizeof(vat->va_filerev));
1316 #endif
1317     jrecord_pop(jrec, save);
1318 }
1319
1320 /*
1321  * Write out the creds used to issue a file operation.  If a process is
1322  * available write out additional tracking information related to the 
1323  * process.
1324  *
1325  * XXX additional tracking info
1326  * XXX tty line info
1327  */
1328 void
1329 jrecord_write_cred(struct jrecord *jrec, struct thread *td, struct ucred *cred)
1330 {
1331     void *save;
1332     struct proc *p;
1333
1334     save = jrecord_push(jrec, JTYPE_CRED);
1335     jrecord_leaf(jrec, JLEAF_UID, &cred->cr_uid, sizeof(cred->cr_uid));
1336     jrecord_leaf(jrec, JLEAF_GID, &cred->cr_gid, sizeof(cred->cr_gid));
1337     if (td && (p = td->td_proc) != NULL) {
1338         jrecord_leaf(jrec, JLEAF_PID, &p->p_pid, sizeof(p->p_pid));
1339         jrecord_leaf(jrec, JLEAF_COMM, p->p_comm, sizeof(p->p_comm));
1340     }
1341     jrecord_pop(jrec, save);
1342 }
1343
1344 /*
1345  * Write out information required to identify a vnode
1346  *
1347  * XXX this needs work.  We should write out the inode number as well,
1348  * and in fact avoid writing out the file path for seqential writes
1349  * occuring within e.g. a certain period of time.
1350  */
1351 void
1352 jrecord_write_vnode_ref(struct jrecord *jrec, struct vnode *vp)
1353 {
1354     struct nchandle nch;
1355
1356     nch.mount = vp->v_mount;
1357     spin_lock(&vp->v_spin);
1358     TAILQ_FOREACH(nch.ncp, &vp->v_namecache, nc_vnode) {
1359         if ((nch.ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1360             break;
1361     }
1362     if (nch.ncp) {
1363         cache_hold(&nch);
1364         spin_unlock(&vp->v_spin);
1365         jrecord_write_path(jrec, JLEAF_PATH_REF, nch.ncp);
1366         cache_drop(&nch);
1367     } else {
1368         spin_unlock(&vp->v_spin);
1369     }
1370 }
1371
1372 void
1373 jrecord_write_vnode_link(struct jrecord *jrec, struct vnode *vp, 
1374                          struct namecache *notncp)
1375 {
1376     struct nchandle nch;
1377
1378     nch.mount = vp->v_mount;
1379     spin_lock(&vp->v_spin);
1380     TAILQ_FOREACH(nch.ncp, &vp->v_namecache, nc_vnode) {
1381         if (nch.ncp == notncp)
1382             continue;
1383         if ((nch.ncp->nc_flag & (NCF_UNRESOLVED|NCF_DESTROYED)) == 0)
1384             break;
1385     }
1386     if (nch.ncp) {
1387         cache_hold(&nch);
1388         spin_unlock(&vp->v_spin);
1389         jrecord_write_path(jrec, JLEAF_PATH_REF, nch.ncp);
1390         cache_drop(&nch);
1391     } else {
1392         spin_unlock(&vp->v_spin);
1393     }
1394 }
1395
1396 /*
1397  * Write out the data represented by a pagelist
1398  */
1399 void
1400 jrecord_write_pagelist(struct jrecord *jrec, int16_t rectype,
1401                         struct vm_page **pglist, int *rtvals, int pgcount,
1402                         off_t offset)
1403 {
1404     struct xio xio;
1405     int error;
1406     int b;
1407     int i;
1408
1409     i = 0;
1410     xio_init(&xio);
1411     while (i < pgcount) {
1412         /*
1413          * Find the next valid section.  Skip any invalid elements
1414          */
1415         if (rtvals[i] != VM_PAGER_OK) {
1416             ++i;
1417             offset += PAGE_SIZE;
1418             continue;
1419         }
1420
1421         /*
1422          * Figure out how big the valid section is, capping I/O at what the
1423          * MSFBUF can represent.
1424          */
1425         b = i;
1426         while (i < pgcount && i - b != XIO_INTERNAL_PAGES && 
1427                rtvals[i] == VM_PAGER_OK
1428         ) {
1429             ++i;
1430         }
1431
1432         /*
1433          * And write it out.
1434          */
1435         if (i - b) {
1436             error = xio_init_pages(&xio, pglist + b, i - b, XIOF_READ);
1437             if (error == 0) {
1438                 jrecord_leaf(jrec, JLEAF_SEEKPOS, &offset, sizeof(offset));
1439                 jrecord_leaf_xio(jrec, rectype, &xio);
1440             } else {
1441                 kprintf("jrecord_write_pagelist: xio init failure\n");
1442             }
1443             xio_release(&xio);
1444             offset += (off_t)(i - b) << PAGE_SHIFT;
1445         }
1446     }
1447 }
1448
1449 /*
1450  * Write out the data represented by a UIO.
1451  */
1452 void
1453 jrecord_write_uio(struct jrecord *jrec, int16_t rectype, struct uio *uio)
1454 {
1455     if (uio->uio_segflg != UIO_NOCOPY) {
1456         jrecord_leaf(jrec, JLEAF_SEEKPOS, &uio->uio_offset, 
1457                      sizeof(uio->uio_offset));
1458         jrecord_leaf_uio(jrec, rectype, uio);
1459     }
1460 }
1461
1462 void
1463 jrecord_file_data(struct jrecord *jrec, struct vnode *vp, 
1464                   off_t off, off_t bytes)
1465 {
1466     const int bufsize = 8192;
1467     char *buf;
1468     int error;
1469     int n;
1470
1471     buf = kmalloc(bufsize, M_JOURNAL, M_WAITOK);
1472     jrecord_leaf(jrec, JLEAF_SEEKPOS, &off, sizeof(off));
1473     while (bytes) {
1474         n = (bytes > bufsize) ? bufsize : (int)bytes;
1475         error = vn_rdwr(UIO_READ, vp, buf, n, off, UIO_SYSSPACE, IO_NODELOCKED,
1476                         proc0.p_ucred, NULL);
1477         if (error) {
1478             jrecord_leaf(jrec, JLEAF_ERROR, &error, sizeof(error));
1479             break;
1480         }
1481         jrecord_leaf(jrec, JLEAF_FILEDATA, buf, n);
1482         bytes -= n;
1483         off += n;
1484     }
1485     kfree(buf, M_JOURNAL);
1486 }
1487