kernel - Rewrite the callout_*() API
[dragonfly.git] / sys / kern / subr_taskqueue.c
1 /*-
2  * Copyright (c) 2000 Doug Rabson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.69 2012/08/28 13:35:37 jhb Exp $"
27  */
28
29 #include <sys/param.h>
30 #include <sys/queue.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/taskqueue.h>
34 #include <sys/interrupt.h>
35 #include <sys/lock.h>
36 #include <sys/malloc.h>
37 #include <sys/kthread.h>
38 #include <sys/spinlock.h>
39 #include <sys/spinlock2.h>
40 #include <sys/serialize.h>
41 #include <sys/proc.h>
42
/* Malloc type used for taskqueue structures and their thread arrays. */
MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");

/* List of every taskqueue created by taskqueue_create(). */
static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
/* lockmgr lock held around every access to taskqueue_queues. */
static struct lock      taskqueue_queues_lock;
47
/*
 * A task queue: a list of pending tasks plus the hook used to kick
 * whatever context services the queue.
 */
struct taskqueue {
        /* Linkage on the global taskqueue_queues list */
        STAILQ_ENTRY(taskqueue) tq_link;
        /* Pending tasks; kept ordered by ta_priority on insertion */
        STAILQ_HEAD(, task)     tq_queue;
        /* Name compared by taskqueue_find() */
        const char              *tq_name;
        /* NOTE: tq must be locked before calling tq_enqueue */
        taskqueue_enqueue_fn    tq_enqueue;
        /* Argument handed to tq_enqueue */
        void                    *tq_context;

        /* Task whose function is currently being called, or NULL */
        struct task             *tq_running;
        /* Spinlock taken by TQ_LOCK()/TQ_UNLOCK() */
        struct spinlock         tq_lock;
        /* Array allocated by taskqueue_start_threads() */
        struct thread           **tq_threads;
        /* Number of service threads started and not yet exited */
        int                     tq_tcount;
        /* TQ_FLAGS_* bits */
        int                     tq_flags;
        /* Number of timeout tasks whose DT_CALLOUT_ARMED flag is set */
        int                     tq_callouts;
};
63
/*
 * tq_flags bits:
 *   ACTIVE  - set at creation, cleared by taskqueue_free(); enqueues on a
 *             queue without it fail with EPIPE.
 *   BLOCKED - set by taskqueue_block(); tasks are still queued but the
 *             tq_enqueue hook is not called.
 *   PENDING - a task was queued while BLOCKED; taskqueue_unblock() calls
 *             the tq_enqueue hook when it finds this set.
 */
#define TQ_FLAGS_ACTIVE         (1 << 0)
#define TQ_FLAGS_BLOCKED        (1 << 1)
#define TQ_FLAGS_PENDING        (1 << 2)

/* timeout_task 'f' flag: set when the task's callout has been scheduled */
#define DT_CALLOUT_ARMED        (1 << 0)
69
70 void
71 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
72     int priority, task_fn_t func, void *context)
73 {
74
75         TASK_INIT(&timeout_task->t, priority, func, context);
76         callout_init(&timeout_task->c); /* XXX use callout_init_mp() */
77         timeout_task->q = queue;
78         timeout_task->f = 0;
79 }
80
/*
 * Forward declaration; lock_held is non-zero when the caller already
 * holds the queue lock.
 */
static void taskqueue_run(struct taskqueue *queue, int lock_held);
82
/* Initialize the queue's spinlock. */
static __inline void
TQ_LOCK_INIT(struct taskqueue *tq)
{
        spin_init(&tq->tq_lock, "tqlock");
}
88
/* Tear down the queue's spinlock. */
static __inline void
TQ_LOCK_UNINIT(struct taskqueue *tq)
{
        spin_uninit(&tq->tq_lock);
}
94
/* Acquire the queue's spinlock. */
static __inline void
TQ_LOCK(struct taskqueue *tq)
{
        spin_lock(&tq->tq_lock);
}
100
/* Release the queue's spinlock. */
static __inline void
TQ_UNLOCK(struct taskqueue *tq)
{
        spin_unlock(&tq->tq_lock);
}
106
/*
 * Sleep on 'ident' via ssleep(), handing it the queue spinlock, with no
 * flags and no timeout.  Callers in this file invoke it with the queue
 * lock held and treat the lock as held again once it returns.
 */
