Rework and expand the algorithms in JSCAN, part 3/?.
[dragonfly.git] / sbin / jscan / jscan.c
1 /*
2  * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/jscan/jscan.c,v 1.7 2005/09/06 22:33:00 dillon Exp $
35  */
36
37 #include "jscan.h"
38
39 static void usage(const char *av0);
40
41 int jmodes;
42 int fsync_opt;
43 int verbose_opt;
44 off_t prefix_file_size = 100 * 1024 * 1024;
45 off_t trans_count;
46 static enum jdirection jdirection = JD_FORWARDS;
47
48 static void jscan_do_output(struct jfile *, const char *, 
49                             const char *, int64_t);
50 static void jscan_do_mirror(struct jfile *, const char *,
51                             const char *, int64_t);
52 static void jscan_do_record(struct jfile *, const char *,
53                             const char *, int64_t);
54 static void jscan_do_debug(struct jfile *, const char *,
55                             const char *, int64_t);
56 static void fork_subprocess(struct jfile *,
57                             void (*)(struct jfile *, const char *,
58                                      const char *, int64_t),
59                             const char *,
60                             const char *, const char *, int64_t);
61
62 int
63 main(int ac, char **av)
64 {
65     const char *input_prefix = NULL;
66     char *output_transid_file = NULL;
67     char *mirror_transid_file = NULL;
68     const char *mirror_directory = ".";
69     char *record_prefix = NULL;
70     char *record_transid_file = NULL;
71     struct jsession jsdebug;
72     struct jsession jsoutput;
73     struct jsession jsmirror;
74     char *ptr;
75     int64_t mirror_transid;
76     int64_t output_transid;
77     int64_t record_transid;
78     int64_t transid;
79     int input_fd;
80     struct stat st;
81     struct jfile *jf;
82     struct jdata *jd;
83     int error;
84     int ch;
85
86     while ((ch = getopt(ac, av, "2dfm:o:s:uvw:D:O:W:F")) != -1) {
87         switch(ch) {
88         case '2':
89             jmodes |= JMODEF_INPUT_FULL;
90             break;
91         case 'c':
92             trans_count = strtoll(optarg, &ptr, 0);
93             switch(*ptr) {
94             case 't':
95                 trans_count *= 1024;
96                 /* fall through */
97             case 'g':
98                 trans_count *= 1024;
99                 /* fall through */
100             case 'm':
101                 trans_count *= 1024;
102                 /* fall through */
103             case 'k':
104                 trans_count *= 1024;
105                 break;
106             case 0:
107                 break;
108             default:
109                 fprintf(stderr, "Bad suffix for value specified with -c, use 'k', 'm', 'g', 't', or nothing\n");
110                 usage(av[0]);
111             }
112             break;
113         case 'd':
114             jmodes |= JMODEF_DEBUG;
115             break;
116         case 'f':
117             jmodes |= JMODEF_LOOP_FOREVER;
118             break;
119         case 'v':
120             ++verbose_opt;
121             break;
122         case 'm':
123             jmodes |= JMODEF_MIRROR;
124             if (strcmp(optarg, "none") != 0)
125                 mirror_transid_file = optarg;
126             break;
127         case 'O':
128             jmodes |= JMODEF_OUTPUT_FULL;
129             /* fall through */
130         case 'o':
131             jmodes |= JMODEF_OUTPUT;
132             if (strcmp(optarg, "none") != 0)
133                 output_transid_file = optarg;
134             break;
135         case 's':
136             prefix_file_size = strtoll(optarg, &ptr, 0);
137             switch(*ptr) {
138             case 't':
139                 prefix_file_size *= 1024;
140                 /* fall through */
141             case 'g':
142                 prefix_file_size *= 1024;
143                 /* fall through */
144             case 'm':
145                 prefix_file_size *= 1024;
146                 /* fall through */
147             case 'k':
148                 prefix_file_size *= 1024;
149                 break;
150             case 0:
151                 break;
152             default:
153                 fprintf(stderr, "Bad suffix for value specified with -s, use 'k', 'm', 'g', 't', or nothing\n");
154                 usage(av[0]);
155             }
156             break;
157         case 'u':
158             jdirection = JD_BACKWARDS;
159             break;
160         case 'W':
161             jmodes |= JMODEF_RECORD_TMP;
162             /* fall through */
163         case 'w':
164             jmodes |= JMODEF_RECORD;
165             record_prefix = optarg;
166             asprintf(&record_transid_file, "%s.transid", record_prefix);
167             break;
168         case 'D':
169             mirror_directory = optarg;
170             break;
171         case 'F':
172             ++fsync_opt;
173             break;
174         default:
175             fprintf(stderr, "unknown option: -%c\n", optopt);
176             usage(av[0]);
177         }
178     }
179
180     /*
181      * Sanity checks
182      */
183     if ((jmodes & JMODEF_COMMAND_MASK) == 0)
184         usage(av[0]);
185     if (optind > ac + 1)  {
186         fprintf(stderr, "Only one input file or prefix may be specified,\n"
187                         "or zero if stdin is to be the input.\n");
188         usage(av[0]);
189     }
190     if (jdirection == JD_BACKWARDS && (jmodes & (JMODEF_RECORD|JMODEF_OUTPUT))) {
191         fprintf(stderr, "Undo mode is only good in mirroring mode and "
192                         "cannot be mixed with other modes.\n");
193         exit(1);
194     }
195
196     /*
197      * STEP1 - OPEN INPUT
198      *
199      * The input will either be a pipe, a regular file, or a journaling 
200      * file prefix.
201      */
202     jf = NULL;
203     if (optind == ac) {
204         input_prefix = "<stdin>";
205         input_fd = 0;
206         if (fstat(0, &st) < 0 || !S_ISREG(st.st_mode)) {
207             jmodes |= JMODEF_INPUT_PIPE;
208             if (jdirection == JD_BACKWARDS) {
209                 fprintf(stderr, "Cannot scan journals on pipes backwards\n");
210                 usage(av[0]);
211             }
212         }
213         jf = jopen_fd(input_fd, jdirection);
214     } else if (stat(av[optind], &st) == 0 && S_ISREG(st.st_mode)) {
215         input_prefix = av[optind];
216         if ((input_fd = open(av[optind], O_RDONLY)) != NULL) {
217             jf = jopen_fd(input_fd, jdirection);
218         } else {
219             jf = NULL;
220         }
221     } else {
222         input_prefix = av[optind];
223         jf = jopen_prefix(input_prefix, jdirection, 0);
224         jmodes |= JMODEF_INPUT_PREFIX;
225     }
226     if (jf == NULL) {
227         fprintf(stderr, "Unable to open input %s: %s\n", 
228                 input_prefix, strerror(errno));
229         exit(1);
230     }
231
232     /*
233      * STEP 1 - SYNCHRONIZING THE INPUT STREAM
234      *
235      * Figure out the starting point for our various output modes.  Figure
236      * out the earliest transaction id and try to seek to that point,
237      * otherwise we might have to scan through terrabytes of data.
238      *
239      * Invalid transid's will be set to 0, but it should also be noted
240      * that 0 is also a valid transid.
241      */
242     get_transid_from_file(output_transid_file, &output_transid,
243                           JMODEF_OUTPUT_TRANSID_GOOD);
244     get_transid_from_file(mirror_transid_file, &mirror_transid, 
245                           JMODEF_MIRROR_TRANSID_GOOD);
246     get_transid_from_file(record_transid_file, &record_transid, 
247                           JMODEF_RECORD_TRANSID_GOOD);
248     transid = LLONG_MAX;
249     if ((jmodes & JMODEF_OUTPUT_TRANSID_GOOD) && output_transid < transid)
250         transid = output_transid;
251     if ((jmodes & JMODEF_MIRROR_TRANSID_GOOD) && mirror_transid < transid)
252         transid = mirror_transid;
253     if ((jmodes & JMODEF_RECORD_TRANSID_GOOD) && record_transid < transid)
254         transid = record_transid;
255     if ((jmodes & JMODEF_TRANSID_GOOD_MASK) == 0)
256         transid = 0;
257     if (verbose_opt) {
258         if (jmodes & JMODEF_OUTPUT) {
259             fprintf(stderr, "Starting transid for OUTPUT: %016llx\n",
260                     output_transid);
261         }
262         if (jmodes & JMODEF_MIRROR) {
263             fprintf(stderr, "Starting transid for MIRROR: %016llx\n",
264                     mirror_transid);
265         }
266         if (jmodes & JMODEF_RECORD) {
267             fprintf(stderr, "Starting transid for RECORD: %016llx\n",
268                     record_transid);
269         }
270     }
271
272     /*
273      * Now it gets more difficult.  If we are recording then the input
274      * could be representative of continuing data and not have any
275      * prior, older data that the output or mirror modes might need.  Those
276      * modes must work off the recording data even as we write to it.
277      * In that case we fork and have the sub-processes work off the
278      * record output.
279      *
280      * Then we take the input and start recording.
281      */
282     if (jmodes & JMODEF_RECORD) {
283         if (jrecord_init(record_prefix) < 0) {
284             fprintf(stderr, "Unable to initialize file set for: %s\n", 
285                     record_prefix);
286             exit(1);
287         }
288         if (jmodes & JMODEF_MIRROR) {
289             fork_subprocess(jf, jscan_do_mirror, record_prefix, 
290                             mirror_transid_file,
291                             mirror_directory, mirror_transid);
292             /* XXX ack stream for temporary record file removal */
293         }
294         if (jmodes & JMODEF_OUTPUT) {
295             fork_subprocess(jf, jscan_do_output, record_prefix,
296                             record_transid_file,
297                             NULL, output_transid);
298             /* XXX ack stream for temporary record file removal */
299         }
300         jscan_do_record(jf, record_transid_file, record_prefix, record_transid);
301         exit(0);
302     }
303
304     /*
305      * If the input is a prefix set we can just pass it to the appropriate
306      * jscan_do_*() function.  If we are doing both output and mirroring
307      * we fork the mirror and do the output in the foreground since that
308      * is going to stdout.
309      */
310     if (jmodes & JMODEF_INPUT_PREFIX) {
311         if ((jmodes & JMODEF_OUTPUT) && (jmodes & JMODEF_MIRROR)) {
312             fork_subprocess(jf, jscan_do_mirror, input_prefix, 
313                             mirror_transid_file,
314                             mirror_directory, mirror_transid);
315             jscan_do_output(jf, output_transid_file, NULL, output_transid);
316         } else if (jmodes & JMODEF_OUTPUT) {
317             jscan_do_output(jf, output_transid_file, NULL, output_transid);
318         } else if (jmodes & JMODEF_MIRROR) {
319             jscan_do_mirror(jf, mirror_transid_file, mirror_directory,
320                             mirror_transid);
321         } else if (jmodes & JMODEF_DEBUG) {
322             jscan_do_debug(jf, NULL, NULL, 0);
323         }
324         exit(0);
325     }
326
327     /*
328      * The input is not a prefix set and we are not recording, which means
329      * we have to transfer the data on the input pipe to the output and
330      * mirroring code on the fly.  This also means that we must keep track
331      * of meta-data records in-memory.  However, if the input is a regular
332      * file we *CAN* try to optimize where we start reading.
333      *
334      * NOTE: If the mirroring code encounters a transaction record that is
335      * not marked begin, and it does not have the begin record, it will
336      * attempt to locate the begin record if the input is not a pipe, then
337      * seek back.
338      */
339     if ((jmodes & JMODEF_TRANSID_GOOD_MASK) && !(jmodes & JMODEF_INPUT_PIPE))
340         jseek(jf, transid, jdirection);
341     jmodes |= JMODEF_MEMORY_TRACKING;
342
343     jsession_init(&jsdebug, jf, NULL, 0);
344     jsession_init(&jsoutput, jf, output_transid_file, output_transid);
345     jsession_init(&jsmirror, jf, mirror_transid_file, mirror_transid);
346     jsmirror.ss_mirror_directory = mirror_directory;
347
348     while ((error = jread(jf, &jd, jdirection)) == 0) {
349         if (jmodes & JMODEF_DEBUG)
350             dump_debug(&jsdebug, jd);
351         if (jmodes & JMODEF_OUTPUT)
352             dump_output(&jsoutput, jd);
353         if (jmodes & JMODEF_MIRROR)
354             dump_mirror(&jsmirror, jd);
355         jfree(jf, jd);
356     }
357     jclose(jf);
358     jsession_term(&jsdebug);
359     jsession_term(&jsoutput);
360     jsession_term(&jsmirror);
361     exit(error ? 1 : 0);
362 }
363
364 /*
365  * When we have multiple commands and are writing to a prefix set, we can
366  * 'background' the output and/or mirroring command and have the background
367  * processes feed off the prefix set the foreground process is writing to.
368  */
369 static
370 void
371 fork_subprocess(struct jfile *jftoclose,
372         void (*func)(struct jfile *, const char *, const char *, int64_t),
373         const char *input_prefix, const char *transid_file, const char *info,
374         int64_t transid)
375 {
376     pid_t pid;
377     struct jfile *jf;
378
379     if ((pid = fork()) == 0) {
380         jmodes &= ~(JMODEF_DEBUG | JMODEF_INPUT_PIPE);
381         jmodes |= JMODEF_LOOP_FOREVER;  /* keep checking for new input */
382         jclose(jftoclose);
383         jf = jopen_prefix(input_prefix, jdirection, 0);
384         jmodes |= JMODEF_INPUT_PREFIX;
385         func(jf, transid_file, info, transid);
386         jclose(jf);
387         exit(0);
388     } else if (pid < 0) {
389         fprintf(stderr, "fork(): %s\n", strerror(errno));
390         exit(1);
391     }
392 }
393
394 static
395 void
396 jscan_do_output(struct jfile *jf, const char *output_transid_file, const char *dummy __unused, int64_t transid)
397 {
398     struct jdata *jd;
399     struct jsession jsdebug;
400     struct jsession jsoutput;
401
402     jsession_init(&jsdebug, jf, NULL, 0);
403     jsession_init(&jsoutput, jf, output_transid_file, transid);
404
405     if ((jmodes & JMODEF_OUTPUT_TRANSID_GOOD) && !(jmodes & JMODEF_INPUT_PIPE))
406         jseek(jf, transid, jdirection);
407     while (jread(jf, &jd, jdirection) == 0) {
408         if (jmodes & JMODEF_DEBUG)
409             dump_debug(&jsdebug, jd);
410         dump_output(&jsoutput, jd);
411         jfree(jf, jd);
412     }
413 }
414
415 static
416 void
417 jscan_do_mirror(struct jfile *jf, const char *mirror_transid_file, const char *mirror_directory, int64_t transid)
418 {
419     struct jsession jsdebug;
420     struct jsession jsmirror;
421     struct jdata *jd;
422
423     jsession_init(&jsdebug, jf, NULL, 0);
424     jsession_init(&jsmirror, jf, mirror_transid_file, transid);
425     jsmirror.ss_mirror_directory = mirror_directory;
426
427     if ((jmodes & JMODEF_MIRROR_TRANSID_GOOD) && !(jmodes & JMODEF_INPUT_PIPE))
428         jseek(jf, transid, jdirection);
429     while (jread(jf, &jd, jdirection) == 0) {
430         if (jmodes & JMODEF_DEBUG)
431             dump_debug(&jsdebug, jd);
432         dump_mirror(&jsmirror, jd);
433         jfree(jf, jd);
434     }
435 }
436
437 static
438 void
439 jscan_do_record(struct jfile *jfin, const char *record_transid_file, const char *prefix, int64_t transid)
440 {
441     struct jsession jsdebug;
442     struct jsession jsrecord;
443     struct jdata *jd;
444
445     jsession_init(&jsdebug, jfin, NULL, 0);
446     jsession_init(&jsrecord, jfin, record_transid_file, transid);
447
448     assert(jdirection == JD_FORWARDS);
449     jsrecord.ss_jfout = jopen_prefix(prefix, JD_FORWARDS, 1);
450     if (jsrecord.ss_jfout == NULL) {
451         fprintf(stderr, "Unable to open prefix set for writing: %s\n", prefix);
452         exit(1);
453     }
454     if ((jmodes & JMODEF_RECORD_TRANSID_GOOD) && !(jmodes & JMODEF_INPUT_PIPE))
455         jseek(jfin, transid, jdirection);
456     while (jread(jfin, &jd, jdirection) == 0) {
457         if (jmodes & JMODEF_DEBUG)
458             dump_debug(&jsdebug, jd);
459         dump_record(&jsrecord, jd);
460         jfree(jfin, jd);
461     }
462     jclose(jsrecord.ss_jfout);
463 }
464
465 static
466 void
467 jscan_do_debug(struct jfile *jfin, const char *dummy1 __unused,
468                const char *dummy __unused, int64_t transid __unused)
469 {
470     struct jsession jsdebug;
471     struct jdata *jd;
472
473     jsession_init(&jsdebug, jfin, NULL, 0);
474     while (jread(jfin, &jd, jdirection) == 0) {
475         dump_debug(&jsdebug, jd);
476     }
477 }
478
479 static void
480 usage(const char *av0)
481 {
482     fprintf(stderr, 
483         "%s [-2duF] [-D dir] [-m mirror_transid_file/none]\n"
484         "\t[-o/O output_trnasid_file/none]\n"
485         "\t[-s size[kmgt]] -w/W record_prefix] [input_file/input_prefix]\n",
486         av0);
487     exit(1);
488 }
489