| 1 | /* |
| 2 | * Copyright (c) 2004,2005 The DragonFly Project. All rights reserved. |
| 3 | * |
| 4 | * This code is derived from software contributed to The DragonFly Project |
| 5 | * by Matthew Dillon <dillon@backplane.com> |
| 6 | * |
| 7 | * Redistribution and use in source and binary forms, with or without |
| 8 | * modification, are permitted provided that the following conditions |
| 9 | * are met: |
| 10 | * |
| 11 | * 1. Redistributions of source code must retain the above copyright |
| 12 | * notice, this list of conditions and the following disclaimer. |
| 13 | * 2. Redistributions in binary form must reproduce the above copyright |
| 14 | * notice, this list of conditions and the following disclaimer in |
| 15 | * the documentation and/or other materials provided with the |
| 16 | * distribution. |
| 17 | * 3. Neither the name of The DragonFly Project nor the names of its |
| 18 | * contributors may be used to endorse or promote products derived |
| 19 | * from this software without specific, prior written permission. |
| 20 | * |
| 21 | * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS |
| 22 | * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT |
| 23 | * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS |
| 24 | * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE |
| 25 | * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, |
| 26 | * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING, |
| 27 | * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; |
| 28 | * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED |
| 29 | * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, |
| 30 | * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT |
| 31 | * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF |
| 32 | * SUCH DAMAGE. |
| 33 | * |
| 34 | * $DragonFly: src/sbin/jscan/jstream.c,v 1.8 2005/09/07 07:20:23 dillon Exp $ |
| 35 | */ |
| 36 | |
| 37 | #include "jscan.h" |
| 38 | |
/*
 * Hash table of transactions that are still being collected.  A bucket is
 * selected with (streamid & JHASH_MASK) and entries landing in the same
 * bucket are chained through jh_hash.
 */
static struct jhash *JHashAry[JHASH_SIZE];

