Rework and expand the algorithms in JSCAN, part 1/2. Implement a new
[dragonfly.git] / sbin / jscan / jfile.c
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/jscan/jfile.c,v 1.6 2005/09/06 06:42:44 dillon Exp $
35  */
36
37 #include "jscan.h"
38 #include <dirent.h>
39
40 static void jalign(struct jfile *jf);
41 static int jreadbuf(struct jfile *jf, void *buf, int bytes);
42
43 /*
44  * Open a file descriptor for journal record access. 
45  *
46  * NOTE: only seekable descriptors are supported for backwards scans.
47  */
48 struct jfile *
49 jopen_fd(int fd, enum jdirection direction)
50 {
51     struct jfile *jf;
52
53     jf = malloc(sizeof(struct jfile));
54     bzero(jf, sizeof(struct jfile));
55     jf->jf_fd = fd;
56     jf->jf_open_flags = O_RDONLY;
57     if (direction == JD_BACKWARDS) {
58         jf->jf_pos = lseek(jf->jf_fd, 0L, SEEK_END);
59     }
60     jf->jf_direction = direction;
61     return(jf);
62 }
63
64 /*
65  * Open a prefix set.  <prefix>.nnnnnnnnn files or a <prefix>.transid file
66  * must exist to succeed.  No file descriptor is actually opened but
67  * the sequence number is initialized to the beginning or end of the set.
68  */
69 struct jfile *
70 jopen_prefix(const char *prefix, enum jdirection direction, int rw)
71 {
72     struct jfile *jf;
73     unsigned int seq_beg = -1;
74     unsigned int seq_end = -1;
75     unsigned int seq;
76     struct stat st;
77     const char *dirname;
78     struct dirent *den;
79     DIR *dir;
80     char *basename;
81     char *data;
82     char *ptr;
83     int hastransid;
84     int baselen;
85     int fd;
86
87     dirname = data = strdup(prefix);
88     if ((basename = strrchr(dirname, '/')) != NULL) {
89         *basename++ = 0;
90     } else {
91         basename = data;
92         dirname = "./";
93     }
94     baselen = strlen(basename);
95     if ((dir = opendir(dirname)) != NULL) {
96         while ((den = readdir(dir)) != NULL) {
97             if (strncmp(den->d_name, basename, baselen) == 0 && 
98                 den->d_name[baselen] == '.'
99             ) {
100                 seq = strtoul(den->d_name + baselen + 1, &ptr, 10);
101                 if (*ptr == 0 && seq > 0) {
102                     if (seq_beg == (unsigned int)-1 || seq_beg > seq)
103                         seq_beg = seq;
104                     if (seq_end == (unsigned int)-1 || seq_end < seq)
105                         seq_end = seq;
106                 }
107             }
108         }
109         closedir(dir);
110     }
111     free(data);
112
113     hastransid = 0;
114     asprintf(&data, "%s.transid", prefix);
115     if (stat(data, &st) == 0)
116         hastransid = 1;
117     free(data);
118
119     if (seq_beg != (unsigned int)-1 || hastransid) {
120         if (seq_beg == (unsigned int)-1) {
121             seq_beg = 0;
122             seq_end = 0;
123             if (rw) {
124                 asprintf(&data, "%s.%08x", prefix, 0);
125                 if ((fd = open(data, O_RDWR|O_CREAT, 0666)) >= 0)
126                     close(fd);
127                 free(data);
128             }
129         }
130         jf = malloc(sizeof(struct jfile));
131         bzero(jf, sizeof(struct jfile));
132         jf->jf_fd = -1;
133         jf->jf_prefix = strdup(prefix);
134         jf->jf_seq_beg = seq_beg;
135         jf->jf_seq_end = seq_end;
136         jf->jf_pos = -1;
137         if (direction == JD_BACKWARDS) {
138             jf->jf_seq = jf->jf_seq_end;
139         } else {
140             jf->jf_seq = jf->jf_seq_beg;
141         }
142         jf->jf_direction = direction;
143         jf->jf_open_flags = rw ? (O_RDWR|O_CREAT) : O_RDONLY;
144     } else {
145         jf = NULL;
146     }
147     return(jf);
148 }
149
150 /*
151  * Get a prefix set ready for append.
152  */
153 int
154 jrecord_init(const char *prefix)
155 {
156     struct jfile *jf;
157     struct stat st;
158     char *data;
159     int hasseqspace;
160     int fd;
161
162     /*
163      * Determine whether we already have a prefix set or whether we need
164      * to create one.
165      */
166     jf = jopen_prefix(prefix, 0, 0);
167     hasseqspace = 0;
168     if (jf) {
169         if (jf->jf_seq_beg != (unsigned int)-1)
170             hasseqspace = 1;
171         jclose(jf);
172     }
173     asprintf(&data, "%s.transid", prefix);
174
175     /*
176      * If the sequence exists the transid file must ALREADY exist for us
177      * to be able to safely 'append' to the space.  Locked-down sequence
178      * spaces do not have a transid file.
179      */
180     if (hasseqspace) {
181         fd = open(data, O_RDWR, 0666);
182     } else {
183         fd = open(data, O_RDWR|O_CREAT, 0666);
184     }
185     free(data);
186     if (fd < 0)
187         return(-1);
188     if (fstat(fd, &st) == 0 && st.st_size == 0)
189         write(fd, "0000000000000000\n", 17);    /* starting transid in hex */
190     close(fd);
191     return(0);
192 }
193
194 /*
195  * Close a previously opened journal, clean up any side allocations.
196  */
197 void
198 jclose(struct jfile *jf)
199 {
200     close(jf->jf_fd);
201     jf->jf_fd = -1;
202     free(jf);
203 }
204
205 /*
206  * Locate the next (or previous) complete virtual stream transaction given a
207  * file descriptor and direction.  Keep track of partial stream records as
208  * a side effect.
209  *
210  * Note that a transaction might represent a huge I/O operation, resulting
211  * in an overall node structure that spans gigabytes, but individual
212  * subrecord leaf nodes are limited in size and we depend on this to simplify
213  * the handling of leaf records. 
214  *
215  * A transaction may cover several raw records.  The jstream collection for
216  * a transaction is only returned when the entire transaction has been
217  * successfully scanned.  Due to the interleaving of transactions the ordering
218  * of returned JS's may be different (not exactly reversed) when scanning a
219  * journal backwards verses forwards.  Since parallel operations are 
220  * theoretically non-conflicting, this should not present a problem.
221  */
222 int
223 jread(struct jfile *jf, struct jdata **jdp, enum jdirection direction)
224 {
225     struct journal_rawrecbeg head;
226     struct journal_rawrecbeg *headp;
227     struct journal_rawrecend tail;
228     struct journal_rawrecend *tailp;
229     struct jdata *jd;
230     char *filename;
231     int allocsize;
232     int recsize;
233     int search;
234     int n;
235
236     /*
237      * If changing direction on an open descriptor we have to fixup jf_pos.
238      * When reading backwards the actual file seek position does not match
239      * jf_pos.
240      *
241      * If you read forwards then read backwards, or read backwords then
242      * read forwards, you will get the same record.
243      */
244     if (jf->jf_direction != direction) {
245         if (jf->jf_fd >= 0) {
246             if (direction == JD_FORWARDS) {
247                 lseek(jf->jf_fd, jf->jf_pos, 0);
248             }
249         }
250         jf->jf_direction = direction;
251     }
252
253 top:
254     /*
255      * If reading in prefix mode and we have no descriptor, open
256      * a new descriptor based on the current sequence number.  If
257      * this fails we will fall all the way through to the end which will
258      * setup the next sequence number and loop.
259      */
260     if (jf->jf_fd == -1 && jf->jf_prefix) {
261         asprintf(&filename, "%s.%08x", jf->jf_prefix, jf->jf_seq);
262         if ((jf->jf_fd = open(filename, O_RDONLY)) >= 0) {
263             if (jf->jf_direction == JD_FORWARDS)
264                 jf->jf_pos = 0;
265             else
266                 jf->jf_pos = lseek(jf->jf_fd, 0L, SEEK_END);
267             search = 0;
268         }
269         fprintf(stderr, "Open %s fd %d\n", filename, jf->jf_fd);
270         free(filename);
271     }
272
273     /*
274      * Get the current offset and make sure it is 16-byte aligned.  If it
275      * isn't, align it and enter search mode.
276      */
277     if (jf->jf_pos & 15) {
278         jf_warn(jf, "realigning bad offset and entering search mode");
279         jalign(jf);
280         search = 1;
281     } else {
282         search = 0;
283     }
284
285     if (jf->jf_direction == JD_FORWARDS) {
286         /*
287          * Scan the journal forwards.  Note that the file pointer might not
288          * be seekable.
289          */
290         while (jreadbuf(jf, &head, sizeof(head)) == sizeof(head)) {
291             if (head.begmagic != JREC_BEGMAGIC) {
292                 if (search == 0)
293                     jf_warn(jf, "bad beginmagic, searching for new record");
294                 search = 1;
295                 jalign(jf);
296                 continue;
297             }
298
299             /*
300              * The actual record is 16-byte aligned.  head.recsize contains
301              * the unaligned record size.
302              */
303             recsize = (head.recsize + 15) & ~15;
304             if (recsize < JREC_MINRECSIZE || recsize > JREC_MAXRECSIZE) {
305                 if (search == 0)
306                     jf_warn(jf, "bad recordsize: %d\n", recsize);
307                 search = 1;
308                 jalign(jf);
309                 continue;
310             }
311             allocsize = offsetof(struct jdata, jd_data[recsize]);
312             allocsize = (allocsize + 255) & ~255;
313             jd = malloc(allocsize);
314             bzero(jd, offsetof(struct jdata, jd_data[0]));
315             bcopy(&head, jd->jd_data, sizeof(head));
316             n = jreadbuf(jf, jd->jd_data + sizeof(head), 
317                          recsize - sizeof(head));
318             if (n != (int)(recsize - sizeof(head))) {
319                 if (search == 0)
320                     jf_warn(jf, "Incomplete stream record\n");
321                 search = 1;
322                 jalign(jf);
323                 free(jd);
324                 continue;
325             }
326
327             tailp = (void *)(jd->jd_data + recsize - sizeof(*tailp));
328             if (tailp->endmagic != JREC_ENDMAGIC) {
329                 if (search == 0)
330                     jf_warn(jf, "bad endmagic, searching for new record");
331                 search = 1;
332                 jalign(jf);
333                 free(jd);
334                 continue;
335             }
336
337             /*
338              * note: recsize is aligned (the actual record size),
339              * head.recsize is unaligned (the actual payload size).
340              */
341             jd->jd_transid = head.transid;
342             jd->jd_alloc = allocsize;
343             jd->jd_size = recsize;
344             jd->jd_refs = 1;
345             jd->jd_next = jf->jf_data;
346             jf->jf_data = jd;
347             *jdp = jd;
348             return(0);
349         }
350     } else {
351         /*
352          * Scan the journal backwards.  Note that jread()'s reverse-seek and
353          * read.  The data read will be forward ordered, however.
354          */
355         while (jreadbuf(jf, &tail, sizeof(tail)) == sizeof(tail)) {
356             if (tail.endmagic != JREC_ENDMAGIC) {
357                 if (search == 0)
358                     jf_warn(jf, "bad endmagic, searching for new record");
359                 search = 1;
360                 jalign(jf);
361                 continue;
362             }
363
364             /*
365              * The actual record is 16-byte aligned.  head.recsize contains
366              * the unaligned record size.
367              */
368             recsize = (tail.recsize + 15) & ~15;
369             if (recsize < JREC_MINRECSIZE || recsize > JREC_MAXRECSIZE) {
370                 if (search == 0)
371                     jf_warn(jf, "bad recordsize: %d\n", recsize);
372                 search = 1;
373                 jalign(jf);
374                 continue;
375             }
376             allocsize = offsetof(struct jdata, jd_data[recsize]);
377             allocsize = (allocsize + 255) & ~255;
378             jd = malloc(allocsize);
379             bzero(jd, offsetof(struct jdata, jd_data[0]));
380             bcopy(&tail, jd->jd_data + recsize - sizeof(tail), sizeof(tail));
381             n = jreadbuf(jf, jd->jd_data, recsize - sizeof(tail));
382             if (n != (int)(recsize - sizeof(tail))) {
383                 if (search == 0)
384                     jf_warn(jf, "Incomplete stream record\n");
385                 search = 1;
386                 jalign(jf);
387                 free(jd);
388                 continue;
389             }
390
391             headp = (void *)jd->jd_data;
392             if (headp->begmagic != JREC_BEGMAGIC) {
393                 if (search == 0)
394                     jf_warn(jf, "bad begmagic, searching for new record");
395                 search = 1;
396                 jalign(jf);
397                 free(jd);
398                 continue;
399             }
400
401             /*
402              * note: recsize is aligned (the actual record size),
403              * head.recsize is unaligned (the actual payload size).
404              */
405             jd->jd_transid = headp->transid;
406             jd->jd_alloc = allocsize;
407             jd->jd_size = recsize;
408             jd->jd_refs = 1;
409             jd->jd_next = jf->jf_data;
410             jf->jf_data = jd;
411             *jdp = jd;
412             return(0);
413         }
414     }
415
416     /*
417      * If reading in prefix mode and there is no more data, close the 
418      * current descriptor, adjust the sequence number, and loop.
419      */
420     if (jf->jf_prefix) {
421         close(jf->jf_fd);
422         jf->jf_fd = -1;
423         if (jf->jf_direction == JD_FORWARDS) {
424             if (jf->jf_seq < jf->jf_seq_end) {
425                 ++jf->jf_seq;
426                 goto top;
427             }
428         } else {
429             if (jf->jf_seq > jf->jf_seq_beg) {
430                 --jf->jf_seq;
431                 goto top;
432             }
433         }
434     }
435
436     /*
437      * Otherwise there are no more records and we are done.
438      */
439     *jdp = NULL;
440     return(-1);
441 }
442
443 /*
444  * Write a record out.  If this is a prefix set and the file would
445  * exceed record_size, we rotate into a new sequence number.
446  */
447 int
448 jwrite(struct jfile *jf, struct jdata *jd)
449 {
450     int n;
451
452     n = write(jf->jf_fd, jd->jd_data, jd->jd_size);
453     return(n);
454 }
455
456 /*
457  * Reset the direction and seek us to the beginning or end
458  * of the currenet file.  In prefix mode we might as well
459  * just let jsread() do it since it might have to do it 
460  * anyway.
461  */
462 static void
463 jreset(struct jfile *jf, unsigned int seq, enum jdirection direction)
464 {
465     if (jf->jf_prefix) {
466         if (jf->jf_fd >= 0) {
467             close(jf->jf_fd);
468             jf->jf_fd = -1;
469         }
470         jf->jf_pos = -1;
471         jf->jf_seq = seq;
472     } else {
473         if (direction) {
474             jf->jf_pos = lseek(jf->jf_fd, 0L, 0);
475         } else {
476             jf->jf_pos = lseek(jf->jf_fd, 0L, SEEK_END);
477         }
478     }
479     jf->jf_direction = direction;
480 }
481
482 /*
483  * Position the file such that the next jread() in the specified
484  * direction will read the record for the specified transaction id.
485  * If the transaction id does not exist the jseek will position the
486  * file at the next higher (if reading forwards) or lower (if reading
487  * backwards) transaction id.
488  *
489  * jseek is not required to be exact.  It is allowed to position the
490  * file at any point <= the transid (forwards) or >= the transid
491  * (backwards).  However, the more off jseek is, the more scanning
492  * the code will have to do to position itself properly.
493  */
494 void
495 jseek(struct jfile *jf, int64_t transid, enum jdirection direction)
496 {
497     int64_t transid_beg;
498     int64_t transid_end;
499     unsigned int seq;
500     struct jdata *jd;
501
502     /*
503      * If we have a prefix set search the sequence space backwards until
504      * we find the file most likely to contain the transaction id.
505      */
506     if (jf->jf_prefix) {
507         for (seq = jf->jf_seq_end; seq >= jf->jf_seq_beg; --seq) {
508             jreset(jf, seq, JD_FORWARDS);
509             if (jread(jf, &jd, JD_FORWARDS) == 0) {
510                 transid_beg = jd->jd_transid;
511                 jfree(jf, jd);
512                 if (transid_beg == transid) {
513                     jreset(jf, seq, JD_FORWARDS);
514                     break;
515                 }
516                 if (transid_beg < transid)
517                     break;
518             }
519         }
520     }
521
522     /*
523      * Position us within the current file.
524      */
525     jreset(jf, seq, JD_BACKWARDS);
526     while (jread(jf, &jd, JD_BACKWARDS) == 0) {
527         transid_end = jd->jd_transid;
528         jfree(jf, jd);
529
530         /*
531          * If we are at the sequence number the next forward read
532          * will re-read the record since we were going backwards.  If
533          * the caller wants to go backwards we have to go forwards one
534          * record so the caller gets the transid record when it does
535          * its first backwards read.  Confused yet?
536          *
537          * If we are at a smaller sequence number we need to read forwards
538          * by one so the next forwards read gets the first record > transid,
539          * or the next backwards read gets the first record < transid.
540          */
541         if (transid_end == transid) {
542             if (direction == JD_BACKWARDS) {
543                 if (jread(jf, &jd, JD_FORWARDS) == 0)
544                     jfree(jf, jd);
545             }
546             break;
547         }
548         if (transid_end < transid) {
549             if (jread(jf, &jd, JD_FORWARDS) == 0)
550                 jfree(jf, jd);
551         }
552     }
553 }
554
555 /*
556  * Data returned by jread() is persistent until released.
557  */
558 struct jdata *
559 jref(struct jdata *jd)
560 {
561     ++jd->jd_refs;
562     return(jd);
563 }
564
565 void
566 jfree(struct jfile *jf, struct jdata *jd)
567 {
568     struct jdata **jdp;
569
570     if (--jd->jd_refs == 0){
571         for (jdp = &jf->jf_data; *jdp != jd; jdp = &(*jdp)->jd_next) {
572             assert(*jdp != NULL);
573         }
574         *jdp = jd->jd_next;
575         free(jd);
576     }
577 }
578
579 /*
580  * Align us to the next 16 byte boundary.  If scanning forwards we align
581  * forwards if not already aligned.  If scanning backwards we align
582  * backwards if not already aligned.  We only have to synchronize the
583  * seek position with the file seek position for forward scans.
584  */
585 static void
586 jalign(struct jfile *jf)
587 {
588     char dummy[16];
589     int bytes;
590
591     if ((int)jf->jf_pos & 15) {
592         if (jf->jf_direction == JD_FORWARDS) {
593             bytes = 16 - ((int)jf->jf_pos & 15);
594             jf->jf_pos += jreadbuf(jf, dummy, bytes);
595         } else {
596             jf->jf_pos = jf->jf_pos & ~(off_t)15;
597         }
598     }
599 }
600
601 /*
602  * Read the next raw journal record forwards or backwards and return a
603  * pointer to it.  Note that the file pointer's actual seek position does
604  * not match jf_pos in the reverse direction case.
605  */
606 static int
607 jreadbuf(struct jfile *jf, void *buf, int bytes)
608 {
609     int ttl = 0;
610     int n;
611
612     if (jf->jf_fd < 0)
613         return(0);
614
615     if (jf->jf_direction == JD_FORWARDS) {
616         while (ttl != bytes) {
617             n = read(jf->jf_fd, (char *)buf + ttl, bytes - ttl);
618             if (n <= 0)
619                 break;
620             ttl += n;
621         }
622     } else {
623         if (jf->jf_pos >= bytes) {
624             jf->jf_pos -= bytes;
625             lseek(jf->jf_fd, jf->jf_pos, 0);
626             while (ttl != bytes) {
627                 n = read(jf->jf_fd, (char *)buf + ttl, bytes - ttl);
628                 if (n <= 0)
629                     break;
630                 ttl += n;
631             }
632         }
633     }
634     return(ttl);
635 }
636