Merge branch 'vendor/GDB'
[dragonfly.git] / sys / kern / subr_taskqueue.c
1 /*-
2  * Copyright (c) 2000 Doug Rabson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice, this list of conditions and the following disclaimer.
10  * 2. Redistributions in binary form must reproduce the above copyright
11  *    notice, this list of conditions and the following disclaimer in the
12  *    documentation and/or other materials provided with the distribution.
13  *
14  * THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
15  * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
16  * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
17  * ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
18  * FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
19  * DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
20  * OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
21  * HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
22  * LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
23  * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
24  * SUCH DAMAGE.
25  *
26  * $FreeBSD: src/sys/kern/subr_taskqueue.c,v 1.69 2012/08/28 13:35:37 jhb Exp $"
27  */
28
29 #include <sys/param.h>
30 #include <sys/queue.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/taskqueue.h>
34 #include <sys/interrupt.h>
35 #include <sys/lock.h>
36 #include <sys/malloc.h>
37 #include <sys/kthread.h>
38 #include <sys/thread2.h>
39 #include <sys/spinlock.h>
40 #include <sys/spinlock2.h>
41 #include <sys/serialize.h>
42 #include <sys/proc.h>
43 #include <machine/varargs.h>
44
45 MALLOC_DEFINE(M_TASKQUEUE, "taskqueue", "Task Queues");
46
47 static STAILQ_HEAD(taskqueue_list, taskqueue) taskqueue_queues;
48 static struct lock      taskqueue_queues_lock;
49
50 struct taskqueue {
51         STAILQ_ENTRY(taskqueue) tq_link;
52         STAILQ_HEAD(, task)     tq_queue;
53         const char              *tq_name;
54         taskqueue_enqueue_fn    tq_enqueue;
55         void                    *tq_context;
56
57         struct task             *tq_running;
58         struct spinlock         tq_lock;
59         struct thread           **tq_threads;
60         int                     tq_tcount;
61         int                     tq_flags;
62         int                     tq_callouts;
63 };
64
/* Bits for taskqueue.tq_flags. */
#define TQ_FLAGS_ACTIVE		0x0001	/* cleared once the queue is being freed */
#define TQ_FLAGS_BLOCKED	0x0002	/* tq_enqueue notifications are held back */
#define TQ_FLAGS_PENDING	0x0004	/* a notification was held back while blocked */

