/*
 * Copyright (c) 2009, 2010 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Alex Hornung <ahornung@gmail.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/sysctl.h>
#include <sys/buf.h>
#include <sys/diskslice.h>
#include <sys/disk.h>
#include <machine/atomic.h>
#include <sys/malloc.h>
#include <sys/thread.h>
#include <sys/thread2.h>
#include <sys/spinlock2.h>
#include <machine/md_var.h>
#include <sys/ctype.h>
#include <sys/syslog.h>
#include <sys/device.h>
#include <sys/msgport.h>
#include <sys/msgport2.h>
#include <sys/buf2.h>
#include <sys/dsched.h>
#include <machine/varargs.h>
#include <machine/param.h>

#include <dsched/fq/dsched_fq.h>

MALLOC_DEFINE(M_DSCHEDFQ, "dschedfq", "fq dsched allocs");

static dsched_prepare_t		fq_prepare;
static dsched_teardown_t	fq_teardown;
static dsched_flush_t		fq_flush;
static dsched_cancel_t		fq_cancel;
static dsched_queue_t		fq_queue;

/* These are in _procops */
dsched_new_buf_t	fq_new_buf;
dsched_new_proc_t	fq_new_proc;
dsched_new_thread_t	fq_new_thread;
dsched_exit_buf_t	fq_exit_buf;
dsched_exit_proc_t	fq_exit_proc;
dsched_exit_thread_t	fq_exit_thread;

extern struct dsched_fq_stats	fq_stats;
extern struct spinlock		fq_fqmp_lock;
extern TAILQ_HEAD(, dsched_fq_mpriv)	dsched_fqmp_list;
extern struct callout		fq_callout;

struct dsched_ops dsched_fq_ops = {
	.head = {
		.name = "fq"
	},

	.prepare = fq_prepare,
	.teardown = fq_teardown,
	.flush = fq_flush,
	.cancel_all = fq_cancel,
	.bio_queue = fq_queue,

	.new_buf = fq_new_buf,
	.new_proc = fq_new_proc,
	.new_thread = fq_new_thread,
	.exit_buf = fq_exit_buf,
	.exit_proc = fq_exit_proc,
	.exit_thread = fq_exit_thread,
};
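
/*
 * fq_prepare() is called when the fq policy is attached to a disk.  It
 * sets up the per-disk private data (dpriv), attaches a per-disk fqp to
 * every known per-process structure (fqmp), creates the fq_dispatcher
 * kernel thread for this disk and kicks off the periodic balancing via
 * fq_balance_thread().
 */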
static int
fq_prepare(struct disk *dp)
{
	struct dsched_fq_dpriv	*dpriv;
	struct dsched_fq_mpriv	*fqmp;
	struct dsched_fq_priv	*fqp;
	struct thread *td_core;

	dpriv = fq_alloc_dpriv(dp);
	fq_reference_dpriv(dpriv);
	dsched_set_disk_priv(dp, dpriv);

	FQ_GLOBAL_FQMP_LOCK();
	TAILQ_FOREACH(fqmp, &dsched_fqmp_list, link) {
		fqp = fq_alloc_priv(dp);
		fq_reference_priv(fqp);

		FQ_FQMP_LOCK(fqmp);
		TAILQ_INSERT_TAIL(&fqmp->fq_priv_list, fqp, link);
		FQ_FQMP_UNLOCK(fqmp);
	}
	FQ_GLOBAL_FQMP_UNLOCK();

	lwkt_create((void (*)(void *))fq_dispatcher, dpriv, &td_core, NULL,
	    0, 0, "fq_dispatcher_%s", dp->d_cdev->si_name);
	fq_balance_thread(dpriv);

	return 0;
}
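
/*
 * fq_teardown() detaches the fq policy from a disk.  It stops the
 * balancing callout, waits briefly for the dispatcher thread to wind
 * down and drops the dpriv references taken in fq_prepare() and on
 * allocation.
 */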
static void
fq_teardown(struct disk *dp)
{
	struct dsched_fq_dpriv *dpriv;

	dpriv = dsched_get_disk_priv(dp);
	KKASSERT(dpriv != NULL);

	/* Basically kill the dispatcher thread */
	callout_stop(&fq_callout);
	tsleep(dpriv, 0, "fq_dispatcher", hz/5); /* wait 200 ms */
	callout_stop(&fq_callout);
	tsleep(dpriv, 0, "fq_dispatcher", hz/5); /* wait 200 ms */
	callout_stop(&fq_callout);
	/* XXX: we really need callout_drain, this REALLY sucks */

	fq_dereference_dpriv(dpriv); /* from prepare */
	fq_dereference_dpriv(dpriv); /* from alloc */

	dsched_set_disk_priv(dp, NULL);
	/* XXX: get rid of dpriv, cancel all queued requests...
	 *	but how do we get rid of all loose fqps?
	 *	--> possibly same solution as devfs; tracking a list of
	 *	    orphans.
	 * XXX XXX: this XXX is probably irrelevant by now :)
	 */
}
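
/*
 * fq_drain() empties all per-process queues (fqps) attached to the given
 * dpriv.  Queued bios are either cancelled (FQ_DRAIN_CANCEL) or pushed
 * down to the disk via fq_dispatch().
 */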
/* Must be called with locked dpriv */
void
fq_drain(struct dsched_fq_dpriv *dpriv, int mode)
{
	struct dsched_fq_priv *fqp, *fqp2;
	struct bio *bio, *bio2;

	TAILQ_FOREACH_MUTABLE(fqp, &dpriv->fq_priv_list, dlink, fqp2) {
		if (fqp->qlength == 0)
			continue;

		TAILQ_FOREACH_MUTABLE(bio, &fqp->queue, link, bio2) {
			TAILQ_REMOVE(&fqp->queue, bio, link);
			--fqp->qlength;

			if (__predict_false(mode == FQ_DRAIN_CANCEL)) {
				/* FQ_DRAIN_CANCEL */
				dsched_cancel_bio(bio);
				atomic_add_int(&fq_stats.cancelled, 1);

				/* Release ref acquired on fq_queue */
				/* XXX: possible failure point */
				fq_dereference_priv(fqp);
			} else {
				/* Otherwise push the bio down to the disk */
				fq_dispatch(dpriv, bio, fqp);
			}
		}
	}
}
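
/*
 * fq_flush() is the dsched flush hook; fq does no special handling for
 * flushes.
 */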
static void
fq_flush(struct disk *dp, struct bio *bio)
{
	/* we don't do anything here */
}
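
/*
 * fq_cancel() cancels all bios that are still queued in the per-process
 * queues of this disk; bios already in flight are not touched.
 */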
static void
fq_cancel(struct disk *dp)
{
	struct dsched_fq_dpriv	*dpriv;

	dpriv = dsched_get_disk_priv(dp);
	KKASSERT(dpriv != NULL);

	/*
	 * all bios not in flight are queued in their respective fqps.
	 * good thing we have a list of fqps per disk dpriv.
	 */
	FQ_DPRIV_LOCK(dpriv);
	fq_drain(dpriv, FQ_DRAIN_CANCEL);
	FQ_DPRIV_UNLOCK(dpriv);
}
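
/*
 * fq_queue() is the entry point for incoming bios.  It locates the fqp
 * belonging to the issuing process for this disk and either dispatches
 * the bio immediately or, if the process has exceeded its fair share
 * (max_tp), queues it in the fqp for later dispatch.
 */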
static int
fq_queue(struct disk *dp, struct bio *obio)
{
	struct bio *bio, *bio2;
	struct dsched_fq_mpriv	*fqmp;
	struct dsched_fq_priv	*fqp;
	struct dsched_fq_dpriv	*dpriv;
	int found = 0;
	int max_tp, transactions;

	/* We don't handle flushes, let dsched dispatch them */
	if (__predict_false(obio->bio_buf->b_cmd == BUF_CMD_FLUSH))
		return (EINVAL);

	/* get fqmp and fqp */
	fqmp = dsched_get_buf_priv(obio->bio_buf);

	/*
	 * XXX: hack. we don't want the assert because some null-fqmps are
	 * leaking through; just dispatch them. These come from the
	 * mi_startup() mess, which does the initial root mount.
	 */
#if 0
	KKASSERT(fqmp != NULL);
#endif
	if (fqmp == NULL) {
		/* We don't handle this case, let dsched dispatch */
		atomic_add_int(&fq_stats.no_fqmp, 1);
		return (EINVAL);
	}

	FQ_FQMP_LOCK(fqmp);
#if 0
	kprintf("fq_queue, fqmp = %p\n", fqmp);
#endif
	KKASSERT(!TAILQ_EMPTY(&fqmp->fq_priv_list));
	TAILQ_FOREACH(fqp, &fqmp->fq_priv_list, link) {
		if (fqp->dp == dp) {
			fq_reference_priv(fqp);
			found = 1;
			break;
		}
	}

	FQ_FQMP_UNLOCK(fqmp);
	dsched_clr_buf_priv(obio->bio_buf);
	fq_dereference_mpriv(fqmp); /* acquired on new_buf */
	atomic_subtract_int(&fq_stats.nbufs, 1);

	KKASSERT(found == 1);
	dpriv = dsched_get_disk_priv(dp);

	/* XXX: probably rather pointless doing this atomically */
	max_tp = atomic_fetchadd_int(&fqp->max_tp, 0);
	transactions = atomic_fetchadd_int(&fqp->issued, 0);

	/* | No rate limiting || Hasn't reached limit rate | */
	if ((max_tp == 0) || (transactions < max_tp)) {
		/*
		 * Process pending bios from previous _queue() actions that
		 * have been rate-limited and hence queued in the fqp.
		 */
		KKASSERT(fqp->qlength >= 0);

		if (fqp->qlength > 0) {
			FQ_FQP_LOCK(fqp);
			TAILQ_FOREACH_MUTABLE(bio, &fqp->queue, link, bio2) {
				if ((fqp->max_tp > 0) &&
				    (fqp->issued >= fqp->max_tp))
					break;

				TAILQ_REMOVE(&fqp->queue, bio, link);
				--fqp->qlength;

				/*
				 * beware that we do have an fqp reference
				 * from the time the bio was queued.
				 */
				fq_dispatch(dpriv, bio, fqp);
			}
			FQ_FQP_UNLOCK(fqp);
		}

		/* Nothing is pending from previous IO, so just pass it down */
		fq_reference_priv(fqp);

		fq_dispatch(dpriv, obio, fqp);
	} else {
		/*
		 * This thread has exceeded its fair share,
		 * the transactions are now rate limited. At
		 * this point, the rate would be exceeded, so
		 * we just queue requests instead of
		 * dispatching them.
		 */
		FQ_FQP_LOCK(fqp);
		fq_reference_priv(fqp);

		/*
		 * Prioritize reads by inserting them at the front of the
		 * queue.
		 *
		 * XXX: this might cause issues with data that should
		 * have been written and is being read, but hasn't
		 * actually been written yet.
		 */
		if (obio->bio_buf->b_cmd == BUF_CMD_READ)
			TAILQ_INSERT_HEAD(&fqp->queue, obio, link);
		else
			TAILQ_INSERT_TAIL(&fqp->queue, obio, link);

		++fqp->qlength;
		FQ_FQP_UNLOCK(fqp);
	}

	fq_dereference_priv(fqp);
	return 0;
}
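
/*
 * fq_completed() is the biodone callback for bios sent down via
 * fq_dispatch().  It updates the fqp's average latency, marks the disk
 * idle once the last in-flight request finishes and drops the fqp
 * reference that was taken at dispatch time.
 */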
void
fq_completed(struct bio *bp)
{
	struct bio *obio;
	struct dsched_fq_priv	*fqp;
	struct dsched_fq_dpriv	*dpriv;
	struct disk	*dp;
	struct timeval	tv;
	int transactions, latency;
	int delta;

	getmicrotime(&tv);

	dp = dsched_get_bio_dp(bp);
	dpriv = dsched_get_disk_priv(dp);
	fqp = dsched_get_bio_priv(bp);
	KKASSERT(fqp != NULL);
	KKASSERT(dpriv != NULL);

	fq_reference_dpriv(dpriv);

	if (!(bp->bio_buf->b_flags & B_ERROR)) {
		/*
		 * Get the start ticks from when the bio was dispatched and
		 * calculate how long it took until completion.
		 */
		delta = (int)(1000000*((tv.tv_sec - bp->bio_caller_info3.tv.tv_sec)) +
		    (tv.tv_usec - bp->bio_caller_info3.tv.tv_usec));
		if (delta <= 0)
			delta = 10000; /* default assume 10 ms */

		/* This is the last in-flight request and the disk is not idle yet */
		if ((dpriv->incomplete_tp <= 1) && (!dpriv->idle)) {
			dpriv->idle = 1;	/* Mark disk as idle */
			dpriv->start_idle = tv;	/* Save start idle time */
			wakeup(dpriv);		/* Wake up fq_dispatcher */
		}
		atomic_subtract_int(&dpriv->incomplete_tp, 1);
		transactions = atomic_fetchadd_int(&fqp->transactions, 1);
		latency = atomic_fetchadd_int(&fqp->avg_latency, 0);

		/* Moving averager, ((n-1)*avg_{n-1} + x) / n */
		latency = ((transactions) *
		    latency + delta) / (transactions + 1);

		fqp->avg_latency = latency;
		atomic_add_int(&fq_stats.transactions_completed, 1);
	}

	fq_dereference_dpriv(dpriv);
	/* decrease the ref count that was bumped for us on dispatch */
	fq_dereference_priv(fqp);

	obio = pop_bio(bp);
	biodone(obio);
}
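
/*
 * fq_dispatch() pushes a bio to the disk through dsched_strategy_async()
 * with fq_completed() as completion callback, ends any idle period of
 * the disk and updates the per-fqp and global transaction counters.
 */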
void
fq_dispatch(struct dsched_fq_dpriv *dpriv, struct bio *bio,
    struct dsched_fq_priv *fqp)
{
	struct timeval tv;

	if (dpriv->idle) {
		getmicrotime(&tv);
		atomic_add_int(&dpriv->idle_time,
		    (int)(1000000*((tv.tv_sec - dpriv->start_idle.tv_sec)) +
		    (tv.tv_usec - dpriv->start_idle.tv_usec)));
		dpriv->idle = 0;
	}
	dsched_strategy_async(dpriv->dp, bio, fq_completed, fqp);

	atomic_add_int(&fqp->issued, 1);
	atomic_add_int(&dpriv->incomplete_tp, 1);
	atomic_add_int(&fq_stats.transactions, 1);
}