dsched_fq - Refactor and clean; handle flushes
sys/dsched/fq/dsched_fq_diskops.c
/*
 * Copyright (c) 2009, 2010 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Alex Hornung <ahornung@gmail.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *
 * 1. Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in
 *    the documentation and/or other materials provided with the
 *    distribution.
 * 3. Neither the name of The DragonFly Project nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific, prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
 * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
 * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
 * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
 * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
 * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
 * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
 * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
 * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
 * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 * SUCH DAMAGE.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/sysctl.h>
#include <sys/buf.h>
#include <sys/conf.h>
#include <sys/diskslice.h>
#include <sys/disk.h>
#include <machine/atomic.h>
#include <sys/malloc.h>
#include <sys/thread.h>
#include <sys/thread2.h>
#include <sys/spinlock2.h>
#include <machine/md_var.h>
#include <sys/ctype.h>
#include <sys/syslog.h>
#include <sys/device.h>
#include <sys/msgport.h>
#include <sys/msgport2.h>
#include <sys/buf2.h>
#include <sys/dsched.h>
#include <machine/varargs.h>
#include <machine/param.h>

#include <dsched/fq/dsched_fq.h>

MALLOC_DEFINE(M_DSCHEDFQ, "dschedfq", "fq dsched allocs");

static dsched_prepare_t         fq_prepare;
static dsched_teardown_t        fq_teardown;
static dsched_flush_t           fq_flush;
static dsched_cancel_t          fq_cancel;
static dsched_queue_t           fq_queue;

/* These are in _procops */
dsched_new_buf_t        fq_new_buf;
dsched_new_proc_t       fq_new_proc;
dsched_new_thread_t     fq_new_thread;
dsched_exit_buf_t       fq_exit_buf;
dsched_exit_proc_t      fq_exit_proc;
dsched_exit_thread_t    fq_exit_thread;

extern struct dsched_fq_stats   fq_stats;
extern struct spinlock  fq_fqmp_lock;
extern TAILQ_HEAD(, dsched_fq_mpriv)    dsched_fqmp_list;
extern struct callout   fq_callout;

struct dsched_ops dsched_fq_ops = {
        .head = {
                .name = "fq"
        },
        .prepare = fq_prepare,
        .teardown = fq_teardown,
        .flush = fq_flush,
        .cancel_all = fq_cancel,
        .bio_queue = fq_queue,

        .new_buf = fq_new_buf,
        .new_proc = fq_new_proc,
        .new_thread = fq_new_thread,
        .exit_buf = fq_exit_buf,
        .exit_proc = fq_exit_proc,
        .exit_thread = fq_exit_thread,
};

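/*
 * Called when the fq policy is attached to a disk: allocates the per-disk
 * dpriv, attaches a per-process fqp for this disk to every process that
 * fq already tracks (the fqmp list), spawns the per-disk fq_dispatcher
 * thread and kicks off the balancing logic (fq_balance_thread).
 */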
static int
fq_prepare(struct disk *dp)
{
        struct dsched_fq_dpriv  *dpriv;
        struct dsched_fq_mpriv  *fqmp;
        struct dsched_fq_priv   *fqp;
        struct thread *td_core;

        dpriv = fq_alloc_dpriv(dp);
        fq_reference_dpriv(dpriv);
        dsched_set_disk_priv(dp, dpriv);

        FQ_GLOBAL_FQMP_LOCK();
        TAILQ_FOREACH(fqmp, &dsched_fqmp_list, link) {
                fqp = fq_alloc_priv(dp);

                FQ_FQMP_LOCK(fqmp);
#if 0
                fq_reference_priv(fqp);
#endif
                TAILQ_INSERT_TAIL(&fqmp->fq_priv_list, fqp, link);
                FQ_FQMP_UNLOCK(fqmp);
        }
        FQ_GLOBAL_FQMP_UNLOCK();

        lwkt_create((void (*)(void *))fq_dispatcher, dpriv, &td_core, NULL,
            0, 0, "fq_dispatcher_%s", dp->d_cdev->si_name);
        fq_balance_thread(dpriv);

        return 0;
}
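/*
 * Called when the fq policy is detached from a disk: stops the balancing
 * callout, tells the dispatcher thread to exit and drops the dpriv
 * references taken in fq_prepare and at allocation.
 */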
static void
fq_teardown(struct disk *dp)
{
        struct dsched_fq_dpriv *dpriv;

        dpriv = dsched_get_disk_priv(dp);
        KKASSERT(dpriv != NULL);

        /* Basically kill the dispatcher thread */
        callout_stop(&fq_callout);
        dpriv->die = 1;
        wakeup(dpriv);
        tsleep(dpriv, 0, "fq_dispatcher", hz/5); /* wait 200 ms */
        callout_stop(&fq_callout);
        wakeup(dpriv);
        tsleep(dpriv, 0, "fq_dispatcher", hz/5); /* wait 200 ms */
        callout_stop(&fq_callout);
        /* XXX: we really need callout_drain, this REALLY sucks */

        fq_dereference_dpriv(dpriv); /* from prepare */
        fq_dereference_dpriv(dpriv); /* from alloc */

        dsched_set_disk_priv(dp, NULL);
        /*
         * XXX: get rid of dpriv, cancel all queued requests...
         *      but how do we get rid of all loose fqps?
         *    --> possibly same solution as devfs; tracking a list of
         *        orphans.
         * XXX XXX: this XXX is probably irrelevant by now :)
         */
}

/*
 * Drains all bios queued on the fqps of the given disk. Depending on the
 * mode, the bios are either cancelled (FQ_DRAIN_CANCEL) or dispatched to
 * the driver (FQ_DRAIN_FLUSH).
 *
 * Must be called with the dpriv lock held.
 */
void
fq_drain(struct dsched_fq_dpriv *dpriv, int mode)
{
        struct dsched_fq_priv *fqp, *fqp2;
        struct bio *bio, *bio2;

        TAILQ_FOREACH_MUTABLE(fqp, &dpriv->fq_priv_list, dlink, fqp2) {
                if (fqp->qlength == 0)
                        continue;

                FQ_FQP_LOCK(fqp);
                TAILQ_FOREACH_MUTABLE(bio, &fqp->queue, link, bio2) {
                        TAILQ_REMOVE(&fqp->queue, bio, link);
                        --fqp->qlength;
                        if (__predict_false(mode == FQ_DRAIN_CANCEL)) {
                                /* FQ_DRAIN_CANCEL */
                                dsched_cancel_bio(bio);
                                atomic_add_int(&fq_stats.cancelled, 1);

                                /* Release ref acquired on fq_queue */
                                /* XXX: possible failure point */
                                fq_dereference_priv(fqp);
                        } else {
                                /* FQ_DRAIN_FLUSH */
                                fq_dispatch(dpriv, bio, fqp);
                        }
                }
                FQ_FQP_UNLOCK(fqp);
        }
}
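/*
 * Flush hook. fq does not treat flushes specially at this point: flush
 * bios never reach this policy's queue, since fq_queue() returns EINVAL
 * for BUF_CMD_FLUSH and lets dsched dispatch them directly.
 */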
static void
fq_flush(struct disk *dp, struct bio *bio)
{
        /* we don't do anything here */
}
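/*
 * cancel_all hook: cancel every bio still queued in this disk's fqps.
 */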
static void
fq_cancel(struct disk *dp)
{
        struct dsched_fq_dpriv  *dpriv;

        dpriv = dsched_get_disk_priv(dp);
        KKASSERT(dpriv != NULL);

        /*
         * All bios not in flight are queued in their respective fqps;
         * conveniently, the dpriv keeps a list of all fqps for this disk.
         */
        FQ_DPRIV_LOCK(dpriv);
        fq_drain(dpriv, FQ_DRAIN_CANCEL);
        FQ_DPRIV_UNLOCK(dpriv);
}
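/*
 * bio_queue hook, called for every incoming bio. Looks up the fqp that
 * pairs the issuing process with this disk. If the process is below its
 * transaction rate limit, any backlog queued on the fqp is dispatched
 * first and the new bio is passed down; otherwise the bio is queued on
 * the fqp (reads at the head, writes at the tail) for later dispatch.
 */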
static int
fq_queue(struct disk *dp, struct bio *obio)
{
        struct bio *bio, *bio2;
        struct dsched_fq_mpriv  *fqmp;
        struct dsched_fq_priv   *fqp;
        struct dsched_fq_dpriv  *dpriv;
        int found = 0;
        int count;
        int max_tp, transactions;

        /* We don't handle flushes, let dsched dispatch them */
        if (__predict_false(obio->bio_buf->b_cmd == BUF_CMD_FLUSH))
                return (EINVAL);

        /* get fqmp and fqp */
        fqmp = dsched_get_buf_priv(obio->bio_buf);

        /*
         * XXX: hack. We don't want the assert because some null-fqmps are
         * leaking through; just dispatch them. These come from the
         * mi_startup() mess, which does the initial root mount.
         */
#if 0
        KKASSERT(fqmp != NULL);
#endif
        if (fqmp == NULL) {
                /* We don't handle this case, let dsched dispatch */
                atomic_add_int(&fq_stats.no_fqmp, 1);
                return (EINVAL);
        }

        FQ_FQMP_LOCK(fqmp);
#if 0
        kprintf("fq_queue, fqmp = %p\n", fqmp);
#endif
        KKASSERT(!TAILQ_EMPTY(&fqmp->fq_priv_list));
        TAILQ_FOREACH(fqp, &fqmp->fq_priv_list, link) {
                if (fqp->dp == dp) {
                        fq_reference_priv(fqp);
                        found = 1;
                        break;
                }
        }
        FQ_FQMP_UNLOCK(fqmp);
        dsched_clr_buf_priv(obio->bio_buf);
        fq_dereference_mpriv(fqmp); /* acquired on new_buf */
        atomic_subtract_int(&fq_stats.nbufs, 1);

        KKASSERT(found == 1);
        dpriv = dsched_get_disk_priv(dp);

        /* XXX: probably rather pointless doing this atomically */
        max_tp = atomic_fetchadd_int(&fqp->max_tp, 0);
        transactions = atomic_fetchadd_int(&fqp->issued, 0);

        /* No rate limiting in effect, or the rate limit hasn't been reached */
        if ((max_tp == 0) || (transactions < max_tp)) {
                /*
                 * Process pending bios from previous _queue() actions that
                 * have been rate-limited and hence queued in the fqp.
                 */
                KKASSERT(fqp->qlength >= 0);

                if (fqp->qlength > 0) {
                        FQ_FQP_LOCK(fqp);
                        count = 0;

                        TAILQ_FOREACH_MUTABLE(bio, &fqp->queue, link, bio2) {
                                if ((fqp->max_tp > 0) && (fqp->issued >= fqp->max_tp))
                                        break;
                                TAILQ_REMOVE(&fqp->queue, bio, link);
                                --fqp->qlength;

                                /*
                                 * Note: the bio still holds the fqp
                                 * reference acquired when it was queued.
                                 */
                                fq_dispatch(dpriv, bio, fqp);
                        }
                        FQ_FQP_UNLOCK(fqp);
                }

                /* Now pass down the new bio itself */
                fq_reference_priv(fqp);
                fq_dispatch(dpriv, obio, fqp);
        } else {
                /*
                 * This thread has exceeded its fair share; its
                 * transactions are now rate limited. At this point,
                 * dispatching would exceed the allowed rate, so we
                 * queue requests instead of dispatching them.
                 */
                FQ_FQP_LOCK(fqp);
                fq_reference_priv(fqp);

                /*
                 * Prioritize reads by inserting them at the front of the
                 * queue.
                 *
                 * XXX: this might cause issues with data that should
                 *      have been written and is being read, but hasn't
                 *      actually been written yet.
                 */
                if (obio->bio_buf->b_cmd == BUF_CMD_READ)
                        TAILQ_INSERT_HEAD(&fqp->queue, obio, link);
                else
                        TAILQ_INSERT_TAIL(&fqp->queue, obio, link);

                ++fqp->qlength;
                FQ_FQP_UNLOCK(fqp);
        }

        fq_dereference_priv(fqp);
        return 0;
}
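/*
 * Completion callback installed by fq_dispatch() via
 * dsched_strategy_async(). Updates the per-fqp latency average and the
 * disk idle state, drops the references taken at dispatch time, and
 * completes the original bio.
 */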
void
fq_completed(struct bio *bp)
{
        struct bio *obio;
        int     delta;
        struct dsched_fq_priv   *fqp;
        struct dsched_fq_dpriv  *dpriv;
        struct disk     *dp;
        int transactions, latency;

        struct timeval tv;

        getmicrotime(&tv);

        dp = dsched_get_bio_dp(bp);
        dpriv = dsched_get_disk_priv(dp);
        fqp = dsched_get_bio_priv(bp);
        KKASSERT(fqp != NULL);
        KKASSERT(dpriv != NULL);

        fq_reference_dpriv(dpriv);

        if (!(bp->bio_buf->b_flags & B_ERROR)) {
                /*
                 * Get the start time recorded when the bio was dispatched
                 * and calculate how long it took until completion.
                 */
                delta = (int)(1000000 * (tv.tv_sec - bp->bio_caller_info3.tv.tv_sec) +
                    (tv.tv_usec - bp->bio_caller_info3.tv.tv_usec));
                if (delta <= 0)
                        delta = 10000; /* default assume 10 ms */

                /* This is the last in-flight request and the disk is not idle yet */
                if ((dpriv->incomplete_tp <= 1) && (!dpriv->idle)) {
                        dpriv->idle = 1;        /* Mark disk as idle */
                        dpriv->start_idle = tv; /* Save start idle time */
                        wakeup(dpriv);          /* Wake up fq_dispatcher */
                }
                atomic_subtract_int(&dpriv->incomplete_tp, 1);
                transactions = atomic_fetchadd_int(&fqp->transactions, 1);
                latency = atomic_fetchadd_int(&fqp->avg_latency, 0);

                if (latency != 0) {
                        /*
                         * Cumulative moving average:
                         * avg = (n*avg + x) / (n + 1), n = prior count
                         */
                        latency = (transactions * latency + delta) /
                            (transactions + 1);
                } else {
                        latency = delta;
                }

                fqp->avg_latency = latency;
                atomic_add_int(&fq_stats.transactions_completed, 1);
        }

        fq_dereference_dpriv(dpriv);
        /* decrease the ref count that was bumped for us on dispatch */
        fq_dereference_priv(fqp);

        obio = pop_bio(bp);
        biodone(obio);
}
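/*
 * Passes a bio down to the disk driver. Credits any accumulated idle
 * time to the disk, registers fq_completed() as the completion callback
 * and bumps the issued/in-flight counters.
 */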
void
fq_dispatch(struct dsched_fq_dpriv *dpriv, struct bio *bio,
    struct dsched_fq_priv *fqp)
{
        struct timeval tv;

        if (dpriv->idle) {
                getmicrotime(&tv);
                atomic_add_int(&dpriv->idle_time,
                    (int)(1000000 * (tv.tv_sec - dpriv->start_idle.tv_sec) +
                    (tv.tv_usec - dpriv->start_idle.tv_usec)));
                dpriv->idle = 0;
        }
        dsched_strategy_async(dpriv->dp, bio, fq_completed, fqp);

        atomic_add_int(&fqp->issued, 1);
        atomic_add_int(&dpriv->incomplete_tp, 1);
        atomic_add_int(&fq_stats.transactions, 1);
}