/* Forward declarations for the file-local helpers defined further down. */
static void jnormalize(struct jstream *js);
static int jaddrecord_backtrack(struct jsession *ss, struct jdata *jd);
| 43 | |
| 44 | /* |
| 45 | * Integrate a raw record. Deal with the transaction begin and end flags |
| 46 | * to create a forward-referenced collection of jstream records. If we are |
| 47 | * able to complete a transaction, the first js associated with that |
| 48 | * transaction is returned. |
| 49 | * |
| 50 | * XXX we need to store the data for very large multi-record transactions |
| 51 | * separately since it might not fit into memory. |
| 52 | * |
| 53 | * Note that a transaction might represent a huge I/O operation, resulting |
| 54 | * in an overall node structure that spans gigabytes, but individual |
| 55 | * subrecord leaf nodes are limited in size and we depend on this to simplify |
| 56 | * the handling of leaf records. |
| 57 | * |
| 58 | * A transaction may cover several raw records. The jstream collection for |
| 59 | * a transaction is only returned when the entire transaction has been |
| 60 | * successfully scanned. Due to the interleaving of transactions the ordering |
| 61 | * of returned JS's may be different (not exactly reversed) when scanning a |
| 62 | * journal backwards verses forwards. Since parallel operations are |
| 63 | * theoretically non-conflicting, this should not present a problem. |
| 64 | */ |
| 65 | struct jstream * |
| 66 | jaddrecord(struct jsession *ss, struct jdata *jd) |
| 67 | { |
| 68 | struct journal_rawrecbeg *head; |
| 69 | struct jstream *js; |
| 70 | struct jhash *jh; |
| 71 | struct jhash **jhp; |
| 72 | |
| 73 | js = malloc(sizeof(struct jstream)); |
| 74 | bzero(js, sizeof(struct jstream)); |
| 75 | js->js_jdata = jref(jd); |
| 76 | js->js_head = (void *)jd->jd_data; |
| 77 | js->js_session = ss; |
| 78 | head = js->js_head; |
| 79 | |
| 80 | /* |
| 81 | * Check for a completely self-contained transaction, just return the |
| 82 | * js if possible. |
| 83 | */ |
| 84 | if ((head->streamid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) == |
| 85 | (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END) |
| 86 | ) { |
| 87 | jnormalize(js); |
| 88 | return (js); |
| 89 | } |
| 90 | |
| 91 | retry: |
| 92 | /* |
| 93 | * Check for an open transaction in the hash table. |
| 94 | */ |
| 95 | jhp = &JHashAry[head->streamid & JHASH_MASK]; |
| 96 | while ((jh = *jhp) != NULL) { |
| 97 | if (jh->jh_session == ss && |
| 98 | ((jh->jh_transid ^ head->streamid) & JREC_STREAMID_MASK) == 0 |
| 99 | ) { |
| 100 | break; |
| 101 | } |
| 102 | jhp = &jh->jh_hash; |
| 103 | } |
| 104 | |
| 105 | /* |
| 106 | * We might have picked up a transaction in the middle, which we |
| 107 | * detect by not finding a hash record coupled with a raw record |
| 108 | * whos JREC_STREAMCTL_BEGIN bit is not set (or JREC_STREAMCTL_END |
| 109 | * bit if we are scanning backwards). |
| 110 | * |
| 111 | * When this case occurs we have to backtrack to locate the |
| 112 | * BEGIN (END if scanning backwards) and collect those records |
| 113 | * in order to obtain a complete transaction. |
| 114 | * |
| 115 | * This case most often occurs when a batch operation runs in |
| 116 | * prefix set whos starting raw-record transaction id is in |
| 117 | * the middle of one or more meta-transactions. It's a bit of |
| 118 | * a tricky situation, but easily resolvable by scanning the |
| 119 | * prefix set backwards (forwards if originally scanning backwards) |
| 120 | * to locate the raw record representing the start (end) of the |
| 121 | * transaction. |
| 122 | */ |
| 123 | if (jh == NULL) { |
| 124 | if (ss->ss_direction == JD_FORWARDS && |
| 125 | (head->streamid & JREC_STREAMCTL_BEGIN) == 0 |
| 126 | ) { |
| 127 | if (verbose_opt > 1) |
| 128 | fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK); |
| 129 | if (jaddrecord_backtrack(ss, jd) == 0) { |
| 130 | if (verbose_opt) |
| 131 | fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK); |
| 132 | goto retry; |
| 133 | } |
| 134 | fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK); |
| 135 | jscan_dispose(js); |
| 136 | return(NULL); |
| 137 | } else if (ss->ss_direction == JD_BACKWARDS && |
| 138 | (head->streamid & JREC_STREAMCTL_END) == 0 |
| 139 | ) { |
| 140 | if (verbose_opt > 1) |
| 141 | fprintf(stderr, "mid-transaction detected transid %016llx streamid %04x\n", jd->jd_transid, head->streamid & JREC_STREAMID_MASK); |
| 142 | if (jaddrecord_backtrack(ss, jd) == 0) { |
| 143 | if (verbose_opt) |
| 144 | fprintf(stderr, "mid-transaction streamid %04x collection succeeded\n", head->streamid & JREC_STREAMID_MASK); |
| 145 | goto retry; |
| 146 | } |
| 147 | fprintf(stderr, "mid-transaction streamid %04x collection failed\n", head->streamid & JREC_STREAMID_MASK); |
| 148 | jscan_dispose(js); |
| 149 | return(NULL); |
| 150 | } |
| 151 | } |
| 152 | |
| 153 | /* |
| 154 | * If we've made it to here and we still don't have a hash record |
| 155 | * to track the transaction, create one. |
| 156 | */ |
| 157 | if (jh == NULL) { |
| 158 | jh = malloc(sizeof(*jh)); |
| 159 | bzero(jh, sizeof(*jh)); |
| 160 | *jhp = jh; |
| 161 | jh->jh_first = js; |
| 162 | jh->jh_last = js; |
| 163 | jh->jh_transid = head->streamid; |
| 164 | jh->jh_session = ss; |
| 165 | return (NULL); |
| 166 | } |
| 167 | |
| 168 | /* |
| 169 | * Emplace the stream segment |
| 170 | */ |
| 171 | jh->jh_transid |= head->streamid & JREC_STREAMCTL_MASK; |
| 172 | if (ss->ss_direction == JD_FORWARDS) { |
| 173 | jh->jh_last->js_next = js; |
| 174 | jh->jh_last = js; |
| 175 | } else { |
| 176 | js->js_next = jh->jh_first; |
| 177 | jh->jh_first = js; |
| 178 | } |
| 179 | |
| 180 | /* |
| 181 | * If the transaction is complete, remove the hash entry and return the |
| 182 | * js representing the beginning of the transaction. Otherwise leave |
| 183 | * the hash entry intact and return NULL. |
| 184 | */ |
| 185 | if ((jh->jh_transid & (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END)) == |
| 186 | (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END) |
| 187 | ) { |
| 188 | *jhp = jh->jh_hash; |
| 189 | js = jh->jh_first; |
| 190 | free(jh); |
| 191 | |
| 192 | jnormalize(js); |
| 193 | } else { |
| 194 | js = NULL; |
| 195 | } |
| 196 | return (js); |
| 197 | } |
| 198 | |
| 199 | /* |
| 200 | * Renormalize the jscan list to remove all the meta record headers |
| 201 | * and trailers except for the very first one. |
| 202 | */ |
| 203 | static |
| 204 | void |
| 205 | jnormalize(struct jstream *js) |
| 206 | { |
| 207 | struct jstream *jscan; |
| 208 | off_t off; |
| 209 | |
| 210 | js->js_normalized_off = 0; |
| 211 | js->js_normalized_base = (void *)js->js_head; |
| 212 | js->js_normalized_size = js->js_head->recsize - sizeof(struct journal_rawrecend); |
| 213 | js->js_normalized_total = js->js_normalized_size; |
| 214 | off = js->js_normalized_size; |
| 215 | for (jscan = js->js_next; jscan; jscan = jscan->js_next) { |
| 216 | jscan->js_normalized_off = off; |
| 217 | jscan->js_normalized_base = (char *)jscan->js_head + |
| 218 | sizeof(struct journal_rawrecbeg); |
| 219 | jscan->js_normalized_size = jscan->js_head->recsize - |
| 220 | sizeof(struct journal_rawrecbeg) - |
| 221 | sizeof(struct journal_rawrecend); |
| 222 | off += jscan->js_normalized_size; |
| 223 | js->js_normalized_total += jscan->js_normalized_size; |
| 224 | } |
| 225 | } |
| 226 | |
| 227 | /* |
| 228 | * For sanity's sake I will describe the normal backtracking that must occur, |
| 229 | * but this routine must also operate on reverse-scanned (undo) records |
| 230 | * by forward tracking. |
| 231 | * |
| 232 | * A record has been found that represents the middle or end of a transaction |
| 233 | * when we were expecting the beginning of a transaction. We must backtrack |
| 234 | * to locate the start of the transaction, then process raw records relating |
| 235 | * to the transaction until we reach our current point (jd) again. If |
| 236 | * we find a matching streamid representing the end of a transaction instead |
| 237 | * of the expected start-of-transaction that record belongs to a wholely |
| 238 | * different meta-transaction and the record we seek is known to not be |
| 239 | * available. |
| 240 | * |
| 241 | * jd is the current record, directon is the normal scan direction (we have |
| 242 | * to scan in the reverse direction). |
| 243 | */ |
| 244 | static |
| 245 | int |
| 246 | jaddrecord_backtrack(struct jsession *ss, struct jdata *jd) |
| 247 | { |
| 248 | struct jfile *jf = ss->ss_jfin; |
| 249 | struct jdata *scan; |
| 250 | struct jstream *js; |
| 251 | u_int16_t streamid; |
| 252 | u_int16_t scanid; |
| 253 | |
| 254 | assert(ss->ss_direction == JD_FORWARDS || ss->ss_direction == JD_BACKWARDS); |
| 255 | if (jmodes & JMODEF_INPUT_PIPE) |
| 256 | return(-1); |
| 257 | |
| 258 | streamid = ((struct journal_rawrecbeg *)jd->jd_data)->streamid & JREC_STREAMID_MASK; |
| 259 | |
| 260 | if (ss->ss_direction == JD_FORWARDS) { |
| 261 | /* |
| 262 | * Backtrack in the reverse direction looking for the transaction |
| 263 | * start bit. If we find an end bit instead it belongs to an |
| 264 | * unrelated transaction using the same streamid and there is no |
| 265 | * point continuing. |
| 266 | */ |
| 267 | scan = jref(jd); |
| 268 | while ((scan = jread(jf, scan, JD_BACKWARDS)) != NULL) { |
| 269 | scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; |
| 270 | if ((scanid & JREC_STREAMID_MASK) != streamid) |
| 271 | continue; |
| 272 | if (scanid & JREC_STREAMCTL_END) { |
| 273 | jfree(jf, scan); |
| 274 | return(-1); |
| 275 | } |
| 276 | if (scanid & JREC_STREAMCTL_BEGIN) |
| 277 | break; |
| 278 | } |
| 279 | |
| 280 | /* |
| 281 | * Now jaddrecord the related records. |
| 282 | */ |
| 283 | while (scan != NULL && scan->jd_transid < jd->jd_transid) { |
| 284 | scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; |
| 285 | if ((scanid & JREC_STREAMID_MASK) == streamid) { |
| 286 | js = jaddrecord(ss, scan); |
| 287 | assert(js == NULL); |
| 288 | } |
| 289 | scan = jread(jf, scan, JD_FORWARDS); |
| 290 | } |
| 291 | if (scan == NULL) |
| 292 | return(-1); |
| 293 | jfree(jf, scan); |
| 294 | } else { |
| 295 | /* |
| 296 | * Backtrack in the forwards direction looking for the transaction |
| 297 | * end bit. If we find a start bit instead if belongs to an |
| 298 | * unrelated transaction using the same streamid and there is no |
| 299 | * point continuing. |
| 300 | */ |
| 301 | scan = jref(jd); |
| 302 | while ((scan = jread(jf, scan, JD_FORWARDS)) != NULL) { |
| 303 | scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; |
| 304 | if ((scanid & JREC_STREAMID_MASK) != streamid) |
| 305 | continue; |
| 306 | if (scanid & JREC_STREAMCTL_BEGIN) { |
| 307 | jfree(jf, scan); |
| 308 | return(-1); |
| 309 | } |
| 310 | if (scanid & JREC_STREAMCTL_END) |
| 311 | break; |
| 312 | } |
| 313 | |
| 314 | /* |
| 315 | * Now jaddrecord the related records. |
| 316 | */ |
| 317 | while (scan != NULL && scan->jd_transid > jd->jd_transid) { |
| 318 | scanid = ((struct journal_rawrecbeg *)scan->jd_data)->streamid; |
| 319 | if ((scanid & JREC_STREAMID_MASK) == streamid) { |
| 320 | js = jaddrecord(ss, scan); |
| 321 | assert(js == NULL); |
| 322 | } |
| 323 | scan = jread(jf, scan, JD_BACKWARDS); |
| 324 | } |
| 325 | if (scan == NULL) |
| 326 | return(-1); |
| 327 | jfree(jf, scan); |
| 328 | } |
| 329 | return(0); |
| 330 | } |
| 331 | |
| 332 | void |
| 333 | jscan_dispose(struct jstream *js) |
| 334 | { |
| 335 | struct jstream *jnext; |
| 336 | |
| 337 | if (js->js_alloc_buf) { |
| 338 | free(js->js_alloc_buf); |
| 339 | js->js_alloc_buf = NULL; |
| 340 | js->js_alloc_size = 0; |
| 341 | } |
| 342 | |
| 343 | while (js) { |
| 344 | jnext = js->js_next; |
| 345 | jfree(js->js_session->ss_jfin, js->js_jdata); |
| 346 | js->js_jdata = NULL; |
| 347 | free(js); |
| 348 | js = jnext; |
| 349 | } |
| 350 | } |
| 351 | |
/*
 * Copy the specified block of data out of a linked set of jstream
 * structures into the caller's buffer.
 *
 * js    - first segment of the normalized list
 * off   - starting offset within the normalized stream
 * buf   - destination, must have room for bytes bytes
 * bytes - number of bytes to copy
 *
 * Returns 0 on success, ENOENT if the stream holds no data at some
 * offset inside the requested range, or EINVAL if bytes is negative.
 */
