carp: add carp_group_demote_adj()
[dragonfly.git] / sbin / jscan / jstream.c
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $
35  */
36
37 #include "jscan.h"
38
39 static struct jhash     *JHashAry[JHASH_SIZE];
40
41 static void jnormalize(struct jstream *js);
42 static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd);
43
44 /*
45  * Integrate a raw record.  Deal with the transaction begin and end flags
46  * to create a forward-referenced collection of jstream records.  If we are
47  * able to complete a transaction, the first js associated with that
48  * transaction is returned.
49  *
50  * XXX we need to store the data for very large multi-record transactions
51  * separately since it might not fit into memory.
52  *
53  * Note that a transaction might represent a huge I/O operation, resulting
54  * in an overall node structure that spans gigabytes, but individual
55  * subrecord leaf nodes are limited in size and we depend on this to simplify
56  * the handling of leaf records.
57  *
58  * A transaction may cover several raw records.  The jstream collection for
59  * a transaction is only returned when the entire transaction has been
60  * successfully scanned.  Due to the interleaving of transactions the ordering
61  * of returned JS's may be different (not exactly reversed) when scanning a
62  * journal backwards verses forwards.  Since parallel operations are
63  * theoretically non-conflicting, this should not present a problem.
64  */
65 struct jstream *
66 jaddrecord(struct jsession *ss, struct jdata *jd)
67 {
68     struct journal_rawrecbeg *head;
69     struct jstream *js;
70     struct jhash *jh;
71     struct jhash **jhp;
72
73     js = malloc(sizeof(struct jstream));
74     bzero(js, sizeof(struct jstream));
75     js->js_jdata = jref(jd);
76     js->js_head = (void *)jd->jd_data;
77     js->js_session = ss;
78     head = js->js_head;
79
80     /*
81      * Check for a completely self-contained transaction, just return the
82      * js if possible.
83      */
84     if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
85         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
86     ) {
87         jnormalize(js);
88         return (js);
89     }
90
91 retry:
92     /*
93      * Check for an open transaction in the hash table.
94      */
95     jhp = &JHashAry[head->streamid & JHASH_MASK];
96     while ((jh = *jhp) != NULL) {
97         if (jh->jh_session == ss &&
98            ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0
99         ) {
100             break;
101         }
102         jhp = &jh->jh_hash;
103     }
104
105     /*
106      * We might have picked up a transaction in the middle, which we
107      * detect by not finding a hash record coupled with a raw record
108      * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END
109      * bit if we are scanning backwards).
110      *
111      * When this case occurs we have to backtrack to locate the
112      * BEGIN (END if scanning backwards) and collect those records
113      * in order to obtain a complete transaction.
114      *
115      * This case most often occurs when a batch operation runs in 
116      * prefix set whos starting raw-record transaction id is in
117      * the middle of one or more meta-transactions.  It's a bit of
118      * a tricky situation, but easily resolvable by scanning the
119      * prefix set backwards (forwards if originally scanning backwards)
120      * to locate the raw record representing the start (end) of the
121      * transaction.
122      */
123     if (jh == NULL) {
124         if (ss->ss_direction == JD_FORWARDS &&
125             (head->streamid & JREC_STREAMCTL_BEGIN) == 0
126         ) {
127             if (verbose_opt > 1)
128                 fprintf(stderr, "mid-transaction detected transid %016jx "
129                                 "streamid %04x\n",
130                         (uintmax_t)jd->jd_transid,
131                         head->streamid & JREC_STREAMID_MASK);
132             if (jaddrecord_backtrack(ss, jd) == 0) {
133                 if (verbose_opt)
134                     fprintf(stderr, "mid-transaction streamid %04x collection "
135                                     "succeeded\n",
136                             head->streamid & JREC_STREAMID_MASK);
137                 goto retry;
138             }
139             fprintf(stderr, "mid-transaction streamid %04x collection failed\n",
140                     head->streamid & JREC_STREAMID_MASK);
141             jscan_dispose(js);
142             return(NULL);
143         } else if (ss->ss_direction == JD_BACKWARDS &&
144             (head->streamid & JREC_STREAMCTL_END) == 0
145         ) {
146             if (verbose_opt > 1)
147                 fprintf(stderr, "mid-transaction detected transid %016jx "
148                                 "streamid %04x\n",
149                                 (uintmax_t)jd->jd_transid,
150                                 head->streamid & JREC_STREAMID_MASK);
151             if (jaddrecord_backtrack(ss, jd) == 0) {
152                 if (verbose_opt)
153                     fprintf(stderr, "mid-transaction streamid %04x "
154                                     "collection succeeded\n",
155                             head->streamid & JREC_STREAMID_MASK);
156                 goto retry;
157             }
158             fprintf(stderr, "mid-transaction streamid %04x collection failed\n",
159                     head->streamid & JREC_STREAMID_MASK);
160             jscan_dispose(js);
161             return(NULL);
162         }
163     }
164
165     /*
166      * If we've made it to here and we still don't have a hash record
167      * to track the transaction, create one.
168      */
169     if (jh == NULL) {
170         jh = malloc(sizeof(*jh));
171         bzero(jh, sizeof(*jh));
172         *jhp = jh;
173         jh->jh_first = js;
174         jh->jh_last = js;
175         jh->jh_transid = head->streamid;
176         jh->jh_session = ss;
177         return (NULL);
178     }
179
180     /*
181      * Emplace the stream segment
182      */
183     jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
184     if (ss->ss_direction == JD_FORWARDS) {
185         jh->jh_last->js_next = js;
186         jh->jh_last = js;
187     } else {
188         js->js_next = jh->jh_first;
189         jh->jh_first = js;
190     }
191
192     /*
193      * If the transaction is complete, remove the hash entry and return the
194      * js representing the beginning of the transaction.  Otherwise leave
195      * the hash entry intact and return NULL.
196      */
197     if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
198         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
199     ) {
200         *jhp = jh->jh_hash;
201         js = jh->jh_first;
202         free(jh);
203
204         jnormalize(js);
205     } else {
206         js = NULL;
207     }
208     return (js);
209 }
210
211 /*
212  * Renormalize the jscan list to remove all the meta record headers
213  * and trailers except for the very first one.
214  */
215 static
216 void
217 jnormalize(struct jstream *js)
218 {
219     struct jstream *jscan;
220     off_t off;
221
222     js->js_normalized_off = 0;
223     js->js_normalized_base = (void *)js->js_head;
224     js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend);
225     js->js_normalized_total = js->js_normalized_size;
226     off = js->js_normalized_size;
227     for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
228         jscan->js_normalized_off = off;
229         jscan->js_normalized_base = (char *)jscan->js_head + 
230                 sizeof(struct journal_rawrecbeg);
231         jscan->js_normalized_size = jscan->js_head->recsize -
232                sizeof(struct journal_rawrecbeg) -
233                sizeof(struct journal_rawrecend);
234         off += jscan->js_normalized_size;
235         js->js_normalized_total += jscan->js_normalized_size;
236     }
237 }
238
239 /*
240  * For sanity's sake I will describe the normal backtracking that must occur,
241  * but this routine must also operate on reverse-scanned (undo) records
242  * by forward tracking.
243  *
244  * A record has been found that represents the middle or end of a transaction
245  * when we were expecting the beginning of a transaction.  We must backtrack
246  * to locate the start of the transaction, then process raw records relating
247  * to the transaction until we reach our current point (jd) again.  If
248  * we find a matching streamid representing the end of a transaction instead
249  * of the expected start-of-transaction that record belongs to a wholely
250  * different meta-transaction and the record we seek is known to not be
251  * available.
252  *
253  * jd is the current record, directon is the normal scan direction (we have
254  * to scan in the reverse direction). 
255  */
256 static
257 int
258 jaddrecord_backtrack(struct jsession *ss, struct jdata *jd)
259 {
260     struct jfile *jf = ss->ss_jfin;
261     struct jdata *scan;
262     struct jstream *js;
263     u_int16_t streamid;
264     u_int16_t scanid;
265
266     assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS);
267     if (jmodes & JMODEF_INPUT_PIPE)
268         return(-1);
269
270     streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK;
271
272     if (ss->ss_direction == JD_FORWARDS) {
273         /*
274          * Backtrack in the reverse direction looking for the transaction 
275          * start bit.  If we find an end bit instead it belongs to an
276          * unrelated transaction using the same streamid and there is no
277          * point continuing.
278          */
279         scan = jref(jd);
280         while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) {
281             scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
282             if ((scanid & JREC_STREAMID_MASK) != streamid)
283                 continue;
284             if (scanid & JREC_STREAMCTL_END) {
285                 jfree(jf, scan);
286                 return(-1);
287             }
288             if (scanid & JREC_STREAMCTL_BEGIN)
289                 break;
290         }
291
292         /*
293          * Now jaddrecord the related records.
294          */
295         while (scan != NULL && scan->jd_transid < jd->jd_transid) {
296             scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
297             if ((scanid & JREC_STREAMID_MASK) == streamid) {
298                 js = jaddrecord(ss, scan);
299                 assert(js == NULL);
300             }
301             scan = jread(jf, scan, JD_FORWARDS);
302         }
303         if (scan == NULL)
304             return(-1);
305         jfree(jf, scan);
306     } else {
307         /*
308          * Backtrack in the forwards direction looking for the transaction
309          * end bit.  If we find a start bit instead if belongs to an
310          * unrelated transaction using the same streamid and there is no
311          * point continuing.
312          */
313         scan = jref(jd);
314         while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) {
315             scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
316             if ((scanid & JREC_STREAMID_MASK) != streamid)
317                 continue;
318             if (scanid & JREC_STREAMCTL_BEGIN) {
319                 jfree(jf, scan);
320                 return(-1);
321             }
322             if (scanid & JREC_STREAMCTL_END)
323                 break;
324         }
325
326         /*
327          * Now jaddrecord the related records.
328          */
329         while (scan != NULL && scan->jd_transid > jd->jd_transid) {
330             scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
331             if ((scanid & JREC_STREAMID_MASK) == streamid) {
332                 js = jaddrecord(ss, scan);
333                 assert(js == NULL);
334             }
335             scan = jread(jf, scan, JD_BACKWARDS);
336         }
337         if (scan == NULL)
338             return(-1);
339         jfree(jf, scan);
340     }
341     return(0);
342 }
343
344 void
345 jscan_dispose(struct jstream *js)
346 {
347     struct jstream *jnext;
348
349     if (js->js_alloc_buf) {
350         free(js->js_alloc_buf);
351         js->js_alloc_buf = NULL;
352         js->js_alloc_size = 0;
353     }
354
355     while (js) {
356         jnext = js->js_next;
357         jfree(js->js_session->ss_jfin, js->js_jdata);
358         js->js_jdata = NULL;
359         free(js);
360         js = jnext;
361     }
362 }
363
364 /*
365  * Read the specified block of data out of a linked set of jstream
366  * structures.  Returns 0 on success or an error code on error.
367  */
368 int
369 jsread(struct jstream *js, off_t off, void *buf, int bytes)
370 {
371     const void *ptr;
372     int n;
373
374     while (bytes) {
375         n = jsreadany(js, off, &ptr);
376         if (n == 0)
377             return (ENOENT);
378         if (n > bytes)
379             n = bytes;
380         bcopy(ptr, buf, n);
381         buf = (char *)buf + n;
382         off += n;
383         bytes -= n;
384     }
385     return(0);
386 }
387
388 /*
389  * Read the specified block of data out of a linked set of jstream
390  * structures.  Attempt to return a pointer into the data set but
391  * allocate and copy if that is not possible.  Returns 0 on success
392  * or an error code on error.
393  */
394 int
395 jsreadp(struct jstream *js, off_t off, const void **bufp,
396         int bytes)
397 {
398     int error = 0;
399     int n;
400
401     n = jsreadany(js, off, bufp);
402     if (n < bytes) {
403         if (js->js_alloc_size < bytes) {
404             if (js->js_alloc_buf)
405                 free(js->js_alloc_buf);
406             js->js_alloc_buf = malloc(bytes);
407             js->js_alloc_size = bytes;
408             if (js->js_alloc_buf == NULL)
409                 fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes);
410             assert(js->js_alloc_buf != NULL);
411         }
412         error = jsread(js, off, js->js_alloc_buf, bytes);
413         if (error) {
414             *bufp = NULL;
415         } else {
416             *bufp = js->js_alloc_buf;
417         }
418     }
419     return(error);
420 }
421
422 int
423 jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
424                 int fd, off_t off, int bytes)
425 {
426     const void *bufp;
427     int res;
428     int n;
429     int r;
430
431     res = 0;
432     while (bytes && (n = jsreadany(js, off, &bufp)) > 0) {
433         if (n > bytes)
434             n = bytes;
435         r = func(fd, bufp, n);
436         if (r != n) {
437             if (res == 0)
438                 res = -1;
439         }
440         res += n;
441         bytes -= n;
442         off += n;
443     }
444     return(res);
445 }
446
447 /*
448  * Return the largest contiguous buffer starting at the specified offset,
449  * or 0.
450  */
451 int
452 jsreadany(struct jstream *js, off_t off, const void **bufp)
453 {
454     struct jstream *scan;
455     int n;
456
457     if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
458         scan = js;
459     while (scan && scan->js_normalized_off <= off) {
460         js->js_cache = scan;
461         if (scan->js_normalized_off + scan->js_normalized_size > off) {
462             n = (int)(off - scan->js_normalized_off);
463             *bufp = scan->js_normalized_base + n;
464             return(scan->js_normalized_size - n);
465         }
466         scan = scan->js_next;
467     }
468     return(0);
469 }
470