taskqueue - Major overhaul
author Alex Hornung <ahornung@gmail.com>
Wed, 30 Sep 2009 07:14:45 +0000 (08:14 +0100)
committer Alex Hornung <ahornung@gmail.com>
Wed, 30 Sep 2009 07:15:55 +0000 (08:15 +0100)
* Move taskqueue from critical sections to fine-grained locking with
  lockmgr for the list of taskqueues (could be changed to spinlocks) and
  spinlock for the task queue in each taskqueue itself.

* Add a taskqueue_swi_mp which is for mpsafe tasks. Unlike taskqueue_swi
  the mplock is not acquired when tasks are run.

* Add FreeBSD's taskqueue_start_threads and family, allowing for
  per-taskqueue threads.

* Add FreeBSD's taskqueue_block and taskqueue_unblock.

* Out of necessity add a register_swi_mp, which registers a swi marked
  as MPSAFE.

Partially-Obtained-from: FreeBSD

sys/kern/kern_intr.c
sys/kern/subr_taskqueue.c
sys/sys/interrupt.h
sys/sys/taskqueue.h

index a779ee7..35a28ab 100644 (file)
@@ -189,6 +189,15 @@ register_swi(int intr, inthand2_t *handler, void *arg, const char *name,
     return(register_int(intr, handler, arg, name, serializer, 0));
 }
 
+/*
+ * Register a software interrupt handler flagged INTR_MPSAFE.  Like
+ * register_swi() it forwards to register_int(), but passes INTR_MPSAFE
+ * instead of 0 as the interrupt flags.
+ *
+ * Panics if intr lies outside [FIRST_SOFTINT, MAX_INTS).
+ * Returns whatever register_int() returns.
+ */
+void *
+register_swi_mp(int intr, inthand2_t *handler, void *arg, const char *name,
+               struct lwkt_serialize *serializer)
+{
+    if (intr < FIRST_SOFTINT || intr >= MAX_INTS)
+       panic("register_swi_mp: bad intr %d", intr);
+    return(register_int(intr, handler, arg, name, serializer, INTR_MPSAFE));
+}
+
 void *
 register_int(int intr, inthand2_t *handler, void *arg, const char *name,
                struct lwkt_serialize *serializer, int intr_flags)
index b9cc454..0cff2cc 100644 (file)
 #include <sys/malloc.h>
 #include <sys/kthread.h>
 #include <sys/thread2.h>
+#include <sys/spinlock.h>
+#include <sys/spinlock2.h>
+#include <sys/serialize.h>
+#include <sys/proc.h>
+#include <machine/varargs.h>
 
 MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
 
 static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
