kernel - Fix excessive ipiq recursion
authorMatthew Dillon <dillon@apollo.backplane.com>
Wed, 20 Jul 2016 06:56:15 +0000 (23:56 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Wed, 20 Jul 2016 06:56:15 +0000 (23:56 -0700)
* Fix a situation where excessive IPIQ recursion can occur.  The problem
  was revealed by the previous commit when the passive signalling mechanism
  was changed.

* Passive IPI sends now signal at 1/4 full.

* Active IPI sends wait for the FIFO to be < 1/2 full only when the nesting
  level is 0, otherwise they allow it to become almost completely full.
  This effectively gives IPI callbacks a buffer of roughly 1/2 the FIFO in
  which they can issue IPI sends without triggering the wait-process loop
  (which is the cause of the nesting).

  IPI callbacks do not usually send more than one or two IPI sends to any
  given cpu target which should theoretically guarantee that excessive
  stacking will not occur.

Reported-by: marino
sys/kern/lwkt_ipiq.c

index 544292f..59bde20 100644 (file)
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2003,2004 The DragonFly Project.  All rights reserved.
+ * Copyright (c) 2003-2016 The DragonFly Project.  All rights reserved.
  * 
  * This code is derived from software contributed to The DragonFly Project
  * by Matthew Dillon <dillon@backplane.com>
@@ -187,6 +187,8 @@ lwkt_send_ipiq3(globaldata_t target, ipifunc3_t func, void *arg1, int arg2)
 {
     lwkt_ipiq_t ip;
     int windex;
+    int level1;
+    int level2;
     struct globaldata *gd = mycpu;
 
     logipiq(send_norm, func, arg1, arg2, gd, target);
@@ -210,11 +212,21 @@ lwkt_send_ipiq3(globaldata_t target, ipifunc3_t func, void *arg1, int arg2)
      * Do not allow the FIFO to become full.  Interrupts must be physically
      * enabled while we liveloop to avoid deadlocking the APIC.
      *
-     * The target ipiq may have gotten filled up due to passive IPIs and thus
-     * not be aware that its queue is too full, so be sure to issue an
-     * ipiq interrupt to the target cpu.
+     * If we are nested we want to queue the IPI without processing incoming
+     * IPIs, if possible, to avoid excessive stack recursion.  As long as
+     * the IPI callback does not itself try to send more than a few IPIs to
+     * any single target, it should not be possible to excessively nest because
+     * the unested send code always leaves at least 1/2 the fifo available.
      */
-    if (ip->ip_windex - ip->ip_rindex > MAXCPUFIFO / 2) {
+    if (gd->gd_processing_ipiq) {
+       level1 = MAXCPUFIFO - 2;
+       level2 = MAXCPUFIFO - 4;
+    } else {
+       level1 = MAXCPUFIFO / 2;
+       level2 = MAXCPUFIFO / 4;
+    }
+
+    if (ip->ip_windex - ip->ip_rindex > level1) {
 #if defined(__x86_64__)
        unsigned long rflags = read_rflags();
 #else
@@ -228,7 +240,7 @@ lwkt_send_ipiq3(globaldata_t target, ipifunc3_t func, void *arg1, int arg2)
        cpu_enable_intr();
        ++ipiq_stat(gd).ipiq_fifofull;
        DEBUG_PUSH_INFO("send_ipiq3");
-       while (ip->ip_windex - ip->ip_rindex > MAXCPUFIFO / 4) {
+       while (ip->ip_windex - ip->ip_rindex > level2) {
 #if 0
            if (atomic_swap_int(&target->gd_npoll, 1) == 0) {
                logipiq(cpu_send, func, arg1, arg2, gd, target);
@@ -318,6 +330,8 @@ lwkt_send_ipiq3_passive(globaldata_t target, ipifunc3_t func,
 {
     lwkt_ipiq_t ip;
     int windex;
+    int level1;
+    int level2;
     struct globaldata *gd = mycpu;
 
     KKASSERT(target != gd);
@@ -336,8 +350,18 @@ lwkt_send_ipiq3_passive(globaldata_t target, ipifunc3_t func,
     /*
      * Do not allow the FIFO to become full.  Interrupts must be physically
      * enabled while we liveloop to avoid deadlocking the APIC.
+     *
+     * If we are nested we want to queue the IPI without processing incoming
+     * IPIs, if possible, to avoid excessive stack recursion.
      */
-    if (ip->ip_windex - ip->ip_rindex > MAXCPUFIFO / 2) {
+    if (gd->gd_processing_ipiq) {
+       level1 = MAXCPUFIFO - 2;
+       level2 = MAXCPUFIFO - 4;
+    } else {
+       level1 = MAXCPUFIFO / 2;
+       level2 = MAXCPUFIFO / 4;
+    }
+    if (ip->ip_windex - ip->ip_rindex > level1) {
 #if defined(__x86_64__)
        unsigned long rflags = read_rflags();
 #else
@@ -351,7 +375,7 @@ lwkt_send_ipiq3_passive(globaldata_t target, ipifunc3_t func,
        cpu_enable_intr();
        ++ipiq_stat(gd).ipiq_fifofull;
        DEBUG_PUSH_INFO("send_ipiq3_passive");
-       while (ip->ip_windex - ip->ip_rindex > MAXCPUFIFO / 4) {
+       while (ip->ip_windex - ip->ip_rindex > level2) {
 #if 0
            if (atomic_swap_int(&target->gd_npoll, 1) == 0) {
                logipiq(cpu_send, func, arg1, arg2, gd, target);
@@ -407,12 +431,23 @@ lwkt_send_ipiq3_passive(globaldata_t target, ipifunc3_t func,
     cpu_sfence();
     ++ip->ip_windex;
     ATOMIC_CPUMASK_ORBIT(target->gd_ipimask, gd->gd_cpuid);
-    --gd->gd_intr_nesting_level;
 
     /*
-     * Do not signal the target cpu, it will pick up the IPI when it next
-     * polls (typically on the next tick).
+     * We normally do not signal the target cpu, it will pick up the IPI when it
+     * next polls (typically on the next tick).  However, we do not want to allow
+     * the FIFO to get too full without signaling.  Make sure the target cpu is
+     * signalled once the FIFO is greater than 1/4 full.  This also ensures that
+     * the target cpu will be signalled in order to allow the drain wait to function
+     * without also signalling.
      */
+    if ((ip->ip_windex - ip->ip_rindex) > MAXCPUFIFO / 4 &&
+       atomic_swap_int(&target->gd_npoll, 1) == 0) {
+       logipiq(cpu_send, func, arg1, arg2, gd, target);
+       cpu_send_ipiq(target->gd_cpuid);
+    } else {
+       ++ipiq_stat(gd).ipiq_avoided;
+    }
+    --gd->gd_intr_nesting_level;
     crit_exit();
     logipiq(send_end, func, arg1, arg2, gd, target);