int
jsread(struct jstream *js, off_t off, void *buf, int bytes)
{
    const void *ptr;
    char *dst = buf;
    int n;

    /*
     * A negative count would otherwise be handed to bcopy() as a huge
     * unsigned length.
     */
    if (bytes < 0)
        return (EINVAL);

    while (bytes > 0) {
        n = jsreadany(js, off, &ptr);
        if (n <= 0)
            return (ENOENT);
        /* Never copy past the end of the caller's request. */
        if (n > bytes)
            n = bytes;
        bcopy(ptr, dst, n);
        dst += n;
        off += n;
        bytes -= n;
    }
    return(0);
}
| 375 | |
| 376 | /* |
| 377 | * Read the specified block of data out of a linked set of jstream |
| 378 | * structures. Attempt to return a pointer into the data set but |
| 379 | * allocate and copy if that is not possible. Returns 0 on success |
| 380 | * or an error code on error. |
| 381 | */ |
| 382 | int |
| 383 | jsreadp(struct jstream *js, off_t off, const void **bufp, |
| 384 | int bytes) |
| 385 | { |
| 386 | int error = 0; |
| 387 | int n; |
| 388 | |
| 389 | n = jsreadany(js, off, bufp); |
| 390 | if (n < bytes) { |
| 391 | if (js->js_alloc_size < bytes) { |
| 392 | if (js->js_alloc_buf) |
| 393 | free(js->js_alloc_buf); |
| 394 | js->js_alloc_buf = malloc(bytes); |
| 395 | js->js_alloc_size = bytes; |
| 396 | if (js->js_alloc_buf == NULL) |
| 397 | fprintf(stderr, "attempt to allocate %d bytes failed\n", bytes); |
| 398 | assert(js->js_alloc_buf != NULL); |
| 399 | } |
| 400 | error = jsread(js, off, js->js_alloc_buf, bytes); |
| 401 | if (error) { |
| 402 | *bufp = NULL; |
| 403 | } else { |
| 404 | *bufp = js->js_alloc_buf; |
| 405 | } |
| 406 | } |
| 407 | return(error); |
| 408 | } |
| 409 | |
/*
 * Feed a block of data from a linked set of jstream structures to a
 * write(2)-style callback, one contiguous segment at a time.
 *
 * js    - first segment of the normalized list
 * func  - callback taking (fd, buffer, length) and returning the number
 *         of bytes it consumed
 * fd    - descriptor passed through to func
 * off   - starting offset within the normalized stream
 * bytes - number of bytes to process
 *
 * Processing stops at the first callback result that differs from the
 * requested length.  Returns the number of bytes the callback accepted,
 * or -1 if the callback failed before accepting anything.  The result is
 * also short when the stream runs out of data before bytes bytes have
 * been processed.
 */
