kernel - Reoptimize sys_pipe
author     Matthew Dillon <dillon@apollo.backplane.com>
Sat, 14 Oct 2017 04:26:30 +0000 (21:26 -0700)
committer  Matthew Dillon <dillon@apollo.backplane.com>
Mon, 16 Oct 2017 18:30:23 +0000 (11:30 -0700)
* Use atomic ops for state updates, allowing us to avoid acquiring
  the other side's token.  This removes all remaining contention.
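
  The two sides of the new interlock, paraphrased from the patch below
  (names as in sys_pipe.c; declarations, EOF checks and the surrounding
  retry loops are elided), look roughly like this: the waiter publishes
  its WANT flag with an atomic op after tsleep_interlock() and retests
  before sleeping, while the waker clears the flag atomically and only
  issues a wakeup if the flag was actually set.

      /* waiter (e.g. the reader waiting for data) */
      tsleep_interlock(rpb, PCATCH);
      atomic_set_int(&rpb->state, PIPE_WANTR);  /* doubles as memory barrier */
      if (rpb->windex == rpb->rindex)           /* retest after the barrier */
              error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);

      /* waker (e.g. the writer after adding data), see pipesignal() below */
      for (;;) {
              oflags = wpb->state;
              cpu_ccfence();
              nflags = oflags & ~PIPE_WANTR;
              if (atomic_cmpset_int(&wpb->state, oflags, nflags))
                      break;
      }
      if (oflags & PIPE_WANTR)
              wakeup(wpb);                      /* only if someone was waiting */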

* Performance boosted by around 35%.  On Ryzen, bulk buffer
  write->read tests between localized cpu cores went from 9.2 GB/sec
  to around 13 GB/sec.  Cross-die performance increased from
  2.5 GB/sec to around 4.5 GB/sec (gigabytes/sec).

  1-byte ping-ponging (write-1/read-1/turn-around/write-back-1/
  read-back-1) fell from 1.0-2.0uS to 0.7-1.7uS.

* Add kern.pipe.size, allowing the kernel pipe buffer size to be
  changed (affects new pipes only).  The default buffer size has
  been increased to 32KB (it was 16KB).
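
  A quick way to inspect the current value from userland (a hypothetical
  stand-alone snippet, not part of this patch) is sysctlbyname():

      #include <sys/types.h>
      #include <sys/sysctl.h>
      #include <stdio.h>

      int
      main(void)
      {
              int size;
              size_t len = sizeof(size);

              /* kern.pipe.size only affects pipes created after a change */
              if (sysctlbyname("kern.pipe.size", &size, &len, NULL, 0) == 0)
                      printf("pipe buffer size: %d bytes\n", size);
              return 0;
      }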

* Refactor pipelining optimizations, further reducing unnecessary
  tsleep/wakeup IPIs.
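
  The core of this in the write path, paraphrased from the patch with
  wrap-around and error handling elided: each copy is capped at half the
  pipe buffer, the reader is kicked before the first chunk is copied and,
  for later chunks, right after windex is advanced, so both sides can
  stream in parallel instead of ping-ponging.

      space = szmin(space, uio->uio_resid);
      if (space > (wpb->size >> 1))         /* cap at half the buffer */
              space = wpb->size >> 1;

      if (wcount == 0)                      /* first chunk: kick reader */
              pipesignal(wpb, PIPE_WANTR);  /* before copying            */

      error = uiomove(&wpb->buffer[windex], space, uio);

      cpu_sfence();                         /* data visible before index */
      wpb->windex += space;
      if (wcount != 0)                      /* later chunks: kick reader */
              pipesignal(wpb, PIPE_WANTR);  /* after windex is updated   */
      wcount += space;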

* Improve kern.pipe.delay operation (an IPI avoidance mechanism),
  and reduce the default delay from 5uS to 4uS.

  Also add cpu_pause() in the TSC loop (suggested-by mjg_).
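
  The delay loop itself, as it appears in the read path of the patch,
  spins on the TSC for at most pipe_delay nanoseconds and falls back to
  a normal tsleep() only if no data shows up in time:

      if (pipe_delay) {
              int64_t tsc_target;
              int good = 0;

              tsc_target = tsc_get_target(pipe_delay);
              while (tsc_test_target(tsc_target) == 0) {
                      cpu_lfence();
                      if (rpb->windex != rpb->rindex) {   /* data arrived */
                              good = 1;
                              break;
                      }
                      cpu_pause();    /* relax the cpu while spinning */
              }
              if (good)
                      continue;       /* retry the copy loop, no IPI needed */
      }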

diff --git a/sys/kern/sys_pipe.c b/sys/kern/sys_pipe.c
index 839f1a3..0a1fa2f 100644
--- a/sys/kern/sys_pipe.c
+++ b/sys/kern/sys_pipe.c
@@ -115,12 +115,31 @@ static struct pipegdlock *pipe_gdlocks;
 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
+
+/*
+ * The pipe buffer size can be changed at any time.  Only new pipe()s
+ * are affected.  Note that due to cpu cache effects, you do not want
+ * to make this value too large.
+ */
 static int pipe_size = 32768;
 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
-static int pipe_delay = 5000;  /* 5uS default */
+
+/*
+ * Reader/writer delay loop.  When the reader exhausts the pipe buffer
+ * or the writer completely fills the pipe buffer and would otherwise sleep,
+ * it first busy-loops for a few microseconds waiting for data or buffer
+ * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
+ * and also helps when the user program uses a large data buffer in its
+ * UIOs.
+ *
+ * This defaults to 4uS.
+ */
+#ifdef _RDTSC_SUPPORTED_
+static int pipe_delay = 4000;  /* 4uS default */
 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
+#endif
 
 /*
  * Auto-size pipe cache to reduce kmem allocations and frees.
@@ -150,6 +169,30 @@ static void pipeclose (struct pipe *pipe,
 static void pipe_free_kmem (struct pipebuf *buf);
 static int pipe_create (struct pipe **pipep);
 
+/*
+ * Test and clear the specified flag, wakeup(pb) if it was set.
+ * This function must also act as a memory barrier.
+ */
+static __inline void
+pipesignal(struct pipebuf *pb, uint32_t flags)
+{
+       uint32_t oflags;
+       uint32_t nflags;
+
+       for (;;) {
+               oflags = pb->state;
+               cpu_ccfence();
+               nflags = oflags & ~flags;
+               if (atomic_cmpset_int(&pb->state, oflags, nflags))
+                       break;
+       }
+       if (oflags & flags)
+               wakeup(pb);
+}
+
+/*
+ * Wake up kqueue/select/poll waiters, optionally issuing SIGIO.
+ */
 static __inline void
 pipewakeup(struct pipebuf *pb, int dosigio)
 {
@@ -168,7 +211,7 @@ pipewakeup(struct pipebuf *pb, int dosigio)
  * We use these routines to serialize reads against other reads
  * and writes against other writes.
  *
- * The read token is held on entry so *ipp does not race.
+ * The appropriate token is held on entry so *ipp does not race.
  */
 static __inline int
 pipe_start_uio(int *ipp)
@@ -432,8 +475,10 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
        /*
         * 'quick' NBIO test before things get expensive.
         */
-       if (nbio && rpb->rindex == rpb->windex)
+       if (nbio && rpb->rindex == rpb->windex &&
+           (rpb->state & PIPE_REOF) == 0) {
                return EAGAIN;
+       }
 
        /*
         * Reads are serialized.  Note however that buffer.buffer and
@@ -465,6 +510,10 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                        }
                }
 
+               /*
+                * lfence required to avoid read-reordering of buffer
+                * contents prior to validation of size.
+                */
                size = rpb->windex - rpb->rindex;
                cpu_lfence();
                if (size) {
@@ -474,50 +523,40 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                                nsize = rpb->size - rindex;
                        nsize = szmin(nsize, uio->uio_resid);
 
+                       /*
+                        * Limit how much we move in one go so we have a
+                        * chance to kick the writer while data is still
+                        * available in the pipe.  This avoids getting into
+                        * a ping-pong with the writer.
+                        */
+                       if (nsize > (rpb->size >> 1))
+                               nsize = rpb->size >> 1;
+
                        error = uiomove(&rpb->buffer[rindex], nsize, uio);
                        if (error)
                                break;
-                       cpu_mfence();
                        rpb->rindex += nsize;
                        nread += nsize;
 
                        /*
                         * If the FIFO is still over half full just continue
-                        * and do not try to notify the writer yet.
+                        * and do not try to notify the writer yet.  If
+                        * less than half full notify any waiting writer.
                         */
-                       if (size - nsize >= (rpb->size >> 1)) {
+                       if (size - nsize > (rpb->size >> 1)) {
                                notify_writer = 0;
-                               continue;
+                       } else {
+                               notify_writer = 1;
+                               pipesignal(rpb, PIPE_WANTW);
                        }
-
-                       /*
-                        * When the FIFO is less then half full notify any
-                        * waiting writer.  WANTW can be checked while
-                        * holding just the rlock.
-                        */
-                       notify_writer = 1;
-                       if ((rpb->state & PIPE_WANTW) == 0)
-                               continue;
+                       continue;
                }
 
                /*
                 * If the "write-side" was blocked we wake it up.  This code
-                * is reached either when the buffer is completely emptied
-                * or if it becomes more then half-empty.
-                *
-                * Pipe_state can only be modified if both the rlock and
-                * wlock are held.
+                * is reached when the buffer is completely emptied.
                 */
-               if (rpb->state & PIPE_WANTW) {
-                       lwkt_gettoken(&rpb->wlock);
-                       if (rpb->state & PIPE_WANTW) {
-                               rpb->state &= ~PIPE_WANTW;
-                               lwkt_reltoken(&rpb->wlock);
-                               wakeup(rpb);
-                       } else {
-                               lwkt_reltoken(&rpb->wlock);
-                       }
-               }
+               pipesignal(rpb, PIPE_WANTW);
 
                /*
                 * Pick up our copy loop again if the writer sent data to
@@ -545,10 +584,12 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 
                        tsc_target = tsc_get_target(pipe_delay);
                        while (tsc_test_target(tsc_target) == 0) {
+                               cpu_lfence();
                                if (rpb->windex != rpb->rindex) {
                                        good = 1;
                                        break;
                                }
+                               cpu_pause();
                        }
                        if (good)
                                continue;
@@ -574,52 +615,27 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                }
 
                /*
-                * Last chance, interlock with WANTR.
+                * Last chance, interlock with WANTR
                 */
-               lwkt_gettoken(&rpb->wlock);
-               size = rpb->windex - rpb->rindex;
-               if (size) {
-                       lwkt_reltoken(&rpb->wlock);
-                       continue;
-               }
+               tsleep_interlock(rpb, PCATCH);
+               atomic_set_int(&rpb->state, PIPE_WANTR);
 
                /*
-                * Retest EOF - acquiring a new token can temporarily release
-                * tokens already held.
+                * Retest bytes available after memory barrier above.
                 */
-               if (rpb->state & PIPE_REOF) {
-                       lwkt_reltoken(&rpb->wlock);
-                       break;
-               }
+               size = rpb->windex - rpb->rindex;
+               if (size)
+                       continue;
 
                /*
-                * If there is no more to read in the pipe, reset its
-                * pointers to the beginning.  This improves cache hit
-                * stats.
-                *
-                * We need both locks to modify both pointers, and there
-                * must also not be a write in progress or the uiomove()
-                * in the write might block and temporarily release
-                * its wlock, then reacquire and update windex.  We are
-                * only serialized against reads, not writes.
-                *
-                * XXX should we even bother resetting the indices?  It
-                *     might actually be more cache efficient not to.
+                * Retest EOF after memory barrier above.
                 */
-               if (rpb->rindex == rpb->windex && rpb->wip == 0) {
-                       rpb->rindex = 0;
-                       rpb->windex = 0;
-               }
+               if (rpb->state & PIPE_REOF)
+                       break;
 
                /*
-                * Wait for more data.
-                *
-                * Pipe_state can only be set if both the rlock and wlock
-                * are held.
+                * Wait for more data or state change
                 */
-               rpb->state |= PIPE_WANTR;
-               tsleep_interlock(rpb, PCATCH);
-               lwkt_reltoken(&rpb->wlock);
                error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
                if (error)
                        break;
@@ -644,16 +660,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                /*
                 * Synchronous blocking is done on the pipe involved
                 */
-               if (rpb->state & PIPE_WANTW) {
-                       lwkt_gettoken(&rpb->wlock);
-                       if (rpb->state & PIPE_WANTW) {
-                               rpb->state &= ~PIPE_WANTW;
-                               lwkt_reltoken(&rpb->wlock);
-                               wakeup(rpb);
-                       } else {
-                               lwkt_reltoken(&rpb->wlock);
-                       }
-               }
+               pipesignal(rpb, PIPE_WANTW);
 
                /*
                 * But we may also have to deal with a kqueue which is
@@ -661,9 +668,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * EVFILT_WRITE event waiting for our side to drain will
                 * be on the other side.
                 */
-               lwkt_gettoken(&wpb->wlock);
                pipewakeup(wpb, 0);
-               lwkt_reltoken(&wpb->wlock);
        }
        /*size = rpb->windex - rpb->rindex;*/
        lwkt_reltoken(&rpb->rlock);
@@ -767,9 +772,10 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 
                windex = wpb->windex & (wpb->size - 1);
                space = wpb->size - (wpb->windex - wpb->rindex);
-               cpu_lfence();
 
-               /* Writes of size <= PIPE_BUF must be atomic. */
+               /*
+                * Writes of size <= PIPE_BUF must be atomic.
+                */
                if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
                        space = 0;
 
@@ -782,14 +788,10 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                        size_t segsize;
 
                        /*
-                        * Transfer size is minimum of uio transfer
-                        * and free space in pipe buffer.
-                        *
-                        * Limit each uiocopy to no more then wpb->size
-                        * so we can keep the gravy train going on a
-                        * SMP box.  This significantly increases write
-                        * performance.  Otherwise large writes wind up doing
-                        * an inefficient synchronous ping-pong.
+                        * We want to notify a potentially waiting reader
+                        * before we exhaust the write buffer for SMP
+                        * pipelining.  Otherwise the write/read will begin
+                        * to ping-pong.
                         */
                        space = szmin(space, uio->uio_resid);
                        if (space > (wpb->size >> 1))
@@ -814,12 +816,9 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                         * on the reader side is the fastest way to get the
                         * reader going.  (The scheduler will hard loop on
                         * lock tokens).
-                        *
-                        * NOTE: We can't clear WANTR here without acquiring
-                        * the rlock, which we don't want to do here!
                         */
-                       if ((wpb->state & PIPE_WANTR))
-                               wakeup(wpb);
+                       if (wcount == 0)
+                               pipesignal(wpb, PIPE_WANTR);
 
                        /*
                         * Transfer segment, which may include a wrap-around.
@@ -833,56 +832,78 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                        }
                        if (error)
                                break;
-                       cpu_mfence();
+
+                       /*
+                        * Memory fence prior to the windex update (note: not
+                        * needed on Intel, where this is a NOP).
+                        */
+                       cpu_sfence();
                        wpb->windex += space;
+
+                       /*
+                        * Signal reader
+                        */
+                       if (wcount != 0)
+                               pipesignal(wpb, PIPE_WANTR);
                        wcount += space;
                        continue;
                }
 
                /*
-                * We need both the rlock and the wlock to interlock against
-                * the EOF, WANTW, and size checks, and to modify pipe_state.
-                *
-                * These are token locks so we do not have to worry about
-                * deadlocks.
+                * Wakeup any pending reader
                 */
-               lwkt_gettoken(&wpb->rlock);
-
-               /*
-                * If the "read-side" has been blocked, wake it up now
-                * and yield to let it drain synchronously rather
-                * then block.
-                */
-               if (wpb->state & PIPE_WANTR) {
-                       wpb->state &= ~PIPE_WANTR;
-                       wakeup(wpb);
-               }
+               pipesignal(wpb, PIPE_WANTR);
 
                /*
                 * don't block on non-blocking I/O
                 */
                if (nbio) {
-                       lwkt_reltoken(&wpb->rlock);
                        error = EAGAIN;
                        break;
                }
 
+#ifdef _RDTSC_SUPPORTED_
+               if (pipe_delay) {
+                       int64_t tsc_target;
+                       int good = 0;
+
+                       tsc_target = tsc_get_target(pipe_delay);
+                       while (tsc_test_target(tsc_target) == 0) {
+                               cpu_lfence();
+                               space = wpb->size - (wpb->windex - wpb->rindex);
+                               if ((space < uio->uio_resid) &&
+                                   (orig_resid <= PIPE_BUF)) {
+                                       space = 0;
+                               }
+                               if (space) {
+                                       good = 1;
+                                       break;
+                               }
+                               cpu_pause();
+                       }
+                       if (good)
+                               continue;
+               }
+#endif
+
+               /*
+                * Interlocked test.   Atomic op enforces the memory barrier.
+                */
+               tsleep_interlock(wpb, PCATCH);
+               atomic_set_int(&wpb->state, PIPE_WANTW);
+
                /*
-                * re-test whether we have to block in the writer after
-                * acquiring both locks, in case the reader opened up
-                * some space.
+                * Retest space available after memory barrier above.
+                * Writes of size <= PIPE_BUF must be atomic.
                 */
                space = wpb->size - (wpb->windex - wpb->rindex);
-               cpu_lfence();
                if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
                        space = 0;
 
                /*
-                * Retest EOF - acquiring a new token can temporarily release
-                * tokens already held.
+                * Retest EOF after memory barrier above.
                 */
                if (wpb->state & PIPE_WEOF) {
-                       lwkt_reltoken(&wpb->rlock);
                        error = EPIPE;
                        break;
                }
@@ -892,12 +913,9 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * wake up select/poll/kq.
                 */
                if (space == 0) {
-                       wpb->state |= PIPE_WANTW;
                        pipewakeup(wpb, 1);
-                       if (wpb->state & PIPE_WANTW)
-                               error = tsleep(wpb, PCATCH, "pipewr", 0);
+                       error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
                }
-               lwkt_reltoken(&wpb->rlock);
 
                /*
                 * Break out if we errored or the read side wants us to go
@@ -919,19 +937,8 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
         * Both rlock and wlock are required to be able to modify pipe_state.
         */
        if (wpb->windex != wpb->rindex) {
-               if (wpb->state & PIPE_WANTR) {
-                       lwkt_gettoken(&wpb->rlock);
-                       if (wpb->state & PIPE_WANTR) {
-                               wpb->state &= ~PIPE_WANTR;
-                               lwkt_reltoken(&wpb->rlock);
-                               wakeup(wpb);
-                       } else {
-                               lwkt_reltoken(&wpb->rlock);
-                       }
-               }
-               lwkt_gettoken(&wpb->rlock);
+               pipesignal(wpb, PIPE_WANTR);
                pipewakeup(wpb, 1);
-               lwkt_reltoken(&wpb->rlock);
        }
 
        /*
@@ -980,9 +987,9 @@ pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
        switch (cmd) {
        case FIOASYNC:
                if (*(int *)data) {
-                       rpb->state |= PIPE_ASYNC;
+                       atomic_set_int(&rpb->state, PIPE_ASYNC);
                } else {
-                       rpb->state &= ~PIPE_ASYNC;
+                       atomic_clear_int(&rpb->state, PIPE_ASYNC);
                }
                error = 0;
                break;
@@ -1106,8 +1113,10 @@ pipe_shutdown(struct file *fp, int how)
        switch(how) {
        case SHUT_RDWR:
        case SHUT_RD:
-               rpb->state |= PIPE_REOF;                /* my reads */
-               rpb->state |= PIPE_WEOF;                /* peer writes */
+               /*
+                * EOF on my reads and peer writes
+                */
+               atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
                if (rpb->state & PIPE_WANTR) {
                        rpb->state &= ~PIPE_WANTR;
                        wakeup(rpb);
@@ -1121,8 +1130,10 @@ pipe_shutdown(struct file *fp, int how)
                        break;
                /* fall through */
        case SHUT_WR:
-               wpb->state |= PIPE_REOF;                /* peer reads */
-               wpb->state |= PIPE_WEOF;                /* my writes */
+               /*
+                * EOF on peer reads and my writes
+                */
+               atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
                if (wpb->state & PIPE_WANTR) {
                        wpb->state &= ~PIPE_WANTR;
                        wakeup(wpb);
@@ -1188,7 +1199,7 @@ pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
                return;
        }
 
-       rpb->state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
+       atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
        pipewakeup(rpb, 1);
        if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
                rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
@@ -1203,7 +1214,7 @@ pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
        lwkt_gettoken(&wpb->rlock);
        lwkt_gettoken(&wpb->wlock);
 
-       wpb->state |= PIPE_REOF | PIPE_WEOF;
+       atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
        pipewakeup(wpb, 1);
        if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
                wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);