/* Bits for timeout_task.f. */
#define DT_CALLOUT_ARMED	0x0001	/* callout is armed and counted in tq_callouts */
70
71 void
72 _timeout_task_init(struct taskqueue *queue, struct timeout_task *timeout_task,
73     int priority, task_fn_t func, void *context)
74 {
75
76         TASK_INIT(&timeout_task->t, priority, func, context);
77         callout_init(&timeout_task->c);
78         timeout_task->q = queue;
79         timeout_task->f = 0;
80 }
81
82 static void taskqueue_run(struct taskqueue *queue, int lock_held);
83
84 static __inline void
85 TQ_LOCK_INIT(struct taskqueue *tq)
86 {
87         spin_init(&tq->tq_lock);
88 }
89
90 static __inline void
91 TQ_LOCK_UNINIT(struct taskqueue *tq)
92 {
93         spin_uninit(&tq->tq_lock);
94 }
95
96 static __inline void
97 TQ_LOCK(struct taskqueue *tq)
98 {
99         spin_lock(&tq->tq_lock);
100 }
101
102 static __inline void
103 TQ_UNLOCK(struct taskqueue *tq)
104 {
105         spin_unlock(&tq->tq_lock);
106 }
107
108 static __inline void
109 TQ_SLEEP(struct taskqueue *tq, void *ident, const char *wmesg)
110 {
111         ssleep(ident, &tq->tq_lock, 0, wmesg, 0);
112 }
113
114 struct taskqueue *
115 taskqueue_create(const char *name, int mflags,
116                  taskqueue_enqueue_fn enqueue, void *context)
117 {
118         struct taskqueue *queue;
119
120         queue = kmalloc(sizeof(*queue), M_TASKQUEUE, mflags | M_ZERO);
121         if (!queue)
122                 return NULL;
123         STAILQ_INIT(&queue->tq_queue);
124         queue->tq_name = name;
125         queue->tq_enqueue = enqueue;
126         queue->tq_context = context;
127         queue->tq_flags |= TQ_FLAGS_ACTIVE;
128         TQ_LOCK_INIT(queue);
129
130         lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
131         STAILQ_INSERT_TAIL(&taskqueue_queues, queue, tq_link);
132         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
133
134         return queue;
135 }
136
137 static void
138 taskqueue_terminate(struct thread **pp, struct taskqueue *tq)
139 {
140         while(tq->tq_tcount > 0) {
141                 wakeup(tq);
142                 TQ_SLEEP(tq, pp, "taskqueue_terminate");
143         }
144 }
145
146 void
147 taskqueue_free(struct taskqueue *queue)
148 {
149         TQ_LOCK(queue);
150         queue->tq_flags &= ~TQ_FLAGS_ACTIVE;
151         taskqueue_run(queue, 1);
152         taskqueue_terminate(queue->tq_threads, queue);
153         TQ_UNLOCK(queue);
154
155         lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
156         STAILQ_REMOVE(&taskqueue_queues, queue, taskqueue, tq_link);
157         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
158
159         TQ_LOCK_UNINIT(queue);
160
161         kfree(queue, M_TASKQUEUE);
162 }
163
164 struct taskqueue *
165 taskqueue_find(const char *name)
166 {
167         struct taskqueue *queue;
168
169         lockmgr(&taskqueue_queues_lock, LK_EXCLUSIVE);
170         STAILQ_FOREACH(queue, &taskqueue_queues, tq_link) {
171                 if (!strcmp(queue->tq_name, name)) {
172                         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
173                         return queue;
174                 }
175         }
176         lockmgr(&taskqueue_queues_lock, LK_RELEASE);
177         return NULL;
178 }
179
180 /*
181  * NOTE!  If using the per-cpu taskqueues ``taskqueue_thread[mycpuid]'',
182  * be sure NOT TO SHARE the ``task'' between CPUs.  TASKS ARE NOT LOCKED.
183  * So either use a throwaway task which will only be enqueued once, or
184  * use one task per CPU!
185  */
186 static int
187 taskqueue_enqueue_locked(struct taskqueue *queue, struct task *task)
188 {
189         struct task *ins;
190         struct task *prev;
191
192         /*
193          * Don't allow new tasks on a queue which is being freed.
194          */
195         if ((queue->tq_flags & TQ_FLAGS_ACTIVE) == 0) {
196                 TQ_UNLOCK(queue);
197                 return EPIPE;
198         }
199
200         /*
201          * Count multiple enqueues.
202          */
203         if (task->ta_pending) {
204                 task->ta_pending++;
205                 return 0;
206         }
207
208         /*
209          * Optimise the case when all tasks have the same priority.
210          */
211         prev = STAILQ_LAST(&queue->tq_queue, task, ta_link);
212         if (!prev || prev->ta_priority >= task->ta_priority) {
213                 STAILQ_INSERT_TAIL(&queue->tq_queue, task, ta_link);
214         } else {
215                 prev = NULL;
216                 for (ins = STAILQ_FIRST(&queue->tq_queue); ins;
217                      prev = ins, ins = STAILQ_NEXT(ins, ta_link))
218                         if (ins->ta_priority < task->ta_priority)
219                                 break;
220
221                 if (prev)
222                         STAILQ_INSERT_AFTER(&queue->tq_queue, prev, task, ta_link);
223                 else
224                         STAILQ_INSERT_HEAD(&queue->tq_queue, task, ta_link);
225         }
226
227         task->ta_pending = 1;
228         if ((queue->tq_flags & TQ_FLAGS_BLOCKED) == 0) {
229                 if (queue->tq_enqueue)
230                         queue->tq_enqueue(queue->tq_context);
231         } else {
232                 queue->tq_flags |= TQ_FLAGS_PENDING;
233         }
234
235         return 0;
236 }
237
/*
 * Queue `task' on `queue', taking the queue lock around the operation.
 * Returns whatever taskqueue_enqueue_locked() returns.
 */
