Merge branch 'vendor/OPENPAM'
[dragonfly.git] / sys / kern / dsched / fq / fq_core.c
1 /*
2  * Copyright (c) 2009, 2010 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Alex Hornung <ahornung@gmail.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34 #include <sys/param.h>
35 #include <sys/systm.h>
36 #include <sys/kernel.h>
37 #include <sys/proc.h>
38 #include <sys/sysctl.h>
39 #include <sys/buf.h>
40 #include <sys/conf.h>
41 #include <sys/diskslice.h>
42 #include <sys/disk.h>
43 #include <machine/atomic.h>
44 #include <sys/thread.h>
45 #include <sys/thread2.h>
46 #include <sys/ctype.h>
47 #include <sys/buf2.h>
48 #include <sys/syslog.h>
49 #include <sys/dsched.h>
50 #include <machine/param.h>
51
52 #include <kern/dsched/fq/fq.h>
53
54 static int      dsched_fq_version_maj = 1;
55 static int      dsched_fq_version_min = 1;
56
57 /* Make sure our structs fit */
58 CTASSERT(sizeof(struct fq_thread_io) <= DSCHED_THREAD_IO_MAX_SZ);
59 CTASSERT(sizeof(struct fq_disk_ctx) <= DSCHED_DISK_CTX_MAX_SZ);
60
61 struct dsched_fq_stats  fq_stats;
62
63 extern struct dsched_policy dsched_fq_policy;
64
65 void
66 fq_dispatcher(struct fq_disk_ctx *diskctx)
67 {
68         struct dispatch_prep *dispatch_ary;
69         struct dsched_thread_io *ds_tdio, *ds_tdio2;
70         struct fq_thread_io     *tdio;
71         struct bio *bio, *bio2;
72         int idle;
73         int i, prepd_io;
74
75         /*
76          * Array is dangerously big for an on-stack declaration, allocate
77          * it instead.
78          */
79         dispatch_ary = kmalloc(sizeof(*dispatch_ary) * FQ_DISPATCH_ARRAY_SZ,
80                                M_TEMP, M_INTWAIT | M_ZERO);
81
82         /*
83          * We need to manually assign an tdio to the tdctx of this thread
84          * since it isn't assigned one during fq_prepare, as the disk
85          * is not set up yet.
86          */
87         tdio = (struct fq_thread_io *)dsched_new_policy_thread_tdio(&diskctx->head,
88             &dsched_fq_policy);
89
90         DSCHED_DISK_CTX_LOCK(&diskctx->head);
91         for(;;) {
92                 idle = 0;
93                 /*
94                  * sleep ~60 ms, failsafe low hz rates.
95                  */
96                 if ((lksleep(diskctx, &diskctx->head.lock, 0,
97                              "fq_dispatcher", (hz + 14) / 15) == 0)) {
98                         /*
99                          * We've been woken up; this either means that we are
100                          * supposed to die away nicely or that the disk is idle.
101                          */
102
103                         if (__predict_false(diskctx->die == 1))
104                                 break;
105
106                         /*
107                          * We have been awakened because the disk is idle.
108                          * So let's get ready to dispatch some extra bios.
109                          */
110                         idle = 1;
111                 }
112
113                 /* Maybe the disk is idle and we just didn't get the wakeup */
114                 if (idle == 0)
115                         idle = diskctx->idle;
116
117                 /* Set the number of prepared requests to 0 */
118                 i = 0;
119
120                 /*
121                  * XXX: further room for improvements here. It would be better
122                  *      to dispatch a few requests from each tdio as to ensure
123                  *      real fairness.
124                  */
125                 TAILQ_FOREACH_MUTABLE(ds_tdio, &diskctx->head.tdio_list, dlink, ds_tdio2) {
126                         tdio = (struct fq_thread_io *)ds_tdio;
127                         if (tdio->head.qlength == 0)
128                                 continue;
129
130                         DSCHED_THREAD_IO_LOCK(&tdio->head);
131                         if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
132                                 fq_balance_self(tdio);
133                         /*
134                          * XXX: why 5 extra? should probably be dynamic,
135                          *      relying on information on latency.
136                          */
137                         if ((tdio->max_tp > 0) && idle &&
138                             (tdio->issued >= tdio->max_tp)) {
139                                 tdio->max_tp += 5;
140                         }
141
142                         prepd_io = 0;
143                         TAILQ_FOREACH_MUTABLE(bio, &tdio->head.queue, link, bio2) {
144                                 if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
145                                         fq_balance_self(tdio);
146                                 if (((tdio->max_tp > 0) &&
147                                     (tdio->issued + prepd_io >= tdio->max_tp)) ||
148                                     (i == FQ_DISPATCH_ARRAY_SZ))
149                                         break;
150
151                                 TAILQ_REMOVE(&tdio->head.queue, bio, link);
152                                 --tdio->head.qlength;
153
154                                 /*
155                                  * beware that we do have an tdio reference
156                                  * from the queueing
157                                  *
158                                  * XXX: note that here we don't dispatch it yet
159                                  *      but just prepare it for dispatch so
160                                  *      that no locks are held when calling
161                                  *      into the drivers.
162                                  */
163                                 dispatch_ary[i].bio = bio;
164                                 dispatch_ary[i].tdio = tdio;
165                                 ++i;
166                                 ++prepd_io;
167                         }
168                         DSCHED_THREAD_IO_UNLOCK(&tdio->head);
169
170                 }
171
172                 dsched_disk_ctx_ref(&diskctx->head);
173                 DSCHED_DISK_CTX_UNLOCK(&diskctx->head);
174
175                 /*
176                  * Dispatch all the previously prepared bios, now without
177                  * holding any locks.
178                  */
179                 for (--i; i >= 0; i--) {
180                         bio = dispatch_ary[i].bio;
181                         tdio = dispatch_ary[i].tdio;
182                         fq_dispatch(diskctx, bio, tdio);
183                 }
184
185                 DSCHED_DISK_CTX_LOCK(&diskctx->head);
186                 dsched_disk_ctx_unref(&diskctx->head);
187         }
188
189         /*
190          * If we are supposed to die, drain all queues, then
191          * unlock and exit.
192          */
193         fq_drain(diskctx, FQ_DRAIN_FLUSH);
194         DSCHED_DISK_CTX_UNLOCK(&diskctx->head);
195         kfree(dispatch_ary, M_TEMP);
196
197         kprintf("fq_dispatcher is peacefully dying\n");
198         lwkt_exit();
199         /* NOTREACHED */
200 }
201
202 void
203 fq_balance_thread(struct fq_disk_ctx *diskctx)
204 {
205         struct dsched_thread_io *ds_tdio;
206         struct  fq_thread_io    *tdio;
207         struct timeval tv, old_tv;
208         int64_t total_budget, product;
209         int64_t budget[FQ_PRIO_MAX+1];
210         int     n, i, sum, total_disk_time;
211         int     lost_bits;
212
213         DSCHED_DISK_CTX_LOCK(&diskctx->head);
214
215         getmicrotime(&diskctx->start_interval);
216
217         for (;;) {
218                 /* sleep ~1s */
219                 if ((lksleep(curthread, &diskctx->head.lock, 0, "fq_balancer", hz/2) == 0)) {
220                         if (__predict_false(diskctx->die)) {
221                                 DSCHED_DISK_CTX_UNLOCK(&diskctx->head);
222                                 lwkt_exit();
223                         }
224                 }
225
226                 bzero(budget, sizeof(budget));
227                 total_budget = 0;
228                 n = 0;
229
230                 old_tv = diskctx->start_interval;
231                 getmicrotime(&tv);
232
233                 total_disk_time = (int)(1000000*((tv.tv_sec - old_tv.tv_sec)) +
234                     (tv.tv_usec - old_tv.tv_usec));
235
236                 if (total_disk_time == 0)
237                         total_disk_time = 1;
238
239                 dsched_debug(LOG_INFO, "total_disk_time = %d\n", total_disk_time);
240
241                 diskctx->start_interval = tv;
242
243                 diskctx->disk_busy = (100*(total_disk_time - diskctx->idle_time)) / total_disk_time;
244                 if (diskctx->disk_busy < 0)
245                         diskctx->disk_busy = 0;
246
247                 diskctx->idle_time = 0;
248                 lost_bits = 0;
249
250                 TAILQ_FOREACH(ds_tdio, &diskctx->head.tdio_list, dlink) {
251                         tdio = (struct fq_thread_io *)ds_tdio;
252                         tdio->interval_avg_latency = tdio->avg_latency;
253                         tdio->interval_transactions = tdio->transactions;
254                         if (tdio->interval_transactions > 0) {
255                                 product = (int64_t)tdio->interval_avg_latency *
256                                     tdio->interval_transactions;
257                                 product >>= lost_bits;
258                                 while(total_budget >= INT64_MAX - product) {
259                                         ++lost_bits;
260                                         product >>= 1;
261                                         total_budget >>= 1;
262                                 }
263                                 total_budget += product;
264                                 ++budget[(tdio->head.p) ? tdio->head.p->p_ionice : 0];
265                                 KKASSERT(total_budget >= 0);
266                                 dsched_debug(LOG_INFO,
267                                     "%d) avg_latency = %d, transactions = %d, ioprio = %d\n",
268                                     n, tdio->interval_avg_latency, tdio->interval_transactions,
269                                     (tdio->head.p) ? tdio->head.p->p_ionice : 0);
270                                 ++n;
271                         } else {
272                                 tdio->max_tp = 0;
273                         }
274                         tdio->rebalance = 0;
275                         tdio->transactions = 0;
276                         tdio->avg_latency = 0;
277                         tdio->issued = 0;
278                 }
279
280                 dsched_debug(LOG_INFO, "%d procs competing for disk\n"
281                     "total_budget = %jd (lost bits = %d)\n"
282                     "incomplete tp = %d\n", n, (intmax_t)total_budget,
283                     lost_bits, diskctx->incomplete_tp);
284
285                 if (n == 0)
286                         continue;
287
288                 sum = 0;
289
290                 for (i = 0; i < FQ_PRIO_MAX+1; i++) {
291                         if (budget[i] == 0)
292                                 continue;
293                         sum += (FQ_PRIO_BIAS+i)*budget[i];
294                 }
295
296                 if (sum == 0)
297                         sum = 1;
298
299                 dsched_debug(LOG_INFO, "sum = %d\n", sum);
300
301                 for (i = 0; i < FQ_PRIO_MAX+1; i++) {
302                         if (budget[i] == 0)
303                                 continue;
304
305                         /*
306                          * XXX: if we still overflow here, we really need to switch to
307                          *      some more advanced mechanism such as compound int128 or
308                          *      storing the lost bits so they can be used in the
309                          *      fq_balance_self.
310                          */
311                         diskctx->budgetpb[i] = ((FQ_PRIO_BIAS+i)*total_budget/sum) << lost_bits;
312                         KKASSERT(diskctx->budgetpb[i] >= 0);
313                 }
314
315                 dsched_debug(4, "disk is %d%% busy\n", diskctx->disk_busy);
316                 TAILQ_FOREACH(ds_tdio, &diskctx->head.tdio_list, dlink) {
317                         tdio = (struct fq_thread_io *)ds_tdio;
318                         tdio->rebalance = 1;
319                 }
320
321                 diskctx->prev_full = diskctx->last_full;
322                 diskctx->last_full = (diskctx->disk_busy >= 90)?1:0;
323         }
324 }
325
326
327 /*
328  * fq_balance_self should be called from all sorts of dispatchers. It basically
329  * offloads some of the heavier calculations on throttling onto the process that
330  * wants to do I/O instead of doing it in the fq_balance thread.
331  * - should be called with diskctx lock held
332  */
333 void
334 fq_balance_self(struct fq_thread_io *tdio) {
335         struct fq_disk_ctx *diskctx;
336
337         int64_t budget, used_budget;
338         int64_t avg_latency;
339         int64_t transactions;
340
341         transactions = (int64_t)tdio->interval_transactions;
342         avg_latency = (int64_t)tdio->interval_avg_latency;
343         diskctx = (struct fq_disk_ctx *)tdio->head.diskctx;
344
345 #if 0
346         /* XXX: do we really require the lock? */
347         DSCHED_DISK_CTX_LOCK_ASSERT(diskctx);
348 #endif
349
350         used_budget = avg_latency * transactions;
351         budget = diskctx->budgetpb[(tdio->head.p) ? tdio->head.p->p_ionice : 0];
352
353         if (used_budget > 0) {
354                 dsched_debug(LOG_INFO,
355                     "info: used_budget = %jd, budget = %jd\n",
356                     (intmax_t)used_budget, budget);
357         }
358
359         if ((used_budget > budget) && (diskctx->disk_busy >= 90)) {
360                 KKASSERT(avg_latency != 0);
361
362                 tdio->max_tp = budget/(avg_latency);
363                 atomic_add_int(&fq_stats.procs_limited, 1);
364
365                 dsched_debug(LOG_INFO,
366                     "rate limited to %d transactions\n", tdio->max_tp);
367
368         } else if (((used_budget*2 < budget) || (diskctx->disk_busy < 80)) &&
369             (!diskctx->prev_full && !diskctx->last_full)) {
370                 tdio->max_tp = 0;
371         }
372 }
373
374
375 static int
376 do_fqstats(SYSCTL_HANDLER_ARGS)
377 {
378         return (sysctl_handle_opaque(oidp, &fq_stats, sizeof(struct dsched_fq_stats), req));
379 }
380
381 static int
382 fq_mod_handler(module_t mod, int type, void *unused)
383 {
384         static struct sysctl_ctx_list sysctl_ctx;
385         static struct sysctl_oid *oid;
386         static char version[16];
387         int error;
388
389         ksnprintf(version, sizeof(version), "%d.%d",
390             dsched_fq_version_maj, dsched_fq_version_min);
391
392         switch (type) {
393         case MOD_LOAD:
394                 bzero(&fq_stats, sizeof(struct dsched_fq_stats));
395                 if ((error = dsched_register(&dsched_fq_policy)))
396                         return (error);
397
398                 sysctl_ctx_init(&sysctl_ctx);
399                 oid = SYSCTL_ADD_NODE(&sysctl_ctx,
400                     SYSCTL_STATIC_CHILDREN(_dsched),
401                     OID_AUTO,
402                     "fq",
403                     CTLFLAG_RD, 0, "");
404
405                 SYSCTL_ADD_PROC(&sysctl_ctx, SYSCTL_CHILDREN(oid),
406                     OID_AUTO, "stats", CTLTYPE_OPAQUE|CTLFLAG_RD,
407                     0, 0, do_fqstats, "S,dsched_fq_stats", "fq statistics");
408
409                 SYSCTL_ADD_STRING(&sysctl_ctx, SYSCTL_CHILDREN(oid),
410                     OID_AUTO, "version", CTLFLAG_RD, version, 0, "fq version");
411
412                 kprintf("FQ scheduler policy version %d.%d loaded\n",
413                     dsched_fq_version_maj, dsched_fq_version_min);
414                 break;
415
416         case MOD_UNLOAD:
417                 if ((error = dsched_unregister(&dsched_fq_policy)))
418                         return (error);
419                 sysctl_ctx_free(&sysctl_ctx);
420                 kprintf("FQ scheduler policy unloaded\n");
421                 break;
422
423         default:
424                 break;
425         }
426
427         return 0;
428 }
429
430 DSCHED_POLICY_MODULE(dsched_fq, fq_mod_handler);