static __inline void
TQ_SLEEP(struct taskqueue *tq, void *ident, const char *wmesg)
{
        ssleep(ident, &tq->tq_lock, 0, wmesg, 0);
}
112
113 struct taskqueue *
114 taskqueue_create(const char *name, int mflags,
115                  taskqueue_enqueue_fn enqueue, void *context)
116 {
117         struct taskqueue *queue;
118
119         queue = kmalloc(sizeof(*queue), M_TASKQUEUE, mflags | M_ZERO);
120         if (!queue)
121                 return NULL;
122         STAILQ_INIT(&queue->tq_queue);
123         queue->tq_name = name;
124         queue->tq_enqueue = enqueue;
125         queue->tq_context = context;
126         queue->tq_flags |= TQ_FLAGS_ACTIVE;
127         TQ_LOCK_INIT(queue);
128
129         lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
130         STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
131         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
132
133         return queue;
134 }
135
136 /* NOTE: tq must be locked */
137 static void
138 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
139 {
140         while(tq->tq_tcount > 0) {
141                 /* Unlock spinlock before wakeup() */
142                 TQ_UNLOCK(tq);
143                 wakeup(tq);
144                 TQ_LOCK(tq);
145                 TQ_SLEEP(tq, pp, "taskqueue_terminate");
146         }
147 }
148
149 void
150 taskqueue_free(struct taskqueue *queue)
151 {
152         TQ_LOCK(queue);
153         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
154         taskqueue_run(queue, 1);
155         taskqueue_terminate(queue->tq_threads, queue);
156         TQ_UNLOCK(queue);
157
158         lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
159         STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
160         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
161
162         TQ_LOCK_UNINIT(queue);
163
164         kfree(queue, M_TASKQUEUE);
165 }
166
167 struct taskqueue *
168 taskqueue_find(const char *name)
169 {
170         struct taskqueue *queue;
171
172         lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
173         STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
174                 if (!strcmp(queue->tq_name, name)) {
175                         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
176                         return queue;
177                 }
178         }
179         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
180         return NULL;
181 }
182
183 /*
184  * NOTE!  If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
185  * be sure NOT TO SHARE the ``task'' between CPUs.  TASKS ARE NOT LOCKED.
186  * So either use a throwaway task which will only be enqueued once, or
187  * use one task per CPU!
188  */
189 static int
190 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
191 {
192         struct task *ins;
193         struct task *prev;
194
195         /*
196          * Don't allow new tasks on a queue which is being freed.
197          */
198         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0)
199                 return EPIPE;
200
201         /*
202          * Count multiple enqueues.
203          */
204         if (task->ta_pending) {
205                 task->ta_pending++;
206                 return 0;
207         }
208
209         /*
210          * Optimise the case when all tasks have the same priority.
211          */
212         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
213         if (!prev || prev->ta_priority >= task->ta_priority) {
214                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
215         } else {
216                 prev = NULL;
217                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
218                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
219                         if (ins->ta_priority < task->ta_priority)
220                                 break;
221
222                 if (prev)
223                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
224                 else
225                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
226         }
227
228         task->ta_pending = 1;
229         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) {
230                 if (queue->tq_enqueue)
231                         queue->tq_enqueue(queue->tq_context);
232         } else {
233                 queue->tq_flags |= TQ_FLAGS_PENDING;
234         }
235
236         return 0;
237 }
238
/*
 * Queue 'task' on 'queue', taking the queue lock around the operation.
 * Returns the result of taskqueue_enqueue_locked().
 */
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
        int error;

        TQ_LOCK(queue);
        error = taskqueue_enqueue_locked(queue, task);
        TQ_UNLOCK(queue);
        return (error);
}
250
251 static void
252 taskqueue_timeout_func(void *arg)
253 {
254         struct taskqueue *queue;
255         struct timeout_task *timeout_task;
256
257         timeout_task = arg;
258         queue = timeout_task->q;
259
260         TQ_LOCK(queue);
261         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
262         timeout_task->f &= ~DT_CALLOUT_ARMED;
263         queue->tq_callouts--;
264         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
265         TQ_UNLOCK(queue);
266 }
267
268 int
269 taskqueue_enqueue_timeout(struct taskqueue *queue,
270     struct timeout_task *timeout_task, int ticks)
271 {
272         int res;
273
274         TQ_LOCK(queue);
275         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
276                 ("Migrated queue"));
277         timeout_task->q = queue;
278         res = timeout_task->t.ta_pending;
279         if (ticks == 0) {
280                 taskqueue_enqueue_locked(queue, &timeout_task->t);
281                 TQ_UNLOCK(queue);
282         } else {
283                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
284                         res++;
285                 } else {
286                         queue->tq_callouts++;
287                         timeout_task->f |= DT_CALLOUT_ARMED;
288                 }
289                 TQ_UNLOCK(queue);
290                 callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func,
291                               timeout_task);
292         }
293         return (res);
294 }
295
/*
 * Mark the queue blocked.  While blocked, tasks can still be queued but
 * the tq_enqueue hook is not called; TQ_FLAGS_PENDING is set instead.
 */
