Merge branch 'vendor/DIALOG'
[dragonfly.git] / sys / kern / subr_taskqueue.c
1 /*-
2  * Copyright (c) 2000 Doug Rabson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.69 2012/08/28 13:35:37 jhb Exp $"
27  */
28
29 #include <sys/param.h>
30 #include <sys/queue.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/taskqueue.h>
34 #include <sys/interrupt.h>
35 #include <sys/lock.h>
36 #include <sys/malloc.h>
37 #include <sys/kthread.h>
38 #include <sys/thread2.h>
39 #include <sys/spinlock.h>
40 #include <sys/spinlock2.h>
41 #include <sys/serialize.h>
42 #include <sys/proc.h>
43 #include <machine/varargs.h>
44
45 MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
46
47 static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
48 static struct lock      taskqueue_queues_lock;
49
50 struct taskqueue {
51         STAILQ_ENTRY(taskqueue) tq_link;
52         STAILQ_HEAD(, task)     tq_queue;
53         const char              *tq_name;
54         /* NOTE: tq must be locked before calling tq_enqueue */
55         taskqueue_enqueue_fn    tq_enqueue;
56         void                    *tq_context;
57
58         struct task             *tq_running;
59         struct spinlock         tq_lock;
60         struct thread           **tq_threads;
61         int                     tq_tcount;
62         int                     tq_flags;
63         int                     tq_callouts;
64 };
65
/* Bits stored in taskqueue.tq_flags. */
#define TQ_FLAGS_ACTIVE         0x0001  /* cleared by taskqueue_free() */
#define TQ_FLAGS_BLOCKED        0x0002  /* set between block and unblock */
#define TQ_FLAGS_PENDING        0x0004  /* enqueue hook deferred while blocked */

