/*
 * Copyright (c) 2009, 2010 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Alex Hornung <ahornung@gmail.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/sysctl.h>
#include <sys/diskslice.h>
#include <sys/thread.h>
#include <sys/thread2.h>
#include <sys/ctype.h>
#include <sys/syslog.h>
#include <sys/dsched.h>

#include <machine/atomic.h>
#include <machine/param.h>

#include <kern/dsched/fq/fq.h>
54 static dsched_prepare_t fq_prepare;
55 static dsched_teardown_t fq_teardown;
56 static dsched_cancel_t fq_cancel;
57 static dsched_queue_t fq_queue;
59 extern struct dsched_fq_stats fq_stats;
61 struct dsched_policy dsched_fq_policy = {
64 .prepare = fq_prepare,
65 .teardown = fq_teardown,
66 .cancel_all = fq_cancel,
71 fq_prepare(struct dsched_disk_ctx *ds_diskctx)
73 struct fq_disk_ctx *diskctx = (struct fq_disk_ctx *)ds_diskctx;
74 struct thread *td_core, *td_balance;
76 lwkt_create((void (*)(void *))fq_dispatcher, diskctx, &td_core,
77 NULL, 0, -1, "fq_dispatch_%s",
78 ds_diskctx->dp->d_cdev->si_name);
79 lwkt_create((void (*)(void *))fq_balance_thread, diskctx, &td_balance,
80 NULL, 0, -1, "fq_balance_%s",
81 ds_diskctx->dp->d_cdev->si_name);
82 diskctx->td_balance = td_balance;
90 fq_teardown(struct dsched_disk_ctx *ds_diskctx)
92 struct fq_disk_ctx *diskctx = (struct fq_disk_ctx *)ds_diskctx;
93 KKASSERT(diskctx != NULL);
95 /* Basically kill the dispatcher thread */
97 wakeup(diskctx->td_balance);
99 tsleep(diskctx, 0, "fq_dispatcher", hz/5); /* wait 200 ms */
100 wakeup(diskctx->td_balance);
102 tsleep(diskctx, 0, "fq_dispatcher", hz/10); /* wait 100 ms */
103 wakeup(diskctx->td_balance);
108 /* Must be called with locked diskctx */
110 fq_drain(struct fq_disk_ctx *diskctx, int mode)
112 struct dsched_thread_io *ds_tdio, *ds_tdio2;
113 struct fq_thread_io *tdio;
114 struct bio *bio, *bio2;
116 TAILQ_FOREACH_MUTABLE(ds_tdio, &diskctx->head.tdio_list, dlink, ds_tdio2) {
117 tdio = (struct fq_thread_io *)ds_tdio;
118 if (tdio->head.qlength == 0)
121 DSCHED_THREAD_IO_LOCK(&tdio->head);
122 TAILQ_FOREACH_MUTABLE(bio, &tdio->head.queue, link, bio2) {
123 TAILQ_REMOVE(&tdio->head.queue, bio, link);
124 --tdio->head.qlength;
125 if (__predict_false(mode == FQ_DRAIN_CANCEL)) {
126 /* FQ_DRAIN_CANCEL */
127 dsched_cancel_bio(bio);
128 atomic_add_int(&fq_stats.cancelled, 1);
130 /* Release ref acquired on fq_queue */
131 /* XXX: possible failure point */
132 dsched_thread_io_unref(&tdio->head);
135 fq_dispatch(diskctx, bio, tdio);
138 DSCHED_THREAD_IO_UNLOCK(&tdio->head);
144 fq_cancel(struct dsched_disk_ctx *ds_diskctx)
146 struct fq_disk_ctx *diskctx = (struct fq_disk_ctx *)ds_diskctx;
148 KKASSERT(diskctx != NULL);
151 * all bios not in flight are queued in their respective tdios.
152 * good thing we have a list of tdios per disk diskctx.
154 DSCHED_DISK_CTX_LOCK(&diskctx->head);
155 fq_drain(diskctx, FQ_DRAIN_CANCEL);
156 DSCHED_DISK_CTX_UNLOCK(&diskctx->head);
161 fq_queue(struct dsched_disk_ctx *ds_diskctx, struct dsched_thread_io *ds_tdio, struct bio *obio)
163 struct bio *b_dispatch_ary[FQ_DISPATCH_SML_ARRAY_SZ];
164 struct bio *bio, *bio2;
165 struct fq_thread_io *tdio;
166 struct fq_disk_ctx *diskctx;
167 int max_tp, transactions;
170 /* We don't handle flushes, let dsched dispatch them */
171 if (__predict_false(obio->bio_buf->b_cmd == BUF_CMD_FLUSH))
174 tdio = (struct fq_thread_io *)ds_tdio;
175 diskctx = (struct fq_disk_ctx *)ds_diskctx;
177 if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
178 fq_balance_self(tdio);
180 max_tp = tdio->max_tp;
181 transactions = tdio->issued;
183 /* | No rate limiting || Hasn't reached limit rate | */
184 if ((max_tp == 0) || (transactions < max_tp)) {
186 * Process pending bios from previous _queue() actions that
187 * have been rate-limited and hence queued in the tdio.
189 KKASSERT(tdio->head.qlength >= 0);
191 if (tdio->head.qlength > 0) {
194 DSCHED_THREAD_IO_LOCK(&tdio->head);
196 TAILQ_FOREACH_MUTABLE(bio, &tdio->head.queue, link, bio2) {
197 /* Rebalance ourselves if required */
198 if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
199 fq_balance_self(tdio);
200 if ((tdio->max_tp > 0) &&
201 (tdio->issued + i >= tdio->max_tp))
203 if (i == FQ_DISPATCH_SML_ARRAY_SZ)
206 TAILQ_REMOVE(&tdio->head.queue, bio, link);
207 --tdio->head.qlength;
210 * beware that we do have an tdio reference from the
213 * XXX: note that here we don't dispatch the BIOs yet
214 * but just prepare them for dispatch so that
215 * later they are pushed down to the driver
216 * without holding locks.
218 b_dispatch_ary[i++] = bio;
221 DSCHED_THREAD_IO_UNLOCK(&tdio->head);
224 * Now dispatch all the prepared BIOs without holding
225 * the thread_io lock.
227 for (--i; i >= 0; i--)
228 fq_dispatch(diskctx, b_dispatch_ary[i], tdio);
231 /* Nothing is pending from previous IO, so just pass it down */
232 dsched_thread_io_ref(&tdio->head);
234 fq_dispatch(diskctx, obio, tdio);
237 * This thread has exceeeded its fair share,
238 * the transactions are now rate limited. At
239 * this point, the rate would be exceeded, so
240 * we just queue requests instead of
243 DSCHED_THREAD_IO_LOCK(&tdio->head);
244 dsched_thread_io_ref(&tdio->head);
247 * Prioritize reads by inserting them at the front of the
250 * XXX: this might cause issues with data that should
251 * have been written and is being read, but hasn't
252 * actually been written yet.
254 if (obio->bio_buf->b_cmd == BUF_CMD_READ)
255 TAILQ_INSERT_HEAD(&tdio->head.queue, obio, link);
257 TAILQ_INSERT_TAIL(&tdio->head.queue, obio, link);
259 ++tdio->head.qlength;
260 DSCHED_THREAD_IO_UNLOCK(&tdio->head);
268 fq_completed(struct bio *bp)
272 struct fq_thread_io *tdio;
273 struct fq_disk_ctx *diskctx;
275 int transactions, latency;
281 dp = dsched_get_bio_dp(bp);
282 diskctx = dsched_get_disk_priv(dp);
283 tdio = dsched_get_bio_priv(bp);
284 KKASSERT(tdio != NULL);
285 KKASSERT(diskctx != NULL);
287 dsched_disk_ctx_ref(&diskctx->head);
288 atomic_subtract_int(&diskctx->incomplete_tp, 1);
290 if (!(bp->bio_buf->b_flags & B_ERROR)) {
292 * Get the start ticks from when the bio was dispatched and calculate
293 * how long it took until completion.
295 delta = (int)(1000000*((tv.tv_sec - bp->bio_caller_info3.tv.tv_sec)) +
296 (tv.tv_usec - bp->bio_caller_info3.tv.tv_usec));
298 delta = 10000; /* default assume 10 ms */
300 /* This is the last in-flight request and the disk is not idle yet */
301 if ((diskctx->incomplete_tp <= 1) && (!diskctx->idle)) {
302 diskctx->idle = 1; /* Mark disk as idle */
303 diskctx->start_idle = tv; /* Save start idle time */
304 wakeup(diskctx); /* Wake up fq_dispatcher */
306 transactions = atomic_fetchadd_int(&tdio->transactions, 1);
307 latency = tdio->avg_latency;
310 /* Moving averager, ((n-1)*avg_{n-1} + x) / n */
311 latency = (int)(((int64_t)(transactions) *
312 (int64_t)latency + (int64_t)delta) / ((int64_t)transactions + 1));
313 KKASSERT(latency > 0);
318 tdio->avg_latency = latency;
320 atomic_add_int(&fq_stats.transactions_completed, 1);
323 dsched_disk_ctx_unref(&diskctx->head);
324 /* decrease the ref count that was bumped for us on dispatch */
325 dsched_thread_io_unref(&tdio->head);
332 fq_dispatch(struct fq_disk_ctx *diskctx, struct bio *bio,
333 struct fq_thread_io *tdio)
339 atomic_add_int(&diskctx->idle_time,
340 (int)(1000000*((tv.tv_sec - diskctx->start_idle.tv_sec)) +
341 (tv.tv_usec - diskctx->start_idle.tv_usec)));
344 dsched_strategy_async(diskctx->head.dp, bio, fq_completed, tdio);
346 atomic_add_int(&tdio->issued, 1);
347 atomic_add_int(&diskctx->incomplete_tp, 1);
348 atomic_add_int(&fq_stats.transactions, 1);