2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $
/*
 * Hash buckets tracking open (not yet completed) transactions.
 * jaddrecord() selects a bucket with (streamid & JHASH_MASK).
 */
39 static struct jhash *JHashAry[JHASH_SIZE];
/* Prototypes for the file-local helpers used by jaddrecord(). */
41 static void jnormalize(struct jstream *js);
42 static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd);
45 * Integrate a raw record. Deal with the transaction begin and end flags
46 * to create a forward-referenced collection of jstream records. If we are
47 * able to complete a transaction, the first js associated with that
48 * transaction is returned.
50 * XXX we need to store the data for very large multi-record transactions
51 * separately since it might not fit into memory.
53 * Note that a transaction might represent a huge I/O operation, resulting
54 * in an overall node structure that spans gigabytes, but individual
55 * subrecord leaf nodes are limited in size and we depend on this to simplify
56 * the handling of leaf records.
58 * A transaction may cover several raw records. The jstream collection for
59 * a transaction is only returned when the entire transaction has been
60 * successfully scanned. Due to the interleaving of transactions the ordering
61 * of returned JS's may be different (not exactly reversed) when scanning a
62 * journal backwards versus forwards. Since parallel operations are
63 * theoretically non-conflicting, this should not present a problem.
66 jaddrecord(struct jsession *ss, struct jdata *jd)
68 struct journal_rawrecbeg *head;
73 js = malloc(sizeof(struct jstream));
74 bzero(js, sizeof(struct jstream));
75 js->js_jdata = jref(jd);
76 js->js_head = (void *)jd->jd_data;
81 * Check for a completely self-contained transaction, just return the
84 if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
85 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
93 * Check for an open transaction in the hash table.
95 jhp = &JHashAry[head->streamid & JHASH_MASK];
96 while ((jh = *jhp) != NULL) {
97 if (jh->jh_session == ss &&
98 ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0
106 * We might have picked up a transaction in the middle, which we
107 * detect by not finding a hash record coupled with a raw record
108 * whose JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END
109 * bit if we are scanning backwards).
111 * When this case occurs we have to backtrack to locate the
112 * BEGIN (END if scanning backwards) and collect those records
113 * in order to obtain a complete transaction.
115 * This case most often occurs when a batch operation runs in
117 * a prefix set whose starting raw-record transaction id is in
117 * the middle of one or more meta-transactions. It's a bit of
118 * a tricky situation, but easily resolvable by scanning the
119 * prefix set backwards (forwards if originally scanning backwards)
120 * to locate the raw record representing the start (end) of the
124 if (ss->ss_direction == JD_FORWARDS &&
125 (head->streamid & JREC_STREAMCTL_BEGIN) == 0
128 fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK);
129 if (jaddrecord_backtrack(ss, jd) == 0) {
131 fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK);
134 fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK);
137 } else if (ss->ss_direction == JD_BACKWARDS &&
138 (head->streamid & JREC_STREAMCTL_END) == 0
141 fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK);
142 if (jaddrecord_backtrack(ss, jd) == 0) {
144 fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK);
147 fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK);
154 * If we've made it to here and we still don't have a hash record
155 * to track the transaction, create one.
158 jh = malloc(sizeof(*jh));
159 bzero(jh, sizeof(*jh));
163 jh->jh_transid = head->streamid;
169 * Emplace the stream segment
171 jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
172 if (ss->ss_direction == JD_FORWARDS) {
173 jh->jh_last->js_next = js;
176 js->js_next = jh->jh_first;
181 * If the transaction is complete, remove the hash entry and return the
182 * js representing the beginning of the transaction. Otherwise leave
183 * the hash entry intact and return NULL.
185 if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
186 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
200 * Renormalize the jscan list to remove all the meta record headers
201 * and trailers except for the very first one.
205 jnormalize(struct jstream *js)
207 struct jstream *jscan;
/*
 * Head element: it starts the normalized space at offset 0 and keeps
 * its raw record header (base is js_head itself); only the trailing
 * journal_rawrecend is excluded from its size.
 */
210 js->js_normalized_off = 0;
211 js->js_normalized_base = (void *)js->js_head;
212 js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend);
213 js->js_normalized_total = js->js_normalized_size;
214 off = js->js_normalized_size;
/*
 * Following elements: both the journal_rawrecbeg header and the
 * journal_rawrecend trailer are excluded, so only the payload is
 * mapped.  Each element is placed at the running offset, and the
 * grand total is accumulated on the head element only.
 */
