2 * Copyright (c) 2009, 2010 The DragonFly Project. All rights reserved.
4 * This code is derived from software contributed to The DragonFly Project
5 * by Alex Hornung <ahornung@gmail.com>
7 * Redistribution and use in source and binary forms, with or without
8 * modification, are permitted provided that the following conditions
11 * 1. Redistributions of source code must retain the above copyright
12 * notice, this list of conditions and the following disclaimer.
13 * 2. Redistributions in binary form must reproduce the above copyright
14 * notice, this list of conditions and the following disclaimer in
15 * the documentation and/or other materials provided with the
17 * 3. Neither the name of The DragonFly Project nor the names of its
18 * contributors may be used to endorse or promote products derived
19 * from this software without specific, prior written permission.
21 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
25 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
34 #include <sys/param.h>
35 #include <sys/systm.h>
36 #include <sys/kernel.h>
38 #include <sys/sysctl.h>
41 #include <sys/diskslice.h>
43 #include <machine/atomic.h>
44 #include <sys/malloc.h>
45 #include <sys/thread.h>
46 #include <sys/thread2.h>
47 #include <sys/sysctl.h>
48 #include <sys/spinlock2.h>
49 #include <machine/md_var.h>
50 #include <sys/ctype.h>
51 #include <sys/syslog.h>
52 #include <sys/device.h>
53 #include <sys/msgport.h>
54 #include <sys/msgport2.h>
56 #include <sys/dsched.h>
57 #include <machine/varargs.h>
58 #include <machine/param.h>
60 #include <dsched/fq/dsched_fq.h>
/*
 * Module-wide state for the "fq" (fair-queuing) dsched policy.
 * NOTE(review): this extraction appears to have dropped blank/brace-only
 * lines from the original file; declarations below are kept verbatim.
 */
63 MALLOC_DEFINE(M_DSCHEDFQ, "dschedfq", "fq dsched allocs");
/* dsched policy entry points implemented in this file. */
65 static dsched_prepare_t	fq_prepare;
66 static dsched_teardown_t	fq_teardown;
67 static dsched_flush_t		fq_flush;
68 static dsched_cancel_t		fq_cancel;
69 static dsched_queue_t		fq_queue;
/* These are in _procops */
71 /* These are in _procops */
72 dsched_new_buf_t	fq_new_buf;
73 dsched_new_proc_t	fq_new_proc;
74 dsched_new_thread_t	fq_new_thread;
75 dsched_exit_buf_t	fq_exit_buf;
76 dsched_exit_proc_t	fq_exit_proc;
77 dsched_exit_thread_t	fq_exit_thread;
/* Shared state defined elsewhere in the fq policy (see dsched_fq.h / _procops). */
79 extern struct dsched_fq_stats	fq_stats;
80 extern struct lock		fq_tdctx_lock;
81 extern TAILQ_HEAD(, fq_thread_ctx)	dsched_tdctx_list;
82 extern struct callout	fq_callout;
/*
 * The fq policy descriptor registered with the dsched framework.
 * NOTE(review): the original likely contains a .name member and the
 * closing "};" here; those lines are missing from this view.
 */