+static struct lock     taskqueue_queues_lock;
 
 struct taskqueue {
        STAILQ_ENTRY(taskqueue) tq_link;
@@ -48,15 +54,53 @@ struct taskqueue {
        const char              *tq_name;
        taskqueue_enqueue_fn    tq_enqueue;
        void                    *tq_context;
-       int                     tq_draining;
+
+       struct task             *tq_running;
+       struct spinlock         tq_lock;
+       struct thread           **tq_threads;
+       int                     tq_tcount;
+       int                     tq_flags;
 };
 
+#define        TQ_FLAGS_ACTIVE         (1 << 0)
+#define        TQ_FLAGS_BLOCKED        (1 << 1)
+#define        TQ_FLAGS_PENDING        (1 << 2)
+
+/* Initialize the per-taskqueue spinlock (tq_lock). */
+static __inline void
+TQ_LOCK_INIT(struct taskqueue *tq)
+{
+       spin_init(&tq->tq_lock);
+}
+
+/* Tear down the per-taskqueue spinlock (tq_lock). */
+static __inline void
+TQ_LOCK_UNINIT(struct taskqueue *tq)
+{
+       spin_uninit(&tq->tq_lock);
+}
+
+/* Acquire the per-taskqueue spinlock for writing. */
+static __inline void
+TQ_LOCK(struct taskqueue *tq)
+{
+       spin_lock_wr(&tq->tq_lock);
+}
+
+/* Release the per-taskqueue spinlock taken with TQ_LOCK(). */
+static __inline void
+TQ_UNLOCK(struct taskqueue *tq)
+{
+       spin_unlock_wr(&tq->tq_lock);
+}
+
+/*
+ * Sleep on ident via ssleep(), handing it the taskqueue's spinlock, with
+ * no flags and no timeout.  Every caller in this file invokes it with
+ * TQ_LOCK() held.
+ */
+static __inline void
+TQ_SLEEP(struct taskqueue *tq, void *ident, const char *wmesg)
+{
+       ssleep(ident, &tq->tq_lock, 0, wmesg, 0);
+}
+
 struct taskqueue *
 taskqueue_create(const char *name, int mflags,
                 taskqueue_enqueue_fn enqueue, void *context)
 {
        struct taskqueue *queue;
-       static int once = 1;
 
        queue = kmalloc(sizeof(struct taskqueue), M_TASKQUEUE, mflags);
        if (!queue)
@@ -65,31 +109,43 @@ taskqueue_create(const char *name, int mflags,
        queue->tq_name = name;
        queue->tq_enqueue = enqueue;
        queue->tq_context = context;
-       queue->tq_draining = 0;
+       queue->tq_flags |= TQ_FLAGS_ACTIVE;
+       TQ_LOCK_INIT(queue);
 
-       crit_enter();
-       if (once) {
-               STAILQ_INIT(&taskqueue_queues);
-               once = 0;
-       }
+       lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
        STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
-       crit_exit();
+       lockmgr(&taskqueue_queues_lock, LK_RELEASE);
 
        return queue;
 }
 
+/*
+ * Wait until tq_tcount drops to zero, i.e. until every thread counted on
+ * the taskqueue has left taskqueue_thread_loop().  Each pass wakes all
+ * sleepers on tq and then sleeps on pp; taskqueue_free() passes
+ * tq->tq_threads as pp, the address taskqueue_thread_loop() wakes on exit.
+ * taskqueue_free() calls this with the queue lock held.
+ */
+static void
+taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
+{
+       while(tq->tq_tcount > 0) {
+               wakeup(tq);
+               TQ_SLEEP(tq, pp, "taskqueue_terminate");
+       }
+}
+
 void
 taskqueue_free(struct taskqueue *queue)
 {
-       crit_enter();
-       queue->tq_draining = 1;
-       crit_exit();
+       TQ_LOCK(queue);
+       queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
+       TQ_UNLOCK(queue);
 
        taskqueue_run(queue);
 
-       crit_enter();
+       TQ_LOCK(queue);
+       taskqueue_terminate(queue->tq_threads, queue);
+       TQ_UNLOCK(queue);
+
+       lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
        STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
-       crit_exit();
+       lockmgr(&taskqueue_queues_lock, LK_RELEASE);
+
+       TQ_LOCK_UNINIT(queue);
 
        kfree(queue, M_TASKQUEUE);
 }
@@ -99,14 +155,14 @@ taskqueue_find(const char *name)
 {
        struct taskqueue *queue;
 
-       crit_enter();
+       lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
        STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
                if (!strcmp(queue->tq_name, name)) {
-                       crit_exit();
+                       lockmgr(&taskqueue_queues_lock, LK_RELEASE);
                        return queue;
                }
        }
-       crit_exit();
+       lockmgr(&taskqueue_queues_lock, LK_RELEASE);
        return NULL;
 }
 
@@ -122,13 +178,13 @@ taskqueue_enqueue(struct taskqueue *queue, struct task *task)
        struct task *ins;
        struct task *prev;
 
-       crit_enter();
+       TQ_LOCK(queue);
 
        /*
         * Don't allow new tasks on a queue which is being freed.
         */
-       if (queue->tq_draining) {
-               crit_exit();
+       if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) {
+               TQ_UNLOCK(queue);
                return EPIPE;
        }
 
@@ -137,7 +193,7 @@ taskqueue_enqueue(struct taskqueue *queue, struct task *task)
         */
        if (task->ta_pending) {
                task->ta_pending++;
-               crit_exit();
+               TQ_UNLOCK(queue);
                return 0;
        }
 
@@ -148,7 +204,7 @@ taskqueue_enqueue(struct taskqueue *queue, struct task *task)
        if (!prev || prev->ta_priority >= task->ta_priority) {
                STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
        } else {
-               prev = 0;
+               prev = NULL;
                for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
                     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
                        if (ins->ta_priority < task->ta_priority)
@@ -161,21 +217,46 @@ taskqueue_enqueue(struct taskqueue *queue, struct task *task)
        }
 
        task->ta_pending = 1;
-       if (queue->tq_enqueue)
-               queue->tq_enqueue(queue->tq_context);
+       if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) {
+               if (queue->tq_enqueue)
+                       queue->tq_enqueue(queue->tq_context);
+       } else {
+               queue->tq_flags |= TQ_FLAGS_PENDING;
+       }
 
