2 * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Matthew Dillon <dillon@backplane.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 * $DragonFly: src/sbin/jscan/jstream.c,v 1.4 2005/07/06 06:06:44 dillon Exp $
39 static struct jhash *JHashAry[JHASH_SIZE];
41 static struct jstream *jaddrecord(struct jfile *jf, struct jstream *js);
42 static void jnormalize(struct jstream *js);
45 * Locate the next (or previous) complete virtual stream transaction given a
46 * file descriptor and direction. Keep track of partial stream records as
49 * Note that a transaction might represent a huge I/O operation, resulting
50 * in an overall node structure that spans gigabytes, but individual
51 * subrecord leaf nodes are limited in size and we depend on this to simplify
52 * the handling of leaf records.
54 * A transaction may cover several raw records. The jstream collection for
55 * a transaction is only returned when the entire transaction has been
56 * successfully scanned. Due to the interleaving of transactions the ordering
57 * of returned JS's may be different (not exactly reversed) when scanning a
58 * journal backwards versus forwards. Since parallel operations are
59 * theoretically non-conflicting, this should not present a problem.
62 jscan_stream(struct jfile *jf)
64 struct journal_rawrecbeg head;
65 struct journal_rawrecend tail;
66 struct journal_ackrecord ack;
73 * Get the current offset and make sure it is 16-byte aligned. If it
74 * isn't, align it and enter search mode.
76 if (jf->jf_pos & 15) {
77 jf_warn(jf, "realigning bad offset and entering search mode");
87 if (jf->jf_direction == JF_FORWARDS) {
89 * Scan the journal forwards. Note that the file pointer might not
92 while ((error = jread(jf, &head, sizeof(head))) == 0) {
93 if (head.begmagic != JREC_BEGMAGIC) {
95 jf_warn(jf, "bad beginmagic, searching for new record");
100 recsize = (head.recsize + 15) & ~15;
102 jf_warn(jf, "bad recordsize: %d\n", recsize);
108 js = malloc(offsetof(struct jstream, js_data[recsize]));
109 bzero(js, sizeof(struct jstream));
110 bcopy(&head, js->js_data, sizeof(head));
111 error = jread(jf, js->js_data + sizeof(head), recsize - sizeof(head));
113 jf_warn(jf, "Incomplete stream record\n");
121 * XXX if the stream is full duplex send the ack back now. This
122 * really needs to be delayed until the transaction is committed,
123 * but there are stalling issues if the transaction being
124 * collected exceeds the size of the FIFO. So for now this
125 * is just for testing.
127 if (jf->jf_flags & JF_FULL_DUPLEX) {
128 bzero(&ack, sizeof(ack));
129 ack.rbeg.begmagic = JREC_BEGMAGIC;
130 ack.rbeg.streamid = JREC_STREAMID_ACK;
131 ack.rbeg.transid = head.transid;
132 ack.rbeg.recsize = sizeof(ack);
133 ack.rend.endmagic = JREC_ENDMAGIC;
134 ack.rend.recsize = sizeof(ack);
135 jwrite(jf, &ack, sizeof(ack));
139 * note: recsize is aligned (the actual record size),
140 * head.recsize is unaligned (the actual payload size).
142 js->js_size = head.recsize;
143 bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
144 if (tail.endmagic != JREC_ENDMAGIC) {
145 jf_warn(jf, "bad endmagic, searching for new record");
153 if ((js = jaddrecord(jf, js)) != NULL)
158 * Scan the journal backwards. Note that jread() reverse-seeks and then
159 * reads. The data read will be forward ordered, however.
161 while ((error = jread(jf, &tail, sizeof(tail))) == 0) {
162 if (tail.endmagic != JREC_ENDMAGIC) {
164 jf_warn(jf, "bad endmagic, searching for new record");
169 recsize = (tail.recsize + 15) & ~15;
171 jf_warn(jf, "bad recordsize: %d\n", recsize);
177 js = malloc(offsetof(struct jstream, js_data[recsize]));
178 bzero(js, sizeof(struct jstream));
179 bcopy(&tail, js->js_data + recsize - sizeof(tail), sizeof(tail));
180 error = jread(jf, js->js_data, recsize - sizeof(tail));
183 jf_warn(jf, "Incomplete stream record\n");
189 js->js_size = tail.recsize;
190 bcopy(js->js_data + recsize - sizeof(tail), &tail, sizeof(tail));
191 bcopy(js->js_data, &head, sizeof(head));
192 if (head.begmagic != JREC_BEGMAGIC) {
193 jf_warn(jf, "bad begmagic, searching for new record");
200 if ((js = jaddrecord(jf, js)) != NULL)
204 jf->jf_error = error;
209 * Integrate a jstream record. Deal with the transaction begin and end flags
210 * to create a forward-referenced collection of jstream records. If we are
211 * able to complete a transaction, the first js associated with that
212 * transaction is returned.
214 * XXX we need to store the data for very large multi-record transactions
215 * separately since it might not fit into memory.
217 static struct jstream *
218 jaddrecord(struct jfile *jf, struct jstream *js)
220 struct journal_rawrecbeg *head = (void *)js->js_data;
225 * Check for a completely self-contained transaction, just return the
228 if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
229 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
236 * Check for an open transaction in the hash table, create a new one
239 jhp = &JHashAry[head->streamid & JHASH_MASK];
240 while ((jh = *jhp) != NULL) {
241 if (((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0)
246 jh = malloc(sizeof(*jh));
247 bzero(jh, sizeof(*jh));
251 jh->jh_transid = head->streamid;
256 * Emplace the stream segment
258 jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
259 if (jf->jf_direction == JF_FORWARDS) {
260 jh->jh_last->js_next = js;
263 js->js_next = jh->jh_first;
268 * If the transaction is complete, remove the hash entry and return the
269 * js representing the beginning of the transaction.
271 if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
272 (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
286 * Renormalize the jscan list to remove all the meta record headers
287 * and trailers except for the very first one.
291 jnormalize(struct jstream *js)
293 struct jstream *jscan;
296 js->js_normalized_off = 0;
297 js->js_normalized_base = js->js_data;
298 js->js_normalized_size = ((struct journal_rawrecbeg *)js->js_data)->recsize - sizeof(struct journal_rawrecend);
299 js->js_normalized_total = js->js_normalized_size;
300 off = js->js_normalized_size;
301 for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
302 jscan->js_normalized_off = off;
303 jscan->js_normalized_base = jscan->js_data +
304 sizeof(struct journal_rawrecbeg);
305 jscan->js_normalized_size = jscan->js_size -
306 sizeof(struct journal_rawrecbeg) -
307 sizeof(struct journal_rawrecend);
308 off += jscan->js_normalized_size;
309 js->js_normalized_total += jscan->js_normalized_size;
314 jscan_dispose(struct jstream *js)
316 struct jstream *jnext;
318 if (js->js_alloc_buf) {
319 free(js->js_alloc_buf);
320 js->js_alloc_buf = NULL;
321 js->js_alloc_size = 0;
332 * Read the specified block of data out of a linked set of jstream
333 * structures. Returns 0 on success or an error code on error.
336 jsread(struct jstream *js, off_t off, void *buf, int bytes)
342 n = jsreadany(js, off, &ptr);
348 buf = (char *)buf + n;
356 * Read the specified block of data out of a linked set of jstream
357 * structures. Attempt to return a pointer into the data set but
358 * allocate and copy if that is not possible. Returns 0 on success
359 * or an error code on error.
362 jsreadp(struct jstream *js, off_t off, const void **bufp,
368 n = jsreadany(js, off, bufp);
370 if (js->js_alloc_size < bytes) {
371 if (js->js_alloc_buf)
372 free(js->js_alloc_buf);
373 js->js_alloc_buf = malloc(bytes);
374 js->js_alloc_size = bytes;
375 assert(js->js_alloc_buf != NULL);
377 error = jsread(js, off, js->js_alloc_buf, bytes);
381 *bufp = js->js_alloc_buf;
388 jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
389 int fd, off_t off, int bytes)
397 while (bytes && (n = jsreadany(js, off, &bufp)) > 0) {
400 r = func(fd, bufp, n);
413 * Return the largest contiguous buffer starting at the specified offset,
417 jsreadany(struct jstream *js, off_t off, const void **bufp)
419 struct jstream *scan;
422 if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
424 while (scan && scan->js_normalized_off <= off) {
426 if (scan->js_normalized_off + scan->js_normalized_size > off) {
427 n = (int)(off - scan->js_normalized_off);
428 *bufp = scan->js_normalized_base + n;
429 return(scan->js_normalized_size - n);
431 scan = scan->js_next;