Merge from vendor branch LESS:
[dragonfly.git] / sys / vfs / hammer / hammer_btree.c
1 /*
2  * Copyright (c) 2007 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sys/vfs/hammer/hammer_btree.c,v 1.3 2007/11/07 00:43:24 dillon Exp $
35  */
36
37 /*
38  * HAMMER BH-Tree index
39  *
40  * HAMMER implements a modified B+Tree.  In documentation this will
41  * simply be refered to as the HAMMER BH-Tree.  Basically a BH-Tree
42  * looks like a B+Tree (A B-Tree which stores its records only at the leafs
43  * of the tree), but adds two additional boundary elements which describe
44  * the left-most and right-most element a node is able to represent.  In
45  * otherwords, we have boundary elements at the two ends of a BH-Tree node
46  * instead of sub-tree pointers.
47  *
48  * A BH-Tree internal node looks like this:
49  *
50  *      B N N N N N N B   <-- boundary and internal elements
51  *       S S S S S S S    <-- subtree pointers
52  *
53  * A BH-Tree leaf node basically looks like this:
54  *
55  *      L L L L L L L L   <-- leaf elemenets
56  *
57  * The recursion radix is reduced by 2 relative to a normal B-Tree but
58  * we get a number of significant benefits for our troubles.
59  *
60  * The big benefit to using a BH-Tree is that it is possible to cache
61  * pointers into the middle of the tree and not have to start searches,
62  * insertions, OR deletions at the root node.   In particular, searches are
63  * able to progress in a definitive direction from any point in the tree
64  * without revisting nodes.  This greatly improves the efficiency of many
65  * operations, most especially record appends.
66  *
67  * BH-Trees also make the stacking of trees fairly straightforward.
68  */
69 #include "hammer.h"
70 #include <sys/buf.h>
71 #include <sys/buf2.h>
72
73 static int btree_search(hammer_btree_cursor_t cursor, hammer_base_elm_t key,
74                         int flags);
75 static int btree_cursor_set_cluster(hammer_btree_cursor_t cursor,
76                         struct hammer_cluster *cluster);
77 static int btree_cursor_set_cluster_by_value(hammer_btree_cursor_t cursor,
78                         int32_t vol_no, int32_t clu_no, hammer_tid_t clu_id);
79 static int btree_cursor_up(hammer_btree_cursor_t cursor);
80 static int btree_cursor_down(hammer_btree_cursor_t cursor);
81 static int btree_split_internal(hammer_btree_cursor_t cursor);
82 static int btree_split_leaf(hammer_btree_cursor_t cursor);
83 static int btree_rebalance_node(hammer_btree_cursor_t cursor);
84 static int btree_collapse(hammer_btree_cursor_t cursor);
85
86 /*
87  * Compare two BH-Tree elements, return -1, 0, or +1 (e.g. similar to strcmp).
88  */
89 static int
90 hammer_btree_cmp(hammer_base_elm_t key1, hammer_base_elm_t key2)
91 {
92 #if 0
93         kprintf("compare obj_id %016llx %016llx\n",
94                 key1->obj_id, key2->obj_id);
95         kprintf("compare rec_type %04x %04x\n",
96                 key1->rec_type, key2->rec_type);
97         kprintf("compare key %016llx %016llx\n",
98                 key1->key, key2->key);
99 #endif
100
101         /*
102          * A key1->obj_id of 0 matches any object id
103          */
104         if (key1->obj_id) {
105                 if (key1->obj_id < key2->obj_id)
106                         return(-4);
107                 if (key1->obj_id > key2->obj_id)
108                         return(4);
109         }
110
111         /*
112          * A key1->rec_type of 0 matches any record type.
113          */
114         if (key1->rec_type) {
115                 if (key1->rec_type < key2->rec_type)
116                         return(-3);
117                 if (key1->rec_type > key2->rec_type)
118                         return(3);
119         }
120
121         /*
122          * There is no special case for key.  0 means 0.
123          */
124         if (key1->key < key2->key)
125                 return(-2);
126         if (key1->key > key2->key)
127                 return(2);
128
129         /*
130          * This test has a number of special cases.  create_tid in key1 is
131          * the as-of transction id, and delete_tid in key1 is NOT USED.
132          *
133          * A key1->create_tid of 0 matches any record regardles of when
134          * it was created or destroyed.  0xFFFFFFFFFFFFFFFFULL should be
135          * used to search for the most current state of the object.
136          *
137          * key2->create_tid is a HAMMER record and will never be
138          * 0.   key2->delete_tid is the deletion transaction id or 0 if 
139          * the record has not yet been deleted.
140          */
141         if (key1->create_tid) {
142                 if (key1->create_tid < key2->create_tid)
143                         return(-1);
144                 if (key2->delete_tid && key1->create_tid >= key2->delete_tid)
145                         return(1);
146         }
147
148         return(0);
149 }
150
151 /*
152  * Create a separator half way inbetween key1 and key2.  For fields just
153  * one unit apart, the separator will match key2.
154  */
155 #define MAKE_SEPARATOR(key1, key2, dest, field) \
156         dest->field = key1->field + ((key2->field - key1->field + 1) >> 1);
157
158 static void
159 hammer_make_separator(hammer_base_elm_t key1, hammer_base_elm_t key2,
160                       hammer_base_elm_t dest)
161 {
162         MAKE_SEPARATOR(key1, key2, dest, obj_id);
163         MAKE_SEPARATOR(key1, key2, dest, rec_type);
164         MAKE_SEPARATOR(key1, key2, dest, key);
165         MAKE_SEPARATOR(key1, key2, dest, create_tid);
166         dest->delete_tid = 0;
167         dest->obj_type = 0;
168         dest->reserved07 = 0;
169 }
170
171 #undef MAKE_SEPARATOR
172
173 /*
174  * Return whether a generic internal or leaf node is full
175  */
176 static int
177 btree_node_is_full(struct hammer_base_node *node)
178 {
179         switch(node->type) {
180         case HAMMER_BTREE_INTERNAL_NODE:
181                 if (node->count == HAMMER_BTREE_INT_ELMS)
182                         return(1);
183                 break;
184         case HAMMER_BTREE_LEAF_NODE:
185                 if (node->count == HAMMER_BTREE_LEAF_ELMS)
186                         return(1);
187                 break;
188         default:
189                 panic("illegal btree subtype");
190         }
191         return(0);
192 }
193
194 static int
195 btree_max_elements(u_int8_t type)
196 {
197         if (type == HAMMER_BTREE_LEAF_NODE)
198                 return(HAMMER_BTREE_LEAF_ELMS);
199         if (type == HAMMER_BTREE_INTERNAL_NODE)
200                 return(HAMMER_BTREE_INT_ELMS);
201         panic("btree_max_elements: bad type %d\n", type);
202 }
203
204 /*
205  * Initialize a cursor, setting the starting point for a BH-Tree search.
206  *
207  * The passed cluster must be locked.  This function will add a reference
208  * to it.
209  */
210 int
211 hammer_btree_cursor_init(hammer_btree_cursor_t cursor,
212                          struct hammer_cluster *cluster)
213 {
214         int error;
215
216         cursor->cluster = NULL;
217         cursor->node_buffer = NULL;
218         cursor->parent_buffer = NULL;
219         cursor->node = NULL;
220         cursor->parent = NULL;
221         cursor->index = 0;
222         cursor->parent_index = 0;
223         error = btree_cursor_set_cluster(cursor, cluster);
224         return(error);
225 }
226
227 /*
228  * Clean up a HAMMER BH-Tree cursor after we are finished using it.
229  */
230 void
231 hammer_btree_cursor_done(hammer_btree_cursor_t cursor)
232 {
233         if (cursor->node_buffer) {
234                 hammer_put_buffer(cursor->node_buffer, 0);
235                 cursor->node_buffer = NULL;
236         }
237         if (cursor->parent_buffer) {
238                 hammer_put_buffer(cursor->parent_buffer, 0);
239                 cursor->parent_buffer = NULL;
240         }
241         if (cursor->cluster) {
242                 hammer_put_cluster(cursor->cluster, 0);
243                 cursor->cluster = NULL;
244         }
245         cursor->node = NULL;
246         cursor->parent = NULL;
247         cursor->left_bound = NULL;
248         cursor->right_bound = NULL;
249         cursor->index = 0;
250         cursor->parent_index = 0;
251 }
252
253 /*
254  * Initialize a btree info structure and its associated cursor prior to
255  * running a BH-Tree operation.
256  */
257 int
258 hammer_btree_info_init(hammer_btree_info_t info, struct hammer_cluster *cluster)
259 {
260         int error;
261
262         error = hammer_btree_cursor_init(&info->cursor, cluster);
263         info->data_buffer = NULL;
264         info->record_buffer = NULL;
265         info->data = NULL;
266         info->rec = NULL;
267         return (error);
268 }
269
270 /*
271  * Clean up a BH-Tree info structure after we are finished using it.
272  */
273 void
274 hammer_btree_info_done(hammer_btree_info_t info)
275 {
276         hammer_btree_cursor_done(&info->cursor);
277         if (info->data_buffer) {
278                 hammer_put_buffer(info->data_buffer, 0);
279                 info->data_buffer = NULL;
280         }
281         if (info->record_buffer) {
282                 hammer_put_buffer(info->record_buffer, 0);
283                 info->record_buffer = NULL;
284         }
285         info->data = NULL;
286         info->rec = NULL;
287 }
288
289 /*
290  * Search a cluster's BH-Tree.  Return the matching node.  The search
291  * normally traverses clusters but will terminate at a cluster entry
292  * in a leaf if HAMMER_BTREE_CLUSTER_TAG is specified.  If this flag
293  * is specified EIO is returned if the search would otherwise have to
294  * cursor up into a parent cluster.
295  *
296  * The cursor must be completely initialized on entry.  If the cursor is
297  * at the root of a cluster, the parent pointer & buffer may be NULL (in
298  * that case the bounds point to the bounds in the cluster header).  The
299  * node_buffer and node must always be valid.
300  *
301  * The search code may be forced to iterate up the tree if the conditions
302  * required for an insertion or deletion are not met.  This does not occur
303  * very often.
304  *
305  * INSERTIONS: The search will split full nodes and leaves on its way down
306  * and guarentee that the leaf it ends up on is not full.
307  *
308  * DELETIONS: The search will rebalance the tree on its way down.
309  */
310 static 
311 int
312 btree_search(hammer_btree_cursor_t cursor, hammer_base_elm_t key, int flags)
313 {
314         struct hammer_btree_leaf_node *leaf;
315         int error;
316         int i;
317         int r;
318
319         /*
320          * Move our cursor up the tree until we find a node whos range covers
321          * the key we are trying to locate.  This may move us between
322          * clusters.
323          *
324          * Since the root cluster always has a root BH-Tree node, info->node
325          * is always non-NULL if no error occured.  The parent field will be
326          * non-NULL unless we are at the root of a cluster.
327          */
328         while (hammer_btree_cmp(key, cursor->left_bound) < 0 ||
329                hammer_btree_cmp(key, cursor->right_bound) >= 0) {
330                 /*
331                  * Must stay in current cluster if flagged, code should never
332                  * use the flag if it wants us to traverse to the parent
333                  * cluster.
334                  */
335                 if (cursor->parent == NULL &&
336                     (flags & HAMMER_BTREE_CLUSTER_TAG)) {
337                         error = EIO;
338                         goto done;
339                 }
340                 error = btree_cursor_up(cursor);
341                 if (error)
342                         goto done;
343         }
344         KKASSERT(cursor->node != NULL);
345
346         /*
347          * If we are inserting we can't start at a full node if the parent
348          * is also full (because there is no way to split the node),
349          * continue running up the tree until we hit the root of the
350          * current cluster or until the requirement is satisfied.
351          */
352         while (flags & HAMMER_BTREE_INSERT) {
353                 if (btree_node_is_full(&cursor->node->base) == 0)
354                         break;
355                 if (cursor->parent == NULL)
356                         break;
357                 if (cursor->parent->base.count != HAMMER_BTREE_INT_ELMS)
358                         break;
359                 error = btree_cursor_up(cursor);
360                 if (error)
361                         goto done;
362         }
363
364         /*
365          * If we are deleting we can't start at a node with only one element
366          * unless it is root, because all of our code assumes that nodes
367          * will never be empty.
368          *
369          * This handles the case where the cursor is sitting at a leaf and
370          * either the leaf or parent contain an insufficient number of
371          * elements.
372          */
373         while (flags & HAMMER_BTREE_DELETE) {
374                 if (cursor->node->base.count > 1)
375                         break;
376                 if (cursor->parent == NULL)
377                         break;
378                 KKASSERT(cursor->node->base.count != 0);
379                 error = btree_cursor_up(cursor);
380                 if (error)
381                         goto done;
382         }
383
384 new_cluster:
385         /*
386          * Push down through internal nodes to locate the requested key.
387          */
388         while (cursor->node->base.type == HAMMER_BTREE_INTERNAL_NODE) {
389                 struct hammer_btree_internal_node *node;
390
391                 /*
392                  * If we are a the root node and deleting, try to collapse
393                  * all of the root's children into the root.  This is the
394                  * only point where tree depth is reduced.
395                  */
396                 if ((flags & HAMMER_BTREE_DELETE) && cursor->parent == NULL) {
397                         error = btree_collapse(cursor);
398                         if (error)
399                                 goto done;
400                 }
401
402                 /*
403                  * Scan the node to find the subtree index to push down into.
404                  * We go one-past, then back-up.  The key should never be
405                  * less then the left-hand boundary so I should never wind
406                  * up 0.
407                  */
408                 node = &cursor->node->internal;
409                 for (i = 0; i < node->base.count; ++i) {
410                         r = hammer_btree_cmp(key, &node->elms[i].base);
411                         if (r < 0)
412                                 break;
413                 }
414                 KKASSERT(i != 0);
415
416                 /*
417                  * The push-down index is now i - 1.
418                  */
419                 --i;
420                 cursor->index = i;
421
422                 /*
423                  * Handle insertion and deletion requirements.
424                  *
425                  * If inserting split full nodes.  The split code will
426                  * adjust cursor->node and cursor->index if the current
427                  * index winds up in the new node.
428                  */
429                 if (flags & HAMMER_BTREE_INSERT) {
430                         if (node->base.count == HAMMER_BTREE_INT_ELMS) {
431                                 error = btree_split_internal(cursor);
432                                 if (error)
433                                         goto done;
434                                 /*
435                                  * reload stale pointers
436                                  */
437                                 i = cursor->index;
438                                 node = &cursor->node->internal;
439                         }
440                 }
441
442                 /*
443                  * If deleting rebalance - do not allow the child to have
444                  * just one element or we will not be able to delete it.
445                  *
446                  * Neither internal or leaf nodes (except a root-leaf) are
447                  * allowed to drop to 0 elements.
448                  *
449                  * Our separators may have been reorganized after rebalancing,
450                  * so we have to pop back up and rescan.
451                  */
452                 if (flags & HAMMER_BTREE_DELETE) {
453                         if (node->elms[i].subtree_count <= 1) {
454                                 error = btree_rebalance_node(cursor);
455                                 if (error)
456                                         goto done;
457                                 /* cursor->index is invalid after call */
458                                 goto new_cluster;
459                         }
460                 }
461
462                 /*
463                  * Push down (push into new node, existing node becomes
464                  * the parent).
465                  */
466                 error = btree_cursor_down(cursor);
467                 if (error)
468                         goto done;
469         }
470
471
472         /*
473          * We are at a leaf, do a linear search of the key array.
474          * (XXX do a binary search).  On success the index is set to the
475          * matching element, on failure the index is set to the insertion
476          * point.
477          *
478          * Boundaries are not stored in leaf nodes, so the index can wind
479          * up to the left of element 0 (index == 0) or past the end of
480          * the array (index == leaf->base.count).
481          */
482         leaf = &cursor->node->leaf;
483         KKASSERT(leaf->base.count <= HAMMER_BTREE_LEAF_ELMS);
484
485         for (i = 0; i < leaf->base.count; ++i) {
486                 r = hammer_btree_cmp(key, &leaf->elms[i].base);
487                 if (r < 0)
488                         break;
489                 if (r == 0) {
490                         /*
491                          * Return an exact match unless this is a cluster
492                          * element.  If it is, and the cluster tag flag has
493                          * not been set, push into the new cluster and
494                          * continue the search.
495                          */
496                         cursor->index = i;
497                         if ((leaf->elms[i].base.obj_type &
498                              HAMMER_OBJTYPE_CLUSTER_FLAG) &&
499                             (flags & HAMMER_BTREE_CLUSTER_TAG) == 0) {
500                                 error = btree_cursor_down(cursor);
501                                 if (error)
502                                         goto done;
503                                 goto new_cluster;
504                         }
505                         error = 0;
506                         goto done;
507                 }
508         }
509
510         /*
511          * We could not find an exact match.  Check for a cluster
512          * recursion.  The cluster's range is bracketed by two
513          * leaf elements.  One of the two must be in this node.
514          */
515         if ((flags & HAMMER_BTREE_CLUSTER_TAG) == 0) {
516                 if (i == leaf->base.count) {
517                         if (leaf->elms[i-1].base.obj_type &
518                             HAMMER_OBJTYPE_CLUSTER_FLAG) {
519                                 cursor->index = i - 1;
520                                 error = btree_cursor_down(cursor);
521                                 if (error)
522                                         goto done;
523                                 goto new_cluster;
524                         }
525                 } else {
526                         if (leaf->elms[i].base.obj_type &
527                             HAMMER_OBJTYPE_CLUSTER_FLAG) {
528                                 cursor->index = i;
529                                 error = btree_cursor_down(cursor);
530                                 if (error)
531                                         goto done;
532                                 goto new_cluster;
533                         }
534                 }
535         }
536
537         /*
538          * If inserting split a full leaf before returning.  This
539          * may have the side effect of adjusting cursor->node and
540          * cursor->index.
541          *
542          * We delayed the split in order to avoid any unnecessary splits.
543          *
544          * XXX parent's parent's subtree_count will be wrong after
545          * this (keep parent of parent around too?  ugh).
546          */
547         cursor->index = i;
548         if ((flags & HAMMER_BTREE_INSERT) &&
549             leaf->base.count == HAMMER_BTREE_LEAF_ELMS) {
550                 error = btree_split_leaf(cursor);
551                 /* NOT USED
552                 i = cursor->index;
553                 node = &cursor->node->internal;
554                 */
555                 if (error)
556                         goto done;
557         }
558         error = ENOENT;
559
560         /*
561          * Set the cursor's last_error.  This is used primarily by
562          * btree_search_fwd() to determine whether it needs to skip
563          * the current element or not.
564          */
565 done:
566         cursor->last_error = error;
567         return(error);
568 }
569
570 /*
571  * Ignoring key->key, iterate records after a search.  The next record which
572  * matches the key (except for key->key) is returned.   To synchronize with
573  * btree_search() we only check the record at the current cursor index if
574  * cursor->last_error is ENOENT, indicating that it is at an insertion point
575  * (so if we are not at the end of the node's array, the current record
576  * will be the 'next' record to check).
577  *
578  * Records associated with regular files use the ending offset of the data
579  * block (non inclusive) as their key.  E.g. if you write 20 bytes to a
580  * file at offset 10, the HAMMER record will use a key of 30 for that record.
581  * This way the cursor is properly positioned after a search so the first
582  * record returned by btree_iterate() is (likely) the one containing the
583  * desired data.  This also means the results of the initial search are
584  * ignored.
585  *
586  * This function is also used to iterate through database records, though
587  * in that case the caller utilizes any exact match located by btree_search()
588  * before diving into the iteration.
589  */
590 int
591 hammer_btree_iterate(hammer_btree_cursor_t cursor, hammer_base_elm_t key)
592 {
593         hammer_btree_node_t node;
594         struct hammer_base_elm save_base;
595         int64_t save_key;
596         int error;
597         int r;
598         int s;
599
600         /*
601          * If last_error is not zero we are at the insertion point and must
602          * start at the current element.  If last_error is zero the caller
603          * has already processed the current cursor (or doesn't care about
604          * it) and we must iterate forwards.
605          */
606         node = cursor->node;
607         if (cursor->index < node->base.count && cursor->last_error == 0)
608                 ++cursor->index;
609
610         for (;;) {
611                 /*
612                  * We iterate up the tree while we are at the last element.
613                  *
614                  * If we hit the root of the cluster we have to cursor up
615                  * into the parent cluster, but then search to position
616                  * the cursor at the next logical element in the iteration.
617                  * We ignore an ENOENT error in this case.
618                  *
619                  * XXX this could be optimized by storing the information in
620                  * the parent reference.
621                  */
622 goupagain:
623                 while (cursor->index == node->base.count) {
624                         if (cursor->parent == NULL) {
625                                 save_base = *cursor->right_bound;
626                                 error = btree_cursor_up(cursor);
627                                 if (error)
628                                         goto done;
629                                 error = btree_search(cursor, &save_base, 0);
630                                 if (error == ENOENT)
631                                         error = 0;
632                                 if (error)
633                                         goto done;
634                                 node = cursor->node;
635                                 /* node cannot be empty. cursor->index is 0 */
636                                 KKASSERT(cursor->index != node->base.count);
637                                 /* do not further increment the index */
638                         } else {
639                                 error = btree_cursor_up(cursor);
640                                 if (error)
641                                         goto done;
642                                 node = cursor->node;
643                                 KKASSERT(cursor->index != node->base.count);
644                                 ++cursor->index;
645                         }
646                 }
647
648                 /*
649                  * Iterate down the tree while we are at an internal node.
650                  * Nodes cannot be empty, assert the case because if one is
651                  * we will wind up in an infinite loop.
652                  *
653                  * We can avoid iterating through large swaths of transaction
654                  * id space if the left and right separators are the same
655                  * except for their transaction spaces.  We can then skip
656                  * the node if the left and right transaction spaces are the
657                  * same sign.  This directly optimized accesses to files with
658                  * HUGE transactional histories, such as database files.
659                  */
660                 while (node->base.type == HAMMER_BTREE_INTERNAL_NODE) {
661                         struct hammer_btree_internal_elm *elm;
662
663                         elm = &node->internal.elms[cursor->index];
664                         if (elm[0].base.obj_id == elm[1].base.obj_id &&
665                             elm[0].base.rec_type == elm[1].base.rec_type &&
666                             elm[0].base.key == elm[1].base.key) {
667                                 save_key = key->key;
668                                 key->key = elm[0].base.key;
669                                 r = hammer_btree_cmp(key, &elm[0].base);
670                                 key->key = elm[1].base.key;
671                                 s = hammer_btree_cmp(key, &elm[1].base);
672                                 key->key = save_key;
673                                 if ((r < 0 && s < 0) || (r > 0 && s > 0)) {
674                                         ++cursor->index;
675                                         goto goupagain;
676                                 }
677                         }
678                         error = btree_cursor_down(cursor);
679                         if (error)
680                                 goto done;
681                         KKASSERT(cursor->index != node->base.count);
682                         node = cursor->node;
683                 }
684
685                 /*
686                  * Determine if the record at the cursor matches, sans
687                  * key->key (which is what we are iterating).
688                  */
689                 {
690                         union hammer_btree_leaf_elm *elm;
691
692                         elm = &node->leaf.elms[cursor->index];
693                         save_key = key->key;
694                         key->key = elm->base.key;
695                         r = hammer_btree_cmp(key, &elm->base);
696                         key->key = save_key;
697                 }
698
699                 /*
700                  * The iteration stops if the obj_id or rec_type no longer
701                  * match (unless the original key set those to 0, meaning the
702                  * caller WANTS to iterate through those as well), denoted
703                  * by -3,-2 or +2,+3 return values.  Otherwise the mismatch
704                  * is due to the transaction id and we can get both negative
705                  * and positive values... we have to skip those records and 
706                  * continue searching.
707                  */
708                 if (r == 0) {
709                         error = 0;
710                         break;
711                 }
712                 if (r < -2 || r > 2) {
713                         error = ENOENT;
714                         break;
715                 }
716                 ++cursor->index;
717         }
718 done:
719         cursor->last_error = error;
720         return(error);
721 }
722
723 /*
724  * Look up the key in the HAMMER BH-Tree and fill in the rest of the
725  * info structure with the results according to flags.  0 is returned on
726  * success, non-zero on error.
727  *
728  * The caller initializes (key, cluster) and makes the call.  The cluster
729  * must be referenced and locked on call.  The function can chain through
730  * multiple clusters and will replace the passed cluster as it does so,
731  * dereferencing and unlocking it, and referencing and locking the chain.
732  * On return info->cluster will still be referenced and locked but may
733  * represent a different cluster.
734  */
735 int
736 hammer_btree_lookup(hammer_btree_info_t info, hammer_base_elm_t key, int flags)
737 {
738         int error;
739
740         error = btree_search(&info->cursor, key, flags);
741         if (error == 0 &&
742             (flags & (HAMMER_BTREE_GET_RECORD|HAMMER_BTREE_GET_DATA))) {
743                 error = hammer_btree_extract(info, flags);
744         }
745         return(error);
746 }
747
748 int
749 hammer_btree_extract(hammer_btree_info_t info, int flags)
750 {
751         struct hammer_cluster *cluster;
752         hammer_btree_leaf_elm_t elm;
753         int32_t cloff;
754         int error;
755         int iscl;
756
757         /* 
758          * Extract the record and data reference if requested.
759          *
760          * A cluster record type has no data reference, the information
761          * is stored directly in the record and BH-Tree element.
762          *
763          * The case where the data reference resolves to the same buffer
764          * as the record reference must be handled.
765          */
766         elm = &info->cursor.node->leaf.elms[info->cursor.index];
767         iscl = (elm->base.obj_type & HAMMER_OBJTYPE_CLUSTER_FLAG) != 0;
768         cluster = info->cursor.cluster;
769         error = 0;
770
771         if ((flags & HAMMER_BTREE_GET_RECORD) && error == 0) {
772                 cloff = iscl ? elm->cluster.rec_offset : elm->record.rec_offset;
773
774
775                 info->rec = hammer_bread(cluster, cloff, HAMMER_FSBUF_RECORDS,
776                                          &error, &info->record_buffer);
777         } else {
778                 cloff = 0;
779         }
780         if ((flags & HAMMER_BTREE_GET_DATA) && iscl == 0 && error == 0) {
781                 if ((cloff ^ elm->record.data_offset) & ~HAMMER_BUFMASK) {
782                         info->data = hammer_bread(cluster,
783                                                   elm->record.data_offset,
784                                                   HAMMER_FSBUF_DATA,
785                                                   &error, &info->record_buffer);
786                 } else {
787                         info->data = (void *)
788                                 ((char *)info->data_buffer->ondisk +
789                                  (elm->record.data_offset & HAMMER_BUFMASK));
790                 }
791         }
792         return(error);
793 }
794
795
796 /*
797  * Insert a record into a BH-Tree's cluster.  The caller has already
798  * reserved space for the record and data and must handle a ENOSPC
799  * return.
800  */
801 int
802 hammer_btree_insert(struct hammer_btree_info *info, hammer_btree_leaf_elm_t elm)
803 {
804         struct hammer_btree_cursor *cursor;
805         struct hammer_btree_internal_node *parent;
806         struct hammer_btree_leaf_node *leaf;
807         int i;
808
809 #if 0
810         /* HANDLED BY CALLER */
811         /*
812          * Issue a search to get our cursor at the right place.  The search
813          * will get us to a leaf node.
814          *
815          * The search also does some setup for our insert, so there is always
816          * room in the leaf.
817          */
818         error = btree_search(&info->cursor, &elm->base, HAMMER_BTREE_INSERT);
819         if (error != ENOENT) {
820                 if (error == 0)
821                         error = EEXIST;
822                 return (error);
823         }
824 #endif
825
826         /*
827          * Insert the element at the leaf node and update the count in the
828          * parent.  It is possible for parent to be NULL, indicating that
829          * the root of the BH-Tree in the cluster is a leaf.  It is also
830          * possible for the leaf to be empty.
831          *
832          * Remember that the right-hand boundary is not included in the
833          * count.
834          */
835         cursor = &info->cursor;
836         leaf = &cursor->node->leaf;
837         i = cursor->index;
838         KKASSERT(leaf->base.count < HAMMER_BTREE_LEAF_ELMS);
839         bcopy(&leaf->elms[i], &leaf->elms[i+1],
840               (leaf->base.count - i + 1) * sizeof(leaf->elms[0]));
841         leaf->elms[i] = *elm;
842         ++leaf->base.count;
843
844         if ((parent = cursor->parent) != NULL) {
845                 i = cursor->parent_index;
846                 ++parent->elms[i].subtree_count;
847                 KKASSERT(parent->elms[i].subtree_count <= leaf->base.count);
848                 hammer_modify_buffer(cursor->parent_buffer);
849         }
850         hammer_modify_buffer(cursor->node_buffer);
851         return(0);
852 }
853
854 /*
855  * Delete a record from the BH-Tree.
856  */
857 int
858 hammer_btree_delete(struct hammer_btree_info *info, hammer_base_elm_t key)
859 {
860         struct hammer_btree_cursor *cursor;
861         struct hammer_btree_internal_node *parent;
862         struct hammer_btree_leaf_node *leaf;
863         int i;
864
865 #if 0
866         /* HANDLED BY CALLER */
867         /*
868          * Locate the leaf element to delete.  The search is also responsible
869          * for doing some of the rebalancing work on its way down.
870          */
871         error = btree_search(&info->cursor, key, HAMMER_BTREE_DELETE);
872         if (error)
873                 return (error);
874 #endif
875
876         /*
877          * Delete the element from the leaf node.  We leave empty leafs alone
878          * and instead depend on a future search to locate and destroy them.
879          * Otherwise we would have to recurse back up the tree to adjust
880          * the parent's subtree_count and we do not want to do that.
881          *
882          * Remember that the right-hand boundary is not included in the
883          * count.
884          */
885         cursor = &info->cursor;
886         leaf = &cursor->node->leaf;
887         i = cursor->index;
888
889         KKASSERT(cursor->node->base.type == HAMMER_BTREE_LEAF_NODE);
890         bcopy(&leaf->elms[i+1], &leaf->elms[i],
891               (leaf->base.count - i) * sizeof(leaf->elms[0]));
892         --leaf->base.count;
893         if ((parent = cursor->parent) != NULL) {
894                 /*
895                  * Adjust parent's notion of the leaf's count.  subtree_count
896                  * is only approximately, it is allowed to be too small but
897                  * never allowed to be too large.  Make sure we don't drop
898                  * the count below 0.
899                  */
900                 i = cursor->parent_index;
901                 if (parent->elms[i].subtree_count)
902                         --parent->elms[i].subtree_count;
903                 KKASSERT(parent->elms[i].subtree_count <= leaf->base.count);
904                 hammer_modify_buffer(cursor->parent_buffer);
905         }
906         hammer_modify_buffer(cursor->node_buffer);
907         return(0);
908 }
909
910 /************************************************************************
911  *                              CURSOR SUPPORT                          *
912  ************************************************************************
913  *
914  * These routines do basic cursor operations.  This support will probably
915  * be expanded in the future to add link fields for linear scans.
916  */
917
918 /*
919  * Unconditionally set the cursor to the root of the specified cluster.
920  * The current cursor node is set to the root node of the cluster (which
921  * can be an internal node or a degenerate leaf), and the parent info
922  * is cleaned up and cleared.
923  *
924  * The passed cluster must be locked.  This function will add a reference
925  * to it.  The cursor must already have a cluster assigned to it, which we
926  * will replace.
927  */
928 static
929 int
930 btree_cursor_set_cluster_by_value(hammer_btree_cursor_t cursor,
931                         int32_t vol_no, int32_t clu_no, hammer_tid_t clu_id)
932 {
933         struct hammer_cluster *cluster;
934         struct hammer_volume *volume;
935         int error = 0;
936
937         if (vol_no < 0)
938                 return(EIO);
939         cluster = cursor->cluster;
940         KKASSERT(cluster != NULL);
941         if (vol_no == cluster->volume->vol_no) {
942                 cluster = hammer_get_cluster(cluster->volume, clu_no,
943                                              &error, 0);
944         } else {
945                 volume = hammer_get_volume(cluster->volume->hmp,
946                                            vol_no, &error);
947                 if (volume) {
948                         cluster = hammer_get_cluster(volume, clu_no,
949                                                      &error, 0);
950                         hammer_put_volume(volume, 0);
951                 } else {
952                         cluster = NULL;
953                 }
954         }
955
956         /*
957          * Make sure the cluster id matches.  XXX At the moment the
958          * clu_id in the btree cluster element is only 32 bits, so only
959          * compare the low 32 bits.
960          */
961         if (cluster) {
962                 if ((int32_t)cluster->ondisk->clu_id == (int32_t)clu_id) {
963                         btree_cursor_set_cluster(cursor, cluster);
964                 } else {
965                         error = EIO;
966                 }
967                 hammer_put_cluster(cluster, 0);
968         }
969         return (error);
970 }
971
972 static
973 int
974 btree_cursor_set_cluster(hammer_btree_cursor_t cursor,
975                          struct hammer_cluster *cluster)
976 {
977         int error = 0;
978
979         hammer_dup_cluster(&cursor->cluster, cluster);
980         cursor->node = hammer_bread(cluster,
981                                     cluster->ondisk->clu_btree_root,
982                                     HAMMER_FSBUF_BTREE,
983                                     &error,
984                                     &cursor->node_buffer);
985         cursor->index = 0;
986         if (cursor->parent) {
987                 hammer_put_buffer(cursor->parent_buffer, 0);
988                 cursor->parent_buffer = NULL;
989                 cursor->parent = NULL;
990                 cursor->parent_index = 0;
991         }
992         cursor->left_bound = &cluster->ondisk->clu_btree_beg;
993         cursor->right_bound = &cluster->ondisk->clu_btree_end;
994         return (error);
995 }
996
997 /*
998  * Cursor the node up to the parent node.  If at the root of a cluster,
999  * cursor to the root of the cluster's parent cluster.  Note carefully
1000  * that we do NOT scan the parent cluster to find the leaf that dropped
1001  * into our current cluster.
1002  *
1003  * This function is primarily used by the search code to avoid having
1004  * to search from the root of the filesystem BH-Tree.
1005  */
1006 static
1007 int
1008 btree_cursor_up(hammer_btree_cursor_t cursor)
1009 {
1010         struct hammer_cluster_ondisk *ondisk;
1011         struct hammer_btree_internal_node *parent;
1012         int error;
1013         int i;
1014         int r;
1015
1016         error = 0;
1017         if (cursor->parent == NULL) {
1018                 /*
1019                  * We are at the root of the cluster, pop up to the root
1020                  * of the parent cluster.  Return ENOENT if we are at the
1021                  * root cluster of the filesystem.
1022                  */
1023                 ondisk = cursor->cluster->ondisk;
1024                 if (ondisk->clu_btree_parent_vol_no < 0) {
1025                         error = ENOENT;
1026                 } else {
1027                         error = btree_cursor_set_cluster_by_value(
1028                                     cursor,
1029                                     ondisk->clu_btree_parent_vol_no,
1030                                     ondisk->clu_btree_parent_clu_no,
1031                                     ondisk->clu_btree_parent_clu_id);
1032                 }
1033         } else {
1034                 /*
1035                  * Copy the current node's parent info into the node and load
1036                  * a new parent.
1037                  */
1038                 cursor->index = cursor->parent_index;
1039                 cursor->node = (hammer_btree_node_t)cursor->parent;
1040                 hammer_dup_buffer(&cursor->node_buffer, cursor->parent_buffer);
1041
1042                 /*
1043                  * Load the parent's parent into parent and figure out the
1044                  * parent's element index for its child.  Just NULL it out
1045                  * if we hit the root of the cluster.
1046                  */
1047                 if (cursor->parent->base.parent) {
1048                         parent = hammer_bread(cursor->cluster,
1049                                               cursor->node->base.parent,
1050                                               HAMMER_FSBUF_BTREE,
1051                                               &error,
1052                                               &cursor->parent_buffer);
1053                         for (i = 0; i < parent->base.count; ++i) {
1054                                 r = hammer_btree_cmp(
1055                                         &cursor->node->internal.elms[0].base,
1056                                         &parent->elms[i].base);
1057                                 if (r < 0)
1058                                         break;
1059                         }
1060                         cursor->parent = parent;
1061                         cursor->parent_index = i - 1;
1062                         KKASSERT(parent->elms[i].subtree_offset ==
1063                                  hammer_bclu_offset(cursor->node_buffer,
1064                                                     cursor->node));
1065                 } else {
1066                         hammer_put_buffer(cursor->parent_buffer, 0);
1067                         cursor->parent = NULL;
1068                         cursor->parent_buffer = NULL;
1069                         cursor->parent_index = 0;
1070                 }
1071         }
1072         return(error);
1073 }
1074
1075 /*
1076  * Cursor down into (node, index)
1077  *
1078  * Push down into the current cursor.  The current cursor becomes the parent.
1079  * If the current cursor represents a leaf cluster element this function will
1080  * push into the root of a new cluster and clear the parent fields.
1081  *
1082  * Pushin down at a leaf which is not a cluster element returns EIO.
1083  */
1084 static
1085 int
1086 btree_cursor_down(hammer_btree_cursor_t cursor)
1087 {
1088         hammer_btree_node_t node;
1089         int error;
1090
1091         node = cursor->node;
1092         if (node->base.type == HAMMER_BTREE_LEAF_NODE) {
1093                 /*
1094                  * Push into another cluster
1095                  */
1096                 hammer_btree_leaf_elm_t elm;
1097
1098                 elm = &node->leaf.elms[cursor->index];
1099                 if (elm->base.rec_type == HAMMER_RECTYPE_CLUSTER) {
1100                         error = btree_cursor_set_cluster_by_value(
1101                                     cursor,
1102                                     elm->cluster.vol_no,
1103                                     elm->cluster.clu_no,
1104                                     elm->cluster.verifier);
1105                 } else {
1106                         error = EIO;
1107                 }
1108         } else {
1109                 /*
1110                  * Push into another BH-Tree node (internal or leaf)
1111                  */
1112                 struct hammer_btree_internal_elm *elm;
1113
1114                 KKASSERT(node->base.type == HAMMER_BTREE_INTERNAL_NODE);
1115                 elm = &node->internal.elms[cursor->index];
1116                 KKASSERT(elm->subtree_offset != 0);
1117                 cursor->parent_index = cursor->index;
1118                 cursor->parent = &cursor->node->internal;
1119                 hammer_dup_buffer(&cursor->parent_buffer, cursor->node_buffer);
1120
1121                 cursor->node = hammer_bread(cursor->cluster,
1122                                             elm->subtree_offset,
1123                                             HAMMER_FSBUF_BTREE,
1124                                             &error,
1125                                             &cursor->node_buffer);
1126                 cursor->index = 0;
1127                 cursor->left_bound = &elm[0].base;
1128                 cursor->right_bound = &elm[1].base;
1129                 KKASSERT(cursor->node == NULL ||
1130                          cursor->node->base.parent == elm->subtree_offset);
1131 #ifdef INVARIANTS
1132                 /*
1133                  * The bounds of an internal node must match the parent's
1134                  * partitioning values.  Leaf nodes do not store bounds.
1135                  */
1136                 if (cursor->node->base.type == HAMMER_BTREE_INTERNAL_NODE) {
1137                         KKASSERT(hammer_btree_cmp(cursor->left_bound, &cursor->node->internal.elms[0].base) == 0);
1138                         KKASSERT(hammer_btree_cmp(cursor->right_bound, &cursor->node->internal.elms[cursor->node->base.count].base) == 0);
1139                 }
1140 #endif
1141         }
1142         return(error);
1143 }
1144
1145 /************************************************************************
1146  *                              SPLITTING AND MERGING                   *
1147  ************************************************************************
1148  *
1149  * These routines do all the dirty work required to split and merge nodes.
1150  */
1151
1152 /*
1153  * Split an internal into two nodes and move the separator at the split
1154  * point to the parent.  Note that the parent's parent's element pointing
1155  * to our parent will have an incorrect subtree_count (we don't update it).
1156  * It will be low, which is ok.
1157  *
1158  * Cursor->index indicates the element the caller intends to push into.
1159  * We will adjust cursor->node and cursor->index if that element winds
1160  * up in the split node.
1161  */
1162 static
1163 int
1164 btree_split_internal(hammer_btree_cursor_t cursor)
1165 {
1166         struct hammer_btree_internal_node *parent;
1167         struct hammer_btree_internal_node *node;
1168         struct hammer_btree_internal_node *new_node;
1169         struct hammer_btree_internal_elm *elm;
1170         struct hammer_btree_internal_elm *parent_elm;
1171         struct hammer_buffer *new_buffer;
1172         int32_t parent_offset;
1173         int parent_index;
1174         int made_root;
1175         int split;
1176         int error;
1177         const size_t esize = sizeof(struct hammer_btree_internal_elm);
1178
1179         /* 
1180          * We are splitting but elms[split] will be promoted to the parent,
1181          * leaving the right hand node with one less element.  If the
1182          * insertion point will be on the left-hand side adjust the split
1183          * point to give the right hand side one additional node.
1184          */
1185         node = &cursor->node->internal;
1186         split = (node->base.count + 1) / 2;
1187         if (cursor->index <= split)
1188                 --split;
1189         error = 0;
1190
1191         /*
1192          * If we are at the root of the tree, create a new root node with
1193          * 1 element and split normally.  Avoid making major modifications
1194          * until we know the whole operation will work.
1195          */
1196         parent = cursor->parent;
1197         if (parent == NULL) {
1198                 parent = hammer_alloc_btree(cursor->cluster, &error,
1199                                             &cursor->parent_buffer);
1200                 if (parent == NULL)
1201                         return(error);
1202                 parent->base.count = 1;
1203                 parent->base.parent = 0;
1204                 parent->base.type = HAMMER_BTREE_INTERNAL_NODE;
1205                 parent->base.subtype = node->base.type;
1206                 parent->elms[0].base = cursor->cluster->ondisk->clu_btree_beg;
1207                 parent->elms[0].subtree_offset =
1208                         hammer_bclu_offset(cursor->node_buffer, node);
1209                 parent->elms[1].base = cursor->cluster->ondisk->clu_btree_end;
1210                 made_root = 1;
1211                 cursor->parent_index = 0;       /* insertion point in parent */
1212         } else {
1213                 made_root = 0;
1214         }
1215         parent_offset = hammer_bclu_offset(cursor->parent_buffer, parent);
1216
1217         /*
1218          * Split node into new_node at the split point.
1219          *
1220          *  B O O O P N N B     <-- P = node->elms[split]
1221          *   0 1 2 3 4 5 6      <-- subtree indices
1222          *
1223          *       x x P x x
1224          *        s S S s  
1225          *         /   \
1226          *  B O O O B    B N N B        <--- inner boundary points are 'P'
1227          *   0 1 2 3      4 5 6  
1228          *
1229          */
1230         new_buffer = NULL;
1231         new_node = hammer_alloc_btree(cursor->cluster, &error, &new_buffer);
1232         if (new_node == NULL) {
1233                 if (made_root)
1234                         hammer_free_btree_ptr(cursor->parent_buffer,
1235                                               (hammer_btree_node_t)parent);
1236                 return(error);
1237         }
1238
1239         /*
1240          * Create the new node.  P become the left-hand boundary in the
1241          * new node.  Copy the right-hand boundary as well.
1242          *
1243          * elm is the new separator.
1244          */
1245         elm = &node->elms[split];
1246         bcopy(elm, &new_node->elms[0], (node->base.count - split + 1) * esize);
1247         new_node->base.count = node->base.count - split;
1248         new_node->base.parent = parent_offset;
1249         new_node->base.type = HAMMER_BTREE_INTERNAL_NODE;
1250         new_node->base.subtype = node->base.subtype;
1251         KKASSERT(node->base.type == new_node->base.type);
1252
1253         /*
1254          * Cleanup the original node.  P becomes the new boundary, its
1255          * subtree_offset was moved to the new node.  If we had created
1256          * a new root its parent pointer may have changed.
1257          */
1258         node->base.parent = parent_offset;
1259         elm->subtree_offset = 0;
1260
1261         /*
1262          * Insert the separator into the parent, fixup the parent's
1263          * reference to the original node, and reference the new node.
1264          * The separator is P.
1265          *
1266          * Remember that base.count does not include the right-hand boundary.
1267          */
1268         parent_index = cursor->parent_index;
1269         parent->elms[parent_index].subtree_count = split;
1270         parent_elm = &parent->elms[parent_index+1];
1271         bcopy(parent_elm, parent_elm + 1,
1272               (parent->base.count - parent_index) * esize);
1273         parent_elm->base = elm->base;   /* separator P */
1274         parent_elm->subtree_offset = hammer_bclu_offset(new_buffer, new_node);
1275         parent_elm->subtree_count = new_node->base.count;
1276
1277         hammer_modify_buffer(new_buffer);
1278         hammer_modify_buffer(cursor->parent_buffer);
1279         hammer_modify_buffer(cursor->node_buffer);
1280
1281         /*
1282          * The cluster's root pointer may have to be updated.
1283          */
1284         if (made_root) {
1285                 cursor->cluster->ondisk->clu_btree_root = node->base.parent;
1286                 hammer_modify_cluster(cursor->cluster);
1287         }
1288
1289         /*
1290          * Ok, now adjust the cursor depending on which element the original
1291          * index was pointing at.  If we are >= the split point the push node
1292          * is now in the new node.
1293          *
1294          * NOTE: If we are at the split point itself we cannot stay with the
1295          * original node because the push index will point at the right-hand
1296          * boundary, which is illegal.
1297          */
1298         if (cursor->index >= split) {
1299                 cursor->index -= split;
1300                 cursor->node = (hammer_btree_node_t)new_node;
1301                 hammer_put_buffer(cursor->node_buffer, 0);
1302                 cursor->node_buffer = new_buffer;
1303         }
1304
1305         return (0);
1306 }
1307
1308 /*
1309  * Same as the above, but splits a full leaf node.
1310  */
1311 static
1312 int
1313 btree_split_leaf(hammer_btree_cursor_t cursor)
1314 {
1315         struct hammer_btree_internal_node *parent;
1316         struct hammer_btree_leaf_node *leaf;
1317         struct hammer_btree_leaf_node *new_leaf;
1318         union hammer_btree_leaf_elm *elm;
1319         struct hammer_btree_internal_elm *parent_elm;
1320         struct hammer_buffer *new_buffer;
1321         int32_t parent_offset;
1322         int parent_index;
1323         int made_root;
1324         int split;
1325         int error;
1326         const size_t esize = sizeof(struct hammer_btree_internal_elm);
1327
1328         /* 
1329          * We are splitting but elms[split] will be promoted to the parent,
1330          * leaving the right hand node with one less element.  If the
1331          * insertion point will be on the left-hand side adjust the split
1332          * point to give the right hand side one additional node.
1333          */
1334         leaf = &cursor->node->leaf;
1335         split = (leaf->base.count + 1) / 2;
1336         if (cursor->index <= split)
1337                 --split;
1338         error = 0;
1339
1340         /*
1341          * If we are at the root of the tree, create a new root node with
1342          * 1 element and split normally.  Avoid making major modifications
1343          * until we know the whole operation will work.
1344          */
1345         parent = cursor->parent;
1346         if (parent == NULL) {
1347                 parent = hammer_alloc_btree(cursor->cluster, &error,
1348                                             &cursor->parent_buffer);
1349                 if (parent == NULL)
1350                         return(error);
1351                 parent->base.count = 1;
1352                 parent->base.parent = 0;
1353                 parent->base.type = HAMMER_BTREE_INTERNAL_NODE;
1354                 parent->base.subtype = leaf->base.type;
1355                 parent->elms[0].base = cursor->cluster->ondisk->clu_btree_beg;
1356                 parent->elms[0].subtree_offset =
1357                         hammer_bclu_offset(cursor->node_buffer, leaf);
1358                 parent->elms[1].base = cursor->cluster->ondisk->clu_btree_end;
1359                 made_root = 1;
1360                 cursor->parent_index = 0;       /* insertion point in parent */
1361         } else {
1362                 made_root = 0;
1363         }
1364         parent_offset = hammer_bclu_offset(cursor->parent_buffer, parent);
1365
1366         /*
1367          * Split leaf into new_leaf at the split point.  Select a separator
1368          * value in-between the two leafs but with a bent towards the right
1369          * leaf since comparisons use an 'elm >= separator' inequality.
1370          *
1371          *  L L L L L L L L
1372          *
1373          *       x x P x x
1374          *        s S S s  
1375          *         /   \
1376          *  L L L L     L L L L
1377          */
1378         new_buffer = NULL;
1379         new_leaf = hammer_alloc_btree(cursor->cluster, &error, &new_buffer);
1380         if (new_leaf == NULL) {
1381                 if (made_root)
1382                         hammer_free_btree_ptr(cursor->parent_buffer,
1383                                               (hammer_btree_node_t)parent);
1384                 return(error);
1385         }
1386
1387         /*
1388          * Create the new node.  P become the left-hand boundary in the
1389          * new node.  Copy the right-hand boundary as well.
1390          */
1391         elm = &leaf->elms[split];
1392         bcopy(elm, &new_leaf->elms[0], (leaf->base.count - split) * esize);
1393         new_leaf->base.count = leaf->base.count - split;
1394         new_leaf->base.parent = parent_offset;
1395         new_leaf->base.type = HAMMER_BTREE_LEAF_NODE;
1396         new_leaf->base.subtype = 0;
1397         KKASSERT(leaf->base.type == new_leaf->base.type);
1398
1399         /*
1400          * Cleanup the original node.  P becomes the new boundary, its
1401          * subtree_offset was moved to the new node.  If we had created
1402          * a new root its parent pointer may have changed.
1403          */
1404         leaf->base.parent = parent_offset;
1405
1406         /*
1407          * Insert the separator into the parent, fixup the parent's
1408          * reference to the original node, and reference the new node.
1409          * The separator is P.
1410          *
1411          * Remember that base.count does not include the right-hand boundary.
1412          * We are copying parent_index+1 to parent_index+2, not +0 to +1.
1413          */
1414         parent_index = cursor->parent_index;
1415         parent->elms[parent_index].subtree_count = split;
1416         parent_elm = &parent->elms[parent_index+1];
1417         if (parent_index + 1 != parent->base.count) {
1418                 bcopy(parent_elm, parent_elm + 1,
1419                       (parent->base.count - parent_index - 1) * esize);
1420         }
1421         hammer_make_separator(&elm[-1].base, &elm[0].base, &parent_elm->base);
1422         parent_elm->subtree_offset = hammer_bclu_offset(new_buffer, new_leaf);
1423         parent_elm->subtree_count = new_leaf->base.count;
1424
1425         hammer_modify_buffer(new_buffer);
1426         hammer_modify_buffer(cursor->parent_buffer);
1427         hammer_modify_buffer(cursor->node_buffer);
1428
1429         /*
1430          * The cluster's root pointer may have to be updated.
1431          */
1432         if (made_root) {
1433                 cursor->cluster->ondisk->clu_btree_root = leaf->base.parent;
1434                 hammer_modify_cluster(cursor->cluster);
1435         }
1436
1437         /*
1438          * Ok, now adjust the cursor depending on which element the original
1439          * index was pointing at.  If we are >= the split point the push node
1440          * is now in the new node.
1441          *
1442          * NOTE: If we are at the split point itself we cannot stay with the
1443          * original node because the push index will point at the right-hand
1444          * boundary, which is illegal.
1445          */
1446         if (cursor->index >= split) {
1447                 cursor->index -= split;
1448                 cursor->node = (hammer_btree_node_t)new_leaf;
1449                 hammer_put_buffer(cursor->node_buffer, 0);
1450                 cursor->node_buffer = new_buffer;
1451         }
1452
1453         return (0);
1454 }
1455
1456 /*
1457  * This routine is called on an internal node prior to recursing down
1458  * through (node, index) when (node, index) MIGHT have too few elements for
1459  * the caller to perform a deletion.
1460  *
1461  * cursor->index is invalid on return because the separators may have gotten
1462  * adjusted, the caller must rescan the node's elements.  The caller may set
1463  * cursor->index to -1 if it wants us to do a general rebalancing.
1464  * 
1465  * NOTE: Because we do not update the parent's parent in the split code,
1466  * the subtree_count used by the caller may be incorrect.  We correct it
1467  * here.  Also note that we cannot change the depth of the tree's leaf
1468  * nodes here (see btree_collapse()).
1469  *
1470  * This routine rebalances the children of the node, collapsing children
1471  * together if possible.  On return each child will have at least L/2-1
1472  * elements unless the node only has one child.
1473  */
1474 static
1475 int
1476 btree_rebalance_node(hammer_btree_cursor_t cursor)
1477 {
1478         struct hammer_btree_internal_node *node;
1479         hammer_btree_node_t children[HAMMER_BTREE_INT_ELMS];
1480         struct hammer_buffer *child_buffer[HAMMER_BTREE_INT_ELMS];
1481         hammer_btree_node_t child;
1482         hammer_btree_elm_inmemory_t elms;
1483         int i, j, n, nelms, goal;
1484         int maxelms, halfelms;
1485         int error;
1486
1487         /*
1488          * Basic setup
1489          */
1490         node = &cursor->node->internal;
1491         KKASSERT(node->elms[cursor->index].subtree_offset != 0);
1492         error = 0;
1493
1494         /*
1495          * Load the children of node and do any necessary corrections
1496          * to subtree_count.  subtree_count may be incorrect due to the
1497          * way insertions split nodes.  Get a count of the total number
1498          * of elements held by our children.
1499          */
1500         error = 0;
1501
1502         for (i = n = 0; i < node->base.count; ++i) {
1503                 struct hammer_btree_internal_elm *elm;
1504
1505                 elm = &node->elms[i];
1506                 children[i] = NULL;
1507                 child_buffer[i] = NULL; /* must be preinitialized for bread */
1508                 if (elm->subtree_offset == 0)
1509                         continue;
1510                 child = hammer_bread(cursor->cluster, elm->subtree_offset,
1511                                      HAMMER_FSBUF_BTREE, &error,
1512                                      &child_buffer[i]);
1513                 children[i] = child;
1514                 if (child == NULL)
1515                         continue;
1516                 KKASSERT(node->base.subtype == child->base.type);
1517
1518                 /*
1519                  * Accumulate n for a good child, update the node's count
1520                  * if it was wrong.
1521                  */
1522                 if (node->elms[i].subtree_count != child->base.count) {
1523                         node->elms[i].subtree_count = child->base.count;
1524                 }
1525                 n += node->elms[i].subtree_count;
1526         }
1527         if (error)
1528                 goto failed;
1529
1530         /*
1531          * Collect all the children's elements together
1532          */
1533         nelms = n;
1534         elms = kmalloc(sizeof(*elms) * (nelms + 1), M_HAMMER, M_WAITOK|M_ZERO);
1535         for (i = n = 0; i < node->base.count; ++i) {
1536                 child = children[i];
1537                 for (j = 0; j < child->base.count; ++j) {
1538                         elms[n].owner = child;
1539                         if (node->base.subtype == HAMMER_BTREE_LEAF_NODE)
1540                                 elms[n].u.leaf = child->leaf.elms[j];
1541                         else
1542                                 elms[n].u.internal = child->internal.elms[j];
1543                         ++n;
1544                 }
1545         }
1546         KKASSERT(n == nelms);
1547
1548         /*
1549          * Store a boundary in the elms array to ease the code below.  This
1550          * is only used if the children are internal nodes.
1551          */
1552         elms[n].u.internal = node->elms[i];
1553
1554         /*
1555          * Calculate the number of elements each child should have (goal) by
1556          * reducing the number of elements until we achieve at least
1557          * halfelms - 1 per child, unless we are a degenerate case.
1558          */
1559         maxelms = btree_max_elements(node->base.subtype);
1560         halfelms = maxelms / 2;
1561
1562         goal = halfelms - 1;
1563         while (i && n / i < goal)
1564                 --i;
1565
1566         /*
1567          * Now rebalance using the specified goal
1568          */
1569         for (i = n = 0; i < node->base.count; ++i) {
1570                 struct hammer_buffer *subchild_buffer = NULL;
1571                 struct hammer_btree_internal_node *subchild;
1572
1573                 child = children[i];
1574                 for (j = 0; j < goal && n < nelms; ++j) {
1575                         if (node->base.subtype == HAMMER_BTREE_LEAF_NODE) {
1576                                 child->leaf.elms[j] = elms[n].u.leaf;
1577                         } else {
1578                                 child->internal.elms[j] = elms[n].u.internal;
1579                         }
1580
1581                         /*
1582                          * If the element's parent has changed we have to
1583                          * update the parent pointer.  This is somewhat
1584                          * expensive.
1585                          */
1586                         if (elms[n].owner != child &&
1587                             node->base.subtype == HAMMER_BTREE_INTERNAL_NODE) {
1588                                 subchild = hammer_bread(cursor->cluster,
1589                                                         elms[n].u.internal.subtree_offset,
1590                                                         HAMMER_FSBUF_BTREE,
1591                                                         &error,
1592                                                         &subchild_buffer);
1593                                 if (subchild) {
1594                                         subchild->base.parent =
1595                                             hammer_bclu_offset(child_buffer[i],
1596                                                                 child);
1597                                         hammer_modify_buffer(subchild_buffer);
1598                                 }
1599                                 /* XXX error */
1600                         }
1601                         ++n;
1602                 }
1603                 /* 
1604                  * Set right boundary if the children are internal nodes.
1605                  */
1606                 if (node->base.subtype == HAMMER_BTREE_INTERNAL_NODE)
1607                         child->internal.elms[j] = elms[n].u.internal;
1608                 child->base.count = j;
1609                 hammer_modify_buffer(child_buffer[i]);
1610                 if (subchild_buffer)
1611                         hammer_put_buffer(subchild_buffer, 0);
1612
1613                 /*
1614                  * If we have run out of elements, break out
1615                  */
1616                 if (n == nelms)
1617                         break;
1618         }
1619
1620         /*
1621          * Physically destroy any left-over children.  These children's
1622          * elements have been packed into prior children.  The node's
1623          * right hand boundary and count gets shifted to index i.
1624          *
1625          * The subtree count in the node's parent MUST be updated because
1626          * we are removing elements.  The subtree_count field is allowed to
1627          * be too small, but not too large!
1628          */
1629         if (i != node->base.count) {
1630                 n = i;
1631                 node->elms[n] = node->elms[node->base.count];
1632                 while (i < node->base.count) {
1633                         hammer_free_btree_ptr(child_buffer[i], children[i]);
1634                         hammer_put_buffer(child_buffer[i], 0);
1635                         ++i;
1636                 }
1637                 node->base.count = n;
1638                 if (cursor->parent) {
1639                         cursor->parent->elms[cursor->parent_index].subtree_count = n;
1640                         hammer_modify_buffer(cursor->parent_buffer);
1641                 }
1642         }
1643
1644         kfree(elms, M_HAMMER);
1645 failed:
1646         hammer_modify_buffer(cursor->node_buffer);
1647         for (i = 0; i < node->base.count; ++i) {
1648                 if (child_buffer[i])
1649                         hammer_put_buffer(child_buffer[i], 0);
1650         }
1651         return (error);
1652 }
1653
1654 /*
1655  * This routine is only called if the cursor is at the root node and the
1656  * root node is an internal node.  We attempt to collapse the root node
1657  * by replacing it with all of its children, reducing tree depth by one.
1658  *
1659  * This is the only way to reduce tree depth in a HAMMER filesystem.
1660  * Note that all leaf nodes are at the same depth.
1661  *
1662  * This is a fairly expensive operation because we not only have to load
1663  * the root's children, we also have to scan each child and adjust the
1664  * parent offset for each element in each child.  Nasty all around.
1665  */
1666 static
1667 int
1668 btree_collapse(hammer_btree_cursor_t cursor)
1669 {
1670         hammer_btree_node_t root, child;
1671         hammer_btree_node_t children[HAMMER_BTREE_INT_ELMS];
1672         struct hammer_buffer *child_buffer[HAMMER_BTREE_INT_ELMS];
1673         int count;
1674         int i, j, n;
1675         int root_modified;
1676         int error;
1677         int32_t root_offset;
1678         u_int8_t subsubtype;
1679
1680         root = cursor->node;
1681         count = root->base.count;
1682         root_offset = hammer_bclu_offset(cursor->node_buffer, root);
1683
1684         /*
1685          * Sum up the number of children each element has.  This value is
1686          * only approximate due to the way the insertion node works.  It
1687          * may be too small but it will never be too large.
1688          *
1689          * Quickly terminate the collapse if the elements have too many
1690          * children.
1691          */
1692         KKASSERT(root->base.parent == 0);       /* must be root node */
1693         KKASSERT(root->base.type == HAMMER_BTREE_INTERNAL_NODE);
1694         KKASSERT(count <= HAMMER_BTREE_INT_ELMS);
1695
1696         for (i = n = 0; i < count; ++i) {
1697                 n += root->internal.elms[i].subtree_count;
1698         }
1699         if (n > btree_max_elements(root->base.subtype))
1700                 return(0);
1701
1702         /*
1703          * Iterate through the elements again and correct the subtree_count.
1704          * Terminate the collapse if we wind up with too many.
1705          */
1706         error = 0;
1707         root_modified = 0;
1708
1709         for (i = n = 0; i < count; ++i) {
1710                 struct hammer_btree_internal_elm *elm;
1711
1712                 elm = &root->internal.elms[i];
1713                 child_buffer[i] = NULL;
1714                 children[i] = NULL;
1715                 if (elm->subtree_offset == 0)
1716                         continue;
1717                 child = hammer_bread(cursor->cluster, elm->subtree_offset,
1718                                      HAMMER_FSBUF_BTREE, &error,
1719                                      &child_buffer[i]);
1720                 children[i] = child;
1721                 if (child == NULL)
1722                         continue;
1723                 KKASSERT(root->base.subtype == child->base.type);
1724
1725                 /*
1726                  * Accumulate n for a good child, update the root's count
1727                  * if it was wrong.
1728                  */
1729                 if (root->internal.elms[i].subtree_count != child->base.count) {
1730                         root->internal.elms[i].subtree_count = child->base.count;
1731                         root_modified = 1;
1732                 }
1733                 n += root->internal.elms[i].subtree_count;
1734         }
1735         if (error || n > btree_max_elements(root->base.subtype))
1736                 goto done;
1737
1738         /*
1739          * Ok, we can collapse the root.  If the root's children are leafs
1740          * the collapse is really simple.  If they are internal nodes the
1741          * collapse is not so simple because we have to fixup the parent
1742          * pointers for the root's children's children.
1743          *
1744          * When collapsing an internal node the far left and far right
1745          * element's boundaries should match the root's left and right
1746          * boundaries.
1747          */
1748         if (root->base.subtype == HAMMER_BTREE_LEAF_NODE) {
1749                 for (i = n = 0; i < count; ++i) {
1750                         child = children[i];
1751                         for (j = 0; j < child->base.count; ++j) {
1752                                 root->leaf.elms[n] = child->leaf.elms[j];
1753                                 ++n;
1754                         }
1755                 }
1756                 root->base.type = root->base.subtype;
1757                 root->base.subtype = 0;
1758                 root->base.count = n;
1759                 root->leaf.link_left = 0;
1760                 root->leaf.link_right = 0;
1761         } else {
1762                 struct hammer_btree_internal_elm *elm;
1763                 struct hammer_btree_internal_node *subchild;
1764                 struct hammer_buffer *subchild_buffer = NULL;
1765
1766                 if (count) {
1767                         child = children[0];
1768                         subsubtype = child->base.subtype;
1769                         KKASSERT(child->base.count > 0);
1770                         KKASSERT(root->internal.elms[0].base.key ==
1771                                  child->internal.elms[0].base.key);
1772                         child = children[count-1];
1773                         KKASSERT(child->base.count > 0);
1774                         KKASSERT(root->internal.elms[count].base.key ==
1775                              child->internal.elms[child->base.count].base.key);
1776                 } else {
1777                         subsubtype = 0;
1778                 }
1779                 for (i = n = 0; i < count; ++i) {
1780                         child = children[i];
1781                         KKASSERT(child->base.subtype == subsubtype);
1782                         for (j = 0; j < child->base.count; ++j) {
1783                                 elm = &child->internal.elms[j];
1784
1785                                 root->internal.elms[n] = *elm;
1786                                 subchild = hammer_bread(cursor->cluster,
1787                                                         elm->subtree_offset,
1788                                                         HAMMER_FSBUF_BTREE,
1789                                                         &error,
1790                                                         &subchild_buffer);
1791                                 if (subchild) {
1792                                         subchild->base.parent = root_offset;
1793                                         hammer_modify_buffer(subchild_buffer);
1794                                 }
1795                                 ++n;
1796                         }
1797                         /* make sure the right boundary is correct */
1798                         /* (this gets overwritten when the loop continues) */
1799                         /* XXX generate a new separator? */
1800                         root->internal.elms[n] = child->internal.elms[j];
1801                 }
1802                 root->base.type = HAMMER_BTREE_INTERNAL_NODE;
1803                 root->base.subtype = subsubtype;
1804                 if (subchild_buffer)
1805                         hammer_put_buffer(subchild_buffer, 0);
1806         }
1807         root_modified = 1;
1808
1809         /*
1810          * Cleanup
1811          */
1812 done:
1813         if (root_modified)
1814                 hammer_modify_buffer(cursor->node_buffer);
1815         for (i = 0; i < count; ++i) {
1816                 if (child_buffer[i])
1817                         hammer_put_buffer(child_buffer[i], 0);
1818         }
1819         return(error);
1820 }
1821