/* Bits stored in timeout_task.f. */
#define DT_CALLOUT_ARMED        0x0001  /* callout armed, has not fired yet */
71
72 void
73 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
74     int priority, task_fn_t func, void *context)
75 {
76
77         TASK_INIT(&timeout_task->t, priority, func, context);
78         callout_init(&timeout_task->c); /* XXX use callout_init_mp() */
79         timeout_task->q = queue;
80         timeout_task->f = 0;
81 }
82
/* Defined further down; needed early by taskqueue_free(). */
static void
taskqueue_run(struct taskqueue *queue, int lock_held);
84
85 static __inline void
86 TQ_LOCK_INIT(struct taskqueue *tq)
87 {
88         spin_init(&tq->tq_lock, "tqlock");
89 }
90
91 static __inline void
92 TQ_LOCK_UNINIT(struct taskqueue *tq)
93 {
94         spin_uninit(&tq->tq_lock);
95 }
96
97 static __inline void
98 TQ_LOCK(struct taskqueue *tq)
99 {
100         spin_lock(&tq->tq_lock);
101 }
102
103 static __inline void
104 TQ_UNLOCK(struct taskqueue *tq)
105 {
106         spin_unlock(&tq->tq_lock);
107 }
108
109 static __inline void
110 TQ_SLEEP(struct taskqueue *tq, void *ident, const char *wmesg)
111 {
112         ssleep(ident, &tq->tq_lock, 0, wmesg, 0);
113 }
114
115 struct taskqueue *
116 taskqueue_create(const char *name, int mflags,
117                  taskqueue_enqueue_fn enqueue, void *context)
118 {
119         struct taskqueue *queue;
120
121         queue = kmalloc(sizeof(*queue), M_TASKQUEUE, mflags | M_ZERO);
122         if (!queue)
123                 return NULL;
124         STAILQ_INIT(&queue->tq_queue);
125         queue->tq_name = name;
126         queue->tq_enqueue = enqueue;
127         queue->tq_context = context;
128         queue->tq_flags |= TQ_FLAGS_ACTIVE;
129         TQ_LOCK_INIT(queue);
130
131         lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
132         STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
133         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
134
135         return queue;
136 }
137
138 /* NOTE: tq must be locked */
139 static void
140 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
141 {
142         while(tq->tq_tcount > 0) {
143                 /* Unlock spinlock before wakeup() */
144                 TQ_UNLOCK(tq);
145                 wakeup(tq);
146                 TQ_LOCK(tq);
147                 TQ_SLEEP(tq, pp, "taskqueue_terminate");
148         }
149 }
150
151 void
152 taskqueue_free(struct taskqueue *queue)
153 {
154         TQ_LOCK(queue);
155         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
156         taskqueue_run(queue, 1);
157         taskqueue_terminate(queue->tq_threads, queue);
158         TQ_UNLOCK(queue);
159
160         lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
161         STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
162         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
163
164         TQ_LOCK_UNINIT(queue);
165
166         kfree(queue, M_TASKQUEUE);
167 }
168
169 struct taskqueue *
170 taskqueue_find(const char *name)
171 {
172         struct taskqueue *queue;
173
174         lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
175         STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
176                 if (!strcmp(queue->tq_name, name)) {
177                         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
178                         return queue;
179                 }
180         }
181         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
182         return NULL;
183 }
184
185 /*
186  * NOTE!  If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
187  * be sure NOT TO SHARE the ``task'' between CPUs.  TASKS ARE NOT LOCKED.
188  * So either use a throwaway task which will only be enqueued once, or
189  * use one task per CPU!
190  */
191 static int
192 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
193 {
194         struct task *ins;
195         struct task *prev;
196
197         /*
198          * Don't allow new tasks on a queue which is being freed.
199          */
200         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
201                 return EPIPE;
202
203         /*
204          * Count multiple enqueues.
205          */
206         if (task->ta_pending) {
207                 task->ta_pending++;
208                 return 0;
209         }
210
211         /*
212          * Optimise the case when all tasks have the same priority.
213          */
214         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
215         if (!prev || prev->ta_priority >= task->ta_priority) {
216                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
217         } else {
218                 prev = NULL;
219                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
220                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
221                         if (ins->ta_priority < task->ta_priority)
222                                 break;
223
224                 if (prev)
225                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
226                 else
227                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
228         }
229
230         task->ta_pending = 1;
231         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) {
232                 if (queue->tq_enqueue)
233                         queue->tq_enqueue(queue->tq_context);
234         } else {
235                 queue->tq_flags |= TQ_FLAGS_PENDING;
236         }
237
238         return 0;
239 }
240
/*
 * Locked wrapper around taskqueue_enqueue_locked(): takes the queue
 * lock, queues the task and returns that function's result.
 */
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
        int error;

        TQ_LOCK(queue);
        error = taskqueue_enqueue_locked(queue, task);
        TQ_UNLOCK(queue);

        return (error);
}
252
253 static void
254 taskqueue_timeout_func(void *arg)
255 {
256         struct taskqueue *queue;
257         struct timeout_task *timeout_task;
258
259         timeout_task = arg;
260         queue = timeout_task->q;
261
262         TQ_LOCK(queue);
263         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
264         timeout_task->f &= ~DT_CALLOUT_ARMED;
265         queue->tq_callouts--;
266         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
267         TQ_UNLOCK(queue);
268 }
269
270 int
271 taskqueue_enqueue_timeout(struct taskqueue *queue,
272     struct timeout_task *timeout_task, int ticks)
273 {
274         int res;
275
276         TQ_LOCK(queue);
277         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
278                 ("Migrated queue"));
279         timeout_task->q = queue;
280         res = timeout_task->t.ta_pending;
281         if (ticks == 0) {
282                 taskqueue_enqueue_locked(queue, &timeout_task->t);
283                 TQ_UNLOCK(queue);
284         } else {
285                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
286                         res++;
287                 } else {
288                         queue->tq_callouts++;
289                         timeout_task->f |= DT_CALLOUT_ARMED;
290                 }
291                 TQ_UNLOCK(queue);
292                 callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func,
293                               timeout_task);
294         }
295         return (res);
296 }
297
298 void
299 taskqueue_block(struct taskqueue *queue)
300 {
301         TQ_LOCK(queue);
302         queue->tq_flags |= TQ_FLAGS_BLOCKED;
303         TQ_UNLOCK(queue);
304 }
305
306 void
307 taskqueue_unblock(struct taskqueue *queue)
308 {
309         TQ_LOCK(queue);
310         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
311         if (queue->tq_flags & TQ_FLAGS_PENDING) {
312                 queue->tq_flags &= ~TQ_FLAGS_PENDING;
313                 if (queue->tq_enqueue)
314                         queue->tq_enqueue(queue->tq_context);
315         }
316         TQ_UNLOCK(queue);
317 }
318
319 static void
320 taskqueue_run(struct taskqueue *queue, int lock_held)
321 {
322         struct task *task;
323         int pending;
324
325         if (lock_held == 0)
326                 TQ_LOCK(queue);
327         while (STAILQ_FIRST(&queue->tq_queue)) {
328                 /*
329                  * Carefully remove the first task from the queue and
330                  * zero its pending count.
331                  */
332                 task = STAILQ_FIRST(&queue->tq_queue);
333                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
334                 pending = task->ta_pending;
335                 task->ta_pending = 0;
336                 queue->tq_running = task;
337
338                 TQ_UNLOCK(queue);
339                 task->ta_func(task->ta_context, pending);
340                 queue->tq_running = NULL;
341                 wakeup(task);
342                 TQ_LOCK(queue);
343         }
344         if (lock_held == 0)
345                 TQ_UNLOCK(queue);
346 }
347
348 static int
349 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
350     u_int *pendp)
351 {
352
353         if (task->ta_pending > 0)
354                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
355         if (pendp != NULL)
356                 *pendp = task->ta_pending;
357         task->ta_pending = 0;
358         return (task == queue->tq_running ? EBUSY : 0);
359 }
360
361 int
362 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
363 {
364         int error;
365
366         TQ_LOCK(queue);
367         error = taskqueue_cancel_locked(queue, task, pendp);
368         TQ_UNLOCK(queue);
369
370         return (error);
371 }
372
373 int
374 taskqueue_cancel_timeout(struct taskqueue *queue,
375                          struct timeout_task *timeout_task, u_int *pendp)
376 {
377         u_int pending, pending1;
378         int error;
379
380         pending = !!callout_stop(&timeout_task->c);
381         TQ_LOCK(queue);
382         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
383         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
384                 timeout_task->f &= ~DT_CALLOUT_ARMED;
385                 queue->tq_callouts--;
386         }
387         TQ_UNLOCK(queue);
388
389         if (pendp != NULL)
390                 *pendp = pending + pending1;
391         return (error);
392 }
393
394 void
395 taskqueue_drain(struct taskqueue *queue, struct task *task)
396 {
397         TQ_LOCK(queue);
398         while (task->ta_pending != 0 || task == queue->tq_running)
399                 TQ_SLEEP(queue, task, "-");
400         TQ_UNLOCK(queue);
401 }
402
403 void
404 taskqueue_drain_timeout(struct taskqueue *queue,
405     struct timeout_task *timeout_task)
406 {
407
408         callout_stop_sync(&timeout_task->c);
409         taskqueue_drain(queue, &timeout_task->t);
410 }
411
/*
 * Enqueue hook shared by the swi and swi_mp queues: requests the task
 * queue software interrupt via setsofttq().  'context' is not used.
 */
