2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * $DragonFly: src/sbin/jscan/jfile.c,v 1.9 2005/09/07 02:34:37 dillon Exp $
/* Forward declarations of the file-local (static) helpers defined further down. */
40 static void jalign(struct jfile *jf);
41 static int jreadbuf(struct jfile *jf, void *buf, int bytes);
42 static void jreset(struct jfile *jf, unsigned int seq,
43 enum jdirection direction);
46 * Open a file descriptor for journal record access.
48 * NOTE: only seekable descriptors are supported for backwards scans.
51 jopen_fd(int fd, enum jdirection direction)
/*
 * Allocates a zero-filled struct jfile, marks it read-only (O_RDONLY)
 * and records the requested scan direction.
 *
 * NOTE(review): the malloc() result is handed to bzero() on the very next
 * line without a NULL check.
 */
55 jf = malloc(sizeof(struct jfile));
56 bzero(jf, sizeof(struct jfile));
59 jf->jf_open_flags = O_RDONLY;
/*
 * A backwards scan starts at the end of the file, so jf_pos is set to the
 * offset lseek() reports for SEEK_END.
 * NOTE(review): no check of the lseek() result is visible here - confirm
 * how a non-seekable descriptor is handled.
 */
60 if (direction == JD_BACKWARDS) {
61 jf->jf_pos = lseek(jf->jf_fd, 0L, SEEK_END);
63 jf->jf_direction = direction;
68 * Open a prefix set. <prefix>.nnnnnnnnn files or a <prefix>.transid file
69 * must exist to succeed. No file descriptor is actually opened but
70 * the sequence number is initialized to the beginning or end of the set.
73 jopen_prefix(const char *prefix, enum jdirection direction, int rw)
/*
 * Scans a directory for <basename>.<hex> entries to find the lowest and
 * highest sequence numbers of the set, checks for <prefix>.transid, and
 * builds a jfile describing the set.  'rw' selects O_RDWR|O_CREAT versus
 * O_RDONLY for jf_open_flags.
 */
/* (unsigned int)-1 is the "no sequence file seen" sentinel tested below. */
77 unsigned int seq_beg = -1;
78 unsigned int seq_end = -1;
/* Work on a private copy of the prefix; it is examined for its last '/'. */
91 dirname = data = strdup(prefix);
92 if ((basename = strrchr(dirname, '/')) != NULL) {
98 baselen = strlen(basename);
99 if ((dir = opendir(dirname)) != NULL) {
100 while ((den = readdir(dir)) != NULL) {
/*
 * Consider only entries that start with the basename followed by a '.';
 * the text after the dot is parsed as a hexadecimal sequence number.
 */
101 if (strncmp(den->d_name, basename, baselen) == 0 &&
102 den->d_name[baselen] == '.'
104 seq = strtoul(den->d_name + baselen + 1, &ptr, 16);
/* Accept the entry only if strtoul() consumed the whole suffix. */
105 if (*ptr == 0 && seq != ULONG_MAX) {
/* Track the smallest and largest sequence number seen so far. */
106 if (seq_beg == (unsigned int)-1 || seq_beg > seq)
108 if (seq_end == (unsigned int)-1 || seq_end < seq)
/* NOTE(review): the asprintf() result is not checked before 'data' is used. */
118 asprintf(&data, "%s.transid", prefix);
119 if (stat(data, &st) == 0)
/* Proceed when at least one sequence file or the transid file exists. */
123 if (seq_beg != (unsigned int)-1 || hastransid) {
/* No sequence file yet: sequence 0 (<prefix>.00000000) is opened with O_CREAT. */
124 if (seq_beg == (unsigned int)-1) {
128 asprintf(&data, "%s.%08x", prefix, 0);
129 if ((fd = open(data, O_RDWR|O_CREAT, 0666)) >= 0)
/* NOTE(review): malloc() result is passed to bzero() without a NULL check. */
134 jf = malloc(sizeof(struct jfile));
135 bzero(jf, sizeof(struct jfile));
137 jf->jf_write_fd = -1;
138 jf->jf_prefix = strdup(prefix);
139 jf->jf_seq_beg = seq_beg;
140 jf->jf_seq_end = seq_end;
141 jf->jf_open_flags = rw ? (O_RDWR|O_CREAT) : O_RDONLY;
/*
 * Position at the end of the set and read one record backwards so the
 * transaction id of the last record can be remembered in jf_last_transid.
 */
142 jreset(jf, seq_end, JD_BACKWARDS);
144 fprintf(stderr, "Open prefix set %08x-%08x\n", seq_beg, seq_end);
145 if (jread(jf, &jd, JD_BACKWARDS) == 0) {
146 jf->jf_last_transid = jd->jd_transid;
/*
 * Final positioning for the caller: end of the set for a backwards scan;
 * the jreset() to jf_seq_beg is presumably the else branch (the line
 * between the two calls is not visible in this excerpt).
 */
149 if (direction == JD_BACKWARDS)
150 jreset(jf, jf->jf_seq_end, direction);
152 jreset(jf, jf->jf_seq_beg, direction);
160 * Get a prefix set ready for append.
163 jrecord_init(const char *prefix)
172 * Determine whether we already have a prefix set or whether we need
/*
 * Probe the set through jopen_prefix(); a jf_seq_beg other than
 * (unsigned int)-1 means sequence files were found.
 */
175 jf = jopen_prefix(prefix, 0, 0);
178 if (jf->jf_seq_beg != (unsigned int)-1)
/* NOTE(review): the asprintf() result is not checked before 'data' is used. */
182 asprintf(&data, "%s.transid", prefix);
185 * If the sequence exists the transid file must ALREADY exist for us
186 * to be able to safely 'append' to the space. Locked-down sequence
187 * spaces do not have a transid file.
/*
 * Two open() variants follow: without O_CREAT (the file must already
 * exist) and with O_CREAT.  The selecting condition is not visible in this
 * excerpt - presumably whether the sequence already exists.
 */
190 fd = open(data, O_RDWR, 0666);
192 fd = open(data, O_RDWR|O_CREAT, 0666);
/*
 * An empty transid file is seeded with 16 hex zero digits plus a newline
 * (17 bytes).  NOTE(review): the write() return value is discarded.
 */
197 if (fstat(fd, &st) == 0 && st.st_size == 0)
198 write(fd, "0000000000000000\n", 17); /* starting transid in hex */
204 * Close a previously opened journal, clean up any side allocations.
207 jclose(struct jfile *jf)
/*
 * Descriptors count as open when they are >= 0.  The write descriptor is
 * closed and reset to -1 so that it reads as closed afterwards.
 */
209 if (jf->jf_fd >= 0) {
213 if (jf->jf_write_fd >= 0) {
214 close(jf->jf_write_fd);
215 jf->jf_write_fd = -1;
221 * Locate the next (or previous) complete virtual stream transaction given a
222 * file descriptor and direction. Keep track of partial stream records as
225 * Note that a transaction might represent a huge I/O operation, resulting
226 * in an overall node structure that spans gigabytes, but individual
227 * subrecord leaf nodes are limited in size and we depend on this to simplify
228 * the handling of leaf records.
230 * A transaction may cover several raw records. The jstream collection for
231 * a transaction is only returned when the entire transaction has been
232 * successfully scanned. Due to the interleaving of transactions the ordering
233 * of returned JS's may be different (not exactly reversed) when scanning a
234 * journal backwards versus forwards. Since parallel operations are
235 * theoretically non-conflicting, this should not present a problem.
237 * PAD RECORD SPECIAL CASE. Pad records can be 16 bytes long, which means
238 * that the rawrecend overlaps the transid field of the rawrecbeg. Because
239 * the transid is garbage, we must skip and cannot return pad records.
242 jread(struct jfile *jf, struct jdata **jdp, enum jdirection direction)
/*
 * Reads one raw journal record in the requested direction.  Callers in
 * this file pass the address of a struct jdata pointer and use that
 * pointer when the return value is 0.
 */
244 struct journal_rawrecbeg head;
245 struct journal_rawrecbeg *headp;
246 struct journal_rawrecend tail;
247 struct journal_rawrecend *tailp;
258 * If changing direction on an open descriptor we have to fixup jf_pos.
259 * When reading backwards the actual file seek position does not match
262 * If you read forwards then read backwards, or read backwords then
263 * read forwards, you will get the same record.
265 if (jf->jf_direction != direction) {
266 if (jf->jf_fd >= 0) {
267 if (direction == JD_FORWARDS) {
268 lseek(jf->jf_fd, jf->jf_pos, 0);
271 jf->jf_direction = direction;
276 * If reading in prefix mode and we have no descriptor, open
277 * a new descriptor based on the current sequence number. If
278 * this fails we will fall all the way through to the end which will
279 * setup the next sequence number and loop.
/* The per-sequence file name is "<prefix>.<seq as 8 hex digits>". */
281 if (jf->jf_fd == -1 && jf->jf_prefix) {
282 asprintf(&filename, "%s.%08x", jf->jf_prefix, jf->jf_seq);
283 if ((jf->jf_fd = open(filename, O_RDONLY)) >= 0) {
284 if (jf->jf_direction == JD_FORWARDS)
287 jf->jf_pos = lseek(jf->jf_fd, 0L, SEEK_END);
291 fprintf(stderr, "Open %s fd %d\n", filename, jf->jf_fd);
296 * Get the current offset and make sure it is 16-byte aligned. If it
297 * isn't, align it and enter search mode.
299 if (jf->jf_pos & 15) {
300 jf_warn(jf, "realigning bad offset and entering search mode");
308 if (jf->jf_direction == JD_FORWARDS) {
310 * Scan the journal forwards. Note that the file pointer might not
313 while ((error = jreadbuf(jf, &head, sizeof(head))) == sizeof(head)) {
314 if (head.begmagic != JREC_BEGMAGIC) {
316 jf_warn(jf, "bad beginmagic, searching for new record");
323 * The actual record is 16-byte aligned. head.recsize contains
324 * the unaligned record size.
326 recsize = (head.recsize + 15) & ~15;
327 if (recsize < JREC_MINRECSIZE || recsize > JREC_MAXRECSIZE) {
329 jf_warn(jf, "bad recordsize: %d\n", recsize);
/*
 * Size the allocation as the jdata header plus recsize payload bytes,
 * rounded up to a multiple of 256.
 * NOTE(review): the malloc() result is used by bzero() without a NULL check.
 */
334 allocsize = offsetof(struct jdata, jd_data[recsize]);
335 allocsize = (allocsize + 255) & ~255;
336 jd = malloc(allocsize);
337 bzero(jd, offsetof(struct jdata, jd_data[0]));
/*
 * The record header already consumed from the stream goes to the front of
 * the buffer; the remainder of the record is read in right behind it.
 */
338 bcopy(&head, jd->jd_data, sizeof(head));
339 n = jreadbuf(jf, jd->jd_data + sizeof(head),
340 recsize - sizeof(head));
341 if (n != (int)(recsize - sizeof(head))) {
343 jf_warn(jf, "Incomplete stream record\n");
/* The trailer occupies the last sizeof(*tailp) bytes of the aligned record. */
350 tailp = (void *)(jd->jd_data + recsize - sizeof(*tailp));
351 if (tailp->endmagic != JREC_ENDMAGIC) {
353 jf_warn(jf, "bad endmagic, searching for new record");
363 if (head.streamid == JREC_STREAMID_PAD) {
369 * note: recsize is aligned (the actual record size),
370 * head.recsize is unaligned (the actual payload size).
372 jd->jd_transid = head.transid;
373 jd->jd_alloc = allocsize;
374 jd->jd_size = recsize;
381 * Scan the journal backwards. Note that jread()'s reverse-seek and
382 * read. The data read will be forward ordered, however.
384 while ((error = jreadbuf(jf, &tail, sizeof(tail))) == sizeof(tail)) {
385 if (tail.endmagic != JREC_ENDMAGIC) {
387 jf_warn(jf, "bad endmagic, searching for new record");
394 * The actual record is 16-byte aligned. head.recsize contains
395 * the unaligned record size.
397 recsize = (tail.recsize + 15) & ~15;
398 if (recsize < JREC_MINRECSIZE || recsize > JREC_MAXRECSIZE) {
400 jf_warn(jf, "bad recordsize: %d\n", recsize);
/* NOTE(review): as in the forward scan, malloc() is not checked for NULL. */
405 allocsize = offsetof(struct jdata, jd_data[recsize]);
406 allocsize = (allocsize + 255) & ~255;
407 jd = malloc(allocsize);
408 bzero(jd, offsetof(struct jdata, jd_data[0]));
/*
 * Scanning backwards the trailer was read first, so it is stored at the
 * end of the buffer and the bytes that precede it are read in front of it.
 */
409 bcopy(&tail, jd->jd_data + recsize - sizeof(tail), sizeof(tail));
410 n = jreadbuf(jf, jd->jd_data, recsize - sizeof(tail));
411 if (n != (int)(recsize - sizeof(tail))) {
413 jf_warn(jf, "Incomplete stream record\n");
/* The record header is examined in place at the start of the buffer. */
420 headp = (void *)jd->jd_data;
421 if (headp->begmagic != JREC_BEGMAGIC) {
423 jf_warn(jf, "bad begmagic, searching for new record");
/*
 * NOTE(review): this tests head.streamid, but the visible backwards path
 * only fills 'tail' and inspects the header through headp (begmagic above,
 * transid below).  Confirm that 'head' is populated on this path;
 * otherwise headp->streamid looks like what was intended.
 */
433 if (head.streamid == JREC_STREAMID_PAD) {
439 * note: recsize is aligned (the actual record size),
440 * head.recsize is unaligned (the actual payload size).
442 jd->jd_transid = headp->transid;
443 jd->jd_alloc = allocsize;
444 jd->jd_size = recsize;
452 * If reading in prefix mode and there is no more data, close the
453 * current descriptor, adjust the sequence number, and loop.
455 * If we hit the end of the sequence space and were asked to loop,
456 * check for the next sequence number and adjust jf_seq_end. Leave
457 * the current descriptor open so we do not loose track of its seek
458 * position, and also to catch a race where another jscan may have
459 * written more data to the current sequence number before rolling
460 * the next sequence number.
462 if (error == 0 && jf->jf_prefix) {
463 if (jf->jf_direction == JD_FORWARDS) {
464 if (jf->jf_seq < jf->jf_seq_end) {
467 fprintf(stderr, "jread: roll to seq %08x\n", jf->jf_seq);
468 if (jf->jf_fd >= 0) {
/* In loop-forever mode, probe with stat() for the file holding the next sequence number. */
474 if (jmodes & JMODEF_LOOP_FOREVER) {
475 asprintf(&filename, "%s.%08x", jf->jf_prefix, jf->jf_seq + 1);
476 if (stat(filename, &st) == 0) {
479 fprintf(stderr, "jread: roll seq_end to %08x\n",
487 if (jf->jf_seq > jf->jf_seq_beg) {
490 fprintf(stderr, "jread: roll to seq %08x\n", jf->jf_seq);
491 if (jf->jf_fd >= 0) {
501 * If we hit EOF and were asked to loop forever on the input, leave
502 * the current descriptor open, sleep, and loop.
504 * We have already handled the prefix case. This feature only works
505 * when doing forward scans and the input is not a pipe.
507 if (error == 0 && (jmodes & JMODEF_LOOP_FOREVER) &&
508 !(jmodes & JMODEF_INPUT_PIPE) && jf->jf_direction == JD_FORWARDS &&
509 jf->jf_prefix == NULL
516 * Otherwise there are no more records and we are done.
523 * Write a record out. If this is a prefix set and the file would
524 * exceed record_size, we rotate into a new sequence number.
527 jwrite(struct jfile *jf, struct jdata *jd)
/* Appends jd's raw bytes (jd_data, jd_size) to the current file of a prefix set. */
/* Writing requires a prefix set; this is enforced with assert(). */
533 assert(jf->jf_prefix);
537 * Open/create a new file in the prefix set
/*
 * The write descriptor is opened lazily on the file named after
 * jf_seq_end and positioned at the file's current size, so new records are
 * appended.  NOTE(review): the asprintf() result is not checked before
 * 'path' is used.
 */
539 if (jf->jf_write_fd < 0) {
540 asprintf(&path, "%s.%08x", jf->jf_prefix, jf->jf_seq_end);
541 jf->jf_write_fd = open(path, O_RDWR|O_CREAT, 0666);
542 if (jf->jf_write_fd < 0 || fstat(jf->jf_write_fd, &st) != 0) {
543 fprintf(stderr, "Unable to open/create %s\n", path);
546 jf->jf_write_pos = st.st_size;
547 lseek(jf->jf_write_fd, jf->jf_write_pos, 0);
552 * Each file must contain at least one raw record, even if it exceeds
553 * the user-requested record-size. Apart from that, we cycle to the next
554 * file when its size would exceed the user-specified
/*
 * Rotation is skipped while the file is still empty (jf_write_pos == 0);
 * otherwise the descriptor is closed once this record would push the file
 * past prefix_file_size.
 */
556 if (jf->jf_write_pos > 0 &&
557 jf->jf_write_pos + jd->jd_size > prefix_file_size
559 close(jf->jf_write_fd);
560 jf->jf_write_fd = -1;
566 * Terminate if a failure occurs (for now).
568 n = write(jf->jf_write_fd, jd->jd_data, jd->jd_size);
/*
 * A failed or short write is rolled back by truncating the file to the
 * position recorded before the write.
 * NOTE(review): ftruncate() runs before strerror(errno), so the reported
 * error may not be the one from write(); a short write does not set errno
 * at all.
 */
569 if (n != jd->jd_size) {
570 ftruncate(jf->jf_write_fd, jf->jf_write_pos);
571 fprintf(stderr, "jwrite: failed %s\n", strerror(errno));
/* On success advance the write position and remember the record's transaction id. */
574 jf->jf_write_pos += n;
575 jf->jf_last_transid = jd->jd_transid;
579 * Reset the direction and seek us to the beginning or end
580 * of the current file. In prefix mode we might as well
581 * just let jsread() do it since it might have to do it
585 jreset(struct jfile *jf, unsigned int seq, enum jdirection direction)
/*
 * When a descriptor is open (jf_fd >= 0), jf_pos is taken from lseek():
 * offset 0 with whence 0 (conventionally SEEK_SET, i.e. start of file) in
 * one case, end of file in the other.
 * NOTE(review): the condition choosing between the two seeks is not
 * visible in this excerpt - presumably the direction; confirm.
 */
588 if (jf->jf_fd >= 0) {
596 jf->jf_pos = lseek(jf->jf_fd, 0L, 0);
598 jf->jf_pos = lseek(jf->jf_fd, 0L, SEEK_END);
/* The requested direction is recorded on the jfile. */
601 jf->jf_direction = direction;
605 * Position the file such that the next jread() in the specified
606 * direction will read the record for the specified transaction id.
607 * If the transaction id does not exist the jseek will position the
608 * file at the next higher (if reading forwards) or lower (if reading
609 * backwards) transaction id.
611 * jseek is not required to be exact. It is allowed to position the
612 * file at any point <= the transid (forwards) or >= the transid
613 * (backwards). However, the more off jseek is, the more scanning
614 * the code will have to do to position itself properly.
617 jseek(struct jfile *jf, int64_t transid, enum jdirection direction)
/* (unsigned int)-1 serves as the initial "no sequence chosen" value. */
621 unsigned int seq = (unsigned int)-1;
625 * If we have a prefix set search the sequence space backwards until
626 * we find the file most likely to contain the transaction id.
629 if (verbose_opt > 2) {
630 fprintf(stderr, "jseek prefix set %s %08x-%08x\n", jf->jf_prefix,
631 jf->jf_seq_beg, jf->jf_seq_end);
/*
 * Walk from the newest file towards the oldest, reading the first record
 * of each file (forwards from its start) to learn the transaction id the
 * file begins with.
 * NOTE(review): the bound jf_seq_beg - 1 relies on unsigned wraparound
 * when jf_seq_beg is 0 - assumes jf_seq_beg is unsigned like 'seq'.
 */
633 for (seq = jf->jf_seq_end; seq != jf->jf_seq_beg - 1; --seq) {
634 jreset(jf, seq, JD_FORWARDS);
636 fprintf(stderr, "try seq %08x\n", seq);
637 if (jread(jf, &jd, JD_FORWARDS) == 0) {
638 transid_beg = jd->jd_transid;
640 fprintf(stderr, "transid %016llx\n", jd->jd_transid);
/* Exact hit on the file's first record: rewind this file to its start. */
642 if (transid_beg == transid) {
643 jreset(jf, seq, JD_FORWARDS);
646 if (transid_beg < transid)
/* The loop ran off the low end of the set: fall back to the oldest file. */
650 if (seq == jf->jf_seq_beg - 1) {
651 seq = jf->jf_seq_beg;
654 fprintf(stderr, "jseek input prefix set to seq %08x\n", seq);
658 * Position us within the current file.
/* Scan the chosen file backwards from its end, tracking each record's transaction id. */
660 jreset(jf, seq, JD_BACKWARDS);
661 while (jread(jf, &jd, JD_BACKWARDS) == 0) {
662 transid_end = jd->jd_transid;
666 * If we are at the sequence number the next forward read
667 * will re-read the record since we were going backwards. If
668 * the caller wants to go backwards we have to go forwards one
669 * record so the caller gets the transid record when it does
670 * its first backwards read. Confused yet?
672 * If we are at a smaller sequence number we need to read forwards
673 * by one so the next forwards read gets the first record > transid,
674 * or the next backwards read gets the first record < transid.
676 if (transid_end == transid) {
677 if (direction == JD_BACKWARDS) {
678 if (jread(jf, &jd, JD_FORWARDS) == 0)
683 if (transid_end < transid) {
684 if (jread(jf, &jd, JD_FORWARDS) == 0)
690 fprintf(stderr, "jseek %s to seq %08x offset 0x%08llx\n",
691 jf->jf_prefix, jf->jf_seq, jf->jf_pos);
696 * Data returned by jread() is persistent until released.
699 jref(struct jdata *jd)
/*
 * NOTE(review): the body is not visible in this excerpt.  Presumably this
 * takes a reference on jd (jfree() decrements jd_refs) - confirm.
 */
706 jfree(struct jfile *jf __unused, struct jdata *jd)
/*
 * Drops one reference from jd_refs; 'jf' is marked unused.
 * NOTE(review): the action taken when the count reaches zero is not
 * visible in this excerpt - presumably the record is freed; confirm.
 */
708 if (--jd->jd_refs == 0)
713 * Align us to the next 16 byte boundary. If scanning forwards we align
714 * forwards if not already aligned. If scanning backwards we align
715 * backwards if not already aligned. We only have to synchronize the
716 * seek position with the file seek position for forward scans.
719 jalign(struct jfile *jf)
/* Nothing to do unless the low four bits of jf_pos are non-zero. */
724 if ((int)jf->jf_pos & 15) {
/*
 * Forwards: read the bytes up to the next 16-byte boundary into a scratch
 * buffer through jreadbuf().
 */
725 if (jf->jf_direction == JD_FORWARDS) {
726 bytes = 16 - ((int)jf->jf_pos & 15);
727 jreadbuf(jf, dummy, bytes);
/*
 * Round jf_pos down to a multiple of 16.
 * NOTE(review): presumably the non-forwards branch - the line before this
 * one is not visible in this excerpt.
 */
729 jf->jf_pos = jf->jf_pos & ~(off_t)15;
735 * Read the next raw journal record forwards or backwards and return a
736 * pointer to it. Note that the file pointer's actual seek position does
737 * not match jf_pos in the reverse direction case.
740 jreadbuf(struct jfile *jf, void *buf, int bytes)
748 if (jf->jf_direction == JD_FORWARDS) {
749 while (ttl != bytes) {
750 n = read(jf->jf_fd, (char *)buf + ttl, bytes - ttl);
752 if (n < 0 && ttl == 0)
760 if (jf->jf_pos >= bytes) {
762 lseek(jf->jf_fd, jf->jf_pos, 0);
763 while (ttl != bytes) {
764 n = read(jf->jf_fd, (char *)buf + ttl, bytes - ttl);
766 if (n < 0 && ttl == 0)