Merge from vendor branch OPENSSL:
[dragonfly.git] / sbin / jscan / jstream.c
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $
35  */
36
37 #include "jscan.h"
38
39 static struct jhash     *JHashAry[JHASH_SIZE];
40
41 static void jnormalize(struct jstream *js);
42 static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd);
43
44 /*
45  * Integrate a raw record.  Deal with the transaction begin and end flags
46  * to create a forward-referenced collection of jstream records.  If we are
47  * able to complete a transaction, the first js associated with that
48  * transaction is returned.
49  *
50  * XXX we need to store the data for very large multi-record transactions
51  * separately since it might not fit into memory.
52  *
53  * Note that a transaction might represent a huge I/O operation, resulting
54  * in an overall node structure that spans gigabytes, but individual
55  * subrecord leaf nodes are limited in size and we depend on this to simplify
56  * the handling of leaf records.
57  *
58  * A transaction may cover several raw records.  The jstream collection for
59  * a transaction is only returned when the entire transaction has been
60  * successfully scanned.  Due to the interleaving of transactions the ordering
61  * of returned JS's may be different (not exactly reversed) when scanning a
62  * journal backwards verses forwards.  Since parallel operations are
63  * theoretically non-conflicting, this should not present a problem.
64  */
65 struct jstream *
66 jaddrecord(struct jsession *ss, struct jdata *jd)
67 {
68     struct journal_rawrecbeg *head;
69     struct jstream *js;
70     struct jhash *jh;
71     struct jhash **jhp;
72
73     js = malloc(sizeof(struct jstream));
74     bzero(js, sizeof(struct jstream));
75     js->js_jdata = jref(jd);
76     js->js_head = (void *)jd->jd_data;
77     js->js_session = ss;
78     head = js->js_head;
79
80     /*
81      * Check for a completely self-contained transaction, just return the
82      * js if possible.
83      */
84     if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
85         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
86     ) {
87         jnormalize(js);
88         return (js);
89     }
90
91 retry:
92     /*
93      * Check for an open transaction in the hash table.
94      */
95     jhp = &JHashAry[head->streamid & JHASH_MASK];
96     while ((jh = *jhp) != NULL) {
97         if (jh->jh_session == ss &&
98            ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0
99         ) {
100             break;
101         }
102         jhp = &jh->jh_hash;
103     }
104
105     /*
106      * We might have picked up a transaction in the middle, which we
107      * detect by not finding a hash record coupled with a raw record
108      * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END
109      * bit if we are scanning backwards).
110      *
111      * When this case occurs we have to backtrack to locate the
112      * BEGIN (END if scanning backwards) and collect those records
113      * in order to obtain a complete transaction.
114      *
115      * This case most often occurs when a batch operation runs in 
116      * prefix set whos starting raw-record transaction id is in
117      * the middle of one or more meta-transactions.  It's a bit of
118      * a tricky situation, but easily resolvable by scanning the
119      * prefix set backwards (forwards if originally scanning backwards)
120      * to locate the raw record representing the start (end) of the
121      * transaction.
122      */
123     if (jh == NULL) {
124         if (ss->ss_direction == JD_FORWARDS &&
125             (head->streamid & JREC_STREAMCTL_BEGIN) == 0
126         ) {
127             if (verbose_opt > 1)
128                 fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK);
129             if (jaddrecord_backtrack(ss, jd) == 0) {
130                 if (verbose_opt)
131                     fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK);
132                 goto retry;
133             }
134             fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK);
135             jscan_dispose(js);
136             return(NULL);
137         } else if (ss->ss_direction == JD_BACKWARDS &&
138             (head->streamid & JREC_STREAMCTL_END) == 0
139         ) {
140             if (verbose_opt > 1)
141                 fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK);
142             if (jaddrecord_backtrack(ss, jd) == 0) {
143                 if (verbose_opt)
144                     fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK);
145                 goto retry;
146             }
147             fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK);
148             jscan_dispose(js);
149             return(NULL);
150         }
151     }
152
153     /*
154      * If we've made it to here and we still don't have a hash record
155      * to track the transaction, create one.
156      */
157     if (jh == NULL) {
158         jh = malloc(sizeof(*jh));
159         bzero(jh, sizeof(*jh));
160         *jhp = jh;
161         jh->jh_first = js;
162         jh->jh_last = js;
163         jh->jh_transid = head->streamid;
164         jh->jh_session = ss;
165         return (NULL);
166     }
167
168     /*
169      * Emplace the stream segment
170      */
171     jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
172     if (ss->ss_direction == JD_FORWARDS) {
173         jh->jh_last->js_next = js;
174         jh->jh_last = js;
175     } else {
176         js->js_next = jh->jh_first;
177         jh->jh_first = js;
178     }
179
180     /*
181      * If the transaction is complete, remove the hash entry and return the
182      * js representing the beginning of the transaction.  Otherwise leave
183      * the hash entry intact and return NULL.
184      */
185     if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
186         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
187     ) {
188         *jhp = jh->jh_hash;
189         js = jh->jh_first;
190         free(jh);
191
192         jnormalize(js);
193     } else {
194         js = NULL;
195     }
196     return (js);
197 }
198
199 /*
200  * Renormalize the jscan list to remove all the meta record headers
201  * and trailers except for the very first one.
202  */
203 static
204 void
205 jnormalize(struct jstream *js)
206 {
207     struct jstream *jscan;
208     off_t off;
209
210     js->js_normalized_off = 0;
211     js->js_normalized_base = (void *)js->js_head;
212     js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend);
213     js->js_normalized_total = js->js_normalized_size;
214     off = js->js_normalized_size;
215     for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
216         jscan->js_normalized_off = off;
217         jscan->js_normalized_base = (char *)jscan->js_head + 
218                 sizeof(struct journal_rawrecbeg);
219         jscan->js_normalized_size = jscan->js_head->recsize -
220                sizeof(struct journal_rawrecbeg) -
221                sizeof(struct journal_rawrecend);
222         off += jscan->js_normalized_size;
223         js->js_normalized_total += jscan->js_normalized_size;
224     }
225 }
226
227 /*
228  * For sanity's sake I will describe the normal backtracking that must occur,
229  * but this routine must also operate on reverse-scanned (undo) records
230  * by forward tracking.
231  *
232  * A record has been found that represents the middle or end of a transaction
233  * when we were expecting the beginning of a transaction.  We must backtrack
234  * to locate the start of the transaction, then process raw records relating
235  * to the transaction until we reach our current point (jd) again.  If
236  * we find a matching streamid representing the end of a transaction instead
237  * of the expected start-of-transaction that record belongs to a wholely
238  * different meta-transaction and the record we seek is known to not be
239  * available.
240  *
241  * jd is the current record, directon is the normal scan direction (we have
242  * to scan in the reverse direction). 
243  */
244 static
245 int
246 jaddrecord_backtrack(struct jsession *ss, struct jdata *jd)
247 {
248     struct jfile *jf = ss->ss_jfin;
249     struct jdata *scan;
250     struct jstream *js;
251     u_int16_t streamid;
252     u_int16_t scanid;
253
254     assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS);
255     if (jmodes & JMODEF_INPUT_PIPE)
256         return(-1);
257
258     streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK;
259
260     if (ss->ss_direction == JD_FORWARDS) {
261         /*
262          * Backtrack in the reverse direction looking for the transaction 
263          * start bit.  If we find an end bit instead it belongs to an
264          * unrelated transaction using the same streamid and there is no
265          * point continuing.
266          */
267         scan = jref(jd);
268         while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) {
269             scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
270             if ((scanid & JREC_STREAMID_MASK) != streamid)
271                 continue;
272             if (scanid & JREC_STREAMCTL_END) {
273                 jfree(jf, scan);
274                 return(-1);
275             }
276             if (scanid & JREC_STREAMCTL_BEGIN)
277                 break;
278         }
279
280         /*
281          * Now jaddrecord the related records.
282          */
283         while (scan != NULL && scan->jd_transid < jd->jd_transid) {
284             scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
285             if ((scanid & JREC_STREAMID_MASK) == streamid) {
286                 js = jaddrecord(ss, scan);
287                 assert(js == NULL);
288             }
289             scan = jread(jf, scan, JD_FORWARDS);
290         }
291         if (scan == NULL)
292             return(-1);
293         jfree(jf, scan);
294     } else {
295         /*
296          * Backtrack in the forwards direction looking for the transaction
297          * end bit.  If we find a start bit instead if belongs to an
298          * unrelated transaction using the same streamid and there is no
299          * point continuing.
300          */
301         scan = jref(jd);
302         while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) {
303             scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
304             if ((scanid & JREC_STREAMID_MASK) != streamid)
305                 continue;
306             if (scanid & JREC_STREAMCTL_BEGIN) {
307                 jfree(jf, scan);
308                 return(-1);
309             }
310             if (scanid & JREC_STREAMCTL_END)
311                 break;
312         }
313
314         /*
315          * Now jaddrecord the related records.
316          */
317         while (scan != NULL && scan->jd_transid > jd->jd_transid) {
318             scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
319             if ((scanid & JREC_STREAMID_MASK) == streamid) {
320                 js = jaddrecord(ss, scan);
321                 assert(js == NULL);
322             }
323             scan = jread(jf, scan, JD_BACKWARDS);
324         }
325         if (scan == NULL)
326             return(-1);
327         jfree(jf, scan);
328     }
329     return(0);
330 }
331
332 void
333 jscan_dispose(struct jstream *js)
334 {
335     struct jstream *jnext;
336
337     if (js->js_alloc_buf) {
338         free(js->js_alloc_buf);
339         js->js_alloc_buf = NULL;
340         js->js_alloc_size = 0;
341     }
342
343     while (js) {
344         jnext = js->js_next;
345         jfree(js->js_session->ss_jfin, js->js_jdata);
346         js->js_jdata = NULL;
347         free(js);
348         js = jnext;
349     }
350 }
351
352 /*
353  * Read the specified block of data out of a linked set of jstream
354  * structures.  Returns 0 on success or an error code on error.
355  */
356 int
357 jsread(struct jstream *js, off_t off, void *buf, int bytes)
358 {
359     const void *ptr;
360     int n;
361
362     while (bytes) {
363         n = jsreadany(js, off, &ptr);
364         if (n == 0)
365             return (ENOENT);
366         if (n > bytes)
367             n = bytes;
368         bcopy(ptr, buf, n);
369         buf = (char *)buf + n;
370         off += n;
371         bytes -= n;
372     }
373     return(0);
374 }
375
376 /*
377  * Read the specified block of data out of a linked set of jstream
378  * structures.  Attempt to return a pointer into the data set but
379  * allocate and copy if that is not possible.  Returns 0 on success
380  * or an error code on error.
381  */
382 int
383 jsreadp(struct jstream *js, off_t off, const void **bufp,
384         int bytes)
385 {
386     int error = 0;
387     int n;
388
389     n = jsreadany(js, off, bufp);
390     if (n < bytes) {
391         if (js->js_alloc_size < bytes) {
392             if (js->js_alloc_buf)
393                 free(js->js_alloc_buf);
394             js->js_alloc_buf = malloc(bytes);
395             js->js_alloc_size = bytes;
396             if (js->js_alloc_buf == NULL)
397                 fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes);
398             assert(js->js_alloc_buf != NULL);
399         }
400         error = jsread(js, off, js->js_alloc_buf, bytes);
401         if (error) {
402             *bufp = NULL;
403         } else {
404             *bufp = js->js_alloc_buf;
405         }
406     }
407     return(error);
408 }
409
410 int
411 jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
412                 int fd, off_t off, int bytes)
413 {
414     const void *bufp;
415     int res;
416     int n;
417     int r;
418
419     res = 0;
420     while (bytes && (n = jsreadany(js, off, &bufp)) > 0) {
421         if (n > bytes)
422             n = bytes;
423         r = func(fd, bufp, n);
424         if (r != n) {
425             if (res == 0)
426                 res = -1;
427         }
428         res += n;
429         bytes -= n;
430         off += n;
431     }
432     return(res);
433 }
434
435 /*
436  * Return the largest contiguous buffer starting at the specified offset,
437  * or 0.
438  */
439 int
440 jsreadany(struct jstream *js, off_t off, const void **bufp)
441 {
442     struct jstream *scan;
443     int n;
444
445     if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
446         scan = js;
447     while (scan && scan->js_normalized_off <= off) {
448         js->js_cache = scan;
449         if (scan->js_normalized_off + scan->js_normalized_size > off) {
450             n = (int)(off - scan->js_normalized_off);
451             *bufp = scan->js_normalized_base + n;
452             return(scan->js_normalized_size - n);
453         }
454         scan = scan->js_next;
455     }
456     return(0);
457 }
458