/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 *
 * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
 * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include
#include

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
		struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_poll (struct file *fp, int events, struct ucred *cred);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
		struct ucred *cred);

static struct fileops pipeops = {
	.fo_read = pipe_read,
	.fo_write = pipe_write,
	.fo_ioctl = pipe_ioctl,
	.fo_poll = pipe_poll,
	.fo_kqfilter = pipe_kqfilter,
	.fo_stat = pipe_stat,
	.fo_close = pipe_close,
	.fo_shutdown = pipe_shutdown
};

static void	filt_pipedetach(struct knote *kn);
static int	filt_piperead(struct knote *kn, long hint);
static int	filt_pipewrite(struct knote *kn, long hint);

static struct filterops pipe_rfiltops =
	{ 1, NULL, filt_pipedetach, filt_piperead };
static struct filterops pipe_wfiltops =
	{ 1, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

/*
 * Default pipe buffer size(s), this can be kind-of large now because pipe
 * space is pageable.  The pipe code will try to maintain locality of
 * reference for performance reasons, so small amounts of outstanding I/O
 * will not wipe the cache.
 */
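/*
 * Illustrative userland sketch (not part of this file, never compiled):
 * the two descriptors returned by pipe(2) are ordinary file descriptors
 * backed by the pipeops table above, so regular read(2)/write(2)/close(2)
 * calls are dispatched to pipe_read()/pipe_write()/pipe_close() below.
 * Only the standard POSIX pipe API is assumed here.
 */
#if 0
#include <unistd.h>

int
main(void)
{
	int fds[2];
	char buf[8];

	if (pipe(fds) < 0)			/* fds[0] read side, fds[1] write side */
		return (1);
	write(fds[1], "hello", 5);		/* dispatched to pipe_write() */
	read(fds[0], buf, sizeof(buf));		/* dispatched to pipe_read() */
	close(fds[1]);
	close(fds[0]);
	return (0);
}
#endif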
#define MINPIPESIZE (PIPE_SIZE/3)
#define MAXPIPESIZE (2*PIPE_SIZE/3)

/*
 * Limit the number of "big" pipes
 */
#define LIMITBIGPIPES	64
#define PIPEQ_MAX_CACHE 16	/* per-cpu pipe structure cache */

static int pipe_maxbig = LIMITBIGPIPES;
static int pipe_maxcache = PIPEQ_MAX_CACHE;
static int pipe_bigcount;
static int pipe_nbig;
static int pipe_bcache_alloc;
static int pipe_bkmem_alloc;
static int pipe_rblocked_count;
static int pipe_wblocked_count;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, nbig, CTLFLAG_RD,
	&pipe_nbig, 0, "number of big pipes allocated");
SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount, CTLFLAG_RW,
	&pipe_bigcount, 0, "number of times pipe expanded");
SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked, CTLFLAG_RW,
	&pipe_rblocked_count, 0, "number of times a reader blocked");
SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked, CTLFLAG_RW,
	&pipe_wblocked_count, 0, "number of times a writer blocked");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache, CTLFLAG_RW,
	&pipe_maxcache, 0, "max pipes cached per-cpu");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig, CTLFLAG_RW,
	&pipe_maxbig, 0, "max number of big pipes");
#ifdef SMP
static int pipe_delay = 5000;	/* 5uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay, CTLFLAG_RW,
	&pipe_delay, 0, "SMP delay optimization in ns");
static int pipe_mpsafe = 1;
SYSCTL_INT(_kern_pipe, OID_AUTO, mpsafe, CTLFLAG_RW,
	&pipe_mpsafe, 0, "");
#endif
#if !defined(NO_PIPE_SYSCTL_STATS)
SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc, CTLFLAG_RW,
	&pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc, CTLFLAG_RW,
	&pipe_bkmem_alloc, 0, "pipe buffer from kmem");
#endif

static void pipeclose (struct pipe *cpipe);
static void pipe_free_kmem (struct pipe *cpipe);
static int pipe_create (struct pipe **cpipep);
static __inline void pipeselwakeup (struct pipe *cpipe);
static int pipespace (struct pipe *cpipe, int size);

static __inline void
pipeselwakeup(struct pipe *cpipe)
{
	if (cpipe->pipe_state & PIPE_SEL) {
		get_mplock();
		cpipe->pipe_state &= ~PIPE_SEL;
		selwakeup(&cpipe->pipe_sel);
		rel_mplock();
	}
	if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
		get_mplock();
		pgsigio(cpipe->pipe_sigio, SIGIO, 0);
		rel_mplock();
	}
	if (SLIST_FIRST(&cpipe->pipe_sel.si_note)) {
		get_mplock();
		KNOTE(&cpipe->pipe_sel.si_note, 0);
		rel_mplock();
	}
}
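/*
 * Illustrative userland sketch (not part of this file, never compiled):
 * the counters and knobs published under kern.pipe.* above can be read
 * with the standard sysctlbyname(3) interface, for example to watch how
 * often readers and writers block.  The MIB names are derived from the
 * SYSCTL_INT definitions above; nothing else is assumed.
 */
#if 0
#include <sys/types.h>
#include <sys/sysctl.h>
#include <stdio.h>

int
main(void)
{
	int rblocked, wblocked;
	size_t len;

	len = sizeof(rblocked);
	sysctlbyname("kern.pipe.rblocked", &rblocked, &len, NULL, 0);
	len = sizeof(wblocked);
	sysctlbyname("kern.pipe.wblocked", &wblocked, &len, NULL, 0);
	printf("read-blocked %d  write-blocked %d\n", rblocked, wblocked);
	return (0);
}
#endif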
/*
 * These routines are called before and after a UIO.  The UIO
 * may block, causing our held tokens to be lost temporarily.
 *
 * We use these routines to serialize reads against other reads
 * and writes against other writes.
 *
 * The read token is held on entry so *ipp does not race.
 */
static __inline int
pipe_start_uio(struct pipe *cpipe, u_int *ipp)
{
	int error;

	while (*ipp) {
		*ipp = -1;
		error = tsleep(ipp, PCATCH, "pipexx", 0);
		if (error)
			return (error);
	}
	*ipp = 1;
	return (0);
}

static __inline void
pipe_end_uio(struct pipe *cpipe, u_int *ipp)
{
	if (*ipp < 0) {
		*ipp = 0;
		wakeup(ipp);
	} else {
		*ipp = 0;
	}
}

static __inline void
pipe_get_mplock(int *save)
{
#ifdef SMP
	if (pipe_mpsafe == 0) {
		get_mplock();
		*save = 1;
	} else
#endif
	{
		*save = 0;
	}
}

static __inline void
pipe_rel_mplock(int *save)
{
#ifdef SMP
	if (*save)
		rel_mplock();
#endif
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 */
/* ARGSUSED */
int
sys_pipe(struct pipe_args *uap)
{
	struct thread *td = curthread;
	struct proc *p = td->td_proc;
	struct file *rf, *wf;
	struct pipe *rpipe, *wpipe;
	int fd1, fd2, error;

	KKASSERT(p);
	rpipe = wpipe = NULL;
	if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (ENFILE);
	}

	error = falloc(p, &rf, &fd1);
	if (error) {
		pipeclose(rpipe);
		pipeclose(wpipe);
		return (error);
	}
	uap->sysmsg_fds[0] = fd1;

	/*
	 * Warning: once we've gotten past allocation of the fd for the
	 * read-side, we can only drop the read side via fdrop() in order
	 * to avoid races against processes which manage to dup() the read
	 * side while we are blocked trying to allocate the write side.
	 */
	rf->f_type = DTYPE_PIPE;
	rf->f_flag = FREAD | FWRITE;
	rf->f_ops = &pipeops;
	rf->f_data = rpipe;
	error = falloc(p, &wf, &fd2);
	if (error) {
		fsetfd(p, NULL, fd1);
		fdrop(rf);
		/* rpipe has been closed by fdrop(). */
		pipeclose(wpipe);
		return (error);
	}
	wf->f_type = DTYPE_PIPE;
	wf->f_flag = FREAD | FWRITE;
	wf->f_ops = &pipeops;
	wf->f_data = wpipe;
	uap->sysmsg_fds[1] = fd2;

	rpipe->pipe_slock = kmalloc(sizeof(struct lock),
				    M_PIPE, M_WAITOK|M_ZERO);
	wpipe->pipe_slock = rpipe->pipe_slock;
	rpipe->pipe_peer = wpipe;
	wpipe->pipe_peer = rpipe;
	lockinit(rpipe->pipe_slock, "pipecl", 0, 0);

	/*
	 * Once activated the peer relationship remains valid until
	 * both sides are closed.
	 */
	fsetfd(p, rf, fd1);
	fsetfd(p, wf, fd2);
	fdrop(rf);
	fdrop(wf);

	return (0);
}

/*
 * Allocate kva for the pipe circular buffer; the space is pageable.
 * This routine will 'realloc' the size of a pipe safely: if it fails
 * it will retain the old buffer and return ENOMEM.
 */
static int
pipespace(struct pipe *cpipe, int size)
{
	struct vm_object *object;
	caddr_t buffer;
	int npages, error;

	npages = round_page(size) / PAGE_SIZE;
	object = cpipe->pipe_buffer.object;

	/*
	 * [re]create the object if necessary and reserve space for it
	 * in the kernel_map.  The object and memory are pageable.  On
	 * success, free the old resources before assigning the new
	 * ones.
	 */
	if (object == NULL || object->size != npages) {
		get_mplock();
		object = vm_object_allocate(OBJT_DEFAULT, npages);
		buffer = (caddr_t)vm_map_min(&kernel_map);

		error = vm_map_find(&kernel_map, object, 0,
				    (vm_offset_t *)&buffer, size,
				    1,
				    VM_MAPTYPE_NORMAL,
				    VM_PROT_ALL, VM_PROT_ALL,
				    0);

		if (error != KERN_SUCCESS) {
			vm_object_deallocate(object);
			rel_mplock();
			return (ENOMEM);
		}
		pipe_free_kmem(cpipe);
		rel_mplock();
		cpipe->pipe_buffer.object = object;
		cpipe->pipe_buffer.buffer = buffer;
		cpipe->pipe_buffer.size = size;
		++pipe_bkmem_alloc;
	} else {
		++pipe_bcache_alloc;
	}
	cpipe->pipe_buffer.rindex = 0;
	cpipe->pipe_buffer.windex = 0;
	return (0);
}
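/*
 * Illustrative userland sketch (not part of this file, never compiled):
 * the nbio selection in pipe_read()/pipe_write() below corresponds to the
 * usual non-blocking idiom, where a descriptor switched to O_NONBLOCK
 * returns EAGAIN instead of sleeping when the pipe is empty (reads) or
 * full (writes).  Only fcntl(2) and the standard errno values are assumed.
 */
#if 0
#include <sys/types.h>
#include <unistd.h>
#include <fcntl.h>
#include <errno.h>

static int
drain_nonblocking(int fd)
{
	char buf[4096];
	ssize_t n;

	fcntl(fd, F_SETFL, fcntl(fd, F_GETFL, 0) | O_NONBLOCK);
	while ((n = read(fd, buf, sizeof(buf))) > 0)
		;				/* consume whatever is buffered */
	if (n < 0 && errno == EAGAIN)
		return (0);			/* empty: pipe_read() returned EAGAIN */
	return (n == 0 ? 1 : -1);		/* 1 = EOF, -1 = hard error */
}
#endif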
/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.  For now make sure it is sized for the
 * smaller PIPE_SIZE default.
 */
static int
pipe_create(struct pipe **cpipep)
{
	globaldata_t gd = mycpu;
	struct pipe *cpipe;
	int error;

	if ((cpipe = gd->gd_pipeq) != NULL) {
		gd->gd_pipeq = cpipe->pipe_peer;
		--gd->gd_pipeqcount;
		cpipe->pipe_peer = NULL;
		cpipe->pipe_wantwcnt = 0;
	} else {
		cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
	}
	*cpipep = cpipe;
	if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
		return (error);
	vfs_timestamp(&cpipe->pipe_ctime);
	cpipe->pipe_atime = cpipe->pipe_ctime;
	cpipe->pipe_mtime = cpipe->pipe_ctime;
	lwkt_token_init(&cpipe->pipe_rlock);
	lwkt_token_init(&cpipe->pipe_wlock);
	return (0);
}

/*
 * MPALMOSTSAFE (acquires mplock)
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipe *rpipe;
	int error;
	int orig_resid;
	int nread = 0;
	int nbio;
	u_int size;	/* total bytes available */
	u_int nsize;	/* total bytes to read */
	u_int rindex;	/* masked read index */
	int notify_writer;
	lwkt_tokref rlock;
	lwkt_tokref wlock;
	int mpsave;

	/*
	 * Degenerate case
	 */
	orig_resid = uio->uio_resid;
	if (orig_resid == 0)
		return(0);

	/*
	 * Setup locks, calculate nbio
	 */
	pipe_get_mplock(&mpsave);
	rpipe = (struct pipe *)fp->f_data;
	lwkt_gettoken(&rlock, &rpipe->pipe_rlock);

	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * Reads are serialized.  Note, however, that pipe_buffer.buffer and
	 * pipe_buffer.size can change out from under us when the number
	 * of bytes in the buffer is zero due to the write-side doing a
	 * pipespace().
	 */
	error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
	if (error) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&rlock);
		return (error);
	}
	notify_writer = 0;
	while (uio->uio_resid) {
		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
		cpu_lfence();
		if (size) {
			rindex = rpipe->pipe_buffer.rindex &
				 (rpipe->pipe_buffer.size - 1);
			nsize = size;
			if (nsize > rpipe->pipe_buffer.size - rindex)
				nsize = rpipe->pipe_buffer.size - rindex;
			if (nsize > (u_int)uio->uio_resid)
				nsize = (u_int)uio->uio_resid;

			error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
					nsize, uio);
			if (error)
				break;
			cpu_mfence();
			rpipe->pipe_buffer.rindex += nsize;
			nread += nsize;

			/*
			 * If the FIFO is still over half full just continue
			 * and do not try to notify the writer yet.
			 */
			if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
				notify_writer = 0;
				continue;
			}

			/*
			 * When the FIFO is less than half full notify any
			 * waiting writer.  WANTW can be checked while
			 * holding just the rlock.
			 */
			notify_writer = 1;
			if ((rpipe->pipe_state & PIPE_WANTW) == 0)
				continue;
		}

		/*
		 * If the "write-side" was blocked we wake it up.  This code
		 * is reached either when the buffer is completely emptied
		 * or if it becomes more than half-empty.
		 *
		 * Pipe_state can only be modified if both the rlock and
		 * wlock are held.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
			if (rpipe->pipe_state & PIPE_WANTW) {
				notify_writer = 0;
				rpipe->pipe_state &= ~PIPE_WANTW;
				lwkt_reltoken(&wlock);
				wakeup(rpipe);
			} else {
				lwkt_reltoken(&wlock);
			}
		}

		/*
		 * Pick up our copy loop again if the writer sent data to
		 * us while we were messing around.
		 *
		 * On an SMP box poll up to pipe_delay nanoseconds for new
		 * data.  Typically a value of 2000 to 4000 is sufficient
		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
		 * is used for synchronous communications with small packets,
		 * and 8000 or so (8uS) will pipeline large buffer xfers
		 * between cpus over a pipe.
		 *
		 * For synchronous communications a hit means doing a
		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
		 * whereas a miss requiring a tsleep/wakeup sequence
		 * will take 7uS or more.
		 */
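		/*
		 * The polling window is controlled by the kern.pipe.delay
		 * sysctl declared above (in nanoseconds); for example
		 * "sysctl kern.pipe.delay=4000" limits the spin to roughly
		 * 4uS, while a value of 0 skips the polling loop below
		 * entirely and falls straight through to the tsleep/wakeup
		 * path.
		 */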
		if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
			continue;

#if defined(SMP) && defined(_RDTSC_SUPPORTED_)
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				if (rpipe->pipe_buffer.windex !=
				    rpipe->pipe_buffer.rindex) {
					good = 1;
					break;
				}
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Detect EOF condition, do not set error.
		 */
		if (rpipe->pipe_state & PIPE_REOF)
			break;

		/*
		 * Break if some data was read, or if this was a non-blocking
		 * read.
		 */
		if (nread > 0)
			break;

		if (nbio) {
			error = EAGAIN;
			break;
		}

		/*
		 * Last chance, interlock with WANTR.
		 */
		lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
		if (size) {
			lwkt_reltoken(&wlock);
			continue;
		}

		/*
		 * If there is no more to read in the pipe, reset its
		 * pointers to the beginning.  This improves cache hit
		 * stats.
		 *
		 * We need both locks to modify both pointers, and there
		 * must also not be a write in progress or the uiomove()
		 * in the write might block and temporarily release
		 * its wlock, then reacquire and update windex.  We are
		 * only serialized against reads, not writes.
		 *
		 * XXX should we even bother resetting the indices?  It
		 *     might actually be more cache efficient not to.
		 */
		if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
		    rpipe->pipe_wip == 0) {
			rpipe->pipe_buffer.rindex = 0;
			rpipe->pipe_buffer.windex = 0;
		}

		/*
		 * Wait for more data.
		 *
		 * Pipe_state can only be set if both the rlock and wlock
		 * are held.
		 */
		rpipe->pipe_state |= PIPE_WANTR;
		tsleep_interlock(rpipe, PCATCH);
		lwkt_reltoken(&wlock);
		error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
		++pipe_rblocked_count;
		if (error)
			break;
	}
	pipe_end_uio(rpipe, &rpipe->pipe_rip);

	/*
	 * Update last access time
	 */
	if (error == 0 && nread)
		vfs_timestamp(&rpipe->pipe_atime);

	/*
	 * If we drained the FIFO more than half way then handle
	 * write blocking hysteresis.
	 *
	 * Note that PIPE_WANTW cannot be set by the writer without
	 * it holding both rlock and wlock, so we can test it
	 * while holding just rlock.
	 */
	if (notify_writer) {
		if (rpipe->pipe_state & PIPE_WANTW) {
			lwkt_gettoken(&wlock, &rpipe->pipe_wlock);
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				lwkt_reltoken(&wlock);
				wakeup(rpipe);
			} else {
				lwkt_reltoken(&wlock);
			}
		}
	}
	size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
	lwkt_reltoken(&rlock);

	/*
	 * If enough space is now available in the buffer, wake up any
	 * waiting select/poll writers.
	 */
	if ((rpipe->pipe_buffer.size - size) >= PIPE_BUF)
		pipeselwakeup(rpipe);
	pipe_rel_mplock(&mpsave);
	return (error);
}
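/*
 * Illustrative userland sketch (not part of this file, never compiled):
 * once the write side has been closed, pipe_read() reports the PIPE_REOF
 * condition above by returning 0 bytes rather than an error, which is the
 * standard end-of-file convention for pipes.  Only pipe(2), fork(2),
 * read(2) and wait(2) are assumed.
 */
#if 0
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

int
main(void)
{
	int fds[2];
	char buf[64];
	ssize_t n;

	pipe(fds);
	if (fork() == 0) {
		close(fds[0]);
		write(fds[1], "done", 4);
		_exit(0);			/* implicit close -> EOF for the reader */
	}
	close(fds[1]);				/* parent keeps only the read side */
	while ((n = read(fds[0], buf, sizeof(buf))) > 0)
		;				/* consume data */
	/* n == 0 here: the writer is gone and the buffer is drained */
	wait(NULL);
	return (0);
}
#endif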
/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	int error;
	int orig_resid;
	int nbio;
	struct pipe *wpipe, *rpipe;
	lwkt_tokref rlock;
	lwkt_tokref wlock;
	u_int windex;
	u_int space;
	u_int wcount;
	int mpsave;

	pipe_get_mplock(&mpsave);

	/*
	 * Writes go to the peer.  The peer will always exist.
	 */
	rpipe = (struct pipe *) fp->f_data;
	wpipe = rpipe->pipe_peer;
	lwkt_gettoken(&wlock, &wpipe->pipe_wlock);
	if (wpipe->pipe_state & PIPE_WEOF) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&wlock);
		return (EPIPE);
	}

	/*
	 * Degenerate case (EPIPE takes prec)
	 */
	if (uio->uio_resid == 0) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&wlock);
		return(0);
	}

	/*
	 * Writes are serialized (start_uio must be called with wlock)
	 */
	error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
	if (error) {
		pipe_rel_mplock(&mpsave);
		lwkt_reltoken(&wlock);
		return (error);
	}

	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * If it is advantageous to resize the pipe buffer, do
	 * so.  We are write-serialized so we can block safely.
	 */
	if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
	    (pipe_nbig < pipe_maxbig) &&
	    wpipe->pipe_wantwcnt > 4 &&
	    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
		/*
		 * Recheck after lock.
		 */
		lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
		if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
		    (pipe_nbig < pipe_maxbig) &&
		    (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
			atomic_add_int(&pipe_nbig, 1);
			if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
				++pipe_bigcount;
			else
				atomic_subtract_int(&pipe_nbig, 1);
		}
		lwkt_reltoken(&rlock);
	}

	orig_resid = uio->uio_resid;
	wcount = 0;

	while (uio->uio_resid) {
		if (wpipe->pipe_state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}

		windex = wpipe->pipe_buffer.windex &
			 (wpipe->pipe_buffer.size - 1);
		space = wpipe->pipe_buffer.size -
			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
		cpu_lfence();

		/* Writes of size <= PIPE_BUF must be atomic. */
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * Write to fill, read size handles write hysteresis.  Also
		 * additional restrictions can cause select-based non-blocking
		 * writes to spin.
		 */
		if (space > 0) {
			u_int segsize;

			/*
			 * Transfer size is minimum of uio transfer
			 * and free space in pipe buffer.
			 *
			 * Limit each uiocopy to no more than PIPE_SIZE
			 * so we can keep the gravy train going on an
			 * SMP box.  This doubles the performance for
			 * write sizes > 16K.  Otherwise large writes
			 * wind up doing an inefficient synchronous
			 * ping-pong.
			 */
			if (space > (u_int)uio->uio_resid)
				space = (u_int)uio->uio_resid;
			if (space > PIPE_SIZE)
				space = PIPE_SIZE;

			/*
			 * First segment to transfer is minimum of
			 * transfer size and contiguous space in
			 * pipe buffer.  If first segment to transfer
			 * is less than the transfer size, we've got
			 * a wraparound in the buffer.
			 */
			segsize = wpipe->pipe_buffer.size - windex;
			if (segsize > space)
				segsize = space;

#ifdef SMP
			/*
			 * If this is the first loop and the reader is
			 * blocked, do a preemptive wakeup of the reader.
			 *
			 * On SMP the IPI latency plus the wlock interlock
			 * on the reader side is the fastest way to get the
			 * reader going.  (The scheduler will hard loop on
			 * lock tokens).
			 *
			 * NOTE: We can't clear WANTR here without acquiring
			 * the rlock, which we don't want to do here!
			 */
			if ((wpipe->pipe_state & PIPE_WANTR) && pipe_mpsafe > 1)
				wakeup(wpipe);
#endif

			/*
			 * Transfer segment, which may include a wrap-around.
			 * Update windex to account for both segments in one
			 * go so the reader can read() the data atomically.
			 */
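			/*
			 * Worked example of the wrap-around (illustrative,
			 * assuming the default 16384 byte buffer): if the
			 * masked windex is 16000 and space is 1000, the
			 * first uiomove() below copies segsize = 384 bytes
			 * at offset 16000, the second copies the remaining
			 * 616 bytes at offset 0, and windex is then advanced
			 * by the full 1000 in a single update.
			 */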
			error = uiomove(&wpipe->pipe_buffer.buffer[windex],
					segsize, uio);
			if (error == 0 && segsize < space) {
				segsize = space - segsize;
				error = uiomove(&wpipe->pipe_buffer.buffer[0],
						segsize, uio);
			}
			if (error)
				break;
			cpu_mfence();
			wpipe->pipe_buffer.windex += space;
			wcount += space;
			continue;
		}

		/*
		 * We need both the rlock and the wlock to interlock against
		 * the EOF, WANTW, and size checks, and to modify pipe_state.
		 *
		 * These are token locks so we do not have to worry about
		 * deadlocks.
		 */
		lwkt_gettoken(&rlock, &wpipe->pipe_rlock);

		/*
		 * If the "read-side" has been blocked, wake it up now
		 * and yield to let it drain synchronously rather
		 * than block.
		 */
		if (wpipe->pipe_state & PIPE_WANTR) {
			wpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(wpipe);
		}

		/*
		 * don't block on non-blocking I/O
		 */
		if (nbio) {
			lwkt_reltoken(&rlock);
			error = EAGAIN;
			break;
		}

		/*
		 * re-test whether we have to block in the writer after
		 * acquiring both locks, in case the reader opened up
		 * some space.
		 */
		space = wpipe->pipe_buffer.size -
			(wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
		cpu_lfence();
		if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
			space = 0;

		/*
		 * We have no more space and have something to offer,
		 * wake up select/poll.
		 */
		if (space == 0) {
			pipeselwakeup(wpipe);
			++wpipe->pipe_wantwcnt;
			wpipe->pipe_state |= PIPE_WANTW;
			error = tsleep(wpipe, PCATCH, "pipewr", 0);
			++pipe_wblocked_count;
		}
		lwkt_reltoken(&rlock);

		/*
		 * Break out if we errored or the read side wants us to go
		 * away.
		 */
		if (error)
			break;
		if (wpipe->pipe_state & PIPE_WEOF) {
			error = EPIPE;
			break;
		}
	}
	pipe_end_uio(wpipe, &wpipe->pipe_wip);

	/*
	 * If we have put any characters in the buffer, we wake up
	 * the reader.
	 *
	 * Both rlock and wlock are required to be able to modify pipe_state.
	 */
	if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
		if (wpipe->pipe_state & PIPE_WANTR) {
			lwkt_gettoken(&rlock, &wpipe->pipe_rlock);
			if (wpipe->pipe_state & PIPE_WANTR) {
				wpipe->pipe_state &= ~PIPE_WANTR;
				lwkt_reltoken(&rlock);
				wakeup(wpipe);
			} else {
				lwkt_reltoken(&rlock);
			}
		}
	}

	/*
	 * Don't return EPIPE if I/O was successful
	 */
	if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
	    (uio->uio_resid == 0) &&
	    (error == EPIPE)) {
		error = 0;
	}

	if (error == 0)
		vfs_timestamp(&wpipe->pipe_mtime);

	/*
	 * We have something to offer,
	 * wake up select/poll.
	 */
	space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;
	lwkt_reltoken(&wlock);
	if (space)
		pipeselwakeup(wpipe);
	pipe_rel_mplock(&mpsave);
	return (error);
}
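/*
 * Illustrative userland sketch (not part of this file, never compiled):
 * FIONREAD, handled by pipe_ioctl() below, reports the number of bytes
 * currently buffered in the pipe (windex - rindex).  Only ioctl(2) and
 * the standard FIONREAD request are assumed.
 */
#if 0
#include <sys/ioctl.h>
#include <unistd.h>

static int
pipe_bytes_ready(int fd)
{
	int nbytes;

	if (ioctl(fd, FIONREAD, &nbytes) < 0)
		return (-1);
	return (nbytes);		/* upper bound on what one read(2) can return */
}
#endif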
/*
 * MPALMOSTSAFE - acquires mplock
 *
 * we implement a very minimal set of ioctls for compatibility with sockets.
 */
int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
{
	struct pipe *mpipe;
	lwkt_tokref rlock;
	lwkt_tokref wlock;
	int error;
	int mpsave;

	pipe_get_mplock(&mpsave);
	mpipe = (struct pipe *)fp->f_data;

	lwkt_gettoken(&rlock, &mpipe->pipe_rlock);
	lwkt_gettoken(&wlock, &mpipe->pipe_wlock);

	switch (cmd) {
	case FIOASYNC:
		if (*(int *)data) {
			mpipe->pipe_state |= PIPE_ASYNC;
		} else {
			mpipe->pipe_state &= ~PIPE_ASYNC;
		}
		error = 0;
		break;
	case FIONREAD:
		*(int *)data = mpipe->pipe_buffer.windex -
				mpipe->pipe_buffer.rindex;
		error = 0;
		break;
	case FIOSETOWN:
		get_mplock();
		error = fsetown(*(int *)data, &mpipe->pipe_sigio);
		rel_mplock();
		break;
	case FIOGETOWN:
		*(int *)data = fgetown(mpipe->pipe_sigio);
		error = 0;
		break;
	case TIOCSPGRP:
		/* This is deprecated, FIOSETOWN should be used instead. */
		get_mplock();
		error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
		rel_mplock();
		break;
	case TIOCGPGRP:
		/* This is deprecated, FIOGETOWN should be used instead. */
		*(int *)data = -fgetown(mpipe->pipe_sigio);
		error = 0;
		break;
	default:
		error = ENOTTY;
		break;
	}
	lwkt_reltoken(&rlock);
	lwkt_reltoken(&wlock);
	pipe_rel_mplock(&mpsave);

	return (error);
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
int
pipe_poll(struct file *fp, int events, struct ucred *cred)
{
	struct pipe *rpipe;
	struct pipe *wpipe;
	int revents = 0;
	u_int space;
	int mpsave;

	pipe_get_mplock(&mpsave);
	rpipe = (struct pipe *)fp->f_data;
	wpipe = rpipe->pipe_peer;
	if (events & (POLLIN | POLLRDNORM)) {
		if ((rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex) ||
		    (rpipe->pipe_state & PIPE_REOF)) {
			revents |= events & (POLLIN | POLLRDNORM);
		}
	}

	if (events & (POLLOUT | POLLWRNORM)) {
		if (wpipe == NULL || (wpipe->pipe_state & PIPE_WEOF)) {
			revents |= events & (POLLOUT | POLLWRNORM);
		} else {
			space = wpipe->pipe_buffer.windex -
				wpipe->pipe_buffer.rindex;
			space = wpipe->pipe_buffer.size - space;
			if (space >= PIPE_BUF)
				revents |= events & (POLLOUT | POLLWRNORM);
		}
	}

	if ((rpipe->pipe_state & PIPE_REOF) ||
	    (wpipe == NULL) ||
	    (wpipe->pipe_state & PIPE_WEOF))
		revents |= POLLHUP;

	if (revents == 0) {
		if (events & (POLLIN | POLLRDNORM)) {
			selrecord(curthread, &rpipe->pipe_sel);
			rpipe->pipe_state |= PIPE_SEL;
		}

		if (events & (POLLOUT | POLLWRNORM)) {
			selrecord(curthread, &wpipe->pipe_sel);
			wpipe->pipe_state |= PIPE_SEL;
		}
	}
	pipe_rel_mplock(&mpsave);
	return (revents);
}

/*
 * MPSAFE
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
	struct pipe *pipe;
	int mpsave;

	pipe_get_mplock(&mpsave);
	pipe = (struct pipe *)fp->f_data;

	bzero((caddr_t)ub, sizeof(*ub));
	ub->st_mode = S_IFIFO;
	ub->st_blksize = pipe->pipe_buffer.size;
	ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
	ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
	ub->st_atimespec = pipe->pipe_atime;
	ub->st_mtimespec = pipe->pipe_mtime;
	ub->st_ctimespec = pipe->pipe_ctime;
	/*
	 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
	 * st_flags, st_gen.
	 * XXX (st_dev, st_ino) should be unique.
	 */
	pipe_rel_mplock(&mpsave);
	return (0);
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_close(struct file *fp)
{
	struct pipe *cpipe;

	get_mplock();
	cpipe = (struct pipe *)fp->f_data;
	fp->f_ops = &badfileops;
	fp->f_data = NULL;
	funsetown(cpipe->pipe_sigio);
	pipeclose(cpipe);
	rel_mplock();
	return (0);
}
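/*
 * Illustrative userland sketch (not part of this file, never compiled):
 * pipe_poll() above returns POLLIN when data is buffered or end-of-file
 * has been signalled, POLLOUT while at least PIPE_BUF bytes of space
 * remain, and POLLHUP once the other end is gone.  Only poll(2) is
 * assumed.
 */
#if 0
#include <poll.h>

static int
wait_for_pipe_data(int fd, int timeout_ms)
{
	struct pollfd pfd;

	pfd.fd = fd;
	pfd.events = POLLIN;
	if (poll(&pfd, 1, timeout_ms) <= 0)
		return (0);			/* timeout or error */
	if (pfd.revents & POLLHUP)
		return (-1);			/* write side has closed */
	return ((pfd.revents & POLLIN) != 0);
}
#endif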
/*
 * Shutdown one or both directions of a full-duplex pipe.
 *
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_shutdown(struct file *fp, int how)
{
	struct pipe *rpipe;
	struct pipe *wpipe;
	int error = EPIPE;
	lwkt_tokref rpipe_rlock;
	lwkt_tokref rpipe_wlock;
	lwkt_tokref wpipe_rlock;
	lwkt_tokref wpipe_wlock;
	int mpsave;

	pipe_get_mplock(&mpsave);
	rpipe = (struct pipe *)fp->f_data;
	wpipe = rpipe->pipe_peer;

	/*
	 * We modify pipe_state on both pipes, which means we need
	 * all four tokens!
	 */
	lwkt_gettoken(&rpipe_rlock, &rpipe->pipe_rlock);
	lwkt_gettoken(&rpipe_wlock, &rpipe->pipe_wlock);
	lwkt_gettoken(&wpipe_rlock, &wpipe->pipe_rlock);
	lwkt_gettoken(&wpipe_wlock, &wpipe->pipe_wlock);

	switch(how) {
	case SHUT_RDWR:
	case SHUT_RD:
		rpipe->pipe_state |= PIPE_REOF;
		wpipe->pipe_state |= PIPE_WEOF;
		if (rpipe->pipe_state & PIPE_WANTR) {
			rpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(rpipe);
		}
		if (wpipe->pipe_state & PIPE_WANTW) {
			wpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(wpipe);
		}
		pipeselwakeup(rpipe);
		error = 0;
		if (how == SHUT_RD)
			break;
		/* fall through */
	case SHUT_WR:
		wpipe->pipe_state |= PIPE_WEOF;
		rpipe->pipe_state |= PIPE_REOF;
		if (wpipe->pipe_state & PIPE_WANTW) {
			wpipe->pipe_state &= ~PIPE_WANTW;
			wakeup(wpipe);
		}
		if (rpipe->pipe_state & PIPE_WANTR) {
			rpipe->pipe_state &= ~PIPE_WANTR;
			wakeup(rpipe);
		}
		pipeselwakeup(wpipe);
		error = 0;
		break;
	}

	lwkt_reltoken(&rpipe_rlock);
	lwkt_reltoken(&rpipe_wlock);
	lwkt_reltoken(&wpipe_rlock);
	lwkt_reltoken(&wpipe_wlock);

	pipe_rel_mplock(&mpsave);
	return (error);
}

static void
pipe_free_kmem(struct pipe *cpipe)
{
	if (cpipe->pipe_buffer.buffer != NULL) {
		if (cpipe->pipe_buffer.size > PIPE_SIZE)
			atomic_subtract_int(&pipe_nbig, 1);
		kmem_free(&kernel_map,
			(vm_offset_t)cpipe->pipe_buffer.buffer,
			cpipe->pipe_buffer.size);
		cpipe->pipe_buffer.buffer = NULL;
		cpipe->pipe_buffer.object = NULL;
	}
}

/*
 * Close the pipe.  The slock must be held to interlock against simultaneous
 * closes.  The rlock and wlock must be held to adjust the pipe_state.
 */
static void
pipeclose(struct pipe *cpipe)
{
	globaldata_t gd;
	struct pipe *ppipe;
	lwkt_tokref cpipe_rlock;
	lwkt_tokref cpipe_wlock;
	lwkt_tokref ppipe_rlock;
	lwkt_tokref ppipe_wlock;

	if (cpipe == NULL)
		return;

	/*
	 * The slock may not have been allocated yet (close during
	 * initialization)
	 *
	 * We need both the read and write tokens to modify pipe_state.
	 */
	if (cpipe->pipe_slock)
		lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
	lwkt_gettoken(&cpipe_rlock, &cpipe->pipe_rlock);
	lwkt_gettoken(&cpipe_wlock, &cpipe->pipe_wlock);

	/*
	 * Set our state, wakeup anyone waiting in select, and
	 * wakeup anyone blocked on our pipe.
	 */
	cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
	pipeselwakeup(cpipe);
	if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
		cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(cpipe);
	}

	/*
	 * Disconnect from peer
	 */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		lwkt_gettoken(&ppipe_rlock, &ppipe->pipe_rlock);
		lwkt_gettoken(&ppipe_wlock, &ppipe->pipe_wlock);
		ppipe->pipe_state |= PIPE_REOF;
		pipeselwakeup(ppipe);
		if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
			ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
			wakeup(ppipe);
		}
		if (SLIST_FIRST(&ppipe->pipe_sel.si_note)) {
			get_mplock();
			KNOTE(&ppipe->pipe_sel.si_note, 0);
			rel_mplock();
		}
		lwkt_reltoken(&ppipe_rlock);
		lwkt_reltoken(&ppipe_wlock);
	}

	/*
	 * If the peer is also closed we can free resources for both
	 * sides, otherwise we leave our side intact to deal with any
	 * races (since we only have the slock).
	 */
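	/*
	 * Note that when the recursive pipeclose() below runs, the peer's
	 * pipe_peer and pipe_slock fields have already been cleared, so the
	 * recursion is at most one level deep and the shared slock is
	 * neither re-locked nor freed twice.
	 */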
	if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
		cpipe->pipe_peer = NULL;
		ppipe->pipe_peer = NULL;
		ppipe->pipe_slock = NULL;	/* we will free the slock */
		pipeclose(ppipe);
		ppipe = NULL;
	}

	lwkt_reltoken(&cpipe_rlock);
	lwkt_reltoken(&cpipe_wlock);
	if (cpipe->pipe_slock)
		lockmgr(cpipe->pipe_slock, LK_RELEASE);

	/*
	 * If we disassociated from our peer we can free resources
	 */
	if (ppipe == NULL) {
		gd = mycpu;
		if (cpipe->pipe_slock) {
			kfree(cpipe->pipe_slock, M_PIPE);
			cpipe->pipe_slock = NULL;
		}
		if (gd->gd_pipeqcount >= pipe_maxcache ||
		    cpipe->pipe_buffer.size != PIPE_SIZE
		) {
			pipe_free_kmem(cpipe);
			kfree(cpipe, M_PIPE);
		} else {
			cpipe->pipe_state = 0;
			cpipe->pipe_peer = gd->gd_pipeq;
			gd->gd_pipeq = cpipe;
			++gd->gd_pipeqcount;
		}
	}
}

/*
 * MPALMOSTSAFE - acquires mplock
 */
static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
	struct pipe *cpipe;

	get_mplock();
	cpipe = (struct pipe *)kn->kn_fp->f_data;

	switch (kn->kn_filter) {
	case EVFILT_READ:
		kn->kn_fop = &pipe_rfiltops;
		break;
	case EVFILT_WRITE:
		kn->kn_fop = &pipe_wfiltops;
		cpipe = cpipe->pipe_peer;
		if (cpipe == NULL) {
			/* other end of pipe has been closed */
			rel_mplock();
			return (EPIPE);
		}
		break;
	default:
		/* release the mplock acquired above before bailing out */
		rel_mplock();
		return (1);
	}
	kn->kn_hook = (caddr_t)cpipe;

	SLIST_INSERT_HEAD(&cpipe->pipe_sel.si_note, kn, kn_selnext);
	rel_mplock();
	return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
	struct pipe *cpipe = (struct pipe *)kn->kn_hook;

	SLIST_REMOVE(&cpipe->pipe_sel.si_note, kn, knote, kn_selnext);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;

	kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;

	/* XXX RACE */
	if (rpipe->pipe_state & PIPE_REOF) {
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	return (kn->kn_data > 0);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
	struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
	struct pipe *wpipe = rpipe->pipe_peer;
	u_int32_t space;

	/* XXX RACE */
	if ((wpipe == NULL) || (wpipe->pipe_state & PIPE_WEOF)) {
		kn->kn_data = 0;
		kn->kn_flags |= EV_EOF;
		return (1);
	}
	space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;
	space = wpipe->pipe_buffer.size - space;
	kn->kn_data = space;
	return (kn->kn_data >= PIPE_BUF);
}
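/*
 * Illustrative userland sketch (not part of this file, never compiled):
 * the filterops above back EVFILT_READ/EVFILT_WRITE knotes on pipe
 * descriptors, so a pipe can be watched with the regular kqueue(2)
 * interface.  Only kqueue(2)/kevent(2) are assumed.
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>

static int
wait_readable(int kq, int fd)
{
	struct kevent change;
	struct kevent result;

	EV_SET(&change, fd, EVFILT_READ, EV_ADD, 0, 0, NULL);
	if (kevent(kq, &change, 1, &result, 1, NULL) <= 0)
		return (-1);
	if (result.flags & EV_EOF)		/* filt_piperead() sets EV_EOF on PIPE_REOF */
		return (0);
	return ((int)result.data);		/* bytes available, from kn_data */
}
#endif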