Merge from vendor branch TCSH:
[dragonfly.git] / sys / vfs / hammer / hammer_recover.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sys/vfs/hammer/hammer_recover.c,v 1.3 2008/01/15 06:02:57 dillon Exp $
35  */
36
37 #include "hammer.h"
38
39 static void hammer_recover_buffer_stage1(hammer_cluster_t cluster,
40                                 int32_t buf_no);
41 static void hammer_recover_buffer_stage2(hammer_cluster_t cluster,
42                                 int32_t buf_no);
43 static int  hammer_recover_record(hammer_cluster_t cluster,
44                                 hammer_buffer_t buffer, int32_t rec_offset,
45                                 hammer_record_ondisk_t rec);
46 static int hammer_recover_btree(hammer_cluster_t cluster,
47                                 hammer_buffer_t buffer, int32_t rec_offset,
48                                 hammer_record_ondisk_t rec);
49
50 /*
51  * Recover a cluster.  The caller has referenced and locked the cluster.
52  * 
53  * Generally returns 0 on success and EIO if the recovery was unsuccessful.
54  */
int
hammer_recover(hammer_cluster_t cluster)
{
	int buf_no;
	int nbuffers;
	int32_t r;
	u_int32_t bitmap;

	/*
	 * XXX recovery is currently disabled: everything below this early
	 * return is unreachable until this line is removed.
	 */
	return(0); /* XXX */

	kprintf("HAMMER_RECOVER %d:%d\n",
		cluster->volume->vol_no, cluster->clu_no);
	KKASSERT(cluster->ondisk->synchronized_tid);

	/* number of HAMMER_BUFSIZE buffers covering the cluster */
	nbuffers = cluster->ondisk->clu_limit / HAMMER_BUFSIZE;
	hammer_modify_cluster(cluster);

	/*
	 * Re-initialize the A-lists.  Buffer 0 is the cluster header and is
	 * excluded from each list (ranges start at 1).  The master list
	 * starts all-free; the three type-specific lists start all-allocated
	 * and are selectively freed as buffers are recovered below.
	 */
	hammer_alist_init(&cluster->alist_master, 1, nbuffers - 1,
			  HAMMER_ASTATE_FREE);
	hammer_alist_init(&cluster->alist_btree,
			  HAMMER_FSBUF_MAXBLKS,
			  (nbuffers - 1) * HAMMER_FSBUF_MAXBLKS,
			  HAMMER_ASTATE_ALLOC);
	hammer_alist_init(&cluster->alist_mdata,
			  HAMMER_FSBUF_MAXBLKS,
			  (nbuffers - 1) * HAMMER_FSBUF_MAXBLKS,
			  HAMMER_ASTATE_ALLOC);
	hammer_alist_init(&cluster->alist_record,
			  HAMMER_FSBUF_MAXBLKS,
			  (nbuffers - 1) * HAMMER_FSBUF_MAXBLKS,
			  HAMMER_ASTATE_ALLOC);

	/*
	 * Scan the cluster's clu_record_buf_bitmap, reserve record buffers
	 * from the master bitmap before we try to recover their data.  Free
	 * the block of records to alist_record.
	 *
	 * We need to mark the blocks as free in alist_record so future
	 * allocations will dive into the buffer A-list's, but we don't 
	 * want to destroy the underlying buffer A-list's.  Because the
	 * meta data in cluster->alist_record indicates state 00 (all-allocated
	 * but not initialized), it will not dive down into the buffer when
	 * freeing the entire buffer.
	 */
	for (buf_no = 1; buf_no < nbuffers; ++buf_no) {
		bitmap = cluster->ondisk->clu_record_buf_bitmap[buf_no >> 5];
		if (bitmap == 0) {
			/*
			 * Whole 32-buffer word is empty: skip to the last
			 * buffer of this word so the ++buf_no lands on the
			 * start of the next word.
			 */
			buf_no = ((buf_no + 32) & ~31) - 1;
			continue;
		}
		if ((bitmap & (1 << (buf_no & 31))) == 0)
			continue;
		/* reserve the buffer; it must have been free (r == buf_no) */
		r = hammer_alist_alloc_fwd(&cluster->alist_master, 1, buf_no);
		KKASSERT(r == buf_no);
		hammer_alist_free(&cluster->alist_record,
				  buf_no * HAMMER_FSBUF_MAXBLKS,
				  HAMMER_FSBUF_MAXBLKS);
	}

	/*
	 * Scan the cluster's clu_record_buf_bitmap, reassign buffers
	 * from alist_master to alist_record, and reallocate individual
	 * records and any related data reference if they meet the critera.
	 */
	for (buf_no = 1; buf_no < nbuffers; ++buf_no) {
		bitmap = cluster->ondisk->clu_record_buf_bitmap[buf_no >> 5];
		if (bitmap == 0) {
			/* skip the rest of an empty 32-buffer word */
			buf_no = ((buf_no + 32) & ~31) - 1;
			continue;
		}
		if ((bitmap & (1 << (buf_no & 31))) == 0)
			continue;
		hammer_recover_buffer_stage1(cluster, buf_no);
	}

	/*
	 * The cluster is now in good enough shape that general allocations
	 * are possible.  Construct an empty B-Tree root.
	 */
	{
		hammer_node_t croot;
		int error;

		croot = hammer_alloc_btree(cluster, &error);
		if (error == 0) {
			hammer_modify_node(croot);
			bzero(croot->ondisk, sizeof(*croot->ondisk));
			croot->ondisk->count = 0;
			croot->ondisk->type = HAMMER_BTREE_TYPE_LEAF;
			cluster->ondisk->clu_btree_root = croot->node_offset;
			hammer_rel_node(croot);
		}
		/* NOTE(review): a non-zero error is silently ignored here —
		 * stage2 below would then insert into a stale root; confirm
		 * whether this is intentional. */
	}

	/*
	 * Scan the cluster's clu_record_buf_bitmap again and regenerate
	 * the B-Tree.
	 *
	 * XXX B-tree record for cluster-push
	 */
	for (buf_no = 1; buf_no < nbuffers; ++buf_no) {
		bitmap = cluster->ondisk->clu_record_buf_bitmap[buf_no >> 5];
		if (bitmap == 0) {
			/* skip the rest of an empty 32-buffer word */
			buf_no = ((buf_no + 32) & ~31) - 1;
			continue;
		}
		if ((bitmap & (1 << (buf_no & 31))) == 0)
			continue;
		hammer_recover_buffer_stage2(cluster, buf_no);
	}
	kprintf("HAMMER_RECOVER DONE %d:%d\n",
		cluster->volume->vol_no, cluster->clu_no);

	/*
	 * Validate the parent cluster pointer. XXX
	 */
	return(0);
}
176
177 /*
178  * Reassign buf_no as a record buffer and recover any related data
179  * references.
180  */
static void
hammer_recover_buffer_stage1(hammer_cluster_t cluster, int32_t buf_no)
{
	hammer_record_ondisk_t rec;
	hammer_buffer_t buffer;
	int32_t rec_no;
	int32_t rec_offset;
	int error;

	kprintf("recover buffer1 %d:%d:%d\n",
		cluster->volume->vol_no,
		cluster->clu_no, buf_no);
	buffer = hammer_get_buffer(cluster, buf_no, 0, &error);
	if (error) {
		/*
		 * If we are unable to access the buffer leave it in a
		 * reserved state on the master alist.
		 */
		kprintf("hammer_recover_buffer_stage1: error "
			"recovering %d:%d:%d\n",
			cluster->volume->vol_no, cluster->clu_no, buf_no);
		return;
	}

	/*
	 * Recover the buffer, scan and validate allocated records.  Records
	 * which cannot be recovered are freed.
	 */
	hammer_modify_buffer(buffer);
	/* rebuild the buffer's own A-list before trusting its allocations */
	hammer_alist_recover(&buffer->alist, 0, 0, HAMMER_RECORD_NODES);
	rec_no = -1;
	for (;;) {
		/* find the next allocated record slot after rec_no */
		rec_no = hammer_alist_find(&buffer->alist, rec_no + 1,
					   HAMMER_RECORD_NODES);
		if (rec_no == HAMMER_ALIST_BLOCK_NONE)
			break;
#if 0
		kprintf("recover record %d:%d:%d %d\n",
			cluster->volume->vol_no,
			cluster->clu_no, buf_no, rec_no);
#endif
		/* cluster-relative byte offset of this record */
		rec_offset = offsetof(union hammer_fsbuf_ondisk,
				      record.recs[rec_no]);
		rec_offset += buf_no * HAMMER_BUFSIZE;
		rec = &buffer->ondisk->record.recs[rec_no];
		error = hammer_recover_record(cluster, buffer, rec_offset, rec);
		if (error) {
			/* unrecoverable record: release its slot */
			kprintf("hammer_recover_record: failed %d:%d@%d\n",
				cluster->clu_no, buffer->buf_no, rec_offset);
			hammer_alist_free(&buffer->alist, rec_no, 1);
		}
	}
	hammer_rel_buffer(buffer, 0);
}
235
236 /*
237  * Recover a record, at least into a state that doesn't blow up the
238  * filesystem.  Returns 0 on success, non-zero if the record is
239  * unrecoverable.
240  */
static int
hammer_recover_record(hammer_cluster_t cluster, hammer_buffer_t buffer,
			     int32_t rec_offset, hammer_record_ondisk_t rec)
{
	hammer_buffer_t dbuf;
	hammer_tid_t syncid = cluster->ondisk->synchronized_tid;
	int32_t data_offset;	/* cluster-relative offset of record data */
	int32_t data_len;
	int32_t nblks;
	int32_t dbuf_no;	/* buffer index holding the data */
	int32_t dblk_no;	/* data-block index within alist_mdata */
	int32_t base_blk;
	int32_t r;
	int error = 0;

	/*
	 * Discard records created after the synchronization point and
	 * undo any deletions made after the synchronization point.
	 */
	if (rec->base.base.create_tid > syncid) {
		kprintf("recover record: syncid too large %016llx/%016llx\n",
			rec->base.base.create_tid, syncid);
		return(EINVAL);
	}

	if (rec->base.base.delete_tid > syncid)
		rec->base.base.delete_tid = 0;

	/*
	 * Validate the record's B-Tree key.  Spike (cluster) records are
	 * exempt from the range check here.
	 */
	if (rec->base.base.rec_type != HAMMER_RECTYPE_CLUSTER) {
		/* key must fall within [clu_btree_beg, clu_btree_end) */
		if (hammer_btree_cmp(&rec->base.base,
				     &cluster->ondisk->clu_btree_beg) < 0)  {
			kprintf("recover record: range low\n");
			return(EINVAL);
		}
		if (hammer_btree_cmp(&rec->base.base,
				     &cluster->ondisk->clu_btree_end) >= 0)  {
			kprintf("recover record: range high\n");
			return(EINVAL);
		}
	}

	/*
	 * Validate the record's data.  If the offset is 0 there is no data
	 * (or it is zero-fill) and we can return success immediately.
	 * Otherwise make sure everything is ok.
	 */
	data_offset = rec->base.data_offset;
	data_len = rec->base.data_len;

	if (data_len == 0)
		rec->base.data_offset = data_offset = 0;
	if (data_offset == 0)
		goto done;

	/*
	 * Non-zero data offset, recover the data
	 */
	if (data_offset < HAMMER_BUFSIZE ||
	    data_offset >= cluster->ondisk->clu_limit ||
	    data_len < 0 || data_len > HAMMER_MAXDATA ||
	    data_offset + data_len > cluster->ondisk->clu_limit) {
		kprintf("recover record: bad offset/len %d/%d\n",
			data_offset, data_len);
		return(EINVAL);
	}

	/*
	 * Check data_offset relative to rec_offset: the data must not
	 * straddle the record itself.
	 */
	if (data_offset < rec_offset && data_offset + data_len > rec_offset) {
		kprintf("recover record: bad offset: overlapping1\n");
		return(EINVAL);
	}
	if (data_offset >= rec_offset &&
	    data_offset < rec_offset + sizeof(struct hammer_base_record)) {
		kprintf("recover record: bad offset: overlapping2\n");
		return(EINVAL);
	}

	/*
	 * Check for data embedded in the record.  Embedded data needs no
	 * separate allocation recovery as long as it fits entirely inside
	 * the record slot.
	 */
	if (data_offset >= rec_offset &&
	    data_offset < rec_offset + HAMMER_RECORD_SIZE) {
		if (data_offset + data_len > rec_offset + HAMMER_RECORD_SIZE) {
			kprintf("recover record: bad offset: overlapping3\n");
			return(EINVAL);
		}
		goto done;
	}

	/*
	 * Recover the allocated data either out of the cluster's master alist
	 * or as a buffer sub-allocation.
	 */
	if ((data_len & HAMMER_BUFMASK) == 0) {
		/*
		 * Whole-buffer-multiple data: reserved directly from the
		 * master A-list.
		 */
		if (data_offset & HAMMER_BUFMASK) {
			kprintf("recover record: bad offset: unaligned\n");
			return(EINVAL);
		}
		nblks = data_len / HAMMER_BUFSIZE;
		dbuf_no = data_offset / HAMMER_BUFSIZE;

		r = hammer_alist_alloc_fwd(&cluster->alist_master,
					   nblks, dbuf_no);
		if (r == HAMMER_ALIST_BLOCK_NONE) {
			kprintf("recover record: cannot recover offset1\n");
			return(EINVAL);
		}
		if (r != dbuf_no) {
			/* landed elsewhere: undo the allocation and fail */
			kprintf("recover record: cannot recover offset2\n");
			hammer_alist_free(&cluster->alist_master, r, nblks);
			return(EINVAL);
		}
	} else {
		/*
		 * Sub-buffer data allocation: the data must live entirely
		 * within one data buffer, past its fsbuf header, and be
		 * aligned to the data block size.
		 */
		if ((data_offset & ~HAMMER_BUFMASK) !=
		    ((data_offset + data_len - 1) & ~HAMMER_BUFMASK)) {
			kprintf("recover record: overlaps multiple bufs\n");
			return(EINVAL);
		}
		if ((data_offset & HAMMER_BUFMASK) <
		    sizeof(struct hammer_fsbuf_head)) {
			kprintf("recover record: data in header area\n");
			return(EINVAL);
		}
		if (data_offset & HAMMER_DATA_BLKMASK) {
			kprintf("recover record: data blk unaligned\n");
			return(EINVAL);
		}

		/*
		 * Ok, recover the space in the data buffer.
		 */
		dbuf_no = data_offset / HAMMER_BUFSIZE;
		r = hammer_alist_alloc_fwd(&cluster->alist_master, 1, dbuf_no);
		if (r != dbuf_no && r != HAMMER_ALIST_BLOCK_NONE)
			hammer_alist_free(&cluster->alist_master, r, 1);
		if (r == dbuf_no) {
			/*
			 * This is the first time we've tried to recover
			 * data in this data buffer, reinit it (but don't
			 * zero it out, obviously).
			 *
			 * Calling initbuffer marks the data blocks within
			 * the buffer as being free.
			 */
			dbuf = hammer_get_buffer(cluster, dbuf_no,
						 0, &error);
			if (error == 0) {
				hammer_modify_buffer(dbuf);
				hammer_initbuffer(&dbuf->alist, 
						  &dbuf->ondisk->head,
						  HAMMER_FSBUF_DATA);
				dbuf->buf_type = HAMMER_FSBUF_DATA;
			}
			/* expose the buffer's data blocks in alist_mdata */
			base_blk = dbuf_no * HAMMER_FSBUF_MAXBLKS;
			hammer_alist_free(&cluster->alist_mdata, base_blk,
					  HAMMER_DATA_NODES);
		} else {
			/*
			 * We've seen this data buffer before.
			 */
			dbuf = hammer_get_buffer(cluster, dbuf_no,
						 0, &error);
		}
		if (error) {
			kprintf("recover record: data: getbuf failed\n");
			return(EINVAL);
		}

		if (dbuf->buf_type != HAMMER_FSBUF_DATA) {
			hammer_rel_buffer(dbuf, 0);
			kprintf("recover record: data: wrong buffer type\n");
			return(EINVAL);
		}

		/*
		 * Figure out the data block number and number of blocks.
		 */
		nblks = (data_len + HAMMER_DATA_BLKMASK) & ~HAMMER_DATA_BLKMASK;
		nblks /= HAMMER_DATA_BLKSIZE;
		dblk_no = ((data_offset & HAMMER_BUFMASK) - offsetof(union hammer_fsbuf_ondisk, data.data)) / HAMMER_DATA_BLKSIZE;
		/* sanity cross-check: dblk_no must map back to data_offset */
		if ((data_offset & HAMMER_BUFMASK) != offsetof(union hammer_fsbuf_ondisk, data.data[dblk_no])) {
			kprintf("dblk_no %d does not match data_offset %d/%d\n",
				dblk_no,
				offsetof(union hammer_fsbuf_ondisk, data.data[dblk_no]),
				(data_offset & HAMMER_BUFMASK));
			hammer_rel_buffer(dbuf, 0);
			kprintf("recover record: data: not block aligned\n");
			Debugger("bad data");
			return(EINVAL);
		}
		/* convert to a cluster-wide block number for alist_mdata */
		dblk_no += dbuf_no * HAMMER_FSBUF_MAXBLKS;
		r = hammer_alist_alloc_fwd(&cluster->alist_mdata, nblks, dblk_no);
		if (r != dblk_no) {
			if (r != HAMMER_ALIST_BLOCK_NONE)
				hammer_alist_free(&cluster->alist_mdata, r, nblks);
			hammer_rel_buffer(dbuf, 0);
			kprintf("recover record: data: unable to realloc\n");
			return(EINVAL);
		}
		hammer_rel_buffer(dbuf, 0);
	}
done:
	return(0);
}
450
451 /*
452  * Rebuild the B-Tree for the records residing in the specified buffer.
453  */
454 static void
455 hammer_recover_buffer_stage2(hammer_cluster_t cluster, int32_t buf_no)
456 {
457         hammer_record_ondisk_t rec;
458         hammer_buffer_t buffer;
459         int32_t rec_no;
460         int32_t rec_offset;
461         int error;
462
463         buffer = hammer_get_buffer(cluster, buf_no, 0, &error);
464         if (error) {
465                 /*
466                  * If we are unable to access the buffer leave it in a
467                  * reserved state on the master alist.
468                  */
469                 kprintf("hammer_recover_buffer_stage2: error "
470                         "recovering %d:%d:%d\n",
471                         cluster->volume->vol_no, cluster->clu_no, buf_no);
472                 return;
473         }
474
475         /*
476          * Recover the buffer, scan and validate allocated records.  Records
477          * which cannot be recovered are freed.
478          */
479         rec_no = -1;
480         for (;;) {
481                 rec_no = hammer_alist_find(&buffer->alist, rec_no + 1,
482                                            HAMMER_RECORD_NODES);
483                 if (rec_no == HAMMER_ALIST_BLOCK_NONE)
484                         break;
485                 rec_offset = offsetof(union hammer_fsbuf_ondisk,
486                                       record.recs[rec_no]);
487                 rec_offset += buf_no * HAMMER_BUFSIZE;
488                 rec = &buffer->ondisk->record.recs[rec_no];
489                 error = hammer_recover_btree(cluster, buffer, rec_offset, rec);
490                 if (error) {
491                         kprintf("hammer_recover_btree: failed %d:%d@%d\n",
492                                 cluster->clu_no, buffer->buf_no, rec_offset);
493                         /* XXX free the record and its data? */
494                         /*hammer_alist_free(&buffer->alist, rec_no, 1);*/
495                 }
496         }
497         hammer_rel_buffer(buffer, 0);
498 }
499
500 static int
501 hammer_recover_btree(hammer_cluster_t cluster, hammer_buffer_t buffer,
502                       int32_t rec_offset, hammer_record_ondisk_t rec)
503 {
504         struct hammer_cursor cursor;
505         union hammer_btree_elm elm;
506         hammer_cluster_t ncluster;
507         int error = 0;
508
509         /*
510          * Check for a spike record.
511          */
512         kprintf("hammer_recover_btree cluster %p (%d) type %d\n",
513                 cluster, cluster->clu_no, rec->base.base.rec_type);
514         if (rec->base.base.rec_type == HAMMER_RECTYPE_CLUSTER) {
515                 hammer_volume_t ovolume = cluster->volume;
516                 hammer_volume_t nvolume;
517
518                 nvolume = hammer_get_volume(ovolume->hmp, rec->spike.vol_no,
519                                             &error);
520                 if (error)
521                         return(error);
522                 ncluster = hammer_get_cluster(nvolume, rec->spike.clu_no,
523                                               &error, 0);
524                 hammer_rel_volume(nvolume, 0);
525                 if (error)
526                         return(error);
527
528                 /*
529                  * Validate the cluster.  Allow the offset to be fixed up.
530                  */
531                 if (ncluster->ondisk->clu_btree_parent_vol_no != ovolume->vol_no ||
532                     ncluster->ondisk->clu_btree_parent_clu_no != cluster->clu_no) {
533                         kprintf("hammer_recover: Bad cluster spike hookup: "
534                                 "%d:%d != %d:%d\n",
535                                 ncluster->ondisk->clu_btree_parent_vol_no,
536                                 ncluster->ondisk->clu_btree_parent_clu_no,
537                                 ovolume->vol_no,
538                                 ovolume->vol_no);
539                         error = EINVAL;
540                         hammer_rel_cluster(ncluster, 0);
541                         return(error);
542                 }
543         } else {
544                 ncluster = NULL;
545         }
546
547         /*
548          * Locate the insertion point.  Note that we are using the cluster-
549          * localized cursor init so parent will start out NULL.
550          */
551         kprintf("hammer_recover_btree init_cursor_cluster cluster %p (%d) type %d ncluster %p\n",
552                 cluster, cluster->clu_no, rec->base.base.rec_type, ncluster);
553         error = hammer_init_cursor_cluster(&cursor, cluster);
554         if (error)
555                 goto failed;
556         KKASSERT(cursor.node);
557         if (ncluster)
558                 cursor.key_beg = ncluster->ondisk->clu_btree_beg;
559         else
560                 cursor.key_beg = rec->base.base;
561         cursor.flags |= HAMMER_CURSOR_INSERT;
562
563         error = hammer_btree_lookup(&cursor);
564         KKASSERT(cursor.node);
565         if (error == 0) {
566                 kprintf("hammer_recover_btree: Duplicate record\n");
567                 hammer_print_btree_elm(&cursor.node->ondisk->elms[cursor.index], HAMMER_BTREE_TYPE_LEAF, cursor.index);
568                 Debugger("duplicate record");
569         }
570         if (error != ENOENT)
571                 goto failed;
572
573         if (ncluster) {
574                 /*
575                  * Spike record
576                  */
577                 kprintf("recover spike clu %d %016llx-%016llx\n",
578                         ncluster->clu_no,
579                         ncluster->ondisk->clu_btree_beg.obj_id,
580                         ncluster->ondisk->clu_btree_end.obj_id);
581                 error = hammer_btree_insert_cluster(&cursor, ncluster,
582                                                     rec_offset);
583                 kprintf("recover spike record error %d\n", error);
584                 if (error)
585                         Debugger("spike recovery");
586         } else {
587                 /*
588                  * Normal record
589                  */
590                 kprintf("recover recrd clu %d %016llx\n",
591                         cluster->clu_no, rec->base.base.obj_id);
592                 elm.leaf.base = rec->base.base;
593                 elm.leaf.rec_offset = rec_offset;
594                 elm.leaf.data_offset = rec->base.data_offset;
595                 elm.leaf.data_len = rec->base.data_len;
596                 elm.leaf.data_crc = rec->base.data_crc;
597
598                 error = hammer_btree_insert(&cursor, &elm);
599         }
600
601         if (error) {
602                 kprintf("hammer_recover_btree: insertion failed\n");
603         }
604
605 failed:
606         if (ncluster)
607                 hammer_rel_cluster(ncluster, 0);
608         hammer_done_cursor(&cursor);
609         return(error);
610 }
611