Major continuing work on jscan, the userland backend for the journaling
[dragonfly.git] / sbin / jscan / jstream.c
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/jscan/jstream.c,v 1.2 2005/07/05 00:26:03 dillon Exp $
35  */
36
37 #include "jscan.h"
38
39 static struct jhash     *JHashAry[JHASH_SIZE];
40
41 static struct jstream *jaddrecord(struct jfile *jf, struct jstream *js);
42 static void jnormalize(struct jstream *js);
43
44 /*
45  * Locate the next (or previous) complete virtual stream transaction given a
46  * file descriptor and direction.  Keep track of partial stream records as
47  * a side effect.
48  *
49  * Note that a transaction might represent a huge I/O operation, resulting
50  * in an overall node structure that spans gigabytes, but individual
51  * subrecord leaf nodes are limited in size and we depend on this to simplify
52  * the handling of leaf records. 
53  *
54  * A transaction may cover several raw records.  The jstream collection for
55  * a transaction is only returned when the entire transaction has been
56  * successfully scanned.  Due to the interleaving of transactions the ordering
57  * of returned JS's may be different (not exactly reversed) when scanning a
58  * journal backwards verses forwards.  Since parallel operations are 
59  * theoretically non-conflicting, this should not present a problem.
60  */
61 struct jstream *
62 jscan_stream(struct jfile *jf)
63 {
64     struct journal_rawrecbeg head;
65     struct journal_rawrecend tail;
66     int recsize;
67     int search;
68     int error;
69     struct jstream *js;
70
71     /*
72      * Get the current offset and make sure it is 16-byte aligned.  If it
73      * isn't, align it and enter search mode.
74      */
75     if (jf->jf_pos & 15) {
76         jf_warn(jf, "realigning bad offset and entering search mode");
77         jalign(jf);
78         search = 1;
79     } else {
80         search = 0;
81     }
82
83     error = 0;
84     js = NULL;
85
86     if (jf->jf_direction == JF_FORWARDS) {
87         /*
88          * Scan the journal forwards.  Note that the file pointer might not
89          * be seekable.
90          */
91         while ((error = jread(jf, &head, sizeof(head))) == 0) {
92             if (head.begmagic != JREC_BEGMAGIC) {
93                 if (search == 0)
94                     jf_warn(jf, "bad beginmagic, searching for new record");
95                 search = 1;
96                 jalign(jf);
97                 continue;
98             }
99             recsize = (head.recsize + 15) & ~15;
100             if (recsize <= 0) {
101                 jf_warn(jf, "bad recordsize: %d\n", recsize);
102                 search = 1;
103                 jalign(jf);
104                 continue;
105             }
106             jset(jf);
107             js = malloc(offsetof(struct jstream, js_data[recsize]));
108             bzero(js, sizeof(struct jstream));
109             bcopy(&head, js->js_data, sizeof(head));
110             error = jread(jf, js->js_data + sizeof(head), recsize - sizeof(head));
111             if (error) {
112                 jf_warn(jf, "Incomplete stream record\n");
113                 jreturn(jf);
114                 free(js);
115                 js = NULL;
116                 break;
117             }
118             js->js_size = recsize;
119             bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
120             if (tail.endmagic != JREC_ENDMAGIC) {
121                 jf_warn(jf, "bad endmagic, searching for new record");
122                 search = 1;
123                 jreturn(jf);
124                 free(js);
125                 js = NULL;
126                 continue;
127             }
128             jflush(jf);
129             if ((js = jaddrecord(jf, js)) != NULL)
130                 break;
131         }
132     } else {
133         /*
134          * Scan the journal backwards.  Note that jread()'s reverse-seek and
135          * read.  The data read will be forward ordered, however.
136          */
137         while ((error = jread(jf, &tail, sizeof(tail))) == 0) {
138             if (tail.endmagic != JREC_ENDMAGIC) {
139                 if (search == 0)
140                     jf_warn(jf, "bad endmagic, searching for new record");
141                 search = 1;
142                 jalign(jf);
143                 continue;
144             }
145             recsize = (tail.recsize + 15) & ~15;
146             if (recsize <= 0) {
147                 jf_warn(jf, "bad recordsize: %d\n", recsize);
148                 search = 1;
149                 jalign(jf);
150                 continue;
151             }
152             jset(jf);
153             js = malloc(offsetof(struct jstream, js_data[recsize]));
154             bzero(js, sizeof(struct jstream));
155             bcopy(&tail, js->js_data + recsize - sizeof(tail), sizeof(tail));
156             error = jread(jf, js->js_data, recsize - sizeof(tail));
157
158             if (error) {
159                 jf_warn(jf, "Incomplete stream record\n");
160                 jreturn(jf);
161                 free(js);
162                 js = NULL;
163                 break;
164             }
165             js->js_size = recsize;
166             bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
167             bcopy(js->js_data, &head, sizeof(head));
168             if (head.begmagic != JREC_BEGMAGIC) {
169                 jf_warn(jf, "bad begmagic, searching for new record");
170                 search = 1;
171                 jreturn(jf);
172                 free(js);
173                 continue;
174             }
175             jflush(jf);
176             if ((js = jaddrecord(jf, js)) != NULL)
177                 break;
178         }
179     }
180     jf->jf_error = error;
181     return(js);
182 }
183
184 /*
185  * Integrate a jstream record.  Deal with the transaction begin and end flags
186  * to create a forward-referenced collection of jstream records.  If we are
187  * able to complete a transaction, the first js associated with that
188  * transaction is returned.
189  *
190  * XXX we need to store the data for very large multi-record transactions
191  * separately since it might not fit into memory.
192  */
193 static struct jstream *
194 jaddrecord(struct jfile *jf, struct jstream *js)
195 {
196     struct journal_rawrecbeg *head = (void *)js->js_data;
197     struct jhash *jh;
198     struct jhash **jhp;
199
200     /*
201      * Check for a completely self-contained transaction, just return the
202      * js if possible.
203      */
204     if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
205         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
206     ) {
207         jnormalize(js);
208         return (js);
209     }
210
211     /*
212      * Check for an open transaction in the hash table, create a new one
213      * if necessary.
214      */
215     jhp = &JHashAry[head->streamid & JHASH_MASK];
216     while ((jh = *jhp) != NULL) {
217         if (((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0)
218             break;
219         jhp = &jh->jh_hash;
220     }
221     if (jh == NULL) {
222         jh = malloc(sizeof(*jh));
223         bzero(jh, sizeof(*jh));
224         *jhp = jh;
225         jh->jh_first = js;
226         jh->jh_last = js;
227         jh->jh_transid = head->streamid;
228         return (NULL);
229     }
230
231     /*
232      * Emplace the stream segment
233      */
234     jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
235     if (jf->jf_direction == JF_FORWARDS) {
236         jh->jh_last->js_next = js;
237         jh->jh_last = js;
238     } else {
239         js->js_next = jh->jh_first;
240         jh->jh_first = js;
241     }
242
243     /*
244      * If the transaction is complete, remove the hash entry and return the
245      * js representing the beginning of the transaction.
246      */
247     if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
248         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
249     ) {
250         *jhp = jh->jh_hash;
251         js = jh->jh_first;
252         free(jh);
253
254         jnormalize(js);
255     } else {
256         js = NULL;
257     }
258     return (js);
259 }
260
261 /*
262  * Renormalize the jscan list to remove all the meta record headers
263  * and trailers except for the very first one.
264  */
265 static
266 void
267 jnormalize(struct jstream *js)
268 {
269     struct jstream *jscan;
270     off_t off;
271
272     js->js_normalized_off = 0;
273     js->js_normalized_base = js->js_data;
274     js->js_normalized_size = ((struct journal_rawrecbeg *)js->js_data)->recsize - sizeof(struct journal_rawrecend);
275     js->js_normalized_total = js->js_normalized_size;
276     off = js->js_normalized_size;
277     for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
278         jscan->js_normalized_off = off;
279         jscan->js_normalized_base = jscan->js_data + 
280                 sizeof(struct journal_rawrecbeg);
281         jscan->js_normalized_size = jscan->js_size -
282                sizeof(struct journal_rawrecbeg) -
283                sizeof(struct journal_rawrecend);
284         off += jscan->js_normalized_size;
285         js->js_normalized_total += jscan->js_normalized_size;
286     }
287 }
288
289 void
290 jscan_dispose(struct jstream *js)
291 {
292     struct jstream *jnext;
293
294     if (js->js_alloc_buf) {
295         free(js->js_alloc_buf);
296         js->js_alloc_buf = NULL;
297         js->js_alloc_size = 0;
298     }
299
300     while (js) {
301         jnext = js->js_next;
302         free(js);
303         js = jnext;
304     }
305 }
306
307 /*
308  * Read the specified block of data out of a linked set of jstream
309  * structures.  Returns 0 on success or an error code on error.
310  */
311 int
312 jsread(struct jstream *js, off_t off, void *buf, int bytes)
313 {
314     const void *ptr;
315     int n;
316
317     while (bytes) {
318         n = jsreadany(js, off, &ptr);
319         if (n == 0)
320             return (ENOENT);
321         if (n > bytes)
322             n = bytes;
323         bcopy(ptr, buf, n);
324         buf = (char *)buf + n;
325         off += n;
326         bytes -= n;
327     }
328     return(0);
329 }
330
331 /*
332  * Read the specified block of data out of a linked set of jstream
333  * structures.  Attempt to return a pointer into the data set but
334  * allocate and copy if that is not possible.  Returns 0 on success
335  * or an error code on error.
336  */
337 int
338 jsreadp(struct jstream *js, off_t off, const void **bufp,
339         int bytes)
340 {
341     int error = 0;
342     int n;
343
344     n = jsreadany(js, off, bufp);
345     if (n < bytes) {
346         if (js->js_alloc_size < bytes) {
347             if (js->js_alloc_buf)
348                 free(js->js_alloc_buf);
349             js->js_alloc_buf = malloc(bytes);
350             js->js_alloc_size = bytes;
351             assert(js->js_alloc_buf != NULL);
352         }
353         error = jsread(js, off, js->js_alloc_buf, bytes);
354         if (error) {
355             *bufp = NULL;
356         } else {
357             *bufp = js->js_alloc_buf;
358         }
359     }
360     return(error);
361 }
362
363 /*
364  * Return the largest contiguous buffer starting at the specified offset,
365  * or 0.
366  */
367 int
368 jsreadany(struct jstream *js, off_t off, const void **bufp)
369 {
370     struct jstream *scan;
371     int n;
372
373     if ((scan = js->js_cache) == NULL || scan->js_cache_off > off)
374         scan = js;
375     while (scan && scan->js_normalized_off <= off) {
376         js->js_cache = scan;
377         js->js_cache_off = scan->js_normalized_off;
378         if (scan->js_normalized_off + scan->js_normalized_size > off) {
379             n = (int)(off - scan->js_normalized_off);
380             *bufp = scan->js_normalized_base + n;
381             return(scan->js_normalized_size - n);
382         }
383         scan = scan->js_next;
384     }
385     return(0);
386 }
387