kernel - Add callback API for mpipe
authorMatthew Dillon <dillon@apollo.backplane.com>
Sun, 30 Jan 2011 02:57:09 +0000 (18:57 -0800)
committerMatthew Dillon <dillon@apollo.backplane.com>
Sun, 30 Jan 2011 02:57:09 +0000 (18:57 -0800)
* Add a callback API for mpipe which uses a dedicated kthread,
  allowing clients to avoid deadlocks related to held locks during
  strategy calls.

* Add mpipe_alloc_callback().  Use of this function also requires
  that MPF_CALLBACK be supplied to mpipe_init().

* Add mpipe_wait().  This function may be used for clients which
  which to roll their own mpipe retry loop (or already have their
  own thread(s) to deal with it in a safe manner).

sys/kern/kern_mpipe.c
sys/sys/mpipe.h

index 26ca50e..60931e4 100644 (file)
 #include <sys/thread.h>
 #include <sys/globaldata.h>
 #include <sys/mpipe.h>
+#include <sys/kthread.h>
+
 #include <sys/thread2.h>
 
+struct mpipe_callback {
+       STAILQ_ENTRY(mpipe_callback) entry;
+       void (*func)(void *arg1, void *arg2);
+       void *arg1;
+       void *arg2;
+};
+
 static MALLOC_DEFINE(M_MPIPEARY, "MPipe Array", "Auxillary MPIPE structure");
 
+static void mpipe_thread(void *arg);
+
 /*
  * Initialize a malloc pipeline for the specified malloc type and allocation
  * size.  Create an array to cache up to nom_count buffers and preallocate
@@ -93,8 +104,17 @@ mpipe_init(malloc_pipe_t mpipe, malloc_type_t type, int bytes,
        ++mpipe->free_count;
        ++mpipe->total_count;
     }
+    STAILQ_INIT(&mpipe->queue);
 
     lwkt_token_init(&mpipe->token, "mpipe token");
+
+    /*
+     * Create a support thread for the mpipe queue
+     */
+    if (mpflags & MPF_CALLBACK) {
+           kthread_create(mpipe_thread, mpipe, &mpipe->thread,
+                          "mpipe_%s", type->ks_shortdesc);
+    }
 }
 
 /*
@@ -109,6 +129,20 @@ mpipe_done(malloc_pipe_t mpipe)
     int n;
 
     KKASSERT(mpipe->free_count == mpipe->total_count); /* no outstanding mem */
+
+    /*
+     * Clean up the kthread
+     */
+    lwkt_gettoken(&mpipe->token);
+    mpipe->mpflags |= MPF_EXITING;
+    while (mpipe->thread) {
+       wakeup(&mpipe->queue);
+       tsleep(mpipe, 0, "mpipex", 1);
+    }
+
+    /*
+     * Clean up the mpipe buffers
+     */
     for (n = mpipe->free_count - 1; n >= 0; --n) {
        buf = mpipe->array[n];
        mpipe->array[n] = NULL;
@@ -123,23 +157,54 @@ mpipe_done(malloc_pipe_t mpipe)
        kfree(mpipe->array, M_MPIPEARY);
        mpipe->array = NULL;
     }
-
+    lwkt_reltoken(&mpipe->token);
     lwkt_token_uninit(&mpipe->token);
 }
 
 /*
- * Allocate an entry, nominally non-blocking.  The allocation is guarenteed
+ * mpipe support thread for request failures when mpipe_alloc_callback()
+ * is called.
+ */
+static void
+mpipe_thread(void *arg)
+{
+    malloc_pipe_t mpipe = arg;
+    struct mpipe_callback *mcb;
+
+    lwkt_gettoken(&mpipe->token);
+    while ((mpipe->mpflags & MPF_EXITING) == 0) {
+       while (mpipe->free_count &&
+              (mcb = STAILQ_FIRST(&mpipe->queue)) != NULL) {
+               STAILQ_REMOVE(&mpipe->queue, mcb, mpipe_callback, entry);
+               mcb->func(mcb->arg1, mcb->arg2);
+               kfree(mcb, M_MPIPEARY);
+       }
+       mpipe->mpflags |= MPF_QUEUEWAIT;
+       tsleep(&mpipe->queue, 0, "wait", 0);
+    }
+    mpipe->thread = NULL;
+    wakeup(mpipe);
+    lwkt_reltoken(&mpipe->token);
+}
+
+
+/*
+ * Allocate an entry (inline suppot routine).  The allocation is guarenteed
  * to return non-NULL up to the nominal count after which it may return NULL.
  * Note that the implementation is defined to be allowed to block for short
- * periods of time.  Use mpipe_alloc_waitok() to guarentee the allocation.
+ * periods of time.
+ *
+ * Use mpipe_alloc_callback() for non-blocking operation with a callback
+ * Use mpipe_alloc_nowait() for non-blocking operation without a callback
+ * Use mpipe_alloc_waitok() for blocking operation & guarenteed non-NULL
  */