84 struct dsched_policy dsched_fq_policy = {
87 	.prepare = fq_prepare,
88 	.teardown = fq_teardown,
90 	.cancel_all = fq_cancel,
91 	.bio_queue = fq_queue,
93 	.new_buf = fq_new_buf,
94 	.new_proc = fq_new_proc,
95 	.new_thread = fq_new_thread,
96 	.exit_buf = fq_exit_buf,
97 	.exit_proc = fq_exit_proc,
98 	.exit_thread = fq_exit_thread,
/*
 * Attach the fq policy to disk @dp: allocate a per-disk context, create a
 * per-disk tdio for every thread context currently on the global list, and
 * spawn the dispatcher and balancer kernel threads for this disk.
 * NOTE(review): return-type line, braces, and the trailing "return 0;" are
 * missing from this extraction — code kept verbatim.
 */
104 fq_prepare(struct disk *dp)
106 	struct	fq_disk_ctx	*diskctx;
107 	struct fq_thread_ctx	*tdctx;
108 	struct fq_thread_io	*tdio;
109 	struct thread *td_core, *td_balance;
/* Per-disk context; extra ref held for the lifetime of the attachment. */
111 	diskctx = fq_disk_ctx_alloc(dp);
112 	fq_disk_ctx_ref(diskctx);
113 	dsched_set_disk_priv(dp, diskctx);
/* Give every already-known thread context an io struct for this disk. */
115 	FQ_GLOBAL_THREAD_CTX_LOCK();
116 	TAILQ_FOREACH(tdctx, &dsched_tdctx_list, link) {
117 		tdio = fq_thread_io_alloc(dp, tdctx);
119 		fq_thread_io_ref(tdio);
122 	FQ_GLOBAL_THREAD_CTX_UNLOCK();
/* Spawn the per-disk dispatcher and balance threads. */
124 	lwkt_create((void (*)(void *))fq_dispatcher, diskctx, &td_core, NULL,
125 	    TDF_MPSAFE, -1, "fq_dispatch_%s", dp->d_cdev->si_name);
126 	lwkt_create((void (*)(void *))fq_balance_thread, diskctx, &td_balance,
127 	    NULL, TDF_MPSAFE, -1, "fq_balance_%s", dp->d_cdev->si_name);
128 	diskctx->td_balance = td_balance;
/*
 * Detach the fq policy from disk @dp: signal the dispatcher/balance threads
 * to die, drop the references taken in fq_prepare()/alloc, and clear the
 * disk's private pointer.
 * NOTE(review): lines that set a "die" flag and wake the dispatcher itself
 * are presumably among the lines missing from this extraction — only the
 * td_balance wakeups and the tsleep retry loop are visible here.
 */
136 fq_teardown(struct disk *dp)
138 	struct fq_disk_ctx *diskctx;
140 	diskctx = dsched_get_disk_priv(dp);
141 	KKASSERT(diskctx != NULL);
143 	/* Basically kill the dispatcher thread */
145 	wakeup(diskctx->td_balance);
/* Wait for the threads to exit, nudging them again on each timeout. */
147 	tsleep(diskctx, 0, "fq_dispatcher", hz/5); /* wait 200 ms */
148 	wakeup(diskctx->td_balance);
150 	tsleep(diskctx, 0, "fq_dispatcher", hz/10); /* wait 100 ms */
151 	wakeup(diskctx->td_balance);
/* Drop both references: the one from prepare and the allocation ref. */
154 	fq_disk_ctx_unref(diskctx); /* from prepare */
155 	fq_disk_ctx_unref(diskctx); /* from alloc */
157 	dsched_set_disk_priv(dp, NULL);
/* Must be called with locked diskctx */
/*
 * Flush or cancel every bio still queued in the tdios of @diskctx.
 * With mode == FQ_DRAIN_CANCEL each bio is cancelled and the per-bio tdio
 * reference (taken in fq_queue) is released; otherwise (presumably
 * FQ_DRAIN_FLUSH — the "else" line is missing from this view) the bio is
 * dispatched to the disk via fq_dispatch().
 */
163 fq_drain(struct fq_disk_ctx *diskctx, int mode)
165 	struct fq_thread_io *tdio, *tdio2;
166 	struct bio *bio, *bio2;
168 	TAILQ_FOREACH_MUTABLE(tdio, &diskctx->fq_tdio_list, dlink, tdio2) {
/* Skip tdios with nothing queued (the "continue" line is not visible here). */
169 		if (tdio->qlength == 0)
172 		FQ_THREAD_IO_LOCK(tdio);
173 		TAILQ_FOREACH_MUTABLE(bio, &tdio->queue, link, bio2) {
174 			TAILQ_REMOVE(&tdio->queue, bio, link);
176 			if (__predict_false(mode == FQ_DRAIN_CANCEL)) {
177 				/* FQ_DRAIN_CANCEL */
178 				dsched_cancel_bio(bio);
179 				atomic_add_int(&fq_stats.cancelled, 1);
181 				/* Release ref acquired on fq_queue */
182 				/* XXX: possible failure point */
183 				fq_thread_io_unref(tdio);
186 				fq_dispatch(diskctx, bio, tdio);
189 		FQ_THREAD_IO_UNLOCK(tdio);
/*
 * dsched flush hook — intentionally a no-op for the fq policy; flushes are
 * passed straight through by the framework.
 */
196 fq_flush(struct disk *dp, struct bio *bio)
198 	/* we don't do anything here */
/*
 * dsched cancel-all hook: cancel every bio not yet in flight by draining
 * all per-thread queues of @dp in FQ_DRAIN_CANCEL mode.
 */
203 fq_cancel(struct disk *dp)
205 	struct fq_disk_ctx	*diskctx;
207 	diskctx = dsched_get_disk_priv(dp);
208 	KKASSERT(diskctx != NULL);
/*
 * all bios not in flight are queued in their respective tdios.
 * good thing we have a list of tdios per disk diskctx.
 */
211 	 * all bios not in flight are queued in their respective tdios.
212 	 * good thing we have a list of tdios per disk diskctx.
214 	FQ_DISK_CTX_LOCK(diskctx);
215 	fq_drain(diskctx, FQ_DRAIN_CANCEL);
216 	FQ_DISK_CTX_UNLOCK(diskctx);
/*
 * Main bio admission path for the fq policy.
 *
 * Looks up the calling thread's per-disk io context (tdio), then either:
 *  - dispatches the bio immediately (plus any backlog queued by earlier
 *    rate-limited calls) when the thread is under its transaction budget
 *    (max_tp == 0 means unlimited), or
 *  - queues the bio on the tdio when the thread has exceeded its fair
 *    share; reads are queued at the head to prioritize them.
 *
 * Returns 0 on handled / nonzero to let dsched dispatch directly
 * (presumably — the return statements are among the lines missing from
 * this extraction, as are the "found" initialization, "break", "goto",
 * and "else" lines).
 */
221 fq_queue(struct disk *dp, struct bio *obio)
223 	struct bio *bio, *bio2;
224 	struct fq_thread_ctx	*tdctx;
225 	struct fq_thread_io	*tdio;
226 	struct fq_disk_ctx	*diskctx;
228 	int max_tp, transactions;
230 	/* We don't handle flushes, let dsched dispatch them */
231 	if (__predict_false(obio->bio_buf->b_cmd == BUF_CMD_FLUSH))
234 	/* get tdctx and tdio */
235 	tdctx = dsched_get_buf_priv(obio->bio_buf);
/*
 * Null tdctx can leak through from early boot (mi_startup/root mount);
 * such bios are handed back to dsched instead of asserting.
 */
238 	 * XXX: hack. we don't want the assert because some null-tdctxs are
239 	 * leaking through; just dispatch them. These come from the
240 	 * mi_startup() mess, which does the initial root mount.
243 	KKASSERT(tdctx != NULL);
246 		/* We don't handle this case, let dsched dispatch */
247 		atomic_add_int(&fq_stats.no_tdctx, 1);
/* Find this thread's tdio for disk @dp (takes a ref while searching). */
252 	FQ_THREAD_CTX_LOCK(tdctx);
254 	kprintf("fq_queue, tdctx = %p\n", tdctx);
256 	KKASSERT(!TAILQ_EMPTY(&tdctx->fq_tdio_list));
257 	TAILQ_FOREACH(tdio, &tdctx->fq_tdio_list, link) {
258 		if (tdio->dp == dp) {
259 			fq_thread_io_ref(tdio);
264 	FQ_THREAD_CTX_UNLOCK(tdctx);
/* The buf no longer needs its tdctx ref once the tdio is resolved. */
265 	dsched_clr_buf_priv(obio->bio_buf);
266 	fq_thread_ctx_unref(tdctx); /* acquired on new_buf */
268 	KKASSERT(found == 1);
269 	diskctx = dsched_get_disk_priv(dp);
/* Apply any pending rebalance request for this tdio. */
271 	if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
272 		fq_balance_self(tdio);
274 	max_tp = tdio->max_tp;
275 	transactions = tdio->issued;
277 	/* | No rate limiting || Hasn't reached limit rate | */
278 	if ((max_tp == 0) || (transactions < max_tp)) {
/*
 * Under budget: first drain any bios queued by earlier rate-limited
 * calls, re-checking the budget per bio, then dispatch obio itself.
 */
280 		 * Process pending bios from previous _queue() actions that
281 		 * have been rate-limited and hence queued in the tdio.
283 		KKASSERT(tdio->qlength >= 0);
285 		if (tdio->qlength > 0) {
286 			FQ_THREAD_IO_LOCK(tdio);
288 			TAILQ_FOREACH_MUTABLE(bio, &tdio->queue, link, bio2) {
289 				/* Rebalance ourselves if required */
290 				if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
291 					fq_balance_self(tdio);
292 				if ((tdio->max_tp > 0) && (tdio->issued >= tdio->max_tp))
294 				TAILQ_REMOVE(&tdio->queue, bio, link);
/* Queued bios already carry a tdio ref taken when they were queued. */
298 				 * beware that we do have an tdio reference from the
301 				fq_dispatch(diskctx, bio, tdio);
303 			FQ_THREAD_IO_UNLOCK(tdio);
306 		/* Nothing is pending from previous IO, so just pass it down */
307 		fq_thread_io_ref(tdio);
309 		fq_dispatch(diskctx, obio, tdio);
/*
 * Over budget: queue instead of dispatching; the ref taken here is
 * released on completion or cancellation.
 */
312 		 * This thread has exceeeded its fair share,
313 		 * the transactions are now rate limited. At
314 		 * this point, the rate would be exceeded, so
315 		 * we just queue requests instead of
318 		FQ_THREAD_IO_LOCK(tdio);
319 		fq_thread_io_ref(tdio);
/*
 * Prioritize reads by inserting them at the front of the queue.
 * XXX: may reorder a read ahead of a not-yet-written write.
 */
322 		 * Prioritize reads by inserting them at the front of the
325 		 * XXX: this might cause issues with data that should
326 		 * have been written and is being read, but hasn't
327 		 * actually been written yet.
329 		if (obio->bio_buf->b_cmd == BUF_CMD_READ)
330 			TAILQ_INSERT_HEAD(&tdio->queue, obio, link);
332 			TAILQ_INSERT_TAIL(&tdio->queue, obio, link);
335 		FQ_THREAD_IO_UNLOCK(tdio);
/* Drop the lookup ref taken in the TAILQ_FOREACH above. */
338 	fq_thread_io_unref(tdio);
/*
 * Completion callback installed by fq_dispatch() via dsched_strategy_async.
 *
 * Updates the per-thread moving-average latency, decrements the disk's
 * in-flight counter, marks the disk idle (waking the dispatcher) when this
 * was the last in-flight request, bumps completion stats, and releases the
 * tdio reference taken at dispatch time.
 *
 * NOTE(review): declarations of `dp`, `tv`, `delta`, the getmicrotime()
 * call, the biodone/pop-bio handling, and the `else` line before the
 * 10 ms default are among the lines missing from this extraction.
 */
344 fq_completed(struct bio *bp)
348 	struct fq_thread_io	*tdio;
349 	struct fq_disk_ctx	*diskctx;
351 	int transactions, latency;
357 	dp = dsched_get_bio_dp(bp);
358 	diskctx = dsched_get_disk_priv(dp);
359 	tdio = dsched_get_bio_priv(bp);
360 	KKASSERT(tdio != NULL);
361 	KKASSERT(diskctx != NULL);
/* Hold the diskctx while we touch it; one fewer request in flight. */
363 	fq_disk_ctx_ref(diskctx);
364 	atomic_subtract_int(&diskctx->incomplete_tp, 1);
366 	if (!(bp->bio_buf->b_flags & B_ERROR)) {
/*
 * Elapsed microseconds since dispatch (start time stashed in
 * bio_caller_info3.tv by the dispatch path).
 */
368 		 * Get the start ticks from when the bio was dispatched and calculate
369 		 * how long it took until completion.
371 		delta = (int)(1000000*((tv.tv_sec - bp->bio_caller_info3.tv.tv_sec)) +
372 		    (tv.tv_usec - bp->bio_caller_info3.tv.tv_usec));
/* Errored bios get a fixed assumed latency instead. */
374 		delta = 10000; /* default assume 10 ms */
376 	/* This is the last in-flight request and the disk is not idle yet */
377 	if ((diskctx->incomplete_tp <= 1) && (!diskctx->idle)) {
378 		diskctx->idle = 1;	/* Mark disk as idle */
379 		diskctx->start_idle = tv;	/* Save start idle time */
380 		wakeup(diskctx);	/* Wake up fq_dispatcher */
/* Incremental moving average: avg_n = ((n-1)*avg_{n-1} + x) / n. */
382 	transactions = atomic_fetchadd_int(&tdio->transactions, 1);
383 	latency = tdio->avg_latency;
386 		/* Moving averager, ((n-1)*avg_{n-1} + x) / n */
387 		latency = (int)(((int64_t)(transactions) *
388 		    (int64_t)latency + (int64_t)delta) / ((int64_t)transactions + 1));
389 		KKASSERT(latency > 0);
394 	tdio->avg_latency = latency;
396 	atomic_add_int(&fq_stats.transactions_completed, 1);
399 	fq_disk_ctx_unref(diskctx);
400 	/* decrease the ref count that was bumped for us on dispatch */
401 	fq_thread_io_unref(tdio);
/*
 * Hand @bio to the disk: account accumulated idle time (when the disk was
 * idle), issue the bio asynchronously with fq_completed() as the done
 * callback and @tdio as its private data, and bump the issued/in-flight/
 * global transaction counters.
 *
 * NOTE(review): the getmicrotime() call, the idle check/reset, and the
 * stashing of the dispatch timestamp into bio_caller_info3 (read back by
 * fq_completed) are among the lines missing from this extraction; the
 * function may also continue past the last visible line.
 */
408 fq_dispatch(struct fq_disk_ctx *diskctx, struct bio *bio,
409     struct fq_thread_io *tdio)
/* Fold the just-ended idle period (microseconds) into idle_time. */
415 		atomic_add_int(&diskctx->idle_time,
416 		    (int)(1000000*((tv.tv_sec - diskctx->start_idle.tv_sec)) +
417 		    (tv.tv_usec - diskctx->start_idle.tv_usec)));
420 	dsched_strategy_async(diskctx->dp, bio, fq_completed, tdio);
422 	atomic_add_int(&tdio->issued, 1);
423 	atomic_add_int(&diskctx->incomplete_tp, 1);
424 	atomic_add_int(&fq_stats.transactions, 1);