Merge from vendor branch SENDMAIL:
[dragonfly.git] / sys / vfs / hammer / hammer_btree.c
1 /*
2  * Copyright (c) 2007 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sys/vfs/hammer/hammer_btree.c,v 1.2 2007/11/02 00:57:15 dillon Exp $
35  */
36
37 /*
38  * HAMMER BH-Tree index
39  *
40  * HAMMER implements a modified B+Tree.  In documentation this will
41  * simply be refered to as the HAMMER BH-Tree.  Basically a BH-Tree
42  * looks like a B+Tree (A B-Tree which stores its records only at the leafs
43  * of the tree), but adds two additional boundary elements which describe
44  * the left-most and right-most element a node is able to represent.  In
45  * otherwords, we have boundary elements at the two ends of a BH-Tree node
46  * instead of sub-tree pointers.
47  *
48  * A BH-Tree internal node looks like this:
49  *
50  *      B N N N N N N B   <-- boundary and internal elements
51  *       S S S S S S S    <-- subtree pointers
52  *
53  * A BH-Tree leaf node basically looks like this:
54  *
55  *      L L L L L L L L   <-- leaf elemenets
56  *
57  * The recursion radix is reduced by 2 relative to a normal B-Tree but
58  * we get a number of significant benefits for our troubles.
59  *
60  * The big benefit to using a BH-Tree is that it is possible to cache
61  * pointers into the middle of the tree and not have to start searches,
62  * insertions, OR deletions at the root node.   In particular, searches are
63  * able to progress in a definitive direction from any point in the tree
64  * without revisting nodes.  This greatly improves the efficiency of many
65  * operations, most especially record appends.
66  *
67  * BH-Trees also make the stacking of trees fairly straightforward.
68  */
69 #include "hammer.h"
70 #include <sys/buf.h>
71 #include <sys/buf2.h>
72
73 static int btree_search(hammer_btree_cursor_t cursor, hammer_base_elm_t key,
74                         int flags);
75 static int btree_cursor_set_cluster(hammer_btree_cursor_t cursor,
76                         struct hammer_cluster *cluster);
77 static int btree_cursor_set_cluster_by_value(hammer_btree_cursor_t cursor,
78                         int32_t vol_no, int32_t clu_no, hammer_tid_t clu_id);
79 static int btree_cursor_up(hammer_btree_cursor_t cursor);
80 static int btree_cursor_down(hammer_btree_cursor_t cursor);
81 static int btree_split_internal(hammer_btree_cursor_t cursor);
82 static int btree_split_leaf(hammer_btree_cursor_t cursor);
83 static int btree_rebalance_node(hammer_btree_cursor_t cursor);
84 static int btree_collapse(hammer_btree_cursor_t cursor);
85
86 /*
87  * Compare two BH-Tree elements, return -1, 0, or +1 (e.g. similar to strcmp).
88  */
89 static int
90 hammer_btree_cmp(hammer_base_elm_t key1, hammer_base_elm_t key2)
91 {
92 #if 0
93         kprintf("compare obj_id %016llx %016llx\n",
94                 key1->obj_id, key2->obj_id);
95         kprintf("compare rec_type %04x %04x\n",
96                 key1->rec_type, key2->rec_type);
97         kprintf("compare key %016llx %016llx\n",
98                 key1->key, key2->key);
99 #endif
100
101         if (key1->obj_id < key2->obj_id)
102                 return(-1);
103         if (key1->obj_id > key2->obj_id)
104                 return(1);
105
106         if (key1->rec_type < key2->rec_type)
107                 return(-1);
108         if (key1->rec_type > key2->rec_type)
109                 return(1);
110
111         if (key1->key < key2->key)
112                 return(-1);
113         if (key1->key > key2->key)
114                 return(1);
115
116         /*
117          * This test has a number of special cases.  create_tid in key1 is
118          * the as-of transction id, and delete_tid in key1 is NOT USED.
119          *
120          * A key1->create_tid of 0 indicates 'the most recent record if not
121          * deleted'.  key2->create_tid is a HAMMER record and will never be
122          * 0.   key2->delete_tid is the deletion transaction id or 0 if 
123          * the record has not yet been deleted.
124          */
125         if (key1->create_tid == 0) {
126                 if (key2->delete_tid)
127                         return(-1);
128         } else {
129                 if (key1->create_tid < key2->create_tid)
130                         return(-1);
131                 if (key2->delete_tid && key1->create_tid >= key2->delete_tid)
132                         return(1);
133         }
134
135         return(0);
136 }
137
138 /*
139  * Create a separator half way inbetween key1 and key2.  For fields just
140  * one unit apart, the separator will match key2.
141  */
142 #define MAKE_SEPARATOR(key1, key2, dest, field) \
143         dest->field = key1->field + ((key2->field - key1->field + 1) >> 1);
144
145 static void
146 hammer_make_separator(hammer_base_elm_t key1, hammer_base_elm_t key2,
147                       hammer_base_elm_t dest)
148 {
149         MAKE_SEPARATOR(key1, key2, dest, obj_id);
150         MAKE_SEPARATOR(key1, key2, dest, rec_type);
151         MAKE_SEPARATOR(key1, key2, dest, key);
152         MAKE_SEPARATOR(key1, key2, dest, create_tid);
153         dest->delete_tid = 0;
154         dest->obj_type = 0;
155         dest->reserved07 = 0;
156 }
157
158 #undef MAKE_SEPARATOR
159
160 /*
161  * Return whether a generic internal or leaf node is full
162  */
163 static int
164 btree_node_is_full(struct hammer_base_node *node)
165 {
166         switch(node->type) {
167         case HAMMER_BTREE_INTERNAL_NODE:
168                 if (node->count == HAMMER_BTREE_INT_ELMS)
169                         return(1);
170                 break;
171         case HAMMER_BTREE_LEAF_NODE:
172                 if (node->count == HAMMER_BTREE_LEAF_ELMS)
173                         return(1);
174                 break;
175         default:
176                 panic("illegal btree subtype");
177         }
178         return(0);
179 }
180
181 static int
182 btree_max_elements(u_int8_t type)
183 {
184         if (type == HAMMER_BTREE_LEAF_NODE)
185                 return(HAMMER_BTREE_LEAF_ELMS);
186         if (type == HAMMER_BTREE_INTERNAL_NODE)
187                 return(HAMMER_BTREE_INT_ELMS);
188         panic("btree_max_elements: bad type %d\n", type);
189 }
190
191 /*
192  * Initialize a cursor, setting the starting point for a BH-Tree search.
193  *
194  * The passed cluster must be locked.  This function will add a reference
195  * to it.
196  */
197 int
198 hammer_btree_cursor_init(hammer_btree_cursor_t cursor,
199                          struct hammer_cluster *cluster)
200 {
201         int error;
202
203         cursor->cluster = NULL;
204         cursor->node_buffer = NULL;
205         cursor->parent_buffer = NULL;
206         cursor->node = NULL;
207         cursor->parent = NULL;
208         cursor->index = 0;
209         cursor->parent_index = 0;
210         error = btree_cursor_set_cluster(cursor, cluster);
211         return(error);
212 }
213
214 /*
215  * Clean up a HAMMER BH-Tree cursor after we are finished using it.
216  */
217 void
218 hammer_btree_cursor_done(hammer_btree_cursor_t cursor)
219 {
220         if (cursor->node_buffer) {
221                 hammer_put_buffer(cursor->node_buffer);
222                 cursor->node_buffer = NULL;
223         }
224         if (cursor->parent_buffer) {
225                 hammer_put_buffer(cursor->parent_buffer);
226                 cursor->parent_buffer = NULL;
227         }
228         if (cursor->cluster) {
229                 hammer_put_cluster(cursor->cluster);
230                 cursor->cluster = NULL;
231         }
232         cursor->node = NULL;
233         cursor->parent = NULL;
234         cursor->left_bound = NULL;
235         cursor->right_bound = NULL;
236         cursor->index = 0;
237         cursor->parent_index = 0;
238 }
239
240 /*
241  * Initialize a btree info structure and its associated cursor prior to
242  * running a BH-Tree operation.
243  */
244 int
245 hammer_btree_info_init(hammer_btree_info_t info, struct hammer_cluster *cluster)
246 {
247         int error;
248
249         error = hammer_btree_cursor_init(&info->cursor, cluster);
250         info->data_buffer = NULL;
251         info->record_buffer = NULL;
252         info->data = NULL;
253         info->rec = NULL;
254         return (error);
255 }
256
257 /*
258  * Clean up a BH-Tree info structure after we are finished using it.
259  */
260 void
261 hammer_btree_info_done(hammer_btree_info_t info)
262 {
263         hammer_btree_cursor_done(&info->cursor);
264         if (info->data_buffer) {
265                 hammer_put_buffer(info->data_buffer);
266                 info->data_buffer = NULL;
267         }
268         if (info->record_buffer) {
269                 hammer_put_buffer(info->record_buffer);
270                 info->record_buffer = NULL;
271         }
272         info->data = NULL;
273         info->rec = NULL;
274 }
275
276 /*
277  * Search a cluster's BH-Tree.  Return the matching node.  The search
278  * normally traverses clusters but will terminate at a cluster entry
279  * in a leaf if HAMMER_BTREE_CLUSTER_TAG is specified.  If this flag
280  * is specified EIO is returned if the search would otherwise have to
281  * cursor up into a parent cluster.
282  *
283  * The cursor must be completely initialized on entry.  If the cursor is
284  * at the root of a cluster, the parent pointer & buffer may be NULL (in
285  * that case the bounds point to the bounds in the cluster header).  The
286  * node_buffer and node must always be valid.
287  *
288  * The search code may be forced to iterate up the tree if the conditions
289  * required for an insertion or deletion are not met.  This does not occur
290  * very often.
291  *
292  * INSERTIONS: The search will split full nodes and leaves on its way down
293  * and guarentee that the leaf it ends up on is not full.
294  *
295  * DELETIONS: The search will rebalance the tree on its way down.
296  */
297 static 
298 int
299 btree_search(hammer_btree_cursor_t cursor, hammer_base_elm_t key, int flags)
300 {
301         struct hammer_btree_leaf_node *leaf;
302         int error;
303         int i;
304         int r;
305
306         /*
307          * Move our cursor up the tree until we find a node whos range covers
308          * the key we are trying to locate.  This may move us between
309          * clusters.
310          *
311          * Since the root cluster always has a root BH-Tree node, info->node
312          * is always non-NULL if no error occured.  The parent field will be
313          * non-NULL unless we are at the root of a cluster.
314          */
315         while (hammer_btree_cmp(key, cursor->left_bound) < 0 ||
316                hammer_btree_cmp(key, cursor->right_bound) >= 0) {
317                 /*
318                  * Must stay in current cluster if flagged, code should never
319                  * use the flag if it wants us to traverse to the parent
320                  * cluster.
321                  */
322                 if (cursor->parent == NULL &&
323                     (flags & HAMMER_BTREE_CLUSTER_TAG)) {
324                         return(EIO);
325                 }
326                 error = btree_cursor_up(cursor);
327                 if (error)
328                         return(error);
329         }
330         KKASSERT(cursor->node != NULL);
331
332         /*
333          * If we are inserting we can't start at a full node if the parent
334          * is also full (because there is no way to split the node),
335          * continue running up the tree until we hit the root of the
336          * current cluster or until the requirement is satisfied.
337          */
338         while (flags & HAMMER_BTREE_INSERT) {
339                 if (btree_node_is_full(&cursor->node->base) == 0)
340                         break;
341                 if (cursor->parent == NULL)
342                         break;
343                 if (cursor->parent->base.count != HAMMER_BTREE_INT_ELMS)
344                         break;
345                 error = btree_cursor_up(cursor);
346                 if (error)
347                         return (error);
348         }
349
350         /*
351          * If we are deleting we can't start at a node with only one element
352          * unless it is root, because all of our code assumes that nodes
353          * will never be empty.
354          *
355          * This handles the case where the cursor is sitting at a leaf and
356          * either the leaf or parent contain an insufficient number of
357          * elements.
358          */
359         while (flags & HAMMER_BTREE_DELETE) {
360                 if (cursor->node->base.count > 1)
361                         break;
362                 if (cursor->parent == NULL)
363                         break;
364                 KKASSERT(cursor->node->base.count != 0);
365                 error = btree_cursor_up(cursor);
366                 if (error)
367                         return (error);
368         }
369
370 new_cluster:
371         /*
372          * Push down through internal nodes to locate the requested key.
373          */
374         while (cursor->node->base.type == HAMMER_BTREE_INTERNAL_NODE) {
375                 struct hammer_btree_internal_node *node;
376
377                 /*
378                  * If we are a the root node and deleting, try to collapse
379                  * all of the root's children into the root.  This is the
380                  * only point where tree depth is reduced.
381                  */
382                 if ((flags & HAMMER_BTREE_DELETE) && cursor->parent == NULL) {
383                         error = btree_collapse(cursor);
384                         if (error)
385                                 return (error);
386                 }
387
388                 /*
389                  * Scan the node (XXX binary search) to find the subtree
390                  * index to push down into.  We go one-past, then back-up.
391                  * The key should never be less then the left-hand boundary
392                  * so I should never wind up 0.
393                  */
394                 node = &cursor->node->internal;
395                 for (i = 0; i < node->base.count; ++i) {
396                         r = hammer_btree_cmp(key, &node->elms[i].base);
397                         if (r < 0)
398                                 break;
399                 }
400                 KKASSERT(i != 0);
401
402                 /*
403                  * The push-down index is now i - 1.
404                  */
405                 --i;
406                 cursor->index = i;
407
408                 /*
409                  * Handle insertion and deletion requirements.
410                  *
411                  * If inserting split full nodes.  The split code will
412                  * adjust cursor->node and cursor->index if the current
413                  * index winds up in the new node.
414                  */
415                 if (flags & HAMMER_BTREE_INSERT) {
416                         if (node->base.count == HAMMER_BTREE_INT_ELMS) {
417                                 error = btree_split_internal(cursor);
418                                 if (error)
419                                         return(error);
420                                 /*
421                                  * reload stale pointers
422                                  */
423                                 i = cursor->index;
424                                 node = &cursor->node->internal;
425                         }
426                 }
427
428                 /*
429                  * If deleting rebalance - do not allow the child to have
430                  * just one element or we will not be able to delete it.
431                  *
432                  * Neither internal or leaf nodes (except a root-leaf) are
433                  * allowed to drop to 0 elements.
434                  *
435                  * Our separators may have been reorganized after rebalancing,
436                  * so we have to pop back up and rescan.
437                  */
438                 if (flags & HAMMER_BTREE_DELETE) {
439                         if (node->elms[i].subtree_count <= 1) {
440                                 error = btree_rebalance_node(cursor);
441                                 if (error)
442                                         return(error);
443                                 /* cursor->index is invalid after call */
444                                 goto new_cluster;
445                         }
446                 }
447
448                 /*
449                  * Push down (push into new node, existing node becomes
450                  * the parent).
451                  */
452                 error = btree_cursor_down(cursor);
453                 if (error)
454                         return (error);
455         }
456
457
458         /*
459          * We are at a leaf, do a linear search of the key array.
460          * (XXX do a binary search).  On success the index is set to the
461          * matching element, on failure the index is set to the insertion
462          * point.
463          *
464          * Boundaries are not stored in leaf nodes, so the index can wind
465          * up to the left of element 0 (index == 0) or past the end of
466          * the array (index == leaf->base.count).
467          */
468         leaf = &cursor->node->leaf;
469         KKASSERT(leaf->base.count <= HAMMER_BTREE_LEAF_ELMS);
470
471         for (i = 0; i < leaf->base.count; ++i) {
472                 r = hammer_btree_cmp(key, &leaf->elms[i].base);
473                 if (r < 0)
474                         break;
475                 if (r == 0) {
476                         /*
477                          * Return an exact match unless this is a cluster
478                          * element.  If it is, and the cluster tag flag has
479                          * not been set, push into the new cluster and
480                          * continue the search.
481                          */
482                         cursor->index = i;
483                         if ((leaf->elms[i].base.obj_type &
484                              HAMMER_OBJTYPE_CLUSTER_FLAG) &&
485                             (flags & HAMMER_BTREE_CLUSTER_TAG) == 0) {
486                                 error = btree_cursor_down(cursor);
487                                 if (error)
488                                         return (error);
489                                 goto new_cluster;
490                         }
491                         return(0);
492                 }
493         }
494
495         /*
496          * We could not find an exact match.  Check for a cluster
497          * recursion.  The cluster's range is bracketed by two
498          * leaf elements.  One of the two must be in this node.
499          */
500         if ((flags & HAMMER_BTREE_CLUSTER_TAG) == 0) {
501                 if (i == leaf->base.count) {
502                         if (leaf->elms[i-1].base.obj_type &
503                             HAMMER_OBJTYPE_CLUSTER_FLAG) {
504                                 cursor->index = i - 1;
505                                 error = btree_cursor_down(cursor);
506                                 if (error)
507                                         return (error);
508                                 goto new_cluster;
509                         }
510                 } else {
511                         if (leaf->elms[i].base.obj_type &
512                             HAMMER_OBJTYPE_CLUSTER_FLAG) {
513                                 cursor->index = i;
514                                 error = btree_cursor_down(cursor);
515                                 if (error)
516                                         return (error);
517                                 goto new_cluster;
518                         }
519                 }
520         }
521
522         /*
523          * If inserting split a full leaf before returning.  This
524          * may have the side effect of adjusting cursor->node and
525          * cursor->index.
526          *
527          * We delayed the split in order to avoid any unnecessary splits.
528          *
529          * XXX parent's parent's subtree_count will be wrong after
530          * this (keep parent of parent around too?  ugh).
531          */
532         cursor->index = i;
533         if ((flags & HAMMER_BTREE_INSERT) &&
534             leaf->base.count == HAMMER_BTREE_LEAF_ELMS) {
535                 error = btree_split_leaf(cursor);
536                 /* NOT USED
537                 i = cursor->index;
538                 node = &cursor->node->internal;
539                 */
540                 if (error)
541                         return(error);
542         }
543         return(ENOENT);
544 }
545
546 /*
547  * Look up the key in the HAMMER BH-Tree and fill in the rest of the
548  * info structure with the results according to flags.  0 is returned on
549  * success, non-zero on error.
550  *
551  * The caller initializes (key, cluster) and makes the call.  The cluster
552  * must be referenced and locked on call.  The function can chain through
553  * multiple clusters and will replace the passed cluster as it does so,
554  * dereferencing and unlocking it, and referencing and locking the chain.
555  * On return info->cluster will still be referenced and locked but may
556  * represent a different cluster.
557  */
558 int
559 hammer_btree_lookup(hammer_btree_info_t info, hammer_base_elm_t key, int flags)
560 {
561         hammer_btree_leaf_elm_t elm;
562         struct hammer_cluster *cluster;
563         int32_t cloff;
564         int error;
565         int iscl;
566
567         error = btree_search(&info->cursor, key, flags);
568         if (error)
569                 return (error);
570
571         /* 
572          * Extract the record and data reference if requested.
573          *
574          * A cluster record type has no data reference, the information
575          * is stored directly in the record and BH-Tree element.
576          *
577          * The case where the data reference resolves to the same buffer
578          * as the record reference must be handled.
579          */
580         elm = &info->cursor.node->leaf.elms[info->cursor.index];
581         iscl = (elm->base.obj_type & HAMMER_OBJTYPE_CLUSTER_FLAG) != 0;
582         cluster = info->cursor.cluster;
583         if ((flags & HAMMER_BTREE_GET_RECORD) && error == 0) {
584                 cloff = iscl ? elm->cluster.rec_offset : elm->record.rec_offset;
585
586
587                 info->rec = hammer_bread(cluster, cloff, HAMMER_FSBUF_RECORDS,
588                                          &error, &info->record_buffer);
589         } else {
590                 cloff = 0;
591         }
592         if ((flags & HAMMER_BTREE_GET_DATA) && iscl == 0 && error == 0) {
593                 if ((cloff ^ elm->record.data_offset) & ~HAMMER_BUFMASK) {
594                         info->data = hammer_bread(cluster,
595                                                   elm->record.data_offset,
596                                                   HAMMER_FSBUF_DATA,
597                                                   &error, &info->record_buffer);
598                 } else {
599                         info->data = (void *)
600                                 ((char *)info->data_buffer->ondisk +
601                                  (elm->record.data_offset & HAMMER_BUFMASK));
602                 }
603         }
604         return(error);
605 }
606
607
608 /*
609  * Insert a record into a BH-Tree's cluster.  The caller has already
610  * reserved space for the record and data and must handle a ENOSPC
611  * return.
612  */
613 int
614 hammer_btree_insert(struct hammer_btree_info *info, hammer_btree_leaf_elm_t elm)
615 {
616         struct hammer_btree_cursor *cursor;
617         struct hammer_btree_internal_node *parent;
618         struct hammer_btree_leaf_node *leaf;
619         int error;
620         int i;
621
622         /*
623          * Issue a search to get our cursor at the right place.  The search
624          * will get us to a leaf node.
625          *
626          * The search also does some setup for our insert, so there is always
627          * room in the leaf.
628          */
629         error = btree_search(&info->cursor, &elm->base, HAMMER_BTREE_INSERT);
630         if (error != ENOENT) {
631                 if (error == 0)
632                         error = EEXIST;
633                 return (error);
634         }
635
636         /*
637          * Insert the element at the leaf node and update the count in the
638          * parent.  It is possible for parent to be NULL, indicating that
639          * the root of the BH-Tree in the cluster is a leaf.  It is also
640          * possible for the leaf to be empty.
641          *
642          * Remember that the right-hand boundary is not included in the
643          * count.
644          */
645         cursor = &info->cursor;
646         leaf = &cursor->node->leaf;
647         i = cursor->index;
648         KKASSERT(leaf->base.count < HAMMER_BTREE_LEAF_ELMS);
649         bcopy(&leaf->elms[i], &leaf->elms[i+1],
650               (leaf->base.count - i + 1) * sizeof(leaf->elms[0]));
651         leaf->elms[i] = *elm;
652         ++leaf->base.count;
653
654         if ((parent = cursor->parent) != NULL) {
655                 i = cursor->parent_index;
656                 ++parent->elms[i].subtree_count;
657                 KKASSERT(parent->elms[i].subtree_count <= leaf->base.count);
658                 hammer_modify_buffer(cursor->parent_buffer);
659         }
660         hammer_modify_buffer(cursor->node_buffer);
661         return(0);
662 }
663
664 /*
665  * Delete a record from the BH-Tree.
666  */
667 int
668 hammer_btree_delete(struct hammer_btree_info *info, hammer_base_elm_t key)
669 {
670         struct hammer_btree_cursor *cursor;
671         struct hammer_btree_internal_node *parent;
672         struct hammer_btree_leaf_node *leaf;
673         int error;
674         int i;
675
676         /*
677          * Locate the leaf element to delete.  The search is also responsible
678          * for doing some of the rebalancing work on its way down.
679          */
680         error = btree_search(&info->cursor, key, HAMMER_BTREE_DELETE);
681         if (error)
682                 return (error);
683
684         /*
685          * Delete the element from the leaf node.  We leave empty leafs alone
686          * and instead depend on a future search to locate and destroy them.
687          * Otherwise we would have to recurse back up the tree to adjust
688          * the parent's subtree_count and we do not want to do that.
689          *
690          * Remember that the right-hand boundary is not included in the
691          * count.
692          */
693         cursor = &info->cursor;
694         leaf = &cursor->node->leaf;
695         i = cursor->index;
696
697         KKASSERT(cursor->node->base.type == HAMMER_BTREE_LEAF_NODE);
698         bcopy(&leaf->elms[i+1], &leaf->elms[i],
699               (leaf->base.count - i) * sizeof(leaf->elms[0]));
700         --leaf->base.count;
701         if ((parent = cursor->parent) != NULL) {
702                 /*
703                  * Adjust parent's notion of the leaf's count.  subtree_count
704                  * is only approximately, it is allowed to be too small but
705                  * never allowed to be too large.  Make sure we don't drop
706                  * the count below 0.
707                  */
708                 i = cursor->parent_index;
709                 if (parent->elms[i].subtree_count)
710                         --parent->elms[i].subtree_count;
711                 KKASSERT(parent->elms[i].subtree_count <= leaf->base.count);
712                 hammer_modify_buffer(cursor->parent_buffer);
713         }
714         hammer_modify_buffer(cursor->node_buffer);
715         return(0);
716 }
717
718 /************************************************************************
719  *                              CURSOR SUPPORT                          *
720  ************************************************************************
721  *
722  * These routines do basic cursor operations.  This support will probably
723  * be expanded in the future to add link fields for linear scans.
724  */
725
726 /*
727  * Unconditionally set the cursor to the root of the specified cluster.
728  * The current cursor node is set to the root node of the cluster (which
729  * can be an internal node or a degenerate leaf), and the parent info
730  * is cleaned up and cleared.
731  *
732  * The passed cluster must be locked.  This function will add a reference
733  * to it.  The cursor must already have a cluster assigned to it, which we
734  * will replace.
735  */
736 static
737 int
738 btree_cursor_set_cluster_by_value(hammer_btree_cursor_t cursor,
739                         int32_t vol_no, int32_t clu_no, hammer_tid_t clu_id)
740 {
741         struct hammer_cluster *cluster;
742         struct hammer_volume *volume;
743         int error = 0;
744
745         if (vol_no < 0)
746                 return(EIO);
747         cluster = cursor->cluster;
748         KKASSERT(cluster != NULL);
749         if (vol_no == cluster->volume->vol_no) {
750                 cluster = hammer_get_cluster(cluster->volume, clu_no,
751                                              &error, 0);
752         } else {
753                 volume = hammer_get_volume(cluster->volume->hmp,
754                                            vol_no, &error);
755                 if (volume) {
756                         cluster = hammer_get_cluster(volume, clu_no,
757                                                      &error, 0);
758                         hammer_put_volume(volume);
759                 } else {
760                         cluster = NULL;
761                 }
762         }
763
764         /*
765          * Make sure the cluster id matches.  XXX At the moment the
766          * clu_id in the btree cluster element is only 32 bits, so only
767          * compare the low 32 bits.
768          */
769         if (cluster) {
770                 if ((int32_t)cluster->ondisk->clu_id == (int32_t)clu_id) {
771                         btree_cursor_set_cluster(cursor, cluster);
772                 } else {
773                         error = EIO;
774                 }
775                 hammer_put_cluster(cluster);
776         }
777         return (error);
778 }
779
780 static
781 int
782 btree_cursor_set_cluster(hammer_btree_cursor_t cursor,
783                          struct hammer_cluster *cluster)
784 {
785         int error = 0;
786
787         hammer_dup_cluster(&cursor->cluster, cluster);
788         cursor->node = hammer_bread(cluster,
789                                     cluster->ondisk->clu_btree_root,
790                                     HAMMER_FSBUF_BTREE,
791                                     &error,
792                                     &cursor->node_buffer);
793         cursor->index = 0;
794         if (cursor->parent) {
795                 hammer_put_buffer(cursor->parent_buffer);
796                 cursor->parent_buffer = NULL;
797                 cursor->parent = NULL;
798                 cursor->parent_index = 0;
799         }
800         cursor->left_bound = &cluster->ondisk->clu_btree_beg;
801         cursor->right_bound = &cluster->ondisk->clu_btree_end;
802         return (error);
803 }
804
805 /*
806  * Cursor the node up to the parent node.  If at the root of a cluster,
807  * cursor to the root of the cluster's parent cluster.  Note carefully
808  * that we do NOT scan the parent cluster to find the leaf that dropped
809  * into our current cluster.
810  *
811  * This function is primarily used by the search code to avoid having
812  * to search from the root of the filesystem BH-Tree.
813  */
814 static
815 int
816 btree_cursor_up(hammer_btree_cursor_t cursor)
817 {
818         struct hammer_cluster_ondisk *ondisk;
819         struct hammer_btree_internal_node *parent;
820         int error;
821         int i;
822         int r;
823
824         error = 0;
825         if (cursor->parent == NULL) {
826                 /*
827                  * We are at the root of the cluster, pop up to the root
828                  * of the parent cluster.  Return EIO if we are at the
829                  * root cluster of the filesystem.
830                  */
831                 ondisk = cursor->cluster->ondisk;
832                 error = btree_cursor_set_cluster_by_value(
833                             cursor,
834                             ondisk->clu_btree_parent_vol_no,
835                             ondisk->clu_btree_parent_clu_no,
836                             ondisk->clu_btree_parent_clu_id);
837         } else {
838                 /*
839                  * Copy the current node's parent info into the node and load
840                  * a new parent.
841                  */
842                 cursor->index = cursor->parent_index;
843                 cursor->node = (hammer_btree_node_t)cursor->parent;
844                 hammer_dup_buffer(&cursor->node_buffer, cursor->parent_buffer);
845
846                 /*
847                  * Load the parent's parent into parent and figure out the
848                  * parent's element index for its child.  Just NULL it out
849                  * if we hit the root of the cluster.
850                  */
851                 if (cursor->parent->base.parent) {
852                         parent = hammer_bread(cursor->cluster,
853                                               cursor->node->base.parent,
854                                               HAMMER_FSBUF_BTREE,
855                                               &error,
856                                               &cursor->parent_buffer);
857                         for (i = 0; i < parent->base.count; ++i) {
858                                 r = hammer_btree_cmp(
859                                         &cursor->node->internal.elms[0].base,
860                                         &parent->elms[i].base);
861                                 if (r < 0)
862                                         break;
863                         }
864                         cursor->parent = parent;
865                         cursor->parent_index = i - 1;
866                         KKASSERT(parent->elms[i].subtree_offset ==
867                                  hammer_bclu_offset(cursor->node_buffer,
868                                                     cursor->node));
869                 } else {
870                         hammer_put_buffer(cursor->parent_buffer);
871                         cursor->parent = NULL;
872                         cursor->parent_buffer = NULL;
873                         cursor->parent_index = 0;
874                 }
875         }
876         return(error);
877 }
878
879 /*
880  * Cursor down into (node, index)
881  *
882  * Push down into the current cursor.  The current cursor becomes the parent.
883  * If the current cursor represents a leaf cluster element this function will
884  * push into the root of a new cluster and clear the parent fields.
885  *
886  * Pushin down at a leaf which is not a cluster element returns EIO.
887  */
888 static
889 int
890 btree_cursor_down(hammer_btree_cursor_t cursor)
891 {
892         hammer_btree_node_t node;
893         int error;
894
895         node = cursor->node;
896         if (node->base.type == HAMMER_BTREE_LEAF_NODE) {
897                 /*
898                  * Push into another cluster
899                  */
900                 hammer_btree_leaf_elm_t elm;
901
902                 elm = &node->leaf.elms[cursor->index];
903                 if (elm->base.rec_type == HAMMER_RECTYPE_CLUSTER) {
904                         error = btree_cursor_set_cluster_by_value(
905                                     cursor,
906                                     elm->cluster.vol_no,
907                                     elm->cluster.clu_no,
908                                     elm->cluster.verifier);
909                 } else {
910                         error = EIO;
911                 }
912         } else {
913                 /*
914                  * Push into another BH-Tree node (internal or leaf)
915                  */
916                 struct hammer_btree_internal_elm *elm;
917
918                 KKASSERT(node->base.type == HAMMER_BTREE_INTERNAL_NODE);
919                 elm = &node->internal.elms[cursor->index];
920                 KKASSERT(elm->subtree_offset != 0);
921
922                 cursor->parent_index = cursor->index;
923                 cursor->parent = &cursor->node->internal;
924                 hammer_dup_buffer(&cursor->parent_buffer, cursor->node_buffer);
925
926                 cursor->node = hammer_bread(cursor->cluster,
927                                             elm->subtree_offset,
928                                             HAMMER_FSBUF_BTREE,
929                                             &error,
930                                             &cursor->node_buffer);
931                 cursor->index = 0;
932                 KKASSERT(cursor->node == NULL ||
933                          cursor->node->base.parent == elm->subtree_offset);
934         }
935         return(error);
936 }
937
938 /************************************************************************
939  *                              SPLITTING AND MERGING                   *
940  ************************************************************************
941  *
942  * These routines do all the dirty work required to split and merge nodes.
943  */
944
945 /*
946  * Split an internal into two nodes and move the separator at the split
947  * point to the parent.  Note that the parent's parent's element pointing
948  * to our parent will have an incorrect subtree_count (we don't update it).
949  * It will be low, which is ok.
950  *
951  * Cursor->index indicates the element the caller intends to push into.
952  * We will adjust cursor->node and cursor->index if that element winds
953  * up in the split node.
954  */
955 static
956 int
957 btree_split_internal(hammer_btree_cursor_t cursor)
958 {
959         struct hammer_btree_internal_node *parent;
960         struct hammer_btree_internal_node *node;
961         struct hammer_btree_internal_node *new_node;
962         struct hammer_btree_internal_elm *elm;
963         struct hammer_btree_internal_elm *parent_elm;
964         struct hammer_buffer *new_buffer;
965         int32_t parent_offset;
966         int parent_index;
967         int made_root;
968         int split;
969         int error;
970         const size_t esize = sizeof(struct hammer_btree_internal_elm);
971
972         /* 
973          * We are splitting but elms[split] will be promoted to the parent,
974          * leaving the right hand node with one less element.  If the
975          * insertion point will be on the left-hand side adjust the split
976          * point to give the right hand side one additional node.
977          */
978         node = &cursor->node->internal;
979         split = (node->base.count + 1) / 2;
980         if (cursor->index <= split)
981                 --split;
982         error = 0;
983
984         /*
985          * If we are at the root of the tree, create a new root node with
986          * 1 element and split normally.  Avoid making major modifications
987          * until we know the whole operation will work.
988          */
989         parent = cursor->parent;
990         if (parent == NULL) {
991                 parent = hammer_alloc_btree(cursor->cluster, &error,
992                                             &cursor->parent_buffer);
993                 if (parent == NULL)
994                         return(error);
995                 parent->base.count = 1;
996                 parent->base.parent = 0;
997                 parent->base.type = HAMMER_BTREE_INTERNAL_NODE;
998                 parent->base.subtype = node->base.type;
999                 parent->elms[0].base = cursor->cluster->ondisk->clu_btree_beg;
1000                 parent->elms[0].subtree_offset =
1001                         hammer_bclu_offset(cursor->node_buffer, node);
1002                 parent->elms[1].base = cursor->cluster->ondisk->clu_btree_end;
1003                 made_root = 1;
1004                 cursor->parent_index = 0;       /* insertion point in parent */
1005         } else {
1006                 made_root = 0;
1007         }
1008         parent_offset = hammer_bclu_offset(cursor->parent_buffer, parent);
1009
1010         /*
1011          * Split node into new_node at the split point.
1012          *
1013          *  B O O O P N N B     <-- P = node->elms[split]
1014          *   0 1 2 3 4 5 6      <-- subtree indices
1015          *
1016          *       x x P x x
1017          *        s S S s  
1018          *         /   \
1019          *  B O O O B    B N N B        <--- inner boundary points are 'P'
1020          *   0 1 2 3      4 5 6  
1021          *
1022          */
1023         new_buffer = NULL;
1024         new_node = hammer_alloc_btree(cursor->cluster, &error, &new_buffer);
1025         if (new_node == NULL) {
1026                 if (made_root)
1027                         hammer_free_btree_ptr(cursor->parent_buffer,
1028                                               (hammer_btree_node_t)parent);
1029                 return(error);
1030         }
1031
1032         /*
1033          * Create the new node.  P become the left-hand boundary in the
1034          * new node.  Copy the right-hand boundary as well.
1035          *
1036          * elm is the new separator.
1037          */
1038         elm = &node->elms[split];
1039         bcopy(elm, &new_node->elms[0], (node->base.count - split + 1) * esize);
1040         new_node->base.count = node->base.count - split;
1041         new_node->base.parent = parent_offset;
1042         new_node->base.type = HAMMER_BTREE_INTERNAL_NODE;
1043         new_node->base.subtype = node->base.subtype;
1044         KKASSERT(node->base.type == new_node->base.type);
1045
1046         /*
1047          * Cleanup the original node.  P becomes the new boundary, its
1048          * subtree_offset was moved to the new node.  If we had created
1049          * a new root its parent pointer may have changed.
1050          */
1051         node->base.parent = parent_offset;
1052         elm->subtree_offset = 0;
1053
1054         /*
1055          * Insert the separator into the parent, fixup the parent's
1056          * reference to the original node, and reference the new node.
1057          * The separator is P.
1058          *
1059          * Remember that base.count does not include the right-hand boundary.
1060          */
1061         parent_index = cursor->parent_index;
1062         parent->elms[parent_index].subtree_count = split;
1063         parent_elm = &parent->elms[parent_index+1];
1064         bcopy(parent_elm, parent_elm + 1,
1065               (parent->base.count - parent_index) * esize);
1066         parent_elm->base = elm->base;   /* separator P */
1067         parent_elm->subtree_offset = hammer_bclu_offset(new_buffer, new_node);
1068         parent_elm->subtree_count = new_node->base.count;
1069
1070         hammer_modify_buffer(new_buffer);
1071         hammer_modify_buffer(cursor->parent_buffer);
1072         hammer_modify_buffer(cursor->node_buffer);
1073
1074         /*
1075          * The cluster's root pointer may have to be updated.
1076          */
1077         if (made_root) {
1078                 cursor->cluster->ondisk->clu_btree_root = node->base.parent;
1079                 hammer_modify_cluster(cursor->cluster);
1080         }
1081
1082         /*
1083          * Ok, now adjust the cursor depending on which element the original
1084          * index was pointing at.  If we are >= the split point the push node
1085          * is now in the new node.
1086          *
1087          * NOTE: If we are at the split point itself we cannot stay with the
1088          * original node because the push index will point at the right-hand
1089          * boundary, which is illegal.
1090          */
1091         if (cursor->index >= split) {
1092                 cursor->index -= split;
1093                 cursor->node = (hammer_btree_node_t)new_node;
1094                 hammer_put_buffer(cursor->node_buffer);
1095                 cursor->node_buffer = new_buffer;
1096         }
1097
1098         return (0);
1099 }
1100
1101 /*
1102  * Same as the above, but splits a full leaf node.
1103  */
1104 static
1105 int
1106 btree_split_leaf(hammer_btree_cursor_t cursor)
1107 {
1108         struct hammer_btree_internal_node *parent;
1109         struct hammer_btree_leaf_node *leaf;
1110         struct hammer_btree_leaf_node *new_leaf;
1111         union hammer_btree_leaf_elm *elm;
1112         struct hammer_btree_internal_elm *parent_elm;
1113         struct hammer_buffer *new_buffer;
1114         int32_t parent_offset;
1115         int parent_index;
1116         int made_root;
1117         int split;
1118         int error;
1119         const size_t esize = sizeof(struct hammer_btree_internal_elm);
1120
1121         /* 
1122          * We are splitting but elms[split] will be promoted to the parent,
1123          * leaving the right hand node with one less element.  If the
1124          * insertion point will be on the left-hand side adjust the split
1125          * point to give the right hand side one additional node.
1126          */
1127         leaf = &cursor->node->leaf;
1128         split = (leaf->base.count + 1) / 2;
1129         if (cursor->index <= split)
1130                 --split;
1131         error = 0;
1132
1133         /*
1134          * If we are at the root of the tree, create a new root node with
1135          * 1 element and split normally.  Avoid making major modifications
1136          * until we know the whole operation will work.
1137          */
1138         parent = cursor->parent;
1139         if (parent == NULL) {
1140                 parent = hammer_alloc_btree(cursor->cluster, &error,
1141                                             &cursor->parent_buffer);
1142                 if (parent == NULL)
1143                         return(error);
1144                 parent->base.count = 1;
1145                 parent->base.parent = 0;
1146                 parent->base.type = HAMMER_BTREE_INTERNAL_NODE;
1147                 parent->base.subtype = leaf->base.type;
1148                 parent->elms[0].base = cursor->cluster->ondisk->clu_btree_beg;
1149                 parent->elms[0].subtree_offset =
1150                         hammer_bclu_offset(cursor->node_buffer, leaf);
1151                 parent->elms[1].base = cursor->cluster->ondisk->clu_btree_end;
1152                 made_root = 1;
1153                 cursor->parent_index = 0;       /* insertion point in parent */
1154         } else {
1155                 made_root = 0;
1156         }
1157         parent_offset = hammer_bclu_offset(cursor->parent_buffer, parent);
1158
1159         /*
1160          * Split leaf into new_leaf at the split point.  Select a separator
1161          * value in-between the two leafs but with a bent towards the right
1162          * leaf since comparisons use an 'elm >= separator' inequality.
1163          *
1164          *  L L L L L L L L
1165          *
1166          *       x x P x x
1167          *        s S S s  
1168          *         /   \
1169          *  L L L L     L L L L
1170          */
1171         new_buffer = NULL;
1172         new_leaf = hammer_alloc_btree(cursor->cluster, &error, &new_buffer);
1173         if (new_leaf == NULL) {
1174                 if (made_root)
1175                         hammer_free_btree_ptr(cursor->parent_buffer,
1176                                               (hammer_btree_node_t)parent);
1177                 return(error);
1178         }
1179
1180         /*
1181          * Create the new node.  P become the left-hand boundary in the
1182          * new node.  Copy the right-hand boundary as well.
1183          */
1184         elm = &leaf->elms[split];
1185         bcopy(elm, &new_leaf->elms[0], (leaf->base.count - split) * esize);
1186         new_leaf->base.count = leaf->base.count - split;
1187         new_leaf->base.parent = parent_offset;
1188         new_leaf->base.type = HAMMER_BTREE_LEAF_NODE;
1189         new_leaf->base.subtype = 0;
1190         KKASSERT(leaf->base.type == new_leaf->base.type);
1191
1192         /*
1193          * Cleanup the original node.  P becomes the new boundary, its
1194          * subtree_offset was moved to the new node.  If we had created
1195          * a new root its parent pointer may have changed.
1196          */
1197         leaf->base.parent = parent_offset;
1198
1199         /*
1200          * Insert the separator into the parent, fixup the parent's
1201          * reference to the original node, and reference the new node.
1202          * The separator is P.
1203          *
1204          * Remember that base.count does not include the right-hand boundary.
1205          * We are copying parent_index+1 to parent_index+2, not +0 to +1.
1206          */
1207         parent_index = cursor->parent_index;
1208         parent->elms[parent_index].subtree_count = split;
1209         parent_elm = &parent->elms[parent_index+1];
1210         if (parent_index + 1 != parent->base.count) {
1211                 bcopy(parent_elm, parent_elm + 1,
1212                       (parent->base.count - parent_index - 1) * esize);
1213         }
1214         hammer_make_separator(&elm[-1].base, &elm[0].base, &parent_elm->base);
1215         parent_elm->subtree_offset = hammer_bclu_offset(new_buffer, new_leaf);
1216         parent_elm->subtree_count = new_leaf->base.count;
1217
1218         hammer_modify_buffer(new_buffer);
1219         hammer_modify_buffer(cursor->parent_buffer);
1220         hammer_modify_buffer(cursor->node_buffer);
1221
1222         /*
1223          * The cluster's root pointer may have to be updated.
1224          */
1225         if (made_root) {
1226                 cursor->cluster->ondisk->clu_btree_root = leaf->base.parent;
1227                 hammer_modify_cluster(cursor->cluster);
1228         }
1229
1230         /*
1231          * Ok, now adjust the cursor depending on which element the original
1232          * index was pointing at.  If we are >= the split point the push node
1233          * is now in the new node.
1234          *
1235          * NOTE: If we are at the split point itself we cannot stay with the
1236          * original node because the push index will point at the right-hand
1237          * boundary, which is illegal.
1238          */
1239         if (cursor->index >= split) {
1240                 cursor->index -= split;
1241                 cursor->node = (hammer_btree_node_t)new_leaf;
1242                 hammer_put_buffer(cursor->node_buffer);
1243                 cursor->node_buffer = new_buffer;
1244         }
1245
1246         return (0);
1247 }
1248
1249 /*
1250  * This routine is called on an internal node prior to recursing down
1251  * through (node, index) when (node, index) MIGHT have too few elements for
1252  * the caller to perform a deletion.
1253  *
1254  * cursor->index is invalid on return because the separators may have gotten
1255  * adjusted, the caller must rescan the node's elements.  The caller may set
1256  * cursor->index to -1 if it wants us to do a general rebalancing.
1257  * 
1258  * NOTE: Because we do not update the parent's parent in the split code,
1259  * the subtree_count used by the caller may be incorrect.  We correct it
1260  * here.  Also note that we cannot change the depth of the tree's leaf
1261  * nodes here (see btree_collapse()).
1262  *
1263  * This routine rebalances the children of the node, collapsing children
1264  * together if possible.  On return each child will have at least L/2-1
1265  * elements unless the node only has one child.
1266  */
1267 static
1268 int
1269 btree_rebalance_node(hammer_btree_cursor_t cursor)
1270 {
1271         struct hammer_btree_internal_node *node;
1272         hammer_btree_node_t children[HAMMER_BTREE_INT_ELMS];
1273         struct hammer_buffer *child_buffer[HAMMER_BTREE_INT_ELMS];
1274         hammer_btree_node_t child;
1275         hammer_btree_elm_inmemory_t elms;
1276         int i, j, n, nelms, goal;
1277         int maxelms, halfelms;
1278         int error;
1279
1280         /*
1281          * Basic setup
1282          */
1283         node = &cursor->node->internal;
1284         KKASSERT(node->elms[cursor->index].subtree_offset != 0);
1285         error = 0;
1286
1287         /*
1288          * Load the children of node and do any necessary corrections
1289          * to subtree_count.  subtree_count may be incorrect due to the
1290          * way insertions split nodes.  Get a count of the total number
1291          * of elements held by our children.
1292          */
1293         error = 0;
1294
1295         for (i = n = 0; i < node->base.count; ++i) {
1296                 struct hammer_btree_internal_elm *elm;
1297
1298                 elm = &node->elms[i];
1299                 children[i] = NULL;
1300                 child_buffer[i] = NULL; /* must be preinitialized for bread */
1301                 if (elm->subtree_offset == 0)
1302                         continue;
1303                 child = hammer_bread(cursor->cluster, elm->subtree_offset,
1304                                      HAMMER_FSBUF_BTREE, &error,
1305                                      &child_buffer[i]);
1306                 children[i] = child;
1307                 if (child == NULL)
1308                         continue;
1309                 KKASSERT(node->base.subtype == child->base.type);
1310
1311                 /*
1312                  * Accumulate n for a good child, update the node's count
1313                  * if it was wrong.
1314                  */
1315                 if (node->elms[i].subtree_count != child->base.count) {
1316                         node->elms[i].subtree_count = child->base.count;
1317                 }
1318                 n += node->elms[i].subtree_count;
1319         }
1320         if (error)
1321                 goto failed;
1322
1323         /*
1324          * Collect all the children's elements together
1325          */
1326         nelms = n;
1327         elms = kmalloc(sizeof(*elms) * (nelms + 1), M_HAMMER, M_WAITOK|M_ZERO);
1328         for (i = n = 0; i < node->base.count; ++i) {
1329                 child = children[i];
1330                 for (j = 0; j < child->base.count; ++j) {
1331                         elms[n].owner = child;
1332                         if (node->base.subtype == HAMMER_BTREE_LEAF_NODE)
1333                                 elms[n].u.leaf = child->leaf.elms[j];
1334                         else
1335                                 elms[n].u.internal = child->internal.elms[j];
1336                         ++n;
1337                 }
1338         }
1339         KKASSERT(n == nelms);
1340
1341         /*
1342          * Store a boundary in the elms array to ease the code below.  This
1343          * is only used if the children are internal nodes.
1344          */
1345         elms[n].u.internal = node->elms[i];
1346
1347         /*
1348          * Calculate the number of elements each child should have (goal) by
1349          * reducing the number of elements until we achieve at least
1350          * halfelms - 1 per child, unless we are a degenerate case.
1351          */
1352         maxelms = btree_max_elements(node->base.subtype);
1353         halfelms = maxelms / 2;
1354
1355         goal = halfelms - 1;
1356         while (i && n / i < goal)
1357                 --i;
1358
1359         /*
1360          * Now rebalance using the specified goal
1361          */
1362         for (i = n = 0; i < node->base.count; ++i) {
1363                 struct hammer_buffer *subchild_buffer = NULL;
1364                 struct hammer_btree_internal_node *subchild;
1365
1366                 child = children[i];
1367                 for (j = 0; j < goal && n < nelms; ++j) {
1368                         if (node->base.subtype == HAMMER_BTREE_LEAF_NODE) {
1369                                 child->leaf.elms[j] = elms[n].u.leaf;
1370                         } else {
1371                                 child->internal.elms[j] = elms[n].u.internal;
1372                         }
1373
1374                         /*
1375                          * If the element's parent has changed we have to
1376                          * update the parent pointer.  This is somewhat
1377                          * expensive.
1378                          */
1379                         if (elms[n].owner != child &&
1380                             node->base.subtype == HAMMER_BTREE_INTERNAL_NODE) {
1381                                 subchild = hammer_bread(cursor->cluster,
1382                                                         elms[n].u.internal.subtree_offset,
1383                                                         HAMMER_FSBUF_BTREE,
1384                                                         &error,
1385                                                         &subchild_buffer);
1386                                 if (subchild) {
1387                                         subchild->base.parent =
1388                                             hammer_bclu_offset(child_buffer[i],
1389                                                                 child);
1390                                         hammer_modify_buffer(subchild_buffer);
1391                                 }
1392                                 /* XXX error */
1393                         }
1394                         ++n;
1395                 }
1396                 /* 
1397                  * Set right boundary if the children are internal nodes.
1398                  */
1399                 if (node->base.subtype == HAMMER_BTREE_INTERNAL_NODE)
1400                         child->internal.elms[j] = elms[n].u.internal;
1401                 child->base.count = j;
1402                 hammer_modify_buffer(child_buffer[i]);
1403                 if (subchild_buffer)
1404                         hammer_put_buffer(subchild_buffer);
1405
1406                 /*
1407                  * If we have run out of elements, break out
1408                  */
1409                 if (n == nelms)
1410                         break;
1411         }
1412
1413         /*
1414          * Physically destroy any left-over children.  These children's
1415          * elements have been packed into prior children.  The node's
1416          * right hand boundary and count gets shifted to index i.
1417          *
1418          * The subtree count in the node's parent MUST be updated because
1419          * we are removing elements.  The subtree_count field is allowed to
1420          * be too small, but not too large!
1421          */
1422         if (i != node->base.count) {
1423                 n = i;
1424                 node->elms[n] = node->elms[node->base.count];
1425                 while (i < node->base.count) {
1426                         hammer_free_btree_ptr(child_buffer[i], children[i]);
1427                         hammer_put_buffer(child_buffer[i]);
1428                         ++i;
1429                 }
1430                 node->base.count = n;
1431                 if (cursor->parent) {
1432                         cursor->parent->elms[cursor->parent_index].subtree_count = n;
1433                         hammer_modify_buffer(cursor->parent_buffer);
1434                 }
1435         }
1436
1437         kfree(elms, M_HAMMER);
1438 failed:
1439         hammer_modify_buffer(cursor->node_buffer);
1440         for (i = 0; i < node->base.count; ++i) {
1441                 if (child_buffer[i])
1442                         hammer_put_buffer(child_buffer[i]);
1443         }
1444         return (error);
1445 }
1446
1447 /*
1448  * This routine is only called if the cursor is at the root node and the
1449  * root node is an internal node.  We attempt to collapse the root node
1450  * by replacing it with all of its children, reducing tree depth by one.
1451  *
1452  * This is the only way to reduce tree depth in a HAMMER filesystem.
1453  * Note that all leaf nodes are at the same depth.
1454  *
1455  * This is a fairly expensive operation because we not only have to load
1456  * the root's children, we also have to scan each child and adjust the
1457  * parent offset for each element in each child.  Nasty all around.
1458  */
1459 static
1460 int
1461 btree_collapse(hammer_btree_cursor_t cursor)
1462 {
1463         hammer_btree_node_t root, child;
1464         hammer_btree_node_t children[HAMMER_BTREE_INT_ELMS];
1465         struct hammer_buffer *child_buffer[HAMMER_BTREE_INT_ELMS];
1466         int count;
1467         int i, j, n;
1468         int root_modified;
1469         int error;
1470         int32_t root_offset;
1471         u_int8_t subsubtype;
1472
1473         root = cursor->node;
1474         count = root->base.count;
1475         root_offset = hammer_bclu_offset(cursor->node_buffer, root);
1476
1477         /*
1478          * Sum up the number of children each element has.  This value is
1479          * only approximate due to the way the insertion node works.  It
1480          * may be too small but it will never be too large.
1481          *
1482          * Quickly terminate the collapse if the elements have too many
1483          * children.
1484          */
1485         KKASSERT(root->base.parent == 0);       /* must be root node */
1486         KKASSERT(root->base.type == HAMMER_BTREE_INTERNAL_NODE);
1487         KKASSERT(count <= HAMMER_BTREE_INT_ELMS);
1488
1489         for (i = n = 0; i < count; ++i) {
1490                 n += root->internal.elms[i].subtree_count;
1491         }
1492         if (n > btree_max_elements(root->base.subtype))
1493                 return(0);
1494
1495         /*
1496          * Iterate through the elements again and correct the subtree_count.
1497          * Terminate the collapse if we wind up with too many.
1498          */
1499         error = 0;
1500         root_modified = 0;
1501
1502         for (i = n = 0; i < count; ++i) {
1503                 struct hammer_btree_internal_elm *elm;
1504
1505                 elm = &root->internal.elms[i];
1506                 child_buffer[i] = NULL;
1507                 children[i] = NULL;
1508                 if (elm->subtree_offset == 0)
1509                         continue;
1510                 child = hammer_bread(cursor->cluster, elm->subtree_offset,
1511                                      HAMMER_FSBUF_BTREE, &error,
1512                                      &child_buffer[i]);
1513                 children[i] = child;
1514                 if (child == NULL)
1515                         continue;
1516                 KKASSERT(root->base.subtype == child->base.type);
1517
1518                 /*
1519                  * Accumulate n for a good child, update the root's count
1520                  * if it was wrong.
1521                  */
1522                 if (root->internal.elms[i].subtree_count != child->base.count) {
1523                         root->internal.elms[i].subtree_count = child->base.count;
1524                         root_modified = 1;
1525                 }
1526                 n += root->internal.elms[i].subtree_count;
1527         }
1528         if (error || n > btree_max_elements(root->base.subtype))
1529                 goto done;
1530
1531         /*
1532          * Ok, we can collapse the root.  If the root's children are leafs
1533          * the collapse is really simple.  If they are internal nodes the
1534          * collapse is not so simple because we have to fixup the parent
1535          * pointers for the root's children's children.
1536          *
1537          * When collapsing an internal node the far left and far right
1538          * element's boundaries should match the root's left and right
1539          * boundaries.
1540          */
1541         if (root->base.subtype == HAMMER_BTREE_LEAF_NODE) {
1542                 for (i = n = 0; i < count; ++i) {
1543                         child = children[i];
1544                         for (j = 0; j < child->base.count; ++j) {
1545                                 root->leaf.elms[n] = child->leaf.elms[j];
1546                                 ++n;
1547                         }
1548                 }
1549                 root->base.type = root->base.subtype;
1550                 root->base.subtype = 0;
1551                 root->base.count = n;
1552                 root->leaf.link_left = 0;
1553                 root->leaf.link_right = 0;
1554         } else {
1555                 struct hammer_btree_internal_elm *elm;
1556                 struct hammer_btree_internal_node *subchild;
1557                 struct hammer_buffer *subchild_buffer = NULL;
1558
1559                 if (count) {
1560                         child = children[0];
1561                         subsubtype = child->base.subtype;
1562                         KKASSERT(child->base.count > 0);
1563                         KKASSERT(root->internal.elms[0].base.key ==
1564                                  child->internal.elms[0].base.key);
1565                         child = children[count-1];
1566                         KKASSERT(child->base.count > 0);
1567                         KKASSERT(root->internal.elms[count].base.key ==
1568                              child->internal.elms[child->base.count].base.key);
1569                 } else {
1570                         subsubtype = 0;
1571                 }
1572                 for (i = n = 0; i < count; ++i) {
1573                         child = children[i];
1574                         KKASSERT(child->base.subtype == subsubtype);
1575                         for (j = 0; j < child->base.count; ++j) {
1576                                 elm = &child->internal.elms[j];
1577
1578                                 root->internal.elms[n] = *elm;
1579                                 subchild = hammer_bread(cursor->cluster,
1580                                                         elm->subtree_offset,
1581                                                         HAMMER_FSBUF_BTREE,
1582                                                         &error,
1583                                                         &subchild_buffer);
1584                                 if (subchild) {
1585                                         subchild->base.parent = root_offset;
1586                                         hammer_modify_buffer(subchild_buffer);
1587                                 }
1588                                 ++n;
1589                         }
1590                         /* make sure the right boundary is correct */
1591                         /* (this gets overwritten when the loop continues) */
1592                         /* XXX generate a new separator? */
1593                         root->internal.elms[n] = child->internal.elms[j];
1594                 }
1595                 root->base.type = HAMMER_BTREE_INTERNAL_NODE;
1596                 root->base.subtype = subsubtype;
1597                 if (subchild_buffer)
1598                         hammer_put_buffer(subchild_buffer);
1599         }
1600         root_modified = 1;
1601
1602         /*
1603          * Cleanup
1604          */
1605 done:
1606         if (root_modified)
1607                 hammer_modify_buffer(cursor->node_buffer);
1608         for (i = 0; i < count; ++i) {
1609                 if (child_buffer[i])
1610                         hammer_put_buffer(child_buffer[i]);
1611         }
1612         return(error);
1613 }
1614