+static __inline
 void *
-mpipe_alloc_nowait(malloc_pipe_t mpipe)
+_mpipe_alloc_locked(malloc_pipe_t mpipe, int mfailed)
 {
     void *buf;
     int n;
 
-    lwkt_gettoken(&mpipe->token);
     if ((n = mpipe->free_count) != 0) {
        /*
         * Use a free entry if it exists.
@@ -148,7 +213,7 @@ mpipe_alloc_nowait(malloc_pipe_t mpipe)
        buf = mpipe->array[n];
        mpipe->array[n] = NULL; /* sanity check, not absolutely needed */
        mpipe->free_count = n;
-    } else if (mpipe->total_count >= mpipe->max_count) {
+    } else if (mpipe->total_count >= mpipe->max_count || mfailed) {
        /*
         * Return NULL if we have hit our limit
         */
@@ -164,10 +229,79 @@ mpipe_alloc_nowait(malloc_pipe_t mpipe)
                mpipe->construct(buf, mpipe->priv);
        }
     }
+    return(buf);
+}
+
+/*
+ * Nominal non-blocking mpipe allocation
+ */
+void *
+mpipe_alloc_nowait(malloc_pipe_t mpipe)
+{
+    void *buf;
+
+    lwkt_gettoken(&mpipe->token);
+    buf = _mpipe_alloc_locked(mpipe, 0);
     lwkt_reltoken(&mpipe->token);
+
     return(buf);
 }
 
+/*
+ * non-blocking mpipe allocation with callback for retry.
+ *
+ * If NULL is returned func(arg) is queued and will be called back when
+ * space is likely (but not necessarily) available.
+ *
+ * If non-NULL is returned func(arg) is ignored.
+ */
+void *
+mpipe_alloc_callback(malloc_pipe_t mpipe, void (*func)(void *arg1, void *arg2),
+                    void *arg1, void *arg2)
+{
+    struct mpipe_callback *mcb;
+    void *buf;
+
+    lwkt_gettoken(&mpipe->token);
+    buf = _mpipe_alloc_locked(mpipe, 0);
+    if (buf == NULL) {
+       mcb = kmalloc(sizeof(*mcb), M_MPIPEARY, M_INTWAIT);
+       buf = _mpipe_alloc_locked(mpipe, 0);
+       if (buf == NULL) {
+           mcb->func = func;
+           mcb->arg1 = arg1;
+           mcb->arg2 = arg2;
+           STAILQ_INSERT_TAIL(&mpipe->queue, mcb, entry);
+       } else {
+           kfree(mcb, M_MPIPEARY);
+       }
+    }
+    lwkt_reltoken(&mpipe->token);
+
+    return(buf);
+}
+
+/*
+ * This function can be called to nominally wait until resources are
+ * available and mpipe_alloc_nowait() is likely to return non-NULL.
+ *
+ * NOTE: mpipe_alloc_nowait() can still return NULL.
+ */
+void
+mpipe_wait(malloc_pipe_t mpipe)
+{
+    if (mpipe->free_count == 0) {
+       lwkt_gettoken(&mpipe->token);
+       while ((mpipe->mpflags & MPF_EXITING) == 0) {
+           if (mpipe->free_count)
+                   break;
+           mpipe->mpflags |= MPF_QUEUEWAIT;
+           tsleep(&mpipe->queue, 0, "wait", 0);
+       }
+       lwkt_reltoken(&mpipe->token);
+    }
+}
+
 /*
  * Allocate an entry, block until the allocation succeeds.  This may cause
  * us to block waiting for a prior allocation to be freed.
@@ -176,44 +310,20 @@ void *
 mpipe_alloc_waitok(malloc_pipe_t mpipe)
 {
     void *buf;
-    int n;
     int mfailed;
 
     lwkt_gettoken(&mpipe->token);
     mfailed = 0;
-    for (;;) {
-       if ((n = mpipe->free_count) != 0) {
-           /*
-            * Use a free entry if it exists.
-            */
-           --n;
-           buf = mpipe->array[n];
-           mpipe->array[n] = NULL;
-           mpipe->free_count = n;
-           break;
-       }
-       if (mpipe->total_count >= mpipe->max_count || mfailed) {
-           /*
-            * Block if we have hit our limit
-            */
-           mpipe->pending = 1;
-           tsleep(mpipe, 0, "mpipe1", 0);
-           continue;
-       }
+    while ((buf = _mpipe_alloc_locked(mpipe, mfailed)) == NULL) {
        /*
-        * Otherwise try to malloc() non-blocking.  If that fails loop to
-        * recheck, and block instead of trying to malloc() again.
+        * Block if we have hit our limit
         */
-       buf = kmalloc(mpipe->bytes, mpipe->type, M_NOWAIT | mpipe->mflags);
-       if (buf) {
-           ++mpipe->total_count;
-           if (mpipe->construct)
-               mpipe->construct(buf, mpipe->priv);
-           break;
-       }
+       mpipe->pending = 1;
+       tsleep(mpipe, 0, "mpipe1", 0);
        mfailed = 1;
     }
     lwkt_reltoken(&mpipe->token);