int
jsreadcallback(struct jstream *js, ssize_t (*func)(int, const void *, size_t),
               int fd, off_t off, int bytes)
{
    const void *bufp;
    ssize_t r;
    int res;
    int n;

    res = 0;
    while (bytes > 0 && (n = jsreadany(js, off, &bufp)) > 0) {
        if (n > bytes)
            n = bytes;
        r = func(fd, bufp, n);
        if (r != n) {
            /*
             * Failure or short transfer: credit only what was really
             * accepted and stop, so that later data is not emitted
             * behind a hole and the error is not hidden by the byte
             * count.
             */
            if (r > 0 && r < n)
                res += (int)r;
            if (res == 0)
                res = -1;
            break;
        }
        res += n;
        bytes -= n;
        off += n;
    }
    return(res);
}
| 434 | |
| 435 | /* |
| 436 | * Return the largest contiguous buffer starting at the specified offset, |
| 437 | * or 0. |
| 438 | */ |
| 439 | int |
| 440 | jsreadany(struct jstream *js, off_t off, const void **bufp) |
| 441 | { |
| 442 | struct jstream *scan; |
| 443 | int n; |
| 444 | |
| 445 | if ((scan = js->js_cache) == NULL || scan->js_normalized_off > off) |
| 446 | scan = js; |
| 447 | while (scan && scan->js_normalized_off <= off) { |
| 448 | js->js_cache = scan; |
| 449 | if (scan->js_normalized_off + scan->js_normalized_size > off) { |
| 450 | n = (int)(off - scan->js_normalized_off); |
| 451 | *bufp = scan->js_normalized_base + n; |
| 452 | return(scan->js_normalized_size - n); |
| 453 | } |
| 454 | scan = scan->js_next; |
| 455 | } |
| 456 | return(0); |
| 457 | } |
| 458 | |