-       crit_exit();
+       TQ_UNLOCK(queue);
 
        return 0;
 }
 
+/*
+ * Mark the queue blocked.  While TQ_FLAGS_BLOCKED is set,
+ * taskqueue_enqueue() still queues tasks but records TQ_FLAGS_PENDING
+ * instead of calling the queue's enqueue hook.
+ */
+void
+taskqueue_block(struct taskqueue *queue)
+{
+       TQ_LOCK(queue);
+       queue->tq_flags |= TQ_FLAGS_BLOCKED;
+       TQ_UNLOCK(queue);
+}
+
+/*
+ * Clear TQ_FLAGS_BLOCKED.  If an enqueue was deferred while the queue was
+ * blocked (TQ_FLAGS_PENDING set), clear that flag and call the enqueue
+ * hook once, if the queue has one.  The hook runs with the queue lock held.
+ */
+void
+taskqueue_unblock(struct taskqueue *queue)
+{
+       TQ_LOCK(queue);
+       queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
+       if (queue->tq_flags & TQ_FLAGS_PENDING) {
+               queue->tq_flags &= ~TQ_FLAGS_PENDING;
+               if (queue->tq_enqueue)
+                       queue->tq_enqueue(queue->tq_context);
+       }
+       TQ_UNLOCK(queue);
+}
+
 void
 taskqueue_run(struct taskqueue *queue)
 {
        struct task *task;
        int pending;
 
-       crit_enter();
+       TQ_LOCK(queue);
        while (STAILQ_FIRST(&queue->tq_queue)) {
                /*
                 * Carefully remove the first task from the queue and
@@ -185,13 +266,25 @@ taskqueue_run(struct taskqueue *queue)
                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
                pending = task->ta_pending;
                task->ta_pending = 0;
-               crit_exit();
+               queue->tq_running = task;
+               TQ_UNLOCK(queue);
 
                task->ta_func(task->ta_context, pending);
 
-               crit_enter();
+               TQ_LOCK(queue);
+               queue->tq_running = NULL;
+               wakeup(task);
        }
-       crit_exit();
+       TQ_UNLOCK(queue);
+}
+
+/*
+ * Block until task is neither pending on queue nor the task that queue is
+ * currently running.  Sleeps on the task's address; taskqueue_run() issues
+ * wakeup(task) after each task function returns.
+ */
+void
+taskqueue_drain(struct taskqueue *queue, struct task *task)
+{
+       TQ_LOCK(queue);
+       while (task->ta_pending != 0 || task == queue->tq_running)
+               TQ_SLEEP(queue, task, "-");
+       TQ_UNLOCK(queue);
 }
 
 static void
@@ -206,23 +299,110 @@ taskqueue_swi_run(void *arg, void *frame)
        taskqueue_run(taskqueue_swi);
 }
 
+/*
+ * Software interrupt handler for taskqueue_swi_mp: runs the tasks queued
+ * on it.  Neither arg nor frame is used.
+ */
+static void
+taskqueue_swi_mp_run(void *arg, void *frame)
+{
+       taskqueue_run(taskqueue_swi_mp);
+}
+
+/*
+ * Create and start up to count threads that service the taskqueue *tqp.
+ *
+ * tqp    address of the taskqueue pointer; the same address is handed to
+ *        every thread as its argument.
+ * count  number of threads to create; must be > 0.
+ * pri    priority applied to each created thread with lwkt_setpri().
+ * fmt    printf-style format for the base thread name; "_<index>" is
+ *        appended for each thread.
+ *
+ * Returns EINVAL if count <= 0, otherwise 0.  A thread that cannot be
+ * created is reported with kprintf() and skipped; it does not make the
+ * call fail, and only threads that were started are counted in tq_tcount.
+ */
+int
+taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
+                       const char *fmt, ...)
+{
+       __va_list ap;
+       struct thread *td;
+       struct taskqueue *tq;
+       int i, error;
+       char ktname[MAXCOMLEN];
+
+       if (count <= 0)
+               return EINVAL;
+
+       tq = *tqp;
+
+       __va_start(ap, fmt);
+       kvsnprintf(ktname, MAXCOMLEN, fmt, ap);
+       __va_end(ap);
+
+       tq->tq_threads = kmalloc(sizeof(struct thread *) * count, M_TASKQUEUE,
+           M_WAITOK | M_ZERO);
+
+       for (i = 0; i < count; i++) {
+               /* Threads are created stopped and scheduled below. */
+               error = lwkt_create(taskqueue_thread_loop, tqp,
+                   &tq->tq_threads[i], NULL, TDF_STOPREQ | TDF_MPSAFE, -1,
+                   "%s_%d", ktname, i);
+               if (error) {
+                       /* Name the call and the thread that actually failed. */
+                       kprintf("%s: lwkt_create(%s_%d): error %d\n", __func__,
+                           ktname, i, error);
+                       tq->tq_threads[i] = NULL;
+               } else {
+                       td = tq->tq_threads[i];
+                       lwkt_setpri(td, pri);
+                       lwkt_schedule(td);
+                       tq->tq_tcount++;
+               }
+       }
+
+       return 0;
+}
+
+/*
+ * Main loop of a dedicated taskqueue thread.
+ *
+ * arg is interpreted as a struct taskqueue **.  The thread runs queued
+ * tasks until TQ_FLAGS_ACTIVE is cleared on the queue, then decrements
+ * tq_tcount, wakes one sleeper on tq->tq_threads (the address
+ * taskqueue_terminate() is given by taskqueue_free()) and exits.
+ */
+void
+taskqueue_thread_loop(void *arg)
+{
+       struct taskqueue **tqp, *tq;
+
+       tqp = arg;
+       tq = *tqp;
+       TQ_LOCK(tq);
+       while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
+               /*
+                * taskqueue_run() acquires the queue spinlock itself, so
+                * the lock must not be held across the call.
+                */
+               TQ_UNLOCK(tq);
+               taskqueue_run(tq);
+               TQ_LOCK(tq);
+
+               /*
+                * Re-check under the lock so that work queued, or a
+                * shutdown requested, while unlocked is not slept through.
+                */
+               if ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0 &&
+                   STAILQ_EMPTY(&tq->tq_queue))
+                       TQ_SLEEP(tq, tq, "-");
+       }
+
+       /* rendezvous with thread that asked us to terminate */
+       tq->tq_tcount--;
+       wakeup_one(tq->tq_threads);
+       TQ_UNLOCK(tq);
+       lwkt_exit();
+}
+
+/*
+ * Enqueue hook for taskqueues serviced by dedicated threads.  context is
+ * interpreted as a struct taskqueue **; one thread sleeping on the
+ * taskqueue it points to is woken.
+ */
+void
+taskqueue_thread_enqueue(void *context)
+{
+       struct taskqueue **tqp, *tq;
+
+       tqp = context;
+       tq = *tqp;
+
+       wakeup_one(tq);
+}
+
 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
         register_swi(SWI_TQ, taskqueue_swi_run, NULL, "swi_taskq", NULL));
