fq - fix a (possible) deadlock
[dragonfly.git] / sys / kern / dsched / fq / fq_diskops.c
1 /*
2  * Copyright (c) 2009, 2010 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Alex Hornung <ahornung@gmail.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 #include <sys/param.h>
35 #include <sys/systm.h>
36 #include <sys/kernel.h>
37 #include <sys/proc.h>
38 #include <sys/sysctl.h>
39 #include <sys/buf.h>
40 #include <sys/conf.h>
41 #include <sys/diskslice.h>
42 #include <sys/disk.h>
43 #include <machine/atomic.h>
44 #include <sys/thread.h>
45 #include <sys/thread2.h>
46 #include <sys/ctype.h>
47 #include <sys/buf2.h>
48 #include <sys/syslog.h>
49 #include <sys/dsched.h>
50 #include <machine/param.h>
51
52 #include <kern/dsched/fq/fq.h>
53
54 static dsched_prepare_t         fq_prepare;
55 static dsched_teardown_t        fq_teardown;
56 static dsched_cancel_t          fq_cancel;
57 static dsched_queue_t           fq_queue;
58
59 extern struct dsched_fq_stats   fq_stats;
60
61 struct dsched_policy dsched_fq_policy = {
62         .name = "fq",
63
64         .prepare = fq_prepare,
65         .teardown = fq_teardown,
66         .cancel_all = fq_cancel,
67         .bio_queue = fq_queue
68 };
69
70 static int
71 fq_prepare(struct dsched_disk_ctx *ds_diskctx)
72 {
73         struct  fq_disk_ctx     *diskctx = (struct fq_disk_ctx *)ds_diskctx;
74         struct thread *td_core, *td_balance;
75
76         lwkt_create((void (*)(void *))fq_dispatcher, diskctx, &td_core,
77                     NULL, 0, -1, "fq_dispatch_%s",
78                     ds_diskctx->dp->d_cdev->si_name);
79         lwkt_create((void (*)(void *))fq_balance_thread, diskctx, &td_balance,
80                     NULL, 0, -1, "fq_balance_%s",
81                     ds_diskctx->dp->d_cdev->si_name);
82         diskctx->td_balance = td_balance;
83
84         return 0;
85 }
86
87
88
89 static void
90 fq_teardown(struct dsched_disk_ctx *ds_diskctx)
91 {
92         struct fq_disk_ctx *diskctx = (struct fq_disk_ctx *)ds_diskctx;
93         KKASSERT(diskctx != NULL);
94
95         /* Basically kill the dispatcher thread */
96         diskctx->die = 1;
97         wakeup(diskctx->td_balance);
98         wakeup(diskctx);
99         tsleep(diskctx, 0, "fq_dispatcher", hz/5); /* wait 200 ms */
100         wakeup(diskctx->td_balance);
101         wakeup(diskctx);
102         tsleep(diskctx, 0, "fq_dispatcher", hz/10); /* wait 100 ms */
103         wakeup(diskctx->td_balance);
104         wakeup(diskctx);
105 }
106
107
108 /* Must be called with locked diskctx */
109 void
110 fq_drain(struct fq_disk_ctx *diskctx, int mode)
111 {
112         struct dsched_thread_io *ds_tdio, *ds_tdio2;
113         struct fq_thread_io *tdio;
114         struct bio *bio, *bio2;
115
116         TAILQ_FOREACH_MUTABLE(ds_tdio, &diskctx->head.tdio_list, dlink, ds_tdio2) {
117                 tdio = (struct fq_thread_io *)ds_tdio;
118                 if (tdio->head.qlength == 0)
119                         continue;
120
121                 DSCHED_THREAD_IO_LOCK(&tdio->head);
122                 TAILQ_FOREACH_MUTABLE(bio, &tdio->head.queue, link, bio2) {
123                         TAILQ_REMOVE(&tdio->head.queue, bio, link);
124                         --tdio->head.qlength;
125                         if (__predict_false(mode == FQ_DRAIN_CANCEL)) {
126                                 /* FQ_DRAIN_CANCEL */
127                                 dsched_cancel_bio(bio);
128                                 atomic_add_int(&fq_stats.cancelled, 1);
129
130                                 /* Release ref acquired on fq_queue */
131                                 /* XXX: possible failure point */
132                                 dsched_thread_io_unref(&tdio->head);
133                         } else {
134                                 /* FQ_DRAIN_FLUSH */
135                                 fq_dispatch(diskctx, bio, tdio);
136                         }
137                 }
138                 DSCHED_THREAD_IO_UNLOCK(&tdio->head);
139         }
140         return;
141 }
142
143 static void
144 fq_cancel(struct dsched_disk_ctx *ds_diskctx)
145 {
146         struct fq_disk_ctx      *diskctx = (struct fq_disk_ctx *)ds_diskctx;
147
148         KKASSERT(diskctx != NULL);
149
150         /*
151          * all bios not in flight are queued in their respective tdios.
152          * good thing we have a list of tdios per disk diskctx.
153          */
154         DSCHED_DISK_CTX_LOCK(&diskctx->head);
155         fq_drain(diskctx, FQ_DRAIN_CANCEL);
156         DSCHED_DISK_CTX_UNLOCK(&diskctx->head);
157 }
158
159
160 static int
161 fq_queue(struct dsched_disk_ctx *ds_diskctx, struct dsched_thread_io *ds_tdio, struct bio *obio)
162 {
163         struct bio *b_dispatch_ary[FQ_DISPATCH_SML_ARRAY_SZ];
164         struct bio *bio, *bio2;
165         struct fq_thread_io     *tdio;
166         struct fq_disk_ctx      *diskctx;
167         int max_tp, transactions;
168         int i;
169
170         /* We don't handle flushes, let dsched dispatch them */
171         if (__predict_false(obio->bio_buf->b_cmd == BUF_CMD_FLUSH))
172                 return (EINVAL);
173
174         tdio = (struct fq_thread_io *)ds_tdio;
175         diskctx = (struct fq_disk_ctx *)ds_diskctx;
176
177         if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
178                 fq_balance_self(tdio);
179
180         max_tp = tdio->max_tp;
181         transactions = tdio->issued;
182
183         /* | No rate limiting || Hasn't reached limit rate | */
184         if ((max_tp == 0) || (transactions < max_tp)) {
185                 /*
186                  * Process pending bios from previous _queue() actions that
187                  * have been rate-limited and hence queued in the tdio.
188                  */
189                 KKASSERT(tdio->head.qlength >= 0);
190
191                 if (tdio->head.qlength > 0) {
192                         i = 0;
193
194                         DSCHED_THREAD_IO_LOCK(&tdio->head);
195
196                         TAILQ_FOREACH_MUTABLE(bio, &tdio->head.queue, link, bio2) {
197                                 /* Rebalance ourselves if required */
198                                 if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
199                                         fq_balance_self(tdio);
200                                 if ((tdio->max_tp > 0) &&
201                                     (tdio->issued + i >= tdio->max_tp))
202                                         break;
203                                 if (i == FQ_DISPATCH_SML_ARRAY_SZ)
204                                         break;
205
206                                 TAILQ_REMOVE(&tdio->head.queue, bio, link);
207                                 --tdio->head.qlength;
208
209                                 /*
210                                  * beware that we do have an tdio reference from the
211                                  * queueing
212                                  *
213                                  * XXX: note that here we don't dispatch the BIOs yet
214                                  *      but just prepare them for dispatch so that
215                                  *      later they are pushed down to the driver
216                                  *      without holding locks.
217                                  */
218                                 b_dispatch_ary[i++] = bio;
219                         }
220
221                         DSCHED_THREAD_IO_UNLOCK(&tdio->head);
222
223                         /*
224                          * Now dispatch all the prepared BIOs without holding
225                          * the thread_io lock.
226                          */
227                         for (--i; i >= 0; i--)
228                                 fq_dispatch(diskctx, b_dispatch_ary[i], tdio);
229                 }
230
231                 /* Nothing is pending from previous IO, so just pass it down */
232                 dsched_thread_io_ref(&tdio->head);
233
234                 fq_dispatch(diskctx, obio, tdio);
235         } else {
236                 /*
237                  * This thread has exceeeded its fair share,
238                  * the transactions are now rate limited. At
239                  * this point, the rate would be exceeded, so
240                  * we just queue requests instead of
241                  * despatching them.
242                  */
243                 DSCHED_THREAD_IO_LOCK(&tdio->head);
244                 dsched_thread_io_ref(&tdio->head);
245
246                 /*
247                  * Prioritize reads by inserting them at the front of the
248                  * queue.
249                  *
250                  * XXX: this might cause issues with data that should
251                  *      have been written and is being read, but hasn't
252                  *      actually been written yet.
253                  */
254                 if (obio->bio_buf->b_cmd == BUF_CMD_READ)
255                         TAILQ_INSERT_HEAD(&tdio->head.queue, obio, link);
256                 else
257                         TAILQ_INSERT_TAIL(&tdio->head.queue, obio, link);
258
259                 ++tdio->head.qlength;
260                 DSCHED_THREAD_IO_UNLOCK(&tdio->head);
261         }
262
263         return 0;
264 }
265
266
267 void
268 fq_completed(struct bio *bp)
269 {
270         struct bio *obio;
271         int     delta;
272         struct fq_thread_io     *tdio;
273         struct fq_disk_ctx      *diskctx;
274         struct disk     *dp;
275         int transactions, latency;
276
277         struct timeval tv;
278
279         getmicrotime(&tv);
280
281         dp = dsched_get_bio_dp(bp);
282         diskctx = dsched_get_disk_priv(dp);
283         tdio = dsched_get_bio_priv(bp);
284         KKASSERT(tdio != NULL);
285         KKASSERT(diskctx != NULL);
286
287         dsched_disk_ctx_ref(&diskctx->head);
288         atomic_subtract_int(&diskctx->incomplete_tp, 1);
289
290         if (!(bp->bio_buf->b_flags & B_ERROR)) {
291                 /*
292                  * Get the start ticks from when the bio was dispatched and calculate
293                  * how long it took until completion.
294                  */
295                 delta = (int)(1000000*((tv.tv_sec - bp->bio_caller_info3.tv.tv_sec)) +
296                     (tv.tv_usec - bp->bio_caller_info3.tv.tv_usec));
297                 if (delta <= 0)
298                         delta = 10000; /* default assume 10 ms */
299
300                 /* This is the last in-flight request and the disk is not idle yet */
301                 if ((diskctx->incomplete_tp <= 1) && (!diskctx->idle)) {
302                         diskctx->idle = 1;      /* Mark disk as idle */
303                         diskctx->start_idle = tv;       /* Save start idle time */
304                         wakeup(diskctx);                /* Wake up fq_dispatcher */
305                 }
306                 transactions = atomic_fetchadd_int(&tdio->transactions, 1);
307                 latency = tdio->avg_latency;
308
309                 if (latency != 0) {
310                         /* Moving averager, ((n-1)*avg_{n-1} + x) / n */
311                         latency = (int)(((int64_t)(transactions) *
312                             (int64_t)latency + (int64_t)delta) / ((int64_t)transactions + 1));
313                         KKASSERT(latency > 0);
314                 } else {
315                         latency = delta;
316                 }
317
318                 tdio->avg_latency = latency;
319
320                 atomic_add_int(&fq_stats.transactions_completed, 1);
321         }
322
323         dsched_disk_ctx_unref(&diskctx->head);
324         /* decrease the ref count that was bumped for us on dispatch */
325         dsched_thread_io_unref(&tdio->head);
326
327         obio = pop_bio(bp);
328         biodone(obio);
329 }
330
331 void
332 fq_dispatch(struct fq_disk_ctx *diskctx, struct bio *bio,
333     struct fq_thread_io *tdio)
334 {
335         struct timeval tv;
336
337         if (diskctx->idle) {
338                 getmicrotime(&tv);
339                 atomic_add_int(&diskctx->idle_time,
340                     (int)(1000000*((tv.tv_sec - diskctx->start_idle.tv_sec)) +
341                     (tv.tv_usec - diskctx->start_idle.tv_usec)));
342                 diskctx->idle = 0;
343         }
344         dsched_strategy_async(diskctx->head.dp, bio, fq_completed, tdio);
345
346         atomic_add_int(&tdio->issued, 1);
347         atomic_add_int(&diskctx->incomplete_tp, 1);
348         atomic_add_int(&fq_stats.transactions, 1);
349 }