int
taskqueue_enqueue(struct taskqueue *queue, struct task *task)
{
	int error;

	TQ_LOCK(queue);
	error = taskqueue_enqueue_locked(queue, task);
	TQ_UNLOCK(queue);

	return (error);
}
249
250 static void
251 taskqueue_timeout_func(void *arg)
252 {
253         struct taskqueue *queue;
254         struct timeout_task *timeout_task;
255
256         timeout_task = arg;
257         queue = timeout_task->q;
258         KASSERT((timeout_task->f & DT_CALLOUT_ARMED) != 0, ("Stray timeout"));
259         timeout_task->f &= ~DT_CALLOUT_ARMED;
260         queue->tq_callouts--;
261         taskqueue_enqueue_locked(timeout_task->q, &timeout_task->t);
262 }
263
264 int
265 taskqueue_enqueue_timeout(struct taskqueue *queue,
266     struct timeout_task *timeout_task, int ticks)
267 {
268         int res;
269
270         TQ_LOCK(queue);
271         KASSERT(timeout_task->q == NULL || timeout_task->q == queue,
272             ("Migrated queue"));
273         timeout_task->q = queue;
274         res = timeout_task->t.ta_pending;
275         if (ticks == 0) {
276                 taskqueue_enqueue_locked(queue, &timeout_task->t);
277         } else {
278                 if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
279                         res++;
280                 } else {
281                         queue->tq_callouts++;
282                         timeout_task->f |= DT_CALLOUT_ARMED;
283                 }
284                 callout_reset(&timeout_task->c, ticks, taskqueue_timeout_func,
285                     timeout_task);
286         }
287         TQ_UNLOCK(queue);
288         return (res);
289 }
290
291 void
292 taskqueue_block(struct taskqueue *queue)
293 {
294         TQ_LOCK(queue);
295         queue->tq_flags |= TQ_FLAGS_BLOCKED;
296         TQ_UNLOCK(queue);
297 }
298
299 void
300 taskqueue_unblock(struct taskqueue *queue)
301 {
302         TQ_LOCK(queue);
303         queue->tq_flags &= ~TQ_FLAGS_BLOCKED;
304         if (queue->tq_flags & TQ_FLAGS_PENDING) {
305                 queue->tq_flags &= ~TQ_FLAGS_PENDING;
306                 if (queue->tq_enqueue)
307                         queue->tq_enqueue(queue->tq_context);
308         }
309         TQ_UNLOCK(queue);
310 }
311
312 void
313 taskqueue_run(struct taskqueue *queue, int lock_held)
314 {
315         struct task *task;
316         int pending;
317
318         if (lock_held == 0)
319                 TQ_LOCK(queue);
320         while (STAILQ_FIRST(&queue->tq_queue)) {
321                 /*
322                  * Carefully remove the first task from the queue and
323                  * zero its pending count.
324                  */
325                 task = STAILQ_FIRST(&queue->tq_queue);
326                 STAILQ_REMOVE_HEAD(&queue->tq_queue, ta_link);
327                 pending = task->ta_pending;
328                 task->ta_pending = 0;
329                 queue->tq_running = task;
330                 TQ_UNLOCK(queue);
331
332                 task->ta_func(task->ta_context, pending);
333
334                 TQ_LOCK(queue);
335                 queue->tq_running = NULL;
336                 wakeup(task);
337         }
338         if (lock_held == 0)
339                 TQ_UNLOCK(queue);
340 }
341
342 static int
343 taskqueue_cancel_locked(struct taskqueue *queue, struct task *task,
344     u_int *pendp)
345 {
346
347         if (task->ta_pending > 0)
348                 STAILQ_REMOVE(&queue->tq_queue, task, task, ta_link);
349         if (pendp != NULL)
350                 *pendp = task->ta_pending;
351         task->ta_pending = 0;
352         return (task == queue->tq_running ? EBUSY : 0);
353 }
354
355 int
356 taskqueue_cancel(struct taskqueue *queue, struct task *task, u_int *pendp)
357 {
358         u_int pending;
359         int error;
360
361         TQ_LOCK(queue);
362         pending = task->ta_pending;
363         error = taskqueue_cancel_locked(queue, task, pendp);
364         TQ_UNLOCK(queue);
365
366         return (error);
367 }
368
369 int
370 taskqueue_cancel_timeout(struct taskqueue *queue,
371     struct timeout_task *timeout_task, u_int *pendp)
372 {
373         u_int pending, pending1;
374         int error;
375
376         TQ_LOCK(queue);
377         pending = !!callout_stop(&timeout_task->c);
378         error = taskqueue_cancel_locked(queue, &timeout_task->t, &pending1);
379         if ((timeout_task->f & DT_CALLOUT_ARMED) != 0) {
380                 timeout_task->f &= ~DT_CALLOUT_ARMED;
381                 queue->tq_callouts--;
382         }
383         TQ_UNLOCK(queue);
384
385         if (pendp != NULL)
386                 *pendp = pending + pending1;
387         return (error);
388 }
389
390 void
391 taskqueue_drain(struct taskqueue *queue, struct task *task)
392 {
393         TQ_LOCK(queue);
394         while (task->ta_pending != 0 || task == queue->tq_running)
395                 TQ_SLEEP(queue, task, "-");
396         TQ_UNLOCK(queue);
397 }
398
399 void
400 taskqueue_drain_timeout(struct taskqueue *queue,
401     struct timeout_task *timeout_task)
402 {
403
404         callout_stop_sync(&timeout_task->c);
405         taskqueue_drain(queue, &timeout_task->t);
406 }
407
/*
 * Enqueue callback shared by the swi and swi_mp queues: calls
 * setsofttq().  The context argument is not used.
 */