void
taskqueue_block(struct taskqueue *queue)
{
        TQ_LOCK(queue);
        queue->tq_flags |= TQ_FLAGS_BLOCKED;
        TQ_UNLOCK(queue);
}
303
304 void
305 taskqueue_unblock(struct taskqueue *queue)
306 {
307         TQ_LOCK(queue);
308         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
309         if (queue->tq_flags & TQ_FLAGS_PENDING) {
310                 queue->tq_flags &= ~TQ_FLAGS_PENDING;
311                 if (queue->tq_enqueue)
312                         queue->tq_enqueue(queue->tq_context);
313         }
314         TQ_UNLOCK(queue);
315 }
316
317 static void
318 taskqueue_run(struct taskqueue *queue, int lock_held)
319 {
320         struct task *task;
321         int pending;
322
323         if (lock_held == 0)
324                 TQ_LOCK(queue);
325         while (STAILQ_FIRST(&queue->tq_queue)) {
326                 /*
327                  * Carefully remove the first task from the queue and
328                  * zero its pending count.
329                  */
330                 task = STAILQ_FIRST(&queue->tq_queue);
331                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
332                 pending = task->ta_pending;
333                 task->ta_pending = 0;
334                 queue->tq_running = task;
335
336                 TQ_UNLOCK(queue);
337                 task->ta_func(task->ta_context, pending);
338                 queue->tq_running = NULL;
339                 wakeup(task);
340                 TQ_LOCK(queue);
341         }
342         if (lock_held == 0)
343                 TQ_UNLOCK(queue);
344 }
345
346 static int
347 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
348     u_int *pendp)
349 {
350
351         if (task->ta_pending > 0)
352                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
353         if (pendp != NULL)
354                 *pendp = task->ta_pending;
355         task->ta_pending = 0;
356         return (task == queue->tq_running ? EBUSY : 0);
357 }
358
359 int
360 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
361 {
362         int error;
363
364         TQ_LOCK(queue);
365         error = taskqueue_cancel_locked(queue, task, pendp);
366         TQ_UNLOCK(queue);
367
368         return (error);
369 }
370
371 int
372 taskqueue_cancel_timeout(struct taskqueue *queue,
373                          struct timeout_task *timeout_task, u_int *pendp)
374 {
375         u_int pending, pending1;
376         int error;
377
378         pending = !!callout_stop(&timeout_task->c);
379         TQ_LOCK(queue);
380         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
381         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
382                 timeout_task->f &= ~DT_CALLOUT_ARMED;
383                 queue->tq_callouts--;
384         }
385         TQ_UNLOCK(queue);
386
387         if (pendp != NULL)
388                 *pendp = pending + pending1;
389         return (error);
390 }
391
392 void
393 taskqueue_drain(struct taskqueue *queue, struct task *task)
394 {
395         TQ_LOCK(queue);
396         while (task->ta_pending != 0 || task == queue->tq_running)
397                 TQ_SLEEP(queue, task, "-");
398         TQ_UNLOCK(queue);
399 }
400
/*
 * Drain a timeout task: cancel its callout with callout_cancel(), then
 * wait via taskqueue_drain() for the embedded task to finish.
 */
void
taskqueue_drain_timeout(struct taskqueue *queue,
    struct timeout_task *timeout_task)
{
        callout_cancel(&timeout_task->c);
        taskqueue_drain(queue, &timeout_task->t);
}
408
/*
 * Enqueue hook shared by the swi and swi_mp taskqueues: calls
 * setsofttq().  The context argument is unused.
 */
static void
taskqueue_swi_enqueue(void *context)
{
        setsofttq();
}
414
/*
 * Handler registered with register_swi() for SWI_TQ: runs the tasks
 * queued on taskqueue_swi.  Both arguments are unused.
 */
static void
taskqueue_swi_run(void *arg, void *frame)
{
        taskqueue_run(taskqueue_swi, 0);
}
420
/*
 * Handler registered with register_swi_mp() for SWI_TQ: runs the tasks
 * queued on taskqueue_swi_mp.  Both arguments are unused.
 */
