fq - fix a (possible) deadlock
author Alex Hornung <ahornung@gmail.com>
Fri, 26 Aug 2011 00:29:17 +0000 (00:29 +0000)
committer Alex Hornung <ahornung@gmail.com>
Sat, 27 Aug 2011 10:33:50 +0000 (10:33 +0000)
 * A deadlock can occur if a lock is held around the strategy call when
   the completion is synchronous and ends up in the destruction of a
   tdio, as both will acquire the same diskctx lock.

 * Refactor the code around strategy calls so that the bios are first
   prepared, under the protection of the lock, and inserted into a local
   array. Once the bios are prepared and taken out of their respective
   queues, the locks are dropped and the strategy call occurs
   locklessly.

Reported-by: Antonio Huete (tuxillo@)
sys/kern/dsched/fq/fq.h
sys/kern/dsched/fq/fq_core.c
sys/kern/dsched/fq/fq_diskops.c

index a77bd48..75c9f0d 100644 (file)
@@ -55,6 +55,9 @@
 #define FQ_PRIO_IDLE           -1
 #define        FQ_BUCKET_ACTIVE        0x01
 
+#define FQ_DISPATCH_SML_ARRAY_SZ       128
+#define FQ_DISPATCH_ARRAY_SZ   512
+
 #define        FQ_DRAIN_CANCEL 0x1
 #define        FQ_DRAIN_FLUSH  0x2
 
@@ -95,6 +98,10 @@ struct fq_thread_io {
        int     rebalance;      /* thread needs to rebalance w/ fq_balance_self */
 };
 
+struct dispatch_prep {
+       struct fq_thread_io     *tdio;
+       struct bio              *bio;
+};
 
 
 void   fq_balance_thread(struct fq_disk_ctx *diskctx);
index 823895f..848a7eb 100644 (file)
@@ -52,7 +52,7 @@
 #include <kern/dsched/fq/fq.h>
 
 static int     dsched_fq_version_maj = 1;
-static int     dsched_fq_version_min = 0;
+static int     dsched_fq_version_min = 1;
 
 /* Make sure our structs fit */
 CTASSERT(sizeof(struct fq_thread_io) <= DSCHED_THREAD_IO_MAX_SZ);
@@ -65,10 +65,12 @@ extern struct dsched_policy dsched_fq_policy;
 void
 fq_dispatcher(struct fq_disk_ctx *diskctx)
 {
+       struct dispatch_prep dispatch_ary[FQ_DISPATCH_ARRAY_SZ];
        struct dsched_thread_io *ds_tdio, *ds_tdio2;
        struct fq_thread_io     *tdio;
        struct bio *bio, *bio2;
        int idle;
+       int i, prepd_io;
 
        /*
         * We need to manually assign an tdio to the tdctx of this thread
@@ -110,6 +112,9 @@ fq_dispatcher(struct fq_disk_ctx *diskctx)
                if (idle == 0)
                        idle = diskctx->idle;
 
+               /* Set the number of prepared requests to 0 */
+               i = 0;
+
                /*
                 * XXX: further room for improvements here. It would be better
                 *      to dispatch a few requests from each tdio as to ensure
@@ -132,11 +137,13 @@ fq_dispatcher(struct fq_disk_ctx *diskctx)
                                tdio->max_tp += 5;
                        }
 
+                       prepd_io = 0;
                        TAILQ_FOREACH_MUTABLE(bio, &tdio->head.queue, link, bio2) {
                                if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
                                        fq_balance_self(tdio);
-                               if ((tdio->max_tp > 0) &&
-                                   ((tdio->issued >= tdio->max_tp)))
+                               if (((tdio->max_tp > 0) &&
+                                   (tdio->issued + prepd_io >= tdio->max_tp)) ||
+                                   (i == FQ_DISPATCH_ARRAY_SZ))
                                        break;
 
                                TAILQ_REMOVE(&tdio->head.queue, bio, link);
@@ -145,12 +152,36 @@ fq_dispatcher(struct fq_disk_ctx *diskctx)
                                /*
                                 * beware that we do have an tdio reference
                                 * from the queueing
+                                *
+                                * XXX: note that here we don't dispatch it yet
+                                *      but just prepare it for dispatch so
+                                *      that no locks are held when calling
+                                *      into the drivers.
                                 */
-                               fq_dispatch(diskctx, bio, tdio);
+                               dispatch_ary[i].bio = bio;
+                               dispatch_ary[i].tdio = tdio;
+                               ++i;
+                               ++prepd_io;
                        }
                        DSCHED_THREAD_IO_UNLOCK(&tdio->head);
 
                }