static void
taskqueue_swi_enqueue(void *context)
{

        setsofttq();
}
417
418 static void
419 taskqueue_swi_run(void *arg, void *frame)
420 {
421         taskqueue_run(taskqueue_swi, 0);
422 }
423
424 static void
425 taskqueue_swi_mp_run(void *arg, void *frame)
426 {
427         taskqueue_run(taskqueue_swi_mp, 0);
428 }
429
430 int
431 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, int ncpu,
432                         const char *fmt, ...)
433 {
434         __va_list ap;
435         struct thread *td;
436         struct taskqueue *tq;
437         int i, error, cpu;
438         char ktname[MAXCOMLEN];
439
440         if (count <= 0)
441                 return EINVAL;
442
443         tq = *tqp;
444         cpu = ncpu;
445
446         __va_start(ap, fmt);
447         kvsnprintf(ktname, MAXCOMLEN, fmt, ap);
448         __va_end(ap);
449
450         tq->tq_threads = kmalloc(sizeof(struct thread *) * count, M_TASKQUEUE,
451             M_WAITOK | M_ZERO);
452
453         for (i = 0; i < count; i++) {
454                 /*
455                  * If no specific cpu was specified and more than one thread
456                  * is to be created, we distribute the threads amongst all
457                  * cpus.
458                  */
459                 if ((ncpu <= -1) && (count > 1))
460                         cpu = i%ncpus;
461
462                 if (count == 1) {
463                         error = lwkt_create(taskqueue_thread_loop, tqp,
464                                             &tq->tq_threads[i], NULL,
465                                             TDF_NOSTART, cpu,
466                                             "%s", ktname);
467                 } else {
468                         error = lwkt_create(taskqueue_thread_loop, tqp,
469                                             &tq->tq_threads[i], NULL,
470                                             TDF_NOSTART, cpu,
471                                             "%s_%d", ktname, i);
472                 }
473                 if (error) {
474                         kprintf("%s: lwkt_create(%s): error %d", __func__,
475                             ktname, error);
476                         tq->tq_threads[i] = NULL;
477                 } else {
478                         td = tq->tq_threads[i];
479                         lwkt_setpri_initial(td, pri);
480                         lwkt_schedule(td);
481                         tq->tq_tcount++;
482                 }
483         }
484
485         return 0;
486 }
487
488 void
489 taskqueue_thread_loop(void *arg)
490 {
491         struct taskqueue **tqp, *tq;
492
493         tqp = arg;
494         tq = *tqp;
495         TQ_LOCK(tq);
496         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
497                 taskqueue_run(tq, 1);
498                 TQ_SLEEP(tq, tq, "tqthr");
499         }
500
501         /* rendezvous with thread that asked us to terminate */
502         tq->tq_tcount--;
503         TQ_UNLOCK(tq);
504         wakeup_one(tq->tq_threads);
505         lwkt_exit();
506 }
507
/*
 * Enqueue hook for thread-serviced queues: wakes one thread sleeping on
 * the queue.  'context' is the address of the queue pointer.
 *
 * NOTE: tq must be locked.  The lock is dropped around the wakeup and is
 * held again on return.
 */
