kernel - usched_dfly revamp (6), reimplement shared spinlocks & misc others
author     Matthew Dillon <dillon@apollo.backplane.com>
           Tue, 25 Sep 2012 01:24:22 +0000 (18:24 -0700)
committer  Matthew Dillon <dillon@apollo.backplane.com>
           Tue, 25 Sep 2012 01:24:22 +0000 (18:24 -0700)
* Rename gd_spinlocks_wr to just gd_spinlocks.

* Reimplement shared spinlocks and optimize the shared spinlock path.
  Contended exclusive spinlocks are less optimal with this change.

* Use shared spinlocks for all file descriptor accesses.  This includes
  not only most I/O calls like read() and write(), but also callbacks
  from kqueue to double-check the validity of a file descriptor (see the
  sketch after this list).

* Use getnanouptime() instead of nanouptime() in kqueue_sleep() and
  kern_kevent(), removing a hardware I/O serialization (to read the HPET)
  from the critical path.

* These changes significantly reduce kernel spinlock contention when running
  postgres/pgbench benchmarks.

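For orientation, here is a condensed sketch of the reader-side pattern the
file descriptor changes converge on.  It is illustrative only, distilled from
the holdfp() hunk below: the helper name is made up and the FREAD/FWRITE flag
check is omitted.

    /* Illustrative only -- condensed from the holdfp() change below. */
    static struct file *
    lookup_fp_shared(struct filedesc *fdp, int fd)
    {
            struct file *fp;

            spin_lock_shared(&fdp->fd_spin);    /* readers no longer exclude each other */
            if ((u_int)fd >= fdp->fd_nfiles)
                    fp = NULL;
            else if ((fp = fdp->fd_files[fd].fp) != NULL)
                    fhold(fp);                  /* take a reference while still locked */
            spin_unlock_shared(&fdp->fd_spin);
            return (fp);
    }
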
16 files changed:
sys/kern/kern_descrip.c
sys/kern/kern_event.c
sys/kern/kern_intr.c
sys/kern/kern_lock.c
sys/kern/kern_mutex.c
sys/kern/kern_spinlock.c
sys/kern/lwkt_ipiq.c
sys/kern/lwkt_thread.c
sys/kern/usched_bsd4.c
sys/kern/usched_dfly.c
sys/platform/pc32/isa/clock.c
sys/platform/pc64/isa/clock.c
sys/sys/globaldata.h
sys/sys/mutex2.h
sys/sys/spinlock.h
sys/sys/spinlock2.h

diff --git a/sys/kern/kern_descrip.c b/sys/kern/kern_descrip.c
index a967e33..a43b6f9 100644
@@ -1561,12 +1561,12 @@ checkfdclosed(struct filedesc *fdp, int fd, struct file *fp)
 {
        int error;
 
-       spin_lock(&fdp->fd_spin);
+       spin_lock_shared(&fdp->fd_spin);
        if ((unsigned)fd >= fdp->fd_nfiles || fp != fdp->fd_files[fd].fp)
                error = EBADF;
        else
                error = 0;
-       spin_unlock(&fdp->fd_spin);
+       spin_unlock_shared(&fdp->fd_spin);
        return (error);
 }
 
@@ -2120,7 +2120,7 @@ holdfp(struct filedesc *fdp, int fd, int flag)
 {
        struct file* fp;
 
-       spin_lock(&fdp->fd_spin);
+       spin_lock_shared(&fdp->fd_spin);
        if (((u_int)fd) >= fdp->fd_nfiles) {
                fp = NULL;
                goto done;
@@ -2133,7 +2133,7 @@ holdfp(struct filedesc *fdp, int fd, int flag)
        }
        fhold(fp);
 done:
-       spin_unlock(&fdp->fd_spin);
+       spin_unlock_shared(&fdp->fd_spin);
        return (fp);
 }
 
@@ -2150,7 +2150,7 @@ holdsock(struct filedesc *fdp, int fd, struct file **fpp)
        struct file *fp;
        int error;
 
-       spin_lock(&fdp->fd_spin);
+       spin_lock_shared(&fdp->fd_spin);
        if ((unsigned)fd >= fdp->fd_nfiles) {
                error = EBADF;
                fp = NULL;
@@ -2167,7 +2167,7 @@ holdsock(struct filedesc *fdp, int fd, struct file **fpp)
        fhold(fp);
        error = 0;
 done:
-       spin_unlock(&fdp->fd_spin);
+       spin_unlock_shared(&fdp->fd_spin);
        *fpp = fp;
        return (error);
 }
@@ -2183,7 +2183,7 @@ holdvnode(struct filedesc *fdp, int fd, struct file **fpp)
        struct file *fp;
        int error;
 
-       spin_lock(&fdp->fd_spin);
+       spin_lock_shared(&fdp->fd_spin);
        if ((unsigned)fd >= fdp->fd_nfiles) {
                error = EBADF;
                fp = NULL;
@@ -2201,7 +2201,7 @@ holdvnode(struct filedesc *fdp, int fd, struct file **fpp)
        fhold(fp);
        error = 0;
 done:
-       spin_unlock(&fdp->fd_spin);
+       spin_unlock_shared(&fdp->fd_spin);
        *fpp = fp;
        return (error);
 }
@@ -2779,7 +2779,7 @@ sysctl_kern_file_callback(struct proc *p, void *data)
         * The fdp's own spinlock prevents the contents from being
         * modified.
         */
-       spin_lock(&fdp->fd_spin);
+       spin_lock_shared(&fdp->fd_spin);
        for (n = 0; n < fdp->fd_nfiles; ++n) {
                if ((fp = fdp->fd_files[n].fp) == NULL)
                        continue;
@@ -2788,14 +2788,14 @@ sysctl_kern_file_callback(struct proc *p, void *data)
                } else {
                        uid = p->p_ucred ? p->p_ucred->cr_uid : -1;
                        kcore_make_file(&kf, fp, p->p_pid, uid, n);
-                       spin_unlock(&fdp->fd_spin);
+                       spin_unlock_shared(&fdp->fd_spin);
                        info->error = SYSCTL_OUT(info->req, &kf, sizeof(kf));
-                       spin_lock(&fdp->fd_spin);
+                       spin_lock_shared(&fdp->fd_spin);
                        if (info->error)
                                break;
                }
        }
-       spin_unlock(&fdp->fd_spin);
+       spin_unlock_shared(&fdp->fd_spin);
        atomic_subtract_int(&fdp->fd_softrefs, 1);
        if (info->error)
                return(-1);
diff --git a/sys/kern/kern_event.c b/sys/kern/kern_event.c
index 9eb32fd..ec108eb 100644
@@ -663,7 +663,7 @@ kern_kevent(struct kqueue *kq, int nevents, int *res, void *uap,
                struct timespec ats;
 
                if (tsp->tv_sec || tsp->tv_nsec) {
-                       nanouptime(&ats);
+                       getnanouptime(&ats);
                        timespecadd(tsp, &ats);         /* tsp = target time */
                }
        }
@@ -1018,7 +1018,7 @@ kqueue_sleep(struct kqueue *kq, struct timespec *tsp)
                struct timespec atx = *tsp;
                int timeout;
 
-               nanouptime(&ats);
+               getnanouptime(&ats);
                timespecsub(&atx, &ats);
                if (ats.tv_sec < 0) {
                        error = EWOULDBLOCK;
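
The point of the nanouptime() -> getnanouptime() switch above is that the
timeout math only needs tick resolution, so the cached per-tick timestamp is
good enough and the HPET read disappears from the path.  A rough userland
analogue, illustrative only (CLOCK_MONOTONIC_COARSE is a Linux-specific
stand-in for the kernel's cached uptime):

    #include <stdio.h>
    #include <time.h>

    int
    main(void)
    {
            struct timespec precise, cached;

            clock_gettime(CLOCK_MONOTONIC, &precise);        /* ~nanouptime(): hardware read */
            clock_gettime(CLOCK_MONOTONIC_COARSE, &cached);  /* ~getnanouptime(): last tick */
            printf("precise %ld.%09ld\n", (long)precise.tv_sec, precise.tv_nsec);
            printf("cached  %ld.%09ld\n", (long)cached.tv_sec, cached.tv_nsec);
            return (0);
    }
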
diff --git a/sys/kern/kern_intr.c b/sys/kern/kern_intr.c
index 22d06c1..e5fa84c 100644
@@ -93,13 +93,13 @@ static int max_installed_hard_intr[MAXCPU];
 
 #define TD_INVARIANTS_GET(td)                                   \
         do {                                                    \
-                spincount = (td)->td_gd->gd_spinlocks_wr;       \
+                spincount = (td)->td_gd->gd_spinlocks;         \
                 curstop = (td)->td_toks_stop;                   \
         } while(0)
 
 #define TD_INVARIANTS_TEST(td, name)                                    \
         do {                                                            \
-                KASSERT(spincount == (td)->td_gd->gd_spinlocks_wr,      \
+                KASSERT(spincount == (td)->td_gd->gd_spinlocks,                \
                         ("spincount mismatch after interrupt handler %s", \
                         name));                                         \
                 KASSERT(curstop == (td)->td_toks_stop,                  \
diff --git a/sys/kern/kern_lock.c b/sys/kern/kern_lock.c
index 2e67007..d919791 100644
@@ -189,11 +189,9 @@ debuglockmgr(struct lock *lkp, u_int flags,
        }
 
 #ifdef DEBUG_LOCKS
-       if (mycpu->gd_spinlocks_wr &&
-           ((flags & LK_NOWAIT) == 0)
-       ) {
+       if (mycpu->gd_spinlocks && ((flags & LK_NOWAIT) == 0)) {
                panic("lockmgr %s from %s:%d: called with %d spinlocks held",
-                     lkp->lk_wmesg, file, line, mycpu->gd_spinlocks_wr);
+                     lkp->lk_wmesg, file, line, mycpu->gd_spinlocks);
        }
 #endif
 
diff --git a/sys/kern/kern_mutex.c b/sys/kern/kern_mutex.c
index ee95de6..6410e06 100644
@@ -318,7 +318,7 @@ _mtx_spinlock(mtx_t mtx)
 
 /*
  * Attempt to acquire a spinlock, if we fail we must undo the
- * gd->gd_spinlocks_wr/gd->gd_curthead->td_critcount predisposition.
+ * gd->gd_spinlocks/gd->gd_curthread->td_critcount predisposition.
  *
  * Returns 0 on success, EAGAIN on failure.
  */
@@ -345,7 +345,7 @@ _mtx_spinlock_try(mtx_t mtx)
                        if (atomic_cmpset_int(&mtx->mtx_lock, lock, nlock))
                                break;
                } else {
-                       --gd->gd_spinlocks_wr;
+                       --gd->gd_spinlocks;
                        cpu_ccfence();
                        --gd->gd_curthread->td_critcount;
                        res = EAGAIN;
diff --git a/sys/kern/kern_spinlock.c b/sys/kern/kern_spinlock.c
index c61fc4e..0cea05d 100644
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
+
 /*
- * The spinlock code utilizes two counters to form a virtual FIFO, allowing
- * a spinlock to allocate a slot and then only issue memory read operations
- * until it is handed the lock (if it is not the next owner for the lock).
+ * The implementation is designed to avoid looping when compatible operations
+ * are executed.
+ *
+ * To acquire a spinlock we first increment counta.  Then we check if counta
+ * meets our requirements.  For an exclusive spinlock it must be 1; for a
+ * shared spinlock it must either be 1 or the SPINLOCK_SHARED bit must be set.
+ *
+ * Shared spinlock failure case: Decrement the count, loop until we can
+ * transition from 0 to SPINLOCK_SHARED|1, or until we find SPINLOCK_SHARED
+ * is set and increment the count.
+ *
+ * Exclusive spinlock failure case: While maintaining the count, clear the
+ * SPINLOCK_SHARED flag unconditionally.  Then use an atomic add to transfer
+ * the count from the low bits to the high bits of counta.  Then loop until
+ * all low bits are 0.  Once the low bits drop to 0 we can transfer the
+ * count back with an atomic_cmpset_int(), atomically, and return.
  */
-
 #include <sys/param.h>
 #include <sys/systm.h>
 #include <sys/types.h>
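
To make the new counter layout concrete, here is a small standalone sketch.
The two constants mirror the sys/sys/spinlock.h additions in this patch; the
decode helper and the sample values are purely illustrative.

    #include <stdint.h>
    #include <stdio.h>

    #define SPINLOCK_SHARED         0x80000000u     /* lock is currently held shared */
    #define SPINLOCK_EXCLWAIT       0x00100000u     /* one unit per parked exclusive waiter */

    /*
     * counta as partitioned by this patch:
     *   bit  31     SPINLOCK_SHARED
     *   bits 20-30  parked exclusive waiters (units of SPINLOCK_EXCLWAIT)
     *   bits 0-19   active holder count (1 for exclusive, N for shared)
     */
    static void
    decode_counta(uint32_t counta)
    {
            printf("shared=%u exclwait=%u holders=%u\n",
                (counta & SPINLOCK_SHARED) ? 1u : 0u,
                (unsigned)((counta & ~SPINLOCK_SHARED) / SPINLOCK_EXCLWAIT),
                (unsigned)(counta & (SPINLOCK_EXCLWAIT - 1)));
    }

    int
    main(void)
    {
            decode_counta(1);                         /* one exclusive holder */
            decode_counta(SPINLOCK_SHARED | 3);       /* three shared holders */
            decode_counta(2 * SPINLOCK_EXCLWAIT + 1); /* holder plus two exclusive waiters */
            return (0);
    }
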
@@ -132,7 +145,8 @@ spin_trylock_contested(struct spinlock *spin)
        globaldata_t gd = mycpu;
 
        /*++spinlocks_contested1;*/
-       --gd->gd_spinlocks_wr;
+       /*atomic_add_int(&spin->counta, -1);*/
+       --gd->gd_spinlocks;
        --gd->gd_curthread->td_critcount;
        return (FALSE);
 }
@@ -145,6 +159,10 @@ spin_trylock_contested(struct spinlock *spin)
  * the same on single-socket multi-core systems.  However, atomic_swap_int()
  * does not result in an even distribution of successful acquisitions.
  *
+ * UNFORTUNATELY we cannot really use atomic_swap_int() when also implementing
+ * shared spinlocks, so as we do a better job removing contention we've
+ * moved to atomic_cmpset_int() to be able to handle multiple states.
+ *
  * Another problem we have is that (at least on the 48-core opteron we test
  * with) having all 48 cores contesting the same spin lock reduces
  * performance to around 600,000 ops/sec, verses millions when fewer cores
@@ -177,11 +195,25 @@ spin_lock_contested(struct spinlock *spin)
        struct indefinite_info info = { 0, 0 };
        int i;
 
+       /*
+        * Force any existing shared locks to exclusive so no new shared
+        * locks can occur.  Transfer our count to the high bits, then
+        * loop until we can acquire the low counter (== 1).
+        */
+       atomic_clear_int(&spin->counta, SPINLOCK_SHARED);
+       atomic_add_int(&spin->counta, SPINLOCK_EXCLWAIT - 1);
+
 #ifdef DEBUG_LOCKS_LATENCY
        long j;
        for (j = spinlocks_add_latency; j > 0; --j)
                cpu_ccfence();
 #endif
+       if (spin_lock_test_mode > 10 &&
+           spin->countb > spin_lock_test_mode &&
+           (spin_lock_test_mode & 0xFF) == mycpu->gd_cpuid) {
+               spin->countb = 0;
+               print_backtrace(-1);
+       }
 
        i = 0;
        ++spin->countb;
@@ -189,17 +221,26 @@ spin_lock_contested(struct spinlock *spin)
        /*logspin(beg, spin, 'w');*/
        for (;;) {
                /*
+                * If the low bits are zero, try to acquire the exclusive lock
+                * by transferring our high-bit counter to the low bits.
+                *
                 * NOTE: Reading spin->counta prior to the swap is extremely
                 *       important on multi-chip/many-core boxes.  On 48-core
                 *       this one change improves fully concurrent all-cores
                 *       compiles by 100% or better.
                 *
-                *       I can't emphasize enough how important the pre-read is in
-                *       preventing hw cache bus armageddon on multi-chip systems.
-                *       And on single-chip/multi-core systems it just doesn't hurt.
+                *       I can't emphasize enough how important the pre-read
+                *       is in preventing hw cache bus armageddon on
+                *       multi-chip systems.  And on single-chip/multi-core
+                *       systems it just doesn't hurt.
                 */
-               if (spin->counta == 0 && atomic_swap_int(&spin->counta, 1) == 0)
+               uint32_t ovalue = spin->counta;
+               cpu_ccfence();
+               if ((ovalue & (SPINLOCK_EXCLWAIT - 1)) == 0 &&
+                   atomic_cmpset_int(&spin->counta, ovalue,
+                                     (ovalue - SPINLOCK_EXCLWAIT) | 1)) {
                        break;
+               }
                if ((++i & 0x7F) == 0x7F) {
                        ++spin->countb;
                        if (spin_indefinite_check(spin, &info))
@@ -209,6 +250,68 @@ spin_lock_contested(struct spinlock *spin)
        /*logspin(end, spin, 'w');*/
 }
 
+/*
+ * Shared spinlocks
+ */
+void
+spin_lock_shared_contested(struct spinlock *spin)
+{
+       struct indefinite_info info = { 0, 0 };
+       int i;
+
+       atomic_add_int(&spin->counta, -1);
+#ifdef DEBUG_LOCKS_LATENCY
+       long j;
+       for (j = spinlocks_add_latency; j > 0; --j)
+               cpu_ccfence();
+#endif
+       if (spin_lock_test_mode > 10 &&
+           spin->countb > spin_lock_test_mode &&
+           (spin_lock_test_mode & 0xFF) == mycpu->gd_cpuid) {
+               spin->countb = 0;
+               print_backtrace(-1);
+       }
+
+       i = 0;
+       ++spin->countb;
+
+       /*logspin(beg, spin, 'w');*/
+       for (;;) {
+               /*
+                * NOTE: Reading spin->counta prior to the swap is extremely
+                *       important on multi-chip/many-core boxes.  On 48-core
+                *       this one change improves fully concurrent all-cores
+                *       compiles by 100% or better.
+                *
+                *       I can't emphasize enough how important the pre-read
+                *       is in preventing hw cache bus armageddon on
+                *       multi-chip systems.  And on single-chip/multi-core
+                *       systems it just doesn't hurt.
+                */
+               uint32_t ovalue = spin->counta;
+
+               cpu_ccfence();
+               if (ovalue == 0) {
+                       if (atomic_cmpset_int(&spin->counta, 0,
+                                             SPINLOCK_SHARED | 1))
+                               break;
+               } else if (ovalue & SPINLOCK_SHARED) {
+                       if (atomic_cmpset_int(&spin->counta, ovalue,
+                                             ovalue + 1))
+                               break;
+               }
+               if ((++i & 0x7F) == 0x7F) {
+                       ++spin->countb;
+                       if (spin_indefinite_check(spin, &info))
+                               break;
+               }
+       }
+       /*logspin(end, spin, 'w');*/
+}
+
+/*
+ * Pool functions (SHARED SPINLOCKS NOT SUPPORTED)
+ */
 static __inline int
 _spin_pool_hash(void *ptr)
 {
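
The contested exclusive path added above can also be read as a standalone
C11 model.  This is illustrative, not the kernel code: the memory orders and
the function names are assumptions, and the indefinite-wait check is elided.

    #include <stdatomic.h>
    #include <stdint.h>
    #include <stdio.h>

    #define SPINLOCK_SHARED         0x80000000u
    #define SPINLOCK_EXCLWAIT       0x00100000u

    /*
     * Model of spin_lock_contested(): the fast path already added 1 to the
     * low bits and lost the race.  Convert that +1 into one EXCLWAIT unit in
     * the high bits, then wait for the low bits to drain to zero and move
     * one unit back as the low-bit "1" of the new exclusive owner.
     */
    static void
    model_lock_contested(_Atomic uint32_t *counta)
    {
            uint32_t ov;

            /* Stop new shared holders, then park our count in the high bits. */
            atomic_fetch_and_explicit(counta, ~SPINLOCK_SHARED,
                memory_order_acq_rel);
            atomic_fetch_add_explicit(counta, SPINLOCK_EXCLWAIT - 1,
                memory_order_acq_rel);

            for (;;) {
                    ov = atomic_load_explicit(counta, memory_order_relaxed);
                    if ((ov & (SPINLOCK_EXCLWAIT - 1)) == 0 &&
                        atomic_compare_exchange_weak_explicit(counta, &ov,
                            (ov - SPINLOCK_EXCLWAIT) | 1,
                            memory_order_acquire, memory_order_relaxed))
                            break;
                    /* the real code periodically runs spin_indefinite_check() */
            }
    }

    int
    main(void)
    {
            /* Our fast-path +1 is in place and the previous owner has since
             * released, so the model acquires on the first pass. */
            _Atomic uint32_t counta = 1;

            model_lock_contested(&counta);
            printf("counta=0x%08x\n", (unsigned)atomic_load(&counta));  /* 0x00000001 */
            return (0);
    }
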
diff --git a/sys/kern/lwkt_ipiq.c b/sys/kern/lwkt_ipiq.c
index 972458e..a7c31bc 100644
@@ -603,7 +603,7 @@ lwkt_process_ipiq_core(globaldata_t sgd, lwkt_ipiq_t ip,
        kprintf("cpu %d ipiq maxed cscount %d spin %d\n",
                mygd->gd_cpuid,
                mygd->gd_curthread->td_cscount,
-               mygd->gd_spinlocks_wr);
+               mygd->gd_spinlocks);
        iqcount[mygd->gd_cpuid] = 0;
 #if 0
        if (++iqterm[mygd->gd_cpuid] > 10)
diff --git a/sys/kern/lwkt_thread.c b/sys/kern/lwkt_thread.c
index beaf7a0..935d2d9 100644
@@ -615,9 +615,9 @@ lwkt_switch(void)
      * We had better not be holding any spin locks, but don't get into an
      * endless panic loop.
      */
-    KASSERT(gd->gd_spinlocks_wr == 0 || panicstr != NULL, 
+    KASSERT(gd->gd_spinlocks == 0 || panicstr != NULL,
            ("lwkt_switch: still holding %d exclusive spinlocks!",
-            gd->gd_spinlocks_wr));
+            gd->gd_spinlocks));
 
 
 #ifdef SMP
@@ -1028,7 +1028,7 @@ lwkt_preempt(thread_t ntd, int critcount)
      * We could try to acquire the tokens but this case is so rare there
      * is no need to support it.
      */
-    KKASSERT(gd->gd_spinlocks_wr == 0);
+    KKASSERT(gd->gd_spinlocks == 0);
 
     if (TD_TOKS_HELD(ntd)) {
        ++preempt_miss;
@@ -1102,7 +1102,7 @@ splz_check(void)
  * We only want to execute the splz() on the 1->0 transition of
  * critcount and not in a hard code section or if too deeply nested.
  *
- * NOTE: gd->gd_spinlocks_wr is implied to be 0 when td_critcount is 0.
+ * NOTE: gd->gd_spinlocks is implied to be 0 when td_critcount is 0.
  */
 void
 lwkt_maybe_splz(thread_t td)
diff --git a/sys/kern/usched_bsd4.c b/sys/kern/usched_bsd4.c
index 043b264..ea63e60 100644
@@ -957,7 +957,7 @@ bsd4_schedulerclock(struct lwp *lp, sysclock_t period, sysclock_t cpstamp)
         * Spinlocks also hold a critical section so there should not be
         * any active.
         */
-       KKASSERT(gd->gd_spinlocks_wr == 0);
+       KKASSERT(gd->gd_spinlocks == 0);
 
        bsd4_resetpriority(lp);
 }
diff --git a/sys/kern/usched_dfly.c b/sys/kern/usched_dfly.c
index 80182ac..bc676d2 100644
@@ -744,7 +744,7 @@ dfly_schedulerclock(struct lwp *lp, sysclock_t period, sysclock_t cpstamp)
         * Spinlocks also hold a critical section so there should not be
         * any active.
         */
-       KKASSERT(gd->gd_spinlocks_wr == 0);
+       KKASSERT(gd->gd_spinlocks == 0);
 
        if (lp == NULL)
                return;
@@ -1089,9 +1089,18 @@ dfly_resetpriority(struct lwp *lp)
        }
 
        /*
-        * Adjust effective load
+        * Adjust effective load.
+        *
+        * Calculate load then scale up or down geometrically based on p_nice.
+        * Processes niced up (positive) are less important, and processes
+        * niced downward (negative) are more important.  The higher the uload,
+        * the more important the thread.
         */
-       delta_uload = lp->lwp_estcpu / NQS;     /* 0-511, 0-100% cpu */
+       /* 0-511, 0-100% cpu */
+       delta_uload = lp->lwp_estcpu / NQS;
+       delta_uload -= delta_uload * lp->lwp_proc->p_nice / (PRIO_MAX + 1);
+
        delta_uload -= lp->lwp_uload;
        lp->lwp_uload += delta_uload;
        if (lp->lwp_mpflags & LWP_MP_ULOAD)
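
As a worked example of the nice scaling added above: the load contribution is
multiplied by (1 - nice/(PRIO_MAX+1)), so positive nice shrinks it and
negative nice grows it.  The sketch below is standalone and illustrative; it
assumes the usual values NQS == 32 and PRIO_MAX == 20 instead of pulling them
from the kernel headers.

    #include <stdio.h>

    #define NQS       32    /* assumed; number of scheduler run queues */
    #define PRIO_MAX  20    /* assumed; nice ranges over -20..20 */

    /* Mirrors the new delta_uload calculation in dfly_resetpriority(). */
    static int
    scaled_uload(int estcpu, int nice)
    {
            int delta_uload = estcpu / NQS;             /* 0-511, 0-100% cpu */

            delta_uload -= delta_uload * nice / (PRIO_MAX + 1);
            return (delta_uload);
    }

    int
    main(void)
    {
            printf("%d\n", scaled_uload(8192, 0));      /* 256: nice 0, unchanged */
            printf("%d\n", scaled_uload(8192, 20));     /* 13:  niced up, less important */
            printf("%d\n", scaled_uload(8192, -20));    /* 499: niced down, more important */
            return (0);
    }
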
diff --git a/sys/platform/pc32/isa/clock.c b/sys/platform/pc32/isa/clock.c
index 838bf56..ad4a63b 100644
@@ -459,7 +459,7 @@ DRIVERSLEEP(int usec)
 {
        globaldata_t gd = mycpu;
 
-       if (gd->gd_intr_nesting_level || gd->gd_spinlocks_wr) {
+       if (gd->gd_intr_nesting_level || gd->gd_spinlocks) {
                DODELAY(usec, 0);
        } else {
                DODELAY(usec, 1);
diff --git a/sys/platform/pc64/isa/clock.c b/sys/platform/pc64/isa/clock.c
index ed55c44..0376179 100644
@@ -466,7 +466,7 @@ DRIVERSLEEP(int usec)
 {
        globaldata_t gd = mycpu;
 
-       if (gd->gd_intr_nesting_level || gd->gd_spinlocks_wr) {
+       if (gd->gd_intr_nesting_level || gd->gd_spinlocks) {
                DODELAY(usec, 0);
        } else {
                DODELAY(usec, 1);
diff --git a/sys/sys/globaldata.h b/sys/sys/globaldata.h
index 9d2b895..b9aff89 100644
@@ -159,7 +159,7 @@ struct globaldata {
 
        struct tslpque  *gd_tsleep_hash;        /* tsleep/wakeup support */
        long            gd_processing_ipiq;
-       int             gd_spinlocks_wr;        /* Exclusive spinlocks held */
+       int             gd_spinlocks;           /* Exclusive spinlocks held */
        struct systimer *gd_systimer_inprog;    /* in-progress systimer */
        int             gd_timer_running;
        u_int           gd_idle_repeat;         /* repeated switches to idle */
diff --git a/sys/sys/mutex2.h b/sys/sys/mutex2.h
index c0cbc54..ada23be 100644
@@ -168,7 +168,7 @@ mtx_spinlock(mtx_t mtx)
         */
        ++gd->gd_curthread->td_critcount;
        cpu_ccfence();
-       ++gd->gd_spinlocks_wr;
+       ++gd->gd_spinlocks;
 
        /*
         * If we cannot get it trivially get it the hard way.
@@ -192,7 +192,7 @@ mtx_spinlock_try(mtx_t mtx)
         */
        ++gd->gd_curthread->td_critcount;
        cpu_ccfence();
-       ++gd->gd_spinlocks_wr;
+       ++gd->gd_spinlocks;
 
        /*
         * If we cannot get it trivially call _mtx_spinlock_try().  This
@@ -325,7 +325,7 @@ mtx_spinunlock(mtx_t mtx)
 
        mtx_unlock(mtx);
 
-       --gd->gd_spinlocks_wr;
+       --gd->gd_spinlocks;
        cpu_ccfence();
        --gd->gd_curthread->td_critcount;
 }
diff --git a/sys/sys/spinlock.h b/sys/sys/spinlock.h
index 703f34c..2da0267 100644
@@ -54,5 +54,8 @@ struct spinlock {
 
 #define SPINLOCK_INITIALIZER(head)     { 0, 0 }
 
+#define SPINLOCK_SHARED                        0x80000000
+#define SPINLOCK_EXCLWAIT              0x00100000 /* high bits counter */
+
 #endif
 
diff --git a/sys/sys/spinlock2.h b/sys/sys/spinlock2.h
index f4ad337..e67bdd3 100644
@@ -57,6 +57,7 @@ extern struct spinlock pmap_spin;
 
 int spin_trylock_contested(struct spinlock *spin);
 void spin_lock_contested(struct spinlock *spin);
+void spin_lock_shared_contested(struct spinlock *spin);
 void _spin_pool_lock(void *chan);
 void _spin_pool_unlock(void *chan);
 
@@ -75,8 +76,8 @@ spin_trylock(struct spinlock *spin)
 
        ++gd->gd_curthread->td_critcount;
        cpu_ccfence();
-       ++gd->gd_spinlocks_wr;
-       if (atomic_swap_int(&spin->counta, 1))
+       ++gd->gd_spinlocks;
+       if (atomic_cmpset_int(&spin->counta, 0, 1) == 0)
                return (spin_trylock_contested(spin));
 #ifdef DEBUG_LOCKS
        int i;
@@ -85,7 +86,7 @@ spin_trylock(struct spinlock *spin)
                        gd->gd_curthread->td_spinlock_stack_id[i] = 1;
                        gd->gd_curthread->td_spinlock_stack[i] = spin;
                        gd->gd_curthread->td_spinlock_caller_pc[i] =
-                                               __builtin_return_address(0);
+                               __builtin_return_address(0);
                        break;
                }
        }
@@ -102,7 +103,7 @@ spin_trylock(struct spinlock *spin)
 
        ++gd->gd_curthread->td_critcount;
        cpu_ccfence();
-       ++gd->gd_spinlocks_wr;
+       ++gd->gd_spinlocks;
        return (TRUE);
 }
 
@@ -125,9 +126,10 @@ spin_lock_quick(globaldata_t gd, struct spinlock *spin)
 {
        ++gd->gd_curthread->td_critcount;
        cpu_ccfence();
-       ++gd->gd_spinlocks_wr;
+       ++gd->gd_spinlocks;
 #ifdef SMP
-       if (atomic_swap_int(&spin->counta, 1))
+       atomic_add_int(&spin->counta, 1);
+       if (spin->counta != 1)
                spin_lock_contested(spin);
 #ifdef DEBUG_LOCKS
        int i;
@@ -179,20 +181,15 @@ spin_unlock_quick(globaldata_t gd, struct spinlock *spin)
        KKASSERT(spin->counta != 0);
 #endif
        cpu_sfence();
-       spin->counta = 0;
+       atomic_add_int(&spin->counta, -1);
        cpu_sfence();
 #endif
 #ifdef DEBUG_LOCKS
-       KKASSERT(gd->gd_spinlocks_wr > 0);
+       KKASSERT(gd->gd_spinlocks > 0);
 #endif
-       --gd->gd_spinlocks_wr;
+       --gd->gd_spinlocks;
        cpu_ccfence();
        --gd->gd_curthread->td_critcount;
-#if 0
-       /* FUTURE */
-       if (__predict_false(gd->gd_reqflags & RQF_IDLECHECK_MASK))
-               lwkt_maybe_splz(gd->gd_curthread);
-#endif
 }
 
 static __inline void
@@ -201,6 +198,89 @@ spin_unlock(struct spinlock *spin)
        spin_unlock_quick(mycpu, spin);
 }
 
+/*
+ * Shared spinlocks
+ */
+static __inline void
+spin_lock_shared_quick(globaldata_t gd, struct spinlock *spin)
+{
+       ++gd->gd_curthread->td_critcount;
+       cpu_ccfence();
+       ++gd->gd_spinlocks;
+#ifdef SMP
+       atomic_add_int(&spin->counta, 1);
+       if (spin->counta == 1)
+               atomic_set_int(&spin->counta, SPINLOCK_SHARED);
+       if ((spin->counta & SPINLOCK_SHARED) == 0)
+               spin_lock_shared_contested(spin);
+#ifdef DEBUG_LOCKS
+       int i;
+       for (i = 0; i < SPINLOCK_DEBUG_ARRAY_SIZE; i++) {
+               if (gd->gd_curthread->td_spinlock_stack_id[i] == 0) {
+                       gd->gd_curthread->td_spinlock_stack_id[i] = 1;
+                       gd->gd_curthread->td_spinlock_stack[i] = spin;
+                       gd->gd_curthread->td_spinlock_caller_pc[i] =
+                               __builtin_return_address(0);
+                       break;
+               }
+       }
+#endif
+#endif
+}
+
+static __inline void
+spin_unlock_shared_quick(globaldata_t gd, struct spinlock *spin)
+{
+#ifdef SMP
+#ifdef DEBUG_LOCKS
+       int i;
+       for (i = 0; i < SPINLOCK_DEBUG_ARRAY_SIZE; i++) {
+               if ((gd->gd_curthread->td_spinlock_stack_id[i] == 1) &&
+                   (gd->gd_curthread->td_spinlock_stack[i] == spin)) {
+                       gd->gd_curthread->td_spinlock_stack_id[i] = 0;
+                       gd->gd_curthread->td_spinlock_stack[i] = NULL;
+                       gd->gd_curthread->td_spinlock_caller_pc[i] = NULL;
+                       break;
+               }
+       }
+#endif
+#ifdef DEBUG_LOCKS
+       KKASSERT(spin->counta != 0);
+#endif
+       cpu_sfence();
+       atomic_add_int(&spin->counta, -1);
+
+       /*
+        * Make sure SPINLOCK_SHARED is cleared.  If another cpu tries to
+        * get a shared or exclusive lock this loop will break out.  We're
+        * only talking about a very trivial edge case here.
+        */
+       while (spin->counta == SPINLOCK_SHARED) {
+               if (atomic_cmpset_int(&spin->counta, SPINLOCK_SHARED, 0))
+                       break;
+       }
+       cpu_sfence();
+#endif
+#ifdef DEBUG_LOCKS
+       KKASSERT(gd->gd_spinlocks > 0);
+#endif
+       --gd->gd_spinlocks;
+       cpu_ccfence();
+       --gd->gd_curthread->td_critcount;
+}
+
+static __inline void
+spin_lock_shared(struct spinlock *spin)
+{
+       spin_lock_shared_quick(mycpu, spin);
+}
+
+static __inline void
+spin_unlock_shared(struct spinlock *spin)
+{
+       spin_unlock_shared_quick(mycpu, spin);
+}
+
 static __inline void
 spin_pool_lock(void *chan)
 {