Merge from vendor branch FILE:
[dragonfly.git] / sys / vfs / hammer / hammer_btree.c
1 /*
2  * Copyright (c) 2007 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sys/vfs/hammer/hammer_btree.c,v 1.21 2008/01/18 07:02:41 dillon Exp $
35  */
36
37 /*
38  * HAMMER B-Tree index
39  *
40  * HAMMER implements a modified B+Tree.  In documentation this will
41  * simply be refered to as the HAMMER B-Tree.  Basically a HAMMER B-Tree
42  * looks like a B+Tree (A B-Tree which stores its records only at the leafs
43  * of the tree), but adds two additional boundary elements which describe
44  * the left-most and right-most element a node is able to represent.  In
45  * otherwords, we have boundary elements at the two ends of a B-Tree node
46  * instead of sub-tree pointers.
47  *
48  * A B-Tree internal node looks like this:
49  *
50  *      B N N N N N N B   <-- boundary and internal elements
51  *       S S S S S S S    <-- subtree pointers
52  *
53  * A B-Tree leaf node basically looks like this:
54  *
55  *      L L L L L L L L   <-- leaf elemenets
56  *
57  * The radix for an internal node is 1 less then a leaf but we get a
58  * number of significant benefits for our troubles.
59  *
60  * The big benefit to using a B-Tree containing boundary information
61  * is that it is possible to cache pointers into the middle of the tree
62  * and not have to start searches, insertions, OR deletions at the root
63  * node.   In particular, searches are able to progress in a definitive
64  * direction from any point in the tree without revisting nodes.  This
65  * greatly improves the efficiency of many operations, most especially
66  * record appends.
67  *
68  * B-Trees also make the stacking of trees fairly straightforward.
69  *
70  * SPIKES: Two leaf elements denoting an inclusive sub-range of keys
71  * may represent a spike, or a recursion into another cluster.  Most
72  * standard B-Tree searches traverse spikes.
73  *
74  * INSERTIONS:  A search performed with the intention of doing
75  * an insert will guarantee that the terminal leaf node is not full by
76  * splitting full nodes.  Splits occur top-down during the dive down the
77  * B-Tree.
78  *
79  * DELETIONS: A deletion makes no attempt to proactively balance the
80  * tree and will recursively remove nodes that become empty.  Empty
81  * nodes are not allowed and a deletion may recurse upwards from the leaf.
82  * Rather then allow a deadlock a deletion may terminate early by setting
83  * an internal node's element's subtree_offset to 0.  The deletion will
84  * then be resumed the next time a search encounters the element.
85  */
86 #include "hammer.h"
87 #include <sys/buf.h>
88 #include <sys/buf2.h>
89
90 static int btree_search(hammer_cursor_t cursor, int flags);
91 static int btree_split_internal(hammer_cursor_t cursor);
92 static int btree_split_leaf(hammer_cursor_t cursor);
93 static int btree_remove(hammer_cursor_t cursor, int depth);
94 static int btree_remove_deleted_element(hammer_cursor_t cursor);
95 static int btree_set_parent(hammer_node_t node, hammer_btree_elm_t elm);
96 #if 0
97 static int btree_rebalance(hammer_cursor_t cursor);
98 static int btree_collapse(hammer_cursor_t cursor);
99 static int btree_node_is_almost_full(hammer_node_ondisk_t node);
100 #endif
101 static int btree_node_is_full(hammer_node_ondisk_t node);
102 static void hammer_make_separator(hammer_base_elm_t key1,
103                         hammer_base_elm_t key2, hammer_base_elm_t dest);
104
105 /*
106  * Iterate records after a search.  The cursor is iterated forwards past
107  * the current record until a record matching the key-range requirements
108  * is found.  ENOENT is returned if the iteration goes past the ending
109  * key. 
110  *
111  * The iteration is inclusive of key_beg and can be inclusive or exclusive
112  * of key_end depending on whether HAMMER_CURSOR_END_INCLUSIVE is set.
113  *
114  * When doing an as-of search (cursor->asof != 0), key_beg.delete_tid
115  * may be modified by B-Tree functions.
116  *
117  * cursor->key_beg may or may not be modified by this function during
118  * the iteration.  XXX future - in case of an inverted lock we may have
119  * to reinitiate the lookup and set key_beg to properly pick up where we
120  * left off.
121  *
122  * NOTE!  EDEADLK *CANNOT* be returned by this procedure.
123  */
124 int
125 hammer_btree_iterate(hammer_cursor_t cursor)
126 {
127         hammer_node_ondisk_t node;
128         hammer_btree_elm_t elm;
129         int error;
130         int r;
131         int s;
132
133         /*
134          * Skip past the current record
135          */
136         node = cursor->node->ondisk;
137         if (node == NULL)
138                 return(ENOENT);
139         if (cursor->index < node->count && 
140             (cursor->flags & HAMMER_CURSOR_ATEDISK)) {
141                 ++cursor->index;
142         }
143
144         /*
145          * Loop until an element is found or we are done.
146          */
147         for (;;) {
148                 /*
149                  * We iterate up the tree and then index over one element
150                  * while we are at the last element in the current node.
151                  *
152                  * NOTE: This can pop us up to another cluster.
153                  *
154                  * If we are at the root of the root cluster, cursor_up
155                  * returns ENOENT.
156                  *
157                  * NOTE: hammer_cursor_up() will adjust cursor->key_beg
158                  * when told to re-search for the cluster tag.
159                  *
160                  * XXX this could be optimized by storing the information in
161                  * the parent reference.
162                  *
163                  * XXX we can lose the node lock temporarily, this could mess
164                  * up our scan.
165                  */
166                 if (cursor->index == node->count) {
167                         error = hammer_cursor_up(cursor);
168                         if (error)
169                                 break;
170                         node = cursor->node->ondisk;
171                         KKASSERT(cursor->index != node->count);
172                         ++cursor->index;
173                         continue;
174                 }
175
176                 /*
177                  * Check internal or leaf element.  Determine if the record
178                  * at the cursor has gone beyond the end of our range.
179                  *
180                  * Generally we recurse down through internal nodes.  An
181                  * internal node can only be returned if INCLUSTER is set
182                  * and the node represents a cluster-push record.
183                  */
184                 if (node->type == HAMMER_BTREE_TYPE_INTERNAL) {
185                         elm = &node->elms[cursor->index];
186                         r = hammer_btree_cmp(&cursor->key_end, &elm[0].base);
187                         s = hammer_btree_cmp(&cursor->key_beg, &elm[1].base);
188                         if (hammer_debug_btree) {
189                                 kprintf("BRACKETL %p:%d %016llx %02x %016llx %d\n",
190                                         cursor->node, cursor->index,
191                                         elm[0].internal.base.obj_id,
192                                         elm[0].internal.base.rec_type,
193                                         elm[0].internal.base.key,
194                                         r
195                                 );
196                                 kprintf("BRACKETR %p:%d %016llx %02x %016llx %d\n",
197                                         cursor->node, cursor->index + 1,
198                                         elm[1].internal.base.obj_id,
199                                         elm[1].internal.base.rec_type,
200                                         elm[1].internal.base.key,
201                                         s
202                                 );
203                         }
204
205                         if (r < 0) {
206                                 error = ENOENT;
207                                 break;
208                         }
209                         if (r == 0 && (cursor->flags &
210                                        HAMMER_CURSOR_END_INCLUSIVE) == 0) {
211                                 error = ENOENT;
212                                 break;
213                         }
214                         KKASSERT(s <= 0);
215
216                         /*
217                          * When iterating try to clean up any deleted
218                          * internal elements left over from btree_remove()
219                          * deadlocks, but it is ok if we can't.
220                          */
221                         if (elm->internal.subtree_offset == 0)
222                                 btree_remove_deleted_element(cursor);
223                         if (elm->internal.subtree_offset != 0) {
224                                 error = hammer_cursor_down(cursor);
225                                 if (error)
226                                         break;
227                                 KKASSERT(cursor->index == 0);
228                                 node = cursor->node->ondisk;
229                         }
230                         continue;
231                 } else {
232                         elm = &node->elms[cursor->index];
233                         r = hammer_btree_cmp(&cursor->key_end, &elm->base);
234                         if (hammer_debug_btree) {
235                                 kprintf("ELEMENT  %p:%d %016llx %02x %016llx %d\n",
236                                         cursor->node, cursor->index,
237                                         elm[0].leaf.base.obj_id,
238                                         elm[0].leaf.base.rec_type,
239                                         elm[0].leaf.base.key,
240                                         r
241                                 );
242                         }
243                         if (r < 0) {
244                                 error = ENOENT;
245                                 break;
246                         }
247                         if (r == 0 && (cursor->flags &
248                                        HAMMER_CURSOR_END_INCLUSIVE) == 0) {
249                                 error = ENOENT;
250                                 break;
251                         }
252                         switch(elm->leaf.base.btype) {
253                         case HAMMER_BTREE_TYPE_RECORD:
254                                 if ((cursor->flags & HAMMER_CURSOR_ASOF) &&
255                                     hammer_btree_chkts(cursor->asof, &elm->base)) {
256                                         ++cursor->index;
257                                         continue;
258                                 }
259                                 break;
260                         case HAMMER_BTREE_TYPE_SPIKE_BEG:
261                                 /*
262                                  * We must cursor-down via the SPIKE_END
263                                  * element, otherwise cursor->parent will
264                                  * not be set correctly for deletions.
265                                  */
266                                 KKASSERT(cursor->index + 1 < node->count);
267                                 ++cursor->index;
268                                 /* fall through */
269                         case HAMMER_BTREE_TYPE_SPIKE_END:
270                                 if (cursor->flags & HAMMER_CURSOR_INCLUSTER)
271                                         break;
272                                 error = hammer_cursor_down(cursor);
273                                 if (error)
274                                         break;
275                                 KKASSERT(cursor->index == 0);
276                                 node = cursor->node->ondisk;
277                                 continue;
278                         default:
279                                 error = EINVAL;
280                                 break;
281                         }
282                         if (error)
283                                 break;
284                 }
285
286                 /*
287                  * Return entry
288                  */
289                 if (hammer_debug_btree) {
290                         int i = cursor->index;
291                         hammer_btree_elm_t elm = &cursor->node->ondisk->elms[i];
292                         kprintf("ITERATE  %p:%d %016llx %02x %016llx\n",
293                                 cursor->node, i,
294                                 elm->internal.base.obj_id,
295                                 elm->internal.base.rec_type,
296                                 elm->internal.base.key
297                         );
298                 }
299                 return(0);
300         }
301         return(error);
302 }
303
304 /*
305  * Lookup cursor->key_beg.  0 is returned on success, ENOENT if the entry
306  * could not be found, EDEADLK if inserting and a retry is needed, and a
307  * fatal error otherwise.  When retrying, the caller must terminate the
308  * cursor and reinitialize it.
309  * 
310  * The cursor is suitably positioned for a deletion on success, and suitably
311  * positioned for an insertion on ENOENT.
312  *
313  * The cursor may begin anywhere, the search will traverse clusters in
314  * either direction to locate the requested element.
315  */
316 int
317 hammer_btree_lookup(hammer_cursor_t cursor)
318 {
319         int error;
320
321         if (cursor->flags & HAMMER_CURSOR_ASOF) {
322                 cursor->key_beg.delete_tid = cursor->asof;
323                 do {
324                         error = btree_search(cursor, 0);
325                 } while (error == EAGAIN);
326         } else {
327                 error = btree_search(cursor, 0);
328         }
329         if (error == 0 && cursor->flags)
330                 error = hammer_btree_extract(cursor, cursor->flags);
331         return(error);
332 }
333
334 /*
335  * Execute the logic required to start an iteration.  The first record
336  * located within the specified range is returned and iteration control
337  * flags are adjusted for successive hammer_btree_iterate() calls.
338  */
339 int
340 hammer_btree_first(hammer_cursor_t cursor)
341 {
342         int error;
343
344         error = hammer_btree_lookup(cursor);
345         if (error == ENOENT) {
346                 cursor->flags &= ~HAMMER_CURSOR_ATEDISK;
347                 error = hammer_btree_iterate(cursor);
348         }
349         cursor->flags |= HAMMER_CURSOR_ATEDISK;
350         return(error);
351 }
352
353 /*
354  * Extract the record and/or data associated with the cursor's current
355  * position.  Any prior record or data stored in the cursor is replaced.
356  * The cursor must be positioned at a leaf node.
357  *
358  * NOTE: Most extractions occur at the leaf of the B-Tree.  The only
359  *       extraction allowed at an internal element is at a cluster-push.
360  *       Cluster-push elements have records but no data.
361  */
362 int
363 hammer_btree_extract(hammer_cursor_t cursor, int flags)
364 {
365         hammer_node_ondisk_t node;
366         hammer_btree_elm_t elm;
367         hammer_cluster_t cluster;
368         u_int64_t buf_type;
369         int32_t cloff;
370         int32_t roff;
371         int error;
372
373         /*
374          * A cluster record type has no data reference, the information
375          * is stored directly in the record and B-Tree element.
376          *
377          * The case where the data reference resolves to the same buffer
378          * as the record reference must be handled.
379          */
380         node = cursor->node->ondisk;
381         elm = &node->elms[cursor->index];
382         cluster = cursor->node->cluster;
383         cursor->flags &= ~HAMMER_CURSOR_DATA_EMBEDDED;
384         cursor->data = NULL;
385
386         /*
387          * There is nothing to extract for an internal element.
388          */
389         if (node->type == HAMMER_BTREE_TYPE_INTERNAL)
390                 return(EINVAL);
391
392         KKASSERT(node->type == HAMMER_BTREE_TYPE_LEAF);
393
394         /*
395          * Leaf element.
396          */
397         if ((flags & HAMMER_CURSOR_GET_RECORD)) {
398                 cloff = elm->leaf.rec_offset;
399                 cursor->record = hammer_bread(cluster, cloff,
400                                               HAMMER_FSBUF_RECORDS, &error,
401                                               &cursor->record_buffer);
402         } else {
403                 cloff = 0;
404                 error = 0;
405         }
406         if ((flags & HAMMER_CURSOR_GET_DATA) && error == 0) {
407                 if (elm->leaf.base.btype != HAMMER_BTREE_TYPE_RECORD) {
408                         /*
409                          * Only records have data references.  Spike elements
410                          * do not.
411                          */
412                         cursor->data = NULL;
413                 } else if ((cloff ^ elm->leaf.data_offset) & ~HAMMER_BUFMASK) {
414                         /*
415                          * The data is not in the same buffer as the last
416                          * record we cached, but it could still be embedded
417                          * in a record.  Note that we may not have loaded the
418                          * record's buffer above, depending on flags.
419                          */
420                         if ((elm->leaf.rec_offset ^ elm->leaf.data_offset) &
421                             ~HAMMER_BUFMASK) {
422                                 if (elm->leaf.data_len & HAMMER_BUFMASK)
423                                         buf_type = HAMMER_FSBUF_DATA;
424                                 else
425                                         buf_type = 0;   /* pure data buffer */
426                         } else {
427                                 buf_type = HAMMER_FSBUF_RECORDS;
428                         }
429                         cursor->data = hammer_bread(cluster,
430                                                   elm->leaf.data_offset,
431                                                   buf_type, &error,
432                                                   &cursor->data_buffer);
433                 } else {
434                         /*
435                          * Data in same buffer as record.  Note that we
436                          * leave any existing data_buffer intact, even
437                          * though we don't use it in this case, in case
438                          * other records extracted during an iteration
439                          * go back to it.
440                          *
441                          * The data must be embedded in the record for this
442                          * case to be hit.
443                          *
444                          * Just assume the buffer type is correct.
445                          */
446                         cursor->data = (void *)
447                                 ((char *)cursor->record_buffer->ondisk +
448                                  (elm->leaf.data_offset & HAMMER_BUFMASK));
449                         roff = (char *)cursor->data - (char *)cursor->record;
450                         KKASSERT (roff >= 0 && roff < HAMMER_RECORD_SIZE);
451                         cursor->flags |= HAMMER_CURSOR_DATA_EMBEDDED;
452                 }
453         }
454         return(error);
455 }
456
457
458 /*
459  * Insert a leaf element into the B-Tree at the current cursor position.
460  * The cursor is positioned such that the element at and beyond the cursor
461  * are shifted to make room for the new record.
462  *
463  * The caller must call hammer_btree_lookup() with the HAMMER_CURSOR_INSERT
464  * flag set and that call must return ENOENT before this function can be
465  * called.
466  *
467  * ENOSPC is returned if there is no room to insert a new record.
468  */
469 int
470 hammer_btree_insert(hammer_cursor_t cursor, hammer_btree_elm_t elm)
471 {
472         hammer_node_ondisk_t node;
473         int i;
474         int error;
475
476         if ((error = hammer_cursor_upgrade(cursor)) != 0)
477                 return(error);
478
479         /*
480          * Insert the element at the leaf node and update the count in the
481          * parent.  It is possible for parent to be NULL, indicating that
482          * the root of the B-Tree in the cluster is a leaf.  It is also
483          * possible for the leaf to be empty.
484          *
485          * Remember that the right-hand boundary is not included in the
486          * count.
487          */
488         hammer_modify_node(cursor->node);
489         node = cursor->node->ondisk;
490         i = cursor->index;
491         KKASSERT(elm->base.btype != 0);
492         KKASSERT(node->type == HAMMER_BTREE_TYPE_LEAF);
493         KKASSERT(node->count < HAMMER_BTREE_LEAF_ELMS);
494         if (i != node->count) {
495                 bcopy(&node->elms[i], &node->elms[i+1],
496                       (node->count - i) * sizeof(*elm));
497         }
498         node->elms[i] = *elm;
499         ++node->count;
500
501         KKASSERT(hammer_btree_cmp(cursor->left_bound, &elm->leaf.base) <= 0);
502         KKASSERT(hammer_btree_cmp(cursor->right_bound, &elm->leaf.base) > 0);
503         if (i)
504                 KKASSERT(hammer_btree_cmp(&node->elms[i-1].leaf.base, &elm->leaf.base) < 0);
505         if (i != node->count - 1)
506                 KKASSERT(hammer_btree_cmp(&node->elms[i+1].leaf.base, &elm->leaf.base) > 0);
507
508         return(0);
509 }
510
511 #if 0
512
513 /*
514  * Insert a cluster push into the B-Tree at the current cursor position.
515  * The cursor is positioned at a leaf after a failed btree_lookup.
516  *
517  * The caller must call hammer_btree_lookup() with the HAMMER_CURSOR_INSERT
518  * flag set and that call must return ENOENT before this function can be
519  * called.
520  *
521  * This routine is used ONLY during a recovery pass while the originating
522  * cluster is serialized.  The leaf is broken up into up to three pieces,
523  * causing up to an additional internal elements to be added to the parent.
524  *
525  * ENOSPC is returned if there is no room to insert a new record.
526  */
527 int
528 hammer_btree_insert_cluster(hammer_cursor_t cursor, hammer_cluster_t ncluster,
529                             int32_t rec_offset)
530 {
531         hammer_cluster_t ocluster;
532         hammer_node_ondisk_t parent;
533         hammer_node_ondisk_t node;
534         hammer_node_ondisk_t xnode;     /* additional leaf node */
535         hammer_node_t        new_node;
536         hammer_btree_elm_t   elm;
537         const int esize = sizeof(*elm);
538         u_int8_t save;
539         int error;
540         int pi, i;
541
542         if ((error = hammer_cursor_upgrade(cursor)) != 0)
543                 return(error);
544         hammer_modify_node(cursor->node);
545         node = cursor->node->ondisk;
546         i = cursor->index;
547         KKASSERT(node->type == HAMMER_BTREE_TYPE_LEAF);
548         KKASSERT(node->count < HAMMER_BTREE_LEAF_ELMS);
549
550         /*
551          * Make sure the spike is legal or the B-Tree code will get really
552          * confused.
553          */
554         KKASSERT(hammer_btree_cmp(&ncluster->ondisk->clu_btree_beg,
555                                   cursor->left_bound) >= 0);
556         KKASSERT(hammer_btree_cmp(&ncluster->ondisk->clu_btree_end,
557                                   cursor->right_bound) <= 0);
558         if (i != node->count) {
559                 KKASSERT(hammer_btree_cmp(&ncluster->ondisk->clu_btree_end,
560                                           &node->elms[i].leaf.base) <= 0);
561         }
562
563         /*
564          * If we are at the local root of the cluster a new root node
565          * must be created, because we need an internal node.  The
566          * caller has already marked the source cluster as undergoing
567          * modification.
568          */
569         ocluster = cursor->node->cluster;
570         if (cursor->parent == NULL) {
571                 cursor->parent = hammer_alloc_btree(ocluster, &error);
572                 if (error)
573                         return(error);
574                 hammer_lock_ex(&cursor->parent->lock);
575                 hammer_modify_node(cursor->parent);
576                 parent = cursor->parent->ondisk;
577                 parent->count = 1;
578                 parent->parent = 0;
579                 parent->type = HAMMER_BTREE_TYPE_INTERNAL;
580                 parent->elms[0].base = ocluster->clu_btree_beg;
581                 parent->elms[0].base.subtree_type = node->type;
582                 parent->elms[0].internal.subtree_offset = cursor->node->node_offset;
583                 parent->elms[1].base = ocluster->clu_btree_end;
584                 cursor->parent_index = 0;
585                 cursor->left_bound = &parent->elms[0].base;
586                 cursor->right_bound = &parent->elms[1].base;
587                 node->parent = cursor->parent->node_offset;
588                 ocluster->ondisk->clu_btree_root = cursor->parent->node_offset;
589                 kprintf("no parent\n");
590         } else {
591                 kprintf("has parent\n");
592                 if (error)
593                         return(error);
594         }
595
596
597         KKASSERT(cursor->parent->ondisk->count <= HAMMER_BTREE_INT_ELMS - 2);
598
599         hammer_modify_node(cursor->parent);
600         parent = cursor->parent->ondisk;
601         pi = cursor->parent_index;
602
603         kprintf("%d node %d/%d (%c) offset=%d parent=%d\n",
604                 cursor->node->cluster->clu_no,
605                 i, node->count, node->type, cursor->node->node_offset, node->parent);
606
607         /*
608          * If the insertion point bisects the node we will need to allocate
609          * a second leaf node to copy the right hand side into.
610          */
611         if (i != 0 && i != node->count) {
612                 new_node = hammer_alloc_btree(cursor->node->cluster, &error);
613                 if (error)
614                         return(error);
615                 xnode = new_node->ondisk;
616                 bcopy(&node->elms[i], &xnode->elms[0],
617                       (node->count - i) * esize);
618                 xnode->count = node->count - i;
619                 xnode->parent = cursor->parent->node_offset;
620                 xnode->type = HAMMER_BTREE_TYPE_LEAF;
621                 node->count = i;
622         } else {
623                 new_node = NULL;
624                 xnode = NULL;
625         }
626
627         /*
628          * Adjust the parent and set pi to point at the internal element
629          * which we intended to hold the spike.
630          */
631         if (new_node) {
632                 /*
633                  * Insert spike after parent index.  Spike is at pi + 1.
634                  * Also include room after the spike for new_node
635                  */
636                 ++pi;
637                 bcopy(&parent->elms[pi], &parent->elms[pi+2],
638                       (parent->count - pi + 1) * esize);
639                 parent->count += 2;
640         } else if (i == 0) {
641                 /*
642                  * Insert spike before parent index.  Spike is at pi.
643                  *
644                  * cursor->node's index in the parent (cursor->parent_index)
645                  * has now shifted over by one.
646                  */
647                 bcopy(&parent->elms[pi], &parent->elms[pi+1],
648                       (parent->count - pi + 1) * esize);
649                 ++parent->count;
650                 ++cursor->parent_index;
651         } else {
652                 /*
653                  * Insert spike after parent index.  Spike is at pi + 1.
654                  */
655                 ++pi;
656                 bcopy(&parent->elms[pi], &parent->elms[pi+1],
657                       (parent->count - pi + 1) * esize);
658                 ++parent->count;
659         }
660
661         /*
662          * Load the spike into the parent at (pi).
663          *
664          * WARNING: subtree_type is actually overloaded within base.
665          * WARNING: subtree_clu_no is overloaded on subtree_offset
666          */
667         elm = &parent->elms[pi];
668         elm[0].internal.base = ncluster->ondisk->clu_btree_beg;
669         elm[0].internal.base.subtree_type = HAMMER_BTREE_TYPE_CLUSTER;
670         elm[0].internal.rec_offset = rec_offset;
671         elm[0].internal.subtree_clu_no = ncluster->clu_no;
672         elm[0].internal.subtree_vol_no = ncluster->volume->vol_no;
673
674         /*
675          * Load the new node into parent at (pi+1) if non-NULL, and also
676          * set the right-hand boundary for the spike.
677          *
678          * Because new_node is a leaf its elements do not point to any
679          * nodes so we don't have to scan it to adjust parent pointers.
680          *
681          * WARNING: subtree_type is actually overloaded within base.
682          * WARNING: subtree_clu_no is overloaded on subtree_offset
683          *
684          * XXX right-boundary may not match clu_btree_end if spike is
685          *     at the end of the internal node.  For now the cursor search
686          *     insertion code will deal with it.
687          */
688         if (new_node) {
689                 elm[1].internal.base = ncluster->ondisk->clu_btree_end;
690                 elm[1].internal.base.subtree_type = HAMMER_BTREE_TYPE_LEAF;
691                 elm[1].internal.subtree_offset = new_node->node_offset;
692                 elm[1].internal.subtree_vol_no = -1;
693                 elm[1].internal.rec_offset = 0;
694         } else {
695                 /*
696                  * The right boundary is only the base part of elm[1].
697                  * The rest belongs to elm[1]'s recursion.  Note however
698                  * that subtree_type is overloaded within base so we 
699                  * have to retain it as well.
700                  */
701                 save = elm[1].internal.base.subtree_type;
702                 elm[1].internal.base = ncluster->ondisk->clu_btree_end;
703                 elm[1].internal.base.subtree_type = save;
704         }
705
706         /*
707          * The boundaries stored in the cursor for node are probably all
708          * messed up now, fix them.
709          */
710         cursor->left_bound = &parent->elms[cursor->parent_index].base;
711         cursor->right_bound = &parent->elms[cursor->parent_index+1].base;
712
713         KKASSERT(hammer_btree_cmp(&ncluster->ondisk->clu_btree_end,
714                                   &elm[1].internal.base) <= 0);
715
716
717         /*
718          * Adjust the target cluster's parent offset
719          */
720         hammer_modify_cluster(ncluster);
721         ncluster->ondisk->clu_btree_parent_offset = cursor->parent->node_offset;
722
723         if (new_node)
724                 hammer_rel_node(new_node);
725
726         return(0);
727 }
728
729 #endif
730
731 /*
732  * Delete a record from the B-Tree at the current cursor position.
733  * The cursor is positioned such that the current element is the one
734  * to be deleted.
735  *
736  * On return the cursor will be positioned after the deleted element and
737  * MAY point to an internal node.  It will be suitable for the continuation
738  * of an iteration but not for an insertion or deletion.
739  *
740  * Deletions will attempt to partially rebalance the B-Tree in an upward
741  * direction, but will terminate rather then deadlock.  Empty leaves are
742  * not allowed except at the root node of a cluster.  An early termination
743  * will leave an internal node with an element whos subtree_offset is 0,
744  * a case detected and handled by btree_search().
745  */
746 int
747 hammer_btree_delete(hammer_cursor_t cursor)
748 {
749         hammer_node_ondisk_t ondisk;
750         hammer_node_t node;
751         hammer_node_t parent;
752         int error;
753         int i;
754
755         if ((error = hammer_cursor_upgrade(cursor)) != 0)
756                 return(error);
757
758         /*
759          * Delete the element from the leaf node. 
760          *
761          * Remember that leaf nodes do not have boundaries.
762          */
763         node = cursor->node;
764         ondisk = node->ondisk;
765         i = cursor->index;
766
767         KKASSERT(ondisk->type == HAMMER_BTREE_TYPE_LEAF);
768         KKASSERT(i >= 0 && i < ondisk->count);
769         hammer_modify_node(node);
770         if (i + 1 != ondisk->count) {
771                 bcopy(&ondisk->elms[i+1], &ondisk->elms[i],
772                       (ondisk->count - i - 1) * sizeof(ondisk->elms[0]));
773         }
774         --ondisk->count;
775
776         /*
777          * Validate local parent
778          */
779         if (ondisk->parent) {
780                 parent = cursor->parent;
781
782                 KKASSERT(parent != NULL);
783                 KKASSERT(parent->node_offset == ondisk->parent);
784                 KKASSERT(parent->cluster == node->cluster);
785         }
786
787         /*
788          * If the leaf becomes empty it must be detached from the parent,
789          * potentially recursing through to the cluster root.
790          *
791          * This may reposition the cursor at one of the parent's of the
792          * current node.
793          *
794          * Ignore deadlock errors, that simply means that btree_remove
795          * was unable to recurse and had to leave the subtree_offset 
796          * in the parent set to 0.
797          */
798         KKASSERT(cursor->index <= ondisk->count);
799         if (ondisk->count == 0) {
800                 do {
801                         error = btree_remove(cursor, 0);
802                 } while (error == EAGAIN);
803                 if (error == EDEADLK)
804                         error = 0;
805         } else {
806                 error = 0;
807         }
808         KKASSERT(cursor->parent == NULL || cursor->parent_index < cursor->parent->ondisk->count);
809         return(error);
810 }
811
812 /*
813  * PRIMAY B-TREE SEARCH SUPPORT PROCEDURE
814  *
815  * Search a cluster's B-Tree for cursor->key_beg, return the matching node.
816  *
817  * The search can begin ANYWHERE in the B-Tree.  As a first step the search
818  * iterates up the tree as necessary to properly position itself prior to
819  * actually doing the sarch.
820  * 
821  * INSERTIONS: The search will split full nodes and leaves on its way down
822  * and guarentee that the leaf it ends up on is not full.  If we run out
823  * of space the search continues to the leaf (to position the cursor for
824  * the spike), but ENOSPC is returned.
825  *
826  * XXX this isn't optimal - we really need to just locate the end point and
827  * insert space going up, and if we get a deadlock just release and retry
828  * the operation.  Or something like that.  The insertion code can transit
829  * multiple clusters and run splits in unnecessary clusters.
830  *
831  * DELETIONS: The search will rebalance the tree on its way down. XXX
832  *
833  * The search is only guarenteed to end up on a leaf if an error code of 0
834  * is returned, or if inserting and an error code of ENOENT is returned.
835  * Otherwise it can stop at an internal node.  On success a search returns
836  * a leaf node unless INCLUSTER is set and the search located a cluster push
837  * node (which is an internal node).
838  */
839 static 
840 int
841 btree_search(hammer_cursor_t cursor, int flags)
842 {
843         hammer_node_ondisk_t node;
844         hammer_cluster_t cluster;
845         hammer_btree_elm_t elm;
846         int error;
847         int enospc = 0;
848         int i;
849         int r;
850
851         flags |= cursor->flags;
852
853         if (hammer_debug_btree) {
854                 kprintf("SEARCH   %p:%d %016llx %02x key=%016llx did=%016llx\n",
855                         cursor->node, cursor->index,
856                         cursor->key_beg.obj_id,
857                         cursor->key_beg.rec_type,
858                         cursor->key_beg.key,
859                         cursor->key_beg.delete_tid
860                 );
861         }
862
863         /*
864          * Move our cursor up the tree until we find a node whos range covers
865          * the key we are trying to locate.  This may move us between
866          * clusters.
867          *
868          * The left bound is inclusive, the right bound is non-inclusive.
869          * It is ok to cursor up too far so when cursoring across a cluster
870          * boundary.
871          *
872          * First see if we can skip the whole cluster.  hammer_cursor_up()
873          * handles both cases but this way we don't check the cluster
874          * bounds when going up the tree within a cluster.
875          *
876          * NOTE: If INCLUSTER is set and we are at the root of the cluster,
877          * hammer_cursor_up() will return ENOENT.
878          */
879         cluster = cursor->node->cluster;
880         while (
881             hammer_btree_cmp(&cursor->key_beg, &cluster->clu_btree_beg) < 0 ||
882             hammer_btree_cmp(&cursor->key_beg, &cluster->clu_btree_end) >= 0) {
883                 error = hammer_cursor_toroot(cursor);
884                 if (error)
885                         goto done;
886                 KKASSERT(cursor->parent);
887                 error = hammer_cursor_up(cursor);
888                 if (error)
889                         goto done;
890                 cluster = cursor->node->cluster;
891         }
892
893         /*
894          * Deal with normal cursoring within a cluster.  The right bound
895          * is non-inclusive.  That is, the bounds form a separator.
896          */
897         while (hammer_btree_cmp(&cursor->key_beg, cursor->left_bound) < 0 ||
898                hammer_btree_cmp(&cursor->key_beg, cursor->right_bound) >= 0) {
899                 KKASSERT(cursor->parent);
900                 error = hammer_cursor_up(cursor);
901                 if (error)
902                         goto done;
903         }
904
905         /*
906          * We better have ended up with a node somewhere, and our second
907          * while loop had better not have traversed up a cluster.
908          */
909         KKASSERT(cursor->node != NULL && cursor->node->cluster == cluster);
910
911         /*
912          * If we are inserting we can't start at a full node if the parent
913          * is also full (because there is no way to split the node),
914          * continue running up the tree until we hit the root of the
915          * root cluster or until the requirement is satisfied.
916          *
917          * NOTE: These cursor-up's CAN continue to cross cluster boundaries.
918          *
919          * NOTE: We must guarantee at least two open spots in the parent
920          *       to deal with hammer_btree_insert_cluster().
921          *
922          * XXX as an optimization it should be possible to unbalance the tree
923          * and stop at the root of the current cluster.
924          */
925         while ((flags & HAMMER_CURSOR_INSERT) && enospc == 0) {
926                 if (btree_node_is_full(cursor->node->ondisk) == 0)
927                         break;
928                 if (cursor->parent == NULL)
929                         break;
930                 if (cursor->parent->ondisk->count != HAMMER_BTREE_INT_ELMS)
931                         break;
932                 error = hammer_cursor_up(cursor);
933                 /* cluster and node are now may become stale */
934                 if (error)
935                         goto done;
936         }
937         /* cluster = cursor->node->cluster; not needed until next cluster = */
938
939 new_cluster:
940         /*
941          * Push down through internal nodes to locate the requested key.
942          */
943         cluster = cursor->node->cluster;
944         node = cursor->node->ondisk;
945         while (node->type == HAMMER_BTREE_TYPE_INTERNAL) {
946                 /*
947                  * Scan the node to find the subtree index to push down into.
948                  * We go one-past, then back-up.
949                  *
950                  * We must proactively remove deleted elements which may
951                  * have been left over from a deadlocked btree_remove().
952                  *
953                  * The left and right boundaries are included in the loop h
954                  * in order to detect edge cases.
955                  *
956                  * If the separator only differs by delete_tid (r == -1)
957                  * we may end up going down a branch to the left of the
958                  * one containing the desired key.  Flag it.
959                  */
960                 for (i = 0; i <= node->count; ++i) {
961                         elm = &node->elms[i];
962                         r = hammer_btree_cmp(&cursor->key_beg, &elm->base);
963                         if (r < 0)
964                                 break;
965                 }
966
967                 /*
968                  * These cases occur when the parent's idea of the boundary
969                  * is wider then the child's idea of the boundary, and
970                  * require special handling.  If not inserting we can
971                  * terminate the search early for these cases but the
972                  * child's boundaries cannot be unconditionally modified.
973                  */
974                 if (i == 0) {
975                         /*
976                          * If i == 0 the search terminated to the LEFT of the
977                          * left_boundary but to the RIGHT of the parent's left
978                          * boundary.
979                          */
980                         u_int8_t save;
981
982                         if ((flags & HAMMER_CURSOR_INSERT) == 0) {
983                                 cursor->index = 0;
984                                 return(ENOENT);
985                         }
986                         elm = &node->elms[0];
987
988                         /*
989                          * Correct a left-hand boundary mismatch.
990                          *
991                          * This is done without an exclusive lock XXX.  We
992                          * have to do this or the search will not terminate
993                          * at a leaf.
994                          */
995                         hammer_modify_node(cursor->node);
996                         save = node->elms[0].base.btype;
997                         node->elms[0].base = *cursor->left_bound;
998                         node->elms[0].base.btype = save;
999                 } else if (i == node->count + 1) {
1000                         /*
1001                          * If i == node->count + 1 the search terminated to
1002                          * the RIGHT of the right boundary but to the LEFT
1003                          * of the parent's right boundary.
1004                          *
1005                          * Note that the last element in this case is
1006                          * elms[i-2] prior to adjustments to 'i'.
1007                          */
1008                         --i;
1009                         if ((flags & HAMMER_CURSOR_INSERT) == 0) {
1010                                 cursor->index = i;
1011                                 return(ENOENT);
1012                         }
1013
1014                         /*
1015                          * Correct a right-hand boundary mismatch.
1016                          * (actual push-down record is i-2 prior to
1017                          * adjustments to i).
1018                          *
1019                          * This is done without an exclusive lock XXX.  We
1020                          * have to do this or the search will not terminate
1021                          * at a leaf.
1022                          */
1023                         elm = &node->elms[i];
1024                         hammer_modify_node(cursor->node);
1025                         elm->base = *cursor->right_bound;
1026                         --i;
1027                 } else {
1028                         /*
1029                          * The push-down index is now i - 1.  If we had
1030                          * terminated on the right boundary this will point
1031                          * us at the last element.
1032                          */
1033                         --i;
1034                 }
1035                 cursor->index = i;
1036                 elm = &node->elms[i];
1037
1038                 if (hammer_debug_btree) {
1039                         kprintf("SEARCH-I %p:%d %016llx %02x key=%016llx did=%016llx\n",
1040                                 cursor->node, i,
1041                                 elm->internal.base.obj_id,
1042                                 elm->internal.base.rec_type,
1043                                 elm->internal.base.key,
1044                                 elm->internal.base.delete_tid
1045                         );
1046                 }
1047
1048                 /*
1049                  * When searching try to clean up any deleted
1050                  * internal elements left over from btree_remove()
1051                  * deadlocks.
1052                  *
1053                  * If we fail and we are doing an insertion lookup,
1054                  * we have to return EDEADLK, because an insertion lookup
1055                  * must terminate at a leaf.
1056                  */
1057                 if (elm->internal.subtree_offset == 0) {
1058                         error = btree_remove_deleted_element(cursor);
1059                         if (error == 0)
1060                                 goto new_cluster;
1061                         if (flags & HAMMER_CURSOR_INSERT)
1062                                 return(EDEADLK);
1063                         return(ENOENT);
1064                 }
1065
1066
1067                 /*
1068                  * Handle insertion and deletion requirements.
1069                  *
1070                  * If inserting split full nodes.  The split code will
1071                  * adjust cursor->node and cursor->index if the current
1072                  * index winds up in the new node.
1073                  *
1074                  * If inserting and a left or right edge case was detected,
1075                  * we cannot correct the left or right boundary and must
1076                  * prepend and append an empty leaf node in order to make
1077                  * the boundary correction.
1078                  *
1079                  * If we run out of space we set enospc and continue on
1080                  * to a leaf to provide the spike code with a good point
1081                  * of entry.  Enospc is reset if we cross a cluster boundary.
1082                  */
1083                 if ((flags & HAMMER_CURSOR_INSERT) && enospc == 0) {
1084                         if (btree_node_is_full(node)) {
1085                                 error = btree_split_internal(cursor);
1086                                 if (error) {
1087                                         if (error != ENOSPC)
1088                                                 goto done;
1089                                         enospc = 1;
1090                                 }
1091                                 /*
1092                                  * reload stale pointers
1093                                  */
1094                                 i = cursor->index;
1095                                 node = cursor->node->ondisk;
1096                         }
1097                 }
1098
1099                 /*
1100                  * Push down (push into new node, existing node becomes
1101                  * the parent) and continue the search.
1102                  */
1103                 error = hammer_cursor_down(cursor);
1104                 /* node and cluster become stale */
1105                 if (error)
1106                         goto done;
1107                 node = cursor->node->ondisk;
1108                 cluster = cursor->node->cluster;
1109         }
1110
1111         /*
1112          * We are at a leaf, do a linear search of the key array.
1113          *
1114          * If we encounter a spike element type within the necessary
1115          * range we push into it.
1116          *
1117          * On success the index is set to the matching element and 0
1118          * is returned.
1119          *
1120          * On failure the index is set to the insertion point and ENOENT
1121          * is returned.
1122          *
1123          * Boundaries are not stored in leaf nodes, so the index can wind
1124          * up to the left of element 0 (index == 0) or past the end of
1125          * the array (index == node->count).
1126          */
1127         KKASSERT (node->type == HAMMER_BTREE_TYPE_LEAF);
1128         KKASSERT(node->count <= HAMMER_BTREE_LEAF_ELMS);
1129
1130         for (i = 0; i < node->count; ++i) {
1131                 elm = &node->elms[i];
1132
1133                 r = hammer_btree_cmp(&cursor->key_beg, &elm->leaf.base);
1134
1135                 if (hammer_debug_btree > 1)
1136                         kprintf("  ELM %p %d r=%d\n", &node->elms[i], i, r);
1137
1138                 if (elm->leaf.base.btype == HAMMER_BTREE_TYPE_SPIKE_BEG) {
1139                         /*
1140                          * SPIKE_BEG.  Stop if we are to the left of the
1141                          * spike begin element.
1142                          *
1143                          * If we are not the last element in the leaf continue
1144                          * the loop looking for the SPIKE_END.  If we are
1145                          * the last element, however, then push into the
1146                          * spike.
1147                          *
1148                          * A Spike demark on a delete_tid boundary must be
1149                          * pushed into.  An as-of search failure will force
1150                          * an iteration.
1151                          *
1152                          * enospc must be reset because we have crossed a
1153                          * cluster boundary.
1154                          */
1155                         if (r < -1)
1156                                 goto failed;
1157                         if (i != node->count - 1)
1158                                 continue;
1159                         panic("btree_search: illegal spike, no SPIKE_END "
1160                               "in leaf node! %p\n", cursor->node);
1161                         /*
1162                          * XXX This is not currently legal, you can only
1163                          * cursor_down() from a SPIKE_END element, otherwise
1164                          * the cursor parent is pointing at the wrong element
1165                          * for deletions.
1166                          */
1167                         if (cursor->flags & HAMMER_CURSOR_INCLUSTER)
1168                                 goto success;
1169                         cursor->index = i;
1170                         error = hammer_cursor_down(cursor);
1171                         enospc = 0;
1172                         if (error)
1173                                 goto done;
1174                         goto new_cluster;
1175                 }
1176                 if (elm->leaf.base.btype == HAMMER_BTREE_TYPE_SPIKE_END) {
1177                         /*
1178                          * SPIKE_END.  We can only hit this case if we are
1179                          * greater or equal to SPIKE_BEG.
1180                          *
1181                          * If we are less then or equal to the SPIKE_END
1182                          * we must push into it, otherwise continue the
1183                          * search.
1184                          *
1185                          * enospc must be reset because we have crossed a
1186                          * cluster boundary.
1187                          */
1188                         if (r > 0)
1189                                 continue;
1190                         if (cursor->flags & HAMMER_CURSOR_INCLUSTER)
1191                                 goto success;
1192                         cursor->index = i;
1193                         error = hammer_cursor_down(cursor);
1194                         enospc = 0;
1195                         if (error)
1196                                 goto done;
1197                         goto new_cluster;
1198                 }
1199
1200                 /*
1201                  * We are at a record element.  Stop if we've flipped past
1202                  * key_beg, not counting the delete_tid test.
1203                  */
1204                 KKASSERT (elm->leaf.base.btype == HAMMER_BTREE_TYPE_RECORD);
1205
1206                 if (r < -1)
1207                         goto failed;
1208                 if (r > 0)
1209                         continue;
1210
1211                 /*
1212                  * Check our as-of timestamp against the element. 
1213                  */
1214                 if (r == -1) {
1215                         if ((cursor->flags & HAMMER_CURSOR_ASOF) == 0)
1216                                 goto failed;
1217                         if (hammer_btree_chkts(cursor->asof,
1218                                                &node->elms[i].base) != 0) {
1219                                 continue;
1220                         }
1221                 }
1222 success:
1223                 cursor->index = i;
1224                 error = 0;
1225                 if (hammer_debug_btree)
1226                         kprintf("SEARCH-L %p:%d (SUCCESS)\n", cursor->node, i);
1227                 goto done;
1228         }
1229
1230         /*
1231          * The search failed but due the way we handle delete_tid we may
1232          * have to iterate.  Here is why:  If a center separator differs
1233          * only by its delete_tid as shown below and we are looking for, say,
1234          * a record with an as-of TID of 12, we will traverse LEAF1.  LEAF1
1235          * might contain element 11 and thus not match, and LEAF2 might
1236          * contain element 17 which we DO want to match (i.e. that record
1237          * will be visible to us).
1238          *
1239          * delete_tid:    10    15    20
1240          *                   L1    L2
1241          *
1242          *
1243          * Its easiest to adjust delete_tid and to tell the caller to
1244          * retry, because this may be an insertion search and require
1245          * more then just a simple iteration.
1246          */
1247         if ((flags & (HAMMER_CURSOR_INSERT|HAMMER_CURSOR_ASOF)) ==
1248                 HAMMER_CURSOR_ASOF &&
1249             cursor->key_beg.obj_id == cursor->right_bound->obj_id &&
1250             cursor->key_beg.rec_type == cursor->right_bound->rec_type &&
1251             cursor->key_beg.key == cursor->right_bound->key &&
1252             (cursor->right_bound->delete_tid == 0 ||
1253              cursor->key_beg.delete_tid < cursor->right_bound->delete_tid)
1254         ) {
1255                 kprintf("MUST ITERATE\n");
1256                 cursor->key_beg.delete_tid = cursor->right_bound->delete_tid;
1257                 return(EAGAIN);
1258         }
1259
1260 failed:
1261         if (hammer_debug_btree) {
1262                 kprintf("SEARCH-L %p:%d (FAILED)\n",
1263                         cursor->node, i);
1264         }
1265
1266         /*
1267          * No exact match was found, i is now at the insertion point.
1268          *
1269          * If inserting split a full leaf before returning.  This
1270          * may have the side effect of adjusting cursor->node and
1271          * cursor->index.
1272          */
1273         cursor->index = i;
1274         if ((flags & HAMMER_CURSOR_INSERT) && btree_node_is_full(node)) {
1275                 error = btree_split_leaf(cursor);
1276                 if (error) {
1277                         if (error != ENOSPC)
1278                                 goto done;
1279                         enospc = 1;
1280                         flags &= ~HAMMER_CURSOR_INSERT;
1281                 }
1282                 /*
1283                  * reload stale pointers
1284                  */
1285                 /* NOT USED
1286                 i = cursor->index;
1287                 node = &cursor->node->internal;
1288                 */
1289         }
1290
1291         /*
1292          * We reached a leaf but did not find the key we were looking for.
1293          * If this is an insert we will be properly positioned for an insert
1294          * (ENOENT) or spike (ENOSPC) operation.
1295          */
1296         error = enospc ? ENOSPC : ENOENT;
1297 done:
1298         return(error);
1299 }
1300
1301
1302 /************************************************************************
1303  *                         SPLITTING AND MERGING                        *
1304  ************************************************************************
1305  *
1306  * These routines do all the dirty work required to split and merge nodes.
1307  */
1308
1309 /*
1310  * Split an internal node into two nodes and move the separator at the split
1311  * point to the parent.
1312  *
1313  * (cursor->node, cursor->index) indicates the element the caller intends
1314  * to push into.  We will adjust node and index if that element winds
1315  * up in the split node.
1316  *
1317  * If we are at the root of a cluster a new root must be created with two
1318  * elements, one pointing to the original root and one pointing to the
1319  * newly allocated split node.
1320  *
1321  * NOTE! Being at the root of a cluster is different from being at the
1322  * root of the root cluster.  cursor->parent will not be NULL and
1323  * cursor->node->ondisk.parent must be tested against 0.  Theoretically
1324  * we could propogate the algorithm into the parent and deal with multiple
1325  * 'roots' in the cluster header, but it's easier not to.
1326  */
1327 static
1328 int
1329 btree_split_internal(hammer_cursor_t cursor)
1330 {
1331         hammer_node_ondisk_t ondisk;
1332         hammer_node_t node;
1333         hammer_node_t parent;
1334         hammer_node_t new_node;
1335         hammer_btree_elm_t elm;
1336         hammer_btree_elm_t parent_elm;
1337         int parent_index;
1338         int made_root;
1339         int split;
1340         int error;
1341         int i;
1342         const int esize = sizeof(*elm);
1343
1344         if ((error = hammer_cursor_upgrade(cursor)) != 0)
1345                 return(error);
1346
1347         /* 
1348          * We are splitting but elms[split] will be promoted to the parent,
1349          * leaving the right hand node with one less element.  If the
1350          * insertion point will be on the left-hand side adjust the split
1351          * point to give the right hand side one additional node.
1352          */
1353         node = cursor->node;
1354         ondisk = node->ondisk;
1355         split = (ondisk->count + 1) / 2;
1356         if (cursor->index <= split)
1357                 --split;
1358
1359         /*
1360          * If we are at the root of the cluster, create a new root node with
1361          * 1 element and split normally.  Avoid making major modifications
1362          * until we know the whole operation will work.
1363          *
1364          * The root of the cluster is different from the root of the root
1365          * cluster.  Use the node's on-disk structure's parent offset to
1366          * detect the case.
1367          */
1368         if (ondisk->parent == 0) {
1369                 parent = hammer_alloc_btree(node->cluster, &error);
1370                 if (parent == NULL)
1371                         goto done;
1372                 hammer_lock_ex(&parent->lock);
1373                 hammer_modify_node(parent);
1374                 ondisk = parent->ondisk;
1375                 ondisk->count = 1;
1376                 ondisk->parent = 0;
1377                 ondisk->type = HAMMER_BTREE_TYPE_INTERNAL;
1378                 ondisk->elms[0].base = node->cluster->clu_btree_beg;
1379                 ondisk->elms[0].base.btype = node->ondisk->type;
1380                 ondisk->elms[0].internal.subtree_offset = node->node_offset;
1381                 ondisk->elms[1].base = node->cluster->clu_btree_end;
1382                 /* ondisk->elms[1].base.btype - not used */
1383                 made_root = 1;
1384                 parent_index = 0;       /* index of current node in parent */
1385         } else {
1386                 made_root = 0;
1387                 parent = cursor->parent;
1388                 parent_index = cursor->parent_index;
1389                 KKASSERT(parent->cluster == node->cluster);
1390         }
1391
1392         /*
1393          * Split node into new_node at the split point.
1394          *
1395          *  B O O O P N N B     <-- P = node->elms[split]
1396          *   0 1 2 3 4 5 6      <-- subtree indices
1397          *
1398          *       x x P x x
1399          *        s S S s  
1400          *         /   \
1401          *  B O O O B    B N N B        <--- inner boundary points are 'P'
1402          *   0 1 2 3      4 5 6  
1403          *
1404          */
1405         new_node = hammer_alloc_btree(node->cluster, &error);
1406         if (new_node == NULL) {
1407                 if (made_root) {
1408                         hammer_unlock(&parent->lock);
1409                         parent->flags |= HAMMER_NODE_DELETED;
1410                         hammer_rel_node(parent);
1411                 }
1412                 goto done;
1413         }
1414         hammer_lock_ex(&new_node->lock);
1415
1416         /*
1417          * Create the new node.  P becomes the left-hand boundary in the
1418          * new node.  Copy the right-hand boundary as well.
1419          *
1420          * elm is the new separator.
1421          */
1422         hammer_modify_node(new_node);
1423         hammer_modify_node(node);
1424         ondisk = node->ondisk;
1425         elm = &ondisk->elms[split];
1426         bcopy(elm, &new_node->ondisk->elms[0],
1427               (ondisk->count - split + 1) * esize);
1428         new_node->ondisk->count = ondisk->count - split;
1429         new_node->ondisk->parent = parent->node_offset;
1430         new_node->ondisk->type = HAMMER_BTREE_TYPE_INTERNAL;
1431         KKASSERT(ondisk->type == new_node->ondisk->type);
1432
1433         /*
1434          * Cleanup the original node.  Elm (P) becomes the new boundary,
1435          * its subtree_offset was moved to the new node.  If we had created
1436          * a new root its parent pointer may have changed.
1437          */
1438         elm->internal.subtree_offset = 0;
1439         ondisk->count = split;
1440
1441         /*
1442          * Insert the separator into the parent, fixup the parent's
1443          * reference to the original node, and reference the new node.
1444          * The separator is P.
1445          *
1446          * Remember that base.count does not include the right-hand boundary.
1447          */
1448         hammer_modify_node(parent);
1449         ondisk = parent->ondisk;
1450         KKASSERT(ondisk->count != HAMMER_BTREE_INT_ELMS);
1451         parent_elm = &ondisk->elms[parent_index+1];
1452         bcopy(parent_elm, parent_elm + 1,
1453               (ondisk->count - parent_index) * esize);
1454         parent_elm->internal.base = elm->base;  /* separator P */
1455         parent_elm->internal.base.btype = new_node->ondisk->type;
1456         parent_elm->internal.subtree_offset = new_node->node_offset;
1457         ++ondisk->count;
1458
1459         /*
1460          * The children of new_node need their parent pointer set to new_node.
1461          */
1462         for (i = 0; i < new_node->ondisk->count; ++i) {
1463                 elm = &new_node->ondisk->elms[i];
1464                 error = btree_set_parent(new_node, elm);
1465                 if (error) {
1466                         panic("btree_split_internal: btree-fixup problem");
1467                 }
1468         }
1469
1470         /*
1471          * The cluster's root pointer may have to be updated.
1472          */
1473         if (made_root) {
1474                 hammer_modify_cluster(node->cluster);
1475                 node->cluster->ondisk->clu_btree_root = parent->node_offset;
1476                 node->ondisk->parent = parent->node_offset;
1477                 if (cursor->parent) {
1478                         hammer_unlock(&cursor->parent->lock);
1479                         hammer_rel_node(cursor->parent);
1480                 }
1481                 cursor->parent = parent;        /* lock'd and ref'd */
1482         }
1483
1484
1485         /*
1486          * Ok, now adjust the cursor depending on which element the original
1487          * index was pointing at.  If we are >= the split point the push node
1488          * is now in the new node.
1489          *
1490          * NOTE: If we are at the split point itself we cannot stay with the
1491          * original node because the push index will point at the right-hand
1492          * boundary, which is illegal.
1493          *
1494          * NOTE: The cursor's parent or parent_index must be adjusted for
1495          * the case where a new parent (new root) was created, and the case
1496          * where the cursor is now pointing at the split node.
1497          */
1498         if (cursor->index >= split) {
1499                 cursor->parent_index = parent_index + 1;
1500                 cursor->index -= split;
1501                 hammer_unlock(&cursor->node->lock);
1502                 hammer_rel_node(cursor->node);
1503                 cursor->node = new_node;        /* locked and ref'd */
1504         } else {
1505                 cursor->parent_index = parent_index;
1506                 hammer_unlock(&new_node->lock);
1507                 hammer_rel_node(new_node);
1508         }
1509
1510         /*
1511          * Fixup left and right bounds
1512          */
1513         parent_elm = &parent->ondisk->elms[cursor->parent_index];
1514         cursor->left_bound = &parent_elm[0].internal.base;
1515         cursor->right_bound = &parent_elm[1].internal.base;
1516         KKASSERT(hammer_btree_cmp(cursor->left_bound,
1517                  &cursor->node->ondisk->elms[0].internal.base) <= 0);
1518         KKASSERT(hammer_btree_cmp(cursor->right_bound,
1519                  &cursor->node->ondisk->elms[cursor->node->ondisk->count].internal.base) >= 0);
1520
1521 done:
1522         hammer_cursor_downgrade(cursor);
1523         return (error);
1524 }
1525
1526 /*
1527  * Same as the above, but splits a full leaf node.
1528  *
1529  * This function
1530  */
1531 static
1532 int
1533 btree_split_leaf(hammer_cursor_t cursor)
1534 {
1535         hammer_node_ondisk_t ondisk;
1536         hammer_node_t parent;
1537         hammer_node_t leaf;
1538         hammer_node_t new_leaf;
1539         hammer_btree_elm_t elm;
1540         hammer_btree_elm_t parent_elm;
1541         hammer_base_elm_t mid_boundary;
1542         int parent_index;
1543         int made_root;
1544         int split;
1545         int error;
1546         int i;
1547         const size_t esize = sizeof(*elm);
1548
1549         if ((error = hammer_cursor_upgrade(cursor)) != 0)
1550                 return(error);
1551
1552         /* 
1553          * Calculate the split point.  If the insertion point will be on
1554          * the left-hand side adjust the split point to give the right
1555          * hand side one additional node.
1556          *
1557          * Spikes are made up of two leaf elements which cannot be
1558          * safely split.
1559          */
1560         leaf = cursor->node;
1561         ondisk = leaf->ondisk;
1562         split = (ondisk->count + 1) / 2;
1563         if (cursor->index <= split)
1564                 --split;
1565         error = 0;
1566
1567         elm = &ondisk->elms[split];
1568         if (elm->leaf.base.btype == HAMMER_BTREE_TYPE_SPIKE_END) {
1569                 KKASSERT(split &&
1570                         elm[-1].leaf.base.btype == HAMMER_BTREE_TYPE_SPIKE_BEG);
1571                 --split;
1572         }
1573
1574         /*
1575          * If we are at the root of the tree, create a new root node with
1576          * 1 element and split normally.  Avoid making major modifications
1577          * until we know the whole operation will work.
1578          */
1579         if (ondisk->parent == 0) {
1580                 parent = hammer_alloc_btree(leaf->cluster, &error);
1581                 if (parent == NULL)
1582                         goto done;
1583                 hammer_lock_ex(&parent->lock);
1584                 hammer_modify_node(parent);
1585                 ondisk = parent->ondisk;
1586                 ondisk->count = 1;
1587                 ondisk->parent = 0;
1588                 ondisk->type = HAMMER_BTREE_TYPE_INTERNAL;
1589                 ondisk->elms[0].base = leaf->cluster->clu_btree_beg;
1590                 ondisk->elms[0].base.btype = leaf->ondisk->type;
1591                 ondisk->elms[0].internal.subtree_offset = leaf->node_offset;
1592                 ondisk->elms[1].base = leaf->cluster->clu_btree_end;
1593                 /* ondisk->elms[1].base.btype = not used */
1594                 made_root = 1;
1595                 parent_index = 0;       /* insertion point in parent */
1596         } else {
1597                 made_root = 0;
1598                 parent = cursor->parent;
1599                 parent_index = cursor->parent_index;
1600                 KKASSERT(parent->cluster == leaf->cluster);
1601         }
1602
1603         /*
1604          * Split leaf into new_leaf at the split point.  Select a separator
1605          * value in-between the two leafs but with a bent towards the right
1606          * leaf since comparisons use an 'elm >= separator' inequality.
1607          *
1608          *  L L L L L L L L
1609          *
1610          *       x x P x x
1611          *        s S S s  
1612          *         /   \
1613          *  L L L L     L L L L
1614          */
1615         new_leaf = hammer_alloc_btree(leaf->cluster, &error);
1616         if (new_leaf == NULL) {
1617                 if (made_root) {
1618                         hammer_unlock(&parent->lock);
1619                         parent->flags |= HAMMER_NODE_DELETED;
1620                         hammer_rel_node(parent);
1621                 }
1622                 goto done;
1623         }
1624         hammer_lock_ex(&new_leaf->lock);
1625
1626         /*
1627          * Create the new node.  P become the left-hand boundary in the
1628          * new node.  Copy the right-hand boundary as well.
1629          */
1630         hammer_modify_node(leaf);
1631         hammer_modify_node(new_leaf);
1632         ondisk = leaf->ondisk;
1633         elm = &ondisk->elms[split];
1634         bcopy(elm, &new_leaf->ondisk->elms[0], (ondisk->count - split) * esize);
1635         new_leaf->ondisk->count = ondisk->count - split;
1636         new_leaf->ondisk->parent = parent->node_offset;
1637         new_leaf->ondisk->type = HAMMER_BTREE_TYPE_LEAF;
1638         KKASSERT(ondisk->type == new_leaf->ondisk->type);
1639
1640         /*
1641          * Cleanup the original node.  Because this is a leaf node and
1642          * leaf nodes do not have a right-hand boundary, there
1643          * aren't any special edge cases to clean up.  We just fixup the
1644          * count.
1645          */
1646         ondisk->count = split;
1647
1648         /*
1649          * Insert the separator into the parent, fixup the parent's
1650          * reference to the original node, and reference the new node.
1651          * The separator is P.
1652          *
1653          * Remember that base.count does not include the right-hand boundary.
1654          * We are copying parent_index+1 to parent_index+2, not +0 to +1.
1655          */
1656         hammer_modify_node(parent);
1657         ondisk = parent->ondisk;
1658         KKASSERT(ondisk->count != HAMMER_BTREE_INT_ELMS);
1659         parent_elm = &ondisk->elms[parent_index+1];
1660         bcopy(parent_elm, parent_elm + 1,
1661               (ondisk->count - parent_index) * esize);
1662         hammer_make_separator(&elm[-1].base, &elm[0].base, &parent_elm->base);
1663         parent_elm->internal.base.btype = new_leaf->ondisk->type;
1664         parent_elm->internal.subtree_offset = new_leaf->node_offset;
1665         mid_boundary = &parent_elm->base;
1666         ++ondisk->count;
1667
1668         /*
1669          * The children of new_leaf need their parent pointer set to new_leaf.
1670          *
1671          * The leaf's elements are either TYPE_RECORD or TYPE_SPIKE_*.  Only
1672          * elements of BTREE_TYPE_SPIKE_END really requires any action.
1673          */
1674         for (i = 0; i < new_leaf->ondisk->count; ++i) {
1675                 elm = &new_leaf->ondisk->elms[i];
1676                 error = btree_set_parent(new_leaf, elm);
1677                 if (error) {
1678                         panic("btree_split_internal: btree-fixup problem");
1679                 }
1680         }
1681
1682         /*
1683          * The cluster's root pointer may have to be updated.
1684          */
1685         if (made_root) {
1686                 hammer_modify_cluster(leaf->cluster);
1687                 leaf->cluster->ondisk->clu_btree_root = parent->node_offset;
1688                 leaf->ondisk->parent = parent->node_offset;
1689                 if (cursor->parent) {
1690                         hammer_unlock(&cursor->parent->lock);
1691                         hammer_rel_node(cursor->parent);
1692                 }
1693                 cursor->parent = parent;        /* lock'd and ref'd */
1694         }
1695
1696         /*
1697          * Ok, now adjust the cursor depending on which element the original
1698          * index was pointing at.  If we are >= the split point the push node
1699          * is now in the new node.
1700          *
1701          * NOTE: If we are at the split point itself we need to select the
1702          * old or new node based on where key_beg's insertion point will be.
1703          * If we pick the wrong side the inserted element will wind up in
1704          * the wrong leaf node and outside that node's bounds.
1705          */
1706         if (cursor->index > split ||
1707             (cursor->index == split &&
1708              hammer_btree_cmp(&cursor->key_beg, mid_boundary) >= 0)) {
1709                 cursor->parent_index = parent_index + 1;
1710                 cursor->index -= split;
1711                 hammer_unlock(&cursor->node->lock);
1712                 hammer_rel_node(cursor->node);
1713                 cursor->node = new_leaf;
1714         } else {
1715                 cursor->parent_index = parent_index;
1716                 hammer_unlock(&new_leaf->lock);
1717                 hammer_rel_node(new_leaf);
1718         }
1719
1720         /*
1721          * Fixup left and right bounds
1722          */
1723         parent_elm = &parent->ondisk->elms[cursor->parent_index];
1724         cursor->left_bound = &parent_elm[0].internal.base;
1725         cursor->right_bound = &parent_elm[1].internal.base;
1726         KKASSERT(hammer_btree_cmp(cursor->left_bound,
1727                  &cursor->node->ondisk->elms[0].leaf.base) <= 0);
1728         KKASSERT(hammer_btree_cmp(cursor->right_bound,
1729                  &cursor->node->ondisk->elms[cursor->node->ondisk->count-1].leaf.base) > 0);
1730
1731 done:
1732         hammer_cursor_downgrade(cursor);
1733         return (error);
1734 }
1735
1736 /*
1737  * Attempt to remove the empty B-Tree node at (cursor->node).  Returns 0
1738  * on success, EAGAIN if we could not acquire the necessary locks, or some
1739  * other error.  This node can be a leaf node or an internal node.
1740  *
1741  * On return the cursor may end up pointing at an internal node, suitable
1742  * for further iteration but not for an immediate insertion or deletion.
1743  *
1744  * cursor->node may be an internal node or a leaf node.
1745  *
1746  * NOTE: If cursor->node has one element it is the parent trying to delete
1747  * that element, make sure cursor->index is properly adjusted on success.
1748  */
1749 int
1750 btree_remove(hammer_cursor_t cursor, int depth)
1751 {
1752         hammer_node_ondisk_t ondisk;
1753         hammer_btree_elm_t elm;
1754         hammer_node_t node;
1755         hammer_node_t save;
1756         hammer_node_t parent;
1757         const int esize = sizeof(*elm);
1758         int error;
1759
1760         /*
1761          * If we are at the root of the cluster we must be able to
1762          * successfully delete the HAMMER_BTREE_SPIKE_* leaf elements in
1763          * the parent in order to be able to destroy the cluster.
1764          */
1765         node = cursor->node;
1766
1767         if (node->ondisk->parent == 0) {
1768                 hammer_modify_node(node);
1769                 ondisk = node->ondisk;
1770                 ondisk->type = HAMMER_BTREE_TYPE_LEAF;
1771                 ondisk->count = 0;
1772                 cursor->index = 0;
1773                 error = 0;
1774
1775                 if (depth > 16) {
1776                         Debugger("btree_remove: stack limit reached");
1777                         return(EDEADLK);
1778                 }
1779
1780                 /*
1781                  * When trying to delete a cluster we need to exclusively
1782                  * lock the cluster root, its parent (leaf in parent cluster),
1783                  * AND the parent of that leaf if it's going to be empty,
1784                  * because we can't leave around an empty leaf.
1785                  *
1786                  * XXX this is messy due to potentially recursive locks.
1787                  * downgrade the cursor, get a second shared lock on the
1788                  * node that cannot deadlock because we only own shared locks
1789                  * then, cursor-up, and re-upgrade everything.  If the
1790                  * upgrades EDEADLK then don't try to remove the cluster
1791                  * at this time.
1792                  */
1793                 if ((parent = cursor->parent) != NULL) {
1794                         hammer_cursor_downgrade(cursor);
1795                         save = node;
1796                         hammer_ref_node(save);
1797                         hammer_lock_sh(&save->lock);
1798
1799                         error = hammer_cursor_up(cursor);
1800                         if (error == 0)
1801                                 error = hammer_cursor_upgrade(cursor);
1802                         if (error == 0)
1803                                 error = hammer_lock_upgrade(&save->lock);
1804
1805                         if (error) {
1806                                 /* may be EDEADLK */
1807                                 kprintf("BTREE_REMOVE: Cannot delete cluster\n");
1808                                 Debugger("BTREE_REMOVE");
1809                         } else {
1810                                 /* 
1811                                  * cursor->node is now the leaf in the parent
1812                                  * cluster containing the spike elements.
1813                                  *
1814                                  * The cursor should be pointing at the
1815                                  * SPIKE_END element.
1816                                  *
1817                                  * Remove the spike elements and recurse
1818                                  * if the leaf becomes empty.
1819                                  */
1820                                 node = cursor->node;
1821                                 hammer_modify_node(node);
1822                                 ondisk = node->ondisk;
1823                                 KKASSERT(cursor->index > 0);
1824                                 --cursor->index;
1825                                 elm = &ondisk->elms[cursor->index];
1826                                 KKASSERT(elm[0].leaf.base.btype ==
1827                                          HAMMER_BTREE_TYPE_SPIKE_BEG);
1828                                 KKASSERT(elm[1].leaf.base.btype ==
1829                                          HAMMER_BTREE_TYPE_SPIKE_END);
1830                                 bcopy(elm + 2, elm, (ondisk->count -
1831                                                     cursor->index - 2) * esize);
1832                                 ondisk->count -= 2;
1833                                 if (ondisk->count == 0)
1834                                         error = btree_remove(cursor, depth + 1);
1835                                 hammer_flush_node(save);
1836                                 save->flags |= HAMMER_NODE_DELETED;
1837                         }
1838                         hammer_unlock(&save->lock);
1839                         hammer_rel_node(save);
1840                 }
1841                 return(error);
1842         }
1843
1844         /*
1845          * Zero-out the parent's reference to the child and flag the
1846          * child for destruction.  This ensures that the child is not
1847          * reused while other references to it exist.
1848          */
1849         parent = cursor->parent;
1850         hammer_modify_node(parent);
1851         ondisk = parent->ondisk;
1852         KKASSERT(ondisk->type == HAMMER_BTREE_TYPE_INTERNAL);
1853         elm = &ondisk->elms[cursor->parent_index];
1854         KKASSERT(elm->internal.subtree_offset == node->node_offset);
1855         elm->internal.subtree_offset = 0;
1856
1857         hammer_flush_node(node);
1858         node->flags |= HAMMER_NODE_DELETED;
1859
1860         /*
1861          * Don't blow up the kernel stack.
1862          */
1863         if (depth > 20) {
1864                 kprintf("btree_remove: stack limit reached");
1865                 return(EDEADLK);
1866         }
1867
1868         /*
1869          * If the parent would otherwise not become empty we can physically
1870          * remove the zero'd element.  Note however that in order to
1871          * guarentee a valid cursor we still need to be able to cursor up
1872          * because we no longer have a node.
1873          *
1874          * This collapse will change the parent's boundary elements, making
1875          * them wider.  The new boundaries are recursively corrected in
1876          * btree_search().
1877          *
1878          * XXX we can theoretically recalculate the midpoint but there isn't
1879          * much of a reason to do it.
1880          */
1881         error = hammer_cursor_up(cursor);
1882         if (error == 0)
1883                 error = hammer_cursor_upgrade(cursor);
1884
1885         if (error) {
1886                 kprintf("BTREE_REMOVE: Cannot lock parent, skipping\n");
1887                 Debugger("BTREE_REMOVE");
1888                 return (0);
1889         }
1890
1891         /*
1892          * Remove the internal element from the parent.  The bcopy must
1893          * include the right boundary element.
1894          */
1895         KKASSERT(parent == cursor->node && ondisk == parent->ondisk);
1896         node = parent;
1897         parent = NULL;
1898         /* ondisk is node's ondisk */
1899         /* elm is node's element */
1900
1901         /*
1902          * Remove the internal element that we zero'd out.  Tell the caller
1903          * to loop if it hits zero (to try to avoid eating up precious kernel
1904          * stack).
1905          */
1906         KKASSERT(ondisk->count > 0);
1907         bcopy(&elm[1], &elm[0], (ondisk->count - cursor->index) * esize);
1908         --ondisk->count;
1909         if (ondisk->count == 0)
1910                 error = EAGAIN;
1911         return(error);
1912 }
1913
1914 /*
1915  * Attempt to remove the deleted internal element at the current cursor
1916  * position.  If we are unable to remove the element we return EDEADLK.
1917  *
1918  * If the current internal node becomes empty we delete it in the parent
1919  * and cursor up, looping until we finish or we deadlock.
1920  *
1921  * On return, if successful, the cursor will be pointing at the next
1922  * iterative position in the B-Tree.  If unsuccessful the cursor will be
1923  * pointing at the last deleted internal element that could not be
1924  * removed.
1925  */
1926 static 
1927 int
1928 btree_remove_deleted_element(hammer_cursor_t cursor)
1929 {
1930         hammer_node_t node;
1931         hammer_btree_elm_t elm; 
1932         int error;
1933
1934         if ((error = hammer_cursor_upgrade(cursor)) != 0)
1935                 return(error);
1936         node = cursor->node;
1937         elm = &node->ondisk->elms[cursor->index];
1938         if (elm->internal.subtree_offset == 0) {
1939                 do {
1940                         error = btree_remove(cursor, 0);
1941                         kprintf("BTREE REMOVE DELETED ELEMENT %d\n", error);
1942                 } while (error == EAGAIN);
1943         }
1944         return(error);
1945 }
1946
1947 /*
1948  * The element (elm) has been moved to a new internal node (node).
1949  *
1950  * If the element represents a pointer to an internal node that node's
1951  * parent must be adjusted to the element's new location.
1952  *
1953  * If the element represents a spike the target cluster's header must
1954  * be adjusted to point to the element's new location.  This only
1955  * applies to HAMMER_SPIKE_END.
1956  *
1957  * XXX deadlock potential here with our exclusive locks
1958  */
1959 static
1960 int
1961 btree_set_parent(hammer_node_t node, hammer_btree_elm_t elm)
1962 {
1963         hammer_volume_t volume;
1964         hammer_cluster_t cluster;
1965         hammer_node_t child;
1966         int error;
1967
1968         error = 0;
1969
1970         switch(elm->base.btype) {
1971         case HAMMER_BTREE_TYPE_INTERNAL:
1972         case HAMMER_BTREE_TYPE_LEAF:
1973                 child = hammer_get_node(node->cluster,
1974                                         elm->internal.subtree_offset, &error);
1975                 if (error == 0) {
1976                         hammer_modify_node(child);
1977                         hammer_lock_ex(&child->lock);
1978                         child->ondisk->parent = node->node_offset;
1979                         hammer_unlock(&child->lock);
1980                         hammer_rel_node(child);
1981                 }
1982                 break;
1983         case HAMMER_BTREE_TYPE_SPIKE_END:
1984                 volume = hammer_get_volume(node->cluster->volume->hmp,
1985                                            elm->leaf.spike_vol_no, &error);
1986                 if (error)
1987                         break;
1988                 cluster = hammer_get_cluster(volume, elm->leaf.spike_clu_no,
1989                                              &error, 0);
1990                 hammer_rel_volume(volume, 0);
1991                 if (error)
1992                         break;
1993                 hammer_modify_cluster(cluster);
1994                 hammer_lock_ex(&cluster->io.lock);
1995                 cluster->ondisk->clu_btree_parent_offset = node->node_offset;
1996                 hammer_unlock(&cluster->io.lock);
1997                 KKASSERT(cluster->ondisk->clu_btree_parent_clu_no ==
1998                          node->cluster->clu_no);
1999                 KKASSERT(cluster->ondisk->clu_btree_parent_vol_no ==
2000                          node->cluster->volume->vol_no);
2001                 hammer_rel_cluster(cluster, 0);
2002                 break;
2003         default:
2004                 break;
2005         }
2006         return(error);
2007 }
2008
2009 /************************************************************************
2010  *                         MISCELLANIOUS SUPPORT                        *
2011  ************************************************************************/
2012
2013 /*
2014  * Compare two B-Tree elements, return -N, 0, or +N (e.g. similar to strcmp).
2015  *
2016  * Note that for this particular function a return value of -1, 0, or +1
2017  * can denote a match if delete_tid is otherwise discounted.  A delete_tid
2018  * of zero is considered to be 'infinity' in comparisons.
2019  *
2020  * See also hammer_rec_rb_compare() and hammer_rec_cmp() in hammer_object.c.
2021  */
2022 int
2023 hammer_btree_cmp(hammer_base_elm_t key1, hammer_base_elm_t key2)
2024 {
2025         if (key1->obj_id < key2->obj_id)
2026                 return(-4);
2027         if (key1->obj_id > key2->obj_id)
2028                 return(4);
2029
2030         if (key1->rec_type < key2->rec_type)
2031                 return(-3);
2032         if (key1->rec_type > key2->rec_type)
2033                 return(3);
2034
2035         if (key1->key < key2->key)
2036                 return(-2);
2037         if (key1->key > key2->key)
2038                 return(2);
2039
2040         /*
2041          * A delete_tid of zero indicates a record which has not been
2042          * deleted yet and must be considered to have a value of positive
2043          * infinity.
2044          */
2045         if (key1->delete_tid == 0) {
2046                 if (key2->delete_tid == 0)
2047                         return(0);
2048                 return(1);
2049         }
2050         if (key2->delete_tid == 0)
2051                 return(-1);
2052         if (key1->delete_tid < key2->delete_tid)
2053                 return(-1);
2054         if (key1->delete_tid > key2->delete_tid)
2055                 return(1);
2056         return(0);
2057 }
2058
2059 /*
2060  * Test a timestamp against an element to determine whether the
2061  * element is visible.  A timestamp of 0 means 'infinity'.
2062  */
2063 int
2064 hammer_btree_chkts(hammer_tid_t asof, hammer_base_elm_t base)
2065 {
2066         if (asof == 0) {
2067                 if (base->delete_tid)
2068                         return(1);
2069                 return(0);
2070         }
2071         if (asof < base->create_tid)
2072                 return(-1);
2073         if (base->delete_tid && asof >= base->delete_tid)
2074                 return(1);
2075         return(0);
2076 }
2077
2078 /*
2079  * Create a separator half way inbetween key1 and key2.  For fields just
2080  * one unit apart, the separator will match key2.  key1 is on the left-hand
2081  * side and key2 is on the right-hand side.
2082  *
2083  * delete_tid has to be special cased because a value of 0 represents
2084  * infinity, and records with a delete_tid of 0 can be replaced with 
2085  * a non-zero delete_tid when deleted and must maintain their proper
2086  * (as in the same) position in the B-Tree.
2087  */
2088 #define MAKE_SEPARATOR(key1, key2, dest, field) \
2089         dest->field = key1->field + ((key2->field - key1->field + 1) >> 1);
2090
2091 static void
2092 hammer_make_separator(hammer_base_elm_t key1, hammer_base_elm_t key2,
2093                       hammer_base_elm_t dest)
2094 {
2095         bzero(dest, sizeof(*dest));
2096         MAKE_SEPARATOR(key1, key2, dest, obj_id);
2097         MAKE_SEPARATOR(key1, key2, dest, rec_type);
2098         MAKE_SEPARATOR(key1, key2, dest, key);
2099
2100         if (key1->obj_id == key2->obj_id &&
2101             key1->rec_type == key2->rec_type &&
2102             key1->key == key2->key) {
2103                 if (key1->delete_tid == 0) {
2104                         /* 
2105                          * key1 cannot be on the left hand side if everything
2106                          * matches but it has an infinite delete_tid!
2107                          */
2108                         panic("hammer_make_separator: illegal delete_tid");
2109                 } else if (key2->delete_tid == 0) {
2110                         dest->delete_tid = key1->delete_tid + 1;
2111                 } else {
2112                         MAKE_SEPARATOR(key1, key2, dest, delete_tid);
2113                 }
2114         } else {
2115                 dest->delete_tid = 0;
2116         }
2117 }
2118
2119 /*
2120  * This adjusts a right-hand key from being exclusive to being inclusive.
2121  * 
2122  * A delete_key of 0 represents infinity.  Decrementing it results in
2123  * (u_int64_t)-1 which is the largest value possible prior to infinity.
2124  */
2125 void
2126 hammer_make_base_inclusive(hammer_base_elm_t key)
2127 {
2128         --key->delete_tid;
2129 }
2130
2131 #undef MAKE_SEPARATOR
2132
2133 /*
2134  * Return whether a generic internal or leaf node is full
2135  */
2136 static int
2137 btree_node_is_full(hammer_node_ondisk_t node)
2138 {
2139         switch(node->type) {
2140         case HAMMER_BTREE_TYPE_INTERNAL:
2141                 if (node->count == HAMMER_BTREE_INT_ELMS)
2142                         return(1);
2143                 break;
2144         case HAMMER_BTREE_TYPE_LEAF:
2145                 if (node->count == HAMMER_BTREE_LEAF_ELMS)
2146                         return(1);
2147                 break;
2148         default:
2149                 panic("illegal btree subtype");
2150         }
2151         return(0);
2152 }
2153
2154 #if 0
2155 /*
2156  * Return whether a generic internal or leaf node is almost full.  This
2157  * routine is used as a helper for search insertions to guarentee at 
2158  * least 2 available slots in the internal node(s) leading up to a leaf,
2159  * so hammer_btree_insert_cluster() will function properly.
2160  */
2161 static int
2162 btree_node_is_almost_full(hammer_node_ondisk_t node)
2163 {
2164         switch(node->type) {
2165         case HAMMER_BTREE_TYPE_INTERNAL:
2166                 if (node->count > HAMMER_BTREE_INT_ELMS - 2)
2167                         return(1);
2168                 break;
2169         case HAMMER_BTREE_TYPE_LEAF:
2170                 if (node->count > HAMMER_BTREE_LEAF_ELMS - 2)
2171                         return(1);
2172                 break;
2173         default:
2174                 panic("illegal btree subtype");
2175         }
2176         return(0);
2177 }
2178 #endif
2179
2180 #if 0
2181 static int
2182 btree_max_elements(u_int8_t type)
2183 {
2184         if (type == HAMMER_BTREE_TYPE_LEAF)
2185                 return(HAMMER_BTREE_LEAF_ELMS);
2186         if (type == HAMMER_BTREE_TYPE_INTERNAL)
2187                 return(HAMMER_BTREE_INT_ELMS);
2188         panic("btree_max_elements: bad type %d\n", type);
2189 }
2190 #endif
2191
2192 void
2193 hammer_print_btree_node(hammer_node_ondisk_t ondisk)
2194 {
2195         hammer_btree_elm_t elm;
2196         int i;
2197
2198         kprintf("node %p count=%d parent=%d type=%c\n",
2199                 ondisk, ondisk->count, ondisk->parent, ondisk->type);
2200
2201         /*
2202          * Dump both boundary elements if an internal node
2203          */
2204         if (ondisk->type == HAMMER_BTREE_TYPE_INTERNAL) {
2205                 for (i = 0; i <= ondisk->count; ++i) {
2206                         elm = &ondisk->elms[i];
2207                         hammer_print_btree_elm(elm, ondisk->type, i);
2208                 }
2209         } else {
2210                 for (i = 0; i < ondisk->count; ++i) {
2211                         elm = &ondisk->elms[i];
2212                         hammer_print_btree_elm(elm, ondisk->type, i);
2213                 }
2214         }
2215 }
2216
2217 void
2218 hammer_print_btree_elm(hammer_btree_elm_t elm, u_int8_t type, int i)
2219 {
2220         kprintf("  %2d", i);
2221         kprintf("\tobjid        = %016llx\n", elm->base.obj_id);
2222         kprintf("\tkey          = %016llx\n", elm->base.key);
2223         kprintf("\tcreate_tid   = %016llx\n", elm->base.create_tid);
2224         kprintf("\tdelete_tid   = %016llx\n", elm->base.delete_tid);
2225         kprintf("\trec_type     = %04x\n", elm->base.rec_type);
2226         kprintf("\tobj_type     = %02x\n", elm->base.obj_type);
2227         kprintf("\tbtype        = %02x (%c)\n",
2228                 elm->base.btype,
2229                 (elm->base.btype ? elm->base.btype : '?'));
2230
2231         switch(type) {
2232         case HAMMER_BTREE_TYPE_INTERNAL:
2233                 kprintf("\tsubtree_off  = %08x\n",
2234                         elm->internal.subtree_offset);
2235                 break;
2236         case HAMMER_BTREE_TYPE_SPIKE_BEG:
2237         case HAMMER_BTREE_TYPE_SPIKE_END:
2238                 kprintf("\tspike_clu_no = %d\n", elm->leaf.spike_clu_no);
2239                 kprintf("\tspike_vol_no = %d\n", elm->leaf.spike_vol_no);
2240                 break;
2241         case HAMMER_BTREE_TYPE_RECORD:
2242                 kprintf("\trec_offset   = %08x\n", elm->leaf.rec_offset);
2243                 kprintf("\tdata_offset  = %08x\n", elm->leaf.data_offset);
2244                 kprintf("\tdata_len     = %08x\n", elm->leaf.data_len);
2245                 kprintf("\tdata_crc     = %08x\n", elm->leaf.data_crc);
2246                 break;
2247         }
2248 }