void
taskqueue_thread_enqueue(void *context)
{
        struct taskqueue **tqp = context;
        struct taskqueue *tq = *tqp;

        /* Unlock spinlock before wakeup_one() */
        TQ_UNLOCK(tq);
        wakeup_one(tq);
        TQ_LOCK(tq);
}
522
523 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
524          register_swi(SWI_TQ, taskqueue_swi_run, NULL, "swi_taskq", NULL, -1));
525 /*
526  * XXX: possibly use a different SWI_TQ_MP or so.
527  * related: sys/interrupt.h
528  * related: platform/XXX/isa/ipl_funcs.c
529  */
530 TASKQUEUE_DEFINE(swi_mp, taskqueue_swi_enqueue, 0,
531     register_swi_mp(SWI_TQ, taskqueue_swi_mp_run, NULL, "swi_mp_taskq", NULL, 
532                     -1));
533
534 struct taskqueue *taskqueue_thread[MAXCPU];
535
536 static void
537 taskqueue_init(void)
538 {
539         int cpu;
540
541         lockinit(&taskqueue_queues_lock, "tqqueues", 0, 0);
542         STAILQ_INIT(&taskqueue_queues);
543
544         for (cpu = 0; cpu < ncpus; cpu++) {
545                 taskqueue_thread[cpu] = taskqueue_create("thread", M_INTWAIT,
546                     taskqueue_thread_enqueue, &taskqueue_thread[cpu]);
547                 taskqueue_start_threads(&taskqueue_thread[cpu], 1,
548                     TDPRI_KERN_DAEMON, cpu, "taskq_cpu %d", cpu);
549         }
550 }
551
/* Run taskqueue_init() at SI_SUB_PRE_DRIVERS. */
SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY,
    taskqueue_init, NULL);