hammer2 - Initial CCMS adaptation and code-up
author     Matthew Dillon <dillon@apollo.backplane.com>
Mon, 4 Jun 2012 03:47:45 +0000 (20:47 -0700)
committer  Matthew Dillon <dillon@apollo.backplane.com>
Mon, 4 Jun 2012 03:47:45 +0000 (20:47 -0700)
This is an initial code-up and compiles-without-error pass, untested and
likely full of bugs.

CCMS needed a makeover but I managed to retain the guts of the original
block/wakeup and CST partitioning code.

* The frontend code now creates a larger CCMS topology which will mirror
  the chain topology (the ccms_inode will be embedded in a hammer2_inode;
  see the embedding sketch after this list), and places the data range
  locking in the ccms_inode.

* CCMS inode creation and deletion are broken up into two stages, e.g. a
  deletion requires a 'delete' plus 'uninit' sequence: the 'delete' reflects
  a topological deletion while the CCMS node remains intact (e.g. if open
  descriptors on the related file or directory remain), followed by a final
  uninit when the descriptors finally go away (see the teardown sketch after
  this list).

* Enhanced the original CCMS code and the new ccms_inode to track three
  different cache coherency domains: (1) a recursive topological domain
  covering the inode and its entire subtree, (2) an attribute domain
  covering only the inode attributes, and (3) a data domain covering a
  data offset range or directory key range (see the locking sketch after
  this list).

* Local cache states are implemented for the attribute and data range
  domains; the topological domain is not yet properly recursive.

* Remotely-granted cache states are not yet implemented.
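
The embedding mentioned in the first item is sketched below.  This is
illustrative only: the hammer2_inode layout and the hammer2_* helper are
hypothetical, while ccms_inode_init(), ccms_inode_insert() and dom->root
come from this commit.

	/*
	 * Sketch (assumed frontend glue, not part of this commit): each
	 * hammer2_inode carries an embedded ccms_inode which is wired
	 * under its parent's CCMS node to mirror the chain topology.
	 */
	struct hammer2_inode {
		ccms_inode_t	cino;		/* embedded CCMS node */
		/* ... chain, vnode, attribute fields ... */
	};

	static void
	hammer2_inode_ccms_attach(ccms_domain_t *dom, struct hammer2_inode *pip,
				  struct hammer2_inode *ip)
	{
		/* initialize the node; 'ip' doubles as the opaque handle */
		ccms_inode_init(dom, &ip->cino, ip);

		/* mirror the chain topology under the parent (or the root) */
		ccms_inode_insert(pip ? &pip->cino : &dom->root, &ip->cino);
	}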
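
A minimal teardown sketch for the two-stage delete/uninit sequence from
the second item (the hammer2_* call sites are hypothetical; only
ccms_inode_delete() and ccms_inode_uninit() are part of this commit):

	/* Stage 1: unlink -- topological deletion, CCMS node stays intact */
	static void
	hammer2_unlink_ccms(struct hammer2_inode *ip)
	{
		ccms_inode_delete(&ip->cino);
	}

	/* Stage 2: last descriptor goes away -- flush cache state, may block */
	static void
	hammer2_reclaim_ccms(struct hammer2_inode *ip)
	{
		ccms_inode_uninit(&ip->cino);
	}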
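
The three domains map onto the tstate/astate/dstate fields of the new
ccms_lock_t.  The locking sketch below shows a ranged write lock
(attribute plus data-range, roughly what ccms_lock_get_uio() sets up
internally); the wrapper function itself is hypothetical:

	static int
	example_write_lock(ccms_inode_t *cino, ccms_lock_t *lock,
			   ccms_off_t off, ccms_off_t len)
	{
		lock->tstate = 0;			/* no recursive topo lock */
		lock->astate = CCMS_STATE_MODIFIED;	/* inode attributes */
		lock->dstate = CCMS_STATE_MODIFIED;	/* data offset range */
		lock->beg_offset = off;
		lock->end_offset = off + len - 1;	/* byte-inclusive */

		/* acquire now; release later with ccms_lock_put(cino, lock) */
		return (ccms_lock_get(cino, lock));
	}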

sys/vfs/hammer2/Makefile
sys/vfs/hammer2/hammer2.h
sys/vfs/hammer2/hammer2_ccms.c
sys/vfs/hammer2/hammer2_ccms.h

index f0b2ffe..67f60b1 100644
@@ -5,7 +5,7 @@
 
 CFLAGS+= -DINVARIANTS
 KMOD=  hammer2
-SRCS=  hammer2_vfsops.c hammer2_vnops.c hammer2_inode.c
+SRCS=  hammer2_vfsops.c hammer2_vnops.c hammer2_inode.c hammer2_ccms.c
 SRCS+= hammer2_chain.c hammer2_freemap.c hammer2_subr.c hammer2_icrc.c
 SRCS+= hammer2_ioctl.c
 
index 265c2c8..183616a 100644
@@ -66,6 +66,7 @@
 #include "hammer2_disk.h"
 #include "hammer2_mount.h"
 #include "hammer2_ioctl.h"
+#include "hammer2_ccms.h"
 
 struct hammer2_chain;
 struct hammer2_inode;
index e4cc014..0c08947 100644
 #include <sys/kernel.h>
 #include <sys/malloc.h>
 #include <sys/objcache.h>
-#include <sys/ccms.h>
 #include <sys/sysctl.h>
 #include <sys/uio.h>
 #include <machine/limits.h>
 
 #include <sys/spinlock2.h>
 
+#include "hammer2_ccms.h"
+
 struct ccms_lock_scan_info {
-       ccms_dataspace_t ds;
-       ccms_lock_t lock;
-       ccms_cst_t  cst1;
-       ccms_cst_t  cst2;
-       ccms_cst_t  coll_cst;
+       ccms_inode_t    *cino;
+       ccms_lock_t     *lock;
+       ccms_cst_t      *coll_cst;
+       int             rstate_upgrade_needed;
 };
 
-static int ccms_cst_cmp(ccms_cst_t b1, ccms_cst_t b2);
-static int ccms_lock_scan_cmp(ccms_cst_t b1, void *arg);
-static int ccms_lock_undo_cmp(ccms_cst_t b1, void *arg);
-static int ccms_dataspace_destroy_match(ccms_cst_t cst, void *arg);
-static int ccms_lock_get_match(struct ccms_cst *cst, void *arg);
-static int ccms_lock_undo_match(struct ccms_cst *cst, void *arg);
-static int ccms_lock_redo_match(struct ccms_cst *cst, void *arg);
-static int ccms_lock_put_match(struct ccms_cst *cst, void *arg);
+static int ccms_cst_cmp(ccms_cst_t *b1, ccms_cst_t *b2);
+static int ccms_lock_scan_cmp(ccms_cst_t *b1, void *arg);
+
+static int ccms_lock_get_match(ccms_cst_t *cst, void *arg);
+static int ccms_lock_undo_match(ccms_cst_t *cst, void *arg);
+static int ccms_lock_redo_match(ccms_cst_t *cst, void *arg);
+static int ccms_lock_upgrade_match(ccms_cst_t *cst, void *arg);
+static int ccms_lock_put_match(ccms_cst_t *cst, void *arg);
 
+static void ccms_lstate_get(ccms_cst_t *cst, ccms_state_t state);
+static void ccms_lstate_put(ccms_cst_t *cst);
+static void ccms_rstate_get(ccms_cst_t *cst, ccms_state_t state);
+static void ccms_rstate_put(ccms_cst_t *cst);
+
+struct ccms_rb_tree;
 RB_GENERATE3(ccms_rb_tree, ccms_cst, rbnode, ccms_cst_cmp,
-            off_t, beg_offset, end_offset);
+            ccms_off_t, beg_offset, end_offset);
 static MALLOC_DEFINE(M_CCMS, "CCMS", "Cache Coherency Management System");
 
-static int ccms_enable;
-SYSCTL_INT(_kern, OID_AUTO, ccms_enable, CTLFLAG_RW, &ccms_enable, 0, "");
-
-static struct objcache *ccms_oc;
+static int ccms_debug = 0;
 
 /*
- * Initialize the CCMS subsystem
+ * These helpers are called to manage the CST cache so we can avoid
+ * unnecessary kmalloc()'s and kfree()'s in hot paths.
+ *
+ * ccms_free_pass1() must be called with the spinlock held.
+ * ccms_free_pass2() must be called with the spinlock not held.
  */
-static void
-ccmsinit(void *dummy)
+static __inline
+ccms_cst_t *
+ccms_free_pass1(ccms_inode_t *cino, int keep)
 {
-    ccms_oc = objcache_create_simple(M_CCMS, sizeof(struct ccms_cst));
+       ccms_cst_t *cst;
+       ccms_cst_t **cstp;
+
+       cstp = &cino->free_cache;
+       while ((cst = *cstp) != NULL && keep) {
+               cstp = &cst->free_next;
+               --keep;
+       }
+       *cstp = NULL;
+       return (cst);
+}
+
+static __inline
+void
+ccms_free_pass2(ccms_cst_t *next)
+{
+       ccms_cst_t *cst;
+       ccms_domain_t *dom;
+
+       while ((cst = next) != NULL) {
+               next = cst->free_next;
+               cst->free_next = NULL;
+
+               dom = cst->cino->domain;
+               atomic_add_int(&dom->cst_count, -1);
+
+               kfree(cst, dom->mcst);
+       }
 }
-SYSINIT(ccms, SI_BOOT2_MACHDEP, SI_ORDER_ANY, ccmsinit, NULL);
 
 /*
  * Initialize a new CCMS dataspace.  Create a new RB tree with a single
@@ -89,161 +123,414 @@ SYSINIT(ccms, SI_BOOT2_MACHDEP, SI_ORDER_ANY, ccmsinit, NULL);
  * algorithms enormously by removing a number of special cases.
  */
 void
-ccms_dataspace_init(ccms_dataspace_t ds)
+ccms_domain_init(ccms_domain_t *dom)
 {
-    ccms_cst_t cst;
-
-    RB_INIT(&ds->tree);
-    ds->info = NULL;
-    ds->chain = NULL;
-    cst = objcache_get(ccms_oc, M_WAITOK);
-    bzero(cst, sizeof(*cst));
-    cst->beg_offset = LLONG_MIN;
-    cst->end_offset = LLONG_MAX;
-    cst->state = CCMS_STATE_INVALID;
-    RB_INSERT(ccms_rb_tree, &ds->tree, cst);
-    spin_init(&ds->spin);
+       bzero(dom, sizeof(*dom));
+       kmalloc_create(&dom->mcst, "CCMS-cst");
+       ccms_inode_init(dom, &dom->root, NULL);
+       dom->root.domain = dom;
 }
 
 /*
- * Helper to destroy deleted cst's.
+ * Initialize a ccms_inode for use.  The inode will be initialized but
+ * is not yet connected to the rest of the topology.  However, it can
+ * still be used stand-alone if desired without being connected to the
+ * topology.
  */
-static __inline
 void
-ccms_delayed_free(ccms_cst_t cstn)
+ccms_inode_init(ccms_domain_t *dom, ccms_inode_t *cino, void *handle)
 {
-    ccms_cst_t cst;
+       ccms_cst_t *cst;
+
+       bzero(cino, sizeof(*cino));
 
-    while((cst = cstn) != NULL) {
-       cstn = cst->delayed_next;
-       objcache_put(ccms_oc, cst);
-    }
+       spin_init(&cino->spin);
+       RB_INIT(&cino->tree);
+       cino->domain = dom;
+       cino->handle = handle;
+       cino->attr_cst.cino = cino;
+       cino->attr_cst.lstate = CCMS_STATE_INVALID;
+       cino->attr_cst.rstate = CCMS_STATE_INVALID;
+       cino->topo_cst.cino = cino;
+       cino->topo_cst.lstate = CCMS_STATE_INVALID;
+       cino->topo_cst.rstate = CCMS_STATE_INVALID;
+
+       /*
+        * The dataspace must be initialized w/cache-state set to INVALID
+        * for the entire range.
+        */
+       cst = kmalloc(sizeof(*cst), dom->mcst, M_WAITOK | M_ZERO);
+       cst->cino = cino;
+       cst->flags = CCMS_CST_DYNAMIC;
+       cst->beg_offset = 0;
+       cst->end_offset = 0xFFFFFFFFFFFFFFFFLLU;
+       cst->lstate = CCMS_STATE_INVALID;
+       cst->rstate = CCMS_STATE_INVALID;
+       RB_INSERT(ccms_rb_tree, &cino->tree, cst);
+       atomic_add_int(&dom->cst_count, 1);
+}
+
+/*
+ * Insert an inode into the topology.  The inode has already been
+ * initialized and could even be in active use.
+ */
+void
+ccms_inode_insert(ccms_inode_t *cpar, ccms_inode_t *cino)
+{
+       spin_lock(&cpar->spin);
+       spin_lock(&cino->spin);
+       KKASSERT((cino->flags & CCMS_INODE_INSERTED) == 0);
+       if (RB_INSERT(ccms_rb_tree, &cpar->tree, &cino->topo_cst)) {
+               spin_unlock(&cino->spin);
+               spin_unlock(&cpar->spin);
+               panic("ccms_inode_insert: duplicate entry");
+       }
+       cino->parent = cpar;
+       cino->flags |= CCMS_INODE_INSERTED;
+       spin_unlock(&cino->spin);
+       spin_unlock(&cpar->spin);
 }
 
 /*
- * Destroy a CCMS dataspace.
+ * Delete an inode from the topology.  The inode can remain in active use
+ * after the deletion (e.g. when unlinking a file which still has open
+ * descriptors).
  *
- * MPSAFE
+ * If the caller is destroying the ccms_inode the caller must call
+ * ccms_inode_uninit() to invalidate the cache state (which can block).
  */
 void
-ccms_dataspace_destroy(ccms_dataspace_t ds)
+ccms_inode_delete(ccms_inode_t *cino)
 {
-    ccms_cst_t cst;
-
-    spin_lock(&ds->spin);
-    RB_SCAN(ccms_rb_tree, &ds->tree, NULL,
-           ccms_dataspace_destroy_match, ds);
-    cst = ds->delayed_free;
-    ds->delayed_free = NULL;
-    spin_unlock(&ds->spin);
-    ccms_delayed_free(cst);
+       ccms_inode_t *cpar;
+       int flags;
+
+       /*
+        * Interlock with the DELETING flag.
+        */
+       spin_lock(&cino->spin);
+       flags = cino->flags;
+       cino->flags |= CCMS_INODE_DELETING;
+       spin_unlock(&cino->spin);
+
+       if (flags & CCMS_INODE_DELETING)
+               return;
+       if ((flags & CCMS_INODE_INSERTED) == 0)
+               return;
+
+       /*
+	 * We have the interlock; we are the only ones who can delete
+        * the inode now.
+        */
+       cpar = cino->parent;
+       spin_lock(&cpar->spin);
+       spin_lock(&cino->spin);
+       KKASSERT(cpar == cino->parent);
+
+       cino->flags &= ~CCMS_INODE_INSERTED;
+       RB_REMOVE(ccms_rb_tree, &cpar->tree, &cino->topo_cst);
+
+       spin_unlock(&cino->spin);
+       spin_unlock(&cpar->spin);
 }
 
 /*
- * Helper routine to delete matches during a destroy.
+ * The caller has removed the inode from the topology and is now trying
+ * to destroy the structure.  This routine flushes the cache state and
+ * can block on third-party interactions.
  *
- * NOTE: called with spinlock held.
+ * NOTE: Caller must have already destroyed any recursive inode state.
  */
-static
-int
-ccms_dataspace_destroy_match(ccms_cst_t cst, void *arg)
+void
+ccms_inode_uninit(ccms_inode_t *cino)
 {
-    ccms_dataspace_t ds = arg;
+       ccms_cst_t *scan;
+
+       KKASSERT(cino->flags & CCMS_INODE_DELETING);
+       spin_lock(&cino->spin);
+
+       while ((scan = RB_ROOT(&cino->tree)) != NULL) {
+               KKASSERT(scan->flags & CCMS_CST_DYNAMIC);
+               KKASSERT((scan->flags & CCMS_CST_DELETING) == 0);
+               RB_REMOVE(ccms_rb_tree, &cino->tree, scan);
+               scan->flags |= CCMS_CST_DELETING;
+               scan->flags &= ~CCMS_CST_INSERTED;
+               spin_unlock(&cino->spin);
+
+               /*
+                * Inval can be called without the inode spinlock because
+                * we own the DELETING flag.
+                */
+               ccms_lstate_put(scan);
+               ccms_rstate_put(scan);
+               atomic_add_int(&cino->domain->cst_count, -1);
+
+               kfree(scan, cino->domain->mcst);
+               spin_lock(&cino->spin);
+       }
+       KKASSERT((cino->attr_cst.flags & CCMS_CST_DELETING) == 0);
+       cino->attr_cst.flags |= CCMS_CST_DELETING;
+       KKASSERT((cino->topo_cst.flags & CCMS_CST_DELETING) == 0);
+       cino->topo_cst.flags |= CCMS_CST_DELETING;
+       scan = ccms_free_pass1(cino, 0);
+       spin_unlock(&cino->spin);
+
+       /*
+        * Inval can be called without the inode spinlock because
+        * we own the DELETING flag.  Similarly we can clear cino->domain
+        * and cino->handle because we own the DELETING flag on the cino.
+        */
+       ccms_lstate_put(&cino->attr_cst);
+       ccms_rstate_put(&cino->attr_cst);
+       ccms_lstate_put(&cino->topo_cst);
+       ccms_rstate_put(&cino->topo_cst);
 
-    RB_REMOVE(ccms_rb_tree, &ds->tree, cst);
-    cst->delayed_next = ds->delayed_free;
-    ds->delayed_free = cst;
-    return(0);
+       ccms_free_pass2(scan);
+
+       cino->domain = NULL;
+       cino->handle = NULL;
 }
 
 /*
- * Obtain a CCMS lock
+ * This is the core CCMS lock acquisition code and is typically called
+ * by program-specific wrappers which initialize the lock structure.
+ *
+ * Three cache coherent domains can be obtained, the topological 't'
+ * domain, the attribute 'a' domain, and a range in the data 'd' domain.
+ *
+ * A topological CCMS lock covers the entire attribute and data domain
+ * plus recursively covers the entire directory sub-tree, so if a topo
+ * lock is requested the other 'a' and 'd' locks currently assert if
+ * specified in the same request.
+ *
+ * You can get both an 'a' and a 'd' lock at the same time and, in
+ * particular, a VFS can use the 'a' lock to also lock the related
+ * VFS inode structure if it desires to.  HAMMER2 utilizes this feature.
  *
- * MPSAFE
+ * Topo locks are typically needed for rename operations and topo CST
+ * cache state on the backend can be used to limit the number of dynamic
+ * CST allocations backing the live CCMS locks.
  */
 int
-ccms_lock_get(ccms_dataspace_t ds, ccms_lock_t lock)
+ccms_lock_get(ccms_inode_t *cino, ccms_lock_t *lock)
 {
-    struct ccms_lock_scan_info info;
-    ccms_cst_t cst;
+       struct ccms_lock_scan_info info;
+       ccms_cst_t *cst;
+       int use_redo = 0;
+       ccms_state_t highest_state;
+
+       /*
+        * Live local locks prevent remotes from downgrading the rstate,
+	 * so we have to acquire a local lock before testing rstate.
+        *
+        * The local lock must be released if a remote upgrade is required
+        * to avoid a deadlock, and we retry in that situation.
+        */
+again:
+       if (lock->tstate) {
+               KKASSERT(lock->astate == 0 && lock->dstate == 0);
+               lock->icst = &cino->topo_cst;
+               ccms_lstate_get(lock->icst, lock->tstate);
+
+               if (cino->topo_cst.rstate < lock->tstate) {
+                       ccms_lstate_put(&cino->topo_cst);
+                       ccms_rstate_get(&cino->topo_cst, lock->tstate);
+                       goto again;
+               }
+       } else {
+               /*
+                * The topo rstate must be at least ALLOWED for us to be
+                * able to acquire any other cache state.  If the topo
+                * rstate is already higher than that then we may have
+                * to upgrade it further to cover the lstate's we are
+                * requesting.
+                */
+               highest_state = CCMS_STATE_ALLOWED;
+               if (cino->topo_cst.rstate > highest_state) {
+                       if (highest_state < lock->astate)
+                               highest_state = lock->astate;
+                       if (highest_state < lock->dstate)
+                               highest_state = lock->dstate;
+               }
+               if (cino->topo_cst.rstate < highest_state)
+                       ccms_rstate_get(&cino->topo_cst, highest_state);
+               /* no need to retry */
+       }
+       if (lock->astate) {
+               lock->icst = &cino->attr_cst;
+               ccms_lstate_get(lock->icst, lock->astate);
+
+               if (cino->attr_cst.rstate < lock->astate) {
+                       ccms_lstate_put(&cino->attr_cst);
+                       if (lock->tstate)
+                               ccms_lstate_put(&cino->topo_cst);
+                       ccms_rstate_get(&cino->attr_cst, lock->astate);
+                       goto again;
+               }
+       }
+
+       /*
+        * The data-lock is a range-lock and requires a bit more code.
+        * The CST space is partitioned so the precise range is covered.
+        *
+        * Multiple CST's may be involved and dcst points to the left hand
+        * edge.
+        */
+       if (lock->dstate) {
+               info.lock = lock;
+               info.cino = cino;
+               info.coll_cst = NULL;
+
+               spin_lock(&cino->spin);
+
+               /*
+                * Make sure cino has enough free CSTs to cover the operation,
+                * so we can hold the spinlock through the scan later on.
+                */
+               while (cino->free_cache == NULL ||
+                      cino->free_cache->free_next == NULL) {
+                       spin_unlock(&cino->spin);
+                       cst = kmalloc(sizeof(*cst), cino->domain->mcst,
+                                     M_WAITOK | M_ZERO);
+                       atomic_add_int(&cino->domain->cst_count, 1);
+                       spin_lock(&cino->spin);
+                       cst->free_next = cino->free_cache;
+                       cino->free_cache = cst;
+               }
+
+               /*
+                * The partitioning code runs with the spinlock held.  If
+                * we've already partitioned due to having to do an rstate
+                * upgrade we run a redo instead of a get.
+                */
+               info.rstate_upgrade_needed = 0;
+               if (use_redo == 0) {
+                       RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
+                               ccms_lock_get_match, &info);
+               } else {
+                       RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
+                               ccms_lock_redo_match, &info);
+               }
+
+               /*
+		 * If a collision occurred, undo the fragments we were able
+                * to obtain, block, and try again.
+                */
+               while (info.coll_cst != NULL) {
+                       RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
+                               ccms_lock_undo_match, &info);
+			info.coll_cst->blocked = 1;
+			ssleep(info.coll_cst, &cino->spin, 0, "ccmsget", hz);
+			info.coll_cst = NULL;
+                       info.rstate_upgrade_needed = 0;
+                       RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
+                               ccms_lock_redo_match, &info);
+               }
+
+               /*
+                * If the rstate needs to be upgraded we have to undo the
+                * local locks (but we retain the partitioning).
+                *
+		 * Set use_redo to indicate that the partitioning was retained
+                * (i.e. lrefs and rrefs remain intact).
+                */
+               if (info.rstate_upgrade_needed) {
+                       RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
+                               ccms_lock_undo_match, &info);
+                       spin_unlock(&cino->spin);
+                       if (lock->astate)
+                               ccms_lstate_put(&cino->attr_cst);
+                       if (lock->tstate)
+                               ccms_lstate_put(&cino->topo_cst);
+                       spin_lock(&cino->spin);
+                       RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
+                               ccms_lock_upgrade_match, &info);
+                       spin_unlock(&cino->spin);
+                       use_redo = 1;
+                       goto again;
+               }
+
+               /*
+                * Cleanup free CSTs beyond the 2 we wish to retain.
+                */
+               cst = ccms_free_pass1(cino, 2);
+               spin_unlock(&cino->spin);
+               ccms_free_pass2(cst);
+       }
 
-    if (ccms_enable == 0) {
-       lock->ds = NULL;
+       /*
+        * Ok, everything is in good shape EXCEPT we might not have
+        * sufficient topo_cst.rstate.  It could have gotten ripped
+        * out from under us.  Once we have the local locks it can
+        * no longer be downgraded so a check here suffices.
+        */
+       highest_state = CCMS_STATE_ALLOWED;
+       if (highest_state < lock->tstate)
+               highest_state = lock->tstate;
+       if (highest_state < lock->astate)
+               highest_state = lock->astate;
+       if (highest_state < lock->dstate)
+               highest_state = lock->dstate;
+
+       if (cino->topo_cst.rstate < highest_state) {
+               if (lock->dstate) {
+                       spin_lock(&cino->spin);
+                       RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
+                               ccms_lock_put_match, &info);
+                       spin_unlock(&cino->spin);
+               }
+               if (lock->astate)
+                       ccms_lstate_put(&cino->attr_cst);
+               if (lock->tstate)
+                       ccms_lstate_put(&cino->topo_cst);
+               ccms_rstate_get(&cino->topo_cst, highest_state);
+               use_redo = 0;
+               goto again;
+       }
        return(0);
-    }
-
-    /*
-     * Partition the CST space so the precise range is covered and
-     * attempt to obtain the requested local lock (ltype) at the same
-     * time.
-     */
-    lock->ds = ds;
-    info.lock = lock;
-    info.ds = ds;
-    info.coll_cst = NULL;
-    info.cst1 = objcache_get(ccms_oc, M_WAITOK);
-    info.cst2 = objcache_get(ccms_oc, M_WAITOK);
-
-    spin_lock(&ds->spin);
-    RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_scan_cmp,
-           ccms_lock_get_match, &info);
-
-    /*
-     * If a collision occured, undo the fragments we were able to obtain,
-     * block, and try again.
-     */
-    while (info.coll_cst != NULL) {
-       RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_undo_cmp,
-               ccms_lock_undo_match, &info);
-       info.coll_cst->blocked = 1;
-       ssleep(info.coll_cst, &ds->spin, 0,
-              ((lock->ltype == CCMS_LTYPE_SHARED) ? "rngsh" : "rngex"),
-              hz);
-       info.coll_cst = NULL;
-       RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_scan_cmp,
-               ccms_lock_redo_match, &info);
-    }
-    cst = ds->delayed_free;
-    ds->delayed_free = NULL;
-    spin_unlock(&ds->spin);
-
-    /*
-     * Cleanup
-     */
-    ccms_delayed_free(cst);
-    if (info.cst1)
-       objcache_put(ccms_oc, info.cst1);
-    if (info.cst2)
-       objcache_put(ccms_oc, info.cst2);
-
-    return(0);
 }
 
 /*
- * Obtain a CCMS lock, initialize the lock structure from the uio.
+ * Obtain a CCMS lock, initialize the lock structure based on the uio.
  *
- * MPSAFE
+ * Both the attribute AND a ranged-data lock is acquired.
  */
 int
-ccms_lock_get_uio(ccms_dataspace_t ds, ccms_lock_t lock, struct uio *uio)
+ccms_lock_get_uio(ccms_inode_t *cino, ccms_lock_t *lock, struct uio *uio)
 {
-    ccms_ltype_t ltype;
-    off_t eoff;
-
-    if (uio->uio_rw == UIO_READ)
-       ltype = CCMS_LTYPE_SHARED;
-    else
-       ltype = CCMS_LTYPE_MODIFYING;
-
-    /*
-     * Calculate the ending offset (byte inclusive), make sure a seek
-     * overflow does not blow us up.
-     */
-    eoff = uio->uio_offset + uio->uio_resid - 1;
-    if (eoff < uio->uio_offset)
-       eoff = 0x7FFFFFFFFFFFFFFFLL;
-    ccms_lock_init(lock, uio->uio_offset, eoff, ltype);
-    return(ccms_lock_get(ds, lock));
+       ccms_state_t dstate;
+       ccms_off_t eoff;
+
+       if (uio->uio_rw == UIO_READ)
+               dstate = CCMS_STATE_SHARED;
+       else
+               dstate = CCMS_STATE_MODIFIED;
+
+       /*
+        * Calculate the ending offset (byte inclusive), make sure a seek
+        * overflow does not blow us up.
+        */
+       eoff = uio->uio_offset + uio->uio_resid - 1;
+       if (eoff < uio->uio_offset)
+               eoff = 0x7FFFFFFFFFFFFFFFLL;
+       lock->beg_offset = uio->uio_offset;
+       lock->end_offset = eoff;
+       lock->tstate = 0;
+       lock->astate = dstate;
+       lock->dstate = dstate;
+       return (ccms_lock_get(cino, lock));
+}
+
+/*
+ * Obtain a CCMS lock.  Only the attribute lock is acquired.
+ */
+int
+ccms_lock_get_attr(ccms_inode_t *cino, ccms_lock_t *lock, ccms_state_t astate)
+{
+       lock->tstate = 0;
+       lock->astate = astate;
+       lock->dstate = 0;
+       return (ccms_lock_get(cino, lock));
 }
 
 /*
@@ -253,148 +540,182 @@ ccms_lock_get_uio(ccms_dataspace_t ds, ccms_lock_t lock, struct uio *uio)
  */
 static
 int
-ccms_lock_get_match(ccms_cst_t cst, void *arg)
+ccms_lock_get_match(ccms_cst_t *cst, void *arg)
 {
-    struct ccms_lock_scan_info *info = arg;
-    ccms_lock_t lock = info->lock;
-    ccms_cst_t ncst;
-
-    /*
-     * If the lock's left edge is within the CST we must split the CST
-     * into two pieces [cst][ncst].  lrefs must be bumped on the CST
-     * containing the left edge.
-     *
-     * NOTE! cst->beg_offset may not be modified.  This allows us to avoid
-     * having to manipulate the cst's position in the tree.
-     */
-    if (lock->beg_offset > cst->beg_offset) {
-       ncst = info->cst1;
-       info->cst1 = NULL;
-       KKASSERT(ncst != NULL);
-       *ncst = *cst;
-       cst->end_offset = lock->beg_offset - 1;
-       cst->rrefs = 0;
-       ncst->beg_offset = lock->beg_offset;
-       ncst->lrefs = 1;
-       RB_INSERT(ccms_rb_tree, &info->ds->tree, ncst);
+       struct ccms_lock_scan_info *info = arg;
+       ccms_lock_t *lock = info->lock;
+       ccms_cst_t *ncst;
 
        /*
-        * ncst becomes our 'matching' cst.
+        * If the lock's left edge is within the CST we must split the CST
+        * into two pieces [cst][ncst].  lrefs must be bumped on the CST
+        * containing the left edge.
+        *
+        * NOTE! cst->beg_offset may not be modified.  This allows us to
+        *       avoid having to manipulate the cst's position in the tree.
         */
-       cst = ncst;
-    } else if (lock->beg_offset == cst->beg_offset) {
-       ++cst->lrefs;
-    }
-
-    /*
-     * If the lock's right edge is within the CST we must split the CST
-     * into two pieces [cst][ncst].  rrefs must be bumped on the CST
-     * containing the right edge.
-     *
-     * NOTE! cst->beg_offset may not be modified.  This allows us to avoid
-     * having to manipulate the cst's position in the tree.
-     */
-    if (lock->end_offset < cst->end_offset) {
-       ncst = info->cst2;
-       info->cst2 = NULL;
-       KKASSERT(ncst != NULL);
-       *ncst = *cst;
-       cst->end_offset = lock->end_offset;
-       cst->rrefs = 1;
-       ncst->beg_offset = lock->end_offset + 1;
-       ncst->lrefs = 0;
-       RB_INSERT(ccms_rb_tree, &info->ds->tree, ncst);
-       /* cst remains our 'matching' cst */
-    } else if (lock->end_offset == cst->end_offset) {
-       ++cst->rrefs;
-    }
-
-    /*
-     * The lock covers the CST, so increment the CST's coverage count.
-     * Then attempt to obtain the shared/exclusive ltype.
-     */
-    ++cst->xrefs;
-
-    if (info->coll_cst == NULL) {
-       switch(lock->ltype) {
-       case CCMS_LTYPE_SHARED:
-           if (cst->sharecount < 0) {
-               info->coll_cst = cst;
-           } else {
-               ++cst->sharecount;
-               if (ccms_enable >= 9) {
-                       kprintf("CST SHARE %d %lld-%lld\n",
-                               cst->sharecount,
-                               (long long)cst->beg_offset,
-                               (long long)cst->end_offset);
+       if (lock->beg_offset > cst->beg_offset) {
+		ncst = info->cino->free_cache;
+		KKASSERT(ncst != NULL);
+		info->cino->free_cache = ncst->free_next;
+		ncst->free_next = NULL;
+
+               *ncst = *cst;
+               cst->end_offset = lock->beg_offset - 1;
+               cst->rrefs = 0;
+               ncst->beg_offset = lock->beg_offset;
+               ncst->lrefs = 1;
+               RB_INSERT(ccms_rb_tree, &info->cino->tree, ncst);
+
+               /*
+                * ncst becomes our 'matching' cst.
+                */
+               cst = ncst;
+       } else if (lock->beg_offset == cst->beg_offset) {
+               ++cst->lrefs;
+       }
+
+       /*
+        * If the lock's right edge is within the CST we must split the CST
+        * into two pieces [cst][ncst].  rrefs must be bumped on the CST
+        * containing the right edge.
+        *
+        * NOTE! cst->beg_offset may not be modified.  This allows us to
+        * avoid having to manipulate the cst's position in the tree.
+        */
+       if (lock->end_offset < cst->end_offset) {
+		ncst = info->cino->free_cache;
+		KKASSERT(ncst != NULL);
+		info->cino->free_cache = ncst->free_next;
+		ncst->free_next = NULL;
+
+               *ncst = *cst;
+               cst->end_offset = lock->end_offset;
+               cst->rrefs = 1;
+               ncst->beg_offset = lock->end_offset + 1;
+               ncst->lrefs = 0;
+               RB_INSERT(ccms_rb_tree, &info->cino->tree, ncst);
+               /* cst remains our 'matching' cst */
+       } else if (lock->end_offset == cst->end_offset) {
+               ++cst->rrefs;
+       }
+
+       /*
+        * The lock covers the CST, so increment the CST's coverage count.
+        * Then attempt to obtain the shared/exclusive lock.  The coverage
+        * count is maintained until the put operation.
+        */
+       ++cst->xrefs;
+       if (cst->lstate < lock->dstate)
+               cst->lstate = lock->dstate;
+
+       /*
+        * If we have already collided we make no more modifications
+        * to cst->count, but we must continue the scan to properly
+        * partition the cst.
+        */
+       if (info->coll_cst)
+               return(0);
+
+       switch(lock->dstate) {
+       case CCMS_STATE_INVALID:
+               break;
+       case CCMS_STATE_ALLOWED:
+       case CCMS_STATE_SHARED:
+       case CCMS_STATE_SLAVE:
+               if (cst->count < 0) {
+                       info->coll_cst = cst;
+               } else {
+                       ++cst->count;
+                       if (ccms_debug >= 9) {
+                               kprintf("CST SHARE %d %lld-%lld\n",
+                                       cst->count,
+                                       (long long)cst->beg_offset,
+                                       (long long)cst->end_offset);
+                       }
                }
-           }
-           break;
-       case CCMS_LTYPE_EXCLUSIVE:
-           if (cst->sharecount != 0) {
-               info->coll_cst = cst;
-           } else {
-               --cst->sharecount;
-               if (ccms_enable >= 9) {
-                       kprintf("CST EXCLS %d %lld-%lld\n",
-                               cst->sharecount,
-                               (long long)cst->beg_offset,
-                               (long long)cst->end_offset);
+               break;
+       case CCMS_STATE_MASTER:
+       case CCMS_STATE_EXCLUSIVE:
+               if (cst->count != 0) {
+                       info->coll_cst = cst;
+               } else {
+                       --cst->count;
+                       if (ccms_debug >= 9) {
+                               kprintf("CST EXCLS %d %lld-%lld\n",
+                                       cst->count,
+                                       (long long)cst->beg_offset,
+                                       (long long)cst->end_offset);
+                       }
                }
-           }
-           break;
-       case CCMS_LTYPE_MODIFYING:
-           if (cst->sharecount != 0) {
-               info->coll_cst = cst;
-           } else {
-               --cst->sharecount;
-               ++cst->modifycount;
-               if (ccms_enable >= 9) {
-                       kprintf("CST MODXL %d %lld-%lld\n",
-                               cst->sharecount,
-                               (long long)cst->beg_offset,
-                               (long long)cst->end_offset);
+               break;
+       case CCMS_STATE_MODIFIED:
+               if (cst->count != 0) {
+                       info->coll_cst = cst;
+               } else {
+                       --cst->count;
+                       if (cst->lstate <= CCMS_STATE_EXCLUSIVE)
+                               cst->lstate = CCMS_STATE_MODIFIED;
+                       if (ccms_debug >= 9) {
+                               kprintf("CST MODXL %d %lld-%lld\n",
+                                       cst->count,
+                                       (long long)cst->beg_offset,
+                                       (long long)cst->end_offset);
+                       }
                }
-           }
-           break;
+               break;
+       default:
+               panic("ccms_lock_get_match: bad state %d\n", lock->dstate);
+               break;
        }
-    }
-    return(0);
+       return(0);
 }
 
 /*
  * Undo a partially resolved ccms_ltype rangelock.  This is atomic with
  * the scan/redo code so there should not be any blocked locks when
- * transitioning to 0.
+ * transitioning to 0.  lrefs and rrefs are not touched in order to
+ * retain the partitioning.
+ *
+ * If coll_cst is non-NULL we stop when we hit this element as locks on
+ * no further elements were obtained.  This element might not represent
+ * a left or right edge but coll_cst can only be non-NULL if the spinlock
+ * was held throughout the get/redo and the undo.
  *
  * NOTE: called with spinlock held.
  */
 static
 int
-ccms_lock_undo_match(ccms_cst_t cst, void *arg)
+ccms_lock_undo_match(ccms_cst_t *cst, void *arg)
 {
-    struct ccms_lock_scan_info *info = arg;
-    ccms_lock_t lock = info->lock;
-
-    switch(lock->ltype) {
-    case CCMS_LTYPE_SHARED:
-       KKASSERT(cst->sharecount > 0);
-       --cst->sharecount;
-       KKASSERT(cst->sharecount || cst->blocked == 0);
-       break;
-    case CCMS_LTYPE_EXCLUSIVE:
-       KKASSERT(cst->sharecount < 0);
-       ++cst->sharecount;
-       KKASSERT(cst->sharecount || cst->blocked == 0);
-       break;
-    case CCMS_LTYPE_MODIFYING:
-       KKASSERT(cst->sharecount < 0 && cst->modifycount > 0);
-       ++cst->sharecount;
-       --cst->modifycount;
-       KKASSERT(cst->sharecount || cst->blocked == 0);
-       break;
-    }
-    return(0);
+       struct ccms_lock_scan_info *info = arg;
+       ccms_lock_t *lock = info->lock;
+
+       if (cst == info->coll_cst)
+               return(-1);
+
+       switch (lock->dstate) {
+       case CCMS_STATE_INVALID:
+               break;
+       case CCMS_STATE_ALLOWED:
+       case CCMS_STATE_SHARED:
+       case CCMS_STATE_SLAVE:
+               KKASSERT(cst->count > 0);
+               --cst->count;
+               KKASSERT(cst->count || cst->blocked == 0);
+               break;
+       case CCMS_STATE_MASTER:
+       case CCMS_STATE_EXCLUSIVE:
+       case CCMS_STATE_MODIFIED:
+               KKASSERT(cst->count < 0);
+               ++cst->count;
+               KKASSERT(cst->count || cst->blocked == 0);
+               break;
+       default:
+               panic("ccms_lock_undo_match: bad state %d\n", lock->dstate);
+               break;
+       }
+       return(0);
 }
 
 /*
@@ -405,234 +726,269 @@ ccms_lock_undo_match(ccms_cst_t cst, void *arg)
  */
 static
 int
-ccms_lock_redo_match(ccms_cst_t cst, void *arg)
+ccms_lock_redo_match(ccms_cst_t *cst, void *arg)
 {
-    struct ccms_lock_scan_info *info = arg;
-    ccms_lock_t lock = info->lock;
-
-    if (info->coll_cst == NULL) {
-       switch(lock->ltype) {
-       case CCMS_LTYPE_SHARED:
-           if (cst->sharecount < 0) {
-               info->coll_cst = cst;
-           } else {
-               if (ccms_enable >= 9) {
-                       kprintf("CST SHARE %d %lld-%lld\n",
-                               cst->sharecount,
-                               (long long)cst->beg_offset,
-                               (long long)cst->end_offset);
+       struct ccms_lock_scan_info *info = arg;
+       ccms_lock_t *lock = info->lock;
+
+       KKASSERT(info->coll_cst == NULL);
+
+       switch(lock->dstate) {
+       case CCMS_STATE_INVALID:
+               break;
+       case CCMS_STATE_ALLOWED:
+       case CCMS_STATE_SHARED:
+       case CCMS_STATE_SLAVE:
+               if (cst->count < 0) {
+                       info->coll_cst = cst;
+               } else {
+                       if (ccms_debug >= 9) {
+                               kprintf("CST SHARE %d %lld-%lld\n",
+                                       cst->count,
+                                       (long long)cst->beg_offset,
+                                       (long long)cst->end_offset);
+                       }
+                       ++cst->count;
                }
-               ++cst->sharecount;
-           }
-           break;
-       case CCMS_LTYPE_EXCLUSIVE:
-           if (cst->sharecount != 0) {
-               info->coll_cst = cst;
-           } else {
-               --cst->sharecount;
-               if (ccms_enable >= 9) {
-                       kprintf("CST EXCLS %d %lld-%lld\n",
-                               cst->sharecount,
-                               (long long)cst->beg_offset,
-                               (long long)cst->end_offset);
+               break;
+       case CCMS_STATE_MASTER:
+       case CCMS_STATE_EXCLUSIVE:
+               if (cst->count != 0) {
+                       info->coll_cst = cst;
+               } else {
+                       --cst->count;
+                       if (ccms_debug >= 9) {
+                               kprintf("CST EXCLS %d %lld-%lld\n",
+                                       cst->count,
+                                       (long long)cst->beg_offset,
+                                       (long long)cst->end_offset);
+                       }
                }
-           }
-           break;
-       case CCMS_LTYPE_MODIFYING:
-           if (cst->sharecount != 0) {
-               info->coll_cst = cst;
-           } else {
-               --cst->sharecount;
-               ++cst->modifycount;
-               if (ccms_enable >= 9) {
-                       kprintf("CST MODXL %d %lld-%lld\n",
-                               cst->sharecount,
-                               (long long)cst->beg_offset,
-                               (long long)cst->end_offset);
+               break;
+       case CCMS_STATE_MODIFIED:
+               if (cst->count != 0) {
+                       info->coll_cst = cst;
+               } else {
+                       --cst->count;
+                       if (ccms_debug >= 9) {
+                               kprintf("CST MODXL %d %lld-%lld\n",
+                                       cst->count,
+                                       (long long)cst->beg_offset,
+                                       (long long)cst->end_offset);
+                       }
                }
-           }
-           break;
+               break;
+       default:
+               panic("ccms_lock_redo_match: bad state %d\n", lock->dstate);
+               break;
        }
-    }
-    return(0);
+
+       if (info->coll_cst)
+               return(-1);     /* stop the scan */
+       return(0);              /* continue the scan */
 }
 
 /*
- * Release a CCMS lock
+ * Upgrade the rstate for the matching range.
  *
- * MPSAFE
+ * NOTE: Called with spinlock held.
  */
+static
 int
-ccms_lock_put(ccms_dataspace_t ds, ccms_lock_t lock)
+ccms_lock_upgrade_match(ccms_cst_t *cst, void *arg)
 {
-    struct ccms_lock_scan_info info;
-    ccms_cst_t cst;
+       struct ccms_lock_scan_info *info = arg;
+       ccms_lock_t *lock = info->lock;
 
-    if (lock->ds == NULL)
+       /*
+        * ccms_rstate_get() can block so we must release the spinlock.
+        * To prevent the cst from getting ripped out on us we temporarily
+        * bump both lrefs and rrefs.
+        */
+       if (cst->rstate < lock->dstate) {
+               ++cst->lrefs;
+               ++cst->rrefs;
+               spin_unlock(&info->cino->spin);
+               ccms_rstate_get(cst, lock->dstate);
+               spin_lock(&info->cino->spin);
+               --cst->lrefs;
+               --cst->rrefs;
+       }
        return(0);
+}
 
-    lock->ds = NULL;
-    info.lock = lock;
-    info.ds = ds;
-    info.cst1 = NULL;
-    info.cst2 = NULL;
-
-    spin_lock(&ds->spin);
-    RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_scan_cmp,
-           ccms_lock_put_match, &info);
-    cst = ds->delayed_free;
-    ds->delayed_free = NULL;
-    spin_unlock(&ds->spin);
-
-    ccms_delayed_free(cst);
-    if (info.cst1)
-       objcache_put(ccms_oc, info.cst1);
-    if (info.cst2)
-       objcache_put(ccms_oc, info.cst2);
-    return(0);
+/*
+ * Release a previously acquired CCMS lock.
+ */
+int
+ccms_lock_put(ccms_inode_t *cino, ccms_lock_t *lock)
+{
+       struct ccms_lock_scan_info info;
+       ccms_cst_t *scan;
+
+       if (lock->tstate) {
+               ccms_lstate_put(lock->icst);
+               lock->tstate = 0;
+               lock->icst = NULL;
+       } else if (lock->astate) {
+               ccms_lstate_put(lock->icst);
+               lock->astate = 0;
+               lock->icst = NULL;
+       }
+
+       if (lock->dstate) {
+               info.lock = lock;
+               info.cino = cino;
+               spin_lock(&cino->spin);
+               RB_SCAN(ccms_rb_tree, &cino->tree, ccms_lock_scan_cmp,
+                       ccms_lock_put_match, &info);
+               scan = ccms_free_pass1(cino, 2);
+               spin_unlock(&cino->spin);
+               ccms_free_pass2(scan);
+               lock->dstate = 0;
+               lock->dcst = NULL;
+       }
+
+       return(0);
 }
 
 /*
+ * Release a local lock.  The related CST's lstate is set to INVALID once
+ * the coverage drops to 0 and adjacent compatible entries will be
+ * recombined.
+ *
  * NOTE: called with spinlock held.
  */
 static
 int
-ccms_lock_put_match(ccms_cst_t cst, void *arg)
+ccms_lock_put_match(ccms_cst_t *cst, void *arg)
 {
-    struct ccms_lock_scan_info *info = arg;
-    ccms_lock_t lock = info->lock;
-    ccms_cst_t ocst;
-
-    /*
-     * Undo the local shared/exclusive rangelock.
-     */
-    switch(lock->ltype) {
-    case CCMS_LTYPE_SHARED:
-       KKASSERT(cst->sharecount > 0);
-       --cst->sharecount;
-       if (ccms_enable >= 9) {
-               kprintf("CST UNSHR %d %lld-%lld (%d)\n", cst->sharecount,
-                       (long long)cst->beg_offset,
-                       (long long)cst->end_offset,
-                       cst->blocked);
-       }
-       if (cst->blocked && cst->sharecount == 0) {
-               cst->blocked = 0;
-               wakeup(cst);
-       }
-       break;
-    case CCMS_LTYPE_EXCLUSIVE:
-       KKASSERT(cst->sharecount < 0);
-       ++cst->sharecount;
-       if (ccms_enable >= 9) {
-               kprintf("CST UNEXC %d %lld-%lld (%d)\n", cst->sharecount,
-                       (long long)cst->beg_offset,
-                       (long long)cst->end_offset,
-                       cst->blocked);
-       }
-       if (cst->blocked && cst->sharecount == 0) {
-               cst->blocked = 0;
-               wakeup(cst);
-       }
-       break;
-    case CCMS_LTYPE_MODIFYING:
-       KKASSERT(cst->sharecount < 0 && cst->modifycount > 0);
-       ++cst->sharecount;
-       --cst->modifycount;
-       if (ccms_enable >= 9) {
-               kprintf("CST UNMOD %d %lld-%lld (%d)\n", cst->sharecount,
-                       (long long)cst->beg_offset,
-                       (long long)cst->end_offset,
-                       cst->blocked);
-       }
-       if (cst->blocked && cst->sharecount == 0) {
-               cst->blocked = 0;
-               wakeup(cst);
-       }
-       break;
-    }
-
-    /*
-     * Decrement the lock coverage count on the CST.  Decrement the left and
-     * right edge counts as appropriate.
-     *
-     * When lrefs or rrefs drops to zero we check the adjacent entry to
-     * determine whether a merge is possible.  If the appropriate refs field
-     * (rrefs for the entry to our left, lrefs for the entry to our right)
-     * is 0, then all covering locks must cover both entries and the xrefs
-     * field must match.  We can then merge the entries if they have
-     * compatible cache states.
-     *
-     * However, because we are cleaning up the shared/exclusive count at
-     * the same time, the sharecount field may be temporarily out of
-     * sync, so require that the sharecount field also match before doing
-     * a merge.
-     *
-     * When merging an element which is being blocked on, the blocking
-     * thread(s) will be woken up.
-     *
-     * If the dataspace has too many CSTs we may be able to merge the
-     * entries even if their cache states are not the same, by dropping
-     * both to a compatible (lower) cache state and performing the appropriate
-     * management operations.  XXX
-     */
-    --cst->xrefs;
-    if (lock->beg_offset == cst->beg_offset) {
-       --cst->lrefs;
-       if (cst->lrefs == 0) {
-           if ((ocst = RB_PREV(ccms_rb_tree, &info->ds->tree, cst)) != NULL &&
-               ocst->rrefs == 0 &&
-               ocst->state == cst->state &&
-               ocst->sharecount == cst->sharecount
-           ) {
-               KKASSERT(ocst->xrefs == cst->xrefs);
-               KKASSERT(ocst->end_offset + 1 == cst->beg_offset);
-               RB_REMOVE(ccms_rb_tree, &info->ds->tree, ocst);
-               cst->beg_offset = ocst->beg_offset;
-               cst->lrefs = ocst->lrefs;
-               if (ccms_enable >= 9) {
-                   kprintf("MERGELEFT %p %lld-%lld (%d)\n",
-                          ocst,
-                          (long long)cst->beg_offset,
-                          (long long)cst->end_offset,
-                          cst->blocked);
+       struct ccms_lock_scan_info *info = arg;
+       ccms_lock_t *lock = info->lock;
+       ccms_cst_t *ocst;
+
+       /*
+        * Undo the local shared/exclusive rangelock.
+        */
+       switch(lock->dstate) {
+       case CCMS_STATE_INVALID:
+               break;
+       case CCMS_STATE_ALLOWED:
+       case CCMS_STATE_SHARED:
+       case CCMS_STATE_SLAVE:
+               KKASSERT(cst->count > 0);
+               --cst->count;
+               if (ccms_debug >= 9) {
+                       kprintf("CST UNSHR %d %lld-%lld (%d)\n", cst->count,
+                               (long long)cst->beg_offset,
+                               (long long)cst->end_offset,
+                               cst->blocked);
                }
-               if (ocst->blocked) {
-                   ocst->blocked = 0;
-                   wakeup(ocst);
+               if (cst->blocked && cst->count == 0) {
+                       cst->blocked = 0;
+                       wakeup(cst);
                }
-               /*objcache_put(ccms_oc, ocst);*/
-               ocst->delayed_next = info->ds->delayed_free;
-               info->ds->delayed_free = ocst;
-           }
+               break;
+       case CCMS_STATE_MASTER:
+       case CCMS_STATE_EXCLUSIVE:
+       case CCMS_STATE_MODIFIED:
+               KKASSERT(cst->count < 0);
+               ++cst->count;
+               if (ccms_debug >= 9) {
+                       kprintf("CST UNEXC %d %lld-%lld (%d)\n", cst->count,
+                               (long long)cst->beg_offset,
+                               (long long)cst->end_offset,
+                               cst->blocked);
+               }
+               if (cst->blocked && cst->count == 0) {
+                       cst->blocked = 0;
+                       wakeup(cst);
+               }
+               break;
+       default:
+               panic("ccms_lock_put_match: bad state %d\n", lock->dstate);
+               break;
        }
-    }
-    if (lock->end_offset == cst->end_offset) {
-       --cst->rrefs;
-       if (cst->rrefs == 0) {
-           if ((ocst = RB_NEXT(ccms_rb_tree, &info->ds->tree, cst)) != NULL &&
-               ocst->lrefs == 0 &&
-               ocst->state == cst->state &&
-               ocst->sharecount == cst->sharecount
-           ) {
-               KKASSERT(ocst->xrefs == cst->xrefs);
-               KKASSERT(cst->end_offset + 1 == ocst->beg_offset);
-               RB_REMOVE(ccms_rb_tree, &info->ds->tree, ocst);
-               cst->end_offset = ocst->end_offset;
-               cst->rrefs = ocst->rrefs;
-               if (ccms_enable >= 9) {
-                   kprintf("MERGERIGHT %p %lld-%lld\n",
-                          ocst,
-                          (long long)cst->beg_offset,
-                          (long long)cst->end_offset);
+
+       /*
+        * Decrement the lock coverage count on the CST.  Decrement the left
+        * and right edge counts as appropriate.
+        *
+        * When lrefs or rrefs drops to zero we check the adjacent entry to
+        * determine whether a merge is possible.  If the appropriate refs
+        * field (rrefs for the entry to our left, lrefs for the entry to
+        * our right) is 0, then all covering locks must cover both entries
+        * and the xrefs field must match.  We can then merge the entries
+        * if they have compatible cache states.
+        *
+        * However, because we are cleaning up the shared/exclusive count
+        * at the same time, the count field may be temporarily out of
+        * sync, so require that the count field also match before doing
+        * a merge.
+        *
+        * When merging an element which is being blocked on, the blocking
+        * thread(s) will be woken up.
+        *
+        * If the dataspace has too many CSTs we may be able to merge the
+        * entries even if their cache states are not the same, by dropping
+        * both to a compatible (lower) cache state and performing the
+        * appropriate management operations.  XXX
+        */
+       if (--cst->xrefs == 0)
+               cst->lstate = CCMS_STATE_INVALID;
+
+       if (lock->beg_offset == cst->beg_offset && --cst->lrefs == 0) {
+               if ((ocst = RB_PREV(ccms_rb_tree,
+                                   &info->cino->tree, cst)) != NULL &&
+                   ocst->rrefs == 0 &&
+                   ocst->lstate == cst->lstate &&
+                   ocst->rstate == cst->rstate &&
+                   ocst->count == cst->count
+               ) {
+                       KKASSERT(ocst->xrefs == cst->xrefs);
+                       KKASSERT(ocst->end_offset + 1 == cst->beg_offset);
+                       RB_REMOVE(ccms_rb_tree, &info->cino->tree, ocst);
+                       cst->beg_offset = ocst->beg_offset;
+                       cst->lrefs = ocst->lrefs;
+                       if (ccms_debug >= 9) {
+                               kprintf("MERGELEFT %p %lld-%lld (%d)\n",
+                                      ocst,
+                                      (long long)cst->beg_offset,
+                                      (long long)cst->end_offset,
+                                      cst->blocked);
+                       }
+                       if (ocst->blocked) {
+                               ocst->blocked = 0;
+                               wakeup(ocst);
+                       }
+                       ocst->free_next = info->cino->free_cache;
+                       info->cino->free_cache = ocst;
                }
-               /*objcache_put(ccms_oc, ocst);*/
-               ocst->delayed_next = info->ds->delayed_free;
-               info->ds->delayed_free = ocst;
-           }
        }
-    }
-    return(0);
+       if (lock->end_offset == cst->end_offset && --cst->rrefs == 0) {
+               if ((ocst = RB_NEXT(ccms_rb_tree,
+                                   &info->cino->tree, cst)) != NULL &&
+                   ocst->lrefs == 0 &&
+                   ocst->lstate == cst->lstate &&
+                   ocst->rstate == cst->rstate &&
+                   ocst->count == cst->count
+               ) {
+                       KKASSERT(ocst->xrefs == cst->xrefs);
+                       KKASSERT(cst->end_offset + 1 == ocst->beg_offset);
+                       RB_REMOVE(ccms_rb_tree, &info->cino->tree, ocst);
+                       cst->end_offset = ocst->end_offset;
+                       cst->rrefs = ocst->rrefs;
+                       if (ccms_debug >= 9) {
+                               kprintf("MERGERIGHT %p %lld-%lld\n",
+                                      ocst,
+                                      (long long)cst->beg_offset,
+                                      (long long)cst->end_offset);
+                       }
+                       ocst->free_next = info->cino->free_cache;
+                       info->cino->free_cache = ocst;
+               }
+       }
+       return(0);
 }
 
 /*
@@ -640,13 +996,13 @@ ccms_lock_put_match(ccms_cst_t cst, void *arg)
  * compares two CSTs.
  */
 static int
-ccms_cst_cmp(ccms_cst_t b1, ccms_cst_t b2)
+ccms_cst_cmp(ccms_cst_t *b1, ccms_cst_t *b2)
 {
-    if (b1->end_offset < b2->beg_offset)
-       return(-1);
-    if (b1->beg_offset > b2->end_offset)
-       return(1);
-    return(0);
+       if (b1->end_offset < b2->beg_offset)
+               return(-1);
+       if (b1->beg_offset > b2->end_offset)
+               return(1);
+       return(0);
 }
 
 /*
@@ -655,32 +1011,171 @@ ccms_cst_cmp(ccms_cst_t b1, ccms_cst_t b2)
  * placement relative to the lock.
  */
 static int
-ccms_lock_scan_cmp(ccms_cst_t cst, void *arg)
+ccms_lock_scan_cmp(ccms_cst_t *cst, void *arg)
 {
-    struct ccms_lock_scan_info *info = arg;
-    ccms_lock_t lock = info->lock;
-
-    if (cst->end_offset < lock->beg_offset)
-       return(-1);
-    if (cst->beg_offset > lock->end_offset)
-       return(1);
-    return(0);
+       struct ccms_lock_scan_info *info = arg;
+       ccms_lock_t *lock = info->lock;
+
+       if (cst->end_offset < lock->beg_offset)
+               return(-1);
+       if (cst->beg_offset > lock->end_offset)
+               return(1);
+       return(0);
+}
+
+/************************************************************************
+ *             STANDALONE LSTATE AND RSTATE SUPPORT FUNCTIONS          *
+ ************************************************************************
+ *
+ * These functions are used to perform work on the attr_cst and topo_cst
+ * embedded in a ccms_inode, and to issue remote state operations.  These
+ * functions are called without the ccms_inode spinlock held.
+ */
+
+static
+void
+ccms_lstate_get(ccms_cst_t *cst, ccms_state_t state)
+{
+       int blocked;
+
+       spin_lock(&cst->cino->spin);
+       ++cst->xrefs;
+
+       for (;;) {
+               blocked = 0;
+
+               switch(state) {
+               case CCMS_STATE_INVALID:
+                       break;
+               case CCMS_STATE_ALLOWED:
+               case CCMS_STATE_SHARED:
+               case CCMS_STATE_SLAVE:
+                       if (cst->count < 0) {
+                               blocked = 1;
+                       } else {
+                               ++cst->count;
+                               if (ccms_debug >= 9) {
+                                       kprintf("CST SHARE %d %lld-%lld\n",
+                                               cst->count,
+                                               (long long)cst->beg_offset,
+                                               (long long)cst->end_offset);
+                               }
+                       }
+                       break;
+               case CCMS_STATE_MASTER:
+               case CCMS_STATE_EXCLUSIVE:
+                       if (cst->count != 0) {
+                               blocked = 1;
+                       } else {
+                               --cst->count;
+                               if (ccms_debug >= 9) {
+                                       kprintf("CST EXCLS %d %lld-%lld\n",
+                                               cst->count,
+                                               (long long)cst->beg_offset,
+                                               (long long)cst->end_offset);
+                               }
+                       }
+                       break;
+               case CCMS_STATE_MODIFIED:
+                       if (cst->count != 0) {
+                               blocked = 1;
+                       } else {
+                               --cst->count;
+                               if (cst->lstate <= CCMS_STATE_EXCLUSIVE)
+                                       cst->lstate = CCMS_STATE_MODIFIED;
+                               if (ccms_debug >= 9) {
+                                       kprintf("CST MODXL %d %lld-%lld\n",
+                                               cst->count,
+                                               (long long)cst->beg_offset,
+                                               (long long)cst->end_offset);
+                               }
+                       }
+                       break;
+               default:
+                       panic("ccms_lstate_get: bad state %d\n", state);
+                       break;
+               }
+               if (blocked == 0)
+                       break;
+               cst->blocked = 1;       /* request wakeup from ccms_lstate_put() */
+               ssleep(cst, &cst->cino->spin, 0, "ccmslget", hz);
+       }
+       if (cst->lstate < state)
+               cst->lstate = state;
+       spin_unlock(&cst->cino->spin);
+}
+
+static
+void
+ccms_lstate_put(ccms_cst_t *cst)
+{
+       spin_lock(&cst->cino->spin);
+
+       switch(cst->lstate) {
+       case CCMS_STATE_INVALID:
+               break;
+       case CCMS_STATE_ALLOWED:
+       case CCMS_STATE_SHARED:
+       case CCMS_STATE_SLAVE:
+               KKASSERT(cst->count > 0);
+               --cst->count;
+               if (ccms_debug >= 9) {
+                       kprintf("CST UNSHR %d %lld-%lld (%d)\n", cst->count,
+                               (long long)cst->beg_offset,
+                               (long long)cst->end_offset,
+                               cst->blocked);
+               }
+               if (cst->blocked && cst->count == 0) {
+                       cst->blocked = 0;
+                       wakeup(cst);
+               }
+               break;
+       case CCMS_STATE_MASTER:
+       case CCMS_STATE_EXCLUSIVE:
+       case CCMS_STATE_MODIFIED:
+               KKASSERT(cst->count < 0);
+               ++cst->count;
+               if (ccms_debug >= 9) {
+                       kprintf("CST UNEXC %d %lld-%lld (%d)\n", cst->count,
+                               (long long)cst->beg_offset,
+                               (long long)cst->end_offset,
+                               cst->blocked);
+               }
+               if (cst->blocked && cst->count == 0) {
+                       cst->blocked = 0;
+                       wakeup(cst);
+               }
+               break;
+       default:
+               panic("ccms_lstate_put: bad state %d\n", cst->lstate);
+               break;
+       }
+
+       if (--cst->xrefs == 0)
+               cst->lstate = CCMS_STATE_INVALID;
+       spin_unlock(&cst->cino->spin);
 }
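
A positive count tracks SHARED-level holders and a negative count tracks EXCLUSIVE-level holders, so every ccms_lstate_get() must be paired with a ccms_lstate_put() on the same CST. A minimal usage sketch against an inode's embedded attr_cst, assuming a caller inside this file (both functions are static):

        /*
         * Sketch only: take the attribute CST exclusively, operate on the
         * cached attributes, then release.  ccms_lstate_get() sleeps until
         * the count is compatible with the requested state.
         */
        ccms_cst_t *cst = &cino->attr_cst;

        ccms_lstate_get(cst, CCMS_STATE_EXCLUSIVE);     /* count becomes -1 */
        /* ... update the cached inode attributes ... */
        ccms_lstate_put(cst);                           /* count returns to 0 */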
 
 /*
- * This function works like ccms_lock_scan_cmp but terminates at the
- * collision point rather then at the lock's ending offset.  Only
- * the CSTs that were already partially resolved are returned by the scan.
+ * XXX third-party interaction & granularity
  */
-static int
-ccms_lock_undo_cmp(ccms_cst_t cst, void *arg)
+static
+void
+ccms_rstate_get(ccms_cst_t *cst, ccms_state_t state)
+{
+       spin_lock(&cst->cino->spin);
+       if (cst->rstate < state)
+               cst->rstate = state;
+       spin_unlock(&cst->cino->spin);
+}
+
+/*
+ * XXX third-party interaction & granularity
+ */
+static
+void
+ccms_rstate_put(ccms_cst_t *cst)
 {
-    struct ccms_lock_scan_info *info = arg;
-    ccms_lock_t lock = info->lock;
-
-    if (cst->end_offset < lock->beg_offset)
-       return(-1);
-    if (cst->beg_offset >= info->coll_cst->beg_offset)
-       return(1);
-    return(0);
+       spin_lock(&cst->cino->spin);
+       cst->rstate = CCMS_STATE_INVALID;
+       spin_unlock(&cst->cino->spin);
 }
index 00107b2..c677d36 100644 (file)
  * SUCH DAMAGE.
  */
 /*
+ * This module is HAMMER2-independent.
+ *
  * CCMS - Cache Coherency Management System.  These structures are used
- * to manage cache coherency and locking for an object.  Cache Coherency is
- * managed at byte granularity with 64 bit offset ranges.
- *
- * Management is broken into two distinct pieces: (1) Local shared/exclusive
- * locks which essentially replace traditional vnode locks and (2) local
- * cache state which interacts with other hosts and follows a MESI-like model.
- *
- * The core to the entire module is the 'CST' (Cache State Tree) structure
- * which stores both pieces of information in a red-black tree styled data
- * structure.  CSTs are non-overlapping offset-ranged entities.  Other
- * higher level structures govern how CSTs in the red-black tree or cut up
- * or merged.
+ * to manage cache coherency and locking for an object.
+ *
+ *                             ccms_inode
+ *
+ * Cache coherency is tied into a kernel or VFS structure, creating a
+ * directory/file topology and a keyspace on an inode-by-inode basis
+ * via the (ccms_inode) structure.
+ *
+ * Each CCMS inode contains a RB-Tree holding ccms_cst (CST) elements
+ * for its file range or directory key range, plus two independent embedded
+ * ccms_cst structures representing the inode attributes and the entire
+ * recursive sub-tree.
+ *
+ * The CST representing the entire sub-tree is inclusive of that inode's
+ * attribute state and data/key range state AND inclusive of the entire
+ * filesystem topology under that point, recursively.
+ *
+ * Two ccms_cst's are embedded in each cached inode via the ccms_inode
+ * structure to represent attribute and recursive topological cache state.
+ *
+ *                              ccms_cst
+ *
+ * The (ccms_cst) structure, called the CST, represents specific, persistent
+ * cache state.  This structure is allocated and freed on the fly as needed
+ * (except for the two embedded in the ccms_inode).
+ *
+ * The persistence ties into network/cluster operations via the 'rstate'
+ * field.  When cluster-maintained state is present then certain operations
+ * on the CST's local state (including when a vnode is reclaimed) will
+ * block while third-party synchronization occurs.
+ *
+ * The number of dynamically allocated CSTs is strictly limited, forcing
+ * a degree of aggregation when the limit is reached.
+ *
+ *                              ccms_lock
+ *
+ * The (ccms_lock) structure represents a live local lock for the duration of
+ * any given filesystem operation.  A single ccms_lock can cover both
+ * attribute state AND a byte-range/key-range.
+ *
+ * This lock represents the exact lock being requested, but the CST structure
+ * it points to can be a more general representation which covers the lock.
+ * At its coarsest, the cst pointer in the ccms_lock falls back to the
+ * ccms_inode's embedded topo_cst.
+ *
+ * Theoretically a single CST at the root can cover the entire filesystem,
+ * but this creates a great deal of SMP interaction.
+ *
+ *                                Management
+ *
+ * Because cache state is persistent the CCMS module may desire to limit the
+ * total number of CSTs under management.  It does this by aggregating cache
+ * state which in turn may require callbacks to invalidate third-party
+ * (cluster-related) cache state.
+ *
+ * CCMS operations related to locks can stall on third-party state
+ * transitions.  Because third-party state can also change independently
+ * due to foreign interactions (often with a userland program), no filesystem
+ * lock can be held while manipulating CST states.  For this reason,
+ * HAMMER2 (or any VFS using CCMS) must provide roll-up functions to acquire
+ * CCMS lock state up-front prior to locking the VFS inode structure.
+ *
+ * vnode locks which are under the control of the filesystem can be more
+ * problematic and may require additional care.
  */
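
Concretely, the tie-in described above amounts to embedding a ccms_inode in whatever per-inode structure the VFS keeps; a hypothetical sketch (the structure and field names are illustrative, not the actual HAMMER2 layout):

/*
 * Hypothetical VFS inode embedding a ccms_inode.  Per the ordering rule
 * above, CCMS lock state is acquired against &ip->cino before the VFS
 * locks its own inode structure.
 */
struct myfs_inode {
        struct vnode    *vp;            /* associated vnode, if any */
        ccms_inode_t    cino;           /* topology, attribute and data state */
        /* ... filesystem-specific fields ... */
};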
 
 #ifndef _SYS_CCMS_H_
 #include <sys/tree.h>
 #endif
 
+typedef uint64_t       ccms_off_t;
+typedef uint8_t                ccms_state_t;
+
 /*
- * CCMS uses a red-black tree to sort CSTs.
+ * CCMS uses a red-black tree to organize CSTs.
  */
 RB_HEAD(ccms_rb_tree, ccms_cst);
-RB_PROTOTYPE3(ccms_rb_tree, ccms_cst, rbnode, ccms_cst_cmp, off_t);
+RB_PROTOTYPE3(ccms_rb_tree, ccms_cst, rbnode, ccms_cst_cmp, ccms_off_t);
 
-struct ccms_lock;
+struct ccms_inode;
 struct ccms_cst;
+struct ccms_lock;
 
 /*
- * ccms_state_t - CCMS cache states
+ * CCMS cache states
  *
- * CCMS uses an extended MESI caching model.  There are two extension
- * states, MASTER and SLAVE, which represents dirty data which has not been
+ * CCMS uses an extended MESI caching model.  There are two extension states,
+ * MASTER and SLAVE, which represent dirty data that has not been
  * synchronized to backing store but which nevertheless is being shared
  * between distinct caches.   These states are designed to allow data
  * to be shared between nodes in a cluster without having to wait for it
  * to be synchronized with its backing store.
  *
- * SLAVE     - A shared state where the master copy of the data is being
- *             held by a foreign cache rather then by backing store.
- *             This state implies that the backing store may contain stale
- *             data.
- *
- * MASTER    - A shared state where the master copy of the data is being
- *             held locally.  Zero or more foreign caches may be holding
- *             a copy of our data, so we cannot modify it without
- *             invalidating those caches.  This state implies that the
- *             backing store may contain stale data.
- *
- *             MASTER differs from MODIFIED in that the data is read-only
- *             due to the existance of foreign copies.  However, even though
- *             the data is read-only, it is ALSO DIRTY because the backing
- *             store has not been synchronized
- *
- * NOTE!  The cache state represents the worst case cache state for caching
- * elements such as the buffer cache or VM page cache or the vnode attribute
- * cache (or other things) within the specified range.  It does NOT mean
- * that the local machine actually has all of the requested data in-hand.
- */
-typedef enum ccms_state {
-    CCMS_STATE_INVALID = 0,
-    CCMS_STATE_SHARED,         /* clean, read-only, from backing store */
-    CCMS_STATE_SLAVE,          /* clean, read-only, from master */
-    CCMS_STATE_MASTER,         /* dirty, read-only, shared master copy */
-    CCMS_STATE_EXCLUSIVE,      /* clean, read-only, exclusive */
-    CCMS_STATE_MODIFIED                /* clean or dirty, read-write, exclusive */
-} ccms_state_t;
-
-/*
- * ccms_ltype_t - local access control lock state
- *
- * Note: A MODIFYING lock is an exclusive lock where the caller intends to
- * make a modification, such as issuing a WRITE.  The difference between the
- * two is in how the cache state is effected by the lock.   The distinction
- * exists because there are many situations where the governing structure
- * on the local machine needs to be locked exclusively, but the underlying
- * data cache does not.
- *
- *     lock type       cache state
- *     ---------       ---------
- *     SHARED          >= shared
- *     EXCLUSIVE       >= shared
- *     MODIFYING       >= exclusive
- */
-typedef enum ccms_ltype {
-    CCMS_LTYPE_SHARED = 0,     /* shared lock on the range */
-    CCMS_LTYPE_EXCLUSIVE,      /* exclusive lock on the range */
-    CCMS_LTYPE_MODIFYING       /* modifying lock on the range */
-} ccms_ltype_t;
-
-/*
- * The CCMS ABI information structure.  This structure contains ABI
- * calls to resolve incompatible cache states.
+ * Each CST has lstate and rstate.  lstate is the local cache state and rstate
+ * is the remotely-granted state.  Changes to the lstate require a compatible
+ * rstate.  If the rstate is not compatible a third-party transaction is
+ * required to obtain the proper rstate.
+ *
+ * INVALID   - Cache state is unknown and must be acquired.
+ *
+ * ALLOWED   -  (topo_cst.rstate only).  This is a granted state which
+ *             allows cache state transactions underneath the current
+ *             node (data, attribute, and recursively), but is not a proper
+ *             grant for topo_cst itself.  Someone specifically trying to
+ *             acquire topo_cst still needs to do a third party transaction
+ *             to get the cache into the proper state.
+ *
+ * SHARED    -  Indicates that the information is clean, shared, read-only.
+ *
+ * SLAVE     -  Indicates that the information is clean, shared, read-only,
+ *             but that local backing store is out of date while the
+ *             in-memory cache is valid, meaning that we can only obtain
+ *             the data from the MASTER (somewhere in the cluster), and
+ *             that we may not be allowed to sync it to local backing
+ *             store yet, e.g. due to the quorum protocol not having
+ *             completed.
+ *
+ * MASTER    -  Indicates that the information is dirty, but read-only
+ *             because other nodes in the cluster are in a SLAVE state.
+ *             This state is typically transitional and occurs while
+ *             a quorum operation is in progress, allowing slaves to
+ *             access the data without stalling.
+ *
+ * EXCLUSIVE - Indicates that the information is clean, read-only, and
+ *             that nobody else can access the data while we are in this
+ *             state.  A local node can upgrade both rstate and lstate
+ *             from EXCLUSIVE to MODIFIED without having to perform a
+ *             third-party transaction.
+ *
+ * MODIFIED  -  Indicates that the information is dirty, read-write, and
+ *             that nobody else can access the data while we are in this
+ *             state.
+ *
+ * It is important to note that remote cache-state grants can be more
+ * general than what was requested, plus they can be persistent.  So,
+ * for example, a remote can grant EXCLUSIVE access even if you just
+ * requested SHARED, which saves you from having to do another network
+ * transaction if you later need EXCLUSIVE.
  */
-struct ccms_info {
-    int        (*ccms_set_cache)(struct ccms_info *, struct ccms_lock *, ccms_state_t);
-    void *data;
-    /* XXX */
-};
 
-/*
- * A CCMS dataspace, typically stored in a vnode or VM object.   The primary
- * reference is to the ccms_dataspace representing the local machine.  The
- * chain field is used to link ccms_dataspace's representing other machines.
- * These foreign representations typically only contain summary 'worst-case'
- * CSTs.  The chain only needs to be followed if a CST has a cache state
- * that is incompatible with the request.
- */
-struct ccms_dataspace {
-    struct ccms_rb_tree        tree;
-    struct ccms_info   *info;
-    struct ccms_dataspace *chain;
-    ccms_state_t       defstate;
-    struct spinlock    spin;
-    struct ccms_cst    *delayed_free;  /* delayed frees */
-};
+#define CCMS_STATE_INVALID     0       /* unknown cache state */
+#define CCMS_STATE_ALLOWED     1       /* allow subsystem (topo only) */
+#define CCMS_STATE_SHARED      2       /* clean, shared, read-only */
+#define CCMS_STATE_SLAVE       3       /* live only, shared, read-only */
+#define CCMS_STATE_MASTER      4       /* dirty, shared, read-only */
+#define CCMS_STATE_EXCLUSIVE   5       /* clean, exclusive, read-only */
+#define CCMS_STATE_MODIFIED    6       /* dirty, exclusive, read-write */
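
Because the state codes above are ordered from weakest to strongest, a rough rstate-compatibility test reduces to a comparison; a sketch of the idea (the helper name is illustrative and not part of this header, and the ALLOWED special case for topo_cst is ignored):

/*
 * Sketch only: does the currently granted remote state cover the local
 * state we want?  EXCLUSIVE grants may be upgraded locally to MODIFIED
 * without a third-party transaction.
 */
static __inline int
ccms_rstate_covers(ccms_state_t rstate, ccms_state_t wanted)
{
        if (rstate == CCMS_STATE_EXCLUSIVE && wanted == CCMS_STATE_MODIFIED)
                return (1);
        return (rstate >= wanted);
}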
 
 /*
- * The CCMS locking element - represents a high level locking request,
- * such as used by read, write, and truncate operations.  These requests
- * are not organized into any tree but instead are shadowed by items in
- * the actual cache state tree (ccms_cst).  There are no direct links
- * between a ccms_lock and the underlying CST items, only reference count
- * fields in the CST item.
+ * A CCMS locking element - represents a high level locking request,
+ * such as used by read, write, and attribute operations.  Initialize
+ * the ccms_lock structure and call ccms_lock_get().
  *
  * When a CCMS lock is established the cache state of the underlying elements
  * is adjusted to meet the requirements of the lock.  The cache state
- * requirements are infered by the lock type:
+ * requirements are inferred by the lock type.  CCMS locks can block on
+ * third party interactions if the underlying remote cache state is not
+ * compatible.
  *
- * NOTE: Ranges may include negative offsets.  These are typically used to
- * represent meta-data.
- *
- *               local lock            cache state
- *               -----------------     --------------------
- * SHARED      - SHARED                must not be invalid
- * EXCLUSIVE   - EXCLUSIVE             must not be invalid
- * MODIFYING   - EXCLUSIVE             must be EXCLUSIVE or MODIFIED
+ * CCMS data locks imply a shared CCMS inode lock.  A CCMS topology lock does
+ * not imply a data or inode lock but topology locks can have far-reaching
+ * effects and can block on numerous CST states.
  */
 struct ccms_lock {
-       struct ccms_dataspace *ds;
-       off_t   beg_offset;
-       off_t   end_offset;
-       ccms_ltype_t ltype;
+       ccms_state_t    tstate;
+       ccms_state_t    astate;
+       ccms_state_t    dstate;
+       ccms_off_t      beg_offset;     /* applies to dstate */
+       ccms_off_t      end_offset;     /* applies to dstate */
+       struct ccms_cst *icst;          /* points to topo_cst or attr_cst */
+       struct ccms_cst *dcst;          /* points to left edge in rbtree */
+#ifdef CCMS_DEBUG
+       TAILQ_ENTRY(ccms_lock) entry;
+#endif
 };
 
+#ifdef CCMS_DEBUG
+
+TAILQ_HEAD(ccms_lock_head, ccms_lock);
+
+#endif
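
A short sketch of an attribute-only lock using the ccms_lock_get_attr() helper declared at the bottom of this header; 'ip' stands for a VFS inode embedding a ccms_inode as sketched earlier, and zero-initializing the lock first, like the error-handling convention, is an assumption:

        /*
         * Sketch only: shared attribute lock around a stat-style operation.
         * Assumes the lock structure starts out zeroed so the unused tstate
         * and dstate fields remain CCMS_STATE_INVALID.
         */
        ccms_lock_t lock;
        int error;

        bzero(&lock, sizeof(lock));
        error = ccms_lock_get_attr(&ip->cino, &lock, CCMS_STATE_SHARED);
        if (error == 0) {
                /* ... read the cached inode attributes ... */
                ccms_lock_put(&ip->cino, &lock);
        }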
+
 /*
  * CCMS cache state tree element (CST) - represents the actual cache
  * management state for a data space.  The cache state tree is a
@@ -212,10 +252,8 @@ struct ccms_lock {
  * initialized database typically has a single CST representing the default
  * cache state for the host.
  *
- * A CST represents *TWO* different things.  First, it represents local
- * locks held on data ranges.  Second, it represents the best-case cache
- * state for data cached on the local machine for local<->remote host
- * interactions.
+ * A CST keeps track of local cache state (lstate) AND remote cache state
+ * (rstate).
  *
  * Any arbitrary data range within a dataspace can be locked shared or
  * exclusive.  Obtaining a lock has the side effect of potentially modifying
@@ -227,51 +265,106 @@ struct ccms_lock {
  *
  * The end offset is byte-inclusive, allowing the entire 64 bit data space
  * to be represented without overflowing the edge case.  For example, a
- * 64 byte area might be represented as (0,63).  The offsets are SIGNED
- * entities.  Negative offsets are often used to represent meta-data
- * such as ownership and permissions.  The file size is typically cached as a
- * side effect of file operations occuring at the file EOF rather then
- * combined with ownership and permissions.
+ * 64 byte area might be represented as (0,63).  The offsets are UNSIGNED
+ * entities.
  */
 struct ccms_cst {
        RB_ENTRY(ccms_cst) rbnode;      /* stored in a red-black tree */
-       struct  ccms_cst *delayed_next; /* linked list to free */
-       off_t   beg_offset;
-       off_t   end_offset;
-       ccms_state_t state;             /* local cache state */
-       int     sharecount;             /* shared or exclusive lock count */
-       int     modifycount;            /* number of modifying exclusive lks */
-       int     blocked;                /* indicates a blocked lock request */
-       int     xrefs;                  /* lock overlap references */
-       int     lrefs;                  /* left edge refs */
-       int     rrefs;                  /* right edge refs */
+       struct ccms_cst *free_next;     /* free cache linked list */
+       struct ccms_inode *cino;        /* related ccms_inode */
+       ccms_off_t beg_offset;          /* range (inclusive) */
+       ccms_off_t end_offset;          /* range (inclusive) */
+       ccms_state_t lstate;            /* local cache state */
+       ccms_state_t rstate;            /* cache state granted by protocol */
+
+       int32_t flags;
+       int32_t count;                  /* shared/exclusive count */
+       int32_t blocked;                /* indicates a blocked lock request */
+       int32_t xrefs;                  /* lock overlap references */
+       int32_t lrefs;                  /* left edge refs */
+       int32_t rrefs;                  /* right edge refs */
+#ifdef CCMS_DEBUG
+       struct ccms_lock_head list;
+#endif
 };
 
-typedef struct ccms_info *ccms_info_t;
-typedef struct ccms_dataspace *ccms_dataspace_t;
-typedef struct ccms_lock *ccms_lock_t;
-typedef struct ccms_cst *ccms_cst_t;
+#define CCMS_CST_DYNAMIC       0x00000001
+#define CCMS_CST_DELETING      0x00000002
+#define CCMS_CST_INSERTED      0x00000004
+#define CCMS_CST_INHERITED     0x00000008      /* rstate inherited from par */
+
+/*
+ * A CCMS inode is typically embedded in a VFS file or directory object.
+ *
+ * The subdirectory topology is accessible downward by indexing topo_cst's
+ * from the children in the parent's cst_tree.
+ *
+ * attr_cst is independent of data-range CSTs.  However, adjustments to
+ * the topo_cst can have far-reaching effects on attr_cst and on the CSTs
+ * in the tree, recursively both downward and upward.
+ */
+struct ccms_inode {
+       struct spinlock         spin;
+       struct ccms_inode       *parent;
+       struct ccms_rb_tree     tree;
+       struct ccms_cst         attr_cst;
+       struct ccms_cst         topo_cst;
+       struct ccms_cst         *free_cache;    /* cst free cache */
+       struct ccms_domain      *domain;
+       void                    *handle;        /* VFS opaque */
+       int32_t                 flags;
+};
+
+#define CCMS_INODE_INSERTED    0x0001
+#define CCMS_INODE_DELETING    0x0002
+
+/*
+ * Domain management, contains a pseudo-root for the CCMS topology.
+ */
+struct ccms_domain {
+       struct malloc_type      *mcst;          /* malloc space for cst's */
+       struct ccms_inode       root;           /* dummy protocol root */
+       int                     cst_count;      /* dynamic cst count */
+       int                     cst_limit;      /* dynamic cst limit */
+};
+
+typedef struct ccms_lock       ccms_lock_t;
+typedef struct ccms_cst                ccms_cst_t;
+typedef struct ccms_inode      ccms_inode_t;
+typedef struct ccms_domain     ccms_domain_t;
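
A sketch of the expected setup and teardown sequence for these structures using the API declared under Kernel API below, with 'dom', 'parent_ip' and 'ip' standing in for the filesystem's own mount and inode objects (the exact call sites in HAMMER2 are an assumption):

/*
 * Sketch only: bring up the domain at mount time, wire a new CCMS inode
 * under its parent, and tear it down in two stages later on.
 */
ccms_domain_init(&dom);                                 /* once, at mount */

ccms_inode_init(&dom, &ip->cino, ip);                   /* ip is the opaque handle */
ccms_inode_insert(&parent_ip->cino, &ip->cino);         /* attach below the parent */

/* ... normal operation: ccms_lock_get()/ccms_lock_put() on &ip->cino ... */

ccms_inode_delete(&ip->cino);           /* topological removal */
ccms_inode_uninit(&ip->cino);           /* final teardown of the CCMS state */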
 
 /*
  * Kernel API
  */
 #ifdef _KERNEL
 
+/*
+ * Helper inline, used primarily to initialize a dstate lock, shortcutting
+ * the more common locking operations.  A dstate is specified and an
+ * astate is implied.  tstate locks cannot be acquired with this inline.
+ */
 static __inline
 void
-ccms_lock_init(ccms_lock_t lock, off_t beg_offset, off_t end_offset,
-              ccms_ltype_t ltype)
+ccms_lock_init(ccms_lock_t *lock, ccms_state_t dstate,
+              ccms_off_t beg_offset, ccms_off_t end_offset)
 {
-    lock->beg_offset = beg_offset;
-    lock->end_offset = end_offset;
-    lock->ltype = ltype;
+       lock->beg_offset = beg_offset;
+       lock->end_offset = end_offset;
+       lock->tstate = 0;
+       lock->astate = 0;
+       lock->dstate = dstate;
 }
 
-void ccms_dataspace_init(ccms_dataspace_t);
-void ccms_dataspace_destroy(ccms_dataspace_t);
-int ccms_lock_get(ccms_dataspace_t, ccms_lock_t);
-int ccms_lock_get_uio(ccms_dataspace_t, ccms_lock_t, struct uio *);
-int ccms_lock_put(ccms_dataspace_t, ccms_lock_t);
+void ccms_domain_init(ccms_domain_t *dom);
+void ccms_inode_init(ccms_domain_t *dom, ccms_inode_t *cino, void *handle);
+void ccms_inode_insert(ccms_inode_t *cpar, ccms_inode_t *cino);
+void ccms_inode_delete(ccms_inode_t *cino);
+void ccms_inode_uninit(ccms_inode_t *cino);
+
+int ccms_lock_get(ccms_inode_t *cino, ccms_lock_t *lock);
+int ccms_lock_get_uio(ccms_inode_t *cino, ccms_lock_t *lock, struct uio *uio);
+int ccms_lock_get_attr(ccms_inode_t *cino, ccms_lock_t *lock, ccms_state_t st);
+int ccms_lock_put(ccms_inode_t *cino, ccms_lock_t *lock);
 
 #endif