kernel - shmid_ds structure needs to change on 64-bit :-(
[dragonfly.git] / sys / kern / kern_ccms.c
1 /*
2  * Copyright (c) 2006 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sys/kern/kern_ccms.c,v 1.4 2007/04/30 07:18:53 dillon Exp $
35  */
36 /*
37  * The Cache Coherency Management System (CCMS)
38  */
39
40 #include <sys/param.h>
41 #include <sys/systm.h>
42 #include <sys/kernel.h>
43 #include <sys/malloc.h>
44 #include <sys/objcache.h>
45 #include <sys/ccms.h>
46 #include <sys/sysctl.h>
47 #include <sys/uio.h>
48 #include <machine/limits.h>
49
50 #include <sys/spinlock2.h>
51
52 struct ccms_lock_scan_info {
53         ccms_dataspace_t ds;
54         ccms_lock_t lock;
55         ccms_cst_t  cst1;
56         ccms_cst_t  cst2;
57         ccms_cst_t  coll_cst;
58 };
59
60 static int ccms_cst_cmp(ccms_cst_t b1, ccms_cst_t b2);
61 static int ccms_lock_scan_cmp(ccms_cst_t b1, void *arg);
62 static int ccms_lock_undo_cmp(ccms_cst_t b1, void *arg);
63 static int ccms_dataspace_destroy_match(ccms_cst_t cst, void *arg);
64 static int ccms_lock_get_match(struct ccms_cst *cst, void *arg);
65 static int ccms_lock_undo_match(struct ccms_cst *cst, void *arg);
66 static int ccms_lock_redo_match(struct ccms_cst *cst, void *arg);
67 static int ccms_lock_put_match(struct ccms_cst *cst, void *arg);
68
69 RB_GENERATE3(ccms_rb_tree, ccms_cst, rbnode, ccms_cst_cmp,
70              off_t, beg_offset, end_offset);
71 static MALLOC_DEFINE(M_CCMS, "CCMS", "Cache Coherency Management System");
72
73 static int ccms_enable;
74 SYSCTL_INT(_kern, OID_AUTO, ccms_enable, CTLFLAG_RW, &ccms_enable, 0, "");
75
76 static struct objcache *ccms_oc;
77
78 /*
79  * Initialize the CCMS subsystem
80  */
81 static void
82 ccmsinit(void *dummy)
83 {
84     ccms_oc = objcache_create_simple(M_CCMS, sizeof(struct ccms_cst));
85 }
86 SYSINIT(ccms, SI_BOOT2_MACHDEP, SI_ORDER_ANY, ccmsinit, NULL);
87
88 /*
89  * Initialize a new CCMS dataspace.  Create a new RB tree with a single
90  * element covering the entire 64 bit offset range.  This simplifies 
91  * algorithms enormously by removing a number of special cases.
92  */
93 void
94 ccms_dataspace_init(ccms_dataspace_t ds)
95 {
96     ccms_cst_t cst;
97
98     RB_INIT(&ds->tree);
99     ds->info = NULL;
100     ds->chain = NULL;
101     cst = objcache_get(ccms_oc, M_WAITOK);
102     bzero(cst, sizeof(*cst));
103     cst->beg_offset = LLONG_MIN;
104     cst->end_offset = LLONG_MAX;
105     cst->state = CCMS_STATE_INVALID;
106     RB_INSERT(ccms_rb_tree, &ds->tree, cst);
107     spin_init(&ds->spin);
108 }
109
110 /*
111  * Helper to destroy deleted cst's.
112  */
113 static __inline
114 void
115 ccms_delayed_free(ccms_cst_t cstn)
116 {
117     ccms_cst_t cst;
118
119     while((cst = cstn) != NULL) {
120         cstn = cst->delayed_next;
121         objcache_put(ccms_oc, cst);
122     }
123 }
124
125 /*
126  * Destroy a CCMS dataspace.
127  *
128  * MPSAFE
129  */
130 void
131 ccms_dataspace_destroy(ccms_dataspace_t ds)
132 {
133     ccms_cst_t cst;
134
135     spin_lock(&ds->spin);
136     RB_SCAN(ccms_rb_tree, &ds->tree, NULL,
137             ccms_dataspace_destroy_match, ds);
138     cst = ds->delayed_free;
139     ds->delayed_free = NULL;
140     spin_unlock(&ds->spin);
141     ccms_delayed_free(cst);
142 }
143
144 /*
145  * Helper routine to delete matches during a destroy.
146  *
147  * NOTE: called with spinlock held.
148  */
149 static
150 int
151 ccms_dataspace_destroy_match(ccms_cst_t cst, void *arg)
152 {
153     ccms_dataspace_t ds = arg;
154
155     RB_REMOVE(ccms_rb_tree, &ds->tree, cst);
156     cst->delayed_next = ds->delayed_free;
157     ds->delayed_free = cst;
158     return(0);
159 }
160
161 /*
162  * Obtain a CCMS lock
163  *
164  * MPSAFE
165  */
166 int
167 ccms_lock_get(ccms_dataspace_t ds, ccms_lock_t lock)
168 {
169     struct ccms_lock_scan_info info;
170     ccms_cst_t cst;
171
172     if (ccms_enable == 0) {
173         lock->ds = NULL;
174         return(0);
175     }
176
177     /*
178      * Partition the CST space so the precise range is covered and
179      * attempt to obtain the requested local lock (ltype) at the same 
180      * time.
181      */
182     lock->ds = ds;
183     info.lock = lock;
184     info.ds = ds;
185     info.coll_cst = NULL;
186     info.cst1 = objcache_get(ccms_oc, M_WAITOK);
187     info.cst2 = objcache_get(ccms_oc, M_WAITOK);
188
189     spin_lock(&ds->spin);
190     RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_scan_cmp,
191             ccms_lock_get_match, &info);
192
193     /*
194      * If a collision occured, undo the fragments we were able to obtain,
195      * block, and try again.
196      */
197     while (info.coll_cst != NULL) {
198         RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_undo_cmp,
199                 ccms_lock_undo_match, &info);
200         info.coll_cst->blocked = 1;
201         ssleep(info.coll_cst, &ds->spin, 0,
202                ((lock->ltype == CCMS_LTYPE_SHARED) ? "rngsh" : "rngex"),
203                hz);
204         info.coll_cst = NULL;
205         RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_scan_cmp,
206                 ccms_lock_redo_match, &info);
207     }
208     cst = ds->delayed_free;
209     ds->delayed_free = NULL;
210     spin_unlock(&ds->spin);
211
212     /*
213      * Cleanup
214      */
215     ccms_delayed_free(cst);
216     if (info.cst1)
217         objcache_put(ccms_oc, info.cst1);
218     if (info.cst2)
219         objcache_put(ccms_oc, info.cst2);
220
221     return(0);
222 }
223
224 /*
225  * Obtain a CCMS lock, initialize the lock structure from the uio.
226  *
227  * MPSAFE
228  */
229 int
230 ccms_lock_get_uio(ccms_dataspace_t ds, ccms_lock_t lock, struct uio *uio)
231 {
232     ccms_ltype_t ltype;
233     off_t eoff;
234
235     if (uio->uio_rw == UIO_READ)
236         ltype = CCMS_LTYPE_SHARED;
237     else
238         ltype = CCMS_LTYPE_MODIFYING;
239
240     /*
241      * Calculate the ending offset (byte inclusive), make sure a seek
242      * overflow does not blow us up.
243      */
244     eoff = uio->uio_offset + uio->uio_resid - 1;
245     if (eoff < uio->uio_offset)
246         eoff = 0x7FFFFFFFFFFFFFFFLL;
247     ccms_lock_init(lock, uio->uio_offset, eoff, ltype);
248     return(ccms_lock_get(ds, lock));
249 }
250
251 /*
252  * Helper routine.
253  *
254  * NOTE: called with spinlock held.
255  */
256 static
257 int
258 ccms_lock_get_match(ccms_cst_t cst, void *arg)
259 {
260     struct ccms_lock_scan_info *info = arg;
261     ccms_lock_t lock = info->lock;
262     ccms_cst_t ncst;
263
264     /*
265      * If the lock's left edge is within the CST we must split the CST
266      * into two pieces [cst][ncst].  lrefs must be bumped on the CST
267      * containing the left edge.
268      *
269      * NOTE! cst->beg_offset may not be modified.  This allows us to avoid
270      * having to manipulate the cst's position in the tree.
271      */
272     if (lock->beg_offset > cst->beg_offset) {
273         ncst = info->cst1;
274         info->cst1 = NULL;
275         KKASSERT(ncst != NULL);
276         *ncst = *cst;
277         cst->end_offset = lock->beg_offset - 1;
278         cst->rrefs = 0;
279         ncst->beg_offset = lock->beg_offset;
280         ncst->lrefs = 1;
281         RB_INSERT(ccms_rb_tree, &info->ds->tree, ncst);
282
283         /*
284          * ncst becomes our 'matching' cst.
285          */
286         cst = ncst;
287     } else if (lock->beg_offset == cst->beg_offset) {
288         ++cst->lrefs;
289     }
290
291     /*
292      * If the lock's right edge is within the CST we must split the CST
293      * into two pieces [cst][ncst].  rrefs must be bumped on the CST
294      * containing the right edge.
295      *
296      * NOTE! cst->beg_offset may not be modified.  This allows us to avoid
297      * having to manipulate the cst's position in the tree.
298      */
299     if (lock->end_offset < cst->end_offset) {
300         ncst = info->cst2;
301         info->cst2 = NULL;
302         KKASSERT(ncst != NULL);
303         *ncst = *cst;
304         cst->end_offset = lock->end_offset;
305         cst->rrefs = 1;
306         ncst->beg_offset = lock->end_offset + 1;
307         ncst->lrefs = 0;
308         RB_INSERT(ccms_rb_tree, &info->ds->tree, ncst);
309         /* cst remains our 'matching' cst */
310     } else if (lock->end_offset == cst->end_offset) {
311         ++cst->rrefs;
312     }
313
314     /*
315      * The lock covers the CST, so increment the CST's coverage count.
316      * Then attempt to obtain the shared/exclusive ltype.
317      */
318     ++cst->xrefs;
319
320     if (info->coll_cst == NULL) {
321         switch(lock->ltype) {
322         case CCMS_LTYPE_SHARED:
323             if (cst->sharecount < 0) {
324                 info->coll_cst = cst;
325             } else {
326                 ++cst->sharecount;
327                 if (ccms_enable >= 9) {
328                         kprintf("CST SHARE %d %lld-%lld\n",
329                                 cst->sharecount,
330                                 (long long)cst->beg_offset,
331                                 (long long)cst->end_offset);
332                 }
333             }
334             break;
335         case CCMS_LTYPE_EXCLUSIVE:
336             if (cst->sharecount != 0) {
337                 info->coll_cst = cst;
338             } else {
339                 --cst->sharecount;
340                 if (ccms_enable >= 9) {
341                         kprintf("CST EXCLS %d %lld-%lld\n",
342                                 cst->sharecount,
343                                 (long long)cst->beg_offset,
344                                 (long long)cst->end_offset);
345                 }
346             }
347             break;
348         case CCMS_LTYPE_MODIFYING:
349             if (cst->sharecount != 0) {
350                 info->coll_cst = cst;
351             } else {
352                 --cst->sharecount;
353                 ++cst->modifycount;
354                 if (ccms_enable >= 9) {
355                         kprintf("CST MODXL %d %lld-%lld\n",
356                                 cst->sharecount,
357                                 (long long)cst->beg_offset,
358                                 (long long)cst->end_offset);
359                 }
360             }
361             break;
362         }
363     }
364     return(0);
365 }
366
367 /*
368  * Undo a partially resolved ccms_ltype rangelock.  This is atomic with
369  * the scan/redo code so there should not be any blocked locks when
370  * transitioning to 0.
371  *
372  * NOTE: called with spinlock held.
373  */
374 static
375 int
376 ccms_lock_undo_match(ccms_cst_t cst, void *arg)
377 {
378     struct ccms_lock_scan_info *info = arg;
379     ccms_lock_t lock = info->lock;
380
381     switch(lock->ltype) {
382     case CCMS_LTYPE_SHARED:
383         KKASSERT(cst->sharecount > 0);
384         --cst->sharecount;
385         KKASSERT(cst->sharecount || cst->blocked == 0);
386         break;
387     case CCMS_LTYPE_EXCLUSIVE:
388         KKASSERT(cst->sharecount < 0);
389         ++cst->sharecount;
390         KKASSERT(cst->sharecount || cst->blocked == 0);
391         break;
392     case CCMS_LTYPE_MODIFYING:
393         KKASSERT(cst->sharecount < 0 && cst->modifycount > 0);
394         ++cst->sharecount;
395         --cst->modifycount;
396         KKASSERT(cst->sharecount || cst->blocked == 0);
397         break;
398     }
399     return(0);
400 }
401
402 /*
403  * Redo the local lock request for a range which has already been 
404  * partitioned.
405  *
406  * NOTE: called with spinlock held.
407  */
408 static
409 int
410 ccms_lock_redo_match(ccms_cst_t cst, void *arg)
411 {
412     struct ccms_lock_scan_info *info = arg;
413     ccms_lock_t lock = info->lock;
414
415     if (info->coll_cst == NULL) {
416         switch(lock->ltype) {
417         case CCMS_LTYPE_SHARED:
418             if (cst->sharecount < 0) {
419                 info->coll_cst = cst;
420             } else {
421                 if (ccms_enable >= 9) {
422                         kprintf("CST SHARE %d %lld-%lld\n",
423                                 cst->sharecount,
424                                 (long long)cst->beg_offset,
425                                 (long long)cst->end_offset);
426                 }
427                 ++cst->sharecount;
428             }
429             break;
430         case CCMS_LTYPE_EXCLUSIVE:
431             if (cst->sharecount != 0) {
432                 info->coll_cst = cst;
433             } else {
434                 --cst->sharecount;
435                 if (ccms_enable >= 9) {
436                         kprintf("CST EXCLS %d %lld-%lld\n",
437                                 cst->sharecount,
438                                 (long long)cst->beg_offset,
439                                 (long long)cst->end_offset);
440                 }
441             }
442             break;
443         case CCMS_LTYPE_MODIFYING:
444             if (cst->sharecount != 0) {
445                 info->coll_cst = cst;
446             } else {
447                 --cst->sharecount;
448                 ++cst->modifycount;
449                 if (ccms_enable >= 9) {
450                         kprintf("CST MODXL %d %lld-%lld\n",
451                                 cst->sharecount,
452                                 (long long)cst->beg_offset,
453                                 (long long)cst->end_offset);
454                 }
455             }
456             break;
457         }
458     }
459     return(0);
460 }
461
462 /*
463  * Release a CCMS lock
464  *
465  * MPSAFE
466  */
467 int
468 ccms_lock_put(ccms_dataspace_t ds, ccms_lock_t lock)
469 {
470     struct ccms_lock_scan_info info;
471     ccms_cst_t cst;
472
473     if (lock->ds == NULL)
474         return(0);
475
476     lock->ds = NULL;
477     info.lock = lock;
478     info.ds = ds;
479     info.cst1 = NULL;
480     info.cst2 = NULL;
481
482     spin_lock(&ds->spin);
483     RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_scan_cmp,
484             ccms_lock_put_match, &info);
485     cst = ds->delayed_free;
486     ds->delayed_free = NULL;
487     spin_unlock(&ds->spin);
488
489     ccms_delayed_free(cst);
490     if (info.cst1)
491         objcache_put(ccms_oc, info.cst1);
492     if (info.cst2)
493         objcache_put(ccms_oc, info.cst2);
494     return(0);
495 }
496
497 /*
498  * NOTE: called with spinlock held.
499  */
500 static
501 int
502 ccms_lock_put_match(ccms_cst_t cst, void *arg)
503 {
504     struct ccms_lock_scan_info *info = arg;
505     ccms_lock_t lock = info->lock;
506     ccms_cst_t ocst;
507
508     /*
509      * Undo the local shared/exclusive rangelock.
510      */
511     switch(lock->ltype) {
512     case CCMS_LTYPE_SHARED:
513         KKASSERT(cst->sharecount > 0);
514         --cst->sharecount;
515         if (ccms_enable >= 9) {
516                 kprintf("CST UNSHR %d %lld-%lld (%d)\n", cst->sharecount,
517                         (long long)cst->beg_offset,
518                         (long long)cst->end_offset,
519                         cst->blocked);
520         }
521         if (cst->blocked && cst->sharecount == 0) {
522                 cst->blocked = 0;
523                 wakeup(cst);
524         }
525         break;
526     case CCMS_LTYPE_EXCLUSIVE:
527         KKASSERT(cst->sharecount < 0);
528         ++cst->sharecount;
529         if (ccms_enable >= 9) {
530                 kprintf("CST UNEXC %d %lld-%lld (%d)\n", cst->sharecount,
531                         (long long)cst->beg_offset,
532                         (long long)cst->end_offset,
533                         cst->blocked);
534         }
535         if (cst->blocked && cst->sharecount == 0) {
536                 cst->blocked = 0;
537                 wakeup(cst);
538         }
539         break;
540     case CCMS_LTYPE_MODIFYING:
541         KKASSERT(cst->sharecount < 0 && cst->modifycount > 0);
542         ++cst->sharecount;
543         --cst->modifycount;
544         if (ccms_enable >= 9) {
545                 kprintf("CST UNMOD %d %lld-%lld (%d)\n", cst->sharecount,
546                         (long long)cst->beg_offset,
547                         (long long)cst->end_offset,
548                         cst->blocked);
549         }
550         if (cst->blocked && cst->sharecount == 0) {
551                 cst->blocked = 0;
552                 wakeup(cst);
553         }
554         break;
555     }
556
557     /*
558      * Decrement the lock coverage count on the CST.  Decrement the left and
559      * right edge counts as appropriate.
560      *
561      * When lrefs or rrefs drops to zero we check the adjacent entry to
562      * determine whether a merge is possible.  If the appropriate refs field
563      * (rrefs for the entry to our left, lrefs for the entry to our right)
564      * is 0, then all covering locks must cover both entries and the xrefs
565      * field must match.  We can then merge the entries if they have
566      * compatible cache states. 
567      *
568      * However, because we are cleaning up the shared/exclusive count at
569      * the same time, the sharecount field may be temporarily out of
570      * sync, so require that the sharecount field also match before doing
571      * a merge.
572      *
573      * When merging an element which is being blocked on, the blocking
574      * thread(s) will be woken up.
575      *
576      * If the dataspace has too many CSTs we may be able to merge the
577      * entries even if their cache states are not the same, by dropping
578      * both to a compatible (lower) cache state and performing the appropriate
579      * management operations.  XXX
580      */
581     --cst->xrefs;
582     if (lock->beg_offset == cst->beg_offset) {
583         --cst->lrefs;
584         if (cst->lrefs == 0) {
585             if ((ocst = RB_PREV(ccms_rb_tree, &info->ds->tree, cst)) != NULL &&
586                 ocst->rrefs == 0 &&
587                 ocst->state == cst->state &&
588                 ocst->sharecount == cst->sharecount
589             ) {
590                 KKASSERT(ocst->xrefs == cst->xrefs);
591                 KKASSERT(ocst->end_offset + 1 == cst->beg_offset);
592                 RB_REMOVE(ccms_rb_tree, &info->ds->tree, ocst);
593                 cst->beg_offset = ocst->beg_offset;
594                 cst->lrefs = ocst->lrefs;
595                 if (ccms_enable >= 9) {
596                     kprintf("MERGELEFT %p %lld-%lld (%d)\n", 
597                            ocst,
598                            (long long)cst->beg_offset,
599                            (long long)cst->end_offset,
600                            cst->blocked);
601                 }
602                 if (ocst->blocked) {
603                     ocst->blocked = 0;
604                     wakeup(ocst);
605                 }
606                 /*objcache_put(ccms_oc, ocst);*/
607                 ocst->delayed_next = info->ds->delayed_free;
608                 info->ds->delayed_free = ocst;
609             }
610         }
611     }
612     if (lock->end_offset == cst->end_offset) {
613         --cst->rrefs;
614         if (cst->rrefs == 0) {
615             if ((ocst = RB_NEXT(ccms_rb_tree, &info->ds->tree, cst)) != NULL &&
616                 ocst->lrefs == 0 &&
617                 ocst->state == cst->state &&
618                 ocst->sharecount == cst->sharecount
619             ) {
620                 KKASSERT(ocst->xrefs == cst->xrefs);
621                 KKASSERT(cst->end_offset + 1 == ocst->beg_offset);
622                 RB_REMOVE(ccms_rb_tree, &info->ds->tree, ocst);
623                 cst->end_offset = ocst->end_offset;
624                 cst->rrefs = ocst->rrefs;
625                 if (ccms_enable >= 9) {
626                     kprintf("MERGERIGHT %p %lld-%lld\n",
627                            ocst,
628                            (long long)cst->beg_offset,
629                            (long long)cst->end_offset);
630                 }
631                 /*objcache_put(ccms_oc, ocst);*/
632                 ocst->delayed_next = info->ds->delayed_free;
633                 info->ds->delayed_free = ocst;
634             }
635         }
636     }
637     return(0);
638 }
639
640 /*
641  * RB tree compare function for insertions and deletions.  This function
642  * compares two CSTs.
643  */
644 static int
645 ccms_cst_cmp(ccms_cst_t b1, ccms_cst_t b2)
646 {
647     if (b1->end_offset < b2->beg_offset)
648         return(-1);
649     if (b1->beg_offset > b2->end_offset)
650         return(1);
651     return(0);
652 }
653
654 /*
655  * RB tree scanning compare function.  This function compares the CST
656  * from the tree against the supplied ccms_lock and returns the CST's
657  * placement relative to the lock.
658  */
659 static int
660 ccms_lock_scan_cmp(ccms_cst_t cst, void *arg)
661 {
662     struct ccms_lock_scan_info *info = arg;
663     ccms_lock_t lock = info->lock;
664
665     if (cst->end_offset < lock->beg_offset)
666         return(-1);
667     if (cst->beg_offset > lock->end_offset)
668         return(1);
669     return(0);
670 }
671
672 /*
673  * This function works like ccms_lock_scan_cmp but terminates at the
674  * collision point rather then at the lock's ending offset.  Only
675  * the CSTs that were already partially resolved are returned by the scan.
676  */
677 static int
678 ccms_lock_undo_cmp(ccms_cst_t cst, void *arg)
679 {
680     struct ccms_lock_scan_info *info = arg;
681     ccms_lock_t lock = info->lock;
682
683     if (cst->end_offset < lock->beg_offset)
684         return(-1);
685     if (cst->beg_offset >= info->coll_cst->beg_offset)
686         return(1);
687     return(0);
688 }
689