[dragonfly.git] / sys / kern / sys_pipe.c
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  */
21
22 /*
23  * This file contains a high-performance replacement for the socket-based
24  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25  * all features of sockets, but does do everything that pipes normally
26  * do.
27  */
28 #include <sys/param.h>
29 #include <sys/systm.h>
30 #include <sys/kernel.h>
31 #include <sys/proc.h>
32 #include <sys/fcntl.h>
33 #include <sys/file.h>
34 #include <sys/filedesc.h>
35 #include <sys/filio.h>
36 #include <sys/ttycom.h>
37 #include <sys/stat.h>
38 #include <sys/signalvar.h>
39 #include <sys/sysproto.h>
40 #include <sys/pipe.h>
41 #include <sys/vnode.h>
42 #include <sys/uio.h>
43 #include <sys/event.h>
44 #include <sys/globaldata.h>
45 #include <sys/module.h>
46 #include <sys/malloc.h>
47 #include <sys/sysctl.h>
48 #include <sys/socket.h>
49 #include <sys/kern_syscall.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <sys/lock.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_zone.h>
61
62 #include <sys/file2.h>
63 #include <sys/signal2.h>
64
65 #include <machine/cpufunc.h>
66
67 /*
68  * interfaces to the outside world
69  */
70 static int pipe_read (struct file *fp, struct uio *uio, 
71                 struct ucred *cred, int flags);
72 static int pipe_write (struct file *fp, struct uio *uio, 
73                 struct ucred *cred, int flags);
74 static int pipe_close (struct file *fp);
75 static int pipe_shutdown (struct file *fp, int how);
76 static int pipe_kqfilter (struct file *fp, struct knote *kn);
77 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
78 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
79                 struct ucred *cred, struct sysmsg *msg);
80
81 static struct fileops pipeops = {
82         .fo_read = pipe_read, 
83         .fo_write = pipe_write,
84         .fo_ioctl = pipe_ioctl,
85         .fo_kqfilter = pipe_kqfilter,
86         .fo_stat = pipe_stat,
87         .fo_close = pipe_close,
88         .fo_shutdown = pipe_shutdown
89 };
90
91 static void     filt_pipedetach(struct knote *kn);
92 static int      filt_piperead(struct knote *kn, long hint);
93 static int      filt_pipewrite(struct knote *kn, long hint);
94
95 static struct filterops pipe_rfiltops =
96         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
97 static struct filterops pipe_wfiltops =
98         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
99
100 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
101
102 /*
103  * Default pipe buffer size(s); this can be kind-of large now because pipe
104  * space is pageable.  The pipe code will try to maintain locality of
105  * reference for performance reasons, so small amounts of outstanding I/O
106  * will not wipe the cache.
107  */
108 #define MINPIPESIZE (PIPE_SIZE/3)
109 #define MAXPIPESIZE (2*PIPE_SIZE/3)
110
111 /*
112  * Limit the number of "big" pipes
113  */
114 #define LIMITBIGPIPES   64
115 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
116
117 static int pipe_maxbig = LIMITBIGPIPES;
118 static int pipe_maxcache = PIPEQ_MAX_CACHE;
119 static int pipe_bigcount;
120 static int pipe_nbig;
121 static int pipe_bcache_alloc;
122 static int pipe_bkmem_alloc;
123 static int pipe_rblocked_count;
124 static int pipe_wblocked_count;
125
126 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
127 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
128         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
129 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
130         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
131 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
132         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times read blocked");
133 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
134         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times write blocked");
135 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
136         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
137 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
138         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
139 static int pipe_delay = 5000;   /* 5uS default */
140 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
141         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
142 #if !defined(NO_PIPE_SYSCTL_STATS)
143 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
144         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
145 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
146         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
147 #endif
148
149 /*
150  * Auto-size pipe cache to reduce kmem allocations and frees.
151  */
152 static
153 void
154 pipeinit(void *dummy)
155 {
156         size_t mbytes = kmem_lim_size();
157
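            /*
             * Scale the defaults up on large-memory machines: double once
             * at roughly 7GB and again at roughly 15GB (kmem_lim_size()
             * presumably reports megabytes, per the mbytes name).
             */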
158         if (pipe_maxbig == LIMITBIGPIPES) {
159                 if (mbytes >= 7 * 1024)
160                         pipe_maxbig *= 2;
161                 if (mbytes >= 15 * 1024)
162                         pipe_maxbig *= 2;
163         }
164         if (pipe_maxcache == PIPEQ_MAX_CACHE) {
165                 if (mbytes >= 7 * 1024)
166                         pipe_maxcache *= 2;
167                 if (mbytes >= 15 * 1024)
168                         pipe_maxcache *= 2;
169         }
170 }
171 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
172
173 static void pipeclose (struct pipe *cpipe);
174 static void pipe_free_kmem (struct pipe *cpipe);
175 static int pipe_create (struct pipe **cpipep);
176 static int pipespace (struct pipe *cpipe, int size);
177
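    /*
     * Deliver SIGIO to the registered owner when requested and PIPE_ASYNC
     * is set, and in all cases post any knotes registered on the pipe.
     */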
178 static __inline void
179 pipewakeup(struct pipe *cpipe, int dosigio)
180 {
181         if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
182                 lwkt_gettoken(&sigio_token);
183                 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
184                 lwkt_reltoken(&sigio_token);
185         }
186         KNOTE(&cpipe->pipe_kq.ki_note, 0);
187 }
188
189 /*
190  * These routines are called before and after a UIO.  The UIO
191  * may block, causing our held tokens to be lost temporarily.
192  *
193  * We use these routines to serialize reads against other reads
194  * and writes against other writes.
195  *
196  * The read token is held on entry so *ipp does not race.
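     *
     * *ipp is the per-direction serialization state: 0 means no uio is in
     * progress, 1 means a uio is in progress, and -1 means a uio is in
     * progress with at least one other thread sleeping until it completes.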
197  */
198 static __inline int
199 pipe_start_uio(struct pipe *cpipe, int *ipp)
200 {
201         int error;
202
203         while (*ipp) {
204                 *ipp = -1;
205                 error = tsleep(ipp, PCATCH, "pipexx", 0);
206                 if (error)
207                         return (error);
208         }
209         *ipp = 1;
210         return (0);
211 }
212
213 static __inline void
214 pipe_end_uio(struct pipe *cpipe, int *ipp)
215 {
216         if (*ipp < 0) {
217                 *ipp = 0;
218                 wakeup(ipp);
219         } else {
220                 KKASSERT(*ipp > 0);
221                 *ipp = 0;
222         }
223 }
224
225 /*
226  * The pipe system call for the DTYPE_PIPE type of pipes
227  *
228  * pipe_args(int dummy)
229  *
230  * MPSAFE
231  */
232 int
233 sys_pipe(struct pipe_args *uap)
234 {
235         return kern_pipe(uap->sysmsg_fds, 0);
236 }
237
238 int
239 sys_pipe2(struct pipe2_args *uap)
240 {
241         return kern_pipe(uap->sysmsg_fds, uap->flags);
242 }
243
244 int
245 kern_pipe(long *fds, int flags)
246 {
247         struct thread *td = curthread;
248         struct filedesc *fdp = td->td_proc->p_fd;
249         struct file *rf, *wf;
250         struct pipe *rpipe, *wpipe;
251         int fd1, fd2, error;
252
253         rpipe = wpipe = NULL;
254         if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
255                 pipeclose(rpipe); 
256                 pipeclose(wpipe); 
257                 return (ENFILE);
258         }
259         
260         error = falloc(td->td_lwp, &rf, &fd1);
261         if (error) {
262                 pipeclose(rpipe);
263                 pipeclose(wpipe);
264                 return (error);
265         }
266         fds[0] = fd1;
267
268         /*
269          * Warning: once we've gotten past allocation of the fd for the
270          * read-side, we can only drop the read side via fdrop() in order
271          * to avoid races against processes which manage to dup() the read
272          * side while we are blocked trying to allocate the write side.
273          */
274         rf->f_type = DTYPE_PIPE;
275         rf->f_flag = FREAD | FWRITE;
276         rf->f_ops = &pipeops;
277         rf->f_data = rpipe;
278         if (flags & O_NONBLOCK)
279                 rf->f_flag |= O_NONBLOCK;
280         if (flags & O_CLOEXEC)
281                 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
282
283         error = falloc(td->td_lwp, &wf, &fd2);
284         if (error) {
285                 fsetfd(fdp, NULL, fd1);
286                 fdrop(rf);
287                 /* rpipe has been closed by fdrop(). */
288                 pipeclose(wpipe);
289                 return (error);
290         }
291         wf->f_type = DTYPE_PIPE;
292         wf->f_flag = FREAD | FWRITE;
293         wf->f_ops = &pipeops;
294         wf->f_data = wpipe;
295         if (flags & O_NONBLOCK)
296                 wf->f_flag |= O_NONBLOCK;
297         if (flags & O_CLOEXEC)
298                 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
299
300         fds[1] = fd2;
301
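            /*
             * Both ends share a single structural lock (slock); pipeclose()
             * uses it to interlock the final teardown and whichever side
             * closes last frees it.
             */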
302         rpipe->pipe_slock = kmalloc(sizeof(struct lock),
303                                     M_PIPE, M_WAITOK|M_ZERO);
304         wpipe->pipe_slock = rpipe->pipe_slock;
305         rpipe->pipe_peer = wpipe;
306         wpipe->pipe_peer = rpipe;
307         lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
308
309         /*
310          * Once activated the peer relationship remains valid until
311          * both sides are closed.
312          */
313         fsetfd(fdp, rf, fd1);
314         fsetfd(fdp, wf, fd2);
315         fdrop(rf);
316         fdrop(wf);
317
318         return (0);
319 }
320
321 /*
322  * Allocate kva for the pipe circular buffer; the space is pageable.
323  * This routine will 'realloc' the size of a pipe safely: if it fails
324  * it will retain the old buffer and return ENOMEM.
326  */
327 static int
328 pipespace(struct pipe *cpipe, int size)
329 {
330         struct vm_object *object;
331         caddr_t buffer;
332         int npages, error;
333
334         npages = round_page(size) / PAGE_SIZE;
335         object = cpipe->pipe_buffer.object;
336
337         /*
338          * [re]create the object if necessary and reserve space for it
339          * in the kernel_map.  The object and memory are pageable.  On
340          * success, free the old resources before assigning the new
341          * ones.
342          */
343         if (object == NULL || object->size != npages) {
344                 object = vm_object_allocate(OBJT_DEFAULT, npages);
345                 buffer = (caddr_t)vm_map_min(&kernel_map);
346
347                 error = vm_map_find(&kernel_map, object, NULL,
348                                     0, (vm_offset_t *)&buffer, size,
349                                     PAGE_SIZE,
350                                     1, VM_MAPTYPE_NORMAL,
351                                     VM_PROT_ALL, VM_PROT_ALL, 0);
352
353                 if (error != KERN_SUCCESS) {
354                         vm_object_deallocate(object);
355                         return (ENOMEM);
356                 }
357                 pipe_free_kmem(cpipe);
358                 cpipe->pipe_buffer.object = object;
359                 cpipe->pipe_buffer.buffer = buffer;
360                 cpipe->pipe_buffer.size = size;
361                 ++pipe_bkmem_alloc;
362         } else {
363                 ++pipe_bcache_alloc;
364         }
365         cpipe->pipe_buffer.rindex = 0;
366         cpipe->pipe_buffer.windex = 0;
367         return (0);
368 }
369
370 /*
371  * Initialize and allocate VM and memory for pipe, pulling the pipe from
372  * our per-cpu cache if possible.  For now make sure it is sized for the
373  * smaller PIPE_SIZE default.
374  */
375 static int
376 pipe_create(struct pipe **cpipep)
377 {
378         globaldata_t gd = mycpu;
379         struct pipe *cpipe;
380         int error;
381
382         if ((cpipe = gd->gd_pipeq) != NULL) {
383                 gd->gd_pipeq = cpipe->pipe_peer;
384                 --gd->gd_pipeqcount;
385                 cpipe->pipe_peer = NULL;
386                 cpipe->pipe_wantwcnt = 0;
387         } else {
388                 cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
389         }
390         *cpipep = cpipe;
391         if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
392                 return (error);
393         vfs_timestamp(&cpipe->pipe_ctime);
394         cpipe->pipe_atime = cpipe->pipe_ctime;
395         cpipe->pipe_mtime = cpipe->pipe_ctime;
396         lwkt_token_init(&cpipe->pipe_rlock, "piper");
397         lwkt_token_init(&cpipe->pipe_wlock, "pipew");
398         return (0);
399 }
400
401 static int
402 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
403 {
404         struct pipe *rpipe;
405         struct pipe *wpipe;
406         int error;
407         size_t nread = 0;
408         int nbio;
409         u_int size;     /* total bytes available */
410         u_int nsize;    /* total bytes to read */
411         u_int rindex;   /* contiguous bytes available */
412         int notify_writer;
413         int bigread;
414         int bigcount;
415
416         atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
417
418         if (uio->uio_resid == 0)
419                 return(0);
420
421         /*
422          * Setup locks, calculate nbio
423          */
424         rpipe = (struct pipe *)fp->f_data;
425         wpipe = rpipe->pipe_peer;
426         lwkt_gettoken(&rpipe->pipe_rlock);
427
428         if (fflags & O_FBLOCKING)
429                 nbio = 0;
430         else if (fflags & O_FNONBLOCKING)
431                 nbio = 1;
432         else if (fp->f_flag & O_NONBLOCK)
433                 nbio = 1;
434         else
435                 nbio = 0;
436
437         /*
438          * Reads are serialized.  Note however that pipe_buffer.buffer and
439          * pipe_buffer.size can change out from under us when the number
440          * of bytes in the buffer is zero due to the write-side doing a
441          * pipespace().
442          */
443         error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
444         if (error) {
445                 lwkt_reltoken(&rpipe->pipe_rlock);
446                 return (error);
447         }
448         notify_writer = 0;
449
450         bigread = (uio->uio_resid > 10 * 1024 * 1024);
451         bigcount = 10;
452
453         while (uio->uio_resid) {
454                 /*
455                  * Don't hog the cpu.
456                  */
457                 if (bigread && --bigcount == 0) {
458                         lwkt_user_yield();
459                         bigcount = 10;
460                         if (CURSIG(curthread->td_lwp)) {
461                                 error = EINTR;
462                                 break;
463                         }
464                 }
465
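                    /*
                     * windex and rindex are free-running counters; their
                     * difference is the number of bytes in the FIFO and the
                     * low bits (the buffer size is a power of 2) give the
                     * offset into the circular buffer.
                     */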
466                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
467                 cpu_lfence();
468                 if (size) {
469                         rindex = rpipe->pipe_buffer.rindex &
470                                  (rpipe->pipe_buffer.size - 1);
471                         nsize = size;
472                         if (nsize > rpipe->pipe_buffer.size - rindex)
473                                 nsize = rpipe->pipe_buffer.size - rindex;
474                         nsize = szmin(nsize, uio->uio_resid);
475
476                         error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
477                                         nsize, uio);
478                         if (error)
479                                 break;
480                         cpu_mfence();
481                         rpipe->pipe_buffer.rindex += nsize;
482                         nread += nsize;
483
484                         /*
485                          * If the FIFO is still over half full just continue
486                          * and do not try to notify the writer yet.
487                          */
488                         if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
489                                 notify_writer = 0;
490                                 continue;
491                         }
492
493                         /*
494                          * When the FIFO is less than half full, notify any
495                          * waiting writer.  WANTW can be checked while
496                          * holding just the rlock.
497                          */
498                         notify_writer = 1;
499                         if ((rpipe->pipe_state & PIPE_WANTW) == 0)
500                                 continue;
501                 }
502
503                 /*
504                  * If the "write-side" was blocked we wake it up.  This code
505                  * is reached either when the buffer is completely emptied
506                  * or when it becomes more than half-empty.
507                  *
508                  * Pipe_state can only be modified if both the rlock and
509                  * wlock are held.
510                  */
511                 if (rpipe->pipe_state & PIPE_WANTW) {
512                         lwkt_gettoken(&rpipe->pipe_wlock);
513                         if (rpipe->pipe_state & PIPE_WANTW) {
514                                 rpipe->pipe_state &= ~PIPE_WANTW;
515                                 lwkt_reltoken(&rpipe->pipe_wlock);
516                                 wakeup(rpipe);
517                         } else {
518                                 lwkt_reltoken(&rpipe->pipe_wlock);
519                         }
520                 }
521
522                 /*
523                  * Pick up our copy loop again if the writer sent data to
524                  * us while we were messing around.
525                  *
526                  * On an SMP box poll up to pipe_delay nanoseconds for new
527                  * data.  Typically a value of 2000 to 4000 is sufficient
528                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
529                  * is used for synchronous communications with small packets,
530                  * and 8000 or so (8uS) will pipeline large buffer xfers
531                  * between cpus over a pipe.
532                  *
533                  * For synchronous communications a hit means doing a
534                  * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
535                  * whereas a miss requiring a tsleep/wakeup sequence
536                  * will take 7uS or more.
537                  */
538                 if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
539                         continue;
540
541 #ifdef _RDTSC_SUPPORTED_
542                 if (pipe_delay) {
543                         int64_t tsc_target;
544                         int good = 0;
545
546                         tsc_target = tsc_get_target(pipe_delay);
547                         while (tsc_test_target(tsc_target) == 0) {
548                                 if (rpipe->pipe_buffer.windex !=
549                                     rpipe->pipe_buffer.rindex) {
550                                         good = 1;
551                                         break;
552                                 }
553                         }
554                         if (good)
555                                 continue;
556                 }
557 #endif
558
559                 /*
560                  * Detect EOF condition, do not set error.
561                  */
562                 if (rpipe->pipe_state & PIPE_REOF)
563                         break;
564
565                 /*
566                  * Break if some data was read, or if this was a non-blocking
567                  * read.
568                  */
569                 if (nread > 0)
570                         break;
571
572                 if (nbio) {
573                         error = EAGAIN;
574                         break;
575                 }
576
577                 /*
578                  * Last chance, interlock with WANTR.
579                  */
580                 lwkt_gettoken(&rpipe->pipe_wlock);
581                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
582                 if (size) {
583                         lwkt_reltoken(&rpipe->pipe_wlock);
584                         continue;
585                 }
586
587                 /*
588                  * Retest EOF - acquiring a new token can temporarily release
589                  * tokens already held.
590                  */
591                 if (rpipe->pipe_state & PIPE_REOF) {
592                         lwkt_reltoken(&rpipe->pipe_wlock);
593                         break;
594                 }
595
596                 /*
597                  * If there is no more to read in the pipe, reset its
598                  * pointers to the beginning.  This improves cache hit
599                  * stats.
600                  *
601                  * We need both locks to modify both pointers, and there
602                  * must also not be a write in progress or the uiomove()
603                  * in the write might block and temporarily release
604                  * its wlock, then reacquire and update windex.  We are
605                  * only serialized against reads, not writes.
606                  *
607                  * XXX should we even bother resetting the indices?  It
608                  *     might actually be more cache efficient not to.
609                  */
610                 if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
611                     rpipe->pipe_wip == 0) {
612                         rpipe->pipe_buffer.rindex = 0;
613                         rpipe->pipe_buffer.windex = 0;
614                 }
615
616                 /*
617                  * Wait for more data.
618                  *
619                  * Pipe_state can only be set if both the rlock and wlock
620                  * are held.
621                  */
622                 rpipe->pipe_state |= PIPE_WANTR;
623                 tsleep_interlock(rpipe, PCATCH);
624                 lwkt_reltoken(&rpipe->pipe_wlock);
625                 error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
626                 ++pipe_rblocked_count;
627                 if (error)
628                         break;
629         }
630         pipe_end_uio(rpipe, &rpipe->pipe_rip);
631
632         /*
633          * Update last access time
634          */
635         if (error == 0 && nread)
636                 vfs_timestamp(&rpipe->pipe_atime);
637
638         /*
639          * If we drained the FIFO more than half way, handle
640          * write blocking hysteresis.
641          *
642          * Note that PIPE_WANTW cannot be set by the writer without
643          * it holding both rlock and wlock, so we can test it
644          * while holding just rlock.
645          */
646         if (notify_writer) {
647                 /*
648                  * Synchronous blocking is done on the pipe involved
649                  */
650                 if (rpipe->pipe_state & PIPE_WANTW) {
651                         lwkt_gettoken(&rpipe->pipe_wlock);
652                         if (rpipe->pipe_state & PIPE_WANTW) {
653                                 rpipe->pipe_state &= ~PIPE_WANTW;
654                                 lwkt_reltoken(&rpipe->pipe_wlock);
655                                 wakeup(rpipe);
656                         } else {
657                                 lwkt_reltoken(&rpipe->pipe_wlock);
658                         }
659                 }
660
661                 /*
662                  * But we may also have to deal with a kqueue which is
663                  * stored on the same pipe as its descriptor, so a
664                  * stored on the same pipe as its descriptor, so an
665                  * be on the other side.
666                  */
667                 lwkt_gettoken(&wpipe->pipe_wlock);
668                 pipewakeup(wpipe, 0);
669                 lwkt_reltoken(&wpipe->pipe_wlock);
670         }
671         /*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
672         lwkt_reltoken(&rpipe->pipe_rlock);
673
674         return (error);
675 }
676
677 static int
678 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
679 {
680         int error;
681         int orig_resid;
682         int nbio;
683         struct pipe *wpipe;
684         struct pipe *rpipe;
685         u_int windex;
686         u_int space;
687         u_int wcount;
688         int bigwrite;
689         int bigcount;
690
691         /*
692          * Writes go to the peer.  The peer will always exist.
693          */
694         rpipe = (struct pipe *) fp->f_data;
695         wpipe = rpipe->pipe_peer;
696         lwkt_gettoken(&wpipe->pipe_wlock);
697         if (wpipe->pipe_state & PIPE_WEOF) {
698                 lwkt_reltoken(&wpipe->pipe_wlock);
699                 return (EPIPE);
700         }
701
702         /*
703          * Degenerate case (EPIPE takes prec)
704          */
705         if (uio->uio_resid == 0) {
706                 lwkt_reltoken(&wpipe->pipe_wlock);
707                 return(0);
708         }
709
710         /*
711          * Writes are serialized (start_uio must be called with wlock)
712          */
713         error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
714         if (error) {
715                 lwkt_reltoken(&wpipe->pipe_wlock);
716                 return (error);
717         }
718
719         if (fflags & O_FBLOCKING)
720                 nbio = 0;
721         else if (fflags & O_FNONBLOCKING)
722                 nbio = 1;
723         else if (fp->f_flag & O_NONBLOCK)
724                 nbio = 1;
725         else
726                 nbio = 0;
727
728         /*
729          * If it is advantageous to resize the pipe buffer, do
730          * so.  We are write-serialized so we can block safely.
731          */
732         if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
733             (pipe_nbig < pipe_maxbig) &&
734             wpipe->pipe_wantwcnt > 4 &&
735             (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
736                 /* 
737                  * Recheck after lock.
738                  */
739                 lwkt_gettoken(&wpipe->pipe_rlock);
740                 if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
741                     (pipe_nbig < pipe_maxbig) &&
742                     (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
743                         atomic_add_int(&pipe_nbig, 1);
744                         if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
745                                 ++pipe_bigcount;
746                         else
747                                 atomic_subtract_int(&pipe_nbig, 1);
748                 }
749                 lwkt_reltoken(&wpipe->pipe_rlock);
750         }
751
752         orig_resid = uio->uio_resid;
753         wcount = 0;
754
755         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
756         bigcount = 10;
757
758         while (uio->uio_resid) {
759                 if (wpipe->pipe_state & PIPE_WEOF) {
760                         error = EPIPE;
761                         break;
762                 }
763
764                 /*
765                  * Don't hog the cpu.
766                  */
767                 if (bigwrite && --bigcount == 0) {
768                         lwkt_user_yield();
769                         bigcount = 10;
770                         if (CURSIG(curthread->td_lwp)) {
771                                 error = EINTR;
772                                 break;
773                         }
774                 }
775
776                 windex = wpipe->pipe_buffer.windex &
777                          (wpipe->pipe_buffer.size - 1);
778                 space = wpipe->pipe_buffer.size -
779                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
780                 cpu_lfence();
781
782                 /* Writes of size <= PIPE_BUF must be atomic. */
783                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
784                         space = 0;
785
786                 /* 
787                  * Write to fill, read size handles write hysteresis.  Also
788                  * additional restrictions can cause select-based non-blocking
789                  * writes to spin.
790                  */
791                 if (space > 0) {
792                         u_int segsize;
793
794                         /*
795                          * Transfer size is minimum of uio transfer
796                          * and free space in pipe buffer.
797                          *
798                          * Limit each uiocopy to no more than PIPE_SIZE
799                          * so we can keep the gravy train going on a
800                          * SMP box.  This doubles the performance for
801                          * write sizes > 16K.  Otherwise large writes
802                          * wind up doing an inefficient synchronous
803                          * ping-pong.
804                          */
805                         space = szmin(space, uio->uio_resid);
806                         if (space > PIPE_SIZE)
807                                 space = PIPE_SIZE;
808
809                         /*
810                          * First segment to transfer is minimum of
811                          * transfer size and contiguous space in
812                          * pipe buffer.  If first segment to transfer
813                          * is less than the transfer size, we've got
814                          * a wraparound in the buffer.
815                          */
816                         segsize = wpipe->pipe_buffer.size - windex;
817                         if (segsize > space)
818                                 segsize = space;
819
820                         /*
821                          * If this is the first loop and the reader is
822                          * blocked, do a preemptive wakeup of the reader.
823                          *
824                          * On SMP the IPI latency plus the wlock interlock
825                          * on the reader side is the fastest way to get the
826                          * reader going.  (The scheduler will hard loop on
827                          * lock tokens).
828                          *
829                          * NOTE: We can't clear WANTR here without acquiring
830                          * the rlock, which we don't want to do here!
831                          */
832                         if ((wpipe->pipe_state & PIPE_WANTR))
833                                 wakeup(wpipe);
834
835                         /*
836                          * Transfer segment, which may include a wrap-around.
837                          * Update windex to account for both segments in one
838                          * go so the reader can read() the data atomically.
839                          */
840                         error = uiomove(&wpipe->pipe_buffer.buffer[windex],
841                                         segsize, uio);
842                         if (error == 0 && segsize < space) {
843                                 segsize = space - segsize;
844                                 error = uiomove(&wpipe->pipe_buffer.buffer[0],
845                                                 segsize, uio);
846                         }
847                         if (error)
848                                 break;
849                         cpu_mfence();
850                         wpipe->pipe_buffer.windex += space;
851                         wcount += space;
852                         continue;
853                 }
854
855                 /*
856                  * We need both the rlock and the wlock to interlock against
857                  * the EOF, WANTW, and size checks, and to modify pipe_state.
858                  *
859                  * These are token locks so we do not have to worry about
860                  * deadlocks.
861                  */
862                 lwkt_gettoken(&wpipe->pipe_rlock);
863
864                 /*
865                  * If the "read-side" has been blocked, wake it up now
866                  * and yield to let it drain synchronously rather
867                  * than block.
868                  */
869                 if (wpipe->pipe_state & PIPE_WANTR) {
870                         wpipe->pipe_state &= ~PIPE_WANTR;
871                         wakeup(wpipe);
872                 }
873
874                 /*
875                  * don't block on non-blocking I/O
876                  */
877                 if (nbio) {
878                         lwkt_reltoken(&wpipe->pipe_rlock);
879                         error = EAGAIN;
880                         break;
881                 }
882
883                 /*
884                  * re-test whether we have to block in the writer after
885                  * acquiring both locks, in case the reader opened up
886                  * some space.
887                  */
888                 space = wpipe->pipe_buffer.size -
889                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
890                 cpu_lfence();
891                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
892                         space = 0;
893
894                 /*
895                  * Retest EOF - acquiring a new token can temporarily release
896                  * tokens already held.
897                  */
898                 if (wpipe->pipe_state & PIPE_WEOF) {
899                         lwkt_reltoken(&wpipe->pipe_rlock);
900                         error = EPIPE;
901                         break;
902                 }
903
904                 /*
905                  * We have no more space and have something to offer,
906                  * wake up select/poll/kq.
907                  */
908                 if (space == 0) {
909                         wpipe->pipe_state |= PIPE_WANTW;
910                         ++wpipe->pipe_wantwcnt;
911                         pipewakeup(wpipe, 1);
912                         if (wpipe->pipe_state & PIPE_WANTW)
913                                 error = tsleep(wpipe, PCATCH, "pipewr", 0);
914                         ++pipe_wblocked_count;
915                 }
916                 lwkt_reltoken(&wpipe->pipe_rlock);
917
918                 /*
919                  * Break out if we errored or the read side wants us to go
920                  * away.
921                  */
922                 if (error)
923                         break;
924                 if (wpipe->pipe_state & PIPE_WEOF) {
925                         error = EPIPE;
926                         break;
927                 }
928         }
929         pipe_end_uio(wpipe, &wpipe->pipe_wip);
930
931         /*
932          * If we have put any characters in the buffer, we wake up
933          * the reader.
934          *
935          * Both rlock and wlock are required to be able to modify pipe_state.
936          */
937         if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
938                 if (wpipe->pipe_state & PIPE_WANTR) {
939                         lwkt_gettoken(&wpipe->pipe_rlock);
940                         if (wpipe->pipe_state & PIPE_WANTR) {
941                                 wpipe->pipe_state &= ~PIPE_WANTR;
942                                 lwkt_reltoken(&wpipe->pipe_rlock);
943                                 wakeup(wpipe);
944                         } else {
945                                 lwkt_reltoken(&wpipe->pipe_rlock);
946                         }
947                 }
948                 lwkt_gettoken(&wpipe->pipe_rlock);
949                 pipewakeup(wpipe, 1);
950                 lwkt_reltoken(&wpipe->pipe_rlock);
951         }
952
953         /*
954          * Don't return EPIPE if I/O was successful
955          */
956         if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
957             (uio->uio_resid == 0) &&
958             (error == EPIPE)) {
959                 error = 0;
960         }
961
962         if (error == 0)
963                 vfs_timestamp(&wpipe->pipe_mtime);
964
965         /*
966          * We have something to offer,
967          * wake up select/poll/kq.
968          */
969         /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
970         lwkt_reltoken(&wpipe->pipe_wlock);
971         return (error);
972 }
973
974 /*
975  * We implement a very minimal set of ioctls for compatibility with sockets.
976  */
977 static int
978 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
979            struct ucred *cred, struct sysmsg *msg)
980 {
981         struct pipe *mpipe;
982         int error;
983
984         mpipe = (struct pipe *)fp->f_data;
985
986         lwkt_gettoken(&mpipe->pipe_rlock);
987         lwkt_gettoken(&mpipe->pipe_wlock);
988
989         switch (cmd) {
990         case FIOASYNC:
991                 if (*(int *)data) {
992                         mpipe->pipe_state |= PIPE_ASYNC;
993                 } else {
994                         mpipe->pipe_state &= ~PIPE_ASYNC;
995                 }
996                 error = 0;
997                 break;
998         case FIONREAD:
999                 *(int *)data = mpipe->pipe_buffer.windex -
1000                                 mpipe->pipe_buffer.rindex;
1001                 error = 0;
1002                 break;
1003         case FIOSETOWN:
1004                 error = fsetown(*(int *)data, &mpipe->pipe_sigio);
1005                 break;
1006         case FIOGETOWN:
1007                 *(int *)data = fgetown(&mpipe->pipe_sigio);
1008                 error = 0;
1009                 break;
1010         case TIOCSPGRP:
1011                 /* This is deprecated, FIOSETOWN should be used instead. */
1012                 error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
1013                 break;
1014
1015         case TIOCGPGRP:
1016                 /* This is deprecated, FIOGETOWN should be used instead. */
1017                 *(int *)data = -fgetown(&mpipe->pipe_sigio);
1018                 error = 0;
1019                 break;
1020         default:
1021                 error = ENOTTY;
1022                 break;
1023         }
1024         lwkt_reltoken(&mpipe->pipe_wlock);
1025         lwkt_reltoken(&mpipe->pipe_rlock);
1026
1027         return (error);
1028 }
1029
1030 /*
1031  * MPSAFE
1032  */
1033 static int
1034 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1035 {
1036         struct pipe *pipe;
1037
1038         pipe = (struct pipe *)fp->f_data;
1039
1040         bzero((caddr_t)ub, sizeof(*ub));
1041         ub->st_mode = S_IFIFO;
1042         ub->st_blksize = pipe->pipe_buffer.size;
1043         ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1044         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1045         ub->st_atimespec = pipe->pipe_atime;
1046         ub->st_mtimespec = pipe->pipe_mtime;
1047         ub->st_ctimespec = pipe->pipe_ctime;
1048         /*
1049          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1050          * st_flags, st_gen.
1051          * XXX (st_dev, st_ino) should be unique.
1052          */
1053         return (0);
1054 }
1055
1056 static int
1057 pipe_close(struct file *fp)
1058 {
1059         struct pipe *cpipe;
1060
1061         cpipe = (struct pipe *)fp->f_data;
1062         fp->f_ops = &badfileops;
1063         fp->f_data = NULL;
1064         funsetown(&cpipe->pipe_sigio);
1065         pipeclose(cpipe);
1066         return (0);
1067 }
1068
1069 /*
1070  * Shutdown one or both directions of a full-duplex pipe.
1071  */
1072 static int
1073 pipe_shutdown(struct file *fp, int how)
1074 {
1075         struct pipe *rpipe;
1076         struct pipe *wpipe;
1077         int error = EPIPE;
1078
1079         rpipe = (struct pipe *)fp->f_data;
1080         wpipe = rpipe->pipe_peer;
1081
1082         /*
1083          * We modify pipe_state on both pipes, which means we need
1084          * all four tokens!
1085          */
1086         lwkt_gettoken(&rpipe->pipe_rlock);
1087         lwkt_gettoken(&rpipe->pipe_wlock);
1088         lwkt_gettoken(&wpipe->pipe_rlock);
1089         lwkt_gettoken(&wpipe->pipe_wlock);
1090
1091         switch(how) {
1092         case SHUT_RDWR:
1093         case SHUT_RD:
1094                 rpipe->pipe_state |= PIPE_REOF;         /* my reads */
1095                 rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
1096                 if (rpipe->pipe_state & PIPE_WANTR) {
1097                         rpipe->pipe_state &= ~PIPE_WANTR;
1098                         wakeup(rpipe);
1099                 }
1100                 if (rpipe->pipe_state & PIPE_WANTW) {
1101                         rpipe->pipe_state &= ~PIPE_WANTW;
1102                         wakeup(rpipe);
1103                 }
1104                 error = 0;
1105                 if (how == SHUT_RD)
1106                         break;
1107                 /* fall through */
1108         case SHUT_WR:
1109                 wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
1110                 wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
1111                 if (wpipe->pipe_state & PIPE_WANTR) {
1112                         wpipe->pipe_state &= ~PIPE_WANTR;
1113                         wakeup(wpipe);
1114                 }
1115                 if (wpipe->pipe_state & PIPE_WANTW) {
1116                         wpipe->pipe_state &= ~PIPE_WANTW;
1117                         wakeup(wpipe);
1118                 }
1119                 error = 0;
1120                 break;
1121         }
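             /*
              * Notify select/poll/kevent waiters (and the SIGIO owner, if
              * any) on both ends so they see the new EOF state.
              */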
1122         pipewakeup(rpipe, 1);
1123         pipewakeup(wpipe, 1);
1124
1125         lwkt_reltoken(&wpipe->pipe_wlock);
1126         lwkt_reltoken(&wpipe->pipe_rlock);
1127         lwkt_reltoken(&rpipe->pipe_wlock);
1128         lwkt_reltoken(&rpipe->pipe_rlock);
1129
1130         return (error);
1131 }
1132
1133 static void
1134 pipe_free_kmem(struct pipe *cpipe)
1135 {
1136         if (cpipe->pipe_buffer.buffer != NULL) {
1137                 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1138                         atomic_subtract_int(&pipe_nbig, 1);
1139                 kmem_free(&kernel_map,
1140                         (vm_offset_t)cpipe->pipe_buffer.buffer,
1141                         cpipe->pipe_buffer.size);
1142                 cpipe->pipe_buffer.buffer = NULL;
1143                 cpipe->pipe_buffer.object = NULL;
1144         }
1145 }
1146
1147 /*
1148  * Close the pipe.  The slock must be held to interlock against simultaneous
1149  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1150  */
1151 static void
1152 pipeclose(struct pipe *cpipe)
1153 {
1154         globaldata_t gd;
1155         struct pipe *ppipe;
1156
1157         if (cpipe == NULL)
1158                 return;
1159
1160         /*
1161          * The slock may not have been allocated yet (close during
1162          * initialization)
1163          *
1164          * We need both the read and write tokens to modify pipe_state.
1165          */
1166         if (cpipe->pipe_slock)
1167                 lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1168         lwkt_gettoken(&cpipe->pipe_rlock);
1169         lwkt_gettoken(&cpipe->pipe_wlock);
1170
1171         /*
1172          * Set our state, wakeup anyone waiting in select/poll/kq, and
1173          * wakeup anyone blocked on our pipe.
1174          */
1175         cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1176         pipewakeup(cpipe, 1);
1177         if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1178                 cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1179                 wakeup(cpipe);
1180         }
1181
1182         /*
1183          * Disconnect from peer.
1184          */
1185         if ((ppipe = cpipe->pipe_peer) != NULL) {
1186                 lwkt_gettoken(&ppipe->pipe_rlock);
1187                 lwkt_gettoken(&ppipe->pipe_wlock);
1188                 ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1189                 pipewakeup(ppipe, 1);
1190                 if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1191                         ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1192                         wakeup(ppipe);
1193                 }
1194                 if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1195                         KNOTE(&ppipe->pipe_kq.ki_note, 0);
1196                 lwkt_reltoken(&ppipe->pipe_wlock);
1197                 lwkt_reltoken(&ppipe->pipe_rlock);
1198         }
1199
1200         /*
1201          * If the peer is also closed we can free resources for both
1202          * sides, otherwise we leave our side intact to deal with any
1203          * races (since we only have the slock).
1204          */
1205         if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1206                 cpipe->pipe_peer = NULL;
1207                 ppipe->pipe_peer = NULL;
1208                 ppipe->pipe_slock = NULL;       /* we will free the slock */
1209                 pipeclose(ppipe);
1210                 ppipe = NULL;
1211         }
1212
1213         lwkt_reltoken(&cpipe->pipe_wlock);
1214         lwkt_reltoken(&cpipe->pipe_rlock);
1215         if (cpipe->pipe_slock)
1216                 lockmgr(cpipe->pipe_slock, LK_RELEASE);
1217
1218         /*
1219          * If we disassociated from our peer we can free resources
1220          */
1221         if (ppipe == NULL) {
1222                 gd = mycpu;
1223                 if (cpipe->pipe_slock) {
1224                         kfree(cpipe->pipe_slock, M_PIPE);
1225                         cpipe->pipe_slock = NULL;
1226                 }
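                     /*
                      * Recycle default-sized pipes into the small per-cpu
                      * cache, otherwise release the buffer KVA and free the
                      * pipe structure.
                      */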
1227                 if (gd->gd_pipeqcount >= pipe_maxcache ||
1228                     cpipe->pipe_buffer.size != PIPE_SIZE
1229                 ) {
1230                         pipe_free_kmem(cpipe);
1231                         kfree(cpipe, M_PIPE);
1232                 } else {
1233                         cpipe->pipe_state = 0;
1234                         cpipe->pipe_peer = gd->gd_pipeq;
1235                         gd->gd_pipeq = cpipe;
1236                         ++gd->gd_pipeqcount;
1237                 }
1238         }
1239 }
1240
1241 static int
1242 pipe_kqfilter(struct file *fp, struct knote *kn)
1243 {
1244         struct pipe *cpipe;
1245
1246         cpipe = (struct pipe *)kn->kn_fp->f_data;
1247
1248         switch (kn->kn_filter) {
1249         case EVFILT_READ:
1250                 kn->kn_fop = &pipe_rfiltops;
1251                 break;
1252         case EVFILT_WRITE:
1253                 kn->kn_fop = &pipe_wfiltops;
1254                 if (cpipe->pipe_peer == NULL) {
1255                         /* other end of pipe has been closed */
1256                         return (EPIPE);
1257                 }
1258                 break;
1259         default:
1260                 return (EOPNOTSUPP);
1261         }
1262         kn->kn_hook = (caddr_t)cpipe;
1263
1264         knote_insert(&cpipe->pipe_kq.ki_note, kn);
1265
1266         return (0);
1267 }
1268
1269 static void
1270 filt_pipedetach(struct knote *kn)
1271 {
1272         struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1273
1274         knote_remove(&cpipe->pipe_kq.ki_note, kn);
1275 }
1276
1277 /*ARGSUSED*/
1278 static int
1279 filt_piperead(struct knote *kn, long hint)
1280 {
1281         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1282         int ready = 0;
1283
1284         lwkt_gettoken(&rpipe->pipe_rlock);
1285         lwkt_gettoken(&rpipe->pipe_wlock);
1286
1287         kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1288
1289         if (rpipe->pipe_state & PIPE_REOF) {
1290                 /*
1291                  * Only set NODATA if all data has been exhausted
1292                  */
1293                 if (kn->kn_data == 0)
1294                         kn->kn_flags |= EV_NODATA;
1295                 kn->kn_flags |= EV_EOF; 
1296                 ready = 1;
1297         }
1298
1299         lwkt_reltoken(&rpipe->pipe_wlock);
1300         lwkt_reltoken(&rpipe->pipe_rlock);
1301
1302         if (!ready)
1303                 ready = kn->kn_data > 0;
1304
1305         return (ready);
1306 }
1307
1308 /*ARGSUSED*/
1309 static int
1310 filt_pipewrite(struct knote *kn, long hint)
1311 {
1312         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1313         struct pipe *wpipe = rpipe->pipe_peer;
1314         int ready = 0;
1315
1316         kn->kn_data = 0;
1317         if (wpipe == NULL) {
1318                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1319                 return (1);
1320         }
1321
1322         lwkt_gettoken(&wpipe->pipe_rlock);
1323         lwkt_gettoken(&wpipe->pipe_wlock);
1324
1325         if (wpipe->pipe_state & PIPE_WEOF) {
1326                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1327                 ready = 1;
1328         }
1329
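             /*
              * Report the remaining write space; the event is considered
              * ready once at least PIPE_BUF bytes can be written without
              * blocking.
              */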
1330         if (!ready)
1331                 kn->kn_data = wpipe->pipe_buffer.size -
1332                               (wpipe->pipe_buffer.windex -
1333                                wpipe->pipe_buffer.rindex);
1334
1335         lwkt_reltoken(&wpipe->pipe_wlock);
1336         lwkt_reltoken(&wpipe->pipe_rlock);
1337
1338         if (!ready)
1339                 ready = kn->kn_data >= PIPE_BUF;
1340
1341         return (ready);
1342 }