+/*
+ * XXX: possibly use a different SWI_TQ_MP or so.
+ * related: sys/interrupt.h
+ * related: platform/XXX/isa/ipl_funcs.c
+ *
+ * The handler is registered through register_swi_mp() so that it is
+ * flagged INTR_MPSAFE; plain register_swi() passes no interrupt flags.
+ */
+TASKQUEUE_DEFINE(swi_mp, taskqueue_swi_enqueue, 0,
+        register_swi_mp(SWI_TQ, taskqueue_swi_mp_run, NULL, "swi_mp_taskq",
+            NULL));
 
 static void
 taskqueue_kthread(void *arg)
 {
+       struct taskqueue *queue = taskqueue_thread[mycpuid];
+
        for (;;) {
-               taskqueue_run(taskqueue_thread[mycpuid]);
-               crit_enter();
-               if (STAILQ_EMPTY(&taskqueue_thread[mycpuid]->tq_queue))
-                       tsleep(taskqueue_thread[mycpuid], 0, "tqthr", 0);
-               crit_exit();
+               taskqueue_run(queue);
+               TQ_LOCK(queue);
+               if (STAILQ_EMPTY(&queue->tq_queue))
+                       TQ_SLEEP(queue, queue, "tqthr");
+               TQ_UNLOCK(queue);
        }
 }
 
 static void
