kernel - Fix excessive ipiq recursion (2)
authorMatthew Dillon <dillon@apollo.backplane.com>
Wed, 20 Jul 2016 23:50:01 +0000 (16:50 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Wed, 20 Jul 2016 23:50:01 +0000 (16:50 -0700)
* Second try at this fix.  Use different hysteresis levels when recursively
  processing incoming IPIs during a send, and in such cases only process
  incoming IPIs on queues which are trying to drain.

sys/kern/lwkt_ipiq.c
sys/sys/thread.h

index 59bde20..169bc11 100644 (file)
@@ -118,29 +118,30 @@ KTR_INFO(KTR_IPIQ, ipiq, sync_quick, 9, "cpumask=%08lx", unsigned long mask);
 #define logipiq2(name, arg)    \
        KTR_LOG(ipiq_ ## name, arg)
 
+static void lwkt_process_ipiq_nested(void);
 static int lwkt_process_ipiq_core(globaldata_t sgd, lwkt_ipiq_t ip, 
-                                 struct intrframe *frame);
+                                 struct intrframe *frame, int limit);
 static void lwkt_cpusync_remote1(lwkt_cpusync_t cs);
 static void lwkt_cpusync_remote2(lwkt_cpusync_t cs);
 
-#define IPIQ_SYSCTL(name) \
-static int \
-sysctl_##name(SYSCTL_HANDLER_ARGS) \
-{ \
-    int64_t val = 0; \
-    int cpu, error; \
- \
-    for (cpu = 0; cpu < ncpus; ++cpu) \
-       val += ipiq_stats_percpu[cpu].name; \
- \
-    error = sysctl_handle_quad(oidp, &val, 0, req); \
-    if (error || req->newptr == NULL) \
-       return error; \
- \
-    for (cpu = 0; cpu < ncpus; ++cpu) \
-       ipiq_stats_percpu[cpu].name = val; \
- \
-    return 0; \
+#define IPIQ_SYSCTL(name)                              \
+static int                                             \
+sysctl_##name(SYSCTL_HANDLER_ARGS)                     \
+{                                                      \
+    int64_t val = 0;                                   \
+    int cpu, error;                                    \
                                                      \
+    for (cpu = 0; cpu < ncpus; ++cpu)                  \
+       val += ipiq_stats_percpu[cpu].name;             \
                                                      \
+    error = sysctl_handle_quad(oidp, &val, 0, req);    \
+    if (error || req->newptr == NULL)                  \
+       return error;                                   \
                                                      \
+    for (cpu = 0; cpu < ncpus; ++cpu)                  \
+       ipiq_stats_percpu[cpu].name = val;              \
                                                      \
+    return 0;                                          \
 }
 
 IPIQ_SYSCTL(ipiq_count);
@@ -212,18 +213,25 @@ lwkt_send_ipiq3(globaldata_t target, ipifunc3_t func, void *arg1, int arg2)
      * Do not allow the FIFO to become full.  Interrupts must be physically
      * enabled while we liveloop to avoid deadlocking the APIC.
      *
-     * If we are nested we want to queue the IPI without processing incoming
-     * IPIs, if possible, to avoid excessive stack recursion.  As long as
-     * the IPI callback does not itself try to send more than a few IPIs to
-     * any single target, it should not be possible to excessively nest because
-     * the unested send code always leaves at least 1/2 the fifo available.
+     * When we are not nested inside a processing loop we allow the FIFO
+     * to get 1/2 full.  Once it exceeds 1/2 full we must wait for it to
+     * drain, executing any incoming IPIs while we wait.
+     *
+     * When we are nested we allow the FIFO to get almost completely full.
+     * This allows us to queue IPIs sent from IPI callbacks.  The processing
+     * code will only process incoming FIFOs that are trying to drain while
+     * we wait, and only to the only-slightly-less-full point, to avoid a
+     * deadlock.
+     *
+     * We are guaranteed
      */
-    if (gd->gd_processing_ipiq) {
-       level1 = MAXCPUFIFO - 2;
-       level2 = MAXCPUFIFO - 4;
-    } else {
+
+    if (gd->gd_processing_ipiq == 0) {
        level1 = MAXCPUFIFO / 2;
        level2 = MAXCPUFIFO / 4;
+    } else {
+       level1 = MAXCPUFIFO - 3;
+       level2 = MAXCPUFIFO - 5;
     }
 
     if (ip->ip_windex - ip->ip_rindex > level1) {
@@ -236,19 +244,15 @@ lwkt_send_ipiq3(globaldata_t target, ipifunc3_t func, void *arg1, int arg2)
        uint64_t tsc_base = rdtsc();
 #endif
        int repeating = 0;
+       int olimit;
 
        cpu_enable_intr();
        ++ipiq_stat(gd).ipiq_fifofull;
        DEBUG_PUSH_INFO("send_ipiq3");
+       olimit = atomic_swap_int(&ip->ip_drain, level2);
        while (ip->ip_windex - ip->ip_rindex > level2) {
-#if 0
-           if (atomic_swap_int(&target->gd_npoll, 1) == 0) {
-               logipiq(cpu_send, func, arg1, arg2, gd, target);
-               cpu_send_ipiq(target->gd_cpuid);
-           }
-#endif
            KKASSERT(ip->ip_windex - ip->ip_rindex != MAXCPUFIFO - 1);
-           lwkt_process_ipiq();
+           lwkt_process_ipiq_nested();
            cpu_pause();
 
            /*
@@ -278,6 +282,7 @@ lwkt_send_ipiq3(globaldata_t target, ipifunc3_t func, void *arg1, int arg2)
            }
 #endif
        }
+       atomic_swap_int(&ip->ip_drain, olimit);
        DEBUG_POP_INFO();
 #if defined(__x86_64__)
        write_rflags(rflags);
@@ -315,8 +320,8 @@ lwkt_send_ipiq3(globaldata_t target, ipifunc3_t func, void *arg1, int arg2)
 
 /*
  * Similar to lwkt_send_ipiq() but this function does not actually initiate
- * the IPI to the target cpu unless the FIFO has become too full, so it is
- * very fast.
+ * the IPI to the target cpu unless the FIFO is greater than 1/4 full.
+ * This function is usually very fast.
  *
  * This function is used for non-critical IPI messages, such as memory
  * deallocations.  The queue will typically be flushed by the target cpu at
@@ -330,96 +335,32 @@ lwkt_send_ipiq3_passive(globaldata_t target, ipifunc3_t func,
 {
     lwkt_ipiq_t ip;
     int windex;
-    int level1;
-    int level2;
     struct globaldata *gd = mycpu;
 
     KKASSERT(target != gd);
-    crit_enter();
+    crit_enter_gd(gd);
     ++gd->gd_intr_nesting_level;
-    logipiq(send_pasv, func, arg1, arg2, gd, target);
-#ifdef INVARIANTS
-    if (gd->gd_intr_nesting_level > 20)
-       panic("lwkt_send_ipiq: TOO HEAVILY NESTED!");
-#endif
-    KKASSERT(curthread->td_critcount);
-    ++ipiq_stat(gd).ipiq_count;
-    ++ipiq_stat(gd).ipiq_passive;
     ip = &gd->gd_ipiq[target->gd_cpuid];
 
     /*
-     * Do not allow the FIFO to become full.  Interrupts must be physically
-     * enabled while we liveloop to avoid deadlocking the APIC.
+     * If the FIFO is too full send the IPI actively.
      *
-     * If we are nested we want to queue the IPI without processing incoming
-     * IPIs, if possible, to avoid excessive stack recursion.
+     * WARNING! This level must be low enough not to trigger a wait loop
+     *         in the active sending code since we are not signalling the
+     *         target cpu.
      */
-    if (gd->gd_processing_ipiq) {
-       level1 = MAXCPUFIFO - 2;
-       level2 = MAXCPUFIFO - 4;
-    } else {
-       level1 = MAXCPUFIFO / 2;
-       level2 = MAXCPUFIFO / 4;
+    if (ip->ip_windex - ip->ip_rindex >= MAXCPUFIFO / 4) {
+       --gd->gd_intr_nesting_level;
+       crit_exit_gd(gd);
+       return lwkt_send_ipiq3(target, func, arg1, arg2);
     }
-    if (ip->ip_windex - ip->ip_rindex > level1) {
-#if defined(__x86_64__)
-       unsigned long rflags = read_rflags();
-#else
-#error "no read_*flags"
-#endif
-#ifndef _KERNEL_VIRTUAL
-       uint64_t tsc_base = rdtsc();
-#endif
-       int repeating = 0;
-
-       cpu_enable_intr();
-       ++ipiq_stat(gd).ipiq_fifofull;
-       DEBUG_PUSH_INFO("send_ipiq3_passive");
-       while (ip->ip_windex - ip->ip_rindex > level2) {
-#if 0
-           if (atomic_swap_int(&target->gd_npoll, 1) == 0) {
-               logipiq(cpu_send, func, arg1, arg2, gd, target);
-               cpu_send_ipiq(target->gd_cpuid);
-           }
-#endif
-           KKASSERT(ip->ip_windex - ip->ip_rindex != MAXCPUFIFO - 1);
-           lwkt_process_ipiq();
-           cpu_pause();
 
-           /*
-            * Check for target not draining issue.  This should be fixed but
-            * leave the code in-place anyway as it can recover an otherwise
-            * dead system.
-            */
-#ifdef _KERNEL_VIRTUAL
-           if (repeating++ > 10)
-                   pthread_yield();
-#else
-           if (rdtsc() - tsc_base > tsc_frequency) {
-               ++repeating;
-               if (repeating > 10) {
-                       smp_sniff();
-                       ATOMIC_CPUMASK_ORBIT(target->gd_ipimask, gd->gd_cpuid);
-                       cpu_send_ipiq(target->gd_cpuid);
-                       kprintf("send_ipiq %d->%d tgt not draining (%d) sniff=%p,%p\n",
-                               gd->gd_cpuid, target->gd_cpuid, repeating,
-                               target->gd_sample_pc, target->gd_sample_sp);
-               } else {
-                       smp_sniff();
-                       kprintf("send_ipiq %d->%d tgt not draining (%d)\n",
-                               gd->gd_cpuid, target->gd_cpuid, repeating);
-               }
-               tsc_base = rdtsc();
-           }
-#endif
-       }
-       DEBUG_POP_INFO();
-#if defined(__x86_64__)
-       write_rflags(rflags);
-#else
-#error "no write_*flags"
-#endif
-    }
+    /*
+     * Else we can do it passively.
+     */
+    logipiq(send_pasv, func, arg1, arg2, gd, target);
+    ++ipiq_stat(gd).ipiq_count;
+    ++ipiq_stat(gd).ipiq_passive;
 
     /*
      * Queue the new message
@@ -431,23 +372,12 @@ lwkt_send_ipiq3_passive(globaldata_t target, ipifunc3_t func,
     cpu_sfence();
     ++ip->ip_windex;
     ATOMIC_CPUMASK_ORBIT(target->gd_ipimask, gd->gd_cpuid);
+    --gd->gd_intr_nesting_level;
 
     /*
-     * We normally do not signal the target cpu, it will pick up the IPI when it
-     * next polls (typically on the next tick).  However, we do not want to allow
-     * the FIFO to get too full without signaling.  Make sure the target cpu is
-     * signalled once the FIFO is greater than 1/4 full.  This also ensures that
-     * the target cpu will be signalled in order to allow the drain wait to function
-     * without also signalling.
+     * Do not signal the target cpu, it will pick up the IPI when it next
+     * polls (typically on the next tick).
      */
-    if ((ip->ip_windex - ip->ip_rindex) > MAXCPUFIFO / 4 &&
-       atomic_swap_int(&target->gd_npoll, 1) == 0) {
-       logipiq(cpu_send, func, arg1, arg2, gd, target);
-       cpu_send_ipiq(target->gd_cpuid);
-    } else {
-       ++ipiq_stat(gd).ipiq_avoided;
-    }
-    --gd->gd_intr_nesting_level;
     crit_exit();
     logipiq(send_end, func, arg1, arg2, gd, target);
 
@@ -650,21 +580,23 @@ lwkt_process_ipiq(void)
      * We must process the entire cpumask if we are reentrant because it might
      * have been partially cleared.
      */
-    if (++gd->gd_processing_ipiq > 1)
-       ATOMIC_CPUMASK_COPY(gd->gd_ipimask, smp_active_mask);
+    ++gd->gd_processing_ipiq;
 again:
     atomic_swap_int(&gd->gd_npoll, 0);
     mask = gd->gd_ipimask;
     cpu_ccfence();
-    ATOMIC_CPUMASK_NANDMASK(gd->gd_ipimask, mask);
     while (CPUMASK_TESTNZERO(mask)) {
        n = BSFCPUMASK(mask);
        if (n != gd->gd_cpuid) {
            sgd = globaldata_find(n);
            ip = sgd->gd_ipiq;
            if (ip != NULL) {
-               while (lwkt_process_ipiq_core(sgd, &ip[gd->gd_cpuid], NULL))
+               ip += gd->gd_cpuid;
+               while (lwkt_process_ipiq_core(sgd, ip, NULL, 0))
                    ;
+               ATOMIC_CPUMASK_NANDBIT(gd->gd_ipimask, n);
+               if (ip->ip_rindex != ip->ip_windex)
+                       ATOMIC_CPUMASK_ORBIT(gd->gd_ipimask, n);
            }
        }
        CPUMASK_NANDBIT(mask, n);
@@ -675,7 +607,7 @@ again:
      * active cpusync we only run the list once and do not re-flag
      * as the thread itself is processing its interlock.
      */
-    if (lwkt_process_ipiq_core(gd, &gd->gd_cpusyncq, NULL)) {
+    if (lwkt_process_ipiq_core(gd, &gd->gd_cpusyncq, NULL, 0)) {
        if (gd->gd_curthread->td_cscount == 0)
            goto again;
        /* need_ipiq(); do not reflag */
@@ -703,27 +635,29 @@ lwkt_process_ipiq_frame(struct intrframe *frame)
      * We must process the entire cpumask if we are reentrant because it might
      * have been partially cleared.
      */
-    if (++gd->gd_processing_ipiq > 1)
-       ATOMIC_CPUMASK_COPY(gd->gd_ipimask, smp_active_mask);
+    ++gd->gd_processing_ipiq;
 again:
     atomic_swap_int(&gd->gd_npoll, 0);
     mask = gd->gd_ipimask;
     cpu_ccfence();
-    ATOMIC_CPUMASK_NANDMASK(gd->gd_ipimask, mask);
     while (CPUMASK_TESTNZERO(mask)) {
        n = BSFCPUMASK(mask);
        if (n != gd->gd_cpuid) {
            sgd = globaldata_find(n);
            ip = sgd->gd_ipiq;
            if (ip != NULL) {
-               while (lwkt_process_ipiq_core(sgd, &ip[gd->gd_cpuid], frame))
+               ip += gd->gd_cpuid;
+               while (lwkt_process_ipiq_core(sgd, ip, frame, 0))
                    ;
+               ATOMIC_CPUMASK_NANDBIT(gd->gd_ipimask, n);
+               if (ip->ip_rindex != ip->ip_windex)
+                       ATOMIC_CPUMASK_ORBIT(gd->gd_ipimask, n);
            }
        }
        CPUMASK_NANDBIT(mask, n);
     }
     if (gd->gd_cpusyncq.ip_rindex != gd->gd_cpusyncq.ip_windex) {
-       if (lwkt_process_ipiq_core(gd, &gd->gd_cpusyncq, frame)) {
+       if (lwkt_process_ipiq_core(gd, &gd->gd_cpusyncq, frame, 0)) {
            if (gd->gd_curthread->td_cscount == 0)
                goto again;
            /* need_ipiq(); do not reflag */
@@ -739,17 +673,70 @@ again:
     --gd->gd_processing_ipiq;
 }
 
-#if 0
-static int iqticks[SMP_MAXCPU];
-static int iqcount[SMP_MAXCPU];
-#endif
-#if 0
-static int iqterm[SMP_MAXCPU];
-#endif
+/*
+ * Only process incoming IPIQs from draining senders and only process them
+ * to the point where the draining sender is able to continue.  This is
+ * necessary to avoid deadlocking the IPI subsystem because we are acting on
+ * incoming messages and the callback may queue additional messages.
+ *
+ * We only want to have to act on senders that are blocked to limit the
+ * number of additional messages sent.  At the same time, recipients are
+ * trying to drain our own queue.  Theoretically this create a pipeline that
+ * cannot deadlock.
+ */
+static void
+lwkt_process_ipiq_nested(void)
+{
+    globaldata_t gd = mycpu;
+    globaldata_t sgd;
+    lwkt_ipiq_t ip;
+    cpumask_t mask;
+    int n;
+    int limit;
+
+    ++gd->gd_processing_ipiq;
+again:
+    mask = gd->gd_ipimask;
+    cpu_ccfence();
+    while (CPUMASK_TESTNZERO(mask)) {
+       n = BSFCPUMASK(mask);
+       if (n != gd->gd_cpuid) {
+           sgd = globaldata_find(n);
+           ip = sgd->gd_ipiq;
 
+           /*
+            * NOTE: We do not mess with the cpumask at all, instead we allow
+            *       the top-level ipiq processor deal with it.
+            */
+           if (ip != NULL) {
+               ip += gd->gd_cpuid;
+               if ((limit = ip->ip_drain) != 0) {
+                   lwkt_process_ipiq_core(sgd, ip, NULL, limit);
+               }
+           }
+       }
+       CPUMASK_NANDBIT(mask, n);
+    }
+
+    /*
+     * Process pending cpusyncs.  If the current thread has a cpusync
+     * active cpusync we only run the list once and do not re-flag
+     * as the thread itself is processing its interlock.
+     */
+    if (lwkt_process_ipiq_core(gd, &gd->gd_cpusyncq, NULL, 0)) {
+       if (gd->gd_curthread->td_cscount == 0)
+           goto again;
+       /* need_ipiq(); do not reflag */
+    }
+    --gd->gd_processing_ipiq;
+}
+
+/*
+ *
+ */
 static int
 lwkt_process_ipiq_core(globaldata_t sgd, lwkt_ipiq_t ip, 
-                      struct intrframe *frame)
+                      struct intrframe *frame, int limit)
 {
     globaldata_t mygd = mycpu;
     int ri;
@@ -758,30 +745,6 @@ lwkt_process_ipiq_core(globaldata_t sgd, lwkt_ipiq_t ip,
     void *copy_arg1;
     int copy_arg2;
 
-#if 0
-    if (iqticks[mygd->gd_cpuid] != ticks) {
-           iqticks[mygd->gd_cpuid] = ticks;
-           iqcount[mygd->gd_cpuid] = 0;
-    }
-    if (++iqcount[mygd->gd_cpuid] > 3000000) {
-       kprintf("cpu %d ipiq maxed cscount %d spin %d\n",
-               mygd->gd_cpuid,
-               mygd->gd_curthread->td_cscount,
-               mygd->gd_spinlocks);
-       iqcount[mygd->gd_cpuid] = 0;
-#if 0
-       if (++iqterm[mygd->gd_cpuid] > 10)
-               panic("cpu %d ipiq maxed", mygd->gd_cpuid);
-#endif
-       int i;
-       for (i = 0; i < ncpus; ++i) {
-               if (globaldata_find(i)->gd_infomsg)
-                       kprintf(" %s", globaldata_find(i)->gd_infomsg);
-       }
-       kprintf("\n");
-    }
-#endif
-
     /*
      * Clear the originating core from our ipimask, we will process all
      * incoming messages.
@@ -812,7 +775,7 @@ lwkt_process_ipiq_core(globaldata_t sgd, lwkt_ipiq_t ip,
      * NOTE: Single pass only.  Returns non-zero if the queue is not empty
      *      on return.
      */
-    while (wi - (ri = ip->ip_rindex) > 0) {
+    while (wi - (ri = ip->ip_rindex) > limit) {
        ri &= MAXCPUFIFO_MASK;
        cpu_lfence();
        copy_func = ip->ip_info[ri].func;
@@ -857,9 +820,9 @@ lwkt_process_ipiq_core(globaldata_t sgd, lwkt_ipiq_t ip,
     --mygd->gd_intr_nesting_level;
 
     /*
-     * Return non-zero if there is still more in the queue.
+     * Return non-zero if there is still more in the queue.  Don't worry
+     * about fencing, we will get another interrupt if necessary.
      */
-    cpu_lfence();
     return (ip->ip_rindex != ip->ip_windex);
 }
 
@@ -1093,16 +1056,6 @@ lwkt_cpusync_remote2(lwkt_cpusync_t cs)
         * better-detect spin loops.
         */
        ip = &gd->gd_cpusyncq;
-#if 0
-       if (ip->ip_rindex == ip->ip_windex) {
-               __asm __volatile("cli");
-               if (ip->ip_rindex == ip->ip_windex) {
-                       __asm __volatile("sti; hlt");
-               } else {
-                       __asm __volatile("sti");
-               }
-       }
-#endif
 
        wi = ip->ip_windex & MAXCPUFIFO_MASK;
        ip->ip_info[wi].func = (ipifunc3_t)(ipifunc1_t)lwkt_cpusync_remote2;
index 2258753..3b47a87 100644 (file)
@@ -195,6 +195,7 @@ struct lwkt_ipiq {
     int                ip_rindex;      /* only written by target cpu */
     int                ip_xindex;      /* written by target, indicates completion */
     int                ip_windex;      /* only written by source cpu */
+    int                ip_drain;       /* drain source limit */
     struct {
        ipifunc3_t      func;
        void            *arg1;