84c49e20b9c87bc83e6b3151f1e058f0362f2d33
[dragonfly.git] / sys / dsched / fq / dsched_fq_diskops.c
1 /*
2  * Copyright (c) 2009, 2010 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Alex Hornung <ahornung@gmail.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 #include <sys/param.h>
35 #include <sys/systm.h>
36 #include <sys/kernel.h>
37 #include <sys/proc.h>
38 #include <sys/sysctl.h>
39 #include <sys/buf.h>
40 #include <sys/conf.h>
41 #include <sys/diskslice.h>
42 #include <sys/disk.h>
43 #include <machine/atomic.h>
44 #include <sys/malloc.h>
45 #include <sys/thread.h>
46 #include <sys/thread2.h>
47 #include <sys/sysctl.h>
48 #include <sys/spinlock2.h>
49 #include <machine/md_var.h>
50 #include <sys/ctype.h>
51 #include <sys/syslog.h>
52 #include <sys/device.h>
53 #include <sys/msgport.h>
54 #include <sys/msgport2.h>
55 #include <sys/buf2.h>
56 #include <sys/dsched.h>
57 #include <machine/varargs.h>
58 #include <machine/param.h>
59
60 #include <dsched/fq/dsched_fq.h>
61
62
63 MALLOC_DEFINE(M_DSCHEDFQ, "dschedfq", "fq dsched allocs");
64
65 static dsched_prepare_t         fq_prepare;
66 static dsched_teardown_t        fq_teardown;
67 static dsched_flush_t           fq_flush;
68 static dsched_cancel_t          fq_cancel;
69 static dsched_queue_t           fq_queue;
70
71 /* These are in _procops */
72 dsched_new_buf_t        fq_new_buf;
73 dsched_new_proc_t       fq_new_proc;
74 dsched_new_thread_t     fq_new_thread;
75 dsched_exit_buf_t       fq_exit_buf;
76 dsched_exit_proc_t      fq_exit_proc;
77 dsched_exit_thread_t    fq_exit_thread;
78
79 extern struct dsched_fq_stats   fq_stats;
80 extern struct lock      fq_tdctx_lock;
81 extern TAILQ_HEAD(, fq_thread_ctx)      dsched_tdctx_list;
82 extern struct callout   fq_callout;
83
84 struct dsched_policy dsched_fq_policy = {
85         .name = "fq",
86
87         .prepare = fq_prepare,
88         .teardown = fq_teardown,
89         .flush = fq_flush,
90         .cancel_all = fq_cancel,
91         .bio_queue = fq_queue,
92
93         .new_buf = fq_new_buf,
94         .new_proc = fq_new_proc,
95         .new_thread = fq_new_thread,
96         .exit_buf = fq_exit_buf,
97         .exit_proc = fq_exit_proc,
98         .exit_thread = fq_exit_thread,
99 };
100
101
102
103 static int
104 fq_prepare(struct disk *dp)
105 {
106         struct  fq_disk_ctx     *diskctx;
107         struct fq_thread_ctx    *tdctx;
108         struct fq_thread_io     *tdio;
109         struct thread *td_core, *td_balance;
110
111         diskctx = fq_disk_ctx_alloc(dp);
112         fq_disk_ctx_ref(diskctx);
113         dsched_set_disk_priv(dp, diskctx);
114
115         FQ_GLOBAL_THREAD_CTX_LOCK();
116         TAILQ_FOREACH(tdctx, &dsched_tdctx_list, link) {
117                 tdio = fq_thread_io_alloc(dp, tdctx);
118 #if 0
119                 fq_thread_io_ref(tdio);
120 #endif
121         }
122         FQ_GLOBAL_THREAD_CTX_UNLOCK();
123
124         lwkt_create((void (*)(void *))fq_dispatcher, diskctx, &td_core, NULL,
125             TDF_MPSAFE, -1, "fq_dispatch_%s", dp->d_cdev->si_name);
126         lwkt_create((void (*)(void *))fq_balance_thread, diskctx, &td_balance,
127             NULL, TDF_MPSAFE, -1, "fq_balance_%s", dp->d_cdev->si_name);
128         diskctx->td_balance = td_balance;
129
130         return 0;
131 }
132
133
134
135 static void
136 fq_teardown(struct disk *dp)
137 {
138         struct fq_disk_ctx *diskctx;
139
140         diskctx = dsched_get_disk_priv(dp);
141         KKASSERT(diskctx != NULL);
142
143         /* Basically kill the dispatcher thread */
144         diskctx->die = 1;
145         wakeup(diskctx->td_balance);
146         wakeup(diskctx);
147         tsleep(diskctx, 0, "fq_dispatcher", hz/5); /* wait 200 ms */
148         wakeup(diskctx->td_balance);
149         wakeup(diskctx);
150         tsleep(diskctx, 0, "fq_dispatcher", hz/10); /* wait 100 ms */
151         wakeup(diskctx->td_balance);
152         wakeup(diskctx);
153
154         fq_disk_ctx_unref(diskctx); /* from prepare */
155         fq_disk_ctx_unref(diskctx); /* from alloc */
156
157         dsched_set_disk_priv(dp, NULL);
158 }
159
160
161 /* Must be called with locked diskctx */
162 void
163 fq_drain(struct fq_disk_ctx *diskctx, int mode)
164 {
165         struct fq_thread_io *tdio, *tdio2;
166         struct bio *bio, *bio2;
167
168         TAILQ_FOREACH_MUTABLE(tdio, &diskctx->fq_tdio_list, dlink, tdio2) {
169                 if (tdio->qlength == 0)
170                         continue;
171
172                 FQ_THREAD_IO_LOCK(tdio);
173                 TAILQ_FOREACH_MUTABLE(bio, &tdio->queue, link, bio2) {
174                         TAILQ_REMOVE(&tdio->queue, bio, link);
175                         --tdio->qlength;
176                         if (__predict_false(mode == FQ_DRAIN_CANCEL)) {
177                                 /* FQ_DRAIN_CANCEL */
178                                 dsched_cancel_bio(bio);
179                                 atomic_add_int(&fq_stats.cancelled, 1);
180
181                                 /* Release ref acquired on fq_queue */
182                                 /* XXX: possible failure point */
183                                 fq_thread_io_unref(tdio);
184                         } else {
185                                 /* FQ_DRAIN_FLUSH */
186                                 fq_dispatch(diskctx, bio, tdio);
187                         }
188                 }
189                 FQ_THREAD_IO_UNLOCK(tdio);
190         }
191         return;
192 }
193
194
195 static void
196 fq_flush(struct disk *dp, struct bio *bio)
197 {
198         /* we don't do anything here */
199 }
200
201
202 static void
203 fq_cancel(struct disk *dp)
204 {
205         struct fq_disk_ctx      *diskctx;
206
207         diskctx = dsched_get_disk_priv(dp);
208         KKASSERT(diskctx != NULL);
209
210         /*
211          * all bios not in flight are queued in their respective tdios.
212          * good thing we have a list of tdios per disk diskctx.
213          */
214         FQ_DISK_CTX_LOCK(diskctx);
215         fq_drain(diskctx, FQ_DRAIN_CANCEL);
216         FQ_DISK_CTX_UNLOCK(diskctx);
217 }
218
219
220 static int
221 fq_queue(struct disk *dp, struct bio *obio)
222 {
223         struct bio *bio, *bio2;
224         struct fq_thread_ctx    *tdctx;
225         struct fq_thread_io     *tdio;
226         struct fq_disk_ctx      *diskctx;
227         int found = 0;
228         int max_tp, transactions;
229
230         /* We don't handle flushes, let dsched dispatch them */
231         if (__predict_false(obio->bio_buf->b_cmd == BUF_CMD_FLUSH))
232                 return (EINVAL);
233
234         /* get tdctx and tdio */
235         tdctx = dsched_get_buf_priv(obio->bio_buf);
236
237         /*
238          * XXX: hack. we don't want the assert because some null-tdctxs are
239          * leaking through; just dispatch them. These come from the
240          * mi_startup() mess, which does the initial root mount.
241          */
242 #if 0
243         KKASSERT(tdctx != NULL);
244 #endif
245         if (tdctx == NULL) {
246                 /* We don't handle this case, let dsched dispatch */
247                 atomic_add_int(&fq_stats.no_tdctx, 1);
248                 return (EINVAL);
249         }
250
251
252         FQ_THREAD_CTX_LOCK(tdctx);
253 #if 0
254         kprintf("fq_queue, tdctx = %p\n", tdctx);
255 #endif
256         KKASSERT(!TAILQ_EMPTY(&tdctx->fq_tdio_list));
257         TAILQ_FOREACH(tdio, &tdctx->fq_tdio_list, link) {
258                 if (tdio->dp == dp) {
259                         fq_thread_io_ref(tdio);
260                         found = 1;
261                         break;
262                 }
263         }
264         FQ_THREAD_CTX_UNLOCK(tdctx);
265         dsched_clr_buf_priv(obio->bio_buf);
266         fq_thread_ctx_unref(tdctx); /* acquired on new_buf */
267
268         KKASSERT(found == 1);
269         diskctx = dsched_get_disk_priv(dp);
270
271         if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
272                 fq_balance_self(tdio);
273
274         max_tp = tdio->max_tp;
275         transactions = tdio->issued;
276
277         /* | No rate limiting || Hasn't reached limit rate | */
278         if ((max_tp == 0) || (transactions < max_tp)) {
279                 /*
280                  * Process pending bios from previous _queue() actions that
281                  * have been rate-limited and hence queued in the tdio.
282                  */
283                 KKASSERT(tdio->qlength >= 0);
284
285                 if (tdio->qlength > 0) {
286                         FQ_THREAD_IO_LOCK(tdio);
287
288                         TAILQ_FOREACH_MUTABLE(bio, &tdio->queue, link, bio2) {
289                                 /* Rebalance ourselves if required */
290                                 if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
291                                         fq_balance_self(tdio);
292                                 if ((tdio->max_tp > 0) && (tdio->issued >= tdio->max_tp))
293                                         break;
294                                 TAILQ_REMOVE(&tdio->queue, bio, link);
295                                 --tdio->qlength;
296
297                                 /*
298                                  * beware that we do have an tdio reference from the
299                                  * queueing
300                                  */
301                                 fq_dispatch(diskctx, bio, tdio);
302                         }
303                         FQ_THREAD_IO_UNLOCK(tdio);
304                 }
305
306                 /* Nothing is pending from previous IO, so just pass it down */
307                 fq_thread_io_ref(tdio);
308
309                 fq_dispatch(diskctx, obio, tdio);
310         } else {
311                 /*
312                  * This thread has exceeeded its fair share,
313                  * the transactions are now rate limited. At
314                  * this point, the rate would be exceeded, so
315                  * we just queue requests instead of
316                  * despatching them.
317                  */
318                 FQ_THREAD_IO_LOCK(tdio);
319                 fq_thread_io_ref(tdio);
320
321                 /*
322                  * Prioritize reads by inserting them at the front of the
323                  * queue.
324                  *
325                  * XXX: this might cause issues with data that should
326                  *      have been written and is being read, but hasn't
327                  *      actually been written yet.
328                  */
329                 if (obio->bio_buf->b_cmd == BUF_CMD_READ)
330                         TAILQ_INSERT_HEAD(&tdio->queue, obio, link);
331                 else
332                         TAILQ_INSERT_TAIL(&tdio->queue, obio, link);
333
334                 ++tdio->qlength;
335                 FQ_THREAD_IO_UNLOCK(tdio);
336         }
337
338         fq_thread_io_unref(tdio);
339         return 0;
340 }
341
342
343 void
344 fq_completed(struct bio *bp)
345 {
346         struct bio *obio;
347         int     delta;
348         struct fq_thread_io     *tdio;
349         struct fq_disk_ctx      *diskctx;
350         struct disk     *dp;
351         int transactions, latency;
352
353         struct timeval tv;
354
355         getmicrotime(&tv);
356
357         dp = dsched_get_bio_dp(bp);
358         diskctx = dsched_get_disk_priv(dp);
359         tdio = dsched_get_bio_priv(bp);
360         KKASSERT(tdio != NULL);
361         KKASSERT(diskctx != NULL);
362
363         fq_disk_ctx_ref(diskctx);
364         atomic_subtract_int(&diskctx->incomplete_tp, 1);
365
366         if (!(bp->bio_buf->b_flags & B_ERROR)) {
367                 /*
368                  * Get the start ticks from when the bio was dispatched and calculate
369                  * how long it took until completion.
370                  */
371                 delta = (int)(1000000*((tv.tv_sec - bp->bio_caller_info3.tv.tv_sec)) +
372                     (tv.tv_usec - bp->bio_caller_info3.tv.tv_usec));
373                 if (delta <= 0)
374                         delta = 10000; /* default assume 10 ms */
375
376                 /* This is the last in-flight request and the disk is not idle yet */
377                 if ((diskctx->incomplete_tp <= 1) && (!diskctx->idle)) {
378                         diskctx->idle = 1;      /* Mark disk as idle */
379                         diskctx->start_idle = tv;       /* Save start idle time */
380                         wakeup(diskctx);                /* Wake up fq_dispatcher */
381                 }
382                 transactions = atomic_fetchadd_int(&tdio->transactions, 1);
383                 latency = tdio->avg_latency;
384
385                 if (latency != 0) {
386                         /* Moving averager, ((n-1)*avg_{n-1} + x) / n */
387                         latency = (int)(((int64_t)(transactions) *
388                             (int64_t)latency + (int64_t)delta) / ((int64_t)transactions + 1));
389                         KKASSERT(latency > 0);
390                 } else {
391                         latency = delta;
392                 }
393
394                 tdio->avg_latency = latency;
395
396                 atomic_add_int(&fq_stats.transactions_completed, 1);
397         }
398
399         fq_disk_ctx_unref(diskctx);
400         /* decrease the ref count that was bumped for us on dispatch */
401         fq_thread_io_unref(tdio);
402
403         obio = pop_bio(bp);
404         biodone(obio);
405 }
406
407 void
408 fq_dispatch(struct fq_disk_ctx *diskctx, struct bio *bio,
409     struct fq_thread_io *tdio)
410 {
411         struct timeval tv;
412
413         if (diskctx->idle) {
414                 getmicrotime(&tv);
415                 atomic_add_int(&diskctx->idle_time,
416                     (int)(1000000*((tv.tv_sec - diskctx->start_idle.tv_sec)) +
417                     (tv.tv_usec - diskctx->start_idle.tv_usec)));
418                 diskctx->idle = 0;
419         }
420         dsched_strategy_async(diskctx->dp, bio, fq_completed, tdio);
421
422         atomic_add_int(&tdio->issued, 1);
423         atomic_add_int(&diskctx->incomplete_tp, 1);
424         atomic_add_int(&fq_stats.transactions, 1);
425 }