pipe - replace use of DELAY() with a better solution, add features.
authorMatthew Dillon <dillon@apollo.backplane.com>
Mon, 13 Jul 2009 05:14:43 +0000 (22:14 -0700)
committerMatthew Dillon <dillon@apollo.backplane.com>
Mon, 13 Jul 2009 05:14:43 +0000 (22:14 -0700)
* Use the new tsc functions to poll for new read data in a pipe for a short
  period of time on an SMP box.  This greatly increases the odds of a pipe
  writer on one cpu being able to pipeline data to a reader on another cpu
  without having to use an IPI or tsleep/wakeup.

  For the pipe1 test this brings the synchronous communications path
  over a pipe (Awrite, Bread, Bwrite, Aread) down from 7uS to around 2uS.

  For the pipe2 test the polling greatly reduces the number of IPIs and
  improves bandwidth by a few hundred megabytes/sec compared to not polling
  at all.  The old DELAY() already had the same effect, so there is no net
  change for the pipe2 test.

* Add sysctl kern.pipe.delay (SMP kernels only), which defaults to 5000
  nanoseconds (5uS).  This is the maximum time a pipe reader will poll for
  additional data before falling back to tsleep/wakeup (and the related
  IPIs).  Setting pipe_delay to 0 disables polling.  A value of at least
  3000 is recommended.  Pipelining large buffers efficiently requires a
  higher value, say up to 8000 or so.

* Allow kern.pipe.mpsafe to be set to 2 which adds a predictive wakeup
  when a writer is found to be stalled.  This currently has no significant
  effect on operations due to token collisions.

* Add statistics: kern.pipe.wblocked and kern.pipe.rblocked, counting
  the number of times a writer blocks in "pipewr" and a reader blocks in
  "piperd", respectively.

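* Fix MP races in pipe_ioctl() by holding the pipe's rlock and wlock
  tokens across the operation.  Also update pipe_nbig with atomic ops.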

sys/kern/sys_pipe.c

index 95ce531..21ee2c0 100644 (file)
@@ -122,17 +122,26 @@ static int pipe_bigcount;
 static int pipe_nbig;
 static int pipe_bcache_alloc;
 static int pipe_bkmem_alloc;
+static int pipe_rblocked_count;
+static int pipe_wblocked_count;
 
 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
         CTLFLAG_RD, &pipe_nbig, 0, "numer of big pipes allocated");
 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
+SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
+        CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe read blocked");
+SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
+        CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe write blocked");
 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
 #ifdef SMP
+static int pipe_delay = 5000;  /* 5uS default */
+SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
+        CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
 static int pipe_mpsafe = 0;
 SYSCTL_INT(_kern_pipe, OID_AUTO, mpsafe,
         CTLFLAG_RW, &pipe_mpsafe, 0, "");
@@ -398,6 +407,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 {
        struct pipe *rpipe;
        int error;
+       int orig_resid;
        int nread = 0;
        int nbio;
        u_int size;     /* total bytes available */
@@ -411,7 +421,8 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
        /*
         * Degenerate case
         */
-       if (uio->uio_resid == 0)
+       orig_resid = uio->uio_resid;
+       if (orig_resid == 0)
                return(0);
 
        /*
@@ -464,9 +475,8 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                        nread += nsize;
 
                        /*
-                        * If the FIFO has not been drained past the half-way
-                        * mark then just continue and do not try to notify
-                        * the writer yet.
+                        * If the FIFO is still over half full just continue
+                        * and do not try to notify the writer yet.
                         */
                        if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
                                notify_writer = 0;
@@ -474,10 +484,9 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                        }
 
                        /*
-                        * If the FIFO has been drained past the half-way
-                        * mark we have to check the writer at some point,
-                        * but for now continue if the writer is not yet
-                        * blocked.
+                        * When the FIFO is less than half full, notify any
+                        * waiting writer.  WANTW can be checked while
+                        * holding just the rlock.
                         */
                        notify_writer = 1;
                        if ((rpipe->pipe_state & PIPE_WANTW) == 0)
@@ -506,31 +515,47 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 
                /*
                 * Pick up our copy loop again if the writer sent data to
-                * us.
+                * us while we were messing around.
+                *
+                * On an SMP box poll up to pipe_delay nanoseconds for new
+                * data.  Typically a value of 2000 to 4000 is sufficient
+                * to eradicate most IPIs/tsleeps/wakeups when a pipe
+                * is used for synchronous communications with small packets,
+                * and 8000 or so (8uS) will pipeline large buffer xfers
+                * between cpus over a pipe.
+                *
+                * For synchronous communications a hit means doing a
+                * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
+                * whereas a miss requiring a tsleep/wakeup sequence
+                * will take 7uS or more.
                 */
-               size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
-               if (size)
+               if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
                        continue;
 
+#if defined(SMP) && defined(_RDTSC_SUPPORTED_)
+               if (pipe_delay) {
+                       int64_t tsc_target;
+                       int good = 0;
+
+                       tsc_target = tsc_get_target(pipe_delay);
+                       while (tsc_test_target(tsc_target) == 0) {
+                               if (rpipe->pipe_buffer.windex !=
+                                   rpipe->pipe_buffer.rindex) {
+                                       good = 1;
+                                       break;
+                               }
+                       }
+                       if (good)
+                               continue;
+               }
+#endif
+
                /*
                 * Detect EOF condition, do not set error.
                 */
                if (rpipe->pipe_state & PIPE_REOF)
                        break;
 
-#ifdef SMP
-               /*
-                * Gravy train if SMP box.  This saves a ton of IPIs and
-                * allows two cpus to operate in lockstep.
-                *
-                * XXX check pipe_wip also?
-                */
-               DELAY(1);
-               size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
-               if (size)
-                       continue;
-#endif
-
                /*
                 * Break if some data was read, or if this was a non-blocking
                 * read.
@@ -583,6 +608,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                tsleep_interlock(rpipe);
                lwkt_reltoken(&wlock);
                error = tsleep(rpipe, PCATCH, "piperd", 0);
+               ++pipe_rblocked_count;
                if (error)
                        break;
        }
@@ -700,10 +726,11 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
                    (pipe_nbig < pipe_maxbig) &&
                    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
-                       if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) {
+                       atomic_add_int(&pipe_nbig, 1);
+                       if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
                                ++pipe_bigcount;
-                               pipe_nbig++;
-                       }
+                       else
+                               atomic_subtract_int(&pipe_nbig, 1);
                }
                lwkt_reltoken(&rlock);
        }
@@ -774,31 +801,25 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                         * NOTE: We can't clear WANTR here without acquiring
                         * the rlock, which we don't want to do here!
                         */
-                       if (wpipe->pipe_state & PIPE_WANTR)
+                       if ((wpipe->pipe_state & PIPE_WANTR) && pipe_mpsafe > 1)
                                wakeup(wpipe);
 
                        /*
-                        * Transfer first segment
+                        * Transfer segment, which may include a wrap-around.
+                        * Update windex for both segments in one go
+                        * so the reader can read() the data atomically.
                         */
                        error = uiomove(&wpipe->pipe_buffer.buffer[windex],
                                        segsize, uio);
-                       cpu_mfence();
-                       wpipe->pipe_buffer.windex += segsize;
-
                        if (error == 0 && segsize < space) {
-                               /*
-                                * Transfer remaining part now, to
-                                * support atomic writes.  Wraparound
-                                * happened.
-                                */
                                segsize = space - segsize;
                                error = uiomove(&wpipe->pipe_buffer.buffer[0],
                                                segsize, uio);
-                               cpu_mfence();
-                               wpipe->pipe_buffer.windex += segsize;
                        }
                        if (error)
                                break;
+                       cpu_mfence();
+                       wpipe->pipe_buffer.windex += space;
                        wcount += space;
                        continue;
                }
@@ -851,6 +872,7 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                        ++wpipe->pipe_wantwcnt;
                        wpipe->pipe_state |= PIPE_WANTW;
                        error = tsleep(wpipe, PCATCH, "pipewr", 0);
+                       ++pipe_wblocked_count;
                }
                lwkt_reltoken(&rlock);
 
@@ -919,11 +941,17 @@ int
 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
 {
        struct pipe *mpipe;
+       lwkt_tokref rlock;
+       lwkt_tokref wlock;
        int error;
+       int mpsave;
 
-       get_mplock();
+       pipe_get_mplock(&mpsave);
        mpipe = (struct pipe *)fp->f_data;
 
+       lwkt_gettoken(&rlock, &mpipe->pipe_rlock);
+       lwkt_gettoken(&wlock, &mpipe->pipe_wlock);
+
        switch (cmd) {
        case FIOASYNC:
                if (*(int *)data) {
@@ -939,7 +967,9 @@ pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
                error = 0;
                break;
        case FIOSETOWN:
+               get_mplock();
                error = fsetown(*(int *)data, &mpipe->pipe_sigio);
+               rel_mplock();
                break;
        case FIOGETOWN:
                *(int *)data = fgetown(mpipe->pipe_sigio);
@@ -947,7 +977,9 @@ pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
                break;
        case TIOCSPGRP:
                /* This is deprecated, FIOSETOWN should be used instead. */
+               get_mplock();
                error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
+               rel_mplock();
                break;
 
        case TIOCGPGRP:
@@ -959,7 +991,10 @@ pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
                error = ENOTTY;
                break;
        }
-       rel_mplock();
+       lwkt_reltoken(&rlock);
+       lwkt_reltoken(&wlock);
+       pipe_rel_mplock(&mpsave);
+
        return (error);
 }
 
@@ -1142,7 +1177,7 @@ pipe_free_kmem(struct pipe *cpipe)
 {
        if (cpipe->pipe_buffer.buffer != NULL) {
                if (cpipe->pipe_buffer.size > PIPE_SIZE)
-                       --pipe_nbig;
+                       atomic_subtract_int(&pipe_nbig, 1);
                kmem_free(&kernel_map,
                        (vm_offset_t)cpipe->pipe_buffer.buffer,
                        cpipe->pipe_buffer.size);