Rework and expand the algorithms in JSCAN, part 2/?.
[dragonfly.git] / sbin / jscan / jstream.c
1 /*
2  * Copyright (c) 2004,2005 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/jscan/jstream.c,v 1.6 2005/09/06 18:43:52 dillon Exp $
35  */
36
37 #include "jscan.h"
38
39 static struct jhash     *JHashAry[JHASH_SIZE];
40
41 static void jnormalize(struct jstream *js);
42
43 /*
44  * Integrate a raw record.  Deal with the transaction begin and end flags
45  * to create a forward-referenced collection of jstream records.  If we are
46  * able to complete a transaction, the first js associated with that
47  * transaction is returned.
48  *
49  * XXX we need to store the data for very large multi-record transactions
50  * separately since it might not fit into memory.
51  */
52 struct jstream *
53 jaddrecord(struct jsession *ss, struct jdata *jd)
54 {
55     struct journal_rawrecbeg *head;
56     struct jstream *js;
57     struct jhash *jh;
58     struct jhash **jhp;
59
60     js = malloc(sizeof(struct jstream));
61     bzero(js, sizeof(struct jstream));
62     js->js_jdata = jref(jd);
63     js->js_head = (void *)jd->jd_data;
64     js->js_session = ss;
65     head = js->js_head;
66
67     /*
68      * Check for a completely self-contained transaction, just return the
69      * js if possible.
70      */
71     if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
72         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
73     ) {
74         jnormalize(js);
75         return (js);
76     }
77
78     /*
79      * Check for an open transaction in the hash table, create a new one
80      * if necessary.
81      */
82     jhp = &JHashAry[head->streamid & JHASH_MASK];
83     while ((jh = *jhp) != NULL) {
84         if (jh->jh_session == ss &&
85            ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0
86         ) {
87             break;
88         }
89         jhp = &jh->jh_hash;
90     }
91     if (jh == NULL) {
92         jh = malloc(sizeof(*jh));
93         bzero(jh, sizeof(*jh));
94         *jhp = jh;
95         jh->jh_first = js;
96         jh->jh_last = js;
97         jh->jh_transid = head->streamid;
98         jh->jh_session = ss;
99         return (NULL);
100     }
101
102     /*
103      * Emplace the stream segment
104      */
105     jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK;
106     if (js->js_session->ss_jfin->jf_direction == JD_FORWARDS) {
107         jh->jh_last->js_next = js;
108         jh->jh_last = js;
109     } else {
110         js->js_next = jh->jh_first;
111         jh->jh_first = js;
112     }
113
114     /*
115      * If the transaction is complete, remove the hash entry and return the
116      * js representing the beginning of the transaction.
117      */
118     if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) ==
119         (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)
120     ) {
121         *jhp = jh->jh_hash;
122         js = jh->jh_first;
123         free(jh);
124
125         jnormalize(js);
126     } else {
127         js = NULL;
128     }
129     return (js);
130 }
131
132 /*
133  * Renormalize the jscan list to remove all the meta record headers
134  * and trailers except for the very first one.
135  */
136 static
137 void
138 jnormalize(struct jstream *js)
139 {
140     struct jstream *jscan;
141     off_t off;
142
143     js->js_normalized_off = 0;
144     js->js_normalized_base = (void *)js->js_head;
145     js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend);
146     js->js_normalized_total = js->js_normalized_size;
147     off = js->js_normalized_size;
148     for (jscan = js->js_next; jscan; jscan = jscan->js_next) {
149         jscan->js_normalized_off = off;
150         jscan->js_normalized_base = (char *)jscan->js_head + 
151                 sizeof(struct journal_rawrecbeg);
152         jscan->js_normalized_size = jscan->js_head->recsize -
153                sizeof(struct journal_rawrecbeg) -
154                sizeof(struct journal_rawrecend);
155         off += jscan->js_normalized_size;
156         js->js_normalized_total += jscan->js_normalized_size;
157     }
158 }
159
160 void
161 jscan_dispose(struct jstream *js)
162 {
163     struct jstream *jnext;
164
165     if (js->js_alloc_buf) {
166         free(js->js_alloc_buf);
167         js->js_alloc_buf = NULL;
168         js->js_alloc_size = 0;
169     }
170
171     while (js) {
172         jnext = js->js_next;
173         jfree(js->js_session->ss_jfin, js->js_jdata);
174         js->js_jdata = NULL;
175         free(js);
176         js = jnext;
177     }
178 }
179
180 /*
181  * Read the specified block of data out of a linked set of jstream
182  * structures.  Returns 0 on success or an error code on error.
183  */
184 int
185 jsread(struct jstream *js, off_t off, void *buf, int bytes)
186 {
187     const void *ptr;
188     int n;
189
190     while (bytes) {
191         n = jsreadany(js, off, &ptr);
192         if (n == 0)
193             return (ENOENT);
194         if (n > bytes)
195             n = bytes;
196         bcopy(ptr, buf, n);
197         buf = (char *)buf + n;
198         off += n;
199         bytes -= n;
200     }
201     return(0);
202 }
203
204 /*
205  * Read the specified block of data out of a linked set of jstream
206  * structures.  Attempt to return a pointer into the data set but
207  * allocate and copy if that is not possible.  Returns 0 on success
208  * or an error code on error.
209  */
210 int
211 jsreadp(struct jstream *js, off_t off, const void **bufp,
212         int bytes)
213 {
214     int error = 0;
215     int n;
216
217     n = jsreadany(js, off, bufp);
218     if (n < bytes) {
219         if (js->js_alloc_size < bytes) {
220             if (js->js_alloc_buf)
221                 free(js->js_alloc_buf);
222             js->js_alloc_buf = malloc(bytes);
223             js->js_alloc_size = bytes;
224             assert(js->js_alloc_buf != NULL);
225         }
226         error = jsread(js, off, js->js_alloc_buf, bytes);
227         if (error) {
228             *bufp = NULL;
229         } else {
230             *bufp = js->js_alloc_buf;
231         }
232     }
233     return(error);
234 }
235
236 int
237 jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
238                 int fd, off_t off, int bytes)
239 {
240     const void *bufp;
241     int res;
242     int n;
243     int r;
244
245     res = 0;
246     while (bytes && (n = jsreadany(js, off, &bufp)) > 0) {
247         if (n > bytes)
248             n = bytes;
249         r = func(fd, bufp, n);
250         if (r != n) {
251             if (res == 0)
252                 res = -1;
253         }
254         res += n;
255         bytes -= n;
256         off += n;
257     }
258     return(res);
259 }
260
261 /*
262  * Return the largest contiguous buffer starting at the specified offset,
263  * or 0.
264  */
265 int
266 jsreadany(struct jstream *js, off_t off, const void **bufp)
267 {
268     struct jstream *scan;
269     int n;
270
271     if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off)
272         scan = js;
273     while (scan && scan->js_normalized_off <= off) {
274         js->js_cache = scan;
275         if (scan->js_normalized_off + scan->js_normalized_size > off) {
276             n = (int)(off - scan->js_normalized_off);
277             *bufp = scan->js_normalized_base + n;
278             return(scan->js_normalized_size - n);
279         }
280         scan = scan->js_next;
281     }
282     return(0);
283 }
284