HAMMER 23/many: Recovery, B-Tree, spike, I/O work.
[dragonfly.git] / sys / vfs / hammer / hammer_spike.c
1 /*
2  * Copyright (c) 2007 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sys/vfs/hammer/Attic/hammer_spike.c,v 1.11 2008/01/24 02:14:45 dillon Exp $
35  */
36
37 #include "hammer.h"
38
39 /*
40  * Load spike info given a cursor.  The cursor must point to the leaf node
41  * that needs to be spiked after a failed insertion.
42  */
43 void
44 hammer_load_spike(hammer_cursor_t cursor, struct hammer_cursor **spikep)
45 {
46         hammer_cursor_t spike;
47
48         KKASSERT(cursor->node->ondisk->type == HAMMER_BTREE_TYPE_LEAF);
49         KKASSERT(*spikep == NULL);
50         *spikep = spike = kmalloc(sizeof(*spike), M_HAMMER, M_WAITOK|M_ZERO);
51         ++hammer_count_spikes;
52
53         spike->parent = cursor->parent;
54         spike->parent_index = cursor->parent_index;
55         spike->node = cursor->node;
56         spike->index = cursor->index;
57         spike->left_bound = cursor->left_bound;
58         spike->right_bound = cursor->right_bound;
59         spike->key_beg = cursor->key_beg;
60
61         if (spike->parent) {
62                 hammer_ref_node(spike->parent);
63                 hammer_lock_sh(&spike->parent->lock);
64         }
65         hammer_ref_node(spike->node);
66         hammer_lock_sh(&spike->node->lock);
67         kprintf("LOAD SPIKE %p\n", spike);
68 }
69
70 /*
71  * Spike code - make room in a cluster by spiking in a new cluster.
72  *
73  * The spike structure contains a locked and reference B-Tree leaf node.
74  * The spike at a minimum must move the contents of the leaf into a
75  * new cluster and replace the leaf with two elements representing the
76  * SPIKE_BEG and SPIKE_END.
77  *
78  * Various optimizations are desireable, including merging the spike node
79  * with an adjacent node that has already been spiked, if its cluster is
80  * not full, or promoting the spike node to the parent cluster of the current
81  * cluster when it represents the right hand boundary leaf node in the
82  * cluster (to avoid append chains).
83  */
84 int
85 hammer_spike(struct hammer_cursor **spikep)
86 {
87         hammer_cursor_t spike;
88         struct hammer_cursor ncursor;
89         hammer_cluster_t ocluster;
90         hammer_cluster_t ncluster;
91         hammer_node_ondisk_t ondisk;
92         hammer_btree_elm_t elm;
93         hammer_node_t onode;
94         hammer_record_ondisk_t rec;
95         hammer_node_locklist_t locklist = NULL;
96         int error;
97         int b, e;
98         const int esize = sizeof(*elm);
99         static int count;       /* XXX temporary */
100
101         kprintf("hammer_spike: ENOSPC in cluster, spiking\n");
102         /*Debugger("ENOSPC");*/
103
104         /*
105          * Validate and lock the spike.  If this fails due to a deadlock
106          * we still return 0 since a spike is only called when the
107          * caller intends to retry the operation.
108          */
109         spike = *spikep;
110         KKASSERT(spike != NULL);
111         KKASSERT(spike->parent &&
112                  spike->parent->cluster == spike->node->cluster);
113         KKASSERT(spike->node->ondisk->type == HAMMER_BTREE_TYPE_LEAF);
114
115         error = hammer_cursor_upgrade(spike);
116         if (error) {
117                 error = 0;
118                 goto failed4;
119         }
120
121         /*
122          * Our leaf may contain spikes.  We have to lock the root node
123          * in each target cluster.
124          */
125         error = hammer_btree_lock_children(spike, &locklist);
126         if (error) {
127                 error = 0;
128                 goto failed4;
129         }
130
131         onode = spike->node;
132         ocluster = onode->cluster;
133         ondisk = onode->ondisk;
134         hammer_lock_ex(&ocluster->io.lock);
135
136         /*
137          * Calculate the range of elements in the leaf that we will push
138          * down into our spike.  For the moment push them all down.
139          */
140         b = 0;
141         e = ondisk->count;
142
143         /*
144          * Use left-bound for spike if b == 0, else use the base element
145          * for the item to the left and adjust it past one unit.
146          */
147         if (b == 0) {
148                 spike->key_beg = *spike->left_bound;
149         } else {
150                 spike->key_beg = ondisk->elms[b-1].leaf.base;
151                 if (spike->key_beg.delete_tid != 0) {
152                         ++spike->key_beg.delete_tid;
153                 } else if (spike->key_beg.key != HAMMER_MAX_KEY) {
154                         ++spike->key_beg.key;
155                         spike->key_beg.delete_tid = 1;
156                 } else if (spike->key_beg.rec_type != HAMMER_MAX_RECTYPE) {
157                         ++spike->key_beg.rec_type;
158                         spike->key_beg.key = HAMMER_MIN_KEY;
159                         spike->key_beg.delete_tid = 1;
160                 } else if (spike->key_beg.obj_id != HAMMER_MAX_OBJID) {
161                         ++spike->key_beg.obj_id;
162                         spike->key_beg.key = HAMMER_MIN_KEY;
163                         spike->key_beg.delete_tid = 1;
164                         spike->key_beg.rec_type = HAMMER_MIN_RECTYPE;
165                 } else {
166                         panic("hammer_spike: illegal key");
167                 }
168                 KKASSERT(hammer_btree_cmp(&ondisk->elms[b-1].base, &spike->key_beg) < 0);
169         }
170
171         /*
172          * Use the right-bound if e is terminal, otherwise use the element
173          * at [e].  key_end is exclusive for the call to hammer_init_cluster()
174          * and is then made inclusive later to construct the SPIKE_END
175          * element.
176          */
177         if (e == ondisk->count)
178                 spike->key_end = *spike->right_bound;
179         else
180                 spike->key_end = ondisk->elms[e].leaf.base;
181
182         /*
183          * Heuristic:  Attempt to size the spike range according to
184          * expected traffic.  This is primarily responsible for the
185          * initial layout of the filesystem.
186          */
187         if (e && b != e) {
188                 int32_t clsize = ocluster->volume->ondisk->vol_clsize;
189                 int64_t delta = 1000000000;
190                 int64_t dkey;
191
192                 elm = &ondisk->elms[e-1];
193                 if (elm->base.obj_id == spike->key_end.obj_id &&
194                     elm->base.rec_type == spike->key_end.rec_type) {
195                         /* 
196                          * NOTE: dkey can overflow.
197                          */
198                         dkey = elm->base.key + clsize;
199                         if (dkey > elm->base.key && dkey < spike->key_end.key)
200                                 spike->key_end.key = elm->base.key + clsize;
201                 } else if (elm->base.obj_id + delta < spike->key_end.obj_id) {
202                         spike->key_end.obj_id = elm->base.obj_id + delta;
203                 }
204         }
205
206         /*
207          * Allocate and lock a new cluster, initialize its bounds.
208          */
209         ncluster = hammer_alloc_cluster(ocluster->volume->hmp, ocluster,
210                                         &error);
211         if (ncluster == NULL)
212                 goto failed3;
213         hammer_init_cluster(ncluster, &spike->key_beg, &spike->key_end);
214
215         /*
216          * Get a cursor for the new cluster.  Operations will be limited to
217          * this cluster.  Set HAMMER_CURSOR_RECOVER to force internal
218          * boundary elements in a way that allows us to copy spikes.
219          */
220         error = hammer_init_cursor_cluster(&ncursor, ncluster);
221         if (error)
222                 goto failed1;
223         ncursor.flags |= HAMMER_CURSOR_INSERT | HAMMER_CURSOR_RECOVER;
224
225         /*
226          * Copy the elements in the leaf node to the new target cluster.
227          */
228         for (spike->index = b; spike->index < e; ++spike->index) {
229                 elm = &onode->ondisk->elms[spike->index];
230
231                 if (elm->leaf.base.btype == HAMMER_BTREE_TYPE_SPIKE_END)
232                         continue;
233                 error = hammer_btree_extract(spike,
234                                              HAMMER_CURSOR_GET_RECORD |
235                                              HAMMER_CURSOR_GET_DATA);
236                 if (error == 0) {
237                         ncursor.key_beg = elm->leaf.base;
238                         error = hammer_write_record(&ncursor, spike->record,
239                                                     spike->data, spike->flags);
240                 }
241
242                 KKASSERT(error != EDEADLK);
243                 if (error == ENOSPC) {
244                         kprintf("impossible ENOSPC error on spike\n");
245                         error = EIO;
246                 }
247                 if (error)
248                         goto failed1;
249         }
250
251         /*
252          * Delete the records and data associated with the old leaf node,
253          * replacing them with the spike elements.
254          *
255          * XXX I/O ordering issue, we're destroying these records too
256          * early, but we need one for the spike allocation.  What to do?
257          */
258         for (spike->index = b; spike->index < e; ++spike->index) {
259                 int32_t roff;
260
261                 elm = &onode->ondisk->elms[spike->index];
262                 if (elm->leaf.base.btype == HAMMER_BTREE_TYPE_SPIKE_BEG)
263                         continue;
264                 KKASSERT(elm->leaf.rec_offset > 0);
265                 hammer_free_record(ocluster, elm->leaf.rec_offset,
266                                    elm->leaf.base.rec_type);
267                 if (elm->leaf.base.btype == HAMMER_BTREE_TYPE_RECORD &&
268                     elm->leaf.data_offset) {
269                         roff = elm->leaf.data_offset - elm->leaf.rec_offset;
270                         if (roff < 0 || roff >= HAMMER_RECORD_SIZE) {
271                                 hammer_free_data(ocluster,
272                                                  elm->leaf.data_offset,
273                                                  elm->leaf.data_len);
274                         }
275                 }
276         }
277
278         /*
279          * Add a record representing the spike using space freed up by the
280          * above deletions.
281          */
282         rec = hammer_alloc_record(ocluster, &error,
283                                   HAMMER_RECTYPE_CLUSTER,
284                                   &spike->record_buffer);
285         KKASSERT(error == 0);
286         rec->spike.base.base.btype = HAMMER_BTREE_TYPE_RECORD;
287         rec->spike.base.base.rec_type = HAMMER_RECTYPE_CLUSTER;
288         rec->spike.base.rec_id = hammer_alloc_recid(ocluster);
289         rec->spike.clu_no = ncluster->clu_no;
290         rec->spike.vol_no = ncluster->volume->vol_no;
291         rec->spike.clu_id = 0;
292
293         /*
294          * Construct the spike elements.  Note that the right boundary
295          * is range-exclusive whereas the SPIKE_END must be range-inclusive.
296          */
297         hammer_modify_node(onode);
298         ondisk = onode->ondisk;
299         elm = &ondisk->elms[b];
300
301         if (e - b != 2)
302                 bcopy(&elm[e - b], &elm[2], (ondisk->count - e) * esize);
303         ondisk->count = ondisk->count - (e - b) + 2;
304
305         elm[0].leaf.base = spike->key_beg;
306         elm[0].leaf.base.btype = HAMMER_BTREE_TYPE_SPIKE_BEG;
307         elm[0].leaf.rec_offset = hammer_bclu_offset(spike->record_buffer, rec);
308         elm[0].leaf.spike_clu_no = ncluster->clu_no;
309         elm[0].leaf.spike_vol_no = ncluster->volume->vol_no;
310         elm[0].leaf.spike_unused01 = 0;
311
312         elm[1].leaf.base = spike->key_end;
313         elm[1].leaf.base.btype = HAMMER_BTREE_TYPE_SPIKE_END;
314         elm[1].leaf.rec_offset = elm[0].leaf.rec_offset;
315         elm[1].leaf.spike_clu_no = ncluster->clu_no;
316         elm[1].leaf.spike_vol_no = ncluster->volume->vol_no;
317         elm[1].leaf.spike_unused01 = 0;
318
319         /*
320          * Make the SPIKE_END element inclusive.
321          */
322         if (elm[1].leaf.base.delete_tid != 1) {
323                 --elm[1].leaf.base.delete_tid;
324         } else if (elm[0].leaf.base.key != HAMMER_MIN_KEY) {
325                 --elm[0].leaf.base.key;
326                 elm[0].leaf.base.delete_tid = 0; /* max value */
327         } else if (elm[0].leaf.base.rec_type != HAMMER_MIN_RECTYPE) {
328                 --elm[0].leaf.base.rec_type;
329                 elm[0].leaf.base.key = HAMMER_MAX_KEY;
330                 elm[0].leaf.base.delete_tid = 0; /* max value */
331         } else if (elm[0].leaf.base.obj_id != HAMMER_MIN_OBJID) {
332                 --elm[0].leaf.base.obj_id;
333                 elm[0].leaf.base.rec_type = HAMMER_MAX_RECTYPE;
334                 elm[0].leaf.base.key = HAMMER_MAX_KEY;
335                 elm[0].leaf.base.delete_tid = 0; /* max value */
336         } else {
337                 panic("hammer_spike: illegal key");
338         }
339
340         /*
341          * Adjust ncluster
342          */
343         {
344                 hammer_cluster_ondisk_t ondisk;
345
346                 hammer_modify_cluster(ncluster);
347                 ondisk = ncluster->ondisk;
348                 ondisk->clu_btree_parent_vol_no = ocluster->volume->vol_no;
349                 ondisk->clu_btree_parent_clu_no = ocluster->clu_no;
350                 ondisk->clu_btree_parent_offset = onode->node_offset;
351                 ondisk->clu_btree_parent_clu_gen = ocluster->ondisk->clu_gen;
352         }
353
354         /*
355          * XXX I/O dependancy - new cluster must be flushed before current
356          * cluster can be flushed.
357          */
358         /*Debugger("COPY COMPLETE");*/
359         hammer_done_cursor(&ncursor);
360         goto success;
361
362         /*
363          * Cleanup
364          */
365 failed1:
366         hammer_done_cursor(&ncursor);
367         hammer_free_cluster(ncluster);
368 success:
369         hammer_unlock(&ncluster->io.lock);
370         hammer_rel_cluster(ncluster, 0);
371 failed3:
372         kprintf("UNLOAD SPIKE %p %d\n", spike, error);
373         hammer_unlock(&ocluster->io.lock);
374 failed4:
375         hammer_btree_unlock_children(&locklist);
376         hammer_done_cursor(spike);
377         --hammer_count_spikes;
378         kfree(spike, M_HAMMER);
379         *spikep = NULL;
380         return (error);
381 }
382