215 for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
216 jscan->js_normalized_off = off;
217 jscan->js_normalized_base = (char *)jscan->js_head +
218 sizeof(struct journal_rawrecbeg);
219 jscan->js_normalized_size = jscan->js_head->recsize -
220 sizeof(struct journal_rawrecbeg) -
221 sizeof(struct journal_rawrecend);
222 off += jscan->js_normalized_size;
223 js->js_normalized_total += jscan->js_normalized_size;
228 * For sanity's sake I will describe the normal backtracking that must occur,
229 * but this routine must also operate on reverse-scanned (undo) records
230 * by forward tracking.
232 * A record has been found that represents the middle or end of a transaction
233 * when we were expecting the beginning of a transaction. We must backtrack
234 * to locate the start of the transaction, then process raw records relating
235 * to the transaction until we reach our current point (jd) again. If
236 * we find a matching streamid representing the end of a transaction instead
237 * of the expected start-of-transaction that record belongs to a wholly
238 * different meta-transaction and the record we seek is known to not be
241 * jd is the current record, direction is the normal scan direction (we have
242 * to scan in the reverse direction).
246 jaddrecord_backtrack(struct jsession *ss, struct jdata *jd)
248 struct jfile *jf = ss->ss_jfin;
254 assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS);
255 if (jmodes & JMODEF_INPUT_PIPE)
258 streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK;
260 if (ss->ss_direction == JD_FORWARDS) {
262 * Backtrack in the reverse direction looking for the transaction
263 * start bit. If we find an end bit instead it belongs to an
264 * unrelated transaction using the same streamid and there is no
268 while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) {
269 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
270 if ((scanid & JREC_STREAMID_MASK) != streamid)
272 if (scanid & JREC_STREAMCTL_END) {
276 if (scanid & JREC_STREAMCTL_BEGIN)
281 * Now jaddrecord the related records.
283 while (scan != NULL && scan->jd_transid < jd->jd_transid) {
284 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
285 if ((scanid & JREC_STREAMID_MASK) == streamid) {
286 js = jaddrecord(ss, scan);
289 scan = jread(jf, scan, JD_FORWARDS);
296 * Backtrack in the forwards direction looking for the transaction
297 * end bit. If we find a start bit instead it belongs to an
298 * unrelated transaction using the same streamid and there is no
302 while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) {
303 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
304 if ((scanid & JREC_STREAMID_MASK) != streamid)
306 if (scanid & JREC_STREAMCTL_BEGIN) {
310 if (scanid & JREC_STREAMCTL_END)
315 * Now jaddrecord the related records.
317 while (scan != NULL && scan->jd_transid > jd->jd_transid) {
318 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
319 if ((scanid & JREC_STREAMID_MASK) == streamid) {
320 js = jaddrecord(ss, scan);
323 scan = jread(jf, scan, JD_BACKWARDS);
333 jscan_dispose(struct jstream *js)
/*
 * NOTE(review): jnext suggests the js_next chain is walked element by
 * element, but the loop itself is not visible in this listing -- confirm.
 */
335 struct jstream *jnext;
/* Release the scratch buffer (filled by jsreadp()) and reset its bookkeeping. */
337 if (js->js_alloc_buf) {
338 free(js->js_alloc_buf);
339 js->js_alloc_buf = NULL;
340 js->js_alloc_size = 0;
/* Hand this element's jdata back via jfree() on the owning session's input jfile. */
345 jfree(js->js_session->ss_jfin, js->js_jdata);
353 * Read the specified block of data out of a linked set of jstream
354 * structures. Returns 0 on success or an error code on error.
357 jsread(struct jstream *js, off_t off, void *buf, int bytes)
/*
 * Obtain a contiguous chunk at 'off' from jsreadany(); n is the value
 * it returns.  NOTE(review): the surrounding loop, the copy into buf
 * and the error return are not visible in this listing.
 */
363 n = jsreadany(js, off, &ptr);
/* Step the destination pointer past the n bytes just handled. */
369 buf = (char *)buf + n;
377 * Read the specified block of data out of a linked set of jstream
378 * structures. Attempt to return a pointer into the data set but
379 * allocate and copy if that is not possible. Returns 0 on success
380 * or an error code on error.
383 jsreadp(struct jstream *js, off_t off, const void **bufp,
/* First ask jsreadany() for a direct pointer into the record data. */
389 n = jsreadany(js, off, bufp);
/*
 * Scratch-buffer path: grow js_alloc_buf (free + malloc, old contents
 * are not preserved) when it is smaller than the request.
 *
 * NOTE(review): js_alloc_size is updated before the malloc() result is
 * checked, and the failure path relies on assert(), which compiles out
 * under NDEBUG -- the later jsread() would then write through NULL.
 */
391 if (js->js_alloc_size < bytes) {
392 if (js->js_alloc_buf)
393 free(js->js_alloc_buf);
394 js->js_alloc_buf = malloc(bytes);
395 js->js_alloc_size = bytes;
396 if (js->js_alloc_buf == NULL)
397 fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes);
398 assert(js->js_alloc_buf != NULL);
/* Gather the requested range into the scratch buffer via jsread(). */
400 error = jsread(js, off, js->js_alloc_buf, bytes);
/* Hand the caller the scratch buffer instead of a direct pointer. */
404 *bufp = js->js_alloc_buf;
411 jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
412 int fd, off_t off, int bytes)
/*
 * Walk the stream in contiguous chunks: the loop stops once bytes
 * reaches 0 or jsreadany() returns a value <= 0.
 */
420 while (bytes && (n = jsreadany(js, off, &bufp)) > 0) {
/* Pass the chunk to the callback (write(2)-style signature); r is its result. */
423 r = func(fd, bufp, n);
436 * Return the largest contiguous buffer starting at the specified offset,
440 jsreadany(struct jstream *js, off_t off, const void **bufp)
442 struct jstream *scan;
/*
 * Try to resume from the cached element; it is unusable when unset or
 * when it already lies beyond 'off'.  (The fallback statement is not
 * visible in this listing.)
 */
445 if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
/* Advance through elements whose normalized start is at or before 'off'. */
447 while (scan && scan->js_normalized_off <= off) {
/*
 * 'off' falls inside this element: return a pointer n bytes into its
 * normalized data and the number of bytes left in this element.
 * NOTE(review): if js_normalized_base is declared void *, the pointer
 * arithmetic below relies on a compiler extension -- confirm its type.
 */
449 if (scan->js_normalized_off + scan->js_normalized_size > off) {
450 n = (int)(off - scan->js_normalized_off);
451 *bufp = scan->js_normalized_base + n;
452 return(scan->js_normalized_size - n);
454 scan = scan->js_next;