+
+               dsched_disk_ctx_ref(&diskctx->head);
+               DSCHED_DISK_CTX_UNLOCK(&diskctx->head);
+
+               /*
+                * Dispatch all the previously prepared bios, now without
+                * holding any locks.
+                */
+               for (--i; i >= 0; i--) {
+                       bio = dispatch_ary[i].bio;
+                       tdio = dispatch_ary[i].tdio;
+                       fq_dispatch(diskctx, bio, tdio);
+               }
+
+               DSCHED_DISK_CTX_LOCK(&diskctx->head);
+               dsched_disk_ctx_unref(&diskctx->head);
        }
 }
 
index 487ea51..c10dcc2 100644 (file)
@@ -160,10 +160,12 @@ fq_cancel(struct dsched_disk_ctx *ds_diskctx)
 static int
 fq_queue(struct dsched_disk_ctx *ds_diskctx, struct dsched_thread_io *ds_tdio, struct bio *obio)
 {
+       struct bio *b_dispatch_ary[FQ_DISPATCH_SML_ARRAY_SZ];
        struct bio *bio, *bio2;
        struct fq_thread_io     *tdio;
        struct fq_disk_ctx      *diskctx;
        int max_tp, transactions;
+       int i;
 
        /* We don't handle flushes, let dsched dispatch them */
        if (__predict_false(obio->bio_buf->b_cmd == BUF_CMD_FLUSH))
@@ -187,24 +189,43 @@ fq_queue(struct dsched_disk_ctx *ds_diskctx, struct dsched_thread_io *ds_tdio, s
                KKASSERT(tdio->head.qlength >= 0);
 
                if (tdio->head.qlength > 0) {
+                       i = 0;
+
                        DSCHED_THREAD_IO_LOCK(&tdio->head);
 
                        TAILQ_FOREACH_MUTABLE(bio, &tdio->head.queue, link, bio2) {
                                /* Rebalance ourselves if required */
                                if (atomic_cmpset_int(&tdio->rebalance, 1, 0))
                                        fq_balance_self(tdio);
-                               if ((tdio->max_tp > 0) && (tdio->issued >= tdio->max_tp))
+                               if ((tdio->max_tp > 0) &&
+                                   (tdio->issued + i >= tdio->max_tp))
+                                       break;
+                               if (i == FQ_DISPATCH_SML_ARRAY_SZ)
                                        break;
+
                                TAILQ_REMOVE(&tdio->head.queue, bio, link);
                                --tdio->head.qlength;
 
                                /*
                                 * beware that we do have an tdio reference from the
                                 * queueing
+                                *
+                                * XXX: note that here we don't dispatch the BIOs yet
+                                *      but just prepare them for dispatch so that
+                                *      later they are pushed down to the driver
+                                *      without holding locks.
                                 */
-                               fq_dispatch(diskctx, bio, tdio);
+                               b_dispatch_ary[i++] = bio;
                        }
+
                        DSCHED_THREAD_IO_UNLOCK(&tdio->head);
+
+                       /*
+                        * Now dispatch all the prepared BIOs without holding
+                        * the thread_io lock.
+                        */
+                       for (--i; i >= 0; i--)
+                               fq_dispatch(diskctx, b_dispatch_ary[i], tdio);
                }
 
                /* Nothing is pending from previous IO, so just pass it down */