Merge from vendor branch GROFF:
[dragonfly.git] / sbin / jscan / jstream.c
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/jscan/jstream.c,v 1.4 2005/07/06 06:06:44 dillon Exp $
35  */
36
37 #include "jscan.h"
38
39 static struct jhash     *JHashAry[JHASH_SIZE];
40
41 static struct jstream *jaddrecord(struct jfile *jf, struct jstream *js);
42 static void jnormalize(struct jstream *js);
43
44 /*
45  * Locate the next (or previous) complete virtual stream transaction given a
46  * file descriptor and direction.  Keep track of partial stream records as
47  * a side effect.
48  *
49  * Note that a transaction might represent a huge I/O operation, resulting
50  * in an overall node structure that spans gigabytes, but individual
51  * subrecord leaf nodes are limited in size and we depend on this to simplify
52  * the handling of leaf records. 
53  *
54  * A transaction may cover several raw records.  The jstream collection for
55  * a transaction is only returned when the entire transaction has been
56  * successfully scanned.  Due to the interleaving of transactions the ordering
57  * of returned JS's may be different (not exactly reversed) when scanning a
58  * journal backwards verses forwards.  Since parallel operations are 
59  * theoretically non-conflicting, this should not present a problem.
60  */
61 struct jstream *
62 jscan_stream(struct jfile *jf)
63 {
64     struct journal_rawrecbeg head;
65     struct journal_rawrecend tail;
66     struct journal_ackrecord ack;
67     int recsize;
68     int search;
69     int error;
70     struct jstream *js;
71
72     /*
73      * Get the current offset and make sure it is 16-byte aligned.  If it
74      * isn't, align it and enter search mode.
75      */
76     if (jf->jf_pos & 15) {
77         jf_warn(jf, "realigning bad offset and entering search mode");
78         jalign(jf);
79         search = 1;
80     } else {
81         search = 0;
82     }
83
84     error = 0;
85     js = NULL;
86
87     if (jf->jf_direction == JF_FORWARDS) {
88         /*
89          * Scan the journal forwards.  Note that the file pointer might not
90          * be seekable.
91          */
92         while ((error = jread(jf, &head, sizeof(head))) == 0) {
93             if (head.begmagic != JREC_BEGMAGIC) {
94                 if (search == 0)
95                     jf_warn(jf, "bad beginmagic, searching for new record");
96                 search = 1;
97                 jalign(jf);
98                 continue;
99             }
100             recsize = (head.recsize + 15) & ~15;
101             if (recsize <= 0) {
102                 jf_warn(jf, "bad recordsize: %d\n", recsize);
103                 search = 1;
104                 jalign(jf);
105                 continue;
106             }
107             jset(jf);
108             js = malloc(offsetof(struct jstream, js_data[recsize]));
109             bzero(js, sizeof(struct jstream));
110             bcopy(&head, js->js_data, sizeof(head));
111             error = jread(jf, js->js_data + sizeof(head), recsize - sizeof(head));
112             if (error) {
113                 jf_warn(jf, "Incomplete stream record\n");
114                 jreturn(jf);
115                 free(js);
116                 js = NULL;
117                 break;
118             }
119
120             /*
121              * XXX if the stream is full duplex send the ack back now.  This
122              * really needs to be delayed until the transaction is committed,
123              * but there are stalling issues if the transaction being
124              * collected exceeds to the size of the FIFO.  So for now this
125              * is just for testing.
126              */
127             if (jf->jf_flags & JF_FULL_DUPLEX) {
128                 bzero(&ack, sizeof(ack));
129                 ack.rbeg.begmagic = JREC_BEGMAGIC;
130                 ack.rbeg.streamid = JREC_STREAMID_ACK;
131                 ack.rbeg.transid = head.transid;
132                 ack.rbeg.recsize = sizeof(ack);
133                 ack.rend.endmagic = JREC_ENDMAGIC;
134                 ack.rend.recsize = sizeof(ack);
135                 jwrite(jf, &ack, sizeof(ack));
136             }
137
138             /*
139              * note: recsize is aligned (the actual record size),
140              * head.recsize is unaligned (the actual payload size).
141              */
142             js->js_size = head.recsize;
143             bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
144             if (tail.endmagic != JREC_ENDMAGIC) {
145                 jf_warn(jf, "bad endmagic, searching for new record");
146                 search = 1;
147                 jreturn(jf);
148                 free(js);
149                 js = NULL;
150                 continue;
151             }
152             jflush(jf);
153             if ((js = jaddrecord(jf, js)) != NULL)
154                 break;
155         }
156     } else {
157         /*
158          * Scan the journal backwards.  Note that jread()'s reverse-seek and
159          * read.  The data read will be forward ordered, however.
160          */
161         while ((error = jread(jf, &tail, sizeof(tail))) == 0) {
162             if (tail.endmagic != JREC_ENDMAGIC) {
163                 if (search == 0)
164                     jf_warn(jf, "bad endmagic, searching for new record");
165                 search = 1;
166                 jalign(jf);
167                 continue;
168             }
169             recsize = (tail.recsize + 15) & ~15;
170             if (recsize <= 0) {
171                 jf_warn(jf, "bad recordsize: %d\n", recsize);
172                 search = 1;
173                 jalign(jf);
174                 continue;
175             }
176             jset(jf);
177             js = malloc(offsetof(struct jstream, js_data[recsize]));
178             bzero(js, sizeof(struct jstream));
179             bcopy(&tail, js->js_data + recsize - sizeof(tail), sizeof(tail));
180             error = jread(jf, js->js_data, recsize - sizeof(tail));
181
182             if (error) {
183                 jf_warn(jf, "Incomplete stream record\n");
184                 jreturn(jf);
185                 free(js);
186                 js = NULL;
187                 break;
188             }
189             js->js_size = tail.recsize;
190             bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
191             bcopy(js->js_data, &head, sizeof(head));
192             if (head.begmagic != JREC_BEGMAGIC) {
193                 jf_warn(jf, "bad begmagic, searching for new record");
194                 search = 1;
195                 jreturn(jf);
196                 free(js);
197                 continue;
198             }
199             jflush(jf);
200             if ((js = jaddrecord(jf, js)) != NULL)
201                 break;
202         }
203     }
204     jf->jf_error = error;
205     return(js);
206 }
207
208 /*
209  * Integrate a jstream record.  Deal with the transaction begin and end flags
210  * to create a forward-referenced collection of jstream records.  If we are
211  * able to complete a transaction, the first js associated with that
212  * transaction is returned.
213  *
214  * XXX we need to store the data for very large multi-record transactions
215  * separately since it might not fit into memory.
216  */
217 static struct jstream *
218 jaddrecord(struct jfile *jf, struct jstream *js)
219 {
220     struct journal_rawrecbeg *head = (void *)js->js_data;
221     struct jhash *jh;
222     struct jhash **jhp;
223
224     /*
225      * Check for a completely self-contained transaction, just return the
226      * js if possible.
227      */
228     if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
229         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
230     ) {
231         jnormalize(js);
232         return (js);
233     }
234
235     /*
236      * Check for an open transaction in the hash table, create a new one
237      * if necessary.
238      */
239     jhp = &JHashAry[head->streamid & JHASH_MASK];
240     while ((jh = *jhp) != NULL) {
241         if (((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0)
242             break;
243         jhp = &jh->jh_hash;
244     }
245     if (jh == NULL) {
246         jh = malloc(sizeof(*jh));
247         bzero(jh, sizeof(*jh));
248         *jhp = jh;
249         jh->jh_first = js;
250         jh->jh_last = js;
251         jh->jh_transid = head->streamid;
252         return (NULL);
253     }
254
255     /*
256      * Emplace the stream segment
257      */
258     jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
259     if (jf->jf_direction == JF_FORWARDS) {
260         jh->jh_last->js_next = js;
261         jh->jh_last = js;
262     } else {
263         js->js_next = jh->jh_first;
264         jh->jh_first = js;
265     }
266
267     /*
268      * If the transaction is complete, remove the hash entry and return the
269      * js representing the beginning of the transaction.
270      */
271     if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
272         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
273     ) {
274         *jhp = jh->jh_hash;
275         js = jh->jh_first;
276         free(jh);
277
278         jnormalize(js);
279     } else {
280         js = NULL;
281     }
282     return (js);
283 }
284
285 /*
286  * Renormalize the jscan list to remove all the meta record headers
287  * and trailers except for the very first one.
288  */
289 static
290 void
291 jnormalize(struct jstream *js)
292 {
293     struct jstream *jscan;
294     off_t off;
295
296     js->js_normalized_off = 0;
297     js->js_normalized_base = js->js_data;
298     js->js_normalized_size = ((struct journal_rawrecbeg *)js->js_data)->recsize - sizeof(struct journal_rawrecend);
299     js->js_normalized_total = js->js_normalized_size;
300     off = js->js_normalized_size;
301     for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
302         jscan->js_normalized_off = off;
303         jscan->js_normalized_base = jscan->js_data + 
304                 sizeof(struct journal_rawrecbeg);
305         jscan->js_normalized_size = jscan->js_size -
306                sizeof(struct journal_rawrecbeg) -
307                sizeof(struct journal_rawrecend);
308         off += jscan->js_normalized_size;
309         js->js_normalized_total += jscan->js_normalized_size;
310     }
311 }
312
313 void
314 jscan_dispose(struct jstream *js)
315 {
316     struct jstream *jnext;
317
318     if (js->js_alloc_buf) {
319         free(js->js_alloc_buf);
320         js->js_alloc_buf = NULL;
321         js->js_alloc_size = 0;
322     }
323
324     while (js) {
325         jnext = js->js_next;
326         free(js);
327         js = jnext;
328     }
329 }
330
331 /*
332  * Read the specified block of data out of a linked set of jstream
333  * structures.  Returns 0 on success or an error code on error.
334  */
335 int
336 jsread(struct jstream *js, off_t off, void *buf, int bytes)
337 {
338     const void *ptr;
339     int n;
340
341     while (bytes) {
342         n = jsreadany(js, off, &ptr);
343         if (n == 0)
344             return (ENOENT);
345         if (n > bytes)
346             n = bytes;
347         bcopy(ptr, buf, n);
348         buf = (char *)buf + n;
349         off += n;
350         bytes -= n;
351     }
352     return(0);
353 }
354
355 /*
356  * Read the specified block of data out of a linked set of jstream
357  * structures.  Attempt to return a pointer into the data set but
358  * allocate and copy if that is not possible.  Returns 0 on success
359  * or an error code on error.
360  */
361 int
362 jsreadp(struct jstream *js, off_t off, const void **bufp,
363         int bytes)
364 {
365     int error = 0;
366     int n;
367
368     n = jsreadany(js, off, bufp);
369     if (n < bytes) {
370         if (js->js_alloc_size < bytes) {
371             if (js->js_alloc_buf)
372                 free(js->js_alloc_buf);
373             js->js_alloc_buf = malloc(bytes);
374             js->js_alloc_size = bytes;
375             assert(js->js_alloc_buf != NULL);
376         }
377         error = jsread(js, off, js->js_alloc_buf, bytes);
378         if (error) {
379             *bufp = NULL;
380         } else {
381             *bufp = js->js_alloc_buf;
382         }
383     }
384     return(error);
385 }
386
387 int
388 jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
389                 int fd, off_t off, int bytes)
390 {
391     const void *bufp;
392     int res;
393     int n;
394     int r;
395
396     res = 0;
397     while (bytes && (n = jsreadany(js, off, &bufp)) > 0) {
398         if (n > bytes)
399             n = bytes;
400         r = func(fd, bufp, n);
401         if (r != n) {
402             if (res == 0)
403                 res = -1;
404         }
405         res += n;
406         bytes -= n;
407         off += n;
408     }
409     return(res);
410 }
411
412 /*
413  * Return the largest contiguous buffer starting at the specified offset,
414  * or 0.
415  */
416 int
417 jsreadany(struct jstream *js, off_t off, const void **bufp)
418 {
419     struct jstream *scan;
420     int n;
421
422     if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
423         scan = js;
424     while (scan && scan->js_normalized_off <= off) {
425         js->js_cache = scan;
426         if (scan->js_normalized_off + scan->js_normalized_size > off) {
427             n = (int)(off - scan->js_normalized_off);
428             *bufp = scan->js_normalized_base + n;
429             return(scan->js_normalized_size - n);
430         }
431         scan = scan->js_next;
432     }
433     return(0);
434 }
435