hammer2 - Initial CCMS adaptation and code-up
dragonfly.git: sys/vfs/hammer2/hammer2_ccms.c
/*
 * Copyright (c) 2006,2012 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
/*
 * The Cache Coherency Management System (CCMS)
 */

#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/malloc.h>
#include <sys/objcache.h>
#include <sys/sysctl.h>
#include <sys/uio.h>
#include <machine/limits.h>

#include <sys/spinlock2.h>

#include "hammer2_ccms.h"

struct ccms_lock_scan_info {
        ccms_inode_t    *cino;
        ccms_lock_t     *lock;
        ccms_cst_t      *coll_cst;
        int             rstate_upgrade_needed;
};

static int ccms_cst_cmp(ccms_cst_t *b1, ccms_cst_t *b2);
static int ccms_lock_scan_cmp(ccms_cst_t *b1, void *arg);

static int ccms_lock_get_match(ccms_cst_t *cst, void *arg);
static int ccms_lock_undo_match(ccms_cst_t *cst, void *arg);
static int ccms_lock_redo_match(ccms_cst_t *cst, void *arg);
static int ccms_lock_upgrade_match(ccms_cst_t *cst, void *arg);
static int ccms_lock_put_match(ccms_cst_t *cst, void *arg);

static void ccms_lstate_get(ccms_cst_t *cst, ccms_state_t state);
static void ccms_lstate_put(ccms_cst_t *cst);
static void ccms_rstate_get(ccms_cst_t *cst, ccms_state_t state);
static void ccms_rstate_put(ccms_cst_t *cst);

struct ccms_rb_tree;
RB_GENERATE3(ccms_rb_tree, ccms_cst, rbnode, ccms_cst_cmp,
             ccms_off_t, beg_offset, end_offset);
static MALLOC_DEFINE(M_CCMS, "CCMS", "Cache Coherency Management System");

static int ccms_debug = 0;

/*
 * These helpers are called to manage the CST cache so we can avoid
 * unnecessary kmalloc()'s and kfree()'s in hot paths.
 *
 * ccms_free_pass1() must be called with the spinlock held.
 * ccms_free_pass2() must be called with the spinlock not held.
 */
static __inline
ccms_cst_t *
ccms_free_pass1(ccms_inode_t *cino, int keep)
{
        ccms_cst_t *cst;
        ccms_cst_t **cstp;

        cstp = &cino->free_cache;
        while ((cst = *cstp) != NULL && keep) {
                cstp = &cst->free_next;
                --keep;
        }
        *cstp = NULL;
        return (cst);
}

static __inline
void
ccms_free_pass2(ccms_cst_t *next)
{
        ccms_cst_t *cst;
        ccms_domain_t *dom;

        while ((cst = next) != NULL) {
                next = cst->free_next;
                cst->free_next = NULL;

                dom = cst->cino->domain;
                atomic_add_int(&dom->cst_count, -1);

                kfree(cst, dom->mcst);
        }
}
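
/*
 * Illustrative sketch (not part of the original code): the intended
 * two-pass free pattern.  Pass 1 detaches the excess CSTs while the
 * inode spinlock is held, pass 2 performs the blocking kfree()'s after
 * the spinlock has been released.  The function name is hypothetical.
 */
#if 0
static void
example_trim_free_cache(ccms_inode_t *cino)
{
        ccms_cst_t *scan;

        spin_lock(&cino->spin);
        scan = ccms_free_pass1(cino, 2);        /* keep two cached CSTs */
        spin_unlock(&cino->spin);
        ccms_free_pass2(scan);                  /* free the rest */
}
#endif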

/*
 * Initialize a new CCMS dataspace.  Create a new RB tree with a single
 * element covering the entire 64 bit offset range.  This simplifies
 * algorithms enormously by removing a number of special cases.
 */
void
ccms_domain_init(ccms_domain_t *dom)
{
        bzero(dom, sizeof(*dom));
        kmalloc_create(&dom->mcst, "CCMS-cst");
        ccms_inode_init(dom, &dom->root, NULL);
        dom->root.domain = dom;
}

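/*
 * Illustrative sketch (not part of the original code): how a filesystem
 * might bring up a CCMS domain at mount time and attach a per-file
 * ccms_inode to it.  The names example_mount/example_ip/vfs_handle are
 * hypothetical placeholders.
 */
#if 0
static void
example_mount(ccms_domain_t *dom, ccms_inode_t *example_ip, void *vfs_handle)
{
        ccms_domain_init(dom);                          /* embedded root inode */
        ccms_inode_init(dom, example_ip, vfs_handle);   /* stand-alone init */
        ccms_inode_insert(&dom->root, example_ip);      /* hook into topology */
}
#endif
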
/*
 * Initialize a ccms_inode for use.  The inode will be initialized but
 * is not yet connected to the rest of the topology.  However, it can
 * still be used stand-alone if desired without being connected to the
 * topology.
 */
void
ccms_inode_init(ccms_domain_t *dom, ccms_inode_t *cino, void *handle)
{
        ccms_cst_t *cst;

        bzero(cino, sizeof(*cino));

        spin_init(&cino->spin);
        RB_INIT(&cino->tree);
        cino->domain = dom;
        cino->handle = handle;
        cino->attr_cst.cino = cino;
        cino->attr_cst.lstate = CCMS_STATE_INVALID;
        cino->attr_cst.rstate = CCMS_STATE_INVALID;
        cino->topo_cst.cino = cino;
        cino->topo_cst.lstate = CCMS_STATE_INVALID;
        cino->topo_cst.rstate = CCMS_STATE_INVALID;

        /*
         * The dataspace must be initialized w/cache-state set to INVALID
         * for the entire range.
         */
        cst = kmalloc(sizeof(*cst), dom->mcst, M_WAITOK | M_ZERO);
        cst->cino = cino;
        cst->flags = CCMS_CST_DYNAMIC;
        cst->beg_offset = 0;
        cst->end_offset = 0xFFFFFFFFFFFFFFFFLLU;
        cst->lstate = CCMS_STATE_INVALID;
        cst->rstate = CCMS_STATE_INVALID;
        RB_INSERT(ccms_rb_tree, &cino->tree, cst);
        atomic_add_int(&dom->cst_count, 1);
}

/*
 * Insert an inode into the topology.  The inode has already been
 * initialized and may even be in active use.
 */
void
ccms_inode_insert(ccms_inode_t *cpar, ccms_inode_t *cino)
{
        spin_lock(&cpar->spin);
        spin_lock(&cino->spin);
        KKASSERT((cino->flags & CCMS_INODE_INSERTED) == 0);
        if (RB_INSERT(ccms_rb_tree, &cpar->tree, &cino->topo_cst)) {
                spin_unlock(&cino->spin);
                spin_unlock(&cpar->spin);
                panic("ccms_inode_insert: duplicate entry");
        }
        cino->parent = cpar;
        cino->flags |= CCMS_INODE_INSERTED;
        spin_unlock(&cino->spin);
        spin_unlock(&cpar->spin);
}

/*
 * Delete an inode from the topology.  The inode can remain in active use
 * after the deletion (e.g. when unlinking a file which still has open
 * descriptors).
 *
 * If the caller is destroying the ccms_inode the caller must call
 * ccms_inode_uninit() to invalidate the cache state (which can block).
 */
void
ccms_inode_delete(ccms_inode_t *cino)
{
        ccms_inode_t *cpar;
        int flags;

        /*
         * Interlock with the DELETING flag.
         */
        spin_lock(&cino->spin);
        flags = cino->flags;
        cino->flags |= CCMS_INODE_DELETING;
        spin_unlock(&cino->spin);

        if (flags & CCMS_INODE_DELETING)
                return;
        if ((flags & CCMS_INODE_INSERTED) == 0)
                return;

        /*
         * We have the interlock, we are the only ones who can delete
         * the inode now.
         */
        cpar = cino->parent;
        spin_lock(&cpar->spin);
        spin_lock(&cino->spin);
        KKASSERT(cpar == cino->parent);

        cino->flags &= ~CCMS_INODE_INSERTED;
        RB_REMOVE(ccms_rb_tree, &cpar->tree, &cino->topo_cst);

        spin_unlock(&cino->spin);
        spin_unlock(&cpar->spin);
}

/*
 * The caller has removed the inode from the topology and is now trying
 * to destroy the structure.  This routine flushes the cache state and
 * can block on third-party interactions.
 *
 * NOTE: Caller must have already destroyed any recursive inode state.
 */
void
ccms_inode_uninit(ccms_inode_t *cino)
{
        ccms_cst_t *scan;

        KKASSERT(cino->flags & CCMS_INODE_DELETING);
        spin_lock(&cino->spin);

        while ((scan = RB_ROOT(&cino->tree)) != NULL) {
                KKASSERT(scan->flags & CCMS_CST_DYNAMIC);
                KKASSERT((scan->flags & CCMS_CST_DELETING) == 0);
                RB_REMOVE(ccms_rb_tree, &cino->tree, scan);
                scan->flags |= CCMS_CST_DELETING;
                scan->flags &= ~CCMS_CST_INSERTED;
                spin_unlock(&cino->spin);

                /*
                 * Inval can be called without the inode spinlock because
                 * we own the DELETING flag.
                 */
                ccms_lstate_put(scan);
                ccms_rstate_put(scan);
                atomic_add_int(&cino->domain->cst_count, -1);

                kfree(scan, cino->domain->mcst);
                spin_lock(&cino->spin);
        }
        KKASSERT((cino->attr_cst.flags & CCMS_CST_DELETING) == 0);
        cino->attr_cst.flags |= CCMS_CST_DELETING;
        KKASSERT((cino->topo_cst.flags & CCMS_CST_DELETING) == 0);
        cino->topo_cst.flags |= CCMS_CST_DELETING;
        scan = ccms_free_pass1(cino, 0);
        spin_unlock(&cino->spin);

        /*
         * Inval can be called without the inode spinlock because
         * we own the DELETING flag.  Similarly we can clear cino->domain
         * and cino->handle because we own the DELETING flag on the cino.
         */
        ccms_lstate_put(&cino->attr_cst);
        ccms_rstate_put(&cino->attr_cst);
        ccms_lstate_put(&cino->topo_cst);
        ccms_rstate_put(&cino->topo_cst);

        ccms_free_pass2(scan);

        cino->domain = NULL;
        cino->handle = NULL;
}
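
/*
 * Illustrative sketch (not part of the original code): the expected
 * teardown sequence when a filesystem destroys an inode, per the notes
 * above.  ccms_inode_delete() unhooks the inode from the topology and
 * ccms_inode_uninit() then flushes the cache state (and may block).
 */
#if 0
static void
example_destroy(ccms_inode_t *cino)
{
        ccms_inode_delete(cino);
        ccms_inode_uninit(cino);
        /* the embedding structure may now be freed by the caller */
}
#endif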

/*
 * This is the core CCMS lock acquisition code and is typically called
 * by program-specific wrappers which initialize the lock structure.
 *
 * Three cache-coherent domains can be obtained: the topological 't'
 * domain, the attribute 'a' domain, and a range in the data 'd' domain.
 *
 * A topological CCMS lock covers the entire attribute and data domain
 * plus recursively covers the entire directory sub-tree, so requesting
 * a topo lock together with an 'a' or 'd' lock in the same request
 * currently triggers an assertion.
 *
 * You can get both an 'a' and a 'd' lock at the same time and, in
 * particular, a VFS can use the 'a' lock to also lock the related
 * VFS inode structure if it desires to.  HAMMER2 utilizes this feature.
 *
 * Topo locks are typically needed for rename operations, and topo CST
 * cache state on the backend can be used to limit the number of dynamic
 * CST allocations backing the live CCMS locks.
 */
int
ccms_lock_get(ccms_inode_t *cino, ccms_lock_t *lock)
{
        struct ccms_lock_scan_info info;
        ccms_cst_t *cst;
        int use_redo = 0;
        ccms_state_t highest_state;

        /*
         * Live local locks prevent remotes from downgrading the rstate,
         * so we have to acquire a local lock before testing the rstate.
         *
         * The local lock must be released if a remote upgrade is required
         * to avoid a deadlock, and we retry in that situation.
         */
again:
        if (lock->tstate) {
                KKASSERT(lock->astate == 0 && lock->dstate == 0);
                lock->icst = &cino->topo_cst;
                ccms_lstate_get(lock->icst, lock->tstate);

                if (cino->topo_cst.rstate < lock->tstate) {
                        ccms_lstate_put(&cino->topo_cst);
                        ccms_rstate_get(&cino->topo_cst, lock->tstate);
                        goto again;
                }
        } else {
                /*
                 * The topo rstate must be at least ALLOWED for us to be
                 * able to acquire any other cache state.  If the topo
                 * rstate is already higher than that then we may have
                 * to upgrade it further to cover the lstate's we are
                 * requesting.
                 */
                highest_state = CCMS_STATE_ALLOWED;
                if (cino->topo_cst.rstate > highest_state) {
                        if (highest_state < lock->astate)
                                highest_state = lock->astate;
                        if (highest_state < lock->dstate)
                                highest_state = lock->dstate;
                }
                if (cino->topo_cst.rstate < highest_state)
                        ccms_rstate_get(&cino->topo_cst, highest_state);
                /* no need to retry */
        }
        if (lock->astate) {
                lock->icst = &cino->attr_cst;
                ccms_lstate_get(lock->icst, lock->astate);

                if (cino->attr_cst.rstate < lock->astate) {
                        ccms_lstate_put(&cino->attr_cst);
                        if (lock->tstate)
                                ccms_lstate_put(&cino->topo_cst);
                        ccms_rstate_get(&cino->attr_cst, lock->astate);
                        goto again;
                }
        }

        /*
         * The data-lock is a range-lock and requires a bit more code.
         * The CST space is partitioned so the precise range is covered.
         *
         * Multiple CST's may be involved and dcst points to the left hand
         * edge.
         */
        if (lock->dstate) {
                info.lock = lock;
                info.cino = cino;
                info.coll_cst = NULL;

                spin_lock(&cino->spin);

                /*
                 * Make sure cino has enough free CSTs to cover the operation,
                 * so we can hold the spinlock through the scan later on.
                 */
                while (cino->free_cache == NULL ||
                       cino->free_cache->free_next == NULL) {
                        spin_unlock(&cino->spin);
                        cst = kmalloc(sizeof(*cst), cino->domain->mcst,
                                      M_WAITOK | M_ZERO);
                        atomic_add_int(&cino->domain->cst_count, 1);
                        spin_lock(&cino->spin);
                        cst->free_next = cino->free_cache;
                        cino->free_cache = cst;
                }

                /*
                 * The partitioning code runs with the spinlock held.  If
                 * we've already partitioned due to having to do an rstate
                 * upgrade we run a redo instead of a get.
                 */
                info.rstate_upgrade_needed = 0;
                if (use_redo == 0) {
                        RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
                                ccms_lock_get_match, &info);
                } else {
                        RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
                                ccms_lock_redo_match, &info);
                }

                /*
                 * If a collision occurred, undo the fragments we were able
                 * to obtain, block, and try again.
                 */
                while (info.coll_cst != NULL) {
                        RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
                                ccms_lock_undo_match, &info);
                        info.coll_cst->blocked = 1;
                        ssleep(info.coll_cst, &cino->spin, 0, "ccmsget", hz);
                        info.coll_cst = NULL;
                        info.rstate_upgrade_needed = 0;
                        RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
                                ccms_lock_redo_match, &info);
                }

                /*
                 * If the rstate needs to be upgraded we have to undo the
                 * local locks (but we retain the partitioning).
                 *
                 * Set use_redo to indicate that the partitioning was retained
                 * (i.e. lrefs and rrefs remain intact).
                 */
                if (info.rstate_upgrade_needed) {
                        RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
                                ccms_lock_undo_match, &info);
                        spin_unlock(&cino->spin);
                        if (lock->astate)
                                ccms_lstate_put(&cino->attr_cst);
                        if (lock->tstate)
                                ccms_lstate_put(&cino->topo_cst);
                        spin_lock(&cino->spin);
                        RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
                                ccms_lock_upgrade_match, &info);
                        spin_unlock(&cino->spin);
                        use_redo = 1;
                        goto again;
                }

                /*
                 * Cleanup free CSTs beyond the 2 we wish to retain.
                 */
                cst = ccms_free_pass1(cino, 2);
                spin_unlock(&cino->spin);
                ccms_free_pass2(cst);
        }

        /*
         * Ok, everything is in good shape EXCEPT we might not have
         * sufficient topo_cst.rstate.  It could have gotten ripped
         * out from under us.  Once we have the local locks it can
         * no longer be downgraded so a check here suffices.
         */
        highest_state = CCMS_STATE_ALLOWED;
        if (highest_state < lock->tstate)
                highest_state = lock->tstate;
        if (highest_state < lock->astate)
                highest_state = lock->astate;
        if (highest_state < lock->dstate)
                highest_state = lock->dstate;

        if (cino->topo_cst.rstate < highest_state) {
                if (lock->dstate) {
                        spin_lock(&cino->spin);
                        RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
                                ccms_lock_put_match, &info);
                        spin_unlock(&cino->spin);
                }
                if (lock->astate)
                        ccms_lstate_put(&cino->attr_cst);
                if (lock->tstate)
                        ccms_lstate_put(&cino->topo_cst);
                ccms_rstate_get(&cino->topo_cst, highest_state);
                use_redo = 0;
                goto again;
        }
        return(0);
}

/*
 * Obtain a CCMS lock, initialize the lock structure based on the uio.
 *
 * Both the attribute AND a ranged-data lock are acquired.
 */
int
ccms_lock_get_uio(ccms_inode_t *cino, ccms_lock_t *lock, struct uio *uio)
{
        ccms_state_t dstate;
        ccms_off_t eoff;

        if (uio->uio_rw == UIO_READ)
                dstate = CCMS_STATE_SHARED;
        else
                dstate = CCMS_STATE_MODIFIED;

        /*
         * Calculate the ending offset (byte inclusive), make sure a seek
         * overflow does not blow us up.
         */
        eoff = uio->uio_offset + uio->uio_resid - 1;
        if (eoff < uio->uio_offset)
                eoff = 0x7FFFFFFFFFFFFFFFLL;
        lock->beg_offset = uio->uio_offset;
        lock->end_offset = eoff;
        lock->tstate = 0;
        lock->astate = dstate;
        lock->dstate = dstate;
        return (ccms_lock_get(cino, lock));
}
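
/*
 * Illustrative sketch (not part of the original code): how a hypothetical
 * read/write path might bracket a uio-based operation with the wrappers
 * above.  The function name example_io is a placeholder.
 */
#if 0
static int
example_io(ccms_inode_t *cino, struct uio *uio)
{
        ccms_lock_t lock;
        int error;

        error = ccms_lock_get_uio(cino, &lock, uio);
        if (error)
                return (error);
        /* ... perform the actual data movement here ... */
        ccms_lock_put(cino, &lock);
        return (0);
}
#endif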

/*
 * Obtain a CCMS lock.  Only the attribute lock is acquired.
 */
int
ccms_lock_get_attr(ccms_inode_t *cino, ccms_lock_t *lock, ccms_state_t astate)
{
        lock->tstate = 0;
        lock->astate = astate;
        lock->dstate = 0;
        return (ccms_lock_get(cino, lock));
}

/*
 * Helper routine.
 *
 * NOTE: called with spinlock held.
 */
static
int
ccms_lock_get_match(ccms_cst_t *cst, void *arg)
{
        struct ccms_lock_scan_info *info = arg;
        ccms_lock_t *lock = info->lock;
        ccms_cst_t *ncst;

        /*
         * If the lock's left edge is within the CST we must split the CST
         * into two pieces [cst][ncst].  lrefs must be bumped on the CST
         * containing the left edge.
         *
         * NOTE! cst->beg_offset may not be modified.  This allows us to
         *       avoid having to manipulate the cst's position in the tree.
         */
        if (lock->beg_offset > cst->beg_offset) {
                ncst = info->cino->free_cache;
                KKASSERT(ncst != NULL);
                info->cino->free_cache = ncst->free_next;
                ncst->free_next = NULL;

                *ncst = *cst;
                cst->end_offset = lock->beg_offset - 1;
                cst->rrefs = 0;
                ncst->beg_offset = lock->beg_offset;
                ncst->lrefs = 1;
                RB_INSERT(ccms_rb_tree, &info->cino->tree, ncst);

                /*
                 * ncst becomes our 'matching' cst.
                 */
                cst = ncst;
        } else if (lock->beg_offset == cst->beg_offset) {
                ++cst->lrefs;
        }

        /*
         * If the lock's right edge is within the CST we must split the CST
         * into two pieces [cst][ncst].  rrefs must be bumped on the CST
         * containing the right edge.
         *
         * NOTE! cst->beg_offset may not be modified.  This allows us to
         *       avoid having to manipulate the cst's position in the tree.
         */
        if (lock->end_offset < cst->end_offset) {
                ncst = info->cino->free_cache;
                KKASSERT(ncst != NULL);
                info->cino->free_cache = ncst->free_next;
                ncst->free_next = NULL;

                *ncst = *cst;
                cst->end_offset = lock->end_offset;
                cst->rrefs = 1;
                ncst->beg_offset = lock->end_offset + 1;
                ncst->lrefs = 0;
                RB_INSERT(ccms_rb_tree, &info->cino->tree, ncst);
                /* cst remains our 'matching' cst */
        } else if (lock->end_offset == cst->end_offset) {
                ++cst->rrefs;
        }

        /*
         * The lock covers the CST, so increment the CST's coverage count.
         * Then attempt to obtain the shared/exclusive lock.  The coverage
         * count is maintained until the put operation.
         */
        ++cst->xrefs;
        if (cst->lstate < lock->dstate)
                cst->lstate = lock->dstate;

        /*
         * If we have already collided we make no more modifications
         * to cst->count, but we must continue the scan to properly
         * partition the cst.
         */
        if (info->coll_cst)
                return(0);

        switch(lock->dstate) {
        case CCMS_STATE_INVALID:
                break;
        case CCMS_STATE_ALLOWED:
        case CCMS_STATE_SHARED:
        case CCMS_STATE_SLAVE:
                if (cst->count < 0) {
                        info->coll_cst = cst;
                } else {
                        ++cst->count;
                        if (ccms_debug >= 9) {
                                kprintf("CST SHARE %d %lld-%lld\n",
                                        cst->count,
                                        (long long)cst->beg_offset,
                                        (long long)cst->end_offset);
                        }
                }
                break;
        case CCMS_STATE_MASTER:
        case CCMS_STATE_EXCLUSIVE:
                if (cst->count != 0) {
                        info->coll_cst = cst;
                } else {
                        --cst->count;
                        if (ccms_debug >= 9) {
                                kprintf("CST EXCLS %d %lld-%lld\n",
                                        cst->count,
                                        (long long)cst->beg_offset,
                                        (long long)cst->end_offset);
                        }
                }
                break;
        case CCMS_STATE_MODIFIED:
                if (cst->count != 0) {
                        info->coll_cst = cst;
                } else {
                        --cst->count;
                        if (cst->lstate <= CCMS_STATE_EXCLUSIVE)
                                cst->lstate = CCMS_STATE_MODIFIED;
                        if (ccms_debug >= 9) {
                                kprintf("CST MODXL %d %lld-%lld\n",
                                        cst->count,
                                        (long long)cst->beg_offset,
                                        (long long)cst->end_offset);
                        }
                }
                break;
        default:
                panic("ccms_lock_get_match: bad state %d\n", lock->dstate);
                break;
        }
        return(0);
}

/*
 * Undo a partially resolved ccms_ltype rangelock.  This is atomic with
 * the scan/redo code so there should not be any blocked locks when
 * transitioning to 0.  lrefs and rrefs are not touched in order to
 * retain the partitioning.
 *
 * If coll_cst is non-NULL we stop when we hit this element as locks on
 * no further elements were obtained.  This element might not represent
 * a left or right edge but coll_cst can only be non-NULL if the spinlock
 * was held throughout the get/redo and the undo.
 *
 * NOTE: called with spinlock held.
 */
static
int
ccms_lock_undo_match(ccms_cst_t *cst, void *arg)
{
        struct ccms_lock_scan_info *info = arg;
        ccms_lock_t *lock = info->lock;

        if (cst == info->coll_cst)
                return(-1);

        switch (lock->dstate) {
        case CCMS_STATE_INVALID:
                break;
        case CCMS_STATE_ALLOWED:
        case CCMS_STATE_SHARED:
        case CCMS_STATE_SLAVE:
                KKASSERT(cst->count > 0);
                --cst->count;
                KKASSERT(cst->count || cst->blocked == 0);
                break;
        case CCMS_STATE_MASTER:
        case CCMS_STATE_EXCLUSIVE:
        case CCMS_STATE_MODIFIED:
                KKASSERT(cst->count < 0);
                ++cst->count;
                KKASSERT(cst->count || cst->blocked == 0);
                break;
        default:
                panic("ccms_lock_undo_match: bad state %d\n", lock->dstate);
                break;
        }
        return(0);
}

/*
 * Redo the local lock request for a range which has already been
 * partitioned.
 *
 * NOTE: called with spinlock held.
 */
static
int
ccms_lock_redo_match(ccms_cst_t *cst, void *arg)
{
        struct ccms_lock_scan_info *info = arg;
        ccms_lock_t *lock = info->lock;

        KKASSERT(info->coll_cst == NULL);

        switch(lock->dstate) {
        case CCMS_STATE_INVALID:
                break;
        case CCMS_STATE_ALLOWED:
        case CCMS_STATE_SHARED:
        case CCMS_STATE_SLAVE:
                if (cst->count < 0) {
                        info->coll_cst = cst;
                } else {
                        if (ccms_debug >= 9) {
                                kprintf("CST SHARE %d %lld-%lld\n",
                                        cst->count,
                                        (long long)cst->beg_offset,
                                        (long long)cst->end_offset);
                        }
                        ++cst->count;
                }
                break;
        case CCMS_STATE_MASTER:
        case CCMS_STATE_EXCLUSIVE:
                if (cst->count != 0) {
                        info->coll_cst = cst;
                } else {
                        --cst->count;
                        if (ccms_debug >= 9) {
                                kprintf("CST EXCLS %d %lld-%lld\n",
                                        cst->count,
                                        (long long)cst->beg_offset,
                                        (long long)cst->end_offset);
                        }
                }
                break;
        case CCMS_STATE_MODIFIED:
                if (cst->count != 0) {
                        info->coll_cst = cst;
                } else {
                        --cst->count;
                        if (ccms_debug >= 9) {
                                kprintf("CST MODXL %d %lld-%lld\n",
                                        cst->count,
                                        (long long)cst->beg_offset,
                                        (long long)cst->end_offset);
                        }
                }
                break;
        default:
                panic("ccms_lock_redo_match: bad state %d\n", lock->dstate);
                break;
        }

        if (info->coll_cst)
                return(-1);     /* stop the scan */
        return(0);              /* continue the scan */
}

/*
 * Upgrade the rstate for the matching range.
 *
 * NOTE: Called with spinlock held.
 */
static
int
ccms_lock_upgrade_match(ccms_cst_t *cst, void *arg)
{
        struct ccms_lock_scan_info *info = arg;
        ccms_lock_t *lock = info->lock;

        /*
         * ccms_rstate_get() can block so we must release the spinlock.
         * To prevent the cst from getting ripped out on us we temporarily
         * bump both lrefs and rrefs.
         */
        if (cst->rstate < lock->dstate) {
                ++cst->lrefs;
                ++cst->rrefs;
                spin_unlock(&info->cino->spin);
                ccms_rstate_get(cst, lock->dstate);
                spin_lock(&info->cino->spin);
                --cst->lrefs;
                --cst->rrefs;
        }
        return(0);
}

/*
 * Release a previously acquired CCMS lock.
 */
int
ccms_lock_put(ccms_inode_t *cino, ccms_lock_t *lock)
{
        struct ccms_lock_scan_info info;
        ccms_cst_t *scan;

        if (lock->tstate) {
                ccms_lstate_put(lock->icst);
                lock->tstate = 0;
                lock->icst = NULL;
        } else if (lock->astate) {
                ccms_lstate_put(lock->icst);
                lock->astate = 0;
                lock->icst = NULL;
        }

        if (lock->dstate) {
                info.lock = lock;
                info.cino = cino;
                spin_lock(&cino->spin);
                RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
                        ccms_lock_put_match, &info);
                scan = ccms_free_pass1(cino, 2);
                spin_unlock(&cino->spin);
                ccms_free_pass2(scan);
                lock->dstate = 0;
                lock->dcst = NULL;
        }

        return(0);
}
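
/*
 * Illustrative sketch (not part of the original code): a hypothetical
 * attribute-only lock cycle, e.g. around a stat-style attribute read.
 */
#if 0
static void
example_stat(ccms_inode_t *cino)
{
        ccms_lock_t lock;

        ccms_lock_get_attr(cino, &lock, CCMS_STATE_SHARED);
        /* ... copy out the attributes ... */
        ccms_lock_put(cino, &lock);
}
#endif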

/*
 * Release a local lock.  The related CST's lstate is set to INVALID once
 * the coverage drops to 0 and adjacent compatible entries will be
 * recombined.
 *
 * NOTE: called with spinlock held.
 */
static
int
ccms_lock_put_match(ccms_cst_t *cst, void *arg)
{
        struct ccms_lock_scan_info *info = arg;
        ccms_lock_t *lock = info->lock;
        ccms_cst_t *ocst;

        /*
         * Undo the local shared/exclusive rangelock.
         */
        switch(lock->dstate) {
        case CCMS_STATE_INVALID:
                break;
        case CCMS_STATE_ALLOWED:
        case CCMS_STATE_SHARED:
        case CCMS_STATE_SLAVE:
                KKASSERT(cst->count > 0);
                --cst->count;
                if (ccms_debug >= 9) {
                        kprintf("CST UNSHR %d %lld-%lld (%d)\n", cst->count,
                                (long long)cst->beg_offset,
                                (long long)cst->end_offset,
                                cst->blocked);
                }
                if (cst->blocked && cst->count == 0) {
                        cst->blocked = 0;
                        wakeup(cst);
                }
                break;
        case CCMS_STATE_MASTER:
        case CCMS_STATE_EXCLUSIVE:
        case CCMS_STATE_MODIFIED:
                KKASSERT(cst->count < 0);
                ++cst->count;
                if (ccms_debug >= 9) {
                        kprintf("CST UNEXC %d %lld-%lld (%d)\n", cst->count,
                                (long long)cst->beg_offset,
                                (long long)cst->end_offset,
                                cst->blocked);
                }
                if (cst->blocked && cst->count == 0) {
                        cst->blocked = 0;
                        wakeup(cst);
                }
                break;
        default:
                panic("ccms_lock_put_match: bad state %d\n", lock->dstate);
                break;
        }

        /*
         * Decrement the lock coverage count on the CST.  Decrement the left
         * and right edge counts as appropriate.
         *
         * When lrefs or rrefs drops to zero we check the adjacent entry to
         * determine whether a merge is possible.  If the appropriate refs
         * field (rrefs for the entry to our left, lrefs for the entry to
         * our right) is 0, then all covering locks must cover both entries
         * and the xrefs field must match.  We can then merge the entries
         * if they have compatible cache states.
         *
         * However, because we are cleaning up the shared/exclusive count
         * at the same time, the count field may be temporarily out of
         * sync, so require that the count field also match before doing
         * a merge.
         *
         * When merging an element which is being blocked on, the blocking
         * thread(s) will be woken up.
         *
         * If the dataspace has too many CSTs we may be able to merge the
         * entries even if their cache states are not the same, by dropping
         * both to a compatible (lower) cache state and performing the
         * appropriate management operations.  XXX
         */
        if (--cst->xrefs == 0)
                cst->lstate = CCMS_STATE_INVALID;

        if (lock->beg_offset == cst->beg_offset && --cst->lrefs == 0) {
                if ((ocst = RB_PREV(ccms_rb_tree,
                                    &info->cino->tree, cst)) != NULL &&
                    ocst->rrefs == 0 &&
                    ocst->lstate == cst->lstate &&
                    ocst->rstate == cst->rstate &&
                    ocst->count == cst->count
                ) {
                        KKASSERT(ocst->xrefs == cst->xrefs);
                        KKASSERT(ocst->end_offset + 1 == cst->beg_offset);
                        RB_REMOVE(ccms_rb_tree, &info->cino->tree, ocst);
                        cst->beg_offset = ocst->beg_offset;
                        cst->lrefs = ocst->lrefs;
                        if (ccms_debug >= 9) {
                                kprintf("MERGELEFT %p %lld-%lld (%d)\n",
                                       ocst,
                                       (long long)cst->beg_offset,
                                       (long long)cst->end_offset,
                                       cst->blocked);
                        }
                        if (ocst->blocked) {
                                ocst->blocked = 0;
                                wakeup(ocst);
                        }
                        ocst->free_next = info->cino->free_cache;
                        info->cino->free_cache = ocst;
                }
        }
        if (lock->end_offset == cst->end_offset && --cst->rrefs == 0) {
                if ((ocst = RB_NEXT(ccms_rb_tree,
                                    &info->cino->tree, cst)) != NULL &&
                    ocst->lrefs == 0 &&
                    ocst->lstate == cst->lstate &&
                    ocst->rstate == cst->rstate &&
                    ocst->count == cst->count
                ) {
                        KKASSERT(ocst->xrefs == cst->xrefs);
                        KKASSERT(cst->end_offset + 1 == ocst->beg_offset);
                        RB_REMOVE(ccms_rb_tree, &info->cino->tree, ocst);
                        cst->end_offset = ocst->end_offset;
                        cst->rrefs = ocst->rrefs;
                        if (ccms_debug >= 9) {
                                kprintf("MERGERIGHT %p %lld-%lld\n",
                                       ocst,
                                       (long long)cst->beg_offset,
                                       (long long)cst->end_offset);
                        }
                        ocst->free_next = info->cino->free_cache;
                        info->cino->free_cache = ocst;
                }
        }
        return(0);
}

/*
 * RB tree compare function for insertions and deletions.  This function
 * compares two CSTs.
 */
static int
ccms_cst_cmp(ccms_cst_t *b1, ccms_cst_t *b2)
{
        if (b1->end_offset < b2->beg_offset)
                return(-1);
        if (b1->beg_offset > b2->end_offset)
                return(1);
        return(0);
}

/*
 * RB tree scanning compare function.  This function compares the CST
 * from the tree against the supplied ccms_lock and returns the CST's
 * placement relative to the lock.
 */
static int
ccms_lock_scan_cmp(ccms_cst_t *cst, void *arg)
{
        struct ccms_lock_scan_info *info = arg;
        ccms_lock_t *lock = info->lock;

        if (cst->end_offset < lock->beg_offset)
                return(-1);
        if (cst->beg_offset > lock->end_offset)
                return(1);
        return(0);
}

/************************************************************************
 *              STANDALONE LSTATE AND RSTATE SUPPORT FUNCTIONS          *
 ************************************************************************
 *
 * These functions are used to perform work on the attr_cst and topo_cst
 * embedded in a ccms_inode, and to issue remote state operations.  These
 * functions are called without the ccms_inode spinlock held.
 */

static
void
ccms_lstate_get(ccms_cst_t *cst, ccms_state_t state)
{
        int blocked;

        spin_lock(&cst->cino->spin);
        ++cst->xrefs;

        for (;;) {
                blocked = 0;

                switch(state) {
                case CCMS_STATE_INVALID:
                        break;
                case CCMS_STATE_ALLOWED:
                case CCMS_STATE_SHARED:
                case CCMS_STATE_SLAVE:
                        if (cst->count < 0) {
                                blocked = 1;
                        } else {
                                ++cst->count;
                                if (ccms_debug >= 9) {
                                        kprintf("CST SHARE %d %lld-%lld\n",
                                                cst->count,
                                                (long long)cst->beg_offset,
                                                (long long)cst->end_offset);
                                }
                        }
                        break;
                case CCMS_STATE_MASTER:
                case CCMS_STATE_EXCLUSIVE:
                        if (cst->count != 0) {
                                blocked = 1;
                        } else {
                                --cst->count;
                                if (ccms_debug >= 9) {
                                        kprintf("CST EXCLS %d %lld-%lld\n",
                                                cst->count,
                                                (long long)cst->beg_offset,
                                                (long long)cst->end_offset);
                                }
                        }
                        break;
                case CCMS_STATE_MODIFIED:
                        if (cst->count != 0) {
                                blocked = 1;
                        } else {
                                --cst->count;
                                if (cst->lstate <= CCMS_STATE_EXCLUSIVE)
                                        cst->lstate = CCMS_STATE_MODIFIED;
                                if (ccms_debug >= 9) {
                                        kprintf("CST MODXL %d %lld-%lld\n",
                                                cst->count,
                                                (long long)cst->beg_offset,
                                                (long long)cst->end_offset);
                                }
                        }
                        break;
                default:
                        panic("ccms_lstate_get: bad state %d\n", state);
                        break;
                }
                if (blocked == 0)
                        break;
                ssleep(cst, &cst->cino->spin, 0, "ccmslget", hz);
        }
        if (cst->lstate < state)
                cst->lstate = state;
        spin_unlock(&cst->cino->spin);
}

static
void
ccms_lstate_put(ccms_cst_t *cst)
{
        spin_lock(&cst->cino->spin);

        switch(cst->lstate) {
        case CCMS_STATE_INVALID:
                break;
        case CCMS_STATE_ALLOWED:
        case CCMS_STATE_SHARED:
        case CCMS_STATE_SLAVE:
                KKASSERT(cst->count > 0);
                --cst->count;
                if (ccms_debug >= 9) {
                        kprintf("CST UNSHR %d %lld-%lld (%d)\n", cst->count,
                                (long long)cst->beg_offset,
                                (long long)cst->end_offset,
                                cst->blocked);
                }
                if (cst->blocked && cst->count == 0) {
                        cst->blocked = 0;
                        wakeup(cst);
                }
                break;
        case CCMS_STATE_MASTER:
        case CCMS_STATE_EXCLUSIVE:
        case CCMS_STATE_MODIFIED:
                KKASSERT(cst->count < 0);
                ++cst->count;
                if (ccms_debug >= 9) {
                        kprintf("CST UNEXC %d %lld-%lld (%d)\n", cst->count,
                                (long long)cst->beg_offset,
                                (long long)cst->end_offset,
                                cst->blocked);
                }
                if (cst->blocked && cst->count == 0) {
                        cst->blocked = 0;
                        wakeup(cst);
                }
                break;
        default:
                panic("ccms_lstate_put: bad state %d\n", cst->lstate);
                break;
        }

        if (--cst->xrefs == 0)
                cst->lstate = CCMS_STATE_INVALID;
        spin_unlock(&cst->cino->spin);
}

/*
 * XXX third-party interaction & granularity
 */
static
void
ccms_rstate_get(ccms_cst_t *cst, ccms_state_t state)
{
        spin_lock(&cst->cino->spin);
        if (cst->rstate < state)
                cst->rstate = state;
        spin_unlock(&cst->cino->spin);
}

/*
 * XXX third-party interaction & granularity
 */
static
void
ccms_rstate_put(ccms_cst_t *cst)
{
        spin_lock(&cst->cino->spin);
        cst->rstate = CCMS_STATE_INVALID;
        spin_unlock(&cst->cino->spin);
}