kernel - Remove bottlenecks related to mass pipe closures
sys/kern/sys_pipe.c
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  */
21
22 /*
23  * This file contains a high-performance replacement for the socket-based
24  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25  * all features of sockets, but does do everything that pipes normally
26  * do.
27  */
28 #include <sys/param.h>
29 #include <sys/systm.h>
30 #include <sys/kernel.h>
31 #include <sys/proc.h>
32 #include <sys/fcntl.h>
33 #include <sys/file.h>
34 #include <sys/filedesc.h>
35 #include <sys/filio.h>
36 #include <sys/ttycom.h>
37 #include <sys/stat.h>
38 #include <sys/signalvar.h>
39 #include <sys/sysproto.h>
40 #include <sys/pipe.h>
41 #include <sys/vnode.h>
42 #include <sys/uio.h>
43 #include <sys/event.h>
44 #include <sys/globaldata.h>
45 #include <sys/module.h>
46 #include <sys/malloc.h>
47 #include <sys/sysctl.h>
48 #include <sys/socket.h>
49 #include <sys/kern_syscall.h>
50 #include <sys/lock.h>
51 #include <sys/mutex.h>
52
53 #include <vm/vm.h>
54 #include <vm/vm_param.h>
55 #include <vm/vm_object.h>
56 #include <vm/vm_kern.h>
57 #include <vm/vm_extern.h>
58 #include <vm/pmap.h>
59 #include <vm/vm_map.h>
60 #include <vm/vm_page.h>
61 #include <vm/vm_zone.h>
62
63 #include <sys/file2.h>
64 #include <sys/signal2.h>
65 #include <sys/mutex2.h>
66
67 #include <machine/cpufunc.h>
68
69 /*
70  * interfaces to the outside world
71  */
72 static int pipe_read (struct file *fp, struct uio *uio, 
73                 struct ucred *cred, int flags);
74 static int pipe_write (struct file *fp, struct uio *uio, 
75                 struct ucred *cred, int flags);
76 static int pipe_close (struct file *fp);
77 static int pipe_shutdown (struct file *fp, int how);
78 static int pipe_kqfilter (struct file *fp, struct knote *kn);
79 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
80 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
81                 struct ucred *cred, struct sysmsg *msg);
82
83 static struct fileops pipeops = {
84         .fo_read = pipe_read, 
85         .fo_write = pipe_write,
86         .fo_ioctl = pipe_ioctl,
87         .fo_kqfilter = pipe_kqfilter,
88         .fo_stat = pipe_stat,
89         .fo_close = pipe_close,
90         .fo_shutdown = pipe_shutdown
91 };
92
93 static void     filt_pipedetach(struct knote *kn);
94 static int      filt_piperead(struct knote *kn, long hint);
95 static int      filt_pipewrite(struct knote *kn, long hint);
96
97 static struct filterops pipe_rfiltops =
98         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
99 static struct filterops pipe_wfiltops =
100         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
101
102 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
103
104 /*
105  * Default pipe buffer size(s).  These can be kind of large now because pipe
106  * space is pageable.  The pipe code will try to maintain locality of
107  * reference for performance reasons, so small amounts of outstanding I/O
108  * will not wipe the cache.
109  */
110 #define MINPIPESIZE (PIPE_SIZE/3)
111 #define MAXPIPESIZE (2*PIPE_SIZE/3)
112
113 /*
114  * Limit the number of "big" pipes
115  */
116 #define LIMITBIGPIPES   64
117 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
118
119 static int pipe_maxbig = LIMITBIGPIPES;
120 static int pipe_maxcache = PIPEQ_MAX_CACHE;
121 static int pipe_bigcount;
122 static int pipe_nbig;
123 static int pipe_bcache_alloc;
124 static int pipe_bkmem_alloc;
125 static int pipe_rblocked_count;
126 static int pipe_wblocked_count;
127 static struct mtx *pipe_gdlocks;
128
129 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
130 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
131         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
132 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
133         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
134 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
135         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times a pipe read blocked");
136 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
137         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times a pipe write blocked");
138 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
139         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
140 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
141         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
142 static int pipe_delay = 5000;   /* 5uS default */
143 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
144         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
145 #if !defined(NO_PIPE_SYSCTL_STATS)
146 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
147         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
148 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
149         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
150 #endif
151
152 /*
153  * Auto-size the pipe cache to reduce kmem allocations and frees, and
154  * initialize the per-cpu mutexes used to throttle bulk pipe teardowns.
154  */
155 static
156 void
157 pipeinit(void *dummy)
158 {
159         size_t mbytes = kmem_lim_size();
160         int n;
161
162         if (pipe_maxbig == LIMITBIGPIPES) {
163                 if (mbytes >= 7 * 1024)
164                         pipe_maxbig *= 2;
165                 if (mbytes >= 15 * 1024)
166                         pipe_maxbig *= 2;
167         }
168         if (pipe_maxcache == PIPEQ_MAX_CACHE) {
169                 if (mbytes >= 7 * 1024)
170                         pipe_maxcache *= 2;
171                 if (mbytes >= 15 * 1024)
172                         pipe_maxcache *= 2;
173         }
174         pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
175                              M_PIPE, M_WAITOK | M_ZERO);
176         for (n = 0; n < ncpus; ++n)
177                 mtx_init(&pipe_gdlocks[n], "pipekm");
178 }
179 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
180
181 static void pipeclose (struct pipe *cpipe);
182 static void pipe_free_kmem (struct pipe *cpipe);
183 static int pipe_create (struct pipe **cpipep);
184 static int pipespace (struct pipe *cpipe, int size);
185
186 static __inline void
187 pipewakeup(struct pipe *cpipe, int dosigio)
188 {
189         if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
190                 lwkt_gettoken(&sigio_token);
191                 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
192                 lwkt_reltoken(&sigio_token);
193         }
194         KNOTE(&cpipe->pipe_kq.ki_note, 0);
195 }
196
197 /*
198  * These routines are called before and after a UIO.  The UIO
199  * may block, causing our held tokens to be lost temporarily.
200  *
201  * We use these routines to serialize reads against other reads
202  * and writes against other writes.
203  *
204  * The caller's token (rlock or wlock) is held on entry so *ipp does not race.
205  */
206 static __inline int
207 pipe_start_uio(struct pipe *cpipe, int *ipp)
208 {
209         int error;
210
211         while (*ipp) {
212                 *ipp = -1;
213                 error = tsleep(ipp, PCATCH, "pipexx", 0);
214                 if (error)
215                         return (error);
216         }
217         *ipp = 1;
218         return (0);
219 }
220
221 static __inline void
222 pipe_end_uio(struct pipe *cpipe, int *ipp)
223 {
224         if (*ipp < 0) {
225                 *ipp = 0;
226                 wakeup(ipp);
227         } else {
228                 KKASSERT(*ipp > 0);
229                 *ipp = 0;
230         }
231 }
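/*
 * Editor's sketch (illustrative, not compiled): the calling pattern the
 * two helpers above are designed for, as used by pipe_read() and
 * pipe_write() later in this file.  The caller holds the matching token,
 * brackets the potentially-blocking uiomove() loop with start/end, and
 * then releases the token:
 *
 *	lwkt_gettoken(&cpipe->pipe_rlock);
 *	error = pipe_start_uio(cpipe, &cpipe->pipe_rip);
 *	if (error == 0) {
 *		... uiomove() loop; may block and temporarily lose tokens ...
 *		pipe_end_uio(cpipe, &cpipe->pipe_rip);
 *	}
 *	lwkt_reltoken(&cpipe->pipe_rlock);
 */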
232
233 /*
234  * The pipe system call for the DTYPE_PIPE type of pipes
235  *
236  * pipe_args(int dummy)
237  *
238  * MPSAFE
239  */
240 int
241 sys_pipe(struct pipe_args *uap)
242 {
243         return kern_pipe(uap->sysmsg_fds, 0);
244 }
245
246 int
247 sys_pipe2(struct pipe2_args *uap)
248 {
249         return kern_pipe(uap->sysmsg_fds, uap->flags);
250 }
251
252 int
253 kern_pipe(long *fds, int flags)
254 {
255         struct thread *td = curthread;
256         struct filedesc *fdp = td->td_proc->p_fd;
257         struct file *rf, *wf;
258         struct pipe *rpipe, *wpipe;
259         int fd1, fd2, error;
260
261         rpipe = wpipe = NULL;
262         if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
263                 pipeclose(rpipe); 
264                 pipeclose(wpipe); 
265                 return (ENFILE);
266         }
267         
268         error = falloc(td->td_lwp, &rf, &fd1);
269         if (error) {
270                 pipeclose(rpipe);
271                 pipeclose(wpipe);
272                 return (error);
273         }
274         fds[0] = fd1;
275
276         /*
277          * Warning: once we've gotten past allocation of the fd for the
278          * read-side, we can only drop the read side via fdrop() in order
279          * to avoid races against processes which manage to dup() the read
280          * side while we are blocked trying to allocate the write side.
281          */
282         rf->f_type = DTYPE_PIPE;
283         rf->f_flag = FREAD | FWRITE;
284         rf->f_ops = &pipeops;
285         rf->f_data = rpipe;
286         if (flags & O_NONBLOCK)
287                 rf->f_flag |= O_NONBLOCK;
288         if (flags & O_CLOEXEC)
289                 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
290
291         error = falloc(td->td_lwp, &wf, &fd2);
292         if (error) {
293                 fsetfd(fdp, NULL, fd1);
294                 fdrop(rf);
295                 /* rpipe has been closed by fdrop(). */
296                 pipeclose(wpipe);
297                 return (error);
298         }
299         wf->f_type = DTYPE_PIPE;
300         wf->f_flag = FREAD | FWRITE;
301         wf->f_ops = &pipeops;
302         wf->f_data = wpipe;
303         if (flags & O_NONBLOCK)
304                 wf->f_flag |= O_NONBLOCK;
305         if (flags & O_CLOEXEC)
306                 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
307
308         fds[1] = fd2;
309
310         rpipe->pipe_slock = kmalloc(sizeof(struct lock),
311                                     M_PIPE, M_WAITOK|M_ZERO);
312         wpipe->pipe_slock = rpipe->pipe_slock;
313         rpipe->pipe_peer = wpipe;
314         wpipe->pipe_peer = rpipe;
315         lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
316
317         /*
318          * Once activated the peer relationship remains valid until
319          * both sides are closed.
320          */
321         fsetfd(fdp, rf, fd1);
322         fsetfd(fdp, wf, fd2);
323         fdrop(rf);
324         fdrop(wf);
325
326         return (0);
327 }
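/*
 * Editor's sketch (userland, illustrative only, not part of the kernel
 * build): how the descriptor pair produced by kern_pipe() is normally
 * consumed.  fds[0] is the read side, fds[1] the write side; the pipe2()
 * flags map onto the O_NONBLOCK/O_CLOEXEC handling above.
 *
 *	#include <unistd.h>
 *	#include <fcntl.h>
 *
 *	int fds[2];
 *	char c;
 *
 *	if (pipe2(fds, O_CLOEXEC) == 0) {
 *		write(fds[1], "x", 1);
 *		read(fds[0], &c, 1);
 *		close(fds[0]);
 *		close(fds[1]);
 *	}
 */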
328
329 /*
330  * Allocate kva for the pipe circular buffer; the space is pageable.
331  * This routine will 'realloc' the size of a pipe safely: if it
332  * fails, it retains the old buffer and returns ENOMEM to the
333  * caller.
334  */
335 static int
336 pipespace(struct pipe *cpipe, int size)
337 {
338         struct vm_object *object;
339         caddr_t buffer;
340         int npages, error;
341
342         npages = round_page(size) / PAGE_SIZE;
343         object = cpipe->pipe_buffer.object;
344
345         /*
346          * [re]create the object if necessary and reserve space for it
347          * in the kernel_map.  The object and memory are pageable.  On
348          * success, free the old resources before assigning the new
349          * ones.
350          */
351         if (object == NULL || object->size != npages) {
352                 object = vm_object_allocate(OBJT_DEFAULT, npages);
353                 buffer = (caddr_t)vm_map_min(&kernel_map);
354
355                 error = vm_map_find(&kernel_map, object, NULL,
356                                     0, (vm_offset_t *)&buffer, size,
357                                     PAGE_SIZE, TRUE,
358                                     VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
359                                     VM_PROT_ALL, VM_PROT_ALL, 0);
360
361                 if (error != KERN_SUCCESS) {
362                         vm_object_deallocate(object);
363                         return (ENOMEM);
364                 }
365                 pipe_free_kmem(cpipe);
366                 cpipe->pipe_buffer.object = object;
367                 cpipe->pipe_buffer.buffer = buffer;
368                 cpipe->pipe_buffer.size = size;
369                 ++pipe_bkmem_alloc;
370         } else {
371                 ++pipe_bcache_alloc;
372         }
373         cpipe->pipe_buffer.rindex = 0;
374         cpipe->pipe_buffer.windex = 0;
375         return (0);
376 }
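/*
 * Editor's note: the buffer allocated above is used as a power-of-2 ring.
 * rindex and windex are free-running unsigned counters, so the byte count
 * and the contiguous span at the read offset are derived as in this
 * sketch (mirroring what pipe_read()/pipe_write() compute below):
 *
 *	u_int size   = windex - rindex;			// bytes in pipe
 *	u_int offset = rindex & (buffer.size - 1);	// masked offset
 *	u_int contig = buffer.size - offset;		// to end of ring
 *	if (contig > size)
 *		contig = size;				// contiguous run
 */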
377
378 /*
379  * Initialize and allocate VM and memory for pipe, pulling the pipe from
380  * our per-cpu cache if possible.  For now make sure it is sized for the
381  * smaller PIPE_SIZE default.
382  */
383 static int
384 pipe_create(struct pipe **cpipep)
385 {
386         globaldata_t gd = mycpu;
387         struct pipe *cpipe;
388         int error;
389
390         if ((cpipe = gd->gd_pipeq) != NULL) {
391                 gd->gd_pipeq = cpipe->pipe_peer;
392                 --gd->gd_pipeqcount;
393                 cpipe->pipe_peer = NULL;
394                 cpipe->pipe_wantwcnt = 0;
395         } else {
396                 cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
397         }
398         *cpipep = cpipe;
399         if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
400                 return (error);
401         vfs_timestamp(&cpipe->pipe_ctime);
402         cpipe->pipe_atime = cpipe->pipe_ctime;
403         cpipe->pipe_mtime = cpipe->pipe_ctime;
404         lwkt_token_init(&cpipe->pipe_rlock, "piper");
405         lwkt_token_init(&cpipe->pipe_wlock, "pipew");
406         return (0);
407 }
408
409 static int
410 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
411 {
412         struct pipe *rpipe;
413         struct pipe *wpipe;
414         int error;
415         size_t nread = 0;
416         int nbio;
417         u_int size;     /* total bytes available */
418         u_int nsize;    /* total bytes to read */
419         u_int rindex;   /* masked read offset into the buffer */
420         int notify_writer;
421         int bigread;
422         int bigcount;
423
424         atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
425
426         if (uio->uio_resid == 0)
427                 return(0);
428
429         /*
430          * Setup locks, calculate nbio
431          */
432         rpipe = (struct pipe *)fp->f_data;
433         wpipe = rpipe->pipe_peer;
434         lwkt_gettoken(&rpipe->pipe_rlock);
435
436         if (fflags & O_FBLOCKING)
437                 nbio = 0;
438         else if (fflags & O_FNONBLOCKING)
439                 nbio = 1;
440         else if (fp->f_flag & O_NONBLOCK)
441                 nbio = 1;
442         else
443                 nbio = 0;
444
445         /*
446          * Reads are serialized.  Note however that pipe_buffer.buffer and
447          * pipe_buffer.size can change out from under us when the number
448          * of bytes in the buffer are zero due to the write-side doing a
449          * pipespace().
450          */
451         error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
452         if (error) {
453                 lwkt_reltoken(&rpipe->pipe_rlock);
454                 return (error);
455         }
456         notify_writer = 0;
457
458         bigread = (uio->uio_resid > 10 * 1024 * 1024);
459         bigcount = 10;
460
461         while (uio->uio_resid) {
462                 /*
463                  * Don't hog the cpu.
464                  */
465                 if (bigread && --bigcount == 0) {
466                         lwkt_user_yield();
467                         bigcount = 10;
468                         if (CURSIG(curthread->td_lwp)) {
469                                 error = EINTR;
470                                 break;
471                         }
472                 }
473
474                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
475                 cpu_lfence();
476                 if (size) {
477                         rindex = rpipe->pipe_buffer.rindex &
478                                  (rpipe->pipe_buffer.size - 1);
479                         nsize = size;
480                         if (nsize > rpipe->pipe_buffer.size - rindex)
481                                 nsize = rpipe->pipe_buffer.size - rindex;
482                         nsize = szmin(nsize, uio->uio_resid);
483
484                         error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
485                                         nsize, uio);
486                         if (error)
487                                 break;
488                         cpu_mfence();
489                         rpipe->pipe_buffer.rindex += nsize;
490                         nread += nsize;
491
492                         /*
493                          * If the FIFO is still over half full just continue
494                          * and do not try to notify the writer yet.
495                          */
496                         if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
497                                 notify_writer = 0;
498                                 continue;
499                         }
500
501                         /*
502  * When the FIFO is less than half full notify any
503                          * waiting writer.  WANTW can be checked while
504                          * holding just the rlock.
505                          */
506                         notify_writer = 1;
507                         if ((rpipe->pipe_state & PIPE_WANTW) == 0)
508                                 continue;
509                 }
510
511                 /*
512                  * If the "write-side" was blocked we wake it up.  This code
513                  * is reached either when the buffer is completely emptied
514  * or when it becomes more than half-empty.
515                  *
516                  * Pipe_state can only be modified if both the rlock and
517                  * wlock are held.
518                  */
519                 if (rpipe->pipe_state & PIPE_WANTW) {
520                         lwkt_gettoken(&rpipe->pipe_wlock);
521                         if (rpipe->pipe_state & PIPE_WANTW) {
522                                 rpipe->pipe_state &= ~PIPE_WANTW;
523                                 lwkt_reltoken(&rpipe->pipe_wlock);
524                                 wakeup(rpipe);
525                         } else {
526                                 lwkt_reltoken(&rpipe->pipe_wlock);
527                         }
528                 }
529
530                 /*
531                  * Pick up our copy loop again if the writer sent data to
532                  * us while we were messing around.
533                  *
534  * On an SMP box, poll up to pipe_delay nanoseconds for new
535                  * data.  Typically a value of 2000 to 4000 is sufficient
536                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
537                  * is used for synchronous communications with small packets,
538                  * and 8000 or so (8uS) will pipeline large buffer xfers
539                  * between cpus over a pipe.
540                  *
541                  * For synchronous communications a hit means doing a
542  * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
543  * whereas a miss requiring a tsleep/wakeup sequence
544  * will take 7uS or more.
545                  */
546                 if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
547                         continue;
548
549 #ifdef _RDTSC_SUPPORTED_
550                 if (pipe_delay) {
551                         int64_t tsc_target;
552                         int good = 0;
553
554                         tsc_target = tsc_get_target(pipe_delay);
555                         while (tsc_test_target(tsc_target) == 0) {
556                                 if (rpipe->pipe_buffer.windex !=
557                                     rpipe->pipe_buffer.rindex) {
558                                         good = 1;
559                                         break;
560                                 }
561                         }
562                         if (good)
563                                 continue;
564                 }
565 #endif
566
567                 /*
568                  * Detect EOF condition, do not set error.
569                  */
570                 if (rpipe->pipe_state & PIPE_REOF)
571                         break;
572
573                 /*
574                  * Break if some data was read, or if this was a non-blocking
575                  * read.
576                  */
577                 if (nread > 0)
578                         break;
579
580                 if (nbio) {
581                         error = EAGAIN;
582                         break;
583                 }
584
585                 /*
586                  * Last chance, interlock with WANTR.
587                  */
588                 lwkt_gettoken(&rpipe->pipe_wlock);
589                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
590                 if (size) {
591                         lwkt_reltoken(&rpipe->pipe_wlock);
592                         continue;
593                 }
594
595                 /*
596                  * Retest EOF - acquiring a new token can temporarily release
597                  * tokens already held.
598                  */
599                 if (rpipe->pipe_state & PIPE_REOF) {
600                         lwkt_reltoken(&rpipe->pipe_wlock);
601                         break;
602                 }
603
604                 /*
605                  * If there is no more to read in the pipe, reset its
606                  * pointers to the beginning.  This improves cache hit
607                  * stats.
608                  *
609                  * We need both locks to modify both pointers, and there
610                  * must also not be a write in progress or the uiomove()
611                  * in the write might block and temporarily release
612                  * its wlock, then reacquire and update windex.  We are
613                  * only serialized against reads, not writes.
614                  *
615                  * XXX should we even bother resetting the indices?  It
616                  *     might actually be more cache efficient not to.
617                  */
618                 if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
619                     rpipe->pipe_wip == 0) {
620                         rpipe->pipe_buffer.rindex = 0;
621                         rpipe->pipe_buffer.windex = 0;
622                 }
623
624                 /*
625                  * Wait for more data.
626                  *
627                  * Pipe_state can only be set if both the rlock and wlock
628                  * are held.
629                  */
630                 rpipe->pipe_state |= PIPE_WANTR;
631                 tsleep_interlock(rpipe, PCATCH);
632                 lwkt_reltoken(&rpipe->pipe_wlock);
633                 error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
634                 ++pipe_rblocked_count;
635                 if (error)
636                         break;
637         }
638         pipe_end_uio(rpipe, &rpipe->pipe_rip);
639
640         /*
641  * Update the last access time
642          */
643         if (error == 0 && nread)
644                 vfs_timestamp(&rpipe->pipe_atime);
645
646         /*
647  * If we drained the FIFO more than half way then handle
648          * write blocking hysteresis.
649          *
650          * Note that PIPE_WANTW cannot be set by the writer without
651          * it holding both rlock and wlock, so we can test it
652          * while holding just rlock.
653          */
654         if (notify_writer) {
655                 /*
656                  * Synchronous blocking is done on the pipe involved
657                  */
658                 if (rpipe->pipe_state & PIPE_WANTW) {
659                         lwkt_gettoken(&rpipe->pipe_wlock);
660                         if (rpipe->pipe_state & PIPE_WANTW) {
661                                 rpipe->pipe_state &= ~PIPE_WANTW;
662                                 lwkt_reltoken(&rpipe->pipe_wlock);
663                                 wakeup(rpipe);
664                         } else {
665                                 lwkt_reltoken(&rpipe->pipe_wlock);
666                         }
667                 }
668
669                 /*
670                  * But we may also have to deal with a kqueue which is
671                  * stored on the same pipe as its descriptor, so a
672                  * EVFILT_WRITE event waiting for our side to drain will
673                  * be on the other side.
674                  */
675                 lwkt_gettoken(&wpipe->pipe_wlock);
676                 pipewakeup(wpipe, 0);
677                 lwkt_reltoken(&wpipe->pipe_wlock);
678         }
679         /*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
680         lwkt_reltoken(&rpipe->pipe_rlock);
681
682         return (error);
683 }
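/*
 * Editor's note on the fence pairing used in the read loop above; the
 * rindex/windex counters implement a single-producer/single-consumer
 * protocol, so the data copy itself needs no lock:
 *
 *	size = windex - rindex;		// (1) observe the producer's index
 *	cpu_lfence();			// (2) order (1) before the data loads
 *	uiomove(...);			// (3) copy data out to userland
 *	cpu_mfence();			// (4) order (3) before (5)
 *	rindex += nsize;		// (5) publish freed space to the writer
 */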
684
685 static int
686 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
687 {
688         int error;
689         int orig_resid;
690         int nbio;
691         struct pipe *wpipe;
692         struct pipe *rpipe;
693         u_int windex;
694         u_int space;
695         u_int wcount;
696         int bigwrite;
697         int bigcount;
698
699         /*
700          * Writes go to the peer.  The peer will always exist.
701          */
702         rpipe = (struct pipe *) fp->f_data;
703         wpipe = rpipe->pipe_peer;
704         lwkt_gettoken(&wpipe->pipe_wlock);
705         if (wpipe->pipe_state & PIPE_WEOF) {
706                 lwkt_reltoken(&wpipe->pipe_wlock);
707                 return (EPIPE);
708         }
709
710         /*
711  * Degenerate case (EPIPE takes precedence)
712          */
713         if (uio->uio_resid == 0) {
714                 lwkt_reltoken(&wpipe->pipe_wlock);
715                 return(0);
716         }
717
718         /*
719          * Writes are serialized (start_uio must be called with wlock)
720          */
721         error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
722         if (error) {
723                 lwkt_reltoken(&wpipe->pipe_wlock);
724                 return (error);
725         }
726
727         if (fflags & O_FBLOCKING)
728                 nbio = 0;
729         else if (fflags & O_FNONBLOCKING)
730                 nbio = 1;
731         else if (fp->f_flag & O_NONBLOCK)
732                 nbio = 1;
733         else
734                 nbio = 0;
735
736         /*
737          * If it is advantageous to resize the pipe buffer, do
738          * so.  We are write-serialized so we can block safely.
739          */
740         if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
741             (pipe_nbig < pipe_maxbig) &&
742             wpipe->pipe_wantwcnt > 4 &&
743             (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
744                 /* 
745                  * Recheck after lock.
746                  */
747                 lwkt_gettoken(&wpipe->pipe_rlock);
748                 if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
749                     (pipe_nbig < pipe_maxbig) &&
750                     (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
751                         atomic_add_int(&pipe_nbig, 1);
752                         if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
753                                 ++pipe_bigcount;
754                         else
755                                 atomic_subtract_int(&pipe_nbig, 1);
756                 }
757                 lwkt_reltoken(&wpipe->pipe_rlock);
758         }
759
760         orig_resid = uio->uio_resid;
761         wcount = 0;
762
763         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
764         bigcount = 10;
765
766         while (uio->uio_resid) {
767                 if (wpipe->pipe_state & PIPE_WEOF) {
768                         error = EPIPE;
769                         break;
770                 }
771
772                 /*
773                  * Don't hog the cpu.
774                  */
775                 if (bigwrite && --bigcount == 0) {
776                         lwkt_user_yield();
777                         bigcount = 10;
778                         if (CURSIG(curthread->td_lwp)) {
779                                 error = EINTR;
780                                 break;
781                         }
782                 }
783
784                 windex = wpipe->pipe_buffer.windex &
785                          (wpipe->pipe_buffer.size - 1);
786                 space = wpipe->pipe_buffer.size -
787                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
788                 cpu_lfence();
789
790                 /* Writes of size <= PIPE_BUF must be atomic. */
791                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
792                         space = 0;
793
794                 /* 
795  * Write to fill; the read side handles write hysteresis.  Also
796                  * additional restrictions can cause select-based non-blocking
797                  * writes to spin.
798                  */
799                 if (space > 0) {
800                         u_int segsize;
801
802                         /*
803                          * Transfer size is minimum of uio transfer
804                          * and free space in pipe buffer.
805                          *
806  * Limit each uiocopy to no more than PIPE_SIZE
807                          * so we can keep the gravy train going on a
808                          * SMP box.  This doubles the performance for
809                          * write sizes > 16K.  Otherwise large writes
810                          * wind up doing an inefficient synchronous
811                          * ping-pong.
812                          */
813                         space = szmin(space, uio->uio_resid);
814                         if (space > PIPE_SIZE)
815                                 space = PIPE_SIZE;
816
817                         /*
818                          * First segment to transfer is minimum of
819                          * transfer size and contiguous space in
820                          * pipe buffer.  If first segment to transfer
821                          * is less than the transfer size, we've got
822                          * a wraparound in the buffer.
823                          */
824                         segsize = wpipe->pipe_buffer.size - windex;
825                         if (segsize > space)
826                                 segsize = space;
827
828                         /*
829                          * If this is the first loop and the reader is
830                          * blocked, do a preemptive wakeup of the reader.
831                          *
832                          * On SMP the IPI latency plus the wlock interlock
833                          * on the reader side is the fastest way to get the
834                          * reader going.  (The scheduler will hard loop on
835                          * lock tokens).
836                          *
837                          * NOTE: We can't clear WANTR here without acquiring
838                          * the rlock, which we don't want to do here!
839                          */
840                         if ((wpipe->pipe_state & PIPE_WANTR))
841                                 wakeup(wpipe);
842
843                         /*
844                          * Transfer segment, which may include a wrap-around.
845  * Update windex to account for both segments in one go
846                          * so the reader can read() the data atomically.
847                          */
848                         error = uiomove(&wpipe->pipe_buffer.buffer[windex],
849                                         segsize, uio);
850                         if (error == 0 && segsize < space) {
851                                 segsize = space - segsize;
852                                 error = uiomove(&wpipe->pipe_buffer.buffer[0],
853                                                 segsize, uio);
854                         }
855                         if (error)
856                                 break;
857                         cpu_mfence();
858                         wpipe->pipe_buffer.windex += space;
859                         wcount += space;
860                         continue;
861                 }
862
863                 /*
864                  * We need both the rlock and the wlock to interlock against
865                  * the EOF, WANTW, and size checks, and to modify pipe_state.
866                  *
867                  * These are token locks so we do not have to worry about
868                  * deadlocks.
869                  */
870                 lwkt_gettoken(&wpipe->pipe_rlock);
871
872                 /*
873                  * If the "read-side" has been blocked, wake it up now
874                  * and yield to let it drain synchronously rather
875  * than block.
876                  */
877                 if (wpipe->pipe_state & PIPE_WANTR) {
878                         wpipe->pipe_state &= ~PIPE_WANTR;
879                         wakeup(wpipe);
880                 }
881
882                 /*
883                  * don't block on non-blocking I/O
884                  */
885                 if (nbio) {
886                         lwkt_reltoken(&wpipe->pipe_rlock);
887                         error = EAGAIN;
888                         break;
889                 }
890
891                 /*
892                  * re-test whether we have to block in the writer after
893                  * acquiring both locks, in case the reader opened up
894                  * some space.
895                  */
896                 space = wpipe->pipe_buffer.size -
897                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
898                 cpu_lfence();
899                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
900                         space = 0;
901
902                 /*
903                  * Retest EOF - acquiring a new token can temporarily release
904                  * tokens already held.
905                  */
906                 if (wpipe->pipe_state & PIPE_WEOF) {
907                         lwkt_reltoken(&wpipe->pipe_rlock);
908                         error = EPIPE;
909                         break;
910                 }
911
912                 /*
913                  * We have no more space and have something to offer,
914                  * wake up select/poll/kq.
915                  */
916                 if (space == 0) {
917                         wpipe->pipe_state |= PIPE_WANTW;
918                         ++wpipe->pipe_wantwcnt;
919                         pipewakeup(wpipe, 1);
920                         if (wpipe->pipe_state & PIPE_WANTW)
921                                 error = tsleep(wpipe, PCATCH, "pipewr", 0);
922                         ++pipe_wblocked_count;
923                 }
924                 lwkt_reltoken(&wpipe->pipe_rlock);
925
926                 /*
927                  * Break out if we errored or the read side wants us to go
928                  * away.
929                  */
930                 if (error)
931                         break;
932                 if (wpipe->pipe_state & PIPE_WEOF) {
933                         error = EPIPE;
934                         break;
935                 }
936         }
937         pipe_end_uio(wpipe, &wpipe->pipe_wip);
938
939         /*
940          * If we have put any characters in the buffer, we wake up
941          * the reader.
942          *
943          * Both rlock and wlock are required to be able to modify pipe_state.
944          */
945         if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
946                 if (wpipe->pipe_state & PIPE_WANTR) {
947                         lwkt_gettoken(&wpipe->pipe_rlock);
948                         if (wpipe->pipe_state & PIPE_WANTR) {
949                                 wpipe->pipe_state &= ~PIPE_WANTR;
950                                 lwkt_reltoken(&wpipe->pipe_rlock);
951                                 wakeup(wpipe);
952                         } else {
953                                 lwkt_reltoken(&wpipe->pipe_rlock);
954                         }
955                 }
956                 lwkt_gettoken(&wpipe->pipe_rlock);
957                 pipewakeup(wpipe, 1);
958                 lwkt_reltoken(&wpipe->pipe_rlock);
959         }
960
961         /*
962          * Don't return EPIPE if I/O was successful
963          */
964         if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
965             (uio->uio_resid == 0) &&
966             (error == EPIPE)) {
967                 error = 0;
968         }
969
970         if (error == 0)
971                 vfs_timestamp(&wpipe->pipe_mtime);
972
973         /*
974          * We have something to offer,
975          * wake up select/poll/kq.
976          */
977         /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
978         lwkt_reltoken(&wpipe->pipe_wlock);
979         return (error);
980 }
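/*
 * Editor's sketch (userland, illustrative only): the "space = 0" clamp
 * above, applied when orig_resid <= PIPE_BUF, is what gives small writes
 * their POSIX atomicity.  Records of at most PIPE_BUF bytes written by
 * different processes are therefore never interleaved; pipe_wfd below is
 * a hypothetical descriptor name:
 *
 *	#include <unistd.h>
 *	#include <limits.h>		// PIPE_BUF
 *
 *	char rec[64] = "one-record\n";	// 64 <= PIPE_BUF
 *	write(pipe_wfd, rec, sizeof(rec));	// transferred as a unit
 */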
981
982 /*
983  * We implement a very minimal set of ioctls for compatibility with sockets.
984  */
985 static int
986 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
987            struct ucred *cred, struct sysmsg *msg)
988 {
989         struct pipe *mpipe;
990         int error;
991
992         mpipe = (struct pipe *)fp->f_data;
993
994         lwkt_gettoken(&mpipe->pipe_rlock);
995         lwkt_gettoken(&mpipe->pipe_wlock);
996
997         switch (cmd) {
998         case FIOASYNC:
999                 if (*(int *)data) {
1000                         mpipe->pipe_state |= PIPE_ASYNC;
1001                 } else {
1002                         mpipe->pipe_state &= ~PIPE_ASYNC;
1003                 }
1004                 error = 0;
1005                 break;
1006         case FIONREAD:
1007                 *(int *)data = mpipe->pipe_buffer.windex -
1008                                 mpipe->pipe_buffer.rindex;
1009                 error = 0;
1010                 break;
1011         case FIOSETOWN:
1012                 error = fsetown(*(int *)data, &mpipe->pipe_sigio);
1013                 break;
1014         case FIOGETOWN:
1015                 *(int *)data = fgetown(&mpipe->pipe_sigio);
1016                 error = 0;
1017                 break;
1018         case TIOCSPGRP:
1019                 /* This is deprecated, FIOSETOWN should be used instead. */
1020                 error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
1021                 break;
1022
1023         case TIOCGPGRP:
1024                 /* This is deprecated, FIOGETOWN should be used instead. */
1025                 *(int *)data = -fgetown(&mpipe->pipe_sigio);
1026                 error = 0;
1027                 break;
1028         default:
1029                 error = ENOTTY;
1030                 break;
1031         }
1032         lwkt_reltoken(&mpipe->pipe_wlock);
1033         lwkt_reltoken(&mpipe->pipe_rlock);
1034
1035         return (error);
1036 }
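/*
 * Editor's sketch (userland, illustrative only): the most common consumer
 * of this ioctl path is FIONREAD, which reports windex - rindex, i.e. the
 * number of unread bytes currently buffered; pipe_rfd is a hypothetical
 * descriptor name:
 *
 *	#include <sys/ioctl.h>
 *	#include <sys/filio.h>
 *	#include <stdio.h>
 *
 *	int nbytes;
 *	if (ioctl(pipe_rfd, FIONREAD, &nbytes) == 0)
 *		printf("%d bytes pending\n", nbytes);
 */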
1037
1038 /*
1039  * MPSAFE
1040  */
1041 static int
1042 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1043 {
1044         struct pipe *pipe;
1045
1046         pipe = (struct pipe *)fp->f_data;
1047
1048         bzero((caddr_t)ub, sizeof(*ub));
1049         ub->st_mode = S_IFIFO;
1050         ub->st_blksize = pipe->pipe_buffer.size;
1051         ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1052         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1053         ub->st_atimespec = pipe->pipe_atime;
1054         ub->st_mtimespec = pipe->pipe_mtime;
1055         ub->st_ctimespec = pipe->pipe_ctime;
1056         /*
1057          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1058          * st_flags, st_gen.
1059          * XXX (st_dev, st_ino) should be unique.
1060          */
1061         return (0);
1062 }
1063
1064 static int
1065 pipe_close(struct file *fp)
1066 {
1067         struct pipe *cpipe;
1068
1069         cpipe = (struct pipe *)fp->f_data;
1070         fp->f_ops = &badfileops;
1071         fp->f_data = NULL;
1072         funsetown(&cpipe->pipe_sigio);
1073         pipeclose(cpipe);
1074         return (0);
1075 }
1076
1077 /*
1078  * Shutdown one or both directions of a full-duplex pipe.
1079  */
1080 static int
1081 pipe_shutdown(struct file *fp, int how)
1082 {
1083         struct pipe *rpipe;
1084         struct pipe *wpipe;
1085         int error = EPIPE;
1086
1087         rpipe = (struct pipe *)fp->f_data;
1088         wpipe = rpipe->pipe_peer;
1089
1090         /*
1091          * We modify pipe_state on both pipes, which means we need
1092          * all four tokens!
1093          */
1094         lwkt_gettoken(&rpipe->pipe_rlock);
1095         lwkt_gettoken(&rpipe->pipe_wlock);
1096         lwkt_gettoken(&wpipe->pipe_rlock);
1097         lwkt_gettoken(&wpipe->pipe_wlock);
1098
1099         switch(how) {
1100         case SHUT_RDWR:
1101         case SHUT_RD:
1102                 rpipe->pipe_state |= PIPE_REOF;         /* my reads */
1103                 rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
1104                 if (rpipe->pipe_state & PIPE_WANTR) {
1105                         rpipe->pipe_state &= ~PIPE_WANTR;
1106                         wakeup(rpipe);
1107                 }
1108                 if (rpipe->pipe_state & PIPE_WANTW) {
1109                         rpipe->pipe_state &= ~PIPE_WANTW;
1110                         wakeup(rpipe);
1111                 }
1112                 error = 0;
1113                 if (how == SHUT_RD)
1114                         break;
1115                 /* fall through */
1116         case SHUT_WR:
1117                 wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
1118                 wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
1119                 if (wpipe->pipe_state & PIPE_WANTR) {
1120                         wpipe->pipe_state &= ~PIPE_WANTR;
1121                         wakeup(wpipe);
1122                 }
1123                 if (wpipe->pipe_state & PIPE_WANTW) {
1124                         wpipe->pipe_state &= ~PIPE_WANTW;
1125                         wakeup(wpipe);
1126                 }
1127                 error = 0;
1128                 break;
1129         }
1130         pipewakeup(rpipe, 1);
1131         pipewakeup(wpipe, 1);
1132
1133         lwkt_reltoken(&wpipe->pipe_wlock);
1134         lwkt_reltoken(&wpipe->pipe_rlock);
1135         lwkt_reltoken(&rpipe->pipe_wlock);
1136         lwkt_reltoken(&rpipe->pipe_rlock);
1137
1138         return (error);
1139 }
1140
1141 /*
1142  * Destroy the pipe buffer.
1143  */
1144 static void
1145 pipe_free_kmem(struct pipe *cpipe)
1146 {
1147         if (cpipe->pipe_buffer.buffer != NULL) {
1148                 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1149                         atomic_subtract_int(&pipe_nbig, 1);
1150                 kmem_free(&kernel_map,
1151                         (vm_offset_t)cpipe->pipe_buffer.buffer,
1152                         cpipe->pipe_buffer.size);
1153                 cpipe->pipe_buffer.buffer = NULL;
1154                 cpipe->pipe_buffer.object = NULL;
1155         }
1156 }
1157
1158 /*
1159  * Close the pipe.  The slock must be held to interlock against simultaneous
1160  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1161  */
1162 static void
1163 pipeclose(struct pipe *cpipe)
1164 {
1165         globaldata_t gd;
1166         struct pipe *ppipe;
1167
1168         if (cpipe == NULL)
1169                 return;
1170
1171         /*
1172          * The slock may not have been allocated yet (close during
1173          * initialization)
1174          *
1175          * We need both the read and write tokens to modify pipe_state.
1176          */
1177         if (cpipe->pipe_slock)
1178                 lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1179         lwkt_gettoken(&cpipe->pipe_rlock);
1180         lwkt_gettoken(&cpipe->pipe_wlock);
1181
1182         /*
1183          * Set our state, wakeup anyone waiting in select/poll/kq, and
1184          * wakeup anyone blocked on our pipe.
1185          */
1186         cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1187         pipewakeup(cpipe, 1);
1188         if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1189                 cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1190                 wakeup(cpipe);
1191         }
1192
1193         /*
1194          * Disconnect from peer.
1195          */
1196         if ((ppipe = cpipe->pipe_peer) != NULL) {
1197                 lwkt_gettoken(&ppipe->pipe_rlock);
1198                 lwkt_gettoken(&ppipe->pipe_wlock);
1199                 ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1200                 pipewakeup(ppipe, 1);
1201                 if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1202                         ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1203                         wakeup(ppipe);
1204                 }
1205                 if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1206                         KNOTE(&ppipe->pipe_kq.ki_note, 0);
1207                 lwkt_reltoken(&ppipe->pipe_wlock);
1208                 lwkt_reltoken(&ppipe->pipe_rlock);
1209         }
1210
1211         /*
1212          * If the peer is also closed we can free resources for both
1213          * sides, otherwise we leave our side intact to deal with any
1214          * races (since we only have the slock).
1215          */
1216         if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1217                 cpipe->pipe_peer = NULL;
1218                 ppipe->pipe_peer = NULL;
1219                 ppipe->pipe_slock = NULL;       /* we will free the slock */
1220                 pipeclose(ppipe);
1221                 ppipe = NULL;
1222         }
1223
1224         lwkt_reltoken(&cpipe->pipe_wlock);
1225         lwkt_reltoken(&cpipe->pipe_rlock);
1226         if (cpipe->pipe_slock)
1227                 lockmgr(cpipe->pipe_slock, LK_RELEASE);
1228
1229         /*
1230          * If we disassociated from our peer we can free resources.  We
1231          * maintain a pcpu cache to improve performance, so the actual
1232          * tear-down case is limited to bulk situations.
1233          *
1234          * However, the bulk tear-down case can cause intense contention
1235          * on the kernel_map when, e.g. hundreds to hundreds of thousands
1236          * of processes are killed at the same time.  To deal with this we
1237          * use a pcpu mutex to maintain concurrency but also limit the
1238          * number of threads banging on the map and pmap.
1239          *
1240          * We use the mtx mechanism instead of the lockmgr mechanism because
1241          * the mtx mechanism utilizes a queued design which will not break
1242          * down in the face of thousands to hundreds of thousands of
1243          * processes trying to free pipes simultaneously.  The lockmgr
1244          * mechanism will wind up waking them all up each time a lock
1245          * cycles.
1246          */
1247         if (ppipe == NULL) {
1248                 gd = mycpu;
1249                 if (cpipe->pipe_slock) {
1250                         kfree(cpipe->pipe_slock, M_PIPE);
1251                         cpipe->pipe_slock = NULL;
1252                 }
1253                 if (gd->gd_pipeqcount >= pipe_maxcache ||
1254                     cpipe->pipe_buffer.size != PIPE_SIZE
1255                 ) {
1256                         mtx_lock(&pipe_gdlocks[gd->gd_cpuid]);
1257                         pipe_free_kmem(cpipe);
1258                         mtx_unlock(&pipe_gdlocks[gd->gd_cpuid]);
1259                         kfree(cpipe, M_PIPE);
1260                 } else {
1261                         cpipe->pipe_state = 0;
1262                         cpipe->pipe_peer = gd->gd_pipeq;
1263                         gd->gd_pipeq = cpipe;
1264                         ++gd->gd_pipeqcount;
1265                 }
1266         }
1267 }
1268
1269 static int
1270 pipe_kqfilter(struct file *fp, struct knote *kn)
1271 {
1272         struct pipe *cpipe;
1273
1274         cpipe = (struct pipe *)kn->kn_fp->f_data;
1275
1276         switch (kn->kn_filter) {
1277         case EVFILT_READ:
1278                 kn->kn_fop = &pipe_rfiltops;
1279                 break;
1280         case EVFILT_WRITE:
1281                 kn->kn_fop = &pipe_wfiltops;
1282                 if (cpipe->pipe_peer == NULL) {
1283                         /* other end of pipe has been closed */
1284                         return (EPIPE);
1285                 }
1286                 break;
1287         default:
1288                 return (EOPNOTSUPP);
1289         }
1290         kn->kn_hook = (caddr_t)cpipe;
1291
1292         knote_insert(&cpipe->pipe_kq.ki_note, kn);
1293
1294         return (0);
1295 }
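/*
 * Editor's sketch (userland, illustrative only): registering the filters
 * attached above.  EVFILT_READ fires when buffered data (kn_data) is
 * available or EOF is reached; EVFILT_WRITE fires once at least PIPE_BUF
 * bytes of space are free on the peer.  pipe_rfd is a hypothetical
 * descriptor name:
 *
 *	#include <sys/event.h>
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, pipe_rfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);
 */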
1296
1297 static void
1298 filt_pipedetach(struct knote *kn)
1299 {
1300         struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1301
1302         knote_remove(&cpipe->pipe_kq.ki_note, kn);
1303 }
1304
1305 /*ARGSUSED*/
1306 static int
1307 filt_piperead(struct knote *kn, long hint)
1308 {
1309         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1310         int ready = 0;
1311
1312         lwkt_gettoken(&rpipe->pipe_rlock);
1313         lwkt_gettoken(&rpipe->pipe_wlock);
1314
1315         kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1316
1317         if (rpipe->pipe_state & PIPE_REOF) {
1318                 /*
1319                  * Only set NODATA if all data has been exhausted
1320                  */
1321                 if (kn->kn_data == 0)
1322                         kn->kn_flags |= EV_NODATA;
1323                 kn->kn_flags |= EV_EOF; 
1324                 ready = 1;
1325         }
1326
1327         lwkt_reltoken(&rpipe->pipe_wlock);
1328         lwkt_reltoken(&rpipe->pipe_rlock);
1329
1330         if (!ready)
1331                 ready = kn->kn_data > 0;
1332
1333         return (ready);
1334 }
1335
1336 /*ARGSUSED*/
1337 static int
1338 filt_pipewrite(struct knote *kn, long hint)
1339 {
1340         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1341         struct pipe *wpipe = rpipe->pipe_peer;
1342         int ready = 0;
1343
1344         kn->kn_data = 0;
1345         if (wpipe == NULL) {
1346                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1347                 return (1);
1348         }
1349
1350         lwkt_gettoken(&wpipe->pipe_rlock);
1351         lwkt_gettoken(&wpipe->pipe_wlock);
1352
1353         if (wpipe->pipe_state & PIPE_WEOF) {
1354                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1355                 ready = 1;
1356         }
1357
1358         if (!ready)
1359                 kn->kn_data = wpipe->pipe_buffer.size -
1360                               (wpipe->pipe_buffer.windex -
1361                                wpipe->pipe_buffer.rindex);
1362
1363         lwkt_reltoken(&wpipe->pipe_wlock);
1364         lwkt_reltoken(&wpipe->pipe_rlock);
1365
1366         if (!ready)
1367                 ready = kn->kn_data >= PIPE_BUF;
1368
1369         return (ready);
1370 }