From: Matthew Dillon Date: Sat, 14 Oct 2017 00:55:41 +0000 (-0700) Subject: kernel - Refactor sys_pipe X-Git-Tag: v5.3.0~1028 X-Git-Url: https://gitweb.dragonflybsd.org/dragonfly.git/commitdiff_plain/1dfdffca0e9d55ac5dd659c1c0dc07b6671dff0a kernel - Refactor sys_pipe * Refactor the pipe code in preparation for optimization. Get rid of the dual-pipe structure and instead have one pipe structure with two buffers. * Scrap a ton of global statistics variables that nobody uses any more, get rid of pipe_peer, and get rid of the slock. --- diff --git a/sys/kern/sys_pipe.c b/sys/kern/sys_pipe.c index ac2c0c7fa8..839f1a3b13 100644 --- a/sys/kern/sys_pipe.c +++ b/sys/kern/sys_pipe.c @@ -1,6 +1,10 @@ /* * Copyright (c) 1996 John S. Dyson * All rights reserved. + * Copyright (c) 2003-2017 The DragonFly Project. All rights reserved. + * + * This code is derived from software contributed to The DragonFly Project + * by Matthew Dillon * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -15,8 +19,6 @@ * John S. Dyson. * 4. Modifications may be freely made to this file if the above conditions * are met. - * - * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $ */ /* @@ -66,6 +68,10 @@ #include +struct pipegdlock { + struct mtx mtx; +} __cachealign; + /* * interfaces to the outside world */ @@ -101,53 +107,20 @@ static struct filterops pipe_wfiltops = MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures"); -/* - * Default pipe buffer size(s), this can be kind-of large now because pipe - * space is pageable. The pipe code will try to maintain locality of - * reference for performance reasons, so small amounts of outstanding I/O - * will not wipe the cache. - */ -#define MINPIPESIZE (PIPE_SIZE/3) -#define MAXPIPESIZE (2*PIPE_SIZE/3) - -/* - * Limit the number of "big" pipes - */ -#define LIMITBIGPIPES 64 #define PIPEQ_MAX_CACHE 16 /* per-cpu pipe structure cache */ -static int pipe_maxbig = LIMITBIGPIPES; static int pipe_maxcache = PIPEQ_MAX_CACHE; -static int pipe_bigcount; -static int pipe_nbig; -static int pipe_bcache_alloc; -static int pipe_bkmem_alloc; -static int pipe_rblocked_count; -static int pipe_wblocked_count; -static struct mtx *pipe_gdlocks; +static struct pipegdlock *pipe_gdlocks; SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation"); -SYSCTL_INT(_kern_pipe, OID_AUTO, nbig, - CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated"); -SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount, - CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded"); -SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked, - CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe expanded"); -SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked, - CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe expanded"); SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache, CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu"); -SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig, - CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes"); +static int pipe_size = 32768; +SYSCTL_INT(_kern_pipe, OID_AUTO, size, + CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)"); static int pipe_delay = 5000; /* 5uS default */ SYSCTL_INT(_kern_pipe, OID_AUTO, delay, CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns"); -#if !defined(NO_PIPE_SYSCTL_STATS) -SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc, - CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache"); -SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc, - CTLFLAG_RW, 
&pipe_bkmem_alloc, 0, "pipe buffer from kmem"); -#endif /* * Auto-size pipe cache to reduce kmem allocations and frees. @@ -159,12 +132,6 @@ pipeinit(void *dummy) size_t mbytes = kmem_lim_size(); int n; - if (pipe_maxbig == LIMITBIGPIPES) { - if (mbytes >= 7 * 1024) - pipe_maxbig *= 2; - if (mbytes >= 15 * 1024) - pipe_maxbig *= 2; - } if (pipe_maxcache == PIPEQ_MAX_CACHE) { if (mbytes >= 7 * 1024) pipe_maxcache *= 2; @@ -174,24 +141,24 @@ pipeinit(void *dummy) pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus, M_PIPE, M_WAITOK | M_ZERO); for (n = 0; n < ncpus; ++n) - mtx_init(&pipe_gdlocks[n], "pipekm"); + mtx_init(&pipe_gdlocks[n].mtx, "pipekm"); } SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL); -static void pipeclose (struct pipe *cpipe); -static void pipe_free_kmem (struct pipe *cpipe); -static int pipe_create (struct pipe **cpipep); -static int pipespace (struct pipe *cpipe, int size); +static void pipeclose (struct pipe *pipe, + struct pipebuf *pbr, struct pipebuf *pbw); +static void pipe_free_kmem (struct pipebuf *buf); +static int pipe_create (struct pipe **pipep); static __inline void -pipewakeup(struct pipe *cpipe, int dosigio) +pipewakeup(struct pipebuf *pb, int dosigio) { - if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) { + if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) { lwkt_gettoken(&sigio_token); - pgsigio(cpipe->pipe_sigio, SIGIO, 0); + pgsigio(pb->sigio, SIGIO, 0); lwkt_reltoken(&sigio_token); } - KNOTE(&cpipe->pipe_kq.ki_note, 0); + KNOTE(&pb->kq.ki_note, 0); } /* @@ -204,7 +171,7 @@ pipewakeup(struct pipe *cpipe, int dosigio) * The read token is held on entry so *ipp does not race. */ static __inline int -pipe_start_uio(struct pipe *cpipe, int *ipp) +pipe_start_uio(int *ipp) { int error; @@ -219,7 +186,7 @@ pipe_start_uio(struct pipe *cpipe, int *ipp) } static __inline void -pipe_end_uio(struct pipe *cpipe, int *ipp) +pipe_end_uio(int *ipp) { if (*ipp < 0) { *ipp = 0; @@ -255,20 +222,20 @@ kern_pipe(long *fds, int flags) struct thread *td = curthread; struct filedesc *fdp = td->td_proc->p_fd; struct file *rf, *wf; - struct pipe *rpipe, *wpipe; + struct pipe *pipe; int fd1, fd2, error; - rpipe = wpipe = NULL; - if (pipe_create(&rpipe) || pipe_create(&wpipe)) { - pipeclose(rpipe); - pipeclose(wpipe); + pipe = NULL; + if (pipe_create(&pipe)) { + pipeclose(pipe, &pipe->bufferA, &pipe->bufferB); + pipeclose(pipe, &pipe->bufferB, &pipe->bufferA); return (ENFILE); } error = falloc(td->td_lwp, &rf, &fd1); if (error) { - pipeclose(rpipe); - pipeclose(wpipe); + pipeclose(pipe, &pipe->bufferA, &pipe->bufferB); + pipeclose(pipe, &pipe->bufferB, &pipe->bufferA); return (error); } fds[0] = fd1; @@ -282,7 +249,7 @@ kern_pipe(long *fds, int flags) rf->f_type = DTYPE_PIPE; rf->f_flag = FREAD | FWRITE; rf->f_ops = &pipeops; - rf->f_data = rpipe; + rf->f_data = (void *)((intptr_t)pipe | 0); if (flags & O_NONBLOCK) rf->f_flag |= O_NONBLOCK; if (flags & O_CLOEXEC) @@ -292,14 +259,15 @@ kern_pipe(long *fds, int flags) if (error) { fsetfd(fdp, NULL, fd1); fdrop(rf); - /* rpipe has been closed by fdrop(). 
*/ - pipeclose(wpipe); + /* pipeA has been closed by fdrop() */ + /* close pipeB here */ + pipeclose(pipe, &pipe->bufferB, &pipe->bufferA); return (error); } wf->f_type = DTYPE_PIPE; wf->f_flag = FREAD | FWRITE; wf->f_ops = &pipeops; - wf->f_data = wpipe; + wf->f_data = (void *)((intptr_t)pipe | 1); if (flags & O_NONBLOCK) wf->f_flag |= O_NONBLOCK; if (flags & O_CLOEXEC) @@ -307,13 +275,6 @@ kern_pipe(long *fds, int flags) fds[1] = fd2; - rpipe->pipe_slock = kmalloc(sizeof(struct lock), - M_PIPE, M_WAITOK|M_ZERO); - wpipe->pipe_slock = rpipe->pipe_slock; - rpipe->pipe_peer = wpipe; - wpipe->pipe_peer = rpipe; - lockinit(rpipe->pipe_slock, "pipecl", 0, 0); - /* * Once activated the peer relationship remains valid until * both sides are closed. @@ -327,20 +288,29 @@ kern_pipe(long *fds, int flags) } /* - * Allocate kva for pipe circular buffer, the space is pageable - * This routine will 'realloc' the size of a pipe safely, if it fails - * it will retain the old buffer. - * If it fails it will return ENOMEM. + * [re]allocates KVA for the pipe's circular buffer. The space is + * pageable. Called twice to setup full-duplex communications. + * + * NOTE: Independent vm_object's are used to improve performance. + * + * Returns 0 on success, ENOMEM on failure. */ static int -pipespace(struct pipe *cpipe, int size) +pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size) { struct vm_object *object; caddr_t buffer; - int npages, error; + vm_pindex_t npages; + int error; + + size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK; + if (size < 16384) + size = 16384; + if (size > 1024*1024) + size = 1024*1024; npages = round_page(size) / PAGE_SIZE; - object = cpipe->pipe_buffer.object; + object = pb->object; /* * [re]create the object if necessary and reserve space for it @@ -362,77 +332,94 @@ pipespace(struct pipe *cpipe, int size) vm_object_deallocate(object); return (ENOMEM); } - pipe_free_kmem(cpipe); - cpipe->pipe_buffer.object = object; - cpipe->pipe_buffer.buffer = buffer; - cpipe->pipe_buffer.size = size; - ++pipe_bkmem_alloc; - } else { - ++pipe_bcache_alloc; + pipe_free_kmem(pb); + pb->object = object; + pb->buffer = buffer; + pb->size = size; } - cpipe->pipe_buffer.rindex = 0; - cpipe->pipe_buffer.windex = 0; + pb->rindex = 0; + pb->windex = 0; + return (0); } /* * Initialize and allocate VM and memory for pipe, pulling the pipe from - * our per-cpu cache if possible. For now make sure it is sized for the - * smaller PIPE_SIZE default. + * our per-cpu cache if possible. + * + * Returns 0 on success, else an error code (typically ENOMEM). Caller + * must still deallocate the pipe on failure. 
*/ static int -pipe_create(struct pipe **cpipep) +pipe_create(struct pipe **pipep) { globaldata_t gd = mycpu; - struct pipe *cpipe; + struct pipe *pipe; int error; - if ((cpipe = gd->gd_pipeq) != NULL) { - gd->gd_pipeq = cpipe->pipe_peer; + if ((pipe = gd->gd_pipeq) != NULL) { + gd->gd_pipeq = pipe->next; --gd->gd_pipeqcount; - cpipe->pipe_peer = NULL; - cpipe->pipe_wantwcnt = 0; + pipe->next = NULL; } else { - cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO); + pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO); + lwkt_token_init(&pipe->bufferA.rlock, "piper"); + lwkt_token_init(&pipe->bufferA.wlock, "pipew"); + lwkt_token_init(&pipe->bufferB.rlock, "piper"); + lwkt_token_init(&pipe->bufferB.wlock, "pipew"); } - *cpipep = cpipe; - if ((error = pipespace(cpipe, PIPE_SIZE)) != 0) + *pipep = pipe; + if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) { return (error); - vfs_timestamp(&cpipe->pipe_ctime); - cpipe->pipe_atime = cpipe->pipe_ctime; - cpipe->pipe_mtime = cpipe->pipe_ctime; - lwkt_token_init(&cpipe->pipe_rlock, "piper"); - lwkt_token_init(&cpipe->pipe_wlock, "pipew"); + } + if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) { + return (error); + } + vfs_timestamp(&pipe->ctime); + pipe->bufferA.atime = pipe->ctime; + pipe->bufferA.mtime = pipe->ctime; + pipe->bufferB.atime = pipe->ctime; + pipe->bufferB.mtime = pipe->ctime; + pipe->open_count = 2; + return (0); } +/* + * Read data from a pipe + */ static int pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) { - struct pipe *rpipe; - struct pipe *wpipe; - int error; + struct pipebuf *rpb; + struct pipebuf *wpb; + struct pipe *pipe; size_t nread = 0; - int nbio; - u_int size; /* total bytes available */ - u_int nsize; /* total bytes to read */ - u_int rindex; /* contiguous bytes available */ + size_t size; /* total bytes available */ + size_t nsize; /* total bytes to read */ + size_t rindex; /* contiguous bytes available */ int notify_writer; int bigread; int bigcount; + int error; + int nbio; + pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); + if ((intptr_t)fp->f_data & 1) { + rpb = &pipe->bufferB; + wpb = &pipe->bufferA; + } else { + rpb = &pipe->bufferA; + wpb = &pipe->bufferB; + } atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC); if (uio->uio_resid == 0) return(0); /* - * Setup locks, calculate nbio + * Calculate nbio */ - rpipe = (struct pipe *)fp->f_data; - wpipe = rpipe->pipe_peer; - lwkt_gettoken(&rpipe->pipe_rlock); - if (fflags & O_FBLOCKING) nbio = 0; else if (fflags & O_FNONBLOCKING) @@ -443,14 +430,21 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) nbio = 0; /* - * Reads are serialized. Note however that pipe_buffer.buffer and - * pipe_buffer.size can change out from under us when the number + * 'quick' NBIO test before things get expensive. + */ + if (nbio && rpb->rindex == rpb->windex) + return EAGAIN; + + /* + * Reads are serialized. Note however that buffer.buffer and + * buffer.size can change out from under us when the number * of bytes in the buffer are zero due to the write-side doing a * pipespace(). 
*/ - error = pipe_start_uio(rpipe, &rpipe->pipe_rip); + lwkt_gettoken(&rpb->rlock); + error = pipe_start_uio(&rpb->rip); if (error) { - lwkt_reltoken(&rpipe->pipe_rlock); + lwkt_reltoken(&rpb->rlock); return (error); } notify_writer = 0; @@ -471,29 +465,27 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) } } - size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex; + size = rpb->windex - rpb->rindex; cpu_lfence(); if (size) { - rindex = rpipe->pipe_buffer.rindex & - (rpipe->pipe_buffer.size - 1); + rindex = rpb->rindex & (rpb->size - 1); nsize = size; - if (nsize > rpipe->pipe_buffer.size - rindex) - nsize = rpipe->pipe_buffer.size - rindex; + if (nsize > rpb->size - rindex) + nsize = rpb->size - rindex; nsize = szmin(nsize, uio->uio_resid); - error = uiomove(&rpipe->pipe_buffer.buffer[rindex], - nsize, uio); + error = uiomove(&rpb->buffer[rindex], nsize, uio); if (error) break; cpu_mfence(); - rpipe->pipe_buffer.rindex += nsize; + rpb->rindex += nsize; nread += nsize; /* * If the FIFO is still over half full just continue * and do not try to notify the writer yet. */ - if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) { + if (size - nsize >= (rpb->size >> 1)) { notify_writer = 0; continue; } @@ -504,7 +496,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * holding just the rlock. */ notify_writer = 1; - if ((rpipe->pipe_state & PIPE_WANTW) == 0) + if ((rpb->state & PIPE_WANTW) == 0) continue; } @@ -516,14 +508,14 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * Pipe_state can only be modified if both the rlock and * wlock are held. */ - if (rpipe->pipe_state & PIPE_WANTW) { - lwkt_gettoken(&rpipe->pipe_wlock); - if (rpipe->pipe_state & PIPE_WANTW) { - rpipe->pipe_state &= ~PIPE_WANTW; - lwkt_reltoken(&rpipe->pipe_wlock); - wakeup(rpipe); + if (rpb->state & PIPE_WANTW) { + lwkt_gettoken(&rpb->wlock); + if (rpb->state & PIPE_WANTW) { + rpb->state &= ~PIPE_WANTW; + lwkt_reltoken(&rpb->wlock); + wakeup(rpb); } else { - lwkt_reltoken(&rpipe->pipe_wlock); + lwkt_reltoken(&rpb->wlock); } } @@ -543,7 +535,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * where as miss requiring a tsleep/wakeup sequence * will take 7uS or more. */ - if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex) + if (rpb->windex != rpb->rindex) continue; #ifdef _RDTSC_SUPPORTED_ @@ -553,8 +545,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) tsc_target = tsc_get_target(pipe_delay); while (tsc_test_target(tsc_target) == 0) { - if (rpipe->pipe_buffer.windex != - rpipe->pipe_buffer.rindex) { + if (rpb->windex != rpb->rindex) { good = 1; break; } @@ -567,7 +558,7 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) /* * Detect EOF condition, do not set error. */ - if (rpipe->pipe_state & PIPE_REOF) + if (rpb->state & PIPE_REOF) break; /* @@ -585,10 +576,10 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) /* * Last chance, interlock with WANTR. */ - lwkt_gettoken(&rpipe->pipe_wlock); - size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex; + lwkt_gettoken(&rpb->wlock); + size = rpb->windex - rpb->rindex; if (size) { - lwkt_reltoken(&rpipe->pipe_wlock); + lwkt_reltoken(&rpb->wlock); continue; } @@ -596,8 +587,8 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * Retest EOF - acquiring a new token can temporarily release * tokens already held. 
*/ - if (rpipe->pipe_state & PIPE_REOF) { - lwkt_reltoken(&rpipe->pipe_wlock); + if (rpb->state & PIPE_REOF) { + lwkt_reltoken(&rpb->wlock); break; } @@ -615,10 +606,9 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * XXX should we even bother resetting the indices? It * might actually be more cache efficient not to. */ - if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex && - rpipe->pipe_wip == 0) { - rpipe->pipe_buffer.rindex = 0; - rpipe->pipe_buffer.windex = 0; + if (rpb->rindex == rpb->windex && rpb->wip == 0) { + rpb->rindex = 0; + rpb->windex = 0; } /* @@ -627,21 +617,20 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * Pipe_state can only be set if both the rlock and wlock * are held. */ - rpipe->pipe_state |= PIPE_WANTR; - tsleep_interlock(rpipe, PCATCH); - lwkt_reltoken(&rpipe->pipe_wlock); - error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0); - ++pipe_rblocked_count; + rpb->state |= PIPE_WANTR; + tsleep_interlock(rpb, PCATCH); + lwkt_reltoken(&rpb->wlock); + error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0); if (error) break; } - pipe_end_uio(rpipe, &rpipe->pipe_rip); + pipe_end_uio(&rpb->rip); /* * Uptime last access time */ if (error == 0 && nread) - vfs_timestamp(&rpipe->pipe_atime); + vfs_timestamp(&rpb->atime); /* * If we drained the FIFO more then half way then handle @@ -655,14 +644,14 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) /* * Synchronous blocking is done on the pipe involved */ - if (rpipe->pipe_state & PIPE_WANTW) { - lwkt_gettoken(&rpipe->pipe_wlock); - if (rpipe->pipe_state & PIPE_WANTW) { - rpipe->pipe_state &= ~PIPE_WANTW; - lwkt_reltoken(&rpipe->pipe_wlock); - wakeup(rpipe); + if (rpb->state & PIPE_WANTW) { + lwkt_gettoken(&rpb->wlock); + if (rpb->state & PIPE_WANTW) { + rpb->state &= ~PIPE_WANTW; + lwkt_reltoken(&rpb->wlock); + wakeup(rpb); } else { - lwkt_reltoken(&rpipe->pipe_wlock); + lwkt_reltoken(&rpb->wlock); } } @@ -672,12 +661,12 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * EVFILT_WRITE event waiting for our side to drain will * be on the other side. */ - lwkt_gettoken(&wpipe->pipe_wlock); - pipewakeup(wpipe, 0); - lwkt_reltoken(&wpipe->pipe_wlock); + lwkt_gettoken(&wpb->wlock); + pipewakeup(wpb, 0); + lwkt_reltoken(&wpb->wlock); } - /*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/ - lwkt_reltoken(&rpipe->pipe_rlock); + /*size = rpb->windex - rpb->rindex;*/ + lwkt_reltoken(&rpb->rlock); return (error); } @@ -685,25 +674,53 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) static int pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) { - int error; - int orig_resid; - int nbio; - struct pipe *wpipe; - struct pipe *rpipe; - u_int windex; - u_int space; - u_int wcount; + struct pipebuf *rpb; + struct pipebuf *wpb; + struct pipe *pipe; + size_t windex; + size_t space; + size_t wcount; + size_t orig_resid; int bigwrite; int bigcount; + int error; + int nbio; + + pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); + if ((intptr_t)fp->f_data & 1) { + rpb = &pipe->bufferB; + wpb = &pipe->bufferA; + } else { + rpb = &pipe->bufferA; + wpb = &pipe->bufferB; + } + + /* + * Calculate nbio + */ + if (fflags & O_FBLOCKING) + nbio = 0; + else if (fflags & O_FNONBLOCKING) + nbio = 1; + else if (fp->f_flag & O_NONBLOCK) + nbio = 1; + else + nbio = 0; + + /* + * 'quick' NBIO test before things get expensive. 
+ */ + if (nbio && wpb->size == (wpb->windex - wpb->rindex) && + uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) { + return EAGAIN; + } /* * Writes go to the peer. The peer will always exist. */ - rpipe = (struct pipe *) fp->f_data; - wpipe = rpipe->pipe_peer; - lwkt_gettoken(&wpipe->pipe_wlock); - if (wpipe->pipe_state & PIPE_WEOF) { - lwkt_reltoken(&wpipe->pipe_wlock); + lwkt_gettoken(&wpb->wlock); + if (wpb->state & PIPE_WEOF) { + lwkt_reltoken(&wpb->wlock); return (EPIPE); } @@ -711,52 +728,19 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * Degenerate case (EPIPE takes prec) */ if (uio->uio_resid == 0) { - lwkt_reltoken(&wpipe->pipe_wlock); + lwkt_reltoken(&wpb->wlock); return(0); } /* * Writes are serialized (start_uio must be called with wlock) */ - error = pipe_start_uio(wpipe, &wpipe->pipe_wip); + error = pipe_start_uio(&wpb->wip); if (error) { - lwkt_reltoken(&wpipe->pipe_wlock); + lwkt_reltoken(&wpb->wlock); return (error); } - if (fflags & O_FBLOCKING) - nbio = 0; - else if (fflags & O_FNONBLOCKING) - nbio = 1; - else if (fp->f_flag & O_NONBLOCK) - nbio = 1; - else - nbio = 0; - - /* - * If it is advantageous to resize the pipe buffer, do - * so. We are write-serialized so we can block safely. - */ - if ((wpipe->pipe_buffer.size <= PIPE_SIZE) && - (pipe_nbig < pipe_maxbig) && - wpipe->pipe_wantwcnt > 4 && - (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) { - /* - * Recheck after lock. - */ - lwkt_gettoken(&wpipe->pipe_rlock); - if ((wpipe->pipe_buffer.size <= PIPE_SIZE) && - (pipe_nbig < pipe_maxbig) && - (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) { - atomic_add_int(&pipe_nbig, 1); - if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) - ++pipe_bigcount; - else - atomic_subtract_int(&pipe_nbig, 1); - } - lwkt_reltoken(&wpipe->pipe_rlock); - } - orig_resid = uio->uio_resid; wcount = 0; @@ -764,7 +748,7 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) bigcount = 10; while (uio->uio_resid) { - if (wpipe->pipe_state & PIPE_WEOF) { + if (wpb->state & PIPE_WEOF) { error = EPIPE; break; } @@ -781,10 +765,8 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) } } - windex = wpipe->pipe_buffer.windex & - (wpipe->pipe_buffer.size - 1); - space = wpipe->pipe_buffer.size - - (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex); + windex = wpb->windex & (wpb->size - 1); + space = wpb->size - (wpb->windex - wpb->rindex); cpu_lfence(); /* Writes of size <= PIPE_BUF must be atomic. */ @@ -797,22 +779,21 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * writes to spin. */ if (space > 0) { - u_int segsize; + size_t segsize; /* * Transfer size is minimum of uio transfer * and free space in pipe buffer. * - * Limit each uiocopy to no more then PIPE_SIZE + * Limit each uiocopy to no more then wpb->size * so we can keep the gravy train going on a - * SMP box. This doubles the performance for - * write sizes > 16K. Otherwise large writes - * wind up doing an inefficient synchronous - * ping-pong. + * SMP box. This significantly increases write + * performance. Otherwise large writes wind up doing + * an inefficient synchronous ping-pong. 
*/ space = szmin(space, uio->uio_resid); - if (space > PIPE_SIZE) - space = PIPE_SIZE; + if (space > (wpb->size >> 1)) + space = (wpb->size >> 1); /* * First segment to transfer is minimum of @@ -821,7 +802,7 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * is less than the transfer size, we've got * a wraparound in the buffer. */ - segsize = wpipe->pipe_buffer.size - windex; + segsize = wpb->size - windex; if (segsize > space) segsize = space; @@ -837,25 +818,23 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * NOTE: We can't clear WANTR here without acquiring * the rlock, which we don't want to do here! */ - if ((wpipe->pipe_state & PIPE_WANTR)) - wakeup(wpipe); + if ((wpb->state & PIPE_WANTR)) + wakeup(wpb); /* * Transfer segment, which may include a wrap-around. * Update windex to account for both all in one go * so the reader can read() the data atomically. */ - error = uiomove(&wpipe->pipe_buffer.buffer[windex], - segsize, uio); + error = uiomove(&wpb->buffer[windex], segsize, uio); if (error == 0 && segsize < space) { segsize = space - segsize; - error = uiomove(&wpipe->pipe_buffer.buffer[0], - segsize, uio); + error = uiomove(&wpb->buffer[0], segsize, uio); } if (error) break; cpu_mfence(); - wpipe->pipe_buffer.windex += space; + wpb->windex += space; wcount += space; continue; } @@ -867,23 +846,23 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * These are token locks so we do not have to worry about * deadlocks. */ - lwkt_gettoken(&wpipe->pipe_rlock); + lwkt_gettoken(&wpb->rlock); /* * If the "read-side" has been blocked, wake it up now * and yield to let it drain synchronously rather * then block. */ - if (wpipe->pipe_state & PIPE_WANTR) { - wpipe->pipe_state &= ~PIPE_WANTR; - wakeup(wpipe); + if (wpb->state & PIPE_WANTR) { + wpb->state &= ~PIPE_WANTR; + wakeup(wpb); } /* * don't block on non-blocking I/O */ if (nbio) { - lwkt_reltoken(&wpipe->pipe_rlock); + lwkt_reltoken(&wpb->rlock); error = EAGAIN; break; } @@ -893,8 +872,7 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * acquiring both locks, in case the reader opened up * some space. */ - space = wpipe->pipe_buffer.size - - (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex); + space = wpb->size - (wpb->windex - wpb->rindex); cpu_lfence(); if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF)) space = 0; @@ -903,8 +881,8 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * Retest EOF - acquiring a new token can temporarily release * tokens already held. */ - if (wpipe->pipe_state & PIPE_WEOF) { - lwkt_reltoken(&wpipe->pipe_rlock); + if (wpb->state & PIPE_WEOF) { + lwkt_reltoken(&wpb->rlock); error = EPIPE; break; } @@ -914,14 +892,12 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * wake up select/poll/kq. 
*/ if (space == 0) { - wpipe->pipe_state |= PIPE_WANTW; - ++wpipe->pipe_wantwcnt; - pipewakeup(wpipe, 1); - if (wpipe->pipe_state & PIPE_WANTW) - error = tsleep(wpipe, PCATCH, "pipewr", 0); - ++pipe_wblocked_count; + wpb->state |= PIPE_WANTW; + pipewakeup(wpb, 1); + if (wpb->state & PIPE_WANTW) + error = tsleep(wpb, PCATCH, "pipewr", 0); } - lwkt_reltoken(&wpipe->pipe_rlock); + lwkt_reltoken(&wpb->rlock); /* * Break out if we errored or the read side wants us to go @@ -929,12 +905,12 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) */ if (error) break; - if (wpipe->pipe_state & PIPE_WEOF) { + if (wpb->state & PIPE_WEOF) { error = EPIPE; break; } } - pipe_end_uio(wpipe, &wpipe->pipe_wip); + pipe_end_uio(&wpb->wip); /* * If we have put any characters in the buffer, we wake up @@ -942,40 +918,41 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags) * * Both rlock and wlock are required to be able to modify pipe_state. */ - if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) { - if (wpipe->pipe_state & PIPE_WANTR) { - lwkt_gettoken(&wpipe->pipe_rlock); - if (wpipe->pipe_state & PIPE_WANTR) { - wpipe->pipe_state &= ~PIPE_WANTR; - lwkt_reltoken(&wpipe->pipe_rlock); - wakeup(wpipe); + if (wpb->windex != wpb->rindex) { + if (wpb->state & PIPE_WANTR) { + lwkt_gettoken(&wpb->rlock); + if (wpb->state & PIPE_WANTR) { + wpb->state &= ~PIPE_WANTR; + lwkt_reltoken(&wpb->rlock); + wakeup(wpb); } else { - lwkt_reltoken(&wpipe->pipe_rlock); + lwkt_reltoken(&wpb->rlock); } } - lwkt_gettoken(&wpipe->pipe_rlock); - pipewakeup(wpipe, 1); - lwkt_reltoken(&wpipe->pipe_rlock); + lwkt_gettoken(&wpb->rlock); + pipewakeup(wpb, 1); + lwkt_reltoken(&wpb->rlock); } /* * Don't return EPIPE if I/O was successful */ - if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) && + if ((wpb->rindex == wpb->windex) && (uio->uio_resid == 0) && (error == EPIPE)) { error = 0; } if (error == 0) - vfs_timestamp(&wpipe->pipe_mtime); + vfs_timestamp(&wpb->mtime); /* * We have something to offer, * wake up select/poll/kq. */ - /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/ - lwkt_reltoken(&wpipe->pipe_wlock); + /*space = wpb->windex - wpb->rindex;*/ + lwkt_reltoken(&wpb->wlock); + return (error); } @@ -986,51 +963,56 @@ static int pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct ucred *cred, struct sysmsg *msg) { - struct pipe *mpipe; + struct pipebuf *rpb; + struct pipe *pipe; int error; - mpipe = (struct pipe *)fp->f_data; + pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); + if ((intptr_t)fp->f_data & 1) { + rpb = &pipe->bufferB; + } else { + rpb = &pipe->bufferA; + } - lwkt_gettoken(&mpipe->pipe_rlock); - lwkt_gettoken(&mpipe->pipe_wlock); + lwkt_gettoken(&rpb->rlock); + lwkt_gettoken(&rpb->wlock); switch (cmd) { case FIOASYNC: if (*(int *)data) { - mpipe->pipe_state |= PIPE_ASYNC; + rpb->state |= PIPE_ASYNC; } else { - mpipe->pipe_state &= ~PIPE_ASYNC; + rpb->state &= ~PIPE_ASYNC; } error = 0; break; case FIONREAD: - *(int *)data = mpipe->pipe_buffer.windex - - mpipe->pipe_buffer.rindex; + *(int *)data = (int)(rpb->windex - rpb->rindex); error = 0; break; case FIOSETOWN: - error = fsetown(*(int *)data, &mpipe->pipe_sigio); + error = fsetown(*(int *)data, &rpb->sigio); break; case FIOGETOWN: - *(int *)data = fgetown(&mpipe->pipe_sigio); + *(int *)data = fgetown(&rpb->sigio); error = 0; break; case TIOCSPGRP: /* This is deprecated, FIOSETOWN should be used instead. 
*/ - error = fsetown(-(*(int *)data), &mpipe->pipe_sigio); + error = fsetown(-(*(int *)data), &rpb->sigio); break; case TIOCGPGRP: /* This is deprecated, FIOGETOWN should be used instead. */ - *(int *)data = -fgetown(&mpipe->pipe_sigio); + *(int *)data = -fgetown(&rpb->sigio); error = 0; break; default: error = ENOTTY; break; } - lwkt_reltoken(&mpipe->pipe_wlock); - lwkt_reltoken(&mpipe->pipe_rlock); + lwkt_reltoken(&rpb->wlock); + lwkt_reltoken(&rpb->rlock); return (error); } @@ -1041,36 +1023,54 @@ pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, static int pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred) { + struct pipebuf *rpb; struct pipe *pipe; - pipe = (struct pipe *)fp->f_data; + pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); + if ((intptr_t)fp->f_data & 1) { + rpb = &pipe->bufferB; + } else { + rpb = &pipe->bufferA; + } bzero((caddr_t)ub, sizeof(*ub)); ub->st_mode = S_IFIFO; - ub->st_blksize = pipe->pipe_buffer.size; - ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex; + ub->st_blksize = rpb->size; + ub->st_size = rpb->windex - rpb->rindex; ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize; - ub->st_atimespec = pipe->pipe_atime; - ub->st_mtimespec = pipe->pipe_mtime; - ub->st_ctimespec = pipe->pipe_ctime; + ub->st_atimespec = rpb->atime; + ub->st_mtimespec = rpb->mtime; + ub->st_ctimespec = pipe->ctime; /* * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev, * st_flags, st_gen. * XXX (st_dev, st_ino) should be unique. */ + return (0); } static int pipe_close(struct file *fp) { - struct pipe *cpipe; + struct pipebuf *rpb; + struct pipebuf *wpb; + struct pipe *pipe; + + pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); + if ((intptr_t)fp->f_data & 1) { + rpb = &pipe->bufferB; + wpb = &pipe->bufferA; + } else { + rpb = &pipe->bufferA; + wpb = &pipe->bufferB; + } - cpipe = (struct pipe *)fp->f_data; fp->f_ops = &badfileops; fp->f_data = NULL; - funsetown(&cpipe->pipe_sigio); - pipeclose(cpipe); + funsetown(&rpb->sigio); + pipeclose(pipe, rpb, wpb); + return (0); } @@ -1080,60 +1080,67 @@ pipe_close(struct file *fp) static int pipe_shutdown(struct file *fp, int how) { - struct pipe *rpipe; - struct pipe *wpipe; + struct pipebuf *rpb; + struct pipebuf *wpb; + struct pipe *pipe; int error = EPIPE; - rpipe = (struct pipe *)fp->f_data; - wpipe = rpipe->pipe_peer; + pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); + if ((intptr_t)fp->f_data & 1) { + rpb = &pipe->bufferB; + wpb = &pipe->bufferA; + } else { + rpb = &pipe->bufferA; + wpb = &pipe->bufferB; + } /* * We modify pipe_state on both pipes, which means we need * all four tokens! 
*/ - lwkt_gettoken(&rpipe->pipe_rlock); - lwkt_gettoken(&rpipe->pipe_wlock); - lwkt_gettoken(&wpipe->pipe_rlock); - lwkt_gettoken(&wpipe->pipe_wlock); + lwkt_gettoken(&rpb->rlock); + lwkt_gettoken(&rpb->wlock); + lwkt_gettoken(&wpb->rlock); + lwkt_gettoken(&wpb->wlock); switch(how) { case SHUT_RDWR: case SHUT_RD: - rpipe->pipe_state |= PIPE_REOF; /* my reads */ - rpipe->pipe_state |= PIPE_WEOF; /* peer writes */ - if (rpipe->pipe_state & PIPE_WANTR) { - rpipe->pipe_state &= ~PIPE_WANTR; - wakeup(rpipe); + rpb->state |= PIPE_REOF; /* my reads */ + rpb->state |= PIPE_WEOF; /* peer writes */ + if (rpb->state & PIPE_WANTR) { + rpb->state &= ~PIPE_WANTR; + wakeup(rpb); } - if (rpipe->pipe_state & PIPE_WANTW) { - rpipe->pipe_state &= ~PIPE_WANTW; - wakeup(rpipe); + if (rpb->state & PIPE_WANTW) { + rpb->state &= ~PIPE_WANTW; + wakeup(rpb); } error = 0; if (how == SHUT_RD) break; /* fall through */ case SHUT_WR: - wpipe->pipe_state |= PIPE_REOF; /* peer reads */ - wpipe->pipe_state |= PIPE_WEOF; /* my writes */ - if (wpipe->pipe_state & PIPE_WANTR) { - wpipe->pipe_state &= ~PIPE_WANTR; - wakeup(wpipe); + wpb->state |= PIPE_REOF; /* peer reads */ + wpb->state |= PIPE_WEOF; /* my writes */ + if (wpb->state & PIPE_WANTR) { + wpb->state &= ~PIPE_WANTR; + wakeup(wpb); } - if (wpipe->pipe_state & PIPE_WANTW) { - wpipe->pipe_state &= ~PIPE_WANTW; - wakeup(wpipe); + if (wpb->state & PIPE_WANTW) { + wpb->state &= ~PIPE_WANTW; + wakeup(wpb); } error = 0; break; } - pipewakeup(rpipe, 1); - pipewakeup(wpipe, 1); + pipewakeup(rpb, 1); + pipewakeup(wpb, 1); - lwkt_reltoken(&wpipe->pipe_wlock); - lwkt_reltoken(&wpipe->pipe_rlock); - lwkt_reltoken(&rpipe->pipe_wlock); - lwkt_reltoken(&rpipe->pipe_rlock); + lwkt_reltoken(&wpb->wlock); + lwkt_reltoken(&wpb->rlock); + lwkt_reltoken(&rpb->wlock); + lwkt_reltoken(&rpb->rlock); return (error); } @@ -1142,94 +1149,75 @@ pipe_shutdown(struct file *fp, int how) * Destroy the pipe buffer. */ static void -pipe_free_kmem(struct pipe *cpipe) +pipe_free_kmem(struct pipebuf *pb) { - if (cpipe->pipe_buffer.buffer != NULL) { - if (cpipe->pipe_buffer.size > PIPE_SIZE) - atomic_subtract_int(&pipe_nbig, 1); - kmem_free(&kernel_map, - (vm_offset_t)cpipe->pipe_buffer.buffer, - cpipe->pipe_buffer.size); - cpipe->pipe_buffer.buffer = NULL; - cpipe->pipe_buffer.object = NULL; + if (pb->buffer != NULL) { + kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size); + pb->buffer = NULL; + pb->object = NULL; } } /* - * Close the pipe. The slock must be held to interlock against simultanious - * closes. The rlock and wlock must be held to adjust the pipe_state. + * Close one half of the pipe. We are closing the pipe for reading on rpb + * and writing on wpb. This routine must be called twice with the pipebufs + * reversed to close both directions. */ static void -pipeclose(struct pipe *cpipe) +pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb) { globaldata_t gd; - struct pipe *ppipe; - if (cpipe == NULL) + if (pipe == NULL) return; /* - * The slock may not have been allocated yet (close during - * initialization) - * * We need both the read and write tokens to modify pipe_state. */ - if (cpipe->pipe_slock) - lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE); - lwkt_gettoken(&cpipe->pipe_rlock); - lwkt_gettoken(&cpipe->pipe_wlock); + lwkt_gettoken(&rpb->rlock); + lwkt_gettoken(&rpb->wlock); /* * Set our state, wakeup anyone waiting in select/poll/kq, and - * wakeup anyone blocked on our pipe. + * wakeup anyone blocked on our pipe. No action if our side + * is already closed. 
*/ - cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF; - pipewakeup(cpipe, 1); - if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) { - cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW); - wakeup(cpipe); + if (rpb->state & PIPE_CLOSED) { + lwkt_reltoken(&rpb->wlock); + lwkt_reltoken(&rpb->rlock); + return; } - /* - * Disconnect from peer. - */ - if ((ppipe = cpipe->pipe_peer) != NULL) { - lwkt_gettoken(&ppipe->pipe_rlock); - lwkt_gettoken(&ppipe->pipe_wlock); - ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF; - pipewakeup(ppipe, 1); - if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) { - ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW); - wakeup(ppipe); - } - if (SLIST_FIRST(&ppipe->pipe_kq.ki_note)) - KNOTE(&ppipe->pipe_kq.ki_note, 0); - lwkt_reltoken(&ppipe->pipe_wlock); - lwkt_reltoken(&ppipe->pipe_rlock); + rpb->state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF; + pipewakeup(rpb, 1); + if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) { + rpb->state &= ~(PIPE_WANTR | PIPE_WANTW); + wakeup(rpb); } + lwkt_reltoken(&rpb->wlock); + lwkt_reltoken(&rpb->rlock); /* - * If the peer is also closed we can free resources for both - * sides, otherwise we leave our side intact to deal with any - * races (since we only have the slock). + * Disconnect from peer. */ - if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) { - cpipe->pipe_peer = NULL; - ppipe->pipe_peer = NULL; - ppipe->pipe_slock = NULL; /* we will free the slock */ - pipeclose(ppipe); - ppipe = NULL; + lwkt_gettoken(&wpb->rlock); + lwkt_gettoken(&wpb->wlock); + + wpb->state |= PIPE_REOF | PIPE_WEOF; + pipewakeup(wpb, 1); + if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) { + wpb->state &= ~(PIPE_WANTR | PIPE_WANTW); + wakeup(wpb); } - - lwkt_reltoken(&cpipe->pipe_wlock); - lwkt_reltoken(&cpipe->pipe_rlock); - if (cpipe->pipe_slock) - lockmgr(cpipe->pipe_slock, LK_RELEASE); + if (SLIST_FIRST(&wpb->kq.ki_note)) + KNOTE(&wpb->kq.ki_note, 0); + lwkt_reltoken(&wpb->wlock); + lwkt_reltoken(&wpb->rlock); /* - * If we disassociated from our peer we can free resources. We - * maintain a pcpu cache to improve performance, so the actual - * tear-down case is limited to bulk situations. + * Free resources once both sides are closed. We maintain a pcpu + * cache to improve performance, so the actual tear-down case is + * limited to bulk situations. * * However, the bulk tear-down case can cause intense contention * on the kernel_map when, e.g. hundreds to hundreds of thousands @@ -1244,23 +1232,19 @@ pipeclose(struct pipe *cpipe) * mechanism will wind up waking them all up each time a lock * cycles. 
*/ - if (ppipe == NULL) { + if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) { gd = mycpu; - if (cpipe->pipe_slock) { - kfree(cpipe->pipe_slock, M_PIPE); - cpipe->pipe_slock = NULL; - } - if (gd->gd_pipeqcount >= pipe_maxcache || - cpipe->pipe_buffer.size != PIPE_SIZE - ) { - mtx_lock(&pipe_gdlocks[gd->gd_cpuid]); - pipe_free_kmem(cpipe); - mtx_unlock(&pipe_gdlocks[gd->gd_cpuid]); - kfree(cpipe, M_PIPE); + if (gd->gd_pipeqcount >= pipe_maxcache) { + mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx); + pipe_free_kmem(rpb); + pipe_free_kmem(wpb); + mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx); + kfree(pipe, M_PIPE); } else { - cpipe->pipe_state = 0; - cpipe->pipe_peer = gd->gd_pipeq; - gd->gd_pipeq = cpipe; + rpb->state = 0; + wpb->state = 0; + pipe->next = gd->gd_pipeq; + gd->gd_pipeq = pipe; ++gd->gd_pipeqcount; } } @@ -1269,9 +1253,18 @@ pipeclose(struct pipe *cpipe) static int pipe_kqfilter(struct file *fp, struct knote *kn) { - struct pipe *cpipe; + struct pipebuf *rpb; + struct pipebuf *wpb; + struct pipe *pipe; - cpipe = (struct pipe *)kn->kn_fp->f_data; + pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1); + if ((intptr_t)fp->f_data & 1) { + rpb = &pipe->bufferB; + wpb = &pipe->bufferA; + } else { + rpb = &pipe->bufferA; + wpb = &pipe->bufferB; + } switch (kn->kn_filter) { case EVFILT_READ: @@ -1279,7 +1272,7 @@ pipe_kqfilter(struct file *fp, struct knote *kn) break; case EVFILT_WRITE: kn->kn_fop = &pipe_wfiltops; - if (cpipe->pipe_peer == NULL) { + if (wpb->state & PIPE_CLOSED) { /* other end of pipe has been closed */ return (EPIPE); } @@ -1287,9 +1280,13 @@ pipe_kqfilter(struct file *fp, struct knote *kn) default: return (EOPNOTSUPP); } - kn->kn_hook = (caddr_t)cpipe; - knote_insert(&cpipe->pipe_kq.ki_note, kn); + if (rpb == &pipe->bufferA) + kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0); + else + kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1); + + knote_insert(&rpb->kq.ki_note, kn); return (0); } @@ -1297,24 +1294,45 @@ pipe_kqfilter(struct file *fp, struct knote *kn) static void filt_pipedetach(struct knote *kn) { - struct pipe *cpipe = (struct pipe *)kn->kn_hook; + struct pipebuf *rpb; + struct pipebuf *wpb; + struct pipe *pipe; - knote_remove(&cpipe->pipe_kq.ki_note, kn); + pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1); + if ((intptr_t)kn->kn_hook & 1) { + rpb = &pipe->bufferB; + wpb = &pipe->bufferA; + } else { + rpb = &pipe->bufferA; + wpb = &pipe->bufferB; + } + knote_remove(&rpb->kq.ki_note, kn); } /*ARGSUSED*/ static int filt_piperead(struct knote *kn, long hint) { - struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; + struct pipebuf *rpb; + struct pipebuf *wpb; + struct pipe *pipe; int ready = 0; - lwkt_gettoken(&rpipe->pipe_rlock); - lwkt_gettoken(&rpipe->pipe_wlock); + pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1); + if ((intptr_t)kn->kn_fp->f_data & 1) { + rpb = &pipe->bufferB; + wpb = &pipe->bufferA; + } else { + rpb = &pipe->bufferA; + wpb = &pipe->bufferB; + } - kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex; + lwkt_gettoken(&rpb->rlock); + lwkt_gettoken(&rpb->wlock); - if (rpipe->pipe_state & PIPE_REOF) { + kn->kn_data = rpb->windex - rpb->rindex; + + if (rpb->state & PIPE_REOF) { /* * Only set NODATA if all data has been exhausted */ @@ -1324,8 +1342,8 @@ filt_piperead(struct knote *kn, long hint) ready = 1; } - lwkt_reltoken(&rpipe->pipe_wlock); - lwkt_reltoken(&rpipe->pipe_rlock); + lwkt_reltoken(&rpb->wlock); + lwkt_reltoken(&rpb->rlock); if (!ready) ready = kn->kn_data > 0; @@ 
-1337,31 +1355,39 @@ filt_piperead(struct knote *kn, long hint) static int filt_pipewrite(struct knote *kn, long hint) { - struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data; - struct pipe *wpipe = rpipe->pipe_peer; + struct pipebuf *rpb; + struct pipebuf *wpb; + struct pipe *pipe; int ready = 0; + pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1); + if ((intptr_t)kn->kn_fp->f_data & 1) { + rpb = &pipe->bufferB; + wpb = &pipe->bufferA; + } else { + rpb = &pipe->bufferA; + wpb = &pipe->bufferB; + } + kn->kn_data = 0; - if (wpipe == NULL) { + if (wpb->state & PIPE_CLOSED) { kn->kn_flags |= (EV_EOF | EV_NODATA); return (1); } - lwkt_gettoken(&wpipe->pipe_rlock); - lwkt_gettoken(&wpipe->pipe_wlock); + lwkt_gettoken(&wpb->rlock); + lwkt_gettoken(&wpb->wlock); - if (wpipe->pipe_state & PIPE_WEOF) { + if (wpb->state & PIPE_WEOF) { kn->kn_flags |= (EV_EOF | EV_NODATA); ready = 1; } if (!ready) - kn->kn_data = wpipe->pipe_buffer.size - - (wpipe->pipe_buffer.windex - - wpipe->pipe_buffer.rindex); + kn->kn_data = wpb->size - (wpb->windex - wpb->rindex); - lwkt_reltoken(&wpipe->pipe_wlock); - lwkt_reltoken(&wpipe->pipe_rlock); + lwkt_reltoken(&wpb->wlock); + lwkt_reltoken(&wpb->rlock); if (!ready) ready = kn->kn_data >= PIPE_BUF; diff --git a/sys/sys/pipe.h b/sys/sys/pipe.h index c5de382916..c7955e9986 100644 --- a/sys/sys/pipe.h +++ b/sys/sys/pipe.h @@ -1,6 +1,10 @@ /* * Copyright (c) 1996 John S. Dyson * All rights reserved. + * Copyright (c) 2003-2017 The DragonFly Project. All rights reserved. + * + * This code is derived from software contributed to The DragonFly Project + * by Matthew Dillon * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions @@ -17,9 +21,6 @@ * is allowed if this notation is included. * 5. Modifications may be freely made to this file if the above conditions * are met. - * - * $FreeBSD: src/sys/sys/pipe.h,v 1.16 1999/12/29 04:24:45 peter Exp $ - * $DragonFly: src/sys/sys/pipe.h,v 1.10 2006/06/10 20:00:17 dillon Exp $ */ #ifndef _SYS_PIPE_H_ @@ -46,39 +47,34 @@ #include /* for PAGE_SIZE */ #endif -/* - * Pipe buffer size, keep moderate in value, pipes take kva space. - * Must be a multiple of PAGE_SIZE. - */ -#ifndef PIPE_SIZE -#define PIPE_SIZE 16384 -#endif - -#ifndef BIG_PIPE_SIZE -#define BIG_PIPE_SIZE (64*1024) -#endif - -#if PIPE_SIZE < PAGE_SIZE -#error "PIPE_SIZE is too small for this architecture" -#endif - /* * Pipe buffer information. - * Separate in, out, cnt are used to simplify calculations. - * Buffered write is active when the buffer.cnt field is set. 
*/ struct pipebuf { - u_int rindex; /* FIFO read index */ - u_int dummy1[3]; /* cache-align */ - u_int windex; /* FIFO write index */ - u_int dummy2[3]; /* cache-align */ - u_int size; /* size of buffer */ - caddr_t buffer; /* kva of buffer */ - struct vm_object *object; /* VM object containing buffer */ -}; + struct { + struct lwkt_token rlock; + size_t rindex; /* current read index (FIFO) */ + int32_t rip; /* blocking read requested (FIFO) */ + int32_t unu01; /* blocking read requested (FIFO) */ + struct timespec atime; /* time of last access */ + } __cachealign; + struct { + struct lwkt_token wlock; + size_t windex; /* current write index (FIFO) */ + int32_t wip; + int32_t unu02; + struct timespec mtime; /* time of last modify */ + } __cachealign; + size_t size; /* size of buffer */ + caddr_t buffer; /* kva of buffer */ + struct vm_object *object; /* VM object containing buffer */ + struct kqinfo kq; /* for compat with select/poll/kq */ + struct sigio *sigio; /* information for async I/O */ + uint32_t state; /* pipe status info */ +} __cachealign; /* - * Bits in pipe_state. + * Bits in pipebuf.state. */ #define PIPE_ASYNC 0x0004 /* Async? I/O */ #define PIPE_WANTR 0x0008 /* Reader wants some characters */ @@ -88,25 +84,16 @@ struct pipebuf { #define PIPE_CLOSED 0x1000 /* Pipe has been closed */ /* - * Per-pipe data structure. - * Two of these are linked together to produce bi-directional pipes. + * The pipe() data structure encompasses two pipes. Bit 0 in fp->f_data + * denotes which. */ struct pipe { - struct pipebuf pipe_buffer; /* data storage */ - struct kqinfo pipe_kq; /* for compat with select/poll/kq */ - struct timespec pipe_atime; /* time of last access */ - struct timespec pipe_mtime; /* time of last modify */ - struct timespec pipe_ctime; /* time of status change */ - struct sigio *pipe_sigio; /* information for async I/O */ - struct pipe *pipe_peer; /* link with other direction */ - u_int pipe_state; /* pipe status info */ - int pipe_rip; /* read uio in-progress */ - int pipe_wip; /* write uio in-progress */ - u_int pipe_wantwcnt; /* for resize */ - struct lwkt_token pipe_rlock; /* rindex locked */ - struct lwkt_token pipe_wlock; /* windex locked */ - struct lock *pipe_slock; /* state locked (shared w/peer) */ -}; + struct pipebuf bufferA; /* data storage */ + struct pipebuf bufferB; /* data storage */ + struct timespec ctime; /* time of status change */ + struct pipe *next; + uint32_t open_count; +} __cachealign; #endif /* _KERNEL || _KERNEL_STRUCTURES */ #endif /* !_SYS_PIPE_H_ */
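
A few self-contained sketches of the mechanisms this commit introduces follow; all helper names in them (tag_end, resolve_ends, ring_read, and so on) are illustrative, not kernel API.

Both descriptors now reference the same struct pipe, and bit 0 of fp->f_data selects which pipebuf is the caller's read side. The trick is sound because struct pipe is __cachealign'd, so its address always has the low bit clear. A minimal user-space rendering of the tag/untag logic:

#include <stdint.h>
#include <stdio.h>

struct pipebuf { int dummy; };

struct pipe {
	struct pipebuf bufferA;
	struct pipebuf bufferB;
} __attribute__((aligned(64)));		/* stand-in for __cachealign */

static void *
tag_end(struct pipe *pipe, int b_side)	/* build an fp->f_data value */
{
	return (void *)((intptr_t)pipe | (b_side ? 1 : 0));
}

static void
resolve_ends(void *f_data, struct pipebuf **rpbp, struct pipebuf **wpbp)
{
	struct pipe *pipe = (struct pipe *)((intptr_t)f_data & ~(intptr_t)1);

	if ((intptr_t)f_data & 1) {
		*rpbp = &pipe->bufferB;	/* this fd reads from B ... */
		*wpbp = &pipe->bufferA;	/* ... and writes into A */
	} else {
		*rpbp = &pipe->bufferA;
		*wpbp = &pipe->bufferB;
	}
}

int
main(void)
{
	static struct pipe p;
	struct pipebuf *rpb, *wpb;

	resolve_ends(tag_end(&p, 1), &rpb, &wpb);
	printf("write end reads B: %d, writes A: %d\n",
	       rpb == &p.bufferB, wpb == &p.bufferA);
	return 0;
}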
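
Each pipebuf is a power-of-two ring. rindex and windex increase monotonically (they are only reset in the empty-pipe optimization), windex - rindex is the number of buffered bytes, and index & (size - 1) yields the physical offset, so a copy wraps around at most once. Note the masking assumes size is a power of two; the 32768 default satisfies this, though the page-rounding clamp in pipespace() does not by itself enforce it. A sketch of the read-side arithmetic:

#include <stddef.h>
#include <string.h>

struct ring {
	size_t rindex;		/* total bytes ever read */
	size_t windex;		/* total bytes ever written */
	size_t size;		/* power-of-two capacity */
	char   *buffer;
};

/* Copy up to 'len' bytes out, honoring one possible wrap. */
static size_t
ring_read(struct ring *rb, char *out, size_t len)
{
	size_t avail = rb->windex - rb->rindex;	/* bytes buffered */
	size_t off, seg, total = 0;

	if (len > avail)
		len = avail;
	while (len) {
		off = rb->rindex & (rb->size - 1);	/* physical offset */
		seg = rb->size - off;			/* contiguous run */
		if (seg > len)
			seg = len;
		memcpy(out + total, rb->buffer + off, seg);
		rb->rindex += seg;			/* never wraps back */
		total += seg;
		len -= seg;
	}
	return total;
}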
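
pb->state may only be modified while holding both the rlock and wlock tokens, so the hot paths peek at PIPE_WANTR/PIPE_WANTW first and pay for the second token only when a sleeper actually registered. lwkt tokens are not mutexes and the kernel pairs this with tsleep/wakeup rather than condition variables, so the pthread rendering below illustrates only the test/lock/re-test discipline, not a drop-in protocol:

#include <pthread.h>

#define PIPE_WANTW 0x0010		/* writer is sleeping */

struct half {
	pthread_mutex_t wlock;		/* stands in for the wlock token */
	pthread_cond_t  wcond;		/* stands in for tsleep/wakeup */
	unsigned        state;
};

static void
wakeup_writer_if_waiting(struct half *pb)
{
	if (pb->state & PIPE_WANTW) {		/* cheap unlocked peek */
		pthread_mutex_lock(&pb->wlock);
		if (pb->state & PIPE_WANTW) {	/* re-test under the lock */
			pb->state &= ~PIPE_WANTW;
			pthread_mutex_unlock(&pb->wlock);
			pthread_cond_broadcast(&pb->wcond);
		} else {
			pthread_mutex_unlock(&pb->wlock);
		}
	}
}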
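
Before blocking, pipe_read() optionally spins for up to kern.pipe.delay nanoseconds (default 5000) watching windex, since per the in-code comment a cache-hot hit costs far less than the 7uS-or-more tsleep/wakeup sequence a miss requires. The kernel uses tsc_get_target()/tsc_test_target(); the sketch below substitutes clock_gettime(CLOCK_MONOTONIC) for the TSC:

#include <stdatomic.h>
#include <stddef.h>
#include <time.h>

static long long
now_ns(void)
{
	struct timespec ts;

	clock_gettime(CLOCK_MONOTONIC, &ts);
	return ts.tv_sec * 1000000000LL + ts.tv_nsec;
}

/* Returns 1 if data showed up within delay_ns, else 0 (caller blocks). */
static int
spin_for_data(atomic_size_t *windex, size_t rindex, long delay_ns)
{
	long long deadline = now_ns() + delay_ns;

	do {
		if (atomic_load(windex) != rindex)
			return 1;
	} while (now_ns() < deadline);
	return 0;
}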
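
On the write side, free space falls out of the same index arithmetic, each uiocopy is clamped to half the buffer so reader and writer can overlap on SMP, and the POSIX rule that writes of PIPE_BUF bytes or less be atomic is honored by reporting zero space (and therefore blocking) whenever such a write cannot land in one piece. A sketch of that space decision:

#include <limits.h>	/* PIPE_BUF */
#include <stddef.h>

static size_t
writable_space(size_t size, size_t windex, size_t rindex,
	       size_t resid, size_t orig_resid)
{
	size_t space = size - (windex - rindex);

	/* small writes are all-or-nothing (POSIX PIPE_BUF atomicity) */
	if (space < resid && orig_resid <= PIPE_BUF)
		return 0;
	/* clamp each copy to half the ring so the reader can overlap */
	if (space > resid)
		space = resid;
	if (space > size / 2)
		space = size / 2;
	return space;
}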
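
Tear-down is now a plain refcount: open_count starts at 2, pipeclose() runs once per descriptor, and whichever side decrements it to zero owns the free. That closer either parks the whole structure on the small per-cpu free list (gd_pipeq, capped by kern.pipe.maxcache) or really frees it, taking the per-cpu pipegdlock around kmem_free to damp kernel_map contention during bulk tear-downs. A sketch with C11 stdatomic standing in for atomic_fetchadd_int and a plain struct standing in for globaldata_t:

#include <stdatomic.h>
#include <stdlib.h>

#define PIPE_MAXCACHE 16		/* kern.pipe.maxcache default */

struct pipe {
	struct pipe *next;		/* free-list linkage (was pipe_peer) */
	atomic_int   open_count;	/* 2 at creation, one per descriptor */
	/* bufferA/bufferB elided */
};

struct cpu {				/* stand-in for globaldata_t */
	struct pipe *pipeq;		/* per-cpu singly linked free list */
	int          pipeqcount;
};

static void
pipe_close_one_end(struct cpu *gd, struct pipe *pipe)
{
	/* atomic_fetch_add returns the old value, like atomic_fetchadd_int */
	if (atomic_fetch_add(&pipe->open_count, -1) != 1)
		return;				/* peer still open */
	if (gd->pipeqcount >= PIPE_MAXCACHE) {
		/*
		 * The kernel also releases both KVA buffers here, under
		 * the per-cpu pipegdlock, before kfree()ing the structure.
		 */
		free(pipe);
	} else {
		pipe->next = gd->pipeq;		/* recycle onto the cache */
		gd->pipeq = pipe;
		++gd->pipeqcount;
	}
}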