HAMMER 21/many: B-Tree node locking finalization.
[dragonfly.git] / sys / vfs / hammer / hammer_cursor.c
1 /*
2  * Copyright (c) 2007 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sys/vfs/hammer/hammer_cursor.c,v 1.14 2008/01/18 07:02:41 dillon Exp $
35  */
36
37 /*
38  * HAMMER B-Tree index - cursor support routines
39  */
40 #include "hammer.h"
41
42 static int hammer_load_cursor_parent(hammer_cursor_t cursor);
43 static int hammer_load_cursor_parent_local(hammer_cursor_t cursor);
44 static int hammer_load_cursor_parent_cluster(hammer_cursor_t cursor);
45
46 /*
47  * Initialize a fresh cursor using the B-Tree node cache.  If the cache
48  * is not available initialize a fresh cursor at the root of the root
49  * cluster.
50  */
51 int
52 hammer_init_cursor_hmp(hammer_cursor_t cursor, struct hammer_node **cache,
53                        hammer_mount_t hmp)
54 {
55         hammer_cluster_t cluster;
56         hammer_node_t node;
57         int error;
58
59         bzero(cursor, sizeof(*cursor));
60
61         /*
62          * Step 1 - acquire a locked node from the cache if possible
63          */
64         if (cache && *cache) {
65                 node = hammer_ref_node_safe(hmp, cache, &error);
66                 if (error == 0) {
67                         hammer_lock_sh(&node->lock);
68                         if (node->flags & HAMMER_NODE_DELETED) {
69                                 hammer_unlock(&node->lock);
70                                 hammer_rel_node(node);
71                                 node = NULL;
72                         }
73                 }
74         } else {
75                 node = NULL;
76         }
77
78         /*
79          * Step 2 - If we couldn't get a node from the cache, get
80          * the one from the root of the root cluster.
81          */
82         while (node == NULL) {
83                 cluster = hammer_get_root_cluster(hmp, &error);
84                 if (error)
85                         break;
86                 node = hammer_get_node(cluster,
87                                        cluster->ondisk->clu_btree_root,
88                                        &error);
89                 hammer_rel_cluster(cluster, 0);
90                 if (error)
91                         break;
92                 hammer_lock_sh(&node->lock);
93                 if (node->flags & HAMMER_NODE_DELETED) {
94                         hammer_unlock(&node->lock);
95                         hammer_rel_node(node);
96                         node = NULL;
97                 }
98         }
99
100         /*
101          * Step 3 - finish initializing the cursor by acquiring the parent
102          */
103         cursor->node = node;
104         if (error == 0)
105                 error = hammer_load_cursor_parent(cursor);
106         KKASSERT(error == 0);
107         return(error);
108 }
109
110 /*
111  * Initialize a fresh cursor at the root of the specified cluster and
112  * limit operations to within the cluster.
113  */
114 int
115 hammer_init_cursor_cluster(hammer_cursor_t cursor, hammer_cluster_t cluster)
116 {
117         int error;
118
119         bzero(cursor, sizeof(*cursor));
120         cursor->flags |= HAMMER_CURSOR_INCLUSTER;
121         cursor->node = hammer_get_node(cluster,
122                                        cluster->ondisk->clu_btree_root,
123                                        &error);
124         if (error == 0) {
125                 hammer_lock_sh(&cursor->node->lock);
126                 error = hammer_load_cursor_parent(cursor);
127         }
128         KKASSERT(error == 0);
129         return(error);
130 }
131
132 /*
133  * We are finished with a cursor.  We NULL out various fields as sanity
134  * check, in case the structure is inappropriately used afterwords.
135  */
136 void
137 hammer_done_cursor(hammer_cursor_t cursor)
138 {
139         if (cursor->parent) {
140                 hammer_unlock(&cursor->parent->lock);
141                 hammer_rel_node(cursor->parent);
142                 cursor->parent = NULL;
143         }
144         if (cursor->node) {
145                 hammer_unlock(&cursor->node->lock);
146                 hammer_rel_node(cursor->node);
147                 cursor->node = NULL;
148         }
149         if (cursor->data_buffer) {
150                 hammer_rel_buffer(cursor->data_buffer, 0);
151                 cursor->data_buffer = NULL;
152         }
153         if (cursor->record_buffer) {
154                 hammer_rel_buffer(cursor->record_buffer, 0);
155                 cursor->record_buffer = NULL;
156         }
157         if (cursor->ip)
158                 hammer_mem_done(cursor);
159
160         /*
161          * If we deadlocked this node will be referenced.  Do a quick
162          * lock/unlock to wait for the deadlock condition to clear.
163          */
164         if (cursor->deadlk_node) {
165                 hammer_lock_ex(&cursor->deadlk_node->lock);
166                 hammer_unlock(&cursor->deadlk_node->lock);
167                 hammer_rel_node(cursor->deadlk_node);
168                 cursor->deadlk_node = NULL;
169         }
170
171         cursor->data = NULL;
172         cursor->record = NULL;
173         cursor->left_bound = NULL;
174         cursor->right_bound = NULL;
175 }
176
177 /*
178  * Upgrade cursor->node and cursor->parent to exclusive locks.  This
179  * function can return EDEADLK.
180  *
181  * If we fail to upgrade the lock and cursor->deadlk_node is NULL, 
182  * we add another reference to the node that failed and set
183  * cursor->deadlk_node so hammer_done_cursor() can block on it.
184  */
185 int
186 hammer_cursor_upgrade(hammer_cursor_t cursor)
187 {
188         int error;
189
190         if (hammer_lock_held(&cursor->node->lock) < 0) {
191                 error = hammer_lock_upgrade(&cursor->node->lock);
192                 if (error && cursor->deadlk_node == NULL) {
193                         cursor->deadlk_node = cursor->node;
194                         hammer_ref_node(cursor->deadlk_node);
195                 }
196         } else {
197                 error = 0;
198         }
199         if (cursor->parent && hammer_lock_held(&cursor->parent->lock) < 0) {
200                 error = hammer_lock_upgrade(&cursor->parent->lock);
201                 if (error && cursor->deadlk_node == NULL) {
202                         cursor->deadlk_node = cursor->parent;
203                         hammer_ref_node(cursor->deadlk_node);
204                 }
205         } else {
206                 error = 0;
207         }
208         return(error);
209 }
210
211 /*
212  * Downgrade cursor->node and cursor->parent to shared locks.  This
213  * function can return EDEADLK.
214  */
215 void
216 hammer_cursor_downgrade(hammer_cursor_t cursor)
217 {
218         if (hammer_lock_held(&cursor->node->lock) > 0)
219                 hammer_lock_downgrade(&cursor->node->lock);
220         if (cursor->parent && hammer_lock_held(&cursor->parent->lock) > 0)
221                 hammer_lock_downgrade(&cursor->parent->lock);
222 }
223
224
225 #if 0
226
227 /*
228  * Acquire the parent B-Tree node of the specified node, returning a
229  * referenced but unlocked node.  NULL can be returned with *errorp == 0
230  * if node is the root node of the root cluster.
231  */
232 static
233 hammer_node_t
234 hammer_get_parent_node(hammer_node_t node, int *errorp)
235 {
236         hammer_cluster_t cluster;
237         hammer_node_t parent;
238
239         cluster = node->cluster;
240         if (node->ondisk->parent) {
241                 /*
242                  * Local parent
243                  */
244                 parent = hammer_get_node(cluster, node->ondisk->parent, errorp);
245         } else if (cluster->ondisk->clu_btree_parent_vol_no >= 0) {
246                 /*
247                  * At cluster root, locate node in parent cluster
248                  */
249                 hammer_cluster_ondisk_t ondisk;
250                 hammer_cluster_t pcluster;
251                 hammer_volume_t pvolume;
252                 int32_t clu_no;
253                 int32_t vol_no;
254
255                 ondisk = cluster->ondisk;
256                 vol_no = ondisk->clu_btree_parent_vol_no;
257                 clu_no = ondisk->clu_btree_parent_clu_no;
258
259                 /*
260                  * Acquire the node from (volume, cluster, offset)
261                  */
262                 pvolume = hammer_get_volume(cluster->volume->hmp, vol_no,
263                                             errorp);
264                 if (*errorp)
265                         return (NULL);
266                 pcluster = hammer_get_cluster(pvolume, clu_no, errorp, 0);
267                 hammer_rel_volume(pvolume, 0);
268                 if (*errorp)
269                         return (NULL);
270                 parent = hammer_get_node(pcluster,
271                                          ondisk->clu_btree_parent_offset,
272                                          errorp);
273                 hammer_rel_cluster(pcluster, 0);
274         } else {
275                 /*
276                  * At root of root cluster, there is no parent.
277                  */
278                 KKASSERT(cluster->ondisk->clu_btree_parent_vol_no == -1);
279                 parent = NULL;
280                 *errorp = 0;
281         }
282         return(parent);
283 }
284
285 #endif
286
287 /*
288  * Load the parent of cursor->node into cursor->parent.  There are several
289  * cases.  (1) The parent is in the current cluster.  (2) We are at the
290  * root of the cluster and the parent is in another cluster.  (3) We are at
291  * the root of the root cluster.
292  *
293  * If HAMMER_CURSOR_INCLUSTER is set and we are at the root of the cluster,
294  * we do not access the parent cluster at all and make the cursor look like
295  * its at the root.
296  */
297 static
298 int
299 hammer_load_cursor_parent(hammer_cursor_t cursor)
300 {
301         hammer_cluster_t cluster;
302         int error;
303
304         cluster = cursor->node->cluster;
305
306         if (cursor->node->ondisk->parent) {
307                 error = hammer_load_cursor_parent_local(cursor);
308         } else if (cluster->ondisk->clu_btree_parent_vol_no >= 0 &&
309                    ((cursor->flags & HAMMER_CURSOR_INCLUSTER) == 0)
310         ) {
311                 error = hammer_load_cursor_parent_cluster(cursor);
312         } else {
313                 cursor->parent = NULL;
314                 cursor->parent_index = 0;
315                 cursor->left_bound = &cluster->ondisk->clu_btree_beg;
316                 cursor->right_bound = &cluster->ondisk->clu_btree_end;
317                 error = 0;
318         }
319         return(error);
320 }
321
322 static
323 int
324 hammer_load_cursor_parent_local(hammer_cursor_t cursor)
325 {
326         hammer_node_t node;
327         hammer_node_t parent;
328         hammer_btree_elm_t elm;
329         int error;
330         int i;
331
332         node = cursor->node;
333         parent = hammer_get_node(node->cluster, node->ondisk->parent, &error);
334         if (error)
335                 return(error);
336         elm = NULL;
337         for (i = 0; i < parent->ondisk->count; ++i) {
338                 elm = &parent->ondisk->elms[i];
339                 if (parent->ondisk->elms[i].internal.subtree_offset ==
340                     node->node_offset) {
341                         break;
342                 }
343         }
344         if (i == parent->ondisk->count)
345                 panic("Bad B-Tree link: parent %p node %p\n", parent, node);
346         KKASSERT(i != parent->ondisk->count);
347         cursor->parent = parent;
348         cursor->parent_index = i;
349         cursor->left_bound = &elm[0].internal.base;
350         cursor->right_bound = &elm[1].internal.base;
351
352         hammer_lock_sh(&parent->lock);
353         return(error);
354 }
355
356 static
357 int
358 hammer_load_cursor_parent_cluster(hammer_cursor_t cursor)
359 {
360         hammer_cluster_ondisk_t ondisk;
361         hammer_cluster_t pcluster;
362         hammer_cluster_t ccluster;
363         hammer_volume_t volume;
364         hammer_node_t node;
365         hammer_node_t parent;
366         hammer_btree_elm_t elm;
367         int32_t clu_no;
368         int32_t vol_no;
369         int error;
370         int i;
371
372         node = cursor->node;
373         ccluster = node->cluster;
374         ondisk = ccluster->ondisk;
375         vol_no = ondisk->clu_btree_parent_vol_no;
376         clu_no = ondisk->clu_btree_parent_clu_no;
377
378         /*
379          * Acquire the node from (volume, cluster, offset).  This should
380          * be a leaf node containing the HAMMER_BTREE_TYPE_SPIKE_END element.
381          */
382         volume = hammer_get_volume(ccluster->volume->hmp, vol_no, &error);
383         if (error)
384                 return (error);
385         pcluster = hammer_get_cluster(volume, clu_no, &error, 0);
386         hammer_rel_volume(volume, 0);
387         if (error)
388                 return (error);
389         parent = hammer_get_node(pcluster, ondisk->clu_btree_parent_offset,
390                                  &error);
391         hammer_rel_cluster(pcluster, 0);
392         if (error)
393                 return (error);
394         KKASSERT(parent->ondisk->type == HAMMER_BTREE_TYPE_LEAF);
395
396         /* 
397          * Ok, we have the node.  Locate the inter-cluster element
398          */
399         elm = NULL;
400         for (i = 0; i < parent->ondisk->count; ++i) {
401                 elm = &parent->ondisk->elms[i];
402
403                 if (elm->leaf.base.btype == HAMMER_BTREE_TYPE_SPIKE_END &&
404                     elm->leaf.spike_clu_no == cursor->node->cluster->clu_no) {
405                         break;
406                 }
407         }
408         KKASSERT(i != parent->ondisk->count);
409         KKASSERT(i && elm[-1].leaf.base.btype == HAMMER_BTREE_TYPE_SPIKE_BEG);
410         cursor->parent = parent;
411         cursor->parent_index = i;
412         cursor->left_bound = &ccluster->ondisk->clu_btree_beg;
413         cursor->right_bound = &ccluster->ondisk->clu_btree_end;
414
415         KKASSERT(hammer_btree_cmp(&elm[-1].leaf.base,
416                  &ccluster->ondisk->clu_btree_beg) == 0);
417             /*
418              * spike_end is an inclusive boundary and will != clu_btree_end
419         KKASSERT(hammer_btree_cmp(cursor->right_bound,
420                  &ccluster->ondisk->clu_btree_end) >= 0);
421             */
422
423         hammer_lock_sh(&parent->lock);
424         return(0);
425 }
426
427 /*
428  * Cursor up to our parent node.  Return ENOENT if we are at the root of
429  * the root cluster.
430  *
431  * If doing a nonblocking cursor-up and we are unable to acquire the lock,
432  * the cursor remains unchanged.
433  */
434 int
435 hammer_cursor_up(hammer_cursor_t cursor)
436 {
437         int error;
438
439         hammer_cursor_downgrade(cursor);
440
441         /*
442          * Set the node to its parent.  If the parent is NULL we are at
443          * the root of the root cluster and return ENOENT.
444          */
445         hammer_unlock(&cursor->node->lock);
446         hammer_rel_node(cursor->node);
447         cursor->node = cursor->parent;
448         cursor->index = cursor->parent_index;
449         cursor->parent = NULL;
450         cursor->parent_index = 0;
451
452         if (cursor->node == NULL) {
453                 error = ENOENT;
454         } else if ((cursor->flags & HAMMER_CURSOR_INCLUSTER) &&
455                    cursor->node->ondisk->parent == 0
456         ) {
457                 error = ENOENT;
458         } else {
459                 error = hammer_load_cursor_parent(cursor);
460         }
461         return(error);
462 }
463
464 /*
465  * Set the cursor to the root of the current cursor's cluster.
466  */
467 int
468 hammer_cursor_toroot(hammer_cursor_t cursor)
469 {
470         hammer_cluster_t cluster;
471         int error;
472
473         /*
474          * Already at root of cluster?
475          */
476         if (cursor->node->ondisk->parent == 0) 
477                 return (0);
478
479         hammer_cursor_downgrade(cursor);
480
481         /*
482          * Parent is root of cluster?
483          */
484         if (cursor->parent->ondisk->parent == 0)
485                 return (hammer_cursor_up(cursor));
486
487         /*
488          * Ok, reload the cursor with the root of the cluster, then
489          * locate its parent.
490          */
491         cluster = cursor->node->cluster;
492         error = hammer_ref_cluster(cluster);
493         if (error)
494                 return (error);
495
496         hammer_unlock(&cursor->parent->lock);
497         hammer_rel_node(cursor->parent);
498         hammer_unlock(&cursor->node->lock);
499         hammer_rel_node(cursor->node);
500         cursor->parent = NULL;
501         cursor->parent_index = 0;
502
503         cursor->node = hammer_get_node(cluster, cluster->ondisk->clu_btree_root,
504                                        &error);
505         cursor->index = 0;
506         hammer_lock_sh(&cursor->node->lock);
507         hammer_rel_cluster(cluster, 0);
508         if (error == 0)
509                 error = hammer_load_cursor_parent(cursor);
510         return(error);
511 }
512
513 /*
514  * Cursor down through the current node, which must be an internal node.
515  *
516  * This routine adjusts the cursor and sets index to 0.
517  */
518 int
519 hammer_cursor_down(hammer_cursor_t cursor)
520 {
521         hammer_node_t node;
522         hammer_btree_elm_t elm;
523         hammer_volume_t volume;
524         hammer_cluster_t cluster;
525         int32_t vol_no;
526         int32_t clu_no;
527         int error;
528
529         /*
530          * The current node becomes the current parent
531          */
532         hammer_cursor_downgrade(cursor);
533         node = cursor->node;
534         KKASSERT(cursor->index >= 0 && cursor->index < node->ondisk->count);
535         if (cursor->parent) {
536                 hammer_unlock(&cursor->parent->lock);
537                 hammer_rel_node(cursor->parent);
538         }
539         cursor->parent = node;
540         cursor->parent_index = cursor->index;
541         cursor->node = NULL;
542         cursor->index = 0;
543
544         /*
545          * Extract element to push into at (node,index), set bounds.
546          */
547         elm = &node->ondisk->elms[cursor->parent_index];
548
549         /*
550          * Ok, push down into elm.  If elm specifies an internal or leaf
551          * node the current node must be an internal node.  If elm specifies
552          * a spike then the current node must be a leaf node.
553          *
554          * Cursoring down through a cluster transition when the INCLUSTER
555          * flag is set is not legal.
556          */
557         switch(elm->base.btype) {
558         case HAMMER_BTREE_TYPE_INTERNAL:
559         case HAMMER_BTREE_TYPE_LEAF:
560                 KKASSERT(node->ondisk->type == HAMMER_BTREE_TYPE_INTERNAL);
561                 KKASSERT(elm->internal.subtree_offset != 0);
562                 cursor->left_bound = &elm[0].internal.base;
563                 cursor->right_bound = &elm[1].internal.base;
564                 node = hammer_get_node(node->cluster,
565                                        elm->internal.subtree_offset,
566                                        &error);
567                 if (error == 0) {
568                         KKASSERT(elm->base.btype == node->ondisk->type);
569                         if(node->ondisk->parent != cursor->parent->node_offset)
570                                 kprintf("node %p %d vs %d\n", node, node->ondisk->parent, cursor->parent->node_offset);
571                         KKASSERT(node->ondisk->parent == cursor->parent->node_offset);
572                 }
573                 break;
574         case HAMMER_BTREE_TYPE_SPIKE_BEG:
575         case HAMMER_BTREE_TYPE_SPIKE_END:
576                 KKASSERT(node->ondisk->type == HAMMER_BTREE_TYPE_LEAF);
577                 KKASSERT((cursor->flags & HAMMER_CURSOR_INCLUSTER) == 0);
578                 vol_no = elm->leaf.spike_vol_no;
579                 clu_no = elm->leaf.spike_clu_no;
580                 volume = hammer_get_volume(node->cluster->volume->hmp,
581                                            vol_no, &error);
582                 KKASSERT(error != EINVAL);
583                 if (error)
584                         return(error);
585                 cluster = hammer_get_cluster(volume, clu_no, &error, 0);
586                 KKASSERT(error != EINVAL);
587                 hammer_rel_volume(volume, 0);
588                 if (error)
589                         return(error);
590                 cursor->left_bound = &cluster->ondisk->clu_btree_beg;
591                 cursor->right_bound = &cluster->ondisk->clu_btree_end;
592                 node = hammer_get_node(cluster,
593                                        cluster->ondisk->clu_btree_root,
594                                        &error);
595                 hammer_rel_cluster(cluster, 0);
596                 break;
597         default:
598                 panic("hammer_cursor_down: illegal btype %02x (%c)\n",
599                       elm->base.btype,
600                       (elm->base.btype ? elm->base.btype : '?'));
601                 break;
602         }
603         if (error == 0) {
604                 hammer_lock_sh(&node->lock);
605                 cursor->node = node;
606                 cursor->index = 0;
607         }
608         return(error);
609 }
610