static void
taskqueue_swi_enqueue(void *context)
{
	(void)context;

	setsofttq();
}
413
414 static void
415 taskqueue_swi_run(void *arg, void *frame)
416 {
417         taskqueue_run(taskqueue_swi, 0);
418 }
419
420 static void
421 taskqueue_swi_mp_run(void *arg, void *frame)
422 {
423         taskqueue_run(taskqueue_swi_mp, 0);
424 }
425
426 int
427 taskqueue_start_threads(struct taskqueue **tqp, int count, int pri, int ncpu,
428                         const char *fmt, ...)
429 {
430         __va_list ap;
431         struct thread *td;
432         struct taskqueue *tq;
433         int i, error, cpu;
434         char ktname[MAXCOMLEN];
435
436         if (count <= 0)
437                 return EINVAL;
438
439         tq = *tqp;
440         cpu = ncpu;
441
442         __va_start(ap, fmt);
443         kvsnprintf(ktname, MAXCOMLEN, fmt, ap);
444         __va_end(ap);
445
446         tq->tq_threads = kmalloc(sizeof(struct thread *) * count, M_TASKQUEUE,
447             M_WAITOK | M_ZERO);
448
449         for (i = 0; i < count; i++) {
450                 /*
451                  * If no specific cpu was specified and more than one thread
452                  * is to be created, we distribute the threads amongst all
453                  * cpus.
454                  */
455                 if ((ncpu <= -1) && (count > 1))
456                         cpu = i%ncpus;
457
458                 if (count == 1) {
459                         error = lwkt_create(taskqueue_thread_loop, tqp,
460                                             &tq->tq_threads[i], NULL,
461                                             TDF_NOSTART, cpu,
462                                             "%s", ktname);
463                 } else {
464                         error = lwkt_create(taskqueue_thread_loop, tqp,
465                                             &tq->tq_threads[i], NULL,
466                                             TDF_NOSTART, cpu,
467                                             "%s_%d", ktname, i);
468                 }
469                 if (error) {
470                         kprintf("%s: lwkt_create(%s): error %d", __func__,
471                             ktname, error);
472                         tq->tq_threads[i] = NULL;
473                 } else {
474                         td = tq->tq_threads[i];
475                         lwkt_setpri_initial(td, pri);
476                         lwkt_schedule(td);
477                         tq->tq_tcount++;
478                 }
479         }
480
481         return 0;
482 }
483
484 void
485 taskqueue_thread_loop(void *arg)
486 {
487         struct taskqueue **tqp, *tq;
488
489         tqp = arg;
490         tq = *tqp;
491         TQ_LOCK(tq);
492         while ((tq->tq_flags & TQ_FLAGS_ACTIVE) != 0) {
493                 taskqueue_run(tq, 1);
494                 TQ_SLEEP(tq, tq, "tqthr");
495         }
496
497         /* rendezvous with thread that asked us to terminate */
498         tq->tq_tcount--;
499         wakeup_one(tq->tq_threads);
500         TQ_UNLOCK(tq);
501         lwkt_exit();
502 }
503
/*
 * Enqueue callback for thread-backed queues: wakes one thread sleeping
 * on the queue.
 *
 * context - address of the queue pointer (struct taskqueue **).
 */
void
taskqueue_thread_enqueue(void *context)
{
	struct taskqueue **tqp = context;

	wakeup_one(*tqp);
}
514
515 TASKQUEUE_DEFINE(swi, taskqueue_swi_enqueue, 0,
516          register_swi(SWI_TQ, taskqueue_swi_run, NULL, "swi_taskq", NULL, -1));
517 /*
518  * XXX: possibly use a different SWI_TQ_MP or so.
519  * related: sys/interrupt.h
520  * related: platform/XXX/isa/ipl_funcs.c
521  */
522 TASKQUEUE_DEFINE(swi_mp, taskqueue_swi_enqueue, 0,
523     register_swi_mp(SWI_TQ, taskqueue_swi_mp_run, NULL, "swi_mp_taskq", NULL, 
524                     -1));
525
526 struct taskqueue *taskqueue_thread[MAXCPU];
527
528 static void
529 taskqueue_init(void)
530 {
531         int cpu;
532
533         lockinit(&taskqueue_queues_lock, "tqqueues", 0, 0);
534         STAILQ_INIT(&taskqueue_queues);
535
536         for (cpu = 0; cpu < ncpus; cpu++) {
537                 taskqueue_thread[cpu] = taskqueue_create("thread", M_INTWAIT,
538                     taskqueue_thread_enqueue, &taskqueue_thread[cpu]);
539                 taskqueue_start_threads(&taskqueue_thread[cpu], 1,
540                     TDPRI_KERN_DAEMON, cpu, "taskq_cpu %d", cpu);
541         }
542 }
543
544 SYSINIT(taskqueueinit, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, taskqueue_init, NULL);