hammer2 - Move CCMS code from kernel to hammer2
[dragonfly.git] / sys / vfs / hammer2 / hammer2_ccms.c
1 /*
2  * Copyright (c) 2006,2012 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 /*
35  * The Cache Coherency Management System (CCMS)
36  */
37
38 #include <sys/param.h>
39 #include <sys/systm.h>
40 #include <sys/kernel.h>
41 #include <sys/malloc.h>
42 #include <sys/objcache.h>
43 #include <sys/ccms.h>
44 #include <sys/sysctl.h>
45 #include <sys/uio.h>
46 #include <machine/limits.h>
47
48 #include <sys/spinlock2.h>
49
50 struct ccms_lock_scan_info {
51         ccms_dataspace_t ds;
52         ccms_lock_t lock;
53         ccms_cst_t  cst1;
54         ccms_cst_t  cst2;
55         ccms_cst_t  coll_cst;
56 };
57
58 static int ccms_cst_cmp(ccms_cst_t b1, ccms_cst_t b2);
59 static int ccms_lock_scan_cmp(ccms_cst_t b1, void *arg);
60 static int ccms_lock_undo_cmp(ccms_cst_t b1, void *arg);
61 static int ccms_dataspace_destroy_match(ccms_cst_t cst, void *arg);
62 static int ccms_lock_get_match(struct ccms_cst *cst, void *arg);
63 static int ccms_lock_undo_match(struct ccms_cst *cst, void *arg);
64 static int ccms_lock_redo_match(struct ccms_cst *cst, void *arg);
65 static int ccms_lock_put_match(struct ccms_cst *cst, void *arg);
66
67 RB_GENERATE3(ccms_rb_tree, ccms_cst, rbnode, ccms_cst_cmp,
68              off_t, beg_offset, end_offset);
69 static MALLOC_DEFINE(M_CCMS, "CCMS", "Cache Coherency Management System");
70
71 static int ccms_enable;
72 SYSCTL_INT(_kern, OID_AUTO, ccms_enable, CTLFLAG_RW, &ccms_enable, 0, "");
73
74 static struct objcache *ccms_oc;
75
76 /*
77  * Initialize the CCMS subsystem
78  */
79 static void
80 ccmsinit(void *dummy)
81 {
82     ccms_oc = objcache_create_simple(M_CCMS, sizeof(struct ccms_cst));
83 }
84 SYSINIT(ccms, SI_BOOT2_MACHDEP, SI_ORDER_ANY, ccmsinit, NULL);
85
86 /*
87  * Initialize a new CCMS dataspace.  Create a new RB tree with a single
88  * element covering the entire 64 bit offset range.  This simplifies
89  * algorithms enormously by removing a number of special cases.
90  */
91 void
92 ccms_dataspace_init(ccms_dataspace_t ds)
93 {
94     ccms_cst_t cst;
95
96     RB_INIT(&ds->tree);
97     ds->info = NULL;
98     ds->chain = NULL;
99     cst = objcache_get(ccms_oc, M_WAITOK);
100     bzero(cst, sizeof(*cst));
101     cst->beg_offset = LLONG_MIN;
102     cst->end_offset = LLONG_MAX;
103     cst->state = CCMS_STATE_INVALID;
104     RB_INSERT(ccms_rb_tree, &ds->tree, cst);
105     spin_init(&ds->spin);
106 }
107
108 /*
109  * Helper to destroy deleted cst's.
110  */
111 static __inline
112 void
113 ccms_delayed_free(ccms_cst_t cstn)
114 {
115     ccms_cst_t cst;
116
117     while((cst = cstn) != NULL) {
118         cstn = cst->delayed_next;
119         objcache_put(ccms_oc, cst);
120     }
121 }
122
123 /*
124  * Destroy a CCMS dataspace.
125  *
126  * MPSAFE
127  */
128 void
129 ccms_dataspace_destroy(ccms_dataspace_t ds)
130 {
131     ccms_cst_t cst;
132
133     spin_lock(&ds->spin);
134     RB_SCAN(ccms_rb_tree, &ds->tree, NULL,
135             ccms_dataspace_destroy_match, ds);
136     cst = ds->delayed_free;
137     ds->delayed_free = NULL;
138     spin_unlock(&ds->spin);
139     ccms_delayed_free(cst);
140 }
141
142 /*
143  * Helper routine to delete matches during a destroy.
144  *
145  * NOTE: called with spinlock held.
146  */
147 static
148 int
149 ccms_dataspace_destroy_match(ccms_cst_t cst, void *arg)
150 {
151     ccms_dataspace_t ds = arg;
152
153     RB_REMOVE(ccms_rb_tree, &ds->tree, cst);
154     cst->delayed_next = ds->delayed_free;
155     ds->delayed_free = cst;
156     return(0);
157 }
158
159 /*
160  * Obtain a CCMS lock
161  *
162  * MPSAFE
163  */
164 int
165 ccms_lock_get(ccms_dataspace_t ds, ccms_lock_t lock)
166 {
167     struct ccms_lock_scan_info info;
168     ccms_cst_t cst;
169
170     if (ccms_enable == 0) {
171         lock->ds = NULL;
172         return(0);
173     }
174
175     /*
176      * Partition the CST space so the precise range is covered and
177      * attempt to obtain the requested local lock (ltype) at the same
178      * time.
179      */
180     lock->ds = ds;
181     info.lock = lock;
182     info.ds = ds;
183     info.coll_cst = NULL;
184     info.cst1 = objcache_get(ccms_oc, M_WAITOK);
185     info.cst2 = objcache_get(ccms_oc, M_WAITOK);
186
187     spin_lock(&ds->spin);
188     RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_scan_cmp,
189             ccms_lock_get_match, &info);
190
191     /*
192      * If a collision occured, undo the fragments we were able to obtain,
193      * block, and try again.
194      */
195     while (info.coll_cst != NULL) {
196         RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_undo_cmp,
197                 ccms_lock_undo_match, &info);
198         info.coll_cst->blocked = 1;
199         ssleep(info.coll_cst, &ds->spin, 0,
200                ((lock->ltype == CCMS_LTYPE_SHARED) ? "rngsh" : "rngex"),
201                hz);
202         info.coll_cst = NULL;
203         RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_scan_cmp,
204                 ccms_lock_redo_match, &info);
205     }
206     cst = ds->delayed_free;
207     ds->delayed_free = NULL;
208     spin_unlock(&ds->spin);
209
210     /*
211      * Cleanup
212      */
213     ccms_delayed_free(cst);
214     if (info.cst1)
215         objcache_put(ccms_oc, info.cst1);
216     if (info.cst2)
217         objcache_put(ccms_oc, info.cst2);
218
219     return(0);
220 }
221
222 /*
223  * Obtain a CCMS lock, initialize the lock structure from the uio.
224  *
225  * MPSAFE
226  */
227 int
228 ccms_lock_get_uio(ccms_dataspace_t ds, ccms_lock_t lock, struct uio *uio)
229 {
230     ccms_ltype_t ltype;
231     off_t eoff;
232
233     if (uio->uio_rw == UIO_READ)
234         ltype = CCMS_LTYPE_SHARED;
235     else
236         ltype = CCMS_LTYPE_MODIFYING;
237
238     /*
239      * Calculate the ending offset (byte inclusive), make sure a seek
240      * overflow does not blow us up.
241      */
242     eoff = uio->uio_offset + uio->uio_resid - 1;
243     if (eoff < uio->uio_offset)
244         eoff = 0x7FFFFFFFFFFFFFFFLL;
245     ccms_lock_init(lock, uio->uio_offset, eoff, ltype);
246     return(ccms_lock_get(ds, lock));
247 }
248
249 /*
250  * Helper routine.
251  *
252  * NOTE: called with spinlock held.
253  */
254 static
255 int
256 ccms_lock_get_match(ccms_cst_t cst, void *arg)
257 {
258     struct ccms_lock_scan_info *info = arg;
259     ccms_lock_t lock = info->lock;
260     ccms_cst_t ncst;
261
262     /*
263      * If the lock's left edge is within the CST we must split the CST
264      * into two pieces [cst][ncst].  lrefs must be bumped on the CST
265      * containing the left edge.
266      *
267      * NOTE! cst->beg_offset may not be modified.  This allows us to avoid
268      * having to manipulate the cst's position in the tree.
269      */
270     if (lock->beg_offset > cst->beg_offset) {
271         ncst = info->cst1;
272         info->cst1 = NULL;
273         KKASSERT(ncst != NULL);
274         *ncst = *cst;
275         cst->end_offset = lock->beg_offset - 1;
276         cst->rrefs = 0;
277         ncst->beg_offset = lock->beg_offset;
278         ncst->lrefs = 1;
279         RB_INSERT(ccms_rb_tree, &info->ds->tree, ncst);
280
281         /*
282          * ncst becomes our 'matching' cst.
283          */
284         cst = ncst;
285     } else if (lock->beg_offset == cst->beg_offset) {
286         ++cst->lrefs;
287     }
288
289     /*
290      * If the lock's right edge is within the CST we must split the CST
291      * into two pieces [cst][ncst].  rrefs must be bumped on the CST
292      * containing the right edge.
293      *
294      * NOTE! cst->beg_offset may not be modified.  This allows us to avoid
295      * having to manipulate the cst's position in the tree.
296      */
297     if (lock->end_offset < cst->end_offset) {
298         ncst = info->cst2;
299         info->cst2 = NULL;
300         KKASSERT(ncst != NULL);
301         *ncst = *cst;
302         cst->end_offset = lock->end_offset;
303         cst->rrefs = 1;
304         ncst->beg_offset = lock->end_offset + 1;
305         ncst->lrefs = 0;
306         RB_INSERT(ccms_rb_tree, &info->ds->tree, ncst);
307         /* cst remains our 'matching' cst */
308     } else if (lock->end_offset == cst->end_offset) {
309         ++cst->rrefs;
310     }
311
312     /*
313      * The lock covers the CST, so increment the CST's coverage count.
314      * Then attempt to obtain the shared/exclusive ltype.
315      */
316     ++cst->xrefs;
317
318     if (info->coll_cst == NULL) {
319         switch(lock->ltype) {
320         case CCMS_LTYPE_SHARED:
321             if (cst->sharecount < 0) {
322                 info->coll_cst = cst;
323             } else {
324                 ++cst->sharecount;
325                 if (ccms_enable >= 9) {
326                         kprintf("CST SHARE %d %lld-%lld\n",
327                                 cst->sharecount,
328                                 (long long)cst->beg_offset,
329                                 (long long)cst->end_offset);
330                 }
331             }
332             break;
333         case CCMS_LTYPE_EXCLUSIVE:
334             if (cst->sharecount != 0) {
335                 info->coll_cst = cst;
336             } else {
337                 --cst->sharecount;
338                 if (ccms_enable >= 9) {
339                         kprintf("CST EXCLS %d %lld-%lld\n",
340                                 cst->sharecount,
341                                 (long long)cst->beg_offset,
342                                 (long long)cst->end_offset);
343                 }
344             }
345             break;
346         case CCMS_LTYPE_MODIFYING:
347             if (cst->sharecount != 0) {
348                 info->coll_cst = cst;
349             } else {
350                 --cst->sharecount;
351                 ++cst->modifycount;
352                 if (ccms_enable >= 9) {
353                         kprintf("CST MODXL %d %lld-%lld\n",
354                                 cst->sharecount,
355                                 (long long)cst->beg_offset,
356                                 (long long)cst->end_offset);
357                 }
358             }
359             break;
360         }
361     }
362     return(0);
363 }
364
365 /*
366  * Undo a partially resolved ccms_ltype rangelock.  This is atomic with
367  * the scan/redo code so there should not be any blocked locks when
368  * transitioning to 0.
369  *
370  * NOTE: called with spinlock held.
371  */
372 static
373 int
374 ccms_lock_undo_match(ccms_cst_t cst, void *arg)
375 {
376     struct ccms_lock_scan_info *info = arg;
377     ccms_lock_t lock = info->lock;
378
379     switch(lock->ltype) {
380     case CCMS_LTYPE_SHARED:
381         KKASSERT(cst->sharecount > 0);
382         --cst->sharecount;
383         KKASSERT(cst->sharecount || cst->blocked == 0);
384         break;
385     case CCMS_LTYPE_EXCLUSIVE:
386         KKASSERT(cst->sharecount < 0);
387         ++cst->sharecount;
388         KKASSERT(cst->sharecount || cst->blocked == 0);
389         break;
390     case CCMS_LTYPE_MODIFYING:
391         KKASSERT(cst->sharecount < 0 && cst->modifycount > 0);
392         ++cst->sharecount;
393         --cst->modifycount;
394         KKASSERT(cst->sharecount || cst->blocked == 0);
395         break;
396     }
397     return(0);
398 }
399
400 /*
401  * Redo the local lock request for a range which has already been
402  * partitioned.
403  *
404  * NOTE: called with spinlock held.
405  */
406 static
407 int
408 ccms_lock_redo_match(ccms_cst_t cst, void *arg)
409 {
410     struct ccms_lock_scan_info *info = arg;
411     ccms_lock_t lock = info->lock;
412
413     if (info->coll_cst == NULL) {
414         switch(lock->ltype) {
415         case CCMS_LTYPE_SHARED:
416             if (cst->sharecount < 0) {
417                 info->coll_cst = cst;
418             } else {
419                 if (ccms_enable >= 9) {
420                         kprintf("CST SHARE %d %lld-%lld\n",
421                                 cst->sharecount,
422                                 (long long)cst->beg_offset,
423                                 (long long)cst->end_offset);
424                 }
425                 ++cst->sharecount;
426             }
427             break;
428         case CCMS_LTYPE_EXCLUSIVE:
429             if (cst->sharecount != 0) {
430                 info->coll_cst = cst;
431             } else {
432                 --cst->sharecount;
433                 if (ccms_enable >= 9) {
434                         kprintf("CST EXCLS %d %lld-%lld\n",
435                                 cst->sharecount,
436                                 (long long)cst->beg_offset,
437                                 (long long)cst->end_offset);
438                 }
439             }
440             break;
441         case CCMS_LTYPE_MODIFYING:
442             if (cst->sharecount != 0) {
443                 info->coll_cst = cst;
444             } else {
445                 --cst->sharecount;
446                 ++cst->modifycount;
447                 if (ccms_enable >= 9) {
448                         kprintf("CST MODXL %d %lld-%lld\n",
449                                 cst->sharecount,
450                                 (long long)cst->beg_offset,
451                                 (long long)cst->end_offset);
452                 }
453             }
454             break;
455         }
456     }
457     return(0);
458 }
459
460 /*
461  * Release a CCMS lock
462  *
463  * MPSAFE
464  */
465 int
466 ccms_lock_put(ccms_dataspace_t ds, ccms_lock_t lock)
467 {
468     struct ccms_lock_scan_info info;
469     ccms_cst_t cst;
470
471     if (lock->ds == NULL)
472         return(0);
473
474     lock->ds = NULL;
475     info.lock = lock;
476     info.ds = ds;
477     info.cst1 = NULL;
478     info.cst2 = NULL;
479
480     spin_lock(&ds->spin);
481     RB_SCAN(ccms_rb_tree, &ds->tree, ccms_lock_scan_cmp,
482             ccms_lock_put_match, &info);
483     cst = ds->delayed_free;
484     ds->delayed_free = NULL;
485     spin_unlock(&ds->spin);
486
487     ccms_delayed_free(cst);
488     if (info.cst1)
489         objcache_put(ccms_oc, info.cst1);
490     if (info.cst2)
491         objcache_put(ccms_oc, info.cst2);
492     return(0);
493 }
494
495 /*
496  * NOTE: called with spinlock held.
497  */
498 static
499 int
500 ccms_lock_put_match(ccms_cst_t cst, void *arg)
501 {
502     struct ccms_lock_scan_info *info = arg;
503     ccms_lock_t lock = info->lock;
504     ccms_cst_t ocst;
505
506     /*
507      * Undo the local shared/exclusive rangelock.
508      */
509     switch(lock->ltype) {
510     case CCMS_LTYPE_SHARED:
511         KKASSERT(cst->sharecount > 0);
512         --cst->sharecount;
513         if (ccms_enable >= 9) {
514                 kprintf("CST UNSHR %d %lld-%lld (%d)\n", cst->sharecount,
515                         (long long)cst->beg_offset,
516                         (long long)cst->end_offset,
517                         cst->blocked);
518         }
519         if (cst->blocked && cst->sharecount == 0) {
520                 cst->blocked = 0;
521                 wakeup(cst);
522         }
523         break;
524     case CCMS_LTYPE_EXCLUSIVE:
525         KKASSERT(cst->sharecount < 0);
526         ++cst->sharecount;
527         if (ccms_enable >= 9) {
528                 kprintf("CST UNEXC %d %lld-%lld (%d)\n", cst->sharecount,
529                         (long long)cst->beg_offset,
530                         (long long)cst->end_offset,
531                         cst->blocked);
532         }
533         if (cst->blocked && cst->sharecount == 0) {
534                 cst->blocked = 0;
535                 wakeup(cst);
536         }
537         break;
538     case CCMS_LTYPE_MODIFYING:
539         KKASSERT(cst->sharecount < 0 && cst->modifycount > 0);
540         ++cst->sharecount;
541         --cst->modifycount;
542         if (ccms_enable >= 9) {
543                 kprintf("CST UNMOD %d %lld-%lld (%d)\n", cst->sharecount,
544                         (long long)cst->beg_offset,
545                         (long long)cst->end_offset,
546                         cst->blocked);
547         }
548         if (cst->blocked && cst->sharecount == 0) {
549                 cst->blocked = 0;
550                 wakeup(cst);
551         }
552         break;
553     }
554
555     /*
556      * Decrement the lock coverage count on the CST.  Decrement the left and
557      * right edge counts as appropriate.
558      *
559      * When lrefs or rrefs drops to zero we check the adjacent entry to
560      * determine whether a merge is possible.  If the appropriate refs field
561      * (rrefs for the entry to our left, lrefs for the entry to our right)
562      * is 0, then all covering locks must cover both entries and the xrefs
563      * field must match.  We can then merge the entries if they have
564      * compatible cache states.
565      *
566      * However, because we are cleaning up the shared/exclusive count at
567      * the same time, the sharecount field may be temporarily out of
568      * sync, so require that the sharecount field also match before doing
569      * a merge.
570      *
571      * When merging an element which is being blocked on, the blocking
572      * thread(s) will be woken up.
573      *
574      * If the dataspace has too many CSTs we may be able to merge the
575      * entries even if their cache states are not the same, by dropping
576      * both to a compatible (lower) cache state and performing the appropriate
577      * management operations.  XXX
578      */
579     --cst->xrefs;
580     if (lock->beg_offset == cst->beg_offset) {
581         --cst->lrefs;
582         if (cst->lrefs == 0) {
583             if ((ocst = RB_PREV(ccms_rb_tree, &info->ds->tree, cst)) != NULL &&
584                 ocst->rrefs == 0 &&
585                 ocst->state == cst->state &&
586                 ocst->sharecount == cst->sharecount
587             ) {
588                 KKASSERT(ocst->xrefs == cst->xrefs);
589                 KKASSERT(ocst->end_offset + 1 == cst->beg_offset);
590                 RB_REMOVE(ccms_rb_tree, &info->ds->tree, ocst);
591                 cst->beg_offset = ocst->beg_offset;
592                 cst->lrefs = ocst->lrefs;
593                 if (ccms_enable >= 9) {
594                     kprintf("MERGELEFT %p %lld-%lld (%d)\n",
595                            ocst,
596                            (long long)cst->beg_offset,
597                            (long long)cst->end_offset,
598                            cst->blocked);
599                 }
600                 if (ocst->blocked) {
601                     ocst->blocked = 0;
602                     wakeup(ocst);
603                 }
604                 /*objcache_put(ccms_oc, ocst);*/
605                 ocst->delayed_next = info->ds->delayed_free;
606                 info->ds->delayed_free = ocst;
607             }
608         }
609     }
610     if (lock->end_offset == cst->end_offset) {
611         --cst->rrefs;
612         if (cst->rrefs == 0) {
613             if ((ocst = RB_NEXT(ccms_rb_tree, &info->ds->tree, cst)) != NULL &&
614                 ocst->lrefs == 0 &&
615                 ocst->state == cst->state &&
616                 ocst->sharecount == cst->sharecount
617             ) {
618                 KKASSERT(ocst->xrefs == cst->xrefs);
619                 KKASSERT(cst->end_offset + 1 == ocst->beg_offset);
620                 RB_REMOVE(ccms_rb_tree, &info->ds->tree, ocst);
621                 cst->end_offset = ocst->end_offset;
622                 cst->rrefs = ocst->rrefs;
623                 if (ccms_enable >= 9) {
624                     kprintf("MERGERIGHT %p %lld-%lld\n",
625                            ocst,
626                            (long long)cst->beg_offset,
627                            (long long)cst->end_offset);
628                 }
629                 /*objcache_put(ccms_oc, ocst);*/
630                 ocst->delayed_next = info->ds->delayed_free;
631                 info->ds->delayed_free = ocst;
632             }
633         }
634     }
635     return(0);
636 }
637
638 /*
639  * RB tree compare function for insertions and deletions.  This function
640  * compares two CSTs.
641  */
642 static int
643 ccms_cst_cmp(ccms_cst_t b1, ccms_cst_t b2)
644 {
645     if (b1->end_offset < b2->beg_offset)
646         return(-1);
647     if (b1->beg_offset > b2->end_offset)
648         return(1);
649     return(0);
650 }
651
652 /*
653  * RB tree scanning compare function.  This function compares the CST
654  * from the tree against the supplied ccms_lock and returns the CST's
655  * placement relative to the lock.
656  */
657 static int
658 ccms_lock_scan_cmp(ccms_cst_t cst, void *arg)
659 {
660     struct ccms_lock_scan_info *info = arg;
661     ccms_lock_t lock = info->lock;
662
663     if (cst->end_offset < lock->beg_offset)
664         return(-1);
665     if (cst->beg_offset > lock->end_offset)
666         return(1);
667     return(0);
668 }
669
670 /*
671  * This function works like ccms_lock_scan_cmp but terminates at the
672  * collision point rather then at the lock's ending offset.  Only
673  * the CSTs that were already partially resolved are returned by the scan.
674  */
675 static int
676 ccms_lock_undo_cmp(ccms_cst_t cst, void *arg)
677 {
678     struct ccms_lock_scan_info *info = arg;
679     ccms_lock_t lock = info->lock;
680
681     if (cst->end_offset < lock->beg_offset)
682         return(-1);
683     if (cst->beg_offset >= info->coll_cst->beg_offset)
684         return(1);
685     return(0);
686 }