From 1ae37239759b49870dcaae946fc436b7f9f53f73 Mon Sep 17 00:00:00 2001
From: Matthew Dillon
Date: Sun, 12 Jul 2009 15:42:43 -0700
Subject: [PATCH] pipe - Make pipe r/w MPSAFE, add kern.pipe.mpsafe (disabled by default)

* Make pipe_read and pipe_write MPSAFE.

* Add a sysctl kern.pipe.mpsafe which defaults to disabled.  Set to 1
  to test the MPSAFE pipe code.  The expectation is that it will be set
  to 1 for the release.  Currently only pipe_read and pipe_write are
  MPSAFE.

* The new code in mpsafe mode also implements a streaming optimization
  to avoid unnecessary tsleep/wakeup/IPIs.  If the reader and writer are
  operating on different cpus, this feature results in more uniform
  performance across a wide swath of block sizes.

* The new code currently does not use any page mapping optimizations.
  Page table overhead is fairly nasty on SMP, so for now we rely on cpu
  caches and do an extra copy.  This means the code is tuned better for
  more recent cpus and tuned worse for older cpus.  At least for now.

OLD pipe code: dwe = dwrite_enable, sfb = dwrite_sfbuf mode
NEW pipe code: mpsafe = 0 (gets bgl) or 1 (does not use bgl)

Using /usr/src/test/sysperf/pipe2.c to test, all results in MBytes/sec

                     8K    16K    32K    64K   128K   256K
                   ----   ----   ----   ----   ----   ----
OLD dwe=0          1193   1167   1555   1525   1473   1477
OLD dwe=1 sfb=0     856   1458   2307   2182   2275   2307
OLD dwe=1 sfb=1     955   1537   2300   2356   2363   2708
OLD dwe=1 sfb=2     939   1561   2367   2477   2379   2360
NEW mpsafe=0       1150   1369   1536   1591   1358   1270
NEW mpsafe=1       2133   2319   2375   2387   2396   2418
---
 sys/kern/sys_pipe.c | 865 ++++++++++++++++++++++++++++----------------
 sys/sys/pipe.h      |  22 +-
 2 files changed, 577 insertions(+), 310 deletions(-)

diff --git a/sys/kern/sys_pipe.c b/sys/kern/sys_pipe.c
index f77dd1c7f8..485fe38a21 100644
--- a/sys/kern/sys_pipe.c
+++ b/sys/kern/sys_pipe.c
@@ -132,6 +132,11 @@ SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
 	CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
 	CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
+#ifdef SMP
+static int pipe_mpsafe = 0;
+SYSCTL_INT(_kern_pipe, OID_AUTO, mpsafe,
+	CTLFLAG_RW, &pipe_mpsafe, 0, "");
+#endif
 #if !defined(NO_PIPE_SYSCTL_STATS)
 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
 	CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
@@ -142,11 +147,89 @@ SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
 static void pipeclose (struct pipe *cpipe);
 static void pipe_free_kmem (struct pipe *cpipe);
 static int pipe_create (struct pipe **cpipep);
-static __inline int pipelock (struct pipe *cpipe, int catch);
-static __inline void pipeunlock (struct pipe *cpipe);
 static __inline void pipeselwakeup (struct pipe *cpipe);
 static int pipespace (struct pipe *cpipe, int size);
 
+static __inline void
+pipeselwakeup(struct pipe *cpipe)
+{
+	if (cpipe->pipe_state & PIPE_SEL) {
+		get_mplock();
+		cpipe->pipe_state &= ~PIPE_SEL;
+		selwakeup(&cpipe->pipe_sel);
+		rel_mplock();
+	}
+	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
+		get_mplock();
+		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
+		rel_mplock();
+	}
+	if (SLIST_FIRST(&cpipe->pipe_sel.si_note)) {
+		get_mplock();
+		KNOTE(&cpipe->pipe_sel.si_note, 0);
+		rel_mplock();
+	}
+}
+
+/*
+ * These routines are called before and after a UIO.  The UIO
+ * may block, causing our held tokens to be lost temporarily.
+ *
+ * We use these routines to serialize reads against other reads
+ * and writes against other writes.
+ *
+ * The read token is held on entry so *ipp does not race.
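+ *
+ * [Editor's note, summarizing the protocol implemented below -- this
+ * paragraph is not in the original comment: *ipp points at pipe_rip or
+ * pipe_wip and acts as an in-progress flag.  It is 0 when idle, 1 while
+ * a uio is in progress, and is set to -1 by a second thread that finds
+ * a uio already in progress and sleeps waiting for it.  A typical
+ * caller is:
+ *
+ *	error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
+ *	if (error == 0) {
+ *		(uiomove loop, may block and lose tokens)
+ *		pipe_end_uio(rpipe, &rpipe->pipe_rip);
+ *	}
+ *
+ * Caution: *ipp is a u_int, so pipe_end_uio()'s (*ipp < 0) waiter test
+ * is always false as written; a signed compare such as (int)*ipp < 0
+ * appears to be intended.]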
+ */ +static __inline int +pipe_start_uio(struct pipe *cpipe, u_int *ipp) +{ + int error; + + while (*ipp) { + *ipp = -1; + error = tsleep(ipp, PCATCH, "pipexx", 0); + if (error) + return (error); + } + *ipp = 1; + return (0); +} + +static __inline void +pipe_end_uio(struct pipe *cpipe, u_int *ipp) +{ + if (*ipp < 0) { + *ipp = 0; + wakeup(ipp); + } else { + *ipp = 0; + } +} + +static __inline void +pipe_get_mplock(int *save) +{ +#ifdef SMP + if (pipe_mpsafe == 0) { + get_mplock(); + *save = 1; + } else +#endif + { + *save = 0; + } +} + +static __inline void +pipe_rel_mplock(int *save) +{ +#ifdef SMP + if (*save) + rel_mplock(); +#endif +} + + /* * The pipe system call for the DTYPE_PIPE type of pipes * @@ -204,9 +287,17 @@ sys_pipe(struct pipe_args *uap) wf->f_data = wpipe; uap->sysmsg_fds[1] = fd2; + rpipe->pipe_slock = kmalloc(sizeof(struct lock), + M_PIPE, M_WAITOK|M_ZERO); + wpipe->pipe_slock = rpipe->pipe_slock; rpipe->pipe_peer = wpipe; wpipe->pipe_peer = rpipe; + lockinit(rpipe->pipe_slock, "pipecl", 0, 0); + /* + * Once activated the peer relationship remains valid until + * both sides are closed. + */ fsetfd(p, rf, fd1); fsetfd(p, wf, fd2); fdrop(rf); @@ -238,6 +329,7 @@ pipespace(struct pipe *cpipe, int size) * ones. */ if (object == NULL || object->size != npages) { + get_mplock(); object = vm_object_allocate(OBJT_DEFAULT, npages); buffer = (caddr_t)vm_map_min(&kernel_map); @@ -250,9 +342,11 @@ pipespace(struct pipe *cpipe, int size) if (error != KERN_SUCCESS) { vm_object_deallocate(object); + rel_mplock(); return (ENOMEM); } pipe_free_kmem(cpipe); + rel_mplock(); cpipe->pipe_buffer.object = object; cpipe->pipe_buffer.buffer = buffer; cpipe->pipe_buffer.size = size; @@ -281,6 +375,7 @@ pipe_create(struct pipe **cpipep) gd->gd_pipeq = cpipe->pipe_peer; --gd->gd_pipeqcount; cpipe->pipe_peer = NULL; + cpipe->pipe_wantwcnt = 0; } else { cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO); } @@ -290,55 +385,11 @@ pipe_create(struct pipe **cpipep) vfs_timestamp(&cpipe->pipe_ctime); cpipe->pipe_atime = cpipe->pipe_ctime; cpipe->pipe_mtime = cpipe->pipe_ctime; + lwkt_token_init(&cpipe->pipe_rlock); + lwkt_token_init(&cpipe->pipe_wlock); return (0); } - -/* - * lock a pipe for I/O, blocking other access - */ -static __inline int -pipelock(struct pipe *cpipe, int catch) -{ - int error; - - while (cpipe->pipe_state & PIPE_LOCK) { - cpipe->pipe_state |= PIPE_LWANT; - error = tsleep(cpipe, (catch ? 
PCATCH : 0), "pipelk", 0);
-		if (error != 0)
-			return (error);
-	}
-	cpipe->pipe_state |= PIPE_LOCK;
-	return (0);
-}
-
-/*
- * unlock a pipe I/O lock
- */
-static __inline void
-pipeunlock(struct pipe *cpipe)
-{
-
-	cpipe->pipe_state &= ~PIPE_LOCK;
-	if (cpipe->pipe_state & PIPE_LWANT) {
-		cpipe->pipe_state &= ~PIPE_LWANT;
-		wakeup(cpipe);
-	}
-}
-
-static __inline void
-pipeselwakeup(struct pipe *cpipe)
-{
-
-	if (cpipe->pipe_state & PIPE_SEL) {
-		cpipe->pipe_state &= ~PIPE_SEL;
-		selwakeup(&cpipe->pipe_sel);
-	}
-	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio)
-		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
-	KNOTE(&cpipe->pipe_sel.si_note, 0);
-}
-
 /*
  * MPALMOSTSAFE (acquires mplock)
  */
@@ -350,14 +401,25 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 	int nread = 0;
 	int nbio;
 	u_int size;	/* total bytes available */
+	u_int nsize;	/* total bytes to read */
 	u_int rindex;	/* contiguous bytes available */
+	u_int half_way;
+	lwkt_tokref rlock;
+	lwkt_tokref wlock;
+	int mpsave;
 
-	get_mplock();
-	rpipe = (struct pipe *) fp->f_data;
-	++rpipe->pipe_busy;
-	error = pipelock(rpipe, 1);
-	if (error)
-		goto unlocked_error;
+	/*
+	 * Degenerate case
+	 */
+	if (uio->uio_resid == 0)
+		return(0);
+
+	/*
+	 * Setup locks, calculate nbio
+	 */
+	pipe_get_mplock(&mpsave);
+	rpipe = (struct pipe *)fp->f_data;
+	lwkt_gettoken(&rlock, &rpipe->pipe_rlock);
 
 	if (fflags & O_FBLOCKING)
 		nbio = 0;
@@ -368,107 +430,193 @@ pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 	else
 		nbio = 0;
 
+	/*
+	 * Reads are serialized.  Note however that pipe_buffer.buffer and
+	 * pipe_buffer.size can change out from under us when the number
+	 * of bytes in the buffer is zero due to the write-side doing a
+	 * pipespace().
+	 */
+	error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
+	if (error) {
+		pipe_rel_mplock(&mpsave);
+		lwkt_reltoken(&rlock);
+		return (error);
+	}
 	while (uio->uio_resid) {
 		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
+		cpu_lfence();
 		if (size) {
 			rindex = rpipe->pipe_buffer.rindex &
 				 (rpipe->pipe_buffer.size - 1);
-			if (size > rpipe->pipe_buffer.size - rindex)
-				size = rpipe->pipe_buffer.size - rindex;
-			if (size > (u_int)uio->uio_resid)
-				size = (u_int)uio->uio_resid;
+			nsize = size;
+			if (nsize > rpipe->pipe_buffer.size - rindex)
+				nsize = rpipe->pipe_buffer.size - rindex;
+			if (nsize > (u_int)uio->uio_resid)
+				nsize = (u_int)uio->uio_resid;
 			error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
-					size, uio);
+					nsize, uio);
 			if (error)
 				break;
-			rpipe->pipe_buffer.rindex += size;
+			cpu_mfence();
+			rpipe->pipe_buffer.rindex += nsize;
+			nread += nsize;
 
 			/*
-			 * If there is no more to read in the pipe, reset
-			 * its pointers to the beginning.  This improves
-			 * cache hit stats.
+			 * Shortcut to the top of the loop if there is no
+			 * writer waiting or if we have not transitioned
+			 * across the half-way point.
			 */
-			if (rpipe->pipe_buffer.rindex ==
-			    rpipe->pipe_buffer.windex) {
-				rpipe->pipe_buffer.rindex = 0;
-				rpipe->pipe_buffer.windex = 0;
+			half_way = rpipe->pipe_buffer.size >> 1;
+			if ((rpipe->pipe_state & PIPE_WANTW) == 0 ||
+			    size <= half_way || size - nsize > half_way) {
+				continue;
 			}
-			nread += size;
-		} else {
-			/*
-			 * detect EOF condition
-			 * read returns 0 on EOF, no need to set error
-			 */
-			if (rpipe->pipe_state & PIPE_EOF)
-				break;
+		}
 
-			/*
-			 * If the "write-side" has been blocked, wake it up now.
-			 */
+		/*
+		 * If the "write-side" was blocked we wake it up. 
This code
+		 * is reached either when the buffer is completely emptied
+		 * or if it becomes more than half-empty.
+		 *
+		 * Pipe_state can only be modified if both the rlock and
+		 * wlock are held.
+		 */
+		if (rpipe->pipe_state & PIPE_WANTW) {
+			lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
 			if (rpipe->pipe_state & PIPE_WANTW) {
 				rpipe->pipe_state &= ~PIPE_WANTW;
+				lwkt_reltoken(&wlock);
 				wakeup(rpipe);
+			} else {
+				lwkt_reltoken(&wlock);
 			}
+		}
 
-			/*
-			 * Break if some data was read.
-			 */
-			if (nread > 0)
-				break;
+		/*
+		 * Pick up our copy loop again if the writer sent data to
+		 * us.
+		 */
+		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
+		if (size)
+			continue;
 
-			/*
-			 * Unlock the pipe buffer for our remaining
-			 * processing.  We will either break out with an
-			 * error or we will sleep and relock to loop.
-			 */
-			pipeunlock(rpipe);
+		/*
+		 * Detect EOF condition, do not set error.
+		 */
+		if (rpipe->pipe_state & PIPE_REOF)
+			break;
 
-			/*
-			 * Handle non-blocking mode operation or
-			 * wait for more data.
-			 */
-			if (nbio) {
-				error = EAGAIN;
-			} else {
-				rpipe->pipe_state |= PIPE_WANTR;
-				if ((error = tsleep(rpipe, PCATCH,
-						    "piperd", 0)) == 0) {
-					error = pipelock(rpipe, 1);
-				}
-			}
-			if (error)
-				goto unlocked_error;
+#ifdef SMP
+		/*
+		 * Gravy train if SMP box.  This saves a ton of IPIs and
+		 * allows two cpus to operate in lockstep.
+		 *
+		 * XXX check pipe_wip also?
+		 */
+		DELAY(1);
+		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
+		if (size)
+			continue;
+#endif
+
+		/*
+		 * Break if some data was read, or if this was a non-blocking
+		 * read.
+		 */
+		if (nread > 0)
+			break;
+
+		if (nbio) {
+			error = EAGAIN;
+			break;
+		}
+
+		/*
+		 * Last chance, interlock with WANTR.
+		 */
+		lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
+		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
+		if (size) {
+			lwkt_reltoken(&wlock);
+			continue;
 		}
+
+		/*
+		 * If there is no more to read in the pipe, reset its
+		 * pointers to the beginning.  This improves cache hit
+		 * stats.
+		 *
+		 * We need both locks to modify both pointers, and there
+		 * must also not be a write in progress or the uiomove()
+		 * in the write might block and temporarily release
+		 * its wlock, then reacquire and update windex.  We are
+		 * only serialized against reads, not writes.
+		 *
+		 * XXX should we even bother resetting the indices?  It
+		 *     might actually be more cache efficient not to.
+		 */
+		if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
+		    rpipe->pipe_wip == 0) {
+			rpipe->pipe_buffer.rindex = 0;
+			rpipe->pipe_buffer.windex = 0;
+		}
+
+		/*
+		 * Wait for more data.
+		 *
+		 * Pipe_state can only be set if both the rlock and wlock
+		 * are held.
+		 */
+		rpipe->pipe_state |= PIPE_WANTR;
+		tsleep_interlock(rpipe);
+		lwkt_reltoken(&wlock);
+		error = tsleep(rpipe, PCATCH, "piperd", 0);
+		if (error)
+			break;
 	}
-	pipeunlock(rpipe);
+	pipe_end_uio(rpipe, &rpipe->pipe_rip);
 
-	if (error == 0)
+	/*
+	 * Update the last access time
+	 */
+	if (error == 0 && nread)
 		vfs_timestamp(&rpipe->pipe_atime);
 
-unlocked_error:
-	--rpipe->pipe_busy;
+#if 0
 	/*
-	 * PIPE_WANT processing only makes sense if pipe_busy is 0.
+	 * Handle write blocking hysteresis.  size can only increase while
+	 * we hold the rlock.
+	 *
+	 * XXX shouldn't need this any more.  We will wakeup the writer
+	 *     when we've drained past half-way.  The worst the writer
+	 *     can do is fill the buffer up, not make it smaller, so
+	 *     we are guaranteed our half-way test. 
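+	 *
+	 *     [Editor's note, expanding the argument: a writer only sets
+	 *     PIPE_WANTW after finding the FIFO essentially full, i.e.
+	 *     well past half-way.  The read loop's shortcut test then
+	 *     wakes it on the first read that drains the FIFO from above
+	 *     half-way to half-way or below, so the wakeup cannot be
+	 *     missed.]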
	 */
-	size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
-
-	if ((rpipe->pipe_busy == 0) && (rpipe->pipe_state & PIPE_WANT)) {
-		rpipe->pipe_state &= ~(PIPE_WANT|PIPE_WANTW);
-		wakeup(rpipe);
-	} else if (size < MINPIPESIZE) {
-		/*
-		 * Handle write blocking hysteresis.
-		 */
-		if (rpipe->pipe_state & PIPE_WANTW) {
-			rpipe->pipe_state &= ~PIPE_WANTW;
-			wakeup(rpipe);
+	if (rpipe->pipe_state & PIPE_WANTW) {
+		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
+		if (size <= (rpipe->pipe_buffer.size >> 1)) {
+			lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
+			if (rpipe->pipe_state & PIPE_WANTW) {
+				rpipe->pipe_state &= ~PIPE_WANTW;
+				lwkt_reltoken(&wlock);
+				wakeup(rpipe);
+			} else {
+				lwkt_reltoken(&wlock);
+			}
 		}
 	}
+#endif
 
+	size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
+	lwkt_reltoken(&rlock);
+
+	/*
+	 * If enough space is available in the buffer, wake up any writers
+	 * waiting in select/poll?
+	 */
 	if ((rpipe->pipe_buffer.size - size) >= PIPE_BUF)
 		pipeselwakeup(rpipe);
 
-	rel_mplock();
+	pipe_rel_mplock(&mpsave);
 	return (error);
 }
 
@@ -478,25 +626,49 @@ unlocked_error:
 static int
 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 {
-	int error = 0;
+	int error;
 	int orig_resid;
 	int nbio;
 	struct pipe *wpipe, *rpipe;
+	lwkt_tokref rlock;
+	lwkt_tokref wlock;
 	u_int windex;
 	u_int space;
+	u_int wcount;
+	int mpsave;
 
-	get_mplock();
+	pipe_get_mplock(&mpsave);
+
+	/*
+	 * Writes go to the peer.  The peer will always exist.
+	 */
 	rpipe = (struct pipe *) fp->f_data;
 	wpipe = rpipe->pipe_peer;
+	lwkt_gettoken(&wlock, &wpipe->pipe_wlock);
+	if (wpipe->pipe_state & PIPE_WEOF) {
+		pipe_rel_mplock(&mpsave);
+		lwkt_reltoken(&wlock);
+		return (EPIPE);
+	}
 
 	/*
-	 * detect loss of pipe read side, issue SIGPIPE if lost.
+	 * Degenerate case (EPIPE takes precedence)
 	 */
-	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
-		rel_mplock();
-		return (EPIPE);
+	if (uio->uio_resid == 0) {
+		pipe_rel_mplock(&mpsave);
+		lwkt_reltoken(&wlock);
+		return(0);
+	}
+
+	/*
+	 * Writes are serialized (start_uio must be called with wlock)
+	 */
+	error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
+	if (error) {
+		pipe_rel_mplock(&mpsave);
+		lwkt_reltoken(&wlock);
+		return (error);
 	}
-	++wpipe->pipe_busy;
 
 	if (fflags & O_FBLOCKING)
 		nbio = 0;
@@ -509,48 +681,32 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 
 	/*
 	 * If it is advantageous to resize the pipe buffer, do
-	 * so.
+	 * so.  We are write-serialized so we can block safely.
 	 */
-	if ((uio->uio_resid > PIPE_SIZE) &&
+	if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
 	    (pipe_nbig < pipe_maxbig) &&
-	    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
-	    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
-	    (error = pipelock(wpipe, 1)) == 0) {
+	    wpipe->pipe_wantwcnt > 4 &&
+	    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
 		/*
 		 * Recheck after lock.
 		 */
-		if ((pipe_nbig < pipe_maxbig) &&
-		    (wpipe->pipe_buffer.size <= PIPE_SIZE) &&
+		lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
+		if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
+		    (pipe_nbig < pipe_maxbig) &&
 		    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
 			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) {
 				++pipe_bigcount;
 				pipe_nbig++;
 			}
 		}
-		pipeunlock(wpipe);
-	}
-
-	/*
-	 * If an early error occured unbusy and return, waking up any pending readers. 
-	 */
-	if (error) {
-		--wpipe->pipe_busy;
-		if ((wpipe->pipe_busy == 0) &&
-		    (wpipe->pipe_state & PIPE_WANT)) {
-			wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
-			wakeup(wpipe);
-		}
-		rel_mplock();
-		return(error);
+		lwkt_reltoken(&rlock);
 	}
-
-	KASSERT(wpipe->pipe_buffer.buffer != NULL, ("pipe buffer gone"));
 
 	orig_resid = uio->uio_resid;
+	wcount = 0;
 
 	while (uio->uio_resid) {
-		if (wpipe->pipe_state & PIPE_EOF) {
+		if (wpipe->pipe_state & PIPE_WEOF) {
 			error = EPIPE;
 			break;
 		}
@@ -559,6 +715,7 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 			(wpipe->pipe_buffer.size - 1);
 		space = wpipe->pipe_buffer.size -
 			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
+		cpu_lfence();
 
 		/* Writes of size <= PIPE_BUF must be atomic. */
 		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
@@ -570,120 +727,145 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 		 * writes to spin.
 		 */
 		if (space > 0) {
-			if ((error = pipelock(wpipe,1)) == 0) {
-				u_int segsize;
-
-				/*
-				 * If a process blocked in uiomove, our
-				 * value for space might be bad.
-				 *
-				 * XXX will we be ok if the reader has gone
-				 * away here?
-				 */
-				if (space > (wpipe->pipe_buffer.size -
-					     (wpipe->pipe_buffer.windex -
-					      wpipe->pipe_buffer.rindex))) {
-					pipeunlock(wpipe);
-					continue;
-				}
-				windex = wpipe->pipe_buffer.windex &
-					 (wpipe->pipe_buffer.size - 1);
-
-				/*
-				 * Transfer size is minimum of uio transfer
-				 * and free space in pipe buffer.
-				 */
-				if (space > (u_int)uio->uio_resid)
-					space = (u_int)uio->uio_resid;
-
-				/*
-				 * First segment to transfer is minimum of
-				 * transfer size and contiguous space in
-				 * pipe buffer.  If first segment to transfer
-				 * is less than the transfer size, we've got
-				 * a wraparound in the buffer.
-				 */
-				segsize = wpipe->pipe_buffer.size - windex;
-				if (segsize > space)
-					segsize = space;
-
-				/* Transfer first segment */
-
-				error = uiomove(
-					    &wpipe->pipe_buffer.buffer[windex],
-					    segsize, uio);
-
-				if (error == 0 && segsize < space) {
-					/*
-					 * Transfer remaining part now, to
-					 * support atomic writes.  Wraparound
-					 * happened.
-					 */
-					error = uiomove(&wpipe->pipe_buffer.
							    buffer[0],
-							    space - segsize, uio);
-				}
-				if (error == 0)
-					wpipe->pipe_buffer.windex += space;
-				pipeunlock(wpipe);
-			}
-			if (error)
-				break;
+			u_int segsize;
 
-		} else {
 			/*
-			 * If the "read-side" has been blocked, wake it up now
-			 * and yield to let it drain synchronously rather
-			 * then block.
+			 * Transfer size is minimum of uio transfer
+			 * and free space in pipe buffer.
+			 *
+			 * Limit each uiocopy to no more than PIPE_SIZE
+			 * so we can keep the gravy train going on a
+			 * SMP box.  This doubles the performance for
+			 * write sizes > 16K.  Otherwise large writes
+			 * wind up doing an inefficient synchronous
+			 * ping-pong.
 			 */
-			if (wpipe->pipe_state & PIPE_WANTR) {
-				wpipe->pipe_state &= ~PIPE_WANTR;
-				wakeup(wpipe);
-			}
+			if (space > (u_int)uio->uio_resid)
+				space = (u_int)uio->uio_resid;
+			if (space > PIPE_SIZE)
+				space = PIPE_SIZE;
 
 			/*
-			 * don't block on non-blocking I/O
+			 * First segment to transfer is minimum of
+			 * transfer size and contiguous space in
+			 * pipe buffer.  If first segment to transfer
+			 * is less than the transfer size, we've got
+			 * a wraparound in the buffer.
 			 */
-			if (nbio) {
-				error = EAGAIN;
-				break;
-			}
+			segsize = wpipe->pipe_buffer.size - windex;
+			if (segsize > space)
+				segsize = space;
 
 			/*
-			 * We have no more space and have something to offer,
-			 * wake up select/poll. 
+			 * If this is the first loop and the reader is
+			 * blocked, do a preemptive wakeup of the reader.
+			 *
+			 * This works for both SMP and UP.  On SMP the IPI
+			 * latency plus the wlock interlock on the reader
+			 * side is the fastest way to get the reader going.
+			 * (The scheduler will hard loop on lock tokens).
+			 *
+			 * NOTE: We can't clear WANTR here without acquiring
+			 * the rlock, which we don't want to do here!
 			 */
-			pipeselwakeup(wpipe);
+			if (wpipe->pipe_state & PIPE_WANTR)
+				wakeup(wpipe);
 
-			wpipe->pipe_state |= PIPE_WANTW;
-			error = tsleep(wpipe, PCATCH, "pipewr", 0);
-			if (error != 0)
-				break;
 			/*
-			 * If read side wants to go away, we just issue a signal
-			 * to ourselves.
+			 * Transfer first segment
 			 */
-			if (wpipe->pipe_state & PIPE_EOF) {
-				error = EPIPE;
+			error = uiomove(&wpipe->pipe_buffer.buffer[windex],
+					segsize, uio);
+			cpu_mfence();
+			wpipe->pipe_buffer.windex += segsize;
+
+			if (error == 0 && segsize < space) {
+				/*
+				 * Transfer remaining part now, to
+				 * support atomic writes.  Wraparound
+				 * happened.
+				 */
+				segsize = space - segsize;
+				error = uiomove(&wpipe->pipe_buffer.buffer[0],
+						segsize, uio);
+				cpu_mfence();
+				wpipe->pipe_buffer.windex += segsize;
+			}
+			if (error)
 				break;
-			}
+			wcount += space;
+			continue;
 		}
-	}
 
-	--wpipe->pipe_busy;
+		/*
+		 * We need both the rlock and the wlock to interlock against
+		 * the EOF, WANTW, and size checks, and to modify pipe_state.
+		 *
+		 * These are token locks so we do not have to worry about
+		 * deadlocks.
+		 */
+		lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
 
-	if ((wpipe->pipe_busy == 0) && (wpipe->pipe_state & PIPE_WANT)) {
-		wpipe->pipe_state &= ~(PIPE_WANT | PIPE_WANTR);
-		wakeup(wpipe);
-	} else if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
 		/*
-		 * If we have put any characters in the buffer, we wake up
-		 * the reader.
+		 * If the "read-side" has been blocked, wake it up now
+		 * and yield to let it drain synchronously rather
+		 * than block.
 		 */
 		if (wpipe->pipe_state & PIPE_WANTR) {
 			wpipe->pipe_state &= ~PIPE_WANTR;
 			wakeup(wpipe);
 		}
+
+		/*
+		 * don't block on non-blocking I/O
+		 */
+		if (nbio) {
+			lwkt_reltoken(&rlock);
+			error = EAGAIN;
+			break;
+		}
+
+		/*
+		 * We have no more space and have something to offer,
+		 * wake up select/poll.
+		 */
+		pipeselwakeup(wpipe);
+
+		++wpipe->pipe_wantwcnt;		/* don't care about overflow */
+		wpipe->pipe_state |= PIPE_WANTW;
+		error = tsleep(wpipe, PCATCH, "pipewr", 0);
+		lwkt_reltoken(&rlock);
+
+		/*
+		 * Break out if we errored or the read side wants us to go
+		 * away.
+		 */
+		if (error)
+			break;
+		if (wpipe->pipe_state & PIPE_WEOF) {
+			error = EPIPE;
+			break;
+		}
+	}
+	pipe_end_uio(wpipe, &wpipe->pipe_wip);
+
+	/*
+	 * If we have put any characters in the buffer, we wake up
+	 * the reader.
+	 *
+	 * Both rlock and wlock are required to be able to modify pipe_state.
+	 */
+	if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
+		if (wpipe->pipe_state & PIPE_WANTR) {
+			lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
+			if (wpipe->pipe_state & PIPE_WANTR) {
+				wpipe->pipe_state &= ~PIPE_WANTR;
+				lwkt_reltoken(&rlock);
				wakeup(wpipe);
+			} else {
+				lwkt_reltoken(&rlock);
+			}
+		}
 	}
 
 	/*
@@ -702,9 +884,11 @@ pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
 	 * We have something to offer,
 	 * wake up select/poll. 
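+	 *
+	 * [Editor's note: the wlock is dropped before the pipeselwakeup()
+	 * call below; pipeselwakeup() internally takes the mplock around
+	 * selwakeup(), pgsigio() and KNOTE() as needed, so no tokens are
+	 * required at that point.]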
*/ - if (wpipe->pipe_buffer.rindex != wpipe->pipe_buffer.windex) + space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex; + lwkt_reltoken(&wlock); + if (space) pipeselwakeup(wpipe); - rel_mplock(); + pipe_rel_mplock(&mpsave); return (error); } @@ -771,19 +955,20 @@ pipe_poll(struct file *fp, int events, struct ucred *cred) struct pipe *wpipe; int revents = 0; u_int space; + int mpsave; - get_mplock(); + pipe_get_mplock(&mpsave); rpipe = (struct pipe *)fp->f_data; wpipe = rpipe->pipe_peer; if (events & (POLLIN | POLLRDNORM)) { if ((rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex) || - (rpipe->pipe_state & PIPE_EOF)) { + (rpipe->pipe_state & PIPE_REOF)) { revents |= events & (POLLIN | POLLRDNORM); } } if (events & (POLLOUT | POLLWRNORM)) { - if (wpipe == NULL || (wpipe->pipe_state & PIPE_EOF)) { + if (wpipe == NULL || (wpipe->pipe_state & PIPE_WEOF)) { revents |= events & (POLLOUT | POLLWRNORM); } else { space = wpipe->pipe_buffer.windex - @@ -794,9 +979,9 @@ pipe_poll(struct file *fp, int events, struct ucred *cred) } } - if ((rpipe->pipe_state & PIPE_EOF) || + if ((rpipe->pipe_state & PIPE_REOF) || (wpipe == NULL) || - (wpipe->pipe_state & PIPE_EOF)) + (wpipe->pipe_state & PIPE_WEOF)) revents |= POLLHUP; if (revents == 0) { @@ -810,7 +995,7 @@ pipe_poll(struct file *fp, int events, struct ucred *cred) wpipe->pipe_state |= PIPE_SEL; } } - rel_mplock(); + pipe_rel_mplock(&mpsave); return (revents); } @@ -821,8 +1006,9 @@ static int pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred) { struct pipe *pipe; + int mpsave; - get_mplock(); + pipe_get_mplock(&mpsave); pipe = (struct pipe *)fp->f_data; bzero((caddr_t)ub, sizeof(*ub)); @@ -838,7 +1024,7 @@ pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred) * st_flags, st_gen. * XXX (st_dev, st_ino) should be unique. */ - rel_mplock(); + pipe_rel_mplock(&mpsave); return (0); } @@ -871,33 +1057,65 @@ pipe_shutdown(struct file *fp, int how) struct pipe *rpipe; struct pipe *wpipe; int error = EPIPE; + lwkt_tokref rpipe_rlock; + lwkt_tokref rpipe_wlock; + lwkt_tokref wpipe_rlock; + lwkt_tokref wpipe_wlock; + int mpsave; - get_mplock(); + pipe_get_mplock(&mpsave); rpipe = (struct pipe *)fp->f_data; + wpipe = rpipe->pipe_peer; + + /* + * We modify pipe_state on both pipes, which means we need + * all four tokens! 
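+	 *
+	 * [Editor's note: the acquisition order of the four tokens is not
+	 * a deadlock concern -- as the comment in pipe_write() notes,
+	 * these are LWKT tokens, which are automatically released if the
+	 * holder blocks, so classical lock-order deadlocks cannot occur.]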
+	 */
+	lwkt_gettoken(&rpipe_rlock, &rpipe->pipe_rlock);
+	lwkt_gettoken(&rpipe_wlock, &rpipe->pipe_wlock);
+	lwkt_gettoken(&wpipe_rlock, &wpipe->pipe_rlock);
+	lwkt_gettoken(&wpipe_wlock, &wpipe->pipe_wlock);
 
 	switch(how) {
 	case SHUT_RDWR:
 	case SHUT_RD:
-		if (rpipe) {
-			rpipe->pipe_state |= PIPE_EOF;
-			pipeselwakeup(rpipe);
-			if (rpipe->pipe_busy)
-				wakeup(rpipe);
-			error = 0;
+		rpipe->pipe_state |= PIPE_REOF;
+		wpipe->pipe_state |= PIPE_WEOF;
+		if (rpipe->pipe_state & PIPE_WANTR) {
+			rpipe->pipe_state &= ~PIPE_WANTR;
+			wakeup(rpipe);
 		}
+		if (wpipe->pipe_state & PIPE_WANTW) {
+			wpipe->pipe_state &= ~PIPE_WANTW;
+			wakeup(wpipe);
+		}
+		pipeselwakeup(rpipe);
+		error = 0;
 		if (how == SHUT_RD)
 			break;
 		/* fall through */
 	case SHUT_WR:
-		if (rpipe && (wpipe = rpipe->pipe_peer) != NULL) {
-			wpipe->pipe_state |= PIPE_EOF;
-			pipeselwakeup(wpipe);
-			if (wpipe->pipe_busy)
-				wakeup(wpipe);
-			error = 0;
+		wpipe->pipe_state |= PIPE_WEOF;
+		rpipe->pipe_state |= PIPE_REOF;
+		if (wpipe->pipe_state & PIPE_WANTW) {
+			wpipe->pipe_state &= ~PIPE_WANTW;
+			wakeup(wpipe);
+		}
+		if (rpipe->pipe_state & PIPE_WANTR) {
+			rpipe->pipe_state &= ~PIPE_WANTR;
+			wakeup(rpipe);
 		}
+		pipeselwakeup(wpipe);
+		error = 0;
+		break;
 	}
-	rel_mplock();
+
+	lwkt_reltoken(&rpipe_rlock);
+	lwkt_reltoken(&rpipe_wlock);
+	lwkt_reltoken(&wpipe_rlock);
+	lwkt_reltoken(&wpipe_wlock);
+
+	pipe_rel_mplock(&mpsave);
 	return (error);
 }
 
@@ -916,62 +1134,103 @@ pipe_free_kmem(struct pipe *cpipe)
 }
 
 /*
- * shutdown the pipe
+ * Close the pipe.  The slock must be held to interlock against simultaneous
+ * closes.  The rlock and wlock must be held to adjust the pipe_state.
 */
 static void
 pipeclose(struct pipe *cpipe)
 {
 	globaldata_t gd;
 	struct pipe *ppipe;
+	lwkt_tokref cpipe_rlock;
+	lwkt_tokref cpipe_wlock;
+	lwkt_tokref ppipe_rlock;
+	lwkt_tokref ppipe_wlock;
 
 	if (cpipe == NULL)
 		return;
 
-	pipeselwakeup(cpipe);
+	/*
+	 * The slock may not have been allocated yet (close during
+	 * initialization).
+	 *
+	 * We need both the read and write tokens to modify pipe_state.
+	 */
+	if (cpipe->pipe_slock)
+		lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
+	lwkt_gettoken(&cpipe_rlock, &cpipe->pipe_rlock);
+	lwkt_gettoken(&cpipe_wlock, &cpipe->pipe_wlock);
 
 	/*
-	 * If the other side is blocked, wake it up saying that
-	 * we want to close it down.
+	 * Set our state, wakeup anyone waiting in select, and
+	 * wakeup anyone blocked on our pipe.
 	 */
-	while (cpipe->pipe_busy) {
+	cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
+	pipeselwakeup(cpipe);
+	if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
+		cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
 		wakeup(cpipe);
-		cpipe->pipe_state |= PIPE_WANT | PIPE_EOF;
-		tsleep(cpipe, 0, "pipecl", 0);
 	}
 
 	/*
 	 * Disconnect from peer
 	 */
 	if ((ppipe = cpipe->pipe_peer) != NULL) {
+		lwkt_gettoken(&ppipe_rlock, &ppipe->pipe_rlock);
+		lwkt_gettoken(&ppipe_wlock, &ppipe->pipe_wlock);
+		ppipe->pipe_state |= PIPE_REOF;
 		pipeselwakeup(ppipe);
+		if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
+			ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
+			wakeup(ppipe);
+		}
+		if (SLIST_FIRST(&ppipe->pipe_sel.si_note)) {
+			get_mplock();
+			KNOTE(&ppipe->pipe_sel.si_note, 0);
+			rel_mplock();
+		}
+		lwkt_reltoken(&ppipe_rlock);
+		lwkt_reltoken(&ppipe_wlock);
+	}
 
-		ppipe->pipe_state |= PIPE_EOF;
-		wakeup(ppipe);
-		KNOTE(&ppipe->pipe_sel.si_note, 0);
+	/*
+	 * If the peer is also closed we can free resources for both
+	 * sides, otherwise we leave our side intact to deal with any
+	 * races (since we only have the slock). 
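+	 *
+	 * [Editor's sketch of the two-close sequence: the first close
+	 * marks its side PIPE_CLOSED and leaves both structures and the
+	 * shared slock allocated; the second close observes the peer's
+	 * PIPE_CLOSED, severs the peer pointers, takes over the slock
+	 * (see below), and recursively closes the peer so both sides
+	 * are freed or returned to the per-cpu cache.]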
+	 */
+	if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
+		cpipe->pipe_peer = NULL;
 		ppipe->pipe_peer = NULL;
+		ppipe->pipe_slock = NULL;	/* we will free the slock */
+		pipeclose(ppipe);
+		ppipe = NULL;
 	}
 
-	if (cpipe->pipe_kva) {
-		pmap_qremove(cpipe->pipe_kva, XIO_INTERNAL_PAGES);
-		kmem_free(&kernel_map, cpipe->pipe_kva, XIO_INTERNAL_SIZE);
-		cpipe->pipe_kva = 0;
-	}
+	lwkt_reltoken(&cpipe_rlock);
+	lwkt_reltoken(&cpipe_wlock);
+	if (cpipe->pipe_slock)
+		lockmgr(cpipe->pipe_slock, LK_RELEASE);
 
 	/*
-	 * free or cache resources
+	 * If we disassociated from our peer we can free resources
 	 */
-	gd = mycpu;
-	if (gd->gd_pipeqcount >= pipe_maxcache ||
-	    cpipe->pipe_buffer.size != PIPE_SIZE
-	) {
-		pipe_free_kmem(cpipe);
-		kfree(cpipe, M_PIPE);
-	} else {
-		cpipe->pipe_state = 0;
-		cpipe->pipe_busy = 0;
-		cpipe->pipe_peer = gd->gd_pipeq;
-		gd->gd_pipeq = cpipe;
-		++gd->gd_pipeqcount;
+	if (ppipe == NULL) {
+		gd = mycpu;
+		if (cpipe->pipe_slock) {
+			kfree(cpipe->pipe_slock, M_PIPE);
+			cpipe->pipe_slock = NULL;
+		}
+		if (gd->gd_pipeqcount >= pipe_maxcache ||
+		    cpipe->pipe_buffer.size != PIPE_SIZE
+		) {
+			pipe_free_kmem(cpipe);
+			kfree(cpipe, M_PIPE);
+		} else {
+			cpipe->pipe_state = 0;
+			cpipe->pipe_peer = gd->gd_pipeq;
+			gd->gd_pipeq = cpipe;
+			++gd->gd_pipeqcount;
+		}
 	}
 }
 
@@ -1022,12 +1281,11 @@ static int
 filt_piperead(struct knote *kn, long hint)
 {
 	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
-	struct pipe *wpipe = rpipe->pipe_peer;
 
 	kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
 
-	if ((rpipe->pipe_state & PIPE_EOF) ||
-	    (wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
+	/* XXX RACE */
+	if (rpipe->pipe_state & PIPE_REOF) {
 		kn->kn_flags |= EV_EOF;
 		return (1);
 	}
@@ -1042,7 +1300,8 @@ filt_pipewrite(struct knote *kn, long hint)
 	struct pipe *wpipe = rpipe->pipe_peer;
 	u_int32_t space;
 
-	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_EOF)) {
+	/* XXX RACE */
+	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_WEOF)) {
 		kn->kn_data = 0;
 		kn->kn_flags |= EV_EOF;
 		return (1);
diff --git a/sys/sys/pipe.h b/sys/sys/pipe.h
index 47f68be010..2dc1047f86 100644
--- a/sys/sys/pipe.h
+++ b/sys/sys/pipe.h
@@ -39,6 +39,9 @@
 #ifndef _SYS_XIO_H_
 #include <sys/xio.h>			/* for struct xio */
 #endif
+#ifndef _SYS_THREAD_H_
+#include <sys/thread.h>			/* for struct lwkt_token */
+#endif
 #ifndef _MACHINE_PARAM_H_
 #include <machine/param.h>		/* for PAGE_SIZE */
 #endif
@@ -66,7 +69,9 @@
 */
 struct pipebuf {
 	u_int	rindex;		/* FIFO read index */
+	u_int	dummy1[3];	/* cache-align */
 	u_int	windex;		/* FIFO write index */
+	u_int	dummy2[3];	/* cache-align */
 	u_int	size;		/* size of buffer */
 	caddr_t	buffer;		/* kva of buffer */
 	struct vm_object *object;	/* VM object containing buffer */
@@ -78,11 +83,10 @@ struct pipebuf {
 #define PIPE_ASYNC	0x0004	/* Async? I/O */
 #define PIPE_WANTR	0x0008	/* Reader wants some characters */
 #define PIPE_WANTW	0x0010	/* Writer wants space to put characters */
-#define PIPE_WANT	0x0020	/* Pipe is wanted to be run-down */
-#define PIPE_SEL	0x0040	/* Pipe has a select active */
-#define PIPE_EOF	0x0080	/* Pipe is in EOF condition */
-#define PIPE_LOCK	0x0100	/* Process has exclusive access to pts/data */
-#define PIPE_LWANT	0x0200	/* Process wants exclusive access to pts/data */
+#define PIPE_SEL	0x0020	/* Pipe has a select active */
+#define PIPE_REOF	0x0040	/* Pipe is in EOF condition (read EOF) */
+#define PIPE_WEOF	0x0080	/* Pipe is in EOF condition (write shutdown) */
+#define PIPE_CLOSED	0x1000	/* Pipe has been closed */
 
 /*
 * Per-pipe data structure. 
@@ -90,7 +94,6 @@ struct pipebuf { */ struct pipe { struct pipebuf pipe_buffer; /* data storage */ - vm_offset_t pipe_kva; /* kva mapping (testing only) */ struct selinfo pipe_sel; /* for compat with select */ struct timespec pipe_atime; /* time of last access */ struct timespec pipe_mtime; /* time of last modify */ @@ -98,7 +101,12 @@ struct pipe { struct sigio *pipe_sigio; /* information for async I/O */ struct pipe *pipe_peer; /* link with other direction */ u_int pipe_state; /* pipe status info */ - int pipe_busy; /* busy flag, mostly to handle rundown sanely */ + u_int pipe_rip; /* read uio in-progress */ + u_int pipe_wip; /* write uio in-progress */ + u_int pipe_wantwcnt; /* for resize */ + struct lwkt_token pipe_rlock; /* rindex locked */ + struct lwkt_token pipe_wlock; /* windex locked */ + struct lock *pipe_slock; /* state locked (shared w/peer) */ }; #endif /* _KERNEL || _KERNEL_STRUCTURES */ -- 2.41.0
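
[Editor's appendix: the throughput table in the log was generated with
/usr/src/test/sysperf/pipe2.c, which is not part of this patch and is not
reproduced here.  The program below is a hypothetical, self-contained
sketch of such a pipe-bandwidth test -- a writer child streams data
through a pipe in a fixed block size while the parent reads and reports
MBytes/sec.  BLKSIZE and BYTES are illustrative parameters, not values
taken from the original test.]

	#include <sys/types.h>
	#include <sys/time.h>
	#include <sys/wait.h>
	#include <stdio.h>
	#include <stdlib.h>
	#include <unistd.h>

	#define BLKSIZE	(32 * 1024)		/* vary to sweep 8K-256K */
	#define BYTES	(1024LL * 1024 * 1024)	/* total bytes to push */

	int
	main(void)
	{
		static char buf[BLKSIZE];
		struct timeval tv1, tv2;
		long long total;
		double elapsed;
		ssize_t n;
		int fds[2];
		pid_t pid;

		if (pipe(fds) < 0) {
			perror("pipe");
			return (1);
		}
		if ((pid = fork()) < 0) {
			perror("fork");
			return (1);
		}
		if (pid == 0) {
			/* child: write BYTES bytes in BLKSIZE chunks */
			close(fds[0]);
			for (total = 0; total < BYTES; total += n) {
				n = write(fds[1], buf, BLKSIZE);
				if (n < 0)
					_exit(1);
			}
			_exit(0);
		}
		/* parent: read until EOF, measure elapsed wall time */
		close(fds[1]);
		gettimeofday(&tv1, NULL);
		total = 0;
		while ((n = read(fds[0], buf, BLKSIZE)) > 0)
			total += n;
		gettimeofday(&tv2, NULL);
		waitpid(pid, NULL, 0);
		elapsed = (tv2.tv_sec - tv1.tv_sec) +
			  (tv2.tv_usec - tv1.tv_usec) / 1e6;
		printf("%lld bytes in %.3fs = %.0f MBytes/sec\n",
		       total, elapsed, total / elapsed / (1024.0 * 1024.0));
		return (0);
	}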