Merge from vendor branch LESS:
[dragonfly.git] / sbin / jscan / jstream.c
... / ...
CommitLineData
1/*
2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved.
3 *
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
6 *
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
9 * are met:
10 *
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
16 * distribution.
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
20 *
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32 * SUCH DAMAGE.
33 *
34 * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $
35 */
36
37#include "jscan.h"
38
39static struct jhash *JHashAry[JHASH_SIZE];
40
41static void jnormalize(struct jstream *js);
42static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd);
43
44/*
45 * Integrate a raw record. Deal with the transaction begin and end flags
46 * to create a forward-referenced collection of jstream records. If we are
47 * able to complete a transaction, the first js associated with that
48 * transaction is returned.
49 *
50 * XXX we need to store the data for very large multi-record transactions
51 * separately since it might not fit into memory.
52 *
53 * Note that a transaction might represent a huge I/O operation, resulting
54 * in an overall node structure that spans gigabytes, but individual
55 * subrecord leaf nodes are limited in size and we depend on this to simplify
56 * the handling of leaf records.
57 *
58 * A transaction may cover several raw records. The jstream collection for
59 * a transaction is only returned when the entire transaction has been
60 * successfully scanned. Due to the interleaving of transactions the ordering
61 * of returned JS's may be different (not exactly reversed) when scanning a
62 * journal backwards verses forwards. Since parallel operations are
63 * theoretically non-conflicting, this should not present a problem.
64 */
65struct jstream *
66jaddrecord(struct jsession *ss, struct jdata *jd)
67{
68 struct journal_rawrecbeg *head;
69 struct jstream *js;
70 struct jhash *jh;
71 struct jhash **jhp;
72
73 js = malloc(sizeof(struct jstream));
74 bzero(js, sizeof(struct jstream));
75 js->js_jdata = jref(jd);
76 js->js_head = (void *)jd->jd_data;
77 js->js_session = ss;
78 head = js->js_head;
79
80 /*
81 * Check for a completely self-contained transaction, just return the
82 * js if possible.
83 */
84 if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
85 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
86 ) {
87 jnormalize(js);
88 return (js);
89 }
90
91retry:
92 /*
93 * Check for an open transaction in the hash table.
94 */
95 jhp = &JHashAry[head->streamid & JHASH_MASK];
96 while ((jh = *jhp) != NULL) {
97 if (jh->jh_session == ss &&
98 ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0
99 ) {
100 break;
101 }
102 jhp = &jh->jh_hash;
103 }
104
105 /*
106 * We might have picked up a transaction in the middle, which we
107 * detect by not finding a hash record coupled with a raw record
108 * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END
109 * bit if we are scanning backwards).
110 *
111 * When this case occurs we have to backtrack to locate the
112 * BEGIN (END if scanning backwards) and collect those records
113 * in order to obtain a complete transaction.
114 *
115 * This case most often occurs when a batch operation runs in
116 * prefix set whos starting raw-record transaction id is in
117 * the middle of one or more meta-transactions. It's a bit of
118 * a tricky situation, but easily resolvable by scanning the
119 * prefix set backwards (forwards if originally scanning backwards)
120 * to locate the raw record representing the start (end) of the
121 * transaction.
122 */
123 if (jh == NULL) {
124 if (ss->ss_direction == JD_FORWARDS &&
125 (head->streamid & JREC_STREAMCTL_BEGIN) == 0
126 ) {
127 if (verbose_opt > 1)
128 fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK);
129 if (jaddrecord_backtrack(ss, jd) == 0) {
130 if (verbose_opt)
131 fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK);
132 goto retry;
133 }
134 fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK);
135 jscan_dispose(js);
136 return(NULL);
137 } else if (ss->ss_direction == JD_BACKWARDS &&
138 (head->streamid & JREC_STREAMCTL_END) == 0
139 ) {
140 if (verbose_opt > 1)
141 fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK);
142 if (jaddrecord_backtrack(ss, jd) == 0) {
143 if (verbose_opt)
144 fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK);
145 goto retry;
146 }
147 fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK);
148 jscan_dispose(js);
149 return(NULL);
150 }
151 }
152
153 /*
154 * If we've made it to here and we still don't have a hash record
155 * to track the transaction, create one.
156 */
157 if (jh == NULL) {
158 jh = malloc(sizeof(*jh));
159 bzero(jh, sizeof(*jh));
160 *jhp = jh;
161 jh->jh_first = js;
162 jh->jh_last = js;
163 jh->jh_transid = head->streamid;
164 jh->jh_session = ss;
165 return (NULL);
166 }
167
168 /*
169 * Emplace the stream segment
170 */
171 jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
172 if (ss->ss_direction == JD_FORWARDS) {
173 jh->jh_last->js_next = js;
174 jh->jh_last = js;
175 } else {
176 js->js_next = jh->jh_first;
177 jh->jh_first = js;
178 }
179
180 /*
181 * If the transaction is complete, remove the hash entry and return the
182 * js representing the beginning of the transaction. Otherwise leave
183 * the hash entry intact and return NULL.
184 */
185 if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
186 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
187 ) {
188 *jhp = jh->jh_hash;
189 js = jh->jh_first;
190 free(jh);
191
192 jnormalize(js);
193 } else {
194 js = NULL;
195 }
196 return (js);
197}
198
199/*
200 * Renormalize the jscan list to remove all the meta record headers
201 * and trailers except for the very first one.
202 */
203static
204void
205jnormalize(struct jstream *js)
206{
207 struct jstream *jscan;
208 off_t off;
209
210 js->js_normalized_off = 0;
211 js->js_normalized_base = (void *)js->js_head;
212 js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend);
213 js->js_normalized_total = js->js_normalized_size;
214 off = js->js_normalized_size;
215 for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
216 jscan->js_normalized_off = off;
217 jscan->js_normalized_base = (char *)jscan->js_head +
218 sizeof(struct journal_rawrecbeg);
219 jscan->js_normalized_size = jscan->js_head->recsize -
220 sizeof(struct journal_rawrecbeg) -
221 sizeof(struct journal_rawrecend);
222 off += jscan->js_normalized_size;
223 js->js_normalized_total += jscan->js_normalized_size;
224 }
225}
226
227/*
228 * For sanity's sake I will describe the normal backtracking that must occur,
229 * but this routine must also operate on reverse-scanned (undo) records
230 * by forward tracking.
231 *
232 * A record has been found that represents the middle or end of a transaction
233 * when we were expecting the beginning of a transaction. We must backtrack
234 * to locate the start of the transaction, then process raw records relating
235 * to the transaction until we reach our current point (jd) again. If
236 * we find a matching streamid representing the end of a transaction instead
237 * of the expected start-of-transaction that record belongs to a wholely
238 * different meta-transaction and the record we seek is known to not be
239 * available.
240 *
241 * jd is the current record, directon is the normal scan direction (we have
242 * to scan in the reverse direction).
243 */
244static
245int
246jaddrecord_backtrack(struct jsession *ss, struct jdata *jd)
247{
248 struct jfile *jf = ss->ss_jfin;
249 struct jdata *scan;
250 struct jstream *js;
251 u_int16_t streamid;
252 u_int16_t scanid;
253
254 assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS);
255 if (jmodes & JMODEF_INPUT_PIPE)
256 return(-1);
257
258 streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK;
259
260 if (ss->ss_direction == JD_FORWARDS) {
261 /*
262 * Backtrack in the reverse direction looking for the transaction
263 * start bit. If we find an end bit instead it belongs to an
264 * unrelated transaction using the same streamid and there is no
265 * point continuing.
266 */
267 scan = jref(jd);
268 while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) {
269 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
270 if ((scanid & JREC_STREAMID_MASK) != streamid)
271 continue;
272 if (scanid & JREC_STREAMCTL_END) {
273 jfree(jf, scan);
274 return(-1);
275 }
276 if (scanid & JREC_STREAMCTL_BEGIN)
277 break;
278 }
279
280 /*
281 * Now jaddrecord the related records.
282 */
283 while (scan != NULL && scan->jd_transid < jd->jd_transid) {
284 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
285 if ((scanid & JREC_STREAMID_MASK) == streamid) {
286 js = jaddrecord(ss, scan);
287 assert(js == NULL);
288 }
289 scan = jread(jf, scan, JD_FORWARDS);
290 }
291 if (scan == NULL)
292 return(-1);
293 jfree(jf, scan);
294 } else {
295 /*
296 * Backtrack in the forwards direction looking for the transaction
297 * end bit. If we find a start bit instead if belongs to an
298 * unrelated transaction using the same streamid and there is no
299 * point continuing.
300 */
301 scan = jref(jd);
302 while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) {
303 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
304 if ((scanid & JREC_STREAMID_MASK) != streamid)
305 continue;
306 if (scanid & JREC_STREAMCTL_BEGIN) {
307 jfree(jf, scan);
308 return(-1);
309 }
310 if (scanid & JREC_STREAMCTL_END)
311 break;
312 }
313
314 /*
315 * Now jaddrecord the related records.
316 */
317 while (scan != NULL && scan->jd_transid > jd->jd_transid) {
318 scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid;
319 if ((scanid & JREC_STREAMID_MASK) == streamid) {
320 js = jaddrecord(ss, scan);
321 assert(js == NULL);
322 }
323 scan = jread(jf, scan, JD_BACKWARDS);
324 }
325 if (scan == NULL)
326 return(-1);
327 jfree(jf, scan);
328 }
329 return(0);
330}
331
332void
333jscan_dispose(struct jstream *js)
334{
335 struct jstream *jnext;
336
337 if (js->js_alloc_buf) {
338 free(js->js_alloc_buf);
339 js->js_alloc_buf = NULL;
340 js->js_alloc_size = 0;
341 }
342
343 while (js) {
344 jnext = js->js_next;
345 jfree(js->js_session->ss_jfin, js->js_jdata);
346 js->js_jdata = NULL;
347 free(js);
348 js = jnext;
349 }
350}
351
352/*
353 * Read the specified block of data out of a linked set of jstream
354 * structures. Returns 0 on success or an error code on error.
355 */
356int
357jsread(struct jstream *js, off_t off, void *buf, int bytes)
358{
359 const void *ptr;
360 int n;
361
362 while (bytes) {
363 n = jsreadany(js, off, &ptr);
364 if (n == 0)
365 return (ENOENT);
366 if (n > bytes)
367 n = bytes;
368 bcopy(ptr, buf, n);
369 buf = (char *)buf + n;
370 off += n;
371 bytes -= n;
372 }
373 return(0);
374}
375
376/*
377 * Read the specified block of data out of a linked set of jstream
378 * structures. Attempt to return a pointer into the data set but
379 * allocate and copy if that is not possible. Returns 0 on success
380 * or an error code on error.
381 */
382int
383jsreadp(struct jstream *js, off_t off, const void **bufp,
384 int bytes)
385{
386 int error = 0;
387 int n;
388
389 n = jsreadany(js, off, bufp);
390 if (n < bytes) {
391 if (js->js_alloc_size < bytes) {
392 if (js->js_alloc_buf)
393 free(js->js_alloc_buf);
394 js->js_alloc_buf = malloc(bytes);
395 js->js_alloc_size = bytes;
396 if (js->js_alloc_buf == NULL)
397 fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes);
398 assert(js->js_alloc_buf != NULL);
399 }
400 error = jsread(js, off, js->js_alloc_buf, bytes);
401 if (error) {
402 *bufp = NULL;
403 } else {
404 *bufp = js->js_alloc_buf;
405 }
406 }
407 return(error);
408}
409
410int
411jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
412 int fd, off_t off, int bytes)
413{
414 const void *bufp;
415 int res;
416 int n;
417 int r;
418
419 res = 0;
420 while (bytes && (n = jsreadany(js, off, &bufp)) > 0) {
421 if (n > bytes)
422 n = bytes;
423 r = func(fd, bufp, n);
424 if (r != n) {
425 if (res == 0)
426 res = -1;
427 }
428 res += n;
429 bytes -= n;
430 off += n;
431 }
432 return(res);
433}
434
435/*
436 * Return the largest contiguous buffer starting at the specified offset,
437 * or 0.
438 */
439int
440jsreadany(struct jstream *js, off_t off, const void **bufp)
441{
442 struct jstream *scan;
443 int n;
444
445 if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
446 scan = js;
447 while (scan && scan->js_normalized_off <= off) {
448 js->js_cache = scan;
449 if (scan->js_normalized_off + scan->js_normalized_size > off) {
450 n = (int)(off - scan->js_normalized_off);
451 *bufp = scan->js_normalized_base + n;
452 return(scan->js_normalized_size - n);
453 }
454 scan = scan->js_next;
455 }
456 return(0);
457}
458