+
     return(buf);
 }
 
@@ -237,8 +347,13 @@ mpipe_free(malloc_pipe_t mpipe, void *buf)
        ++mpipe->free_count;
        if ((mpipe->mpflags & (MPF_CACHEDATA|MPF_NOZERO)) == 0) 
            bzero(buf, mpipe->bytes);
-       lwkt_reltoken(&mpipe->token);
-
+       if (mpipe->mpflags & MPF_QUEUEWAIT) {
+               mpipe->mpflags &= ~MPF_QUEUEWAIT;
+               lwkt_reltoken(&mpipe->token);
+               wakeup(&mpipe->queue);
+       } else {
+               lwkt_reltoken(&mpipe->token);
+       }
        /*
         * Wakeup anyone blocked in mpipe_alloc_*().
         */
index 228eda0..5424f3a 100644 (file)
@@ -43,6 +43,9 @@
 #ifndef _SYS_THREAD_H_
 #include <sys/thread.h>
 #endif
+#ifndef _SYS_QUEUE_H_
+#include <sys/queue.h>
+#endif
 
 /*
  * Pipeline memory allocations with persistent store capabilities.  This
@@ -75,6 +78,7 @@
  * MPF_INT             Use the interrupt reserve if necessary.
  */
 struct mpipe_buf;
+struct mpipe_callback;
 
 struct malloc_pipe {
     malloc_type_t type;                /* malloc bucket */
@@ -91,11 +95,16 @@ struct malloc_pipe {
     void       (*construct)(void *buf, void *priv);
     void       (*deconstruct)(void *buf, void *priv);
     void       *priv;
+    struct thread *thread;     /* support thread for mpipe */
+    STAILQ_HEAD(, mpipe_callback) queue;
 };
 
 #define MPF_CACHEDATA          0x0001  /* cache old buffers (do not zero) */ 
 #define MPF_NOZERO             0x0002  /* do not zero-out new allocations */
 #define MPF_INT                        0x0004  /* use the interrupt memory reserve */
+#define MPF_QUEUEWAIT          0x0008
+#define MPF_CALLBACK           0x0010  /* callback will be used */
+#define MPF_EXITING            0x80000000
 
 typedef struct malloc_pipe *malloc_pipe_t;
 
@@ -110,6 +119,9 @@ void mpipe_init(malloc_pipe_t mpipe, malloc_type_t type,
 void mpipe_done(malloc_pipe_t mpipe);
 void *mpipe_alloc_waitok(malloc_pipe_t mpipe);
 void *mpipe_alloc_nowait(malloc_pipe_t mpipe);
+void *mpipe_alloc_callback(malloc_pipe_t mpipe,
+               void (*func)(void *arg1, void *arg2), void *arg1, void *arg2);
+void mpipe_wait(malloc_pipe_t mpipe);
 void mpipe_free(malloc_pipe_t mpipe, void *vbuf);
 
 #endif