2 * Copyright (c) 2009, 2010 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Alex Hornung <ahornung@gmail.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 #include <sys/param.h>
35 #include <sys/systm.h>
36 #include <sys/kernel.h>
38 #include <sys/sysctl.h>
41 #include <sys/diskslice.h>
43 #include <machine/atomic.h>
44 #include <sys/malloc.h>
45 #include <sys/thread.h>
46 #include <sys/thread2.h>
47 #include <sys/sysctl.h>
48 #include <sys/spinlock2.h>
49 #include <machine/md_var.h>
50 #include <sys/ctype.h>
51 #include <sys/syslog.h>
52 #include <sys/device.h>
53 #include <sys/msgport.h>
54 #include <sys/msgport2.h>
56 #include <sys/dsched.h>
57 #include <machine/varargs.h>
58 #include <machine/param.h>
60 #include <dsched/fq/dsched_fq.h>
/*
 * Module-wide state for the FQ (fair queuing) dsched disk scheduler policy.
 */
62 MALLOC_DECLARE(M_DSCHEDFQ);
/* Policy version reported by the load-time kprintf below (0.8). */
64 static int dsched_fq_version_maj = 0;
65 static int dsched_fq_version_min = 8;
/* Global allocation/limit counters, exported read-only via the fq_stats sysctl. */
67 struct dsched_fq_stats fq_stats;
/* malloc-backed objcache descriptors for the three context object types. */
69 struct objcache_malloc_args fq_disk_ctx_malloc_args = {
70 sizeof(struct fq_disk_ctx), M_DSCHEDFQ };
71 struct objcache_malloc_args fq_thread_io_malloc_args = {
72 sizeof(struct fq_thread_io), M_DSCHEDFQ };
73 struct objcache_malloc_args fq_thread_ctx_malloc_args = {
74 sizeof(struct fq_thread_ctx), M_DSCHEDFQ };
/* Object caches; created during initialization further below. */
76 static struct objcache *fq_diskctx_cache;
77 static struct objcache *fq_tdctx_cache;
78 static struct objcache *fq_tdio_cache;
/*
 * Global list of all per-thread contexts. Presumably serialized by
 * fq_tdctx_lock through the FQ_GLOBAL_THREAD_CTX_LOCK() macros used
 * below -- confirm against dsched_fq.h.
 */
80 TAILQ_HEAD(, fq_thread_ctx) dsched_tdctx_list =
81 TAILQ_HEAD_INITIALIZER(dsched_tdctx_list);
83 struct lock fq_tdctx_lock;
/* Policy descriptor, defined elsewhere in this module. */
85 extern struct dsched_policy dsched_fq_policy;
/*
 * Acquire a reference on a per-disk context.
 * NOTE(review): the return-type line, braces and the "int refcount;"
 * declaration appear to be missing from this chunk of the file.
 */
88 fq_disk_ctx_ref(struct fq_disk_ctx *diskctx)
/* atomic_fetchadd_int returns the pre-increment value ... */
92 refcount = atomic_fetchadd_int(&diskctx->refcount, 1);
/* ... which must be non-negative: a negative count marks an object in destruction. */
94 KKASSERT(refcount >= 0);
/*
 * Acquire a reference on a per-thread, per-disk I/O context (tdio).
 * NOTE(review): function header lines are truncated in this chunk.
 */
98 fq_thread_io_ref(struct fq_thread_io *tdio)
/* Old count must be >= 0; a biased-negative count means the tdio is being torn down. */
102 refcount = atomic_fetchadd_int(&tdio->refcount, 1);
104 KKASSERT(refcount >= 0);
/*
 * Acquire a reference on a per-thread context (tdctx).
 * NOTE(review): function header lines are truncated in this chunk.
 */