-taskqueue_thread_enqueue(void *context)
+taskqueue_kthread_enqueue(void *context)
 {
        wakeup(taskqueue_thread[mycpuid]);
 }
@@ -235,13 +415,16 @@ taskqueue_init(void)
 {
        int cpu;
 
+       lockinit(&taskqueue_queues_lock, "tqqueues", 0, 0);
+       STAILQ_INIT(&taskqueue_queues);
+
        for (cpu = 0; cpu < ncpus; cpu++) {
                taskqueue_thread[cpu] = taskqueue_create("thread", M_INTWAIT,
-                   taskqueue_thread_enqueue, NULL);
+                   taskqueue_kthread_enqueue, NULL);
                lwkt_create(taskqueue_kthread, NULL,
                    &taskqueue_thread_td[cpu], NULL,
                    0, cpu, "taskqueue %d", cpu);
        }
 }
 
-SYSINIT(taskqueueinit, SI_SUB_CONFIGURE, SI_ORDER_SECOND, taskqueue_init, NULL);
+SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, taskqueue_init, NULL);
index dbe1a1c..d6f743a 100644 (file)
@@ -107,10 +107,13 @@ struct intrframe;
 struct thread;
 struct lwkt_serialize;
 void *register_swi(int intr, inthand2_t *handler, void *arg,
-                           const char *name, 
+                           const char *name,
+                           struct lwkt_serialize *serializer);
+void *register_swi_mp(int intr, inthand2_t *handler, void *arg,
+                           const char *name,
                            struct lwkt_serialize *serializer);
 void *register_int(int intr, inthand2_t *handler, void *arg,
-                           const char *name, 
+                           const char *name,
                            struct lwkt_serialize *serializer, int flags);
 long get_interrupt_counter(int intr);
 int count_registered_ints(int intr);
index aa0897a..334e472 100644 (file)
@@ -66,10 +66,21 @@ struct task {
 struct taskqueue *taskqueue_create(const char *name, int mflags,
                                    taskqueue_enqueue_fn enqueue,
                                    void *context);
+int    taskqueue_start_threads(struct taskqueue **tqp, int count, int pri,
+                               const char *name, ...) __printflike(4, 5);
 int    taskqueue_enqueue(struct taskqueue *queue, struct task *task);
+void   taskqueue_drain(struct taskqueue *queue, struct task *task);
 struct taskqueue *taskqueue_find(const char *name);
 void   taskqueue_free(struct taskqueue *queue);
 void   taskqueue_run(struct taskqueue *queue);
+void   taskqueue_block(struct taskqueue *queue);
+void   taskqueue_unblock(struct taskqueue *queue);
+
+/*
+ * Functions for dedicated thread taskqueues
+ */
+void   taskqueue_thread_loop(void *arg);
+void   taskqueue_thread_enqueue(void *context);
 
 /*
  * Initialise a task structure.
@@ -107,11 +118,16 @@ SYSINIT(taskqueue_##name, SI_SUB_CONFIGURE, SI_ORDER_SECOND,              \
                                                                        \
 struct __hack
 
+/*
+ * Define a taskqueue meant to be serviced by one dedicated thread: expands
+ * to TASKQUEUE_DEFINE with taskqueue_thread_enqueue() as the enqueue hook,
+ * &taskqueue_<name> as its context, and a taskqueue_start_threads() call
+ * (count 1, thread name "<name> taskq") as the final argument.
+ *
+ * NOTE(review): `prio' is not a parameter of this macro, so it has to
+ * resolve to something in scope wherever the macro is expanded -- confirm
+ * this is intended.
+ */
+#define        TASKQUEUE_DEFINE_THREAD(name)                                   \
+TASKQUEUE_DEFINE(name, taskqueue_thread_enqueue, &taskqueue_##name,    \
+       taskqueue_start_threads(&taskqueue_##name, 1, prio,             \
+       "%s taskq", #name))
 /*
  * This queue is serviced by a software interrupt handler.  To enqueue
  * a task, call taskqueue_enqueue(taskqueue_swi, &task).
  */
 TASKQUEUE_DECLARE(swi);
+TASKQUEUE_DECLARE(swi_mp);
 
 /*
  * This queue is serviced by a per-cpu kernel thread.  To enqueue a task, call