kernel - Add callout debugging
sys/kern/subr_taskqueue.c
/*-
 * Copyright (c) 2000 Doug Rabson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *
 * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
 * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
 * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
 * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
 * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
 * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
 * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
 * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 *
 * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.69 2012/08/28 13:35:37 jhb Exp $
 */

#include <sys/param.h>
#include <sys/queue.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/taskqueue.h>
#include <sys/interrupt.h>
#include <sys/lock.h>
#include <sys/malloc.h>
#include <sys/kthread.h>
#include <sys/spinlock.h>
#include <sys/spinlock2.h>
#include <sys/serialize.h>
#include <sys/proc.h>

MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");

static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
static struct lock      taskqueue_queues_lock;

struct taskqueue {
        STAILQ_ENTRY(taskqueue) tq_link;
        STAILQ_HEAD(, task)     tq_queue;
        const char              *tq_name;
        /* NOTE: tq must be locked before calling tq_enqueue */
        taskqueue_enqueue_fn    tq_enqueue;
        void                    *tq_context;

        struct task             *tq_running;
        struct spinlock         tq_lock;
        struct thread           **tq_threads;
        int                     tq_tcount;
        int                     tq_flags;
        int                     tq_callouts;
};

#define TQ_FLAGS_ACTIVE         (1 << 0)
#define TQ_FLAGS_BLOCKED        (1 << 1)
#define TQ_FLAGS_PENDING        (1 << 2)

#define DT_CALLOUT_ARMED        (1 << 0)

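/*
 * Initialize a timeout task: set up the underlying task and the callout
 * that will enqueue it when the timeout expires.
 */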
void
_timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
    int priority, task_fn_t func, void *context)
{

        TASK_INIT(&timeout_task->t, priority, func, context);
        callout_init(&timeout_task->c); /* XXX use callout_init_mp() */
        timeout_task->q = queue;
        timeout_task->f = 0;
}

static void taskqueue_run(struct taskqueue *queue, int lock_held);

static __inline void
TQ_LOCK_INIT(struct taskqueue *tq)
{
        spin_init(&tq->tq_lock, "tqlock");
}

static __inline void
TQ_LOCK_UNINIT(struct taskqueue *tq)
{
        spin_uninit(&tq->tq_lock);
}

static __inline void
TQ_LOCK(struct taskqueue *tq)
{
        spin_lock(&tq->tq_lock);
}

static __inline void
TQ_UNLOCK(struct taskqueue *tq)
{
        spin_unlock(&tq->tq_lock);
}

static __inline void
TQ_SLEEP(struct taskqueue *tq, void *ident, const char *wmesg)
{
        ssleep(ident, &tq->tq_lock, 0, wmesg, 0);
}

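/*
 * Allocate and initialize a taskqueue and link it onto the global list.
 * Returns NULL if the allocation fails.
 */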
struct taskqueue *
taskqueue_create(const char *name, int mflags,
                 taskqueue_enqueue_fn enqueue, void *context)
{
        struct taskqueue *queue;

        queue = kmalloc(sizeof(*queue), M_TASKQUEUE, mflags | M_ZERO);
        if (!queue)
                return NULL;
        STAILQ_INIT(&queue->tq_queue);
        queue->tq_name = name;
        queue->tq_enqueue = enqueue;
        queue->tq_context = context;
        queue->tq_flags |= TQ_FLAGS_ACTIVE;
        TQ_LOCK_INIT(queue);

        lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
        STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
        lockmgr(&taskqueue_queues_lock, LK_RELEASE);

        return queue;
}

/* NOTE: tq must be locked */
static void
taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
{
        while (tq->tq_tcount > 0) {
                /* Unlock spinlock before wakeup() */
                TQ_UNLOCK(tq);
                wakeup(tq);
                TQ_LOCK(tq);
                TQ_SLEEP(tq, pp, "taskqueue_terminate");
        }
}

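/*
 * Run any remaining tasks, wait for the queue's threads to exit, then
 * unlink the queue from the global list and free it.
 */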
void
taskqueue_free(struct taskqueue *queue)
{
        TQ_LOCK(queue);
        queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
        taskqueue_run(queue, 1);
        taskqueue_terminate(queue->tq_threads, queue);
        TQ_UNLOCK(queue);

        lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
        STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
        lockmgr(&taskqueue_queues_lock, LK_RELEASE);

        TQ_LOCK_UNINIT(queue);

        kfree(queue, M_TASKQUEUE);
}

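/*
 * Look up a taskqueue by name.  Returns NULL if no queue with that name
 * exists.
 */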
struct taskqueue *
taskqueue_find(const char *name)
{
        struct taskqueue *queue;

        lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
        STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
                if (!strcmp(queue->tq_name, name)) {
                        lockmgr(&taskqueue_queues_lock, LK_RELEASE);
                        return queue;
                }
        }
        lockmgr(&taskqueue_queues_lock, LK_RELEASE);
        return NULL;
}

/*
 * NOTE!  If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
 * be sure NOT TO SHARE the ``task'' between CPUs.  TASKS ARE NOT LOCKED.
 * So either use a throwaway task which will only be enqueued once, or
 * use one task per CPU!
 */
static int
taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
{
        struct task *ins;
        struct task *prev;

        /*
         * Don't allow new tasks on a queue which is being freed.
         */
        if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
                return EPIPE;

        /*
         * Count multiple enqueues.
         */
        if (task->ta_pending) {
                task->ta_pending++;
                return 0;
        }

        /*
         * Optimise the case when all tasks have the same priority.
         */
        prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
        if (!prev || prev->ta_priority >= task->ta_priority) {
                STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
        } else {
                prev = NULL;
                for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
                     prev = ins, ins = STAILQ_NEXT(ins, ta_link))
                        if (ins->ta_priority < task->ta_priority)
                                break;

                if (prev)
                        STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
                else
                        STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
        }

        task->ta_pending = 1;
        if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) {
                if (queue->tq_enqueue)
                        queue->tq_enqueue(queue->tq_context);
        } else {
                queue->tq_flags |= TQ_FLAGS_PENDING;
        }

        return 0;
}

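/*
 * Enqueue a task: lock the queue and insert the task in priority order.
 */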
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
        int res;

        TQ_LOCK(queue);
        res = taskqueue_enqueue_locked(queue, task);
        TQ_UNLOCK(queue);

        return (res);
}

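/*
 * Callout handler for timeout tasks: disarm the callout and enqueue the
 * underlying task on its queue.
 */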
static void
taskqueue_timeout_func(void *arg)
{
        struct taskqueue *queue;
        struct timeout_task *timeout_task;

        timeout_task = arg;
        queue = timeout_task->q;

        TQ_LOCK(queue);
        KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
        timeout_task->f &= ~DT_CALLOUT_ARMED;
        queue->tq_callouts--;
        taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
        TQ_UNLOCK(queue);
}

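/*
 * Enqueue a timeout task after the given number of ticks, or immediately
 * if ticks is 0.  Returns the prior pending count (counting an
 * already-armed callout).
 */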
int
taskqueue_enqueue_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task, int ticks)
{
        int res;

        TQ_LOCK(queue);
        KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
                ("Migrated queue"));
        timeout_task->q = queue;
        res = timeout_task->t.ta_pending;
        if (ticks == 0) {
                taskqueue_enqueue_locked(queue, &timeout_task->t);
                TQ_UNLOCK(queue);
        } else {
                if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
                        res++;
                } else {
                        queue->tq_callouts++;
                        timeout_task->f |= DT_CALLOUT_ARMED;
                }
                TQ_UNLOCK(queue);
                callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func,
                              timeout_task);
        }
        return (res);
}

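/*
 * Block/unblock task execution.  While a queue is blocked, enqueues are
 * still accepted but the tq_enqueue hook is deferred until the queue is
 * unblocked again.
 */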
void
taskqueue_block(struct taskqueue *queue)
{
        TQ_LOCK(queue);
        queue->tq_flags |= TQ_FLAGS_BLOCKED;
        TQ_UNLOCK(queue);
}

void
taskqueue_unblock(struct taskqueue *queue)
{
        TQ_LOCK(queue);
        queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
        if (queue->tq_flags & TQ_FLAGS_PENDING) {
                queue->tq_flags &= ~TQ_FLAGS_PENDING;
                if (queue->tq_enqueue)
                        queue->tq_enqueue(queue->tq_context);
        }
        TQ_UNLOCK(queue);
}

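/*
 * Execute all pending tasks.  The queue spinlock is dropped around each
 * task callback; waiters sleeping in taskqueue_drain() are woken up
 * after each task completes.
 */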
static void
taskqueue_run(struct taskqueue *queue, int lock_held)
{
        struct task *task;
        int pending;

        if (lock_held == 0)
                TQ_LOCK(queue);
        while (STAILQ_FIRST(&queue->tq_queue)) {
                /*
                 * Carefully remove the first task from the queue and
                 * zero its pending count.
                 */
                task = STAILQ_FIRST(&queue->tq_queue);
                STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
                pending = task->ta_pending;
                task->ta_pending = 0;
                queue->tq_running = task;

                TQ_UNLOCK(queue);
                task->ta_func(task->ta_context, pending);
                queue->tq_running = NULL;
                wakeup(task);
                TQ_LOCK(queue);
        }
        if (lock_held == 0)
                TQ_UNLOCK(queue);
}

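/*
 * Remove a task from the queue if it has not started running yet.
 * Returns EBUSY if the task is currently executing.
 */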
static int
taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
    u_int *pendp)
{

        if (task->ta_pending > 0)
                STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
        if (pendp != NULL)
                *pendp = task->ta_pending;
        task->ta_pending = 0;
        return (task == queue->tq_running ? EBUSY : 0);
}

int
taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
{
        int error;

        TQ_LOCK(queue);
        error = taskqueue_cancel_locked(queue, task, pendp);
        TQ_UNLOCK(queue);

        return (error);
}

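/*
 * Cancel a timeout task: stop its callout if armed and remove the task
 * from the queue if it is pending.
 */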
int
taskqueue_cancel_timeout(struct taskqueue *queue,
                         struct timeout_task *timeout_task, u_int *pendp)
{
        u_int pending, pending1;
        int error;

        pending = !!callout_stop(&timeout_task->c);
        TQ_LOCK(queue);
        error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
        if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
                timeout_task->f &= ~DT_CALLOUT_ARMED;
                queue->tq_callouts--;
        }
        TQ_UNLOCK(queue);

        if (pendp != NULL)
                *pendp = pending + pending1;
        return (error);
}

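/*
 * Wait until the task is neither pending nor running.
 */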
void
taskqueue_drain(struct taskqueue *queue, struct task *task)
{
        TQ_LOCK(queue);
        while (task->ta_pending != 0 || task == queue->tq_running)
                TQ_SLEEP(queue, task, "-");
        TQ_UNLOCK(queue);
}

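/*
 * Synchronously stop the timeout task's callout, then drain the task
 * itself.
 */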
void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{

        callout_stop_sync(&timeout_task->c);
        taskqueue_drain(queue, &timeout_task->t);
}

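/*
 * Enqueue hook and run handlers for the software-interrupt taskqueues.
 */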
static void
taskqueue_swi_enqueue(void *context)
{
        setsofttq();
}

static void
taskqueue_swi_run(void *arg, void *frame)
{
        taskqueue_run(taskqueue_swi, 0);
}

static void
taskqueue_swi_mp_run(void *arg, void *frame)
{
        taskqueue_run(taskqueue_swi_mp, 0);
}

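/*
 * Create and schedule the kernel threads which service a taskqueue.  If
 * ncpu is -1 and more than one thread is requested, the threads are
 * distributed round-robin across all cpus.
 */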
int
taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, int ncpu,
                        const char *fmt, ...)
{
        __va_list ap;
        struct thread *td;
        struct taskqueue *tq;
        int i, error, cpu;
        char ktname[MAXCOMLEN];

        if (count <= 0)
                return EINVAL;
        /* catch call argument mistakes */
        KKASSERT(pri > 0 && pri < TDPRI_MAX);

        tq = *tqp;
        cpu = ncpu;

        __va_start(ap, fmt);
        kvsnprintf(ktname, MAXCOMLEN, fmt, ap);
        __va_end(ap);

        tq->tq_threads = kmalloc(sizeof(struct thread *) * count, M_TASKQUEUE,
            M_WAITOK | M_ZERO);

        for (i = 0; i < count; i++) {
                /*
                 * If no specific cpu was specified and more than one thread
                 * is to be created, we distribute the threads amongst all
                 * cpus.
                 */
                if ((ncpu <= -1) && (count > 1))
                        cpu = i % ncpus;

                if (count == 1) {
                        error = lwkt_create(taskqueue_thread_loop, tqp,
                                            &tq->tq_threads[i], NULL,
                                            TDF_NOSTART, cpu,
                                            "%s", ktname);
                } else {
                        error = lwkt_create(taskqueue_thread_loop, tqp,
                                            &tq->tq_threads[i], NULL,
                                            TDF_NOSTART, cpu,
                                            "%s_%d", ktname, i);
                }
                if (error) {
                        kprintf("%s: lwkt_create(%s): error %d\n", __func__,
                            ktname, error);
                        tq->tq_threads[i] = NULL;
                } else {
                        td = tq->tq_threads[i];
                        lwkt_setpri_initial(td, pri);
                        lwkt_schedule(td);
                        tq->tq_tcount++;
                }
        }

        return 0;
}

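/*
 * Main loop for a taskqueue service thread: run tasks while the queue is
 * active, sleeping whenever the queue is empty, then rendezvous with
 * taskqueue_terminate() on exit.
 */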
void
taskqueue_thread_loop(void *arg)
{
        struct taskqueue **tqp, *tq;

        tqp = arg;
        tq = *tqp;
        TQ_LOCK(tq);
        while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
                taskqueue_run(tq, 1);
                TQ_SLEEP(tq, tq, "tqthr");
        }

        /* rendezvous with thread that asked us to terminate */
        tq->tq_tcount--;
        TQ_UNLOCK(tq);
        wakeup_one(tq->tq_threads);
        lwkt_exit();
}

/* NOTE: tq must be locked */
void
taskqueue_thread_enqueue(void *context)
{
        struct taskqueue **tqp, *tq;

        tqp = context;
        tq = *tqp;

        /* Unlock spinlock before wakeup_one() */
        TQ_UNLOCK(tq);
        wakeup_one(tq);
        TQ_LOCK(tq);
}

TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
         register_swi(SWI_TQ, taskqueue_swi_run, NULL, "swi_taskq", NULL, -1));
/*
 * XXX: possibly use a different SWI_TQ_MP or so.
 * related: sys/interrupt.h
 * related: platform/XXX/isa/ipl_funcs.c
 */
TASKQUEUE_DEFINE(swi_mp, taskqueue_swi_enqueue, 0,
    register_swi_mp(SWI_TQ, taskqueue_swi_mp_run, NULL, "swi_mp_taskq", NULL,
                    -1));

struct taskqueue *taskqueue_thread[MAXCPU];

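/*
 * Boot-time initialization: create one taskqueue thread per cpu.
 */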
static void
taskqueue_init(void)
{
        int cpu;

        lockinit(&taskqueue_queues_lock, "tqqueues", 0, 0);
        STAILQ_INIT(&taskqueue_queues);

        for (cpu = 0; cpu < ncpus; cpu++) {
                taskqueue_thread[cpu] = taskqueue_create("thread", M_INTWAIT,
                    taskqueue_thread_enqueue, &taskqueue_thread[cpu]);
                taskqueue_start_threads(&taskqueue_thread[cpu], 1,
                    TDPRI_KERN_DAEMON, cpu, "taskq_cpu %d", cpu);
        }
}

SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, taskqueue_init, NULL);