108 fq_thread_ctx_ref(struct fq_thread_ctx *tdctx)
/* Old count must be >= 0; negative values are reserved for the destruction marker. */
112 refcount = atomic_fetchadd_int(&tdctx->refcount, 1);
114 KKASSERT(refcount >= 0);
/*
 * Drop a reference on a per-disk context; the last release unlinks and
 * unrefs every attached tdio and returns the diskctx to its objcache.
 * NOTE(review): the "last reference" guard (likely "if (refcount == 1)")
 * around the destruction path is not visible in this chunk -- as shown,
 * the teardown would run unconditionally. Verify against the full source.
 */
118 fq_disk_ctx_unref(struct fq_disk_ctx *diskctx)
120 struct fq_thread_io *tdio, *tdio2;
/* Fetch the pre-decrement count. */
123 refcount = atomic_fetchadd_int(&diskctx->refcount, -1);
/* Either a live count, or already carrying the -0x400 destruction bias. */
126 KKASSERT(refcount >= 0 || refcount <= -0x400);
/* Bias the count far negative so a concurrent ref/unref trips its KKASSERT. */
129 atomic_subtract_int(&diskctx->refcount, 0x400); /* mark as: in destruction */
131 kprintf("diskctx (%p) destruction started, trace:\n", diskctx);
/* Unlink and release every tdio still on this disk's list, under the disk lock. */
134 lockmgr(&diskctx->lock, LK_EXCLUSIVE);
135 TAILQ_FOREACH_MUTABLE(tdio, &diskctx->fq_tdio_list, dlink, tdio2) {
136 TAILQ_REMOVE(&diskctx->fq_tdio_list, tdio, dlink);
137 tdio->flags &= ~FQ_LINKED_DISK_CTX;
138 fq_thread_io_unref(tdio);
140 lockmgr(&diskctx->lock, LK_RELEASE);
/* Return the object to its cache and account for the release. */
142 objcache_put(fq_diskctx_cache, diskctx);
143 atomic_subtract_int(&fq_stats.diskctx_allocations, 1);
/*
 * Drop a reference on a tdio; the last release unlinks it from both its
 * disk context and its thread context, frees it, and drops the reference
 * the tdio held on its diskctx.
 * NOTE(review): the "last reference" guard around the destruction path is
 * not visible in this chunk.
 */
148 fq_thread_io_unref(struct fq_thread_io *tdio)
150 struct fq_thread_ctx *tdctx;
151 struct fq_disk_ctx *diskctx;
/* Fetch the pre-decrement count; see fq_disk_ctx_unref for the bias scheme. */
154 refcount = atomic_fetchadd_int(&tdio->refcount, -1);
156 KKASSERT(refcount >= 0 || refcount <= -0x400);
/* Bias far negative so concurrent ref/unref assertions fire. */
159 atomic_subtract_int(&tdio->refcount, 0x400); /* mark as: in destruction */
161 kprintf("tdio (%p) destruction started, trace:\n", tdio);
164 diskctx = tdio->diskctx;
165 KKASSERT(diskctx != NULL);
/* A tdio must have an empty request queue before it may be destroyed. */
166 KKASSERT(tdio->qlength == 0);
/* Unlink from the disk context's tdio list under the diskctx lock. */
168 if (tdio->flags & FQ_LINKED_DISK_CTX) {
169 lockmgr(&diskctx->lock, LK_EXCLUSIVE);
171 TAILQ_REMOVE(&diskctx->fq_tdio_list, tdio, dlink);
172 tdio->flags &= ~FQ_LINKED_DISK_CTX;
174 lockmgr(&diskctx->lock, LK_RELEASE);
/*
 * NOTE(review): tdctx is asserted non-NULL and dereferenced below, but no
 * assignment (e.g. "tdctx = tdio->tdctx;") is visible in this chunk; as
 * shown, tdctx would be used uninitialized. Almost certainly an assignment
 * line was lost here -- verify against the full source.
 */
177 if (tdio->flags & FQ_LINKED_THREAD_CTX) {
179 KKASSERT(tdctx != NULL);
181 spin_lock_wr(&tdctx->lock);
183 TAILQ_REMOVE(&tdctx->fq_tdio_list, tdio, link);
184 tdio->flags &= ~FQ_LINKED_THREAD_CTX;
186 spin_unlock_wr(&tdctx->lock);
/* Free the tdio and account for the release. */
189 objcache_put(fq_tdio_cache, tdio);
190 atomic_subtract_int(&fq_stats.tdio_allocations, 1);
/* Drop the diskctx reference taken in fq_thread_io_alloc(). */
192 fq_disk_ctx_unref(diskctx);
/*
 * Drop a reference on a per-thread context; the last release unrefs all of
 * its tdios, removes it from the global tdctx list and frees it.
 * NOTE(review): the "last reference" guard around the destruction path is
 * not visible in this chunk.
 */
198 fq_thread_ctx_unref(struct fq_thread_ctx *tdctx)
200 struct fq_thread_io *tdio, *tdio2;
203 refcount = atomic_fetchadd_int(&tdctx->refcount, -1);
205 KKASSERT(refcount >= 0 || refcount <= -0x400);
/* Bias far negative so concurrent ref/unref assertions fire. */
208 atomic_subtract_int(&tdctx->refcount, 0x400); /* mark as: in destruction */
210 kprintf("tdctx (%p) destruction started, trace:\n", tdctx);
/* Detach every tdio and take the tdctx off the global list, all under the global lock. */
213 FQ_GLOBAL_THREAD_CTX_LOCK();
215 TAILQ_FOREACH_MUTABLE(tdio, &tdctx->fq_tdio_list, link, tdio2) {
216 TAILQ_REMOVE(&tdctx->fq_tdio_list, tdio, link);
217 tdio->flags &= ~FQ_LINKED_THREAD_CTX;
218 fq_thread_io_unref(tdio);
220 TAILQ_REMOVE(&dsched_tdctx_list, tdctx, link);
222 FQ_GLOBAL_THREAD_CTX_UNLOCK();
/* Return the object to its cache and account for the release. */
224 objcache_put(fq_tdctx_cache, tdctx);
225 atomic_subtract_int(&fq_stats.tdctx_allocations, 1);
/*
 * Allocate and initialize a tdio binding thread context 'tdctx' to disk
 * 'dp', link it into both the diskctx and tdctx lists, and return it with
 * one reference held.
 */
230 struct fq_thread_io *
231 fq_thread_io_alloc(struct disk *dp, struct fq_thread_ctx *tdctx)
233 struct fq_thread_io *tdio;
/* The tdio pins its disk context; dropped again in fq_thread_io_unref(). */
235 fq_disk_ctx_ref(dsched_get_disk_priv(dp));
237 tdio = objcache_get(fq_tdio_cache, M_WAITOK);
238 bzero(tdio, sizeof(struct fq_thread_io));
240 /* XXX: maybe we do need another ref for the disk list for tdio */
241 fq_thread_io_ref(tdio);
243 FQ_THREAD_IO_LOCKINIT(tdio);
246 tdio->diskctx = dsched_get_disk_priv(dp);
247 TAILQ_INIT(&tdio->queue);
/*
 * Link into the disk context's tdio list. NOTE(review): no diskctx lock
 * acquisition is visible around this insert in this chunk -- confirm the
 * locking in the full source.
 */
249 TAILQ_INSERT_TAIL(&tdio->diskctx->fq_tdio_list, tdio, dlink);
250 tdio->flags |= FQ_LINKED_DISK_CTX;
256 /* Put the tdio in the tdctx list */
257 FQ_THREAD_CTX_LOCK(tdctx);
258 TAILQ_INSERT_TAIL(&tdctx->fq_tdio_list, tdio, link);
259 FQ_THREAD_CTX_UNLOCK(tdctx);
260 tdio->flags |= FQ_LINKED_THREAD_CTX;
263 atomic_add_int(&fq_stats.tdio_allocations, 1);
/* NOTE(review): the trailing "return tdio;" is not visible in this chunk. */
/*
 * Allocate and initialize the per-disk context for 'dp', returning it with
 * one reference held.
 * NOTE(review): the return-type line and the return statement are not
 * visible in this chunk.
 */
269 fq_disk_ctx_alloc(struct disk *dp)
271 struct fq_disk_ctx *diskctx;
273 diskctx = objcache_get(fq_diskctx_cache, M_WAITOK);
274 bzero(diskctx, sizeof(struct fq_disk_ctx));
275 fq_disk_ctx_ref(diskctx);
/* Redundant with the bzero above, but makes the starting state explicit. */
277 diskctx->avg_rq_time = 0;
278 diskctx->incomplete_tp = 0;
279 FQ_DISK_CTX_LOCKINIT(diskctx);
280 TAILQ_INIT(&diskctx->fq_tdio_list);
282 atomic_add_int(&fq_stats.diskctx_allocations, 1);
/*
 * Allocate the per-thread context for process 'p', create a tdio for every
 * disk currently handled by the FQ policy, and enter the new tdctx on the
 * global list. Returned with one reference held.
 * NOTE(review): 'p' is not stored anywhere in the visible lines; the
 * assignment presumably sits on one of the lines missing from this chunk.
 */
287 struct fq_thread_ctx *
288 fq_thread_ctx_alloc(struct proc *p)
290 struct fq_thread_ctx *tdctx;
291 struct fq_thread_io *tdio;
292 struct disk *dp = NULL;
294 tdctx = objcache_get(fq_tdctx_cache, M_WAITOK);
295 bzero(tdctx, sizeof(struct fq_thread_ctx));
296 fq_thread_ctx_ref(tdctx);
298 kprintf("fq_thread_ctx_alloc, new tdctx = %p\n", tdctx);
300 FQ_THREAD_CTX_LOCKINIT(tdctx);
301 TAILQ_INIT(&tdctx->fq_tdio_list);
/* One tdio per FQ-managed disk; each gets an extra reference here. */
304 while ((dp = dsched_disk_enumerate(dp, &dsched_fq_policy))) {
305 tdio = fq_thread_io_alloc(dp, tdctx);
307 fq_thread_io_ref(tdio);
/* Publish the new tdctx on the global list. */
311 FQ_GLOBAL_THREAD_CTX_LOCK();
312 TAILQ_INSERT_TAIL(&dsched_tdctx_list, tdctx, link);
313 FQ_GLOBAL_THREAD_CTX_UNLOCK();
315 atomic_add_int(&fq_stats.tdctx_allocations, 1);
/* NOTE(review): the trailing "return tdctx;" is not visible in this chunk. */
/*
 * Per-disk dispatcher thread body: sleeps on the diskctx, and on wakeup
 * either tears down (diskctx->die) or dispatches queued bios from each
 * tdio, honoring the per-tdio max_tp throughput limits set by the
 * balancer.
 * NOTE(review): several scaffolding lines (function header, outer loop,
 * braces, 'idle' declaration) are missing from this chunk.
 */
321 fq_dispatcher(struct fq_disk_ctx *diskctx)
323 struct fq_thread_ctx *tdctx;
324 struct fq_thread_io *tdio, *tdio2;
325 struct bio *bio, *bio2;
329 * We need to manually assign an tdio to the tdctx of this thread
330 * since it isn't assigned one during fq_prepare, as the disk
/* Bind this dispatcher thread's own tdctx to the disk it serves. */
333 tdctx = dsched_get_thread_priv(curthread);
334 KKASSERT(tdctx != NULL);
336 tdio = fq_thread_io_alloc(diskctx->dp, tdctx);
338 fq_thread_io_ref(tdio);
/* Main loop runs with the diskctx lock held; lksleep drops it while sleeping. */
341 FQ_DISK_CTX_LOCK(diskctx);
/* Timeout of hz/15 so idle disks are still polled for extra dispatch. */
345 if ((lksleep(diskctx, &diskctx->lock, 0, "fq_dispatcher", hz/15) == 0)) {
347 * We've been woken up; this either means that we are
348 * supposed to die away nicely or that the disk is idle.
351 if (__predict_false(diskctx->die == 1)) {
352 /* If we are supposed to die, drain all queues */
353 fq_drain(diskctx, FQ_DRAIN_FLUSH);
355 /* Now we can safely unlock and exit */
356 FQ_DISK_CTX_UNLOCK(diskctx);
357 kprintf("fq_dispatcher is peacefully dying\n");
363 * We have been awakened because the disk is idle.
364 * So let's get ready to dispatch some extra bios.
369 /* Maybe the disk is idle and we just didn't get the wakeup */
/* NOTE(review): 'idle' has no visible declaration in this chunk. */
371 idle = diskctx->idle;
374 * XXX: further room for improvements here. It would be better
375 * to dispatch a few requests from each tdio as to ensure
/* Walk every tdio on this disk and push out what its budget allows. */
378 TAILQ_FOREACH_MUTABLE(tdio, &diskctx->fq_tdio_list, dlink, tdio2) {
379 if (tdio->qlength == 0)
382 FQ_THREAD_IO_LOCK(tdio);
/* Consume a pending rebalance request exactly once (cmpset 1 -> 0). */
383 if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
384 fq_balance_self(tdio);
386 * XXX: why 5 extra? should probably be dynamic,
387 * relying on information on latency.
/* Skip tdios that already used up their transaction budget (unless idle override applies). */
389 if ((tdio->max_tp > 0) && idle &&
390 (tdio->issued >= tdio->max_tp)) {
/* Dispatch bios until the queue is empty or the budget is exhausted. */
394 TAILQ_FOREACH_MUTABLE(bio, &tdio->queue, link, bio2) {
395 if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
396 fq_balance_self(tdio);
397 if ((tdio->max_tp > 0) &&
398 ((tdio->issued >= tdio->max_tp)))
401 TAILQ_REMOVE(&tdio->queue, bio, link);
405 * beware that we do have an tdio reference
408 fq_dispatch(diskctx, bio, tdio);
410 FQ_THREAD_IO_UNLOCK(tdio);
/*
 * Per-disk balancer thread body: every hz/2 ticks it measures elapsed wall
 * time and disk busy-ness over the interval, sums per-tdio latency*
 * transaction products into a total budget, and redistributes that budget
 * across the I/O priority levels into diskctx->budgetpb[].
 * NOTE(review): many scaffolding lines are missing from this chunk --
 * function header, outer loop, braces, and the declarations of 'lost_bits'
 * and presumably a getmicrotime(&tv) call -- flagged inline below.
 */
417 fq_balance_thread(struct fq_disk_ctx *diskctx)
419 struct fq_thread_io *tdio, *tdio2;
420 struct timeval tv, old_tv;
421 int64_t total_budget, product;
422 int64_t budget[FQ_PRIO_MAX+1];
423 int n, i, sum, total_disk_time;
426 FQ_DISK_CTX_LOCK(diskctx);
/* Stamp the start of the first measurement interval. */
428 getmicrotime(&diskctx->start_interval);
/* Wake up twice a second (or on explicit wakeup); exit path on diskctx->die. */
432 if ((lksleep(curthread, &diskctx->lock, 0, "fq_balancer", hz/2) == 0)) {
433 if (__predict_false(diskctx->die)) {
434 FQ_DISK_CTX_UNLOCK(diskctx);
439 bzero(budget, sizeof(budget));
443 old_tv = diskctx->start_interval;
/*
 * Interval length in microseconds. NOTE(review): no visible
 * "getmicrotime(&tv)" refreshes 'tv' in this chunk -- likely lost.
 */
446 total_disk_time = (int)(1000000*((tv.tv_sec - old_tv.tv_sec)) +
447 (tv.tv_usec - old_tv.tv_usec));
449 if (total_disk_time == 0)
452 dsched_debug(LOG_INFO, "total_disk_time = %d\n", total_disk_time);
454 diskctx->start_interval = tv;
/* Percentage of the interval the disk was not idle; clamp below at 0. */
456 diskctx->disk_busy = (100*(total_disk_time - diskctx->idle_time)) / total_disk_time;
457 if (diskctx->disk_busy < 0)
458 diskctx->disk_busy = 0;
460 diskctx->idle_time = 0;
/* Accumulate each tdio's latency*transactions product into the total budget. */
463 TAILQ_FOREACH_MUTABLE(tdio, &diskctx->fq_tdio_list, dlink, tdio2) {
464 tdio->interval_avg_latency = tdio->avg_latency;
465 tdio->interval_transactions = tdio->transactions;
466 if (tdio->interval_transactions > 0) {
467 product = (int64_t)tdio->interval_avg_latency *
468 tdio->interval_transactions;
/*
 * Down-shift to avoid 64-bit overflow of total_budget; the shifted-out
 * bits are restored when budgetpb is computed below.
 * NOTE(review): 'lost_bits' has no visible declaration/initialization
 * in this chunk.
 */
469 product >>= lost_bits;
470 while(total_budget >= INT64_MAX - product) {
475 total_budget += product;
/* Count one competing process in its ionice priority bucket. */
476 ++budget[(tdio->p) ? tdio->p->p_ionice : 0];
477 KKASSERT(total_budget >= 0);
478 dsched_debug(LOG_INFO,
479 "%d) avg_latency = %d, transactions = %d, ioprio = %d\n",
480 n, tdio->interval_avg_latency, tdio->interval_transactions,
481 (tdio->p) ? tdio->p->p_ionice : 0);
/* Reset the per-interval counters for the next measurement window. */
487 tdio->transactions = 0;
488 tdio->avg_latency = 0;
492 dsched_debug(LOG_INFO, "%d procs competing for disk\n"
493 "total_budget = %jd (lost bits = %d)\n"
494 "incomplete tp = %d\n", n, (intmax_t)total_budget,
495 lost_bits, diskctx->incomplete_tp);
/* Weighted denominator: each priority bucket weighted by FQ_PRIO_BIAS+i. */
502 for (i = 0; i < FQ_PRIO_MAX+1; i++) {
505 sum += (FQ_PRIO_BIAS+i)*budget[i];
511 dsched_debug(LOG_INFO, "sum = %d\n", sum);
/* Distribute the total budget per priority, shifting the lost bits back in. */
513 for (i = 0; i < FQ_PRIO_MAX+1; i++) {
518 * XXX: if we still overflow here, we really need to switch to
519 * some more advanced mechanism such as compound int128 or
520 * storing the lost bits so they can be used in the
523 diskctx->budgetpb[i] = ((FQ_PRIO_BIAS+i)*total_budget/sum) << lost_bits;
524 KKASSERT(diskctx->budgetpb[i] >= 0);
527 dsched_debug(4, "disk is %d%% busy\n", diskctx->disk_busy);
528 TAILQ_FOREACH(tdio, &diskctx->fq_tdio_list, dlink) {
/* Track two intervals of saturation history for fq_balance_self(). */
532 diskctx->prev_full = diskctx->last_full;
533 diskctx->last_full = (diskctx->disk_busy >= 90)?1:0;
539 * fq_balance_self should be called from all sorts of dispatchers. It basically
540 * offloads some of the heavier calculations on throttling onto the process that
541 * wants to do I/O instead of doing it in the fq_balance thread.
542 * - should be called with diskctx lock held
/*
 * Compare this tdio's consumed budget (avg latency * transactions) with its
 * priority's share and set/clear tdio->max_tp accordingly.
 * NOTE(review): the return-type line and braces are truncated, and
 * 'avg_latency' has no visible declaration (likely on a missing line).
 */
545 fq_balance_self(struct fq_thread_io *tdio) {
546 struct fq_disk_ctx *diskctx;
548 int64_t budget, used_budget;
550 int64_t transactions;
/* Snapshot the interval statistics collected by fq_balance_thread. */
552 transactions = (int64_t)tdio->interval_transactions;
553 avg_latency = (int64_t)tdio->interval_avg_latency;
554 diskctx = tdio->diskctx;
557 /* XXX: do we really require the lock? */
558 FQ_DISK_CTX_LOCK_ASSERT(diskctx);
/* Budget consumed vs. the share allotted to this tdio's ionice level. */
561 used_budget = ((int64_t)avg_latency * transactions);
562 budget = diskctx->budgetpb[(tdio->p) ? tdio->p->p_ionice : 0];
564 if (used_budget > 0) {
565 dsched_debug(LOG_INFO,
566 "info: used_budget = %jd, budget = %jd\n",
567 (intmax_t)used_budget, budget);
/* Over budget on a saturated disk (>=90% busy): throttle to max_tp transactions. */
570 if ((used_budget > budget) && (diskctx->disk_busy >= 90)) {
571 KKASSERT(avg_latency != 0);
573 tdio->max_tp = budget/(avg_latency);
574 atomic_add_int(&fq_stats.procs_limited, 1);
576 dsched_debug(LOG_INFO,
577 "rate limited to %d transactions\n", tdio->max_tp);
/* Well under budget, or disk not saturated for two intervals: lift the limit. */
579 } else if (((used_budget*2 < budget) || (diskctx->disk_busy < 80)) &&
580 (!diskctx->prev_full && !diskctx->last_full)) {
/*
 * Sysctl handler: copy the fq_stats structure out to userland as an opaque
 * blob. NOTE(review): the return-type line is truncated in this chunk.
 */
588 do_fqstats(SYSCTL_HANDLER_ARGS)
589 return (sysctl_handle_opaque(oidp, &fq_stats, sizeof(struct dsched_fq_stats), req));
/* Read-only kern.fq_stats node exposing the statistics above. */
593 SYSCTL_PROC(_kern, OID_AUTO, fq_stats, CTLTYPE_OPAQUE|CTLFLAG_RD,
594 0, sizeof(struct dsched_fq_stats), do_fqstats, "fq_stats",
595 "dsched_fq statistics");
/*
 * NOTE(review): the enclosing function's header is not visible in this
 * chunk; judging by the SYSINIT hooks below this is the interior of the
 * policy init routine. It creates the three objcaches, initializes the
 * global tdctx lock, zeroes the statistics and registers the policy.
 */
613 fq_tdio_cache = objcache_create("fq-tdio-cache", 0, 0,
615 objcache_malloc_alloc,
616 objcache_malloc_free,
617 &fq_thread_io_malloc_args );
619 fq_tdctx_cache = objcache_create("fq-tdctx-cache", 0, 0,
621 objcache_malloc_alloc,
622 objcache_malloc_free,
623 &fq_thread_ctx_malloc_args );
625 FQ_GLOBAL_THREAD_CTX_LOCKINIT();
627 fq_diskctx_cache = objcache_create("fq-diskctx-cache", 0, 0,
629 objcache_malloc_alloc,
630 objcache_malloc_free,
631 &fq_disk_ctx_malloc_args );
/* Start with clean statistics before the policy can be used. */
633 bzero(&fq_stats, sizeof(struct dsched_fq_stats));
/* Make the FQ policy known to the dsched framework. */
635 dsched_register(&dsched_fq_policy);
637 kprintf("FQ scheduler policy version %d.%d loaded\n",
638 dsched_fq_version_maj, dsched_fq_version_min);
/* Hook the init/uninit routines into the kernel boot/unload sequence. */
647 SYSINIT(fq_register, SI_SUB_PRE_DRIVERS, SI_ORDER_ANY, fq_init, NULL);
648 SYSUNINIT(fq_register, SI_SUB_PRE_DRIVERS, SI_ORDER_FIRST, fq_uninit, NULL);
/* Early hooks run before process creation is initialized. */
650 SYSINIT(fq_early, SI_SUB_CREATE_INIT-1, SI_ORDER_FIRST, fq_earlyinit, NULL);
651 SYSUNINIT(fq_early, SI_SUB_CREATE_INIT-1, SI_ORDER_ANY, fq_earlyuninit, NULL);