kernel - Refactor sys_pipe
author: Matthew Dillon <dillon@apollo.backplane.com>
Sat, 14 Oct 2017 00:55:41 +0000 (17:55 -0700)
committer: Matthew Dillon <dillon@apollo.backplane.com>
Mon, 16 Oct 2017 18:30:22 +0000 (11:30 -0700)
* Refactor the pipe code in preparation for optimization.  Get rid of
  the dual-pipe structure and instead have one pipe structure with
  two buffers.

* Scrap a ton of global statistics variables that nobody uses any more,
  get rid of pipe_peer, and get rid of the slock.

sys/kern/sys_pipe.c
sys/sys/pipe.h

index ac2c0c7..839f1a3 100644 (file)
@@ -1,6 +1,10 @@
 /*
  * Copyright (c) 1996 John S. Dyson
  * All rights reserved.
+ * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
+ *
+ * This code is derived from software contributed to The DragonFly Project
+ * by Matthew Dillon <dillon@backplane.com>
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -15,8 +19,6 @@
  *    John S. Dyson.
  * 4. Modifications may be freely made to this file if the above conditions
  *    are met.
- *
- * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
  */
 
 /*
 
 #include <machine/cpufunc.h>
 
+struct pipegdlock {
+       struct mtx      mtx;
+} __cachealign;
+
 /*
  * interfaces to the outside world
  */
@@ -101,53 +107,20 @@ static struct filterops pipe_wfiltops =
 
 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
 
-/*
- * Default pipe buffer size(s), this can be kind-of large now because pipe
- * space is pageable.  The pipe code will try to maintain locality of
- * reference for performance reasons, so small amounts of outstanding I/O
- * will not wipe the cache.
- */
-#define MINPIPESIZE (PIPE_SIZE/3)
-#define MAXPIPESIZE (2*PIPE_SIZE/3)
-
-/*
- * Limit the number of "big" pipes
- */
-#define LIMITBIGPIPES  64
 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
 
-static int pipe_maxbig = LIMITBIGPIPES;
 static int pipe_maxcache = PIPEQ_MAX_CACHE;
-static int pipe_bigcount;
-static int pipe_nbig;
-static int pipe_bcache_alloc;
-static int pipe_bkmem_alloc;
-static int pipe_rblocked_count;
-static int pipe_wblocked_count;
-static struct mtx *pipe_gdlocks;
+static struct pipegdlock *pipe_gdlocks;
 
 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
-SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
-        CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
-SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
-        CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
-SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
-        CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe expanded");
-SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
-        CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe expanded");
 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
-SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
-        CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
+static int pipe_size = 32768;
+SYSCTL_INT(_kern_pipe, OID_AUTO, size,
+        CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
 static int pipe_delay = 5000;  /* 5uS default */
 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
-#if !defined(NO_PIPE_SYSCTL_STATS)
-SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
-        CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
-SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
-        CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
-#endif
 
 /*
  * Auto-size pipe cache to reduce kmem allocations and frees.
@@ -159,12 +132,6 @@ pipeinit(void *dummy)
        size_t mbytes = kmem_lim_size();
        int n;
 
-       if (pipe_maxbig == LIMITBIGPIPES) {
-               if (mbytes >= 7 * 1024)
-                       pipe_maxbig *= 2;
-               if (mbytes >= 15 * 1024)
-                       pipe_maxbig *= 2;
-       }
        if (pipe_maxcache == PIPEQ_MAX_CACHE) {
                if (mbytes >= 7 * 1024)
                        pipe_maxcache *= 2;
@@ -174,24 +141,24 @@ pipeinit(void *dummy)
        pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
                             M_PIPE, M_WAITOK | M_ZERO);
        for (n = 0; n < ncpus; ++n)
-               mtx_init(&pipe_gdlocks[n], "pipekm");
+               mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
 }
 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
 
-static void pipeclose (struct pipe *cpipe);
-static void pipe_free_kmem (struct pipe *cpipe);
-static int pipe_create (struct pipe **cpipep);
-static int pipespace (struct pipe *cpipe, int size);
+static void pipeclose (struct pipe *pipe,
+               struct pipebuf *pbr, struct pipebuf *pbw);
+static void pipe_free_kmem (struct pipebuf *buf);
+static int pipe_create (struct pipe **pipep);
 
 static __inline void
-pipewakeup(struct pipe *cpipe, int dosigio)
+pipewakeup(struct pipebuf *pb, int dosigio)
 {
-       if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
+       if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
                lwkt_gettoken(&sigio_token);
-               pgsigio(cpipe->pipe_sigio, SIGIO, 0);
+               pgsigio(pb->sigio, SIGIO, 0);
                lwkt_reltoken(&sigio_token);
        }
-       KNOTE(&cpipe->pipe_kq.ki_note, 0);
+       KNOTE(&pb->kq.ki_note, 0);
 }
 
 /*
@@ -204,7 +171,7 @@ pipewakeup(struct pipe *cpipe, int dosigio)
  * The read token is held on entry so *ipp does not race.
  */
 static __inline int
-pipe_start_uio(struct pipe *cpipe, int *ipp)
+pipe_start_uio(int *ipp)
 {
        int error;
 
@@ -219,7 +186,7 @@ pipe_start_uio(struct pipe *cpipe, int *ipp)
 }
 
 static __inline void
-pipe_end_uio(struct pipe *cpipe, int *ipp)
+pipe_end_uio(int *ipp)
 {
        if (*ipp < 0) {
                *ipp = 0;
@@ -255,20 +222,20 @@ kern_pipe(long *fds, int flags)
        struct thread *td = curthread;
        struct filedesc *fdp = td->td_proc->p_fd;
        struct file *rf, *wf;
-       struct pipe *rpipe, *wpipe;
+       struct pipe *pipe;
        int fd1, fd2, error;
 
-       rpipe = wpipe = NULL;
-       if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
-               pipeclose(rpipe); 
-               pipeclose(wpipe); 
+       pipe = NULL;
+       if (pipe_create(&pipe)) {
+               pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
+               pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
                return (ENFILE);
        }
        
        error = falloc(td->td_lwp, &rf, &fd1);
        if (error) {
-               pipeclose(rpipe);
-               pipeclose(wpipe);
+               pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
+               pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
                return (error);
        }
        fds[0] = fd1;
@@ -282,7 +249,7 @@ kern_pipe(long *fds, int flags)
        rf->f_type = DTYPE_PIPE;
        rf->f_flag = FREAD | FWRITE;
        rf->f_ops = &pipeops;
-       rf->f_data = rpipe;
+       rf->f_data = (void *)((intptr_t)pipe | 0);
        if (flags & O_NONBLOCK)
                rf->f_flag |= O_NONBLOCK;
        if (flags & O_CLOEXEC)
@@ -292,14 +259,15 @@ kern_pipe(long *fds, int flags)
        if (error) {
                fsetfd(fdp, NULL, fd1);
                fdrop(rf);
-               /* rpipe has been closed by fdrop(). */
-               pipeclose(wpipe);
+               /* pipeA has been closed by fdrop() */
+               /* close pipeB here */
+               pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
                return (error);
        }
        wf->f_type = DTYPE_PIPE;
        wf->f_flag = FREAD | FWRITE;
        wf->f_ops = &pipeops;
-       wf->f_data = wpipe;
+       wf->f_data = (void *)((intptr_t)pipe | 1);
        if (flags & O_NONBLOCK)
                wf->f_flag |= O_NONBLOCK;
        if (flags & O_CLOEXEC)
@@ -307,13 +275,6 @@ kern_pipe(long *fds, int flags)
 
        fds[1] = fd2;
 
-       rpipe->pipe_slock = kmalloc(sizeof(struct lock),
-                                   M_PIPE, M_WAITOK|M_ZERO);
-       wpipe->pipe_slock = rpipe->pipe_slock;
-       rpipe->pipe_peer = wpipe;
-       wpipe->pipe_peer = rpipe;
-       lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
-
        /*
         * Once activated the peer relationship remains valid until
         * both sides are closed.
@@ -327,20 +288,29 @@ kern_pipe(long *fds, int flags)
 }
 
 /*
- * Allocate kva for pipe circular buffer, the space is pageable
- * This routine will 'realloc' the size of a pipe safely, if it fails
- * it will retain the old buffer.
- * If it fails it will return ENOMEM.
+ * [re]allocates KVA for the pipe's circular buffer.  The space is
+ * pageable.  Called twice to setup full-duplex communications.
+ *
+ * NOTE: Independent vm_object's are used to improve performance.
+ *
+ * Returns 0 on success, ENOMEM on failure.
  */
 static int
-pipespace(struct pipe *cpipe, int size)
+pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
 {
        struct vm_object *object;
        caddr_t buffer;
-       int npages, error;
+       vm_pindex_t npages;
+       int error;
+
+       size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
+       if (size < 16384)
+               size = 16384;
+       if (size > 1024*1024)
+               size = 1024*1024;
 
        npages = round_page(size) / PAGE_SIZE;
-       object = cpipe->pipe_buffer.object;
+       object = pb->object;
 
        /*
         * [re]create the object if necessary and reserve space for it
@@ -362,77 +332,94 @@ pipespace(struct pipe *cpipe, int size)
                        vm_object_deallocate(object);
                        return (ENOMEM);
                }
-               pipe_free_kmem(cpipe);
-               cpipe->pipe_buffer.object = object;
-               cpipe->pipe_buffer.buffer = buffer;
-               cpipe->pipe_buffer.size = size;
-               ++pipe_bkmem_alloc;
-       } else {
-               ++pipe_bcache_alloc;
+               pipe_free_kmem(pb);
+               pb->object = object;
+               pb->buffer = buffer;
+               pb->size = size;
        }
-       cpipe->pipe_buffer.rindex = 0;
-       cpipe->pipe_buffer.windex = 0;
+       pb->rindex = 0;
+       pb->windex = 0;
+
        return (0);
 }
 
 /*
  * Initialize and allocate VM and memory for pipe, pulling the pipe from
- * our per-cpu cache if possible.  For now make sure it is sized for the
- * smaller PIPE_SIZE default.
+ * our per-cpu cache if possible.
+ *
+ * Returns 0 on success, else an error code (typically ENOMEM).  Caller
+ * must still deallocate the pipe on failure.
  */
 static int
-pipe_create(struct pipe **cpipep)
+pipe_create(struct pipe **pipep)
 {
        globaldata_t gd = mycpu;
-       struct pipe *cpipe;
+       struct pipe *pipe;
        int error;
 
-       if ((cpipe = gd->gd_pipeq) != NULL) {
-               gd->gd_pipeq = cpipe->pipe_peer;
+       if ((pipe = gd->gd_pipeq) != NULL) {
+               gd->gd_pipeq = pipe->next;
                --gd->gd_pipeqcount;
-               cpipe->pipe_peer = NULL;
-               cpipe->pipe_wantwcnt = 0;
+               pipe->next = NULL;
        } else {
-               cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
+               pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
+               lwkt_token_init(&pipe->bufferA.rlock, "piper");
+               lwkt_token_init(&pipe->bufferA.wlock, "pipew");
+               lwkt_token_init(&pipe->bufferB.rlock, "piper");
+               lwkt_token_init(&pipe->bufferB.wlock, "pipew");
        }
-       *cpipep = cpipe;
-       if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
+       *pipep = pipe;
+       if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
                return (error);
-       vfs_timestamp(&cpipe->pipe_ctime);
-       cpipe->pipe_atime = cpipe->pipe_ctime;
-       cpipe->pipe_mtime = cpipe->pipe_ctime;
-       lwkt_token_init(&cpipe->pipe_rlock, "piper");
-       lwkt_token_init(&cpipe->pipe_wlock, "pipew");
+       }
+       if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
+               return (error);
+       }
+       vfs_timestamp(&pipe->ctime);
+       pipe->bufferA.atime = pipe->ctime;
+       pipe->bufferA.mtime = pipe->ctime;
+       pipe->bufferB.atime = pipe->ctime;
+       pipe->bufferB.mtime = pipe->ctime;
+       pipe->open_count = 2;
+
        return (0);
 }
 
+/*
+ * Read data from a pipe
+ */
 static int
 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 {
-       struct pipe *rpipe;
-       struct pipe *wpipe;
-       int error;
+       struct pipebuf *rpb;
+       struct pipebuf *wpb;
+       struct pipe *pipe;
        size_t nread = 0;
-       int nbio;
-       u_int size;     /* total bytes available */
-       u_int nsize;    /* total bytes to read */
-       u_int rindex;   /* contiguous bytes available */
+       size_t size;    /* total bytes available */
+       size_t nsize;   /* total bytes to read */
+       size_t rindex;  /* contiguous bytes available */
        int notify_writer;
        int bigread;
        int bigcount;
+       int error;
+       int nbio;
 
+       pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
+       if ((intptr_t)fp->f_data & 1) {
+               rpb = &pipe->bufferB;
+               wpb = &pipe->bufferA;
+       } else {
+               rpb = &pipe->bufferA;
+               wpb = &pipe->bufferB;
+       }
        atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
 
        if (uio->uio_resid == 0)
                return(0);
 
        /*
-        * Setup locks, calculate nbio
+        * Calculate nbio
         */
-       rpipe = (struct pipe *)fp->f_data;
-       wpipe = rpipe->pipe_peer;
-       lwkt_gettoken(&rpipe->pipe_rlock);
-
        if (fflags & O_FBLOCKING)
                nbio = 0;
        else if (fflags & O_FNONBLOCKING)
@@ -443,14 +430,21 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                nbio = 0;
 
        /*
-        * Reads are serialized.  Note however that pipe_buffer.buffer and
-        * pipe_buffer.size can change out from under us when the number
+        * 'quick' NBIO test before things get expensive.
+        */
+       if (nbio && rpb->rindex == rpb->windex)
+               return EAGAIN;
+
+       /*
+        * Reads are serialized.  Note however that buffer.buffer and
+        * buffer.size can change out from under us when the number
         * of bytes in the buffer are zero due to the write-side doing a
         * pipespace().
         */
-       error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
+       lwkt_gettoken(&rpb->rlock);
+       error = pipe_start_uio(&rpb->rip);
        if (error) {
-               lwkt_reltoken(&rpipe->pipe_rlock);
+               lwkt_reltoken(&rpb->rlock);
                return (error);
        }
        notify_writer = 0;
@@ -471,29 +465,27 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                        }
                }
 
-               size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
+               size = rpb->windex - rpb->rindex;
                cpu_lfence();
                if (size) {
-                       rindex = rpipe->pipe_buffer.rindex &
-                                (rpipe->pipe_buffer.size - 1);
+                       rindex = rpb->rindex & (rpb->size - 1);
                        nsize = size;
-                       if (nsize > rpipe->pipe_buffer.size - rindex)
-                               nsize = rpipe->pipe_buffer.size - rindex;
+                       if (nsize > rpb->size - rindex)
+                               nsize = rpb->size - rindex;
                        nsize = szmin(nsize, uio->uio_resid);
 
-                       error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
-                                       nsize, uio);
+                       error = uiomove(&rpb->buffer[rindex], nsize, uio);
                        if (error)
                                break;
                        cpu_mfence();
-                       rpipe->pipe_buffer.rindex += nsize;
+                       rpb->rindex += nsize;
                        nread += nsize;
 
                        /*
                         * If the FIFO is still over half full just continue
                         * and do not try to notify the writer yet.
                         */
-                       if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
+                       if (size - nsize >= (rpb->size >> 1)) {
                                notify_writer = 0;
                                continue;
                        }
@@ -504,7 +496,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                         * holding just the rlock.
                         */
                        notify_writer = 1;
-                       if ((rpipe->pipe_state & PIPE_WANTW) == 0)
+                       if ((rpb->state & PIPE_WANTW) == 0)
                                continue;
                }
 
@@ -516,14 +508,14 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * Pipe_state can only be modified if both the rlock and
                 * wlock are held.
                 */
-               if (rpipe->pipe_state & PIPE_WANTW) {
-                       lwkt_gettoken(&rpipe->pipe_wlock);
-                       if (rpipe->pipe_state & PIPE_WANTW) {
-                               rpipe->pipe_state &= ~PIPE_WANTW;
-                               lwkt_reltoken(&rpipe->pipe_wlock);
-                               wakeup(rpipe);
+               if (rpb->state & PIPE_WANTW) {
+                       lwkt_gettoken(&rpb->wlock);
+                       if (rpb->state & PIPE_WANTW) {
+                               rpb->state &= ~PIPE_WANTW;
+                               lwkt_reltoken(&rpb->wlock);
+                               wakeup(rpb);
                        } else {
-                               lwkt_reltoken(&rpipe->pipe_wlock);
+                               lwkt_reltoken(&rpb->wlock);
                        }
                }
 
@@ -543,7 +535,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * where as miss requiring a tsleep/wakeup sequence
                 * will take 7uS or more.
                 */
-               if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
+               if (rpb->windex != rpb->rindex)
                        continue;
 
 #ifdef _RDTSC_SUPPORTED_
@@ -553,8 +545,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 
                        tsc_target = tsc_get_target(pipe_delay);
                        while (tsc_test_target(tsc_target) == 0) {
-                               if (rpipe->pipe_buffer.windex !=
-                                   rpipe->pipe_buffer.rindex) {
+                               if (rpb->windex != rpb->rindex) {
                                        good = 1;
                                        break;
                                }
@@ -567,7 +558,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                /*
                 * Detect EOF condition, do not set error.
                 */
-               if (rpipe->pipe_state & PIPE_REOF)
+               if (rpb->state & PIPE_REOF)
                        break;
 
                /*
@@ -585,10 +576,10 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                /*
                 * Last chance, interlock with WANTR.
                 */
-               lwkt_gettoken(&rpipe->pipe_wlock);
-               size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
+               lwkt_gettoken(&rpb->wlock);
+               size = rpb->windex - rpb->rindex;
                if (size) {
-                       lwkt_reltoken(&rpipe->pipe_wlock);
+                       lwkt_reltoken(&rpb->wlock);
                        continue;
                }
 
@@ -596,8 +587,8 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * Retest EOF - acquiring a new token can temporarily release
                 * tokens already held.
                 */
-               if (rpipe->pipe_state & PIPE_REOF) {
-                       lwkt_reltoken(&rpipe->pipe_wlock);
+               if (rpb->state & PIPE_REOF) {
+                       lwkt_reltoken(&rpb->wlock);
                        break;
                }
 
@@ -615,10 +606,9 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * XXX should we even bother resetting the indices?  It
                 *     might actually be more cache efficient not to.
                 */
-               if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
-                   rpipe->pipe_wip == 0) {
-                       rpipe->pipe_buffer.rindex = 0;
-                       rpipe->pipe_buffer.windex = 0;
+               if (rpb->rindex == rpb->windex && rpb->wip == 0) {
+                       rpb->rindex = 0;
+                       rpb->windex = 0;
                }
 
                /*
@@ -627,21 +617,20 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * Pipe_state can only be set if both the rlock and wlock
                 * are held.
                 */
-               rpipe->pipe_state |= PIPE_WANTR;
-               tsleep_interlock(rpipe, PCATCH);
-               lwkt_reltoken(&rpipe->pipe_wlock);
-               error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
-               ++pipe_rblocked_count;
+               rpb->state |= PIPE_WANTR;
+               tsleep_interlock(rpb, PCATCH);
+               lwkt_reltoken(&rpb->wlock);
+               error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
                if (error)
                        break;
        }
-       pipe_end_uio(rpipe, &rpipe->pipe_rip);
+       pipe_end_uio(&rpb->rip);
 
        /*
         * Uptime last access time
         */
        if (error == 0 && nread)
-               vfs_timestamp(&rpipe->pipe_atime);
+               vfs_timestamp(&rpb->atime);
 
        /*
         * If we drained the FIFO more then half way then handle
@@ -655,14 +644,14 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                /*
                 * Synchronous blocking is done on the pipe involved
                 */
-               if (rpipe->pipe_state & PIPE_WANTW) {
-                       lwkt_gettoken(&rpipe->pipe_wlock);
-                       if (rpipe->pipe_state & PIPE_WANTW) {
-                               rpipe->pipe_state &= ~PIPE_WANTW;
-                               lwkt_reltoken(&rpipe->pipe_wlock);
-                               wakeup(rpipe);
+               if (rpb->state & PIPE_WANTW) {
+                       lwkt_gettoken(&rpb->wlock);
+                       if (rpb->state & PIPE_WANTW) {
+                               rpb->state &= ~PIPE_WANTW;
+                               lwkt_reltoken(&rpb->wlock);
+                               wakeup(rpb);
                        } else {
-                               lwkt_reltoken(&rpipe->pipe_wlock);
+                               lwkt_reltoken(&rpb->wlock);
                        }
                }
 
@@ -672,12 +661,12 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * EVFILT_WRITE event waiting for our side to drain will
                 * be on the other side.
                 */
-               lwkt_gettoken(&wpipe->pipe_wlock);
-               pipewakeup(wpipe, 0);
-               lwkt_reltoken(&wpipe->pipe_wlock);
+               lwkt_gettoken(&wpb->wlock);
+               pipewakeup(wpb, 0);
+               lwkt_reltoken(&wpb->wlock);
        }
-       /*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
-       lwkt_reltoken(&rpipe->pipe_rlock);
+       /*size = rpb->windex - rpb->rindex;*/
+       lwkt_reltoken(&rpb->rlock);
 
        return (error);
 }
@@ -685,25 +674,53 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 static int
 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 {
-       int error;
-       int orig_resid;
-       int nbio;
-       struct pipe *wpipe;
-       struct pipe *rpipe;
-       u_int windex;
-       u_int space;
-       u_int wcount;
+       struct pipebuf *rpb;
+       struct pipebuf *wpb;
+       struct pipe *pipe;
+       size_t windex;
+       size_t space;
+       size_t wcount;
+       size_t orig_resid;
        int bigwrite;
        int bigcount;
+       int error;
+       int nbio;
+
+       pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
+       if ((intptr_t)fp->f_data & 1) {
+               rpb = &pipe->bufferB;
+               wpb = &pipe->bufferA;
+       } else {
+               rpb = &pipe->bufferA;
+               wpb = &pipe->bufferB;
+       }
+
+       /*
+        * Calculate nbio
+        */
+       if (fflags & O_FBLOCKING)
+               nbio = 0;
+       else if (fflags & O_FNONBLOCKING)
+               nbio = 1;
+       else if (fp->f_flag & O_NONBLOCK)
+               nbio = 1;
+       else
+               nbio = 0;
+
+       /*
+        * 'quick' NBIO test before things get expensive.
+        */
+       if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
+           uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
+               return EAGAIN;
+       }
 
        /*
         * Writes go to the peer.  The peer will always exist.
         */
-       rpipe = (struct pipe *) fp->f_data;
-       wpipe = rpipe->pipe_peer;
-       lwkt_gettoken(&wpipe->pipe_wlock);
-       if (wpipe->pipe_state & PIPE_WEOF) {
-               lwkt_reltoken(&wpipe->pipe_wlock);
+       lwkt_gettoken(&wpb->wlock);
+       if (wpb->state & PIPE_WEOF) {
+               lwkt_reltoken(&wpb->wlock);
                return (EPIPE);
        }
 
@@ -711,52 +728,19 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
         * Degenerate case (EPIPE takes prec)
         */
        if (uio->uio_resid == 0) {
-               lwkt_reltoken(&wpipe->pipe_wlock);
+               lwkt_reltoken(&wpb->wlock);
                return(0);
        }
 
        /*
         * Writes are serialized (start_uio must be called with wlock)
         */
-       error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
+       error = pipe_start_uio(&wpb->wip);
        if (error) {
-               lwkt_reltoken(&wpipe->pipe_wlock);
+               lwkt_reltoken(&wpb->wlock);
                return (error);
        }
 
-       if (fflags & O_FBLOCKING)
-               nbio = 0;
-       else if (fflags & O_FNONBLOCKING)
-               nbio = 1;
-       else if (fp->f_flag & O_NONBLOCK)
-               nbio = 1;
-       else
-               nbio = 0;
-
-       /*
-        * If it is advantageous to resize the pipe buffer, do
-        * so.  We are write-serialized so we can block safely.
-        */
-       if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
-           (pipe_nbig < pipe_maxbig) &&
-           wpipe->pipe_wantwcnt > 4 &&
-           (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
-               /* 
-                * Recheck after lock.
-                */
-               lwkt_gettoken(&wpipe->pipe_rlock);
-               if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
-                   (pipe_nbig < pipe_maxbig) &&
-                   (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
-                       atomic_add_int(&pipe_nbig, 1);
-                       if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
-                               ++pipe_bigcount;
-                       else
-                               atomic_subtract_int(&pipe_nbig, 1);
-               }
-               lwkt_reltoken(&wpipe->pipe_rlock);
-       }
-
        orig_resid = uio->uio_resid;
        wcount = 0;
 
@@ -764,7 +748,7 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
        bigcount = 10;
 
        while (uio->uio_resid) {
-               if (wpipe->pipe_state & PIPE_WEOF) {
+               if (wpb->state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }
@@ -781,10 +765,8 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                        }
                }
 
-               windex = wpipe->pipe_buffer.windex &
-                        (wpipe->pipe_buffer.size - 1);
-               space = wpipe->pipe_buffer.size -
-                       (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
+               windex = wpb->windex & (wpb->size - 1);
+               space = wpb->size - (wpb->windex - wpb->rindex);
                cpu_lfence();
 
                /* Writes of size <= PIPE_BUF must be atomic. */
@@ -797,22 +779,21 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * writes to spin.
                 */
                if (space > 0) {
-                       u_int segsize;
+                       size_t segsize;
 
                        /*
                         * Transfer size is minimum of uio transfer
                         * and free space in pipe buffer.
                         *
-                        * Limit each uiocopy to no more then PIPE_SIZE
+                        * Limit each uiocopy to no more than wpb->size
                         * so we can keep the gravy train going on a
-                        * SMP box.  This doubles the performance for
-                        * write sizes > 16K.  Otherwise large writes
-                        * wind up doing an inefficient synchronous
-                        * ping-pong.
+                        * SMP box.  This significantly increases write
+                        * performance.  Otherwise large writes wind up doing
+                        * an inefficient synchronous ping-pong.
                         */
                        space = szmin(space, uio->uio_resid);
-                       if (space > PIPE_SIZE)
-                               space = PIPE_SIZE;
+                       if (space > (wpb->size >> 1))
+                               space = (wpb->size >> 1);
 
                        /*
                         * First segment to transfer is minimum of
@@ -821,7 +802,7 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                         * is less than the transfer size, we've got
                         * a wraparound in the buffer.
                         */
-                       segsize = wpipe->pipe_buffer.size - windex;
+                       segsize = wpb->size - windex;
                        if (segsize > space)
                                segsize = space;
 
@@ -837,25 +818,23 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                         * NOTE: We can't clear WANTR here without acquiring
                         * the rlock, which we don't want to do here!
                         */
-                       if ((wpipe->pipe_state & PIPE_WANTR))
-                               wakeup(wpipe);
+                       if ((wpb->state & PIPE_WANTR))
+                               wakeup(wpb);
 
                        /*
                         * Transfer segment, which may include a wrap-around.
                         * Update windex to account for both all in one go
                         * so the reader can read() the data atomically.
                         */
-                       error = uiomove(&wpipe->pipe_buffer.buffer[windex],
-                                       segsize, uio);
+                       error = uiomove(&wpb->buffer[windex], segsize, uio);
                        if (error == 0 && segsize < space) {
                                segsize = space - segsize;
-                               error = uiomove(&wpipe->pipe_buffer.buffer[0],
-                                               segsize, uio);
+                               error = uiomove(&wpb->buffer[0], segsize, uio);
                        }
                        if (error)
                                break;
                        cpu_mfence();
-                       wpipe->pipe_buffer.windex += space;
+                       wpb->windex += space;
                        wcount += space;
                        continue;
                }
@@ -867,23 +846,23 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * These are token locks so we do not have to worry about
                 * deadlocks.
                 */
-               lwkt_gettoken(&wpipe->pipe_rlock);
+               lwkt_gettoken(&wpb->rlock);
 
                /*
                 * If the "read-side" has been blocked, wake it up now
                 * and yield to let it drain synchronously rather
                 * then block.
                 */
-               if (wpipe->pipe_state & PIPE_WANTR) {
-                       wpipe->pipe_state &= ~PIPE_WANTR;
-                       wakeup(wpipe);
+               if (wpb->state & PIPE_WANTR) {
+                       wpb->state &= ~PIPE_WANTR;
+                       wakeup(wpb);
                }
 
                /*
                 * don't block on non-blocking I/O
                 */
                if (nbio) {
-                       lwkt_reltoken(&wpipe->pipe_rlock);
+                       lwkt_reltoken(&wpb->rlock);
                        error = EAGAIN;
                        break;
                }
@@ -893,8 +872,7 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * acquiring both locks, in case the reader opened up
                 * some space.
                 */
-               space = wpipe->pipe_buffer.size -
-                       (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
+               space = wpb->size - (wpb->windex - wpb->rindex);
                cpu_lfence();
                if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
                        space = 0;
@@ -903,8 +881,8 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * Retest EOF - acquiring a new token can temporarily release
                 * tokens already held.
                 */
-               if (wpipe->pipe_state & PIPE_WEOF) {
-                       lwkt_reltoken(&wpipe->pipe_rlock);
+               if (wpb->state & PIPE_WEOF) {
+                       lwkt_reltoken(&wpb->rlock);
                        error = EPIPE;
                        break;
                }
@@ -914,14 +892,12 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 * wake up select/poll/kq.
                 */
                if (space == 0) {
-                       wpipe->pipe_state |= PIPE_WANTW;
-                       ++wpipe->pipe_wantwcnt;
-                       pipewakeup(wpipe, 1);
-                       if (wpipe->pipe_state & PIPE_WANTW)
-                               error = tsleep(wpipe, PCATCH, "pipewr", 0);
-                       ++pipe_wblocked_count;
+                       wpb->state |= PIPE_WANTW;
+                       pipewakeup(wpb, 1);
+                       if (wpb->state & PIPE_WANTW)
+                               error = tsleep(wpb, PCATCH, "pipewr", 0);
                }
-               lwkt_reltoken(&wpipe->pipe_rlock);
+               lwkt_reltoken(&wpb->rlock);
 
                /*
                 * Break out if we errored or the read side wants us to go
@@ -929,12 +905,12 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
                 */
                if (error)
                        break;
-               if (wpipe->pipe_state & PIPE_WEOF) {
+               if (wpb->state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }
        }
-       pipe_end_uio(wpipe, &wpipe->pipe_wip);
+       pipe_end_uio(&wpb->wip);
 
        /*
         * If we have put any characters in the buffer, we wake up
@@ -942,40 +918,41 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
         *
         * Both rlock and wlock are required to be able to modify pipe_state.
         */
-       if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
-               if (wpipe->pipe_state & PIPE_WANTR) {
-                       lwkt_gettoken(&wpipe->pipe_rlock);
-                       if (wpipe->pipe_state & PIPE_WANTR) {
-                               wpipe->pipe_state &= ~PIPE_WANTR;
-                               lwkt_reltoken(&wpipe->pipe_rlock);
-                               wakeup(wpipe);
+       if (wpb->windex != wpb->rindex) {
+               if (wpb->state & PIPE_WANTR) {
+                       lwkt_gettoken(&wpb->rlock);
+                       if (wpb->state & PIPE_WANTR) {
+                               wpb->state &= ~PIPE_WANTR;
+                               lwkt_reltoken(&wpb->rlock);
+                               wakeup(wpb);
                        } else {
-                               lwkt_reltoken(&wpipe->pipe_rlock);
+                               lwkt_reltoken(&wpb->rlock);
                        }
                }
-               lwkt_gettoken(&wpipe->pipe_rlock);
-               pipewakeup(wpipe, 1);
-               lwkt_reltoken(&wpipe->pipe_rlock);
+               lwkt_gettoken(&wpb->rlock);
+               pipewakeup(wpb, 1);
+               lwkt_reltoken(&wpb->rlock);
        }
 
        /*
         * Don't return EPIPE if I/O was successful
         */
-       if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
+       if ((wpb->rindex == wpb->windex) &&
            (uio->uio_resid == 0) &&
            (error == EPIPE)) {
                error = 0;
        }
 
        if (error == 0)
-               vfs_timestamp(&wpipe->pipe_mtime);
+               vfs_timestamp(&wpb->mtime);
 
        /*
         * We have something to offer,
         * wake up select/poll/kq.
         */
-       /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
-       lwkt_reltoken(&wpipe->pipe_wlock);
+       /*space = wpb->windex - wpb->rindex;*/
+       lwkt_reltoken(&wpb->wlock);
+
        return (error);
 }
 
@@ -986,51 +963,56 @@ static int
 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
           struct ucred *cred, struct sysmsg *msg)
 {
-       struct pipe *mpipe;
+       struct pipebuf *rpb;
+       struct pipe *pipe;
        int error;
 
-       mpipe = (struct pipe *)fp->f_data;
+       pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
+       if ((intptr_t)fp->f_data & 1) {
+               rpb = &pipe->bufferB;
+       } else {
+               rpb = &pipe->bufferA;
+       }
 
-       lwkt_gettoken(&mpipe->pipe_rlock);
-       lwkt_gettoken(&mpipe->pipe_wlock);
+       lwkt_gettoken(&rpb->rlock);
+       lwkt_gettoken(&rpb->wlock);
 
        switch (cmd) {
        case FIOASYNC:
                if (*(int *)data) {
-                       mpipe->pipe_state |= PIPE_ASYNC;
+                       rpb->state |= PIPE_ASYNC;
                } else {
-                       mpipe->pipe_state &= ~PIPE_ASYNC;
+                       rpb->state &= ~PIPE_ASYNC;
                }
                error = 0;
                break;
        case FIONREAD:
-               *(int *)data = mpipe->pipe_buffer.windex -
-                               mpipe->pipe_buffer.rindex;
+               *(int *)data = (int)(rpb->windex - rpb->rindex);
                error = 0;
                break;
        case FIOSETOWN:
-               error = fsetown(*(int *)data, &mpipe->pipe_sigio);
+               error = fsetown(*(int *)data, &rpb->sigio);
                break;
        case FIOGETOWN:
-               *(int *)data = fgetown(&mpipe->pipe_sigio);
+               *(int *)data = fgetown(&rpb->sigio);
                error = 0;
                break;
        case TIOCSPGRP:
                /* This is deprecated, FIOSETOWN should be used instead. */
-               error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
+               error = fsetown(-(*(int *)data), &rpb->sigio);
                break;
 
        case TIOCGPGRP:
                /* This is deprecated, FIOGETOWN should be used instead. */
-               *(int *)data = -fgetown(&mpipe->pipe_sigio);
+               *(int *)data = -fgetown(&rpb->sigio);
                error = 0;
                break;
        default:
                error = ENOTTY;
                break;
        }
-       lwkt_reltoken(&mpipe->pipe_wlock);
-       lwkt_reltoken(&mpipe->pipe_rlock);
+       lwkt_reltoken(&rpb->wlock);
+       lwkt_reltoken(&rpb->rlock);
 
        return (error);
 }
@@ -1041,36 +1023,54 @@ pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
 static int
 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
 {
+       struct pipebuf *rpb;
        struct pipe *pipe;
 
-       pipe = (struct pipe *)fp->f_data;
+       pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
+       if ((intptr_t)fp->f_data & 1) {
+               rpb = &pipe->bufferB;
+       } else {
+               rpb = &pipe->bufferA;
+       }
 
        bzero((caddr_t)ub, sizeof(*ub));
        ub->st_mode = S_IFIFO;
-       ub->st_blksize = pipe->pipe_buffer.size;
-       ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
+       ub->st_blksize = rpb->size;
+       ub->st_size = rpb->windex - rpb->rindex;
        ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
-       ub->st_atimespec = pipe->pipe_atime;
-       ub->st_mtimespec = pipe->pipe_mtime;
-       ub->st_ctimespec = pipe->pipe_ctime;
+       ub->st_atimespec = rpb->atime;
+       ub->st_mtimespec = rpb->mtime;
+       ub->st_ctimespec = pipe->ctime;
        /*
         * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
         * st_flags, st_gen.
         * XXX (st_dev, st_ino) should be unique.
         */
+
        return (0);
 }
 
 static int
 pipe_close(struct file *fp)
 {
-       struct pipe *cpipe;
+       struct pipebuf *rpb;
+       struct pipebuf *wpb;
+       struct pipe *pipe;
+
+       pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
+       if ((intptr_t)fp->f_data & 1) {
+               rpb = &pipe->bufferB;
+               wpb = &pipe->bufferA;
+       } else {
+               rpb = &pipe->bufferA;
+               wpb = &pipe->bufferB;
+       }
 
-       cpipe = (struct pipe *)fp->f_data;
        fp->f_ops = &badfileops;
        fp->f_data = NULL;
-       funsetown(&cpipe->pipe_sigio);
-       pipeclose(cpipe);
+       funsetown(&rpb->sigio);
+       pipeclose(pipe, rpb, wpb);
+
        return (0);
 }
 
@@ -1080,60 +1080,67 @@ pipe_close(struct file *fp)
 static int
 pipe_shutdown(struct file *fp, int how)
 {
-       struct pipe *rpipe;
-       struct pipe *wpipe;
+       struct pipebuf *rpb;
+       struct pipebuf *wpb;
+       struct pipe *pipe;
        int error = EPIPE;
 
-       rpipe = (struct pipe *)fp->f_data;
-       wpipe = rpipe->pipe_peer;
+       pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
+       if ((intptr_t)fp->f_data & 1) {
+               rpb = &pipe->bufferB;
+               wpb = &pipe->bufferA;
+       } else {
+               rpb = &pipe->bufferA;
+               wpb = &pipe->bufferB;
+       }
 
        /*
         * We modify pipe_state on both pipes, which means we need
         * all four tokens!
         */
-       lwkt_gettoken(&rpipe->pipe_rlock);
-       lwkt_gettoken(&rpipe->pipe_wlock);
-       lwkt_gettoken(&wpipe->pipe_rlock);
-       lwkt_gettoken(&wpipe->pipe_wlock);
+       lwkt_gettoken(&rpb->rlock);
+       lwkt_gettoken(&rpb->wlock);
+       lwkt_gettoken(&wpb->rlock);
+       lwkt_gettoken(&wpb->wlock);
 
        switch(how) {
        case SHUT_RDWR:
        case SHUT_RD:
-               rpipe->pipe_state |= PIPE_REOF;         /* my reads */
-               rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
-               if (rpipe->pipe_state & PIPE_WANTR) {
-                       rpipe->pipe_state &= ~PIPE_WANTR;
-                       wakeup(rpipe);
+               rpb->state |= PIPE_REOF;                /* my reads */
+               rpb->state |= PIPE_WEOF;                /* peer writes */
+               if (rpb->state & PIPE_WANTR) {
+                       rpb->state &= ~PIPE_WANTR;
+                       wakeup(rpb);
                }
-               if (rpipe->pipe_state & PIPE_WANTW) {
-                       rpipe->pipe_state &= ~PIPE_WANTW;
-                       wakeup(rpipe);
+               if (rpb->state & PIPE_WANTW) {
+                       rpb->state &= ~PIPE_WANTW;
+                       wakeup(rpb);
                }
                error = 0;
                if (how == SHUT_RD)
                        break;
                /* fall through */
        case SHUT_WR:
-               wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
-               wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
-               if (wpipe->pipe_state & PIPE_WANTR) {
-                       wpipe->pipe_state &= ~PIPE_WANTR;
-                       wakeup(wpipe);
+               wpb->state |= PIPE_REOF;                /* peer reads */
+               wpb->state |= PIPE_WEOF;                /* my writes */
+               if (wpb->state & PIPE_WANTR) {
+                       wpb->state &= ~PIPE_WANTR;
+                       wakeup(wpb);
                }
-               if (wpipe->pipe_state & PIPE_WANTW) {
-                       wpipe->pipe_state &= ~PIPE_WANTW;
-                       wakeup(wpipe);
+               if (wpb->state & PIPE_WANTW) {
+                       wpb->state &= ~PIPE_WANTW;
+                       wakeup(wpb);
                }
                error = 0;
                break;
        }
-       pipewakeup(rpipe, 1);
-       pipewakeup(wpipe, 1);
+       pipewakeup(rpb, 1);
+       pipewakeup(wpb, 1);
 
-       lwkt_reltoken(&wpipe->pipe_wlock);
-       lwkt_reltoken(&wpipe->pipe_rlock);
-       lwkt_reltoken(&rpipe->pipe_wlock);
-       lwkt_reltoken(&rpipe->pipe_rlock);
+       lwkt_reltoken(&wpb->wlock);
+       lwkt_reltoken(&wpb->rlock);
+       lwkt_reltoken(&rpb->wlock);
+       lwkt_reltoken(&rpb->rlock);
 
        return (error);
 }
@@ -1142,94 +1149,75 @@ pipe_shutdown(struct file *fp, int how)
  * Destroy the pipe buffer.
  */
 static void
-pipe_free_kmem(struct pipe *cpipe)
+pipe_free_kmem(struct pipebuf *pb)
 {
-       if (cpipe->pipe_buffer.buffer != NULL) {
-               if (cpipe->pipe_buffer.size > PIPE_SIZE)
-                       atomic_subtract_int(&pipe_nbig, 1);
-               kmem_free(&kernel_map,
-                       (vm_offset_t)cpipe->pipe_buffer.buffer,
-                       cpipe->pipe_buffer.size);
-               cpipe->pipe_buffer.buffer = NULL;
-               cpipe->pipe_buffer.object = NULL;
+       if (pb->buffer != NULL) {
+               kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
+               pb->buffer = NULL;
+               pb->object = NULL;
        }
 }
 
 /*
- * Close the pipe.  The slock must be held to interlock against simultanious
- * closes.  The rlock and wlock must be held to adjust the pipe_state.
+ * Close one half of the pipe.  We are closing the pipe for reading on rpb
+ * and writing on wpb.  This routine must be called twice with the pipebufs
+ * reversed to close both directions.
  */
 static void
-pipeclose(struct pipe *cpipe)
+pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
 {
        globaldata_t gd;
-       struct pipe *ppipe;
 
-       if (cpipe == NULL)
+       if (pipe == NULL)
                return;
 
        /*
-        * The slock may not have been allocated yet (close during
-        * initialization)
-        *
         * We need both the read and write tokens to modify pipe_state.
         */
-       if (cpipe->pipe_slock)
-               lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
-       lwkt_gettoken(&cpipe->pipe_rlock);
-       lwkt_gettoken(&cpipe->pipe_wlock);
+       lwkt_gettoken(&rpb->rlock);
+       lwkt_gettoken(&rpb->wlock);
 
        /*
         * Set our state, wakeup anyone waiting in select/poll/kq, and
-        * wakeup anyone blocked on our pipe.
+        * wakeup anyone blocked on our pipe.  No action if our side
+        * is already closed.
         */
-       cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
-       pipewakeup(cpipe, 1);
-       if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
-               cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
-               wakeup(cpipe);
+       if (rpb->state & PIPE_CLOSED) {
+               lwkt_reltoken(&rpb->wlock);
+               lwkt_reltoken(&rpb->rlock);
+               return;
        }
 
-       /*
-        * Disconnect from peer.
-        */
-       if ((ppipe = cpipe->pipe_peer) != NULL) {
-               lwkt_gettoken(&ppipe->pipe_rlock);
-               lwkt_gettoken(&ppipe->pipe_wlock);
-               ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
-               pipewakeup(ppipe, 1);
-               if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
-                       ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
-                       wakeup(ppipe);
-               }
-               if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
-                       KNOTE(&ppipe->pipe_kq.ki_note, 0);
-               lwkt_reltoken(&ppipe->pipe_wlock);
-               lwkt_reltoken(&ppipe->pipe_rlock);
+       rpb->state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
+       pipewakeup(rpb, 1);
+       if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
+               rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
+               wakeup(rpb);
        }
+       lwkt_reltoken(&rpb->wlock);
+       lwkt_reltoken(&rpb->rlock);
 
        /*
-        * If the peer is also closed we can free resources for both
-        * sides, otherwise we leave our side intact to deal with any
-        * races (since we only have the slock).
+        * Disconnect from peer.
         */
-       if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
-               cpipe->pipe_peer = NULL;
-               ppipe->pipe_peer = NULL;
-               ppipe->pipe_slock = NULL;       /* we will free the slock */
-               pipeclose(ppipe);
-               ppipe = NULL;
+       lwkt_gettoken(&wpb->rlock);
+       lwkt_gettoken(&wpb->wlock);
+
+       wpb->state |= PIPE_REOF | PIPE_WEOF;
+       pipewakeup(wpb, 1);
+       if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
+               wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
+               wakeup(wpb);
        }
-
-       lwkt_reltoken(&cpipe->pipe_wlock);
-       lwkt_reltoken(&cpipe->pipe_rlock);
-       if (cpipe->pipe_slock)
-               lockmgr(cpipe->pipe_slock, LK_RELEASE);
+       if (SLIST_FIRST(&wpb->kq.ki_note))
+               KNOTE(&wpb->kq.ki_note, 0);
+       lwkt_reltoken(&wpb->wlock);
+       lwkt_reltoken(&wpb->rlock);
 
        /*
-        * If we disassociated from our peer we can free resources.  We
-        * maintain a pcpu cache to improve performance, so the actual
-        * tear-down case is limited to bulk situations.
+        * Free resources once both sides are closed.  We maintain a pcpu
+        * cache to improve performance, so the actual tear-down case is
+        * limited to bulk situations.
         *
         * However, the bulk tear-down case can cause intense contention
         * on the kernel_map when, e.g. hundreds to hundreds of thousands
@@ -1244,23 +1232,19 @@ pipeclose(struct pipe *cpipe)
         * mechanism will wind up waking them all up each time a lock
         * cycles.
         */
-       if (ppipe == NULL) {
+       if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
                gd = mycpu;
-               if (cpipe->pipe_slock) {
-                       kfree(cpipe->pipe_slock, M_PIPE);
-                       cpipe->pipe_slock = NULL;
-               }
-               if (gd->gd_pipeqcount >= pipe_maxcache ||
-                   cpipe->pipe_buffer.size != PIPE_SIZE
-               ) {
-                       mtx_lock(&pipe_gdlocks[gd->gd_cpuid]);
-                       pipe_free_kmem(cpipe);
-                       mtx_unlock(&pipe_gdlocks[gd->gd_cpuid]);
-                       kfree(cpipe, M_PIPE);
+               if (gd->gd_pipeqcount >= pipe_maxcache) {
+                       mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
+                       pipe_free_kmem(rpb);
+                       pipe_free_kmem(wpb);
+                       mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
+                       kfree(pipe, M_PIPE);
                } else {
-                       cpipe->pipe_state = 0;
-                       cpipe->pipe_peer = gd->gd_pipeq;
-                       gd->gd_pipeq = cpipe;
+                       rpb->state = 0;
+                       wpb->state = 0;
+                       pipe->next = gd->gd_pipeq;
+                       gd->gd_pipeq = pipe;
                        ++gd->gd_pipeqcount;
                }
        }
@@ -1269,9 +1253,18 @@ pipeclose(struct pipe *cpipe)
 static int
 pipe_kqfilter(struct file *fp, struct knote *kn)
 {
-       struct pipe *cpipe;
+       struct pipebuf *rpb;
+       struct pipebuf *wpb;
+       struct pipe *pipe;
 
-       cpipe = (struct pipe *)kn->kn_fp->f_data;
+       pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
+       if ((intptr_t)fp->f_data & 1) {
+               rpb = &pipe->bufferB;
+               wpb = &pipe->bufferA;
+       } else {
+               rpb = &pipe->bufferA;
+               wpb = &pipe->bufferB;
+       }
 
        switch (kn->kn_filter) {
        case EVFILT_READ:
@@ -1279,7 +1272,7 @@ pipe_kqfilter(struct file *fp, struct knote *kn)
                break;
        case EVFILT_WRITE:
                kn->kn_fop = &pipe_wfiltops;
-               if (cpipe->pipe_peer == NULL) {
+               if (wpb->state & PIPE_CLOSED) {
                        /* other end of pipe has been closed */
                        return (EPIPE);
                }
@@ -1287,9 +1280,13 @@ pipe_kqfilter(struct file *fp, struct knote *kn)
        default:
                return (EOPNOTSUPP);
        }
-       kn->kn_hook = (caddr_t)cpipe;
 
-       knote_insert(&cpipe->pipe_kq.ki_note, kn);
+       if (rpb == &pipe->bufferA)
+               kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
+       else
+               kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
+
+       knote_insert(&rpb->kq.ki_note, kn);
 
        return (0);
 }
@@ -1297,24 +1294,45 @@ pipe_kqfilter(struct file *fp, struct knote *kn)
 static void
 filt_pipedetach(struct knote *kn)
 {
-       struct pipe *cpipe = (struct pipe *)kn->kn_hook;
+       struct pipebuf *rpb;
+       struct pipebuf *wpb;
+       struct pipe *pipe;
 
-       knote_remove(&cpipe->pipe_kq.ki_note, kn);
+       pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
+       if ((intptr_t)kn->kn_hook & 1) {
+               rpb = &pipe->bufferB;
+               wpb = &pipe->bufferA;
+       } else {
+               rpb = &pipe->bufferA;
+               wpb = &pipe->bufferB;
+       }
+       knote_remove(&rpb->kq.ki_note, kn);
 }
 
 /*ARGSUSED*/
 static int
 filt_piperead(struct knote *kn, long hint)
 {
-       struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
+       struct pipebuf *rpb;
+       struct pipebuf *wpb;
+       struct pipe *pipe;
        int ready = 0;
 
-       lwkt_gettoken(&rpipe->pipe_rlock);
-       lwkt_gettoken(&rpipe->pipe_wlock);
+       pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
+       if ((intptr_t)kn->kn_fp->f_data & 1) {
+               rpb = &pipe->bufferB;
+               wpb = &pipe->bufferA;
+       } else {
+               rpb = &pipe->bufferA;
+               wpb = &pipe->bufferB;
+       }
 
-       kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
+       lwkt_gettoken(&rpb->rlock);
+       lwkt_gettoken(&rpb->wlock);
 
-       if (rpipe->pipe_state & PIPE_REOF) {
+       kn->kn_data = rpb->windex - rpb->rindex;
+
+       if (rpb->state & PIPE_REOF) {
                /*
                 * Only set NODATA if all data has been exhausted
                 */
@@ -1324,8 +1342,8 @@ filt_piperead(struct knote *kn, long hint)
                ready = 1;
        }
 
-       lwkt_reltoken(&rpipe->pipe_wlock);
-       lwkt_reltoken(&rpipe->pipe_rlock);
+       lwkt_reltoken(&rpb->wlock);
+       lwkt_reltoken(&rpb->rlock);
 
        if (!ready)
                ready = kn->kn_data > 0;
@@ -1337,31 +1355,39 @@ filt_piperead(struct knote *kn, long hint)
 static int
 filt_pipewrite(struct knote *kn, long hint)
 {
-       struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
-       struct pipe *wpipe = rpipe->pipe_peer;
+       struct pipebuf *rpb;
+       struct pipebuf *wpb;
+       struct pipe *pipe;
        int ready = 0;
 
+       pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
+       if ((intptr_t)kn->kn_fp->f_data & 1) {
+               rpb = &pipe->bufferB;
+               wpb = &pipe->bufferA;
+       } else {
+               rpb = &pipe->bufferA;
+               wpb = &pipe->bufferB;
+       }
+
        kn->kn_data = 0;
-       if (wpipe == NULL) {
+       if (wpb->state & PIPE_CLOSED) {
                kn->kn_flags |= (EV_EOF | EV_NODATA);
                return (1);
        }
 
-       lwkt_gettoken(&wpipe->pipe_rlock);
-       lwkt_gettoken(&wpipe->pipe_wlock);
+       lwkt_gettoken(&wpb->rlock);
+       lwkt_gettoken(&wpb->wlock);
 
-       if (wpipe->pipe_state & PIPE_WEOF) {
+       if (wpb->state & PIPE_WEOF) {
                kn->kn_flags |= (EV_EOF | EV_NODATA);
                ready = 1;
        }
 
        if (!ready)
-               kn->kn_data = wpipe->pipe_buffer.size -
-                             (wpipe->pipe_buffer.windex -
-                              wpipe->pipe_buffer.rindex);
+               kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
 
-       lwkt_reltoken(&wpipe->pipe_wlock);
-       lwkt_reltoken(&wpipe->pipe_rlock);
+       lwkt_reltoken(&wpb->wlock);
+       lwkt_reltoken(&wpb->rlock);
 
        if (!ready)
                ready = kn->kn_data >= PIPE_BUF;
index c5de382..c7955e9 100644 (file)
@@ -1,6 +1,10 @@
 /*
  * Copyright (c) 1996 John S. Dyson
  * All rights reserved.
+ * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
+ *
+ * This code is derived from software contributed to The DragonFly Project
+ * by Matthew Dillon <dillon@backplane.com>
  *
  * Redistribution and use in source and binary forms, with or without
  * modification, are permitted provided that the following conditions
@@ -17,9 +21,6 @@
  *    is allowed if this notation is included.
  * 5. Modifications may be freely made to this file if the above conditions
  *    are met.
- *
- * $FreeBSD: src/sys/sys/pipe.h,v 1.16 1999/12/29 04:24:45 peter Exp $
- * $DragonFly: src/sys/sys/pipe.h,v 1.10 2006/06/10 20:00:17 dillon Exp $
  */
 
 #ifndef _SYS_PIPE_H_
 #include <machine/param.h>             /* for PAGE_SIZE */
 #endif
 
-/*
- * Pipe buffer size, keep moderate in value, pipes take kva space.
- * Must be a multiple of PAGE_SIZE.
- */
-#ifndef PIPE_SIZE
-#define PIPE_SIZE      16384
-#endif
-
-#ifndef BIG_PIPE_SIZE
-#define BIG_PIPE_SIZE  (64*1024)
-#endif
-
-#if PIPE_SIZE < PAGE_SIZE
-#error "PIPE_SIZE is too small for this architecture"
-#endif
-
 /*
  * Pipe buffer information.
- * Separate in, out, cnt are used to simplify calculations.
- * Buffered write is active when the buffer.cnt field is set.
  */
 struct pipebuf {
-       u_int   rindex;         /* FIFO read index */
-       u_int   dummy1[3];      /* cache-align */
-       u_int   windex;         /* FIFO write index */
-       u_int   dummy2[3];      /* cache-align */
-       u_int   size;           /* size of buffer */
-       caddr_t buffer;         /* kva of buffer */
-       struct  vm_object *object;      /* VM object containing buffer */
-};
+       struct {
+               struct lwkt_token rlock;
+               size_t          rindex; /* current read index (FIFO)    */
+               int32_t         rip;    /* blocking read requested (FIFO) */
+               int32_t         unu01;  /* unused padding */
+               struct timespec atime;  /* time of last access */
+       } __cachealign;
+       struct {
+               struct lwkt_token wlock;
+               size_t          windex; /* current write index (FIFO)   */
+               int32_t         wip;
+               int32_t         unu02;
+               struct timespec mtime;  /* time of last modify */
+       } __cachealign;
+       size_t          size;           /* size of buffer */
+       caddr_t         buffer;         /* kva of buffer */
+       struct vm_object *object;       /* VM object containing buffer */
+       struct kqinfo   kq;             /* for compat with select/poll/kq */
+       struct sigio    *sigio;         /* information for async I/O */
+       uint32_t        state;          /* pipe status info */
+} __cachealign;
 
 /*
- * Bits in pipe_state.
+ * Bits in pipebuf.state.
  */
 #define PIPE_ASYNC     0x0004  /* Async? I/O */
 #define PIPE_WANTR     0x0008  /* Reader wants some characters */
@@ -88,25 +84,16 @@ struct pipebuf {
 #define PIPE_CLOSED    0x1000  /* Pipe has been closed */
 
 /*
- * Per-pipe data structure.
- * Two of these are linked together to produce bi-directional pipes.
+ * The pipe() data structure encompasses two pipes.  Bit 0 in fp->f_data
+ * denotes which.
  */
 struct pipe {
-       struct  pipebuf pipe_buffer;    /* data storage */
-       struct  kqinfo pipe_kq;         /* for compat with select/poll/kq */
-       struct  timespec pipe_atime;    /* time of last access */
-       struct  timespec pipe_mtime;    /* time of last modify */
-       struct  timespec pipe_ctime;    /* time of status change */
-       struct  sigio *pipe_sigio;      /* information for async I/O */
-       struct  pipe *pipe_peer;        /* link with other direction */
-       u_int   pipe_state;             /* pipe status info */
-       int     pipe_rip;               /* read uio in-progress */
-       int     pipe_wip;               /* write uio in-progress */
-       u_int   pipe_wantwcnt;          /* for resize */
-       struct  lwkt_token pipe_rlock;  /* rindex locked */
-       struct  lwkt_token pipe_wlock;  /* windex locked */
-       struct  lock *pipe_slock;       /* state locked (shared w/peer) */
-};
+       struct pipebuf  bufferA;        /* data storage */
+       struct pipebuf  bufferB;        /* data storage */
+       struct timespec ctime;          /* time of status change */
+       struct pipe     *next;
+       uint32_t        open_count;
+} __cachealign;
 
 #endif /* _KERNEL || _KERNEL_STRUCTURES */
 #endif /* !_SYS_PIPE_H_ */