static void
taskqueue_swi_mp_run(void *arg, void *frame)
{
        taskqueue_run(taskqueue_swi_mp, 0);
}
426
427 int
428 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, int ncpu,
429                         const char *fmt, ...)
430 {
431         __va_list ap;
432         struct thread *td;
433         struct taskqueue *tq;
434         int i, error, cpu;
435         char ktname[MAXCOMLEN];
436
437         if (count <= 0)
438                 return EINVAL;
439         /* catch call argument mistakes */
440         KKASSERT(pri > 0 && pri < TDPRI_MAX);
441
442         tq = *tqp;
443         cpu = ncpu;
444
445         __va_start(ap, fmt);
446         kvsnprintf(ktname, MAXCOMLEN, fmt, ap);
447         __va_end(ap);
448
449         tq->tq_threads = kmalloc(sizeof(struct thread *) * count, M_TASKQUEUE,
450             M_WAITOK | M_ZERO);
451
452         for (i = 0; i < count; i++) {
453                 /*
454                  * If no specific cpu was specified and more than one thread
455                  * is to be created, we distribute the threads amongst all
456                  * cpus.
457                  */
458                 if ((ncpu <= -1) && (count > 1))
459                         cpu = i%ncpus;
460
461                 if (count == 1) {
462                         error = lwkt_create(taskqueue_thread_loop, tqp,
463                                             &tq->tq_threads[i], NULL,
464                                             TDF_NOSTART, cpu,
465                                             "%s", ktname);
466                 } else {
467                         error = lwkt_create(taskqueue_thread_loop, tqp,
468                                             &tq->tq_threads[i], NULL,
469                                             TDF_NOSTART, cpu,
470                                             "%s_%d", ktname, i);
471                 }
472                 if (error) {
473                         kprintf("%s: lwkt_create(%s): error %d", __func__,
474                             ktname, error);
475                         tq->tq_threads[i] = NULL;
476                 } else {
477                         td = tq->tq_threads[i];
478                         lwkt_setpri_initial(td, pri);
479                         lwkt_schedule(td);
480                         tq->tq_tcount++;
481                 }
482         }
483
484         return 0;
485 }
486
487 void
488 taskqueue_thread_loop(void *arg)
489 {
490         struct taskqueue **tqp, *tq;
491
492         tqp = arg;
493         tq = *tqp;
494         TQ_LOCK(tq);
495         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
496                 taskqueue_run(tq, 1);
497                 TQ_SLEEP(tq, tq, "tqthr");
498         }
499
500         /* rendezvous with thread that asked us to terminate */
501         tq->tq_tcount--;
502         TQ_UNLOCK(tq);
503         wakeup_one(tq->tq_threads);
504         lwkt_exit();
505 }
506
/*
 * Enqueue hook for thread-serviced taskqueues: wake one thread sleeping
 * on the queue.  context is the address of the queue pointer.
 *
 * NOTE: tq must be locked.  The lock is dropped around the wakeup and
 * re-acquired before returning.
 */
void
taskqueue_thread_enqueue(void *context)
{
        struct taskqueue **tqp, *tq;

        tqp = context;
        tq = *tqp;

        /* Unlock spinlock before wakeup_one() */
        TQ_UNLOCK(tq);
        wakeup_one(tq);
        TQ_LOCK(tq);
}
521
/*
 * The "swi" taskqueue: kicked through taskqueue_swi_enqueue() and run by
 * taskqueue_swi_run(), which is registered as the SWI_TQ handler.
 */
TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
         register_swi(SWI_TQ, taskqueue_swi_run, NULL, "swi_taskq", NULL, -1));
/*
 * The "swi_mp" taskqueue; same enqueue hook, handler registered with
 * register_swi_mp().
 *
 * XXX: possibly use a different SWI_TQ_MP or so.
 * related: sys/interrupt.h
 * related: platform/XXX/isa/ipl_funcs.c
 */
TASKQUEUE_DEFINE(swi_mp, taskqueue_swi_enqueue, 0,
    register_swi_mp(SWI_TQ, taskqueue_swi_mp_run, NULL, "swi_mp_taskq", NULL,
                    -1));

/* Per-cpu thread-serviced taskqueues, created by taskqueue_init(). */
struct taskqueue *taskqueue_thread[MAXCPU];
534
535 static void
536 taskqueue_init(void)
537 {
538         int cpu;
539
540         lockinit(&taskqueue_queues_lock, "tqqueues", 0, 0);
541         STAILQ_INIT(&taskqueue_queues);
542
543         for (cpu = 0; cpu < ncpus; cpu++) {
544                 taskqueue_thread[cpu] = taskqueue_create("thread", M_INTWAIT,
545                     taskqueue_thread_enqueue, &taskqueue_thread[cpu]);
546                 taskqueue_start_threads(&taskqueue_thread[cpu], 1,
547                     TDPRI_KERN_DAEMON, cpu, "taskq_cpu %d", cpu);
548         }
549 }
550
/* Run taskqueue_init() at the SI_SUB_PRE_DRIVERS stage of startup. */
SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, taskqueue_init, NULL);