kernel: Clean up some no longer used types from makesyscalls.sh.
[dragonfly.git] / sys / kern / sys_pipe.c
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice immediately at the beginning of the file, without modification,
14  *    this list of conditions, and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  * 3. Absolutely no warranty of function or purpose is made by the author
19  *    John S. Dyson.
20  * 4. Modifications may be freely made to this file if the above conditions
21  *    are met.
22  */
23
24 /*
25  * This file contains a high-performance replacement for the socket-based
26  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
27  * all features of sockets, but does do everything that pipes normally
28  * do.
29  */
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysproto.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68
69 #include <machine/cpufunc.h>
70
71 struct pipegdlock {
72         struct mtx      mtx;
73 } __cachealign;
74
75 /*
76  * interfaces to the outside world
77  */
78 static int pipe_read (struct file *fp, struct uio *uio, 
79                 struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio, 
81                 struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87                 struct ucred *cred, struct sysmsg *msg);
88
89 __read_mostly static struct fileops pipeops = {
90         .fo_read = pipe_read, 
91         .fo_write = pipe_write,
92         .fo_ioctl = pipe_ioctl,
93         .fo_kqfilter = pipe_kqfilter,
94         .fo_stat = pipe_stat,
95         .fo_close = pipe_close,
96         .fo_shutdown = pipe_shutdown
97 };
98
99 static void     filt_pipedetach(struct knote *kn);
100 static int      filt_piperead(struct knote *kn, long hint);
101 static int      filt_pipewrite(struct knote *kn, long hint);
102
103 __read_mostly static struct filterops pipe_rfiltops =
104         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
105 __read_mostly static struct filterops pipe_wfiltops =
106         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
107
108 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
109
110 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
111
112 __read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
113 __read_mostly static struct pipegdlock *pipe_gdlocks;
114
115 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
116 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
118
119 /*
120  * The pipe buffer size can be changed at any time.  Only new pipe()s
121  * are affected.  Note that due to cpu cache effects, you do not want
122  * to make this value too large.
123  */
124 __read_mostly static int pipe_size = 32768;
125 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
126         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
127
128 /*
129  * Reader/writer delay loop.  When the reader exhausts the pipe buffer
130  * or the write completely fills the pipe buffer and would otherwise sleep,
131  * it first busy-loops for a few microseconds waiting for data or buffer
132  * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
133  * and also helps when the user program uses a large data buffer in its
134  * UIOs.
135  *
136  * This defaults to 4uS.
137  */
138 #ifdef _RDTSC_SUPPORTED_
139 __read_mostly static int pipe_delay = 4000;     /* 4uS default */
140 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
141         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
142 #endif
143
144 /*
145  * Auto-size pipe cache to reduce kmem allocations and frees.
146  */
147 static
148 void
149 pipeinit(void *dummy)
150 {
151         size_t mbytes = kmem_lim_size();
152         int n;
153
154         if (pipe_maxcache == PIPEQ_MAX_CACHE) {
155                 if (mbytes >= 7 * 1024)
156                         pipe_maxcache *= 2;
157                 if (mbytes >= 15 * 1024)
158                         pipe_maxcache *= 2;
159         }
160
161         /*
162          * Detune the pcpu caching a bit on systems with an insane number
163          * of cpu threads to reduce memory waste.
164          */
165         if (ncpus > 64) {
166                 pipe_maxcache = pipe_maxcache * 64 / ncpus;
167                 if (pipe_maxcache < PIPEQ_MAX_CACHE)
168                         pipe_maxcache = PIPEQ_MAX_CACHE;
169         }
170
171         pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
172                              M_PIPE, M_WAITOK | M_ZERO);
173         for (n = 0; n < ncpus; ++n)
174                 mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
175 }
176 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
177
178 static void pipeclose (struct pipe *pipe,
179                 struct pipebuf *pbr, struct pipebuf *pbw);
180 static void pipe_free_kmem (struct pipebuf *buf);
181 static int pipe_create (struct pipe **pipep);
182
183 /*
184  * Test and clear the specified flag, wakeup(pb) if it was set.
185  * This function must also act as a memory barrier.
186  */
187 static __inline void
188 pipesignal(struct pipebuf *pb, uint32_t flags)
189 {
190         uint32_t oflags;
191         uint32_t nflags;
192
193         for (;;) {
194                 oflags = pb->state;
195                 cpu_ccfence();
196                 nflags = oflags & ~flags;
197                 if (atomic_cmpset_int(&pb->state, oflags, nflags))
198                         break;
199         }
200         if (oflags & flags)
201                 wakeup(pb);
202 }
203
204 /*
205  *
206  */
207 static __inline void
208 pipewakeup(struct pipebuf *pb, int dosigio)
209 {
210         if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
211                 lwkt_gettoken(&sigio_token);
212                 pgsigio(pb->sigio, SIGIO, 0);
213                 lwkt_reltoken(&sigio_token);
214         }
215         KNOTE(&pb->kq.ki_note, 0);
216 }
217
218 /*
219  * These routines are called before and after a UIO.  The UIO
220  * may block, causing our held tokens to be lost temporarily.
221  *
222  * We use these routines to serialize reads against other reads
223  * and writes against other writes.
224  *
225  * The appropriate token is held on entry so *ipp does not race.
226  */
227 static __inline int
228 pipe_start_uio(int *ipp)
229 {
230         int error;
231
232         while (*ipp) {
233                 *ipp = -1;
234                 error = tsleep(ipp, PCATCH, "pipexx", 0);
235                 if (error)
236                         return (error);
237         }
238         *ipp = 1;
239         return (0);
240 }
241
242 static __inline void
243 pipe_end_uio(int *ipp)
244 {
245         if (*ipp < 0) {
246                 *ipp = 0;
247                 wakeup(ipp);
248         } else {
249                 KKASSERT(*ipp > 0);
250                 *ipp = 0;
251         }
252 }
253
254 /*
255  * The pipe system call for the DTYPE_PIPE type of pipes
256  *
257  * pipe_args(int dummy)
258  *
259  * MPSAFE
260  */
261 int
262 sys_pipe(struct pipe_args *uap)
263 {
264         return kern_pipe(uap->sysmsg_fds, 0);
265 }
266
267 int
268 sys_pipe2(struct pipe2_args *uap)
269 {
270         return kern_pipe(uap->sysmsg_fds, uap->flags);
271 }
272
273 int
274 kern_pipe(long *fds, int flags)
275 {
276         struct thread *td = curthread;
277         struct filedesc *fdp = td->td_proc->p_fd;
278         struct file *rf, *wf;
279         struct pipe *pipe;
280         int fd1, fd2, error;
281
282         pipe = NULL;
283         if (pipe_create(&pipe)) {
284                 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
285                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
286                 return (ENFILE);
287         }
288         
289         error = falloc(td->td_lwp, &rf, &fd1);
290         if (error) {
291                 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
292                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
293                 return (error);
294         }
295         fds[0] = fd1;
296
297         /*
298          * Warning: once we've gotten past allocation of the fd for the
299          * read-side, we can only drop the read side via fdrop() in order
300          * to avoid races against processes which manage to dup() the read
301          * side while we are blocked trying to allocate the write side.
302          */
303         rf->f_type = DTYPE_PIPE;
304         rf->f_flag = FREAD | FWRITE;
305         rf->f_ops = &pipeops;
306         rf->f_data = (void *)((intptr_t)pipe | 0);
307         if (flags & O_NONBLOCK)
308                 rf->f_flag |= O_NONBLOCK;
309         if (flags & O_CLOEXEC)
310                 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
311
312         error = falloc(td->td_lwp, &wf, &fd2);
313         if (error) {
314                 fsetfd(fdp, NULL, fd1);
315                 fdrop(rf);
316                 /* pipeA has been closed by fdrop() */
317                 /* close pipeB here */
318                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
319                 return (error);
320         }
321         wf->f_type = DTYPE_PIPE;
322         wf->f_flag = FREAD | FWRITE;
323         wf->f_ops = &pipeops;
324         wf->f_data = (void *)((intptr_t)pipe | 1);
325         if (flags & O_NONBLOCK)
326                 wf->f_flag |= O_NONBLOCK;
327         if (flags & O_CLOEXEC)
328                 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
329
330         fds[1] = fd2;
331
332         /*
333          * Once activated the peer relationship remains valid until
334          * both sides are closed.
335          */
336         fsetfd(fdp, rf, fd1);
337         fsetfd(fdp, wf, fd2);
338         fdrop(rf);
339         fdrop(wf);
340
341         return (0);
342 }
343
344 /*
345  * [re]allocates KVA for the pipe's circular buffer.  The space is
346  * pageable.  Called twice to setup full-duplex communications.
347  *
348  * NOTE: Independent vm_object's are used to improve performance.
349  *
350  * Returns 0 on success, ENOMEM on failure.
351  */
352 static int
353 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
354 {
355         struct vm_object *object;
356         caddr_t buffer;
357         vm_pindex_t npages;
358         int error;
359
360         size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
361         if (size < 16384)
362                 size = 16384;
363         if (size > 1024*1024)
364                 size = 1024*1024;
365
366         npages = round_page(size) / PAGE_SIZE;
367         object = pb->object;
368
369         /*
370          * [re]create the object if necessary and reserve space for it
371          * in the kernel_map.  The object and memory are pageable.  On
372          * success, free the old resources before assigning the new
373          * ones.
374          */
375         if (object == NULL || object->size != npages) {
376                 object = vm_object_allocate(OBJT_DEFAULT, npages);
377                 buffer = (caddr_t)vm_map_min(&kernel_map);
378
379                 error = vm_map_find(&kernel_map, object, NULL,
380                                     0, (vm_offset_t *)&buffer, size,
381                                     PAGE_SIZE, TRUE,
382                                     VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
383                                     VM_PROT_ALL, VM_PROT_ALL, 0);
384
385                 if (error != KERN_SUCCESS) {
386                         vm_object_deallocate(object);
387                         return (ENOMEM);
388                 }
389                 pipe_free_kmem(pb);
390                 pb->object = object;
391                 pb->buffer = buffer;
392                 pb->size = size;
393         }
394         pb->rindex = 0;
395         pb->windex = 0;
396
397         return (0);
398 }
399
400 /*
401  * Initialize and allocate VM and memory for pipe, pulling the pipe from
402  * our per-cpu cache if possible.
403  *
404  * Returns 0 on success, else an error code (typically ENOMEM).  Caller
405  * must still deallocate the pipe on failure.
406  */
407 static int
408 pipe_create(struct pipe **pipep)
409 {
410         globaldata_t gd = mycpu;
411         struct pipe *pipe;
412         int error;
413
414         if ((pipe = gd->gd_pipeq) != NULL) {
415                 gd->gd_pipeq = pipe->next;
416                 --gd->gd_pipeqcount;
417                 pipe->next = NULL;
418         } else {
419                 pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
420                 pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
421                 lwkt_token_init(&pipe->bufferA.rlock, "piper");
422                 lwkt_token_init(&pipe->bufferA.wlock, "pipew");
423                 lwkt_token_init(&pipe->bufferB.rlock, "piper");
424                 lwkt_token_init(&pipe->bufferB.wlock, "pipew");
425         }
426         *pipep = pipe;
427         if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
428                 return (error);
429         }
430         if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
431                 return (error);
432         }
433         vfs_timestamp(&pipe->ctime);
434         pipe->bufferA.atime = pipe->ctime;
435         pipe->bufferA.mtime = pipe->ctime;
436         pipe->bufferB.atime = pipe->ctime;
437         pipe->bufferB.mtime = pipe->ctime;
438         pipe->open_count = 2;
439
440         return (0);
441 }
442
443 /*
444  * Read data from a pipe
445  */
446 static int
447 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
448 {
449         struct pipebuf *rpb;
450         struct pipebuf *wpb;
451         struct pipe *pipe;
452         size_t nread = 0;
453         size_t size;    /* total bytes available */
454         size_t nsize;   /* total bytes to read */
455         size_t rindex;  /* contiguous bytes available */
456         int notify_writer;
457         int bigread;
458         int bigcount;
459         int error;
460         int nbio;
461
462         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
463         if ((intptr_t)fp->f_data & 1) {
464                 rpb = &pipe->bufferB;
465                 wpb = &pipe->bufferA;
466         } else {
467                 rpb = &pipe->bufferA;
468                 wpb = &pipe->bufferB;
469         }
470         atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
471
472         if (uio->uio_resid == 0)
473                 return(0);
474
475         /*
476          * Calculate nbio
477          */
478         if (fflags & O_FBLOCKING)
479                 nbio = 0;
480         else if (fflags & O_FNONBLOCKING)
481                 nbio = 1;
482         else if (fp->f_flag & O_NONBLOCK)
483                 nbio = 1;
484         else
485                 nbio = 0;
486
487         /*
488          * 'quick' NBIO test before things get expensive.
489          */
490         if (nbio && rpb->rindex == rpb->windex &&
491             (rpb->state & PIPE_REOF) == 0) {
492                 return EAGAIN;
493         }
494
495         /*
496          * Reads are serialized.  Note however that buffer.buffer and
497          * buffer.size can change out from under us when the number
498          * of bytes in the buffer are zero due to the write-side doing a
499          * pipespace().
500          */
501         lwkt_gettoken(&rpb->rlock);
502         error = pipe_start_uio(&rpb->rip);
503         if (error) {
504                 lwkt_reltoken(&rpb->rlock);
505                 return (error);
506         }
507         notify_writer = 0;
508
509         bigread = (uio->uio_resid > 10 * 1024 * 1024);
510         bigcount = 10;
511
512         while (uio->uio_resid) {
513                 /*
514                  * Don't hog the cpu.
515                  */
516                 if (bigread && --bigcount == 0) {
517                         lwkt_user_yield();
518                         bigcount = 10;
519                         if (CURSIG(curthread->td_lwp)) {
520                                 error = EINTR;
521                                 break;
522                         }
523                 }
524
525                 /*
526                  * lfence required to avoid read-reordering of buffer
527                  * contents prior to validation of size.
528                  */
529                 size = rpb->windex - rpb->rindex;
530                 cpu_lfence();
531                 if (size) {
532                         rindex = rpb->rindex & (rpb->size - 1);
533                         nsize = size;
534                         if (nsize > rpb->size - rindex)
535                                 nsize = rpb->size - rindex;
536                         nsize = szmin(nsize, uio->uio_resid);
537
538                         /*
539                          * Limit how much we move in one go so we have a
540                          * chance to kick the writer while data is still
541                          * available in the pipe.  This avoids getting into
542                          * a ping-pong with the writer.
543                          */
544                         if (nsize > (rpb->size >> 1))
545                                 nsize = rpb->size >> 1;
546
547                         error = uiomove(&rpb->buffer[rindex], nsize, uio);
548                         if (error)
549                                 break;
550                         rpb->rindex += nsize;
551                         nread += nsize;
552
553                         /*
554                          * If the FIFO is still over half full just continue
555                          * and do not try to notify the writer yet.  If
556                          * less than half full notify any waiting writer.
557                          */
558                         if (size - nsize > (rpb->size >> 1)) {
559                                 notify_writer = 0;
560                         } else {
561                                 notify_writer = 1;
562                                 pipesignal(rpb, PIPE_WANTW);
563                         }
564                         continue;
565                 }
566
567                 /*
568                  * If the "write-side" was blocked we wake it up.  This code
569                  * is reached when the buffer is completely emptied.
570                  */
571                 pipesignal(rpb, PIPE_WANTW);
572
573                 /*
574                  * Pick up our copy loop again if the writer sent data to
575                  * us while we were messing around.
576                  *
577                  * On a SMP box poll up to pipe_delay nanoseconds for new
578                  * data.  Typically a value of 2000 to 4000 is sufficient
579                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
580                  * is used for synchronous communications with small packets,
581                  * and 8000 or so (8uS) will pipeline large buffer xfers
582                  * between cpus over a pipe.
583                  *
584                  * For synchronous communications a hit means doing a
585                  * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS,
586                  * where as miss requiring a tsleep/wakeup sequence
587                  * will take 7uS or more.
588                  */
589                 if (rpb->windex != rpb->rindex)
590                         continue;
591
592 #ifdef _RDTSC_SUPPORTED_
593                 if (pipe_delay) {
594                         int64_t tsc_target;
595                         int good = 0;
596
597                         tsc_target = tsc_get_target(pipe_delay);
598                         while (tsc_test_target(tsc_target) == 0) {
599                                 cpu_lfence();
600                                 if (rpb->windex != rpb->rindex) {
601                                         good = 1;
602                                         break;
603                                 }
604                                 cpu_pause();
605                         }
606                         if (good)
607                                 continue;
608                 }
609 #endif
610
611                 /*
612                  * Detect EOF condition, do not set error.
613                  */
614                 if (rpb->state & PIPE_REOF)
615                         break;
616
617                 /*
618                  * Break if some data was read, or if this was a non-blocking
619                  * read.
620                  */
621                 if (nread > 0)
622                         break;
623
624                 if (nbio) {
625                         error = EAGAIN;
626                         break;
627                 }
628
629                 /*
630                  * Last chance, interlock with WANTR
631                  */
632                 tsleep_interlock(rpb, PCATCH);
633                 atomic_set_int(&rpb->state, PIPE_WANTR);
634
635                 /*
636                  * Retest bytes available after memory barrier above.
637                  */
638                 size = rpb->windex - rpb->rindex;
639                 if (size)
640                         continue;
641
642                 /*
643                  * Retest EOF after memory barrier above.
644                  */
645                 if (rpb->state & PIPE_REOF)
646                         break;
647
648                 /*
649                  * Wait for more data or state change
650                  */
651                 error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
652                 if (error)
653                         break;
654         }
655         pipe_end_uio(&rpb->rip);
656
657         /*
658          * Uptime last access time
659          */
660         if (error == 0 && nread && rpb->lticks != ticks) {
661                 vfs_timestamp(&rpb->atime);
662                 rpb->lticks = ticks;
663         }
664
665         /*
666          * If we drained the FIFO more then half way then handle
667          * write blocking hysteresis.
668          *
669          * Note that PIPE_WANTW cannot be set by the writer without
670          * it holding both rlock and wlock, so we can test it
671          * while holding just rlock.
672          */
673         if (notify_writer) {
674                 /*
675                  * Synchronous blocking is done on the pipe involved
676                  */
677                 pipesignal(rpb, PIPE_WANTW);
678
679                 /*
680                  * But we may also have to deal with a kqueue which is
681                  * stored on the same pipe as its descriptor, so a
682                  * EVFILT_WRITE event waiting for our side to drain will
683                  * be on the other side.
684                  */
685                 pipewakeup(wpb, 0);
686         }
687         /*size = rpb->windex - rpb->rindex;*/
688         lwkt_reltoken(&rpb->rlock);
689
690         return (error);
691 }
692
693 static int
694 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
695 {
696         struct pipebuf *rpb;
697         struct pipebuf *wpb;
698         struct pipe *pipe;
699         size_t windex;
700         size_t space;
701         size_t wcount;
702         size_t orig_resid;
703         int bigwrite;
704         int bigcount;
705         int error;
706         int nbio;
707
708         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
709         if ((intptr_t)fp->f_data & 1) {
710                 rpb = &pipe->bufferB;
711                 wpb = &pipe->bufferA;
712         } else {
713                 rpb = &pipe->bufferA;
714                 wpb = &pipe->bufferB;
715         }
716
717         /*
718          * Calculate nbio
719          */
720         if (fflags & O_FBLOCKING)
721                 nbio = 0;
722         else if (fflags & O_FNONBLOCKING)
723                 nbio = 1;
724         else if (fp->f_flag & O_NONBLOCK)
725                 nbio = 1;
726         else
727                 nbio = 0;
728
729         /*
730          * 'quick' NBIO test before things get expensive.
731          */
732         if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
733             uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
734                 return EAGAIN;
735         }
736
737         /*
738          * Writes go to the peer.  The peer will always exist.
739          */
740         lwkt_gettoken(&wpb->wlock);
741         if (wpb->state & PIPE_WEOF) {
742                 lwkt_reltoken(&wpb->wlock);
743                 return (EPIPE);
744         }
745
746         /*
747          * Degenerate case (EPIPE takes prec)
748          */
749         if (uio->uio_resid == 0) {
750                 lwkt_reltoken(&wpb->wlock);
751                 return(0);
752         }
753
754         /*
755          * Writes are serialized (start_uio must be called with wlock)
756          */
757         error = pipe_start_uio(&wpb->wip);
758         if (error) {
759                 lwkt_reltoken(&wpb->wlock);
760                 return (error);
761         }
762
763         orig_resid = uio->uio_resid;
764         wcount = 0;
765
766         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
767         bigcount = 10;
768
769         while (uio->uio_resid) {
770                 if (wpb->state & PIPE_WEOF) {
771                         error = EPIPE;
772                         break;
773                 }
774
775                 /*
776                  * Don't hog the cpu.
777                  */
778                 if (bigwrite && --bigcount == 0) {
779                         lwkt_user_yield();
780                         bigcount = 10;
781                         if (CURSIG(curthread->td_lwp)) {
782                                 error = EINTR;
783                                 break;
784                         }
785                 }
786
787                 windex = wpb->windex & (wpb->size - 1);
788                 space = wpb->size - (wpb->windex - wpb->rindex);
789
790                 /*
791                  * Writes of size <= PIPE_BUF must be atomic.
792                  */
793                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
794                         space = 0;
795
796                 /* 
797                  * Write to fill, read size handles write hysteresis.  Also
798                  * additional restrictions can cause select-based non-blocking
799                  * writes to spin.
800                  */
801                 if (space > 0) {
802                         size_t segsize;
803
804                         /*
805                          * We want to notify a potentially waiting reader
806                          * before we exhaust the write buffer for SMP
807                          * pipelining.  Otherwise the write/read will begin
808                          * to ping-pong.
809                          */
810                         space = szmin(space, uio->uio_resid);
811                         if (space > (wpb->size >> 1))
812                                 space = (wpb->size >> 1);
813
814                         /*
815                          * First segment to transfer is minimum of
816                          * transfer size and contiguous space in
817                          * pipe buffer.  If first segment to transfer
818                          * is less than the transfer size, we've got
819                          * a wraparound in the buffer.
820                          */
821                         segsize = wpb->size - windex;
822                         if (segsize > space)
823                                 segsize = space;
824
825                         /*
826                          * If this is the first loop and the reader is
827                          * blocked, do a preemptive wakeup of the reader.
828                          *
829                          * On SMP the IPI latency plus the wlock interlock
830                          * on the reader side is the fastest way to get the
831                          * reader going.  (The scheduler will hard loop on
832                          * lock tokens).
833                          */
834                         if (wcount == 0)
835                                 pipesignal(wpb, PIPE_WANTR);
836
837                         /*
838                          * Transfer segment, which may include a wrap-around.
839                          * Update windex to account for both all in one go
840                          * so the reader can read() the data atomically.
841                          */
842                         error = uiomove(&wpb->buffer[windex], segsize, uio);
843                         if (error == 0 && segsize < space) {
844                                 segsize = space - segsize;
845                                 error = uiomove(&wpb->buffer[0], segsize, uio);
846                         }
847                         if (error)
848                                 break;
849
850                         /*
851                          * Memory fence prior to windex updating (note: not
852                          * needed so this is a NOP on Intel).
853                          */
854                         cpu_sfence();
855                         wpb->windex += space;
856
857                         /*
858                          * Signal reader
859                          */
860                         if (wcount != 0)
861                                 pipesignal(wpb, PIPE_WANTR);
862                         wcount += space;
863                         continue;
864                 }
865
866                 /*
867                  * Wakeup any pending reader
868                  */
869                 pipesignal(wpb, PIPE_WANTR);
870
871                 /*
872                  * don't block on non-blocking I/O
873                  */
874                 if (nbio) {
875                         error = EAGAIN;
876                         break;
877                 }
878
879 #ifdef _RDTSC_SUPPORTED_
880                 if (pipe_delay) {
881                         int64_t tsc_target;
882                         int good = 0;
883
884                         tsc_target = tsc_get_target(pipe_delay);
885                         while (tsc_test_target(tsc_target) == 0) {
886                                 cpu_lfence();
887                                 space = wpb->size - (wpb->windex - wpb->rindex);
888                                 if ((space < uio->uio_resid) &&
889                                     (orig_resid <= PIPE_BUF)) {
890                                         space = 0;
891                                 }
892                                 if (space) {
893                                         good = 1;
894                                         break;
895                                 }
896                                 cpu_pause();
897                         }
898                         if (good)
899                                 continue;
900                 }
901 #endif
902
903                 /*
904                  * Interlocked test.   Atomic op enforces the memory barrier.
905                  */
906                 tsleep_interlock(wpb, PCATCH);
907                 atomic_set_int(&wpb->state, PIPE_WANTW);
908
909                 /*
910                  * Retest space available after memory barrier above.
911                  * Writes of size <= PIPE_BUF must be atomic.
912                  */
913                 space = wpb->size - (wpb->windex - wpb->rindex);
914                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
915                         space = 0;
916
917                 /*
918                  * Retest EOF after memory barrier above.
919                  */
920                 if (wpb->state & PIPE_WEOF) {
921                         error = EPIPE;
922                         break;
923                 }
924
925                 /*
926                  * We have no more space and have something to offer,
927                  * wake up select/poll/kq.
928                  */
929                 if (space == 0) {
930                         pipewakeup(wpb, 1);
931                         error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
932                 }
933
934                 /*
935                  * Break out if we errored or the read side wants us to go
936                  * away.
937                  */
938                 if (error)
939                         break;
940                 if (wpb->state & PIPE_WEOF) {
941                         error = EPIPE;
942                         break;
943                 }
944         }
945         pipe_end_uio(&wpb->wip);
946
947         /*
948          * If we have put any characters in the buffer, we wake up
949          * the reader.
950          *
951          * Both rlock and wlock are required to be able to modify pipe_state.
952          */
953         if (wpb->windex != wpb->rindex) {
954                 pipesignal(wpb, PIPE_WANTR);
955                 pipewakeup(wpb, 1);
956         }
957
958         /*
959          * Don't return EPIPE if I/O was successful
960          */
961         if ((wpb->rindex == wpb->windex) &&
962             (uio->uio_resid == 0) &&
963             (error == EPIPE)) {
964                 error = 0;
965         }
966
967         if (error == 0 && wpb->lticks != ticks) {
968                 vfs_timestamp(&wpb->mtime);
969                 wpb->lticks = ticks;
970         }
971
972         /*
973          * We have something to offer,
974          * wake up select/poll/kq.
975          */
976         /*space = wpb->windex - wpb->rindex;*/
977         lwkt_reltoken(&wpb->wlock);
978
979         return (error);
980 }
981
982 /*
983  * we implement a very minimal set of ioctls for compatibility with sockets.
984  */
985 static int
986 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
987            struct ucred *cred, struct sysmsg *msg)
988 {
989         struct pipebuf *rpb;
990         struct pipe *pipe;
991         int error;
992
993         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
994         if ((intptr_t)fp->f_data & 1) {
995                 rpb = &pipe->bufferB;
996         } else {
997                 rpb = &pipe->bufferA;
998         }
999
1000         lwkt_gettoken(&rpb->rlock);
1001         lwkt_gettoken(&rpb->wlock);
1002
1003         switch (cmd) {
1004         case FIOASYNC:
1005                 if (*(int *)data) {
1006                         atomic_set_int(&rpb->state, PIPE_ASYNC);
1007                 } else {
1008                         atomic_clear_int(&rpb->state, PIPE_ASYNC);
1009                 }
1010                 error = 0;
1011                 break;
1012         case FIONREAD:
1013                 *(int *)data = (int)(rpb->windex - rpb->rindex);
1014                 error = 0;
1015                 break;
1016         case FIOSETOWN:
1017                 error = fsetown(*(int *)data, &rpb->sigio);
1018                 break;
1019         case FIOGETOWN:
1020                 *(int *)data = fgetown(&rpb->sigio);
1021                 error = 0;
1022                 break;
1023         case TIOCSPGRP:
1024                 /* This is deprecated, FIOSETOWN should be used instead. */
1025                 error = fsetown(-(*(int *)data), &rpb->sigio);
1026                 break;
1027
1028         case TIOCGPGRP:
1029                 /* This is deprecated, FIOGETOWN should be used instead. */
1030                 *(int *)data = -fgetown(&rpb->sigio);
1031                 error = 0;
1032                 break;
1033         default:
1034                 error = ENOTTY;
1035                 break;
1036         }
1037         lwkt_reltoken(&rpb->wlock);
1038         lwkt_reltoken(&rpb->rlock);
1039
1040         return (error);
1041 }
1042
1043 /*
1044  * MPSAFE
1045  */
1046 static int
1047 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1048 {
1049         struct pipebuf *rpb;
1050         struct pipe *pipe;
1051
1052         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1053         if ((intptr_t)fp->f_data & 1) {
1054                 rpb = &pipe->bufferB;
1055         } else {
1056                 rpb = &pipe->bufferA;
1057         }
1058
1059         bzero((caddr_t)ub, sizeof(*ub));
1060         ub->st_mode = S_IFIFO;
1061         ub->st_blksize = rpb->size;
1062         ub->st_size = rpb->windex - rpb->rindex;
1063         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1064         ub->st_atimespec = rpb->atime;
1065         ub->st_mtimespec = rpb->mtime;
1066         ub->st_ctimespec = pipe->ctime;
1067         ub->st_uid = fp->f_cred->cr_uid;
1068         ub->st_gid = fp->f_cred->cr_gid;
1069         ub->st_ino = pipe->inum;
1070         /*
1071          * Left as 0: st_dev, st_nlink, st_rdev,
1072          * st_flags, st_gen.
1073          * XXX (st_dev, st_ino) should be unique.
1074          */
1075
1076         return (0);
1077 }
1078
1079 static int
1080 pipe_close(struct file *fp)
1081 {
1082         struct pipebuf *rpb;
1083         struct pipebuf *wpb;
1084         struct pipe *pipe;
1085
1086         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1087         if ((intptr_t)fp->f_data & 1) {
1088                 rpb = &pipe->bufferB;
1089                 wpb = &pipe->bufferA;
1090         } else {
1091                 rpb = &pipe->bufferA;
1092                 wpb = &pipe->bufferB;
1093         }
1094
1095         fp->f_ops = &badfileops;
1096         fp->f_data = NULL;
1097         funsetown(&rpb->sigio);
1098         pipeclose(pipe, rpb, wpb);
1099
1100         return (0);
1101 }
1102
1103 /*
1104  * Shutdown one or both directions of a full-duplex pipe.
1105  */
1106 static int
1107 pipe_shutdown(struct file *fp, int how)
1108 {
1109         struct pipebuf *rpb;
1110         struct pipebuf *wpb;
1111         struct pipe *pipe;
1112         int error = EPIPE;
1113
1114         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1115         if ((intptr_t)fp->f_data & 1) {
1116                 rpb = &pipe->bufferB;
1117                 wpb = &pipe->bufferA;
1118         } else {
1119                 rpb = &pipe->bufferA;
1120                 wpb = &pipe->bufferB;
1121         }
1122
1123         /*
1124          * We modify pipe_state on both pipes, which means we need
1125          * all four tokens!
1126          */
1127         lwkt_gettoken(&rpb->rlock);
1128         lwkt_gettoken(&rpb->wlock);
1129         lwkt_gettoken(&wpb->rlock);
1130         lwkt_gettoken(&wpb->wlock);
1131
1132         switch(how) {
1133         case SHUT_RDWR:
1134         case SHUT_RD:
1135                 /*
1136                  * EOF on my reads and peer writes
1137                  */
1138                 atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1139                 if (rpb->state & PIPE_WANTR) {
1140                         rpb->state &= ~PIPE_WANTR;
1141                         wakeup(rpb);
1142                 }
1143                 if (rpb->state & PIPE_WANTW) {
1144                         rpb->state &= ~PIPE_WANTW;
1145                         wakeup(rpb);
1146                 }
1147                 error = 0;
1148                 if (how == SHUT_RD)
1149                         break;
1150                 /* fall through */
1151         case SHUT_WR:
1152                 /*
1153                  * EOF on peer reads and my writes
1154                  */
1155                 atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1156                 if (wpb->state & PIPE_WANTR) {
1157                         wpb->state &= ~PIPE_WANTR;
1158                         wakeup(wpb);
1159                 }
1160                 if (wpb->state & PIPE_WANTW) {
1161                         wpb->state &= ~PIPE_WANTW;
1162                         wakeup(wpb);
1163                 }
1164                 error = 0;
1165                 break;
1166         }
1167         pipewakeup(rpb, 1);
1168         pipewakeup(wpb, 1);
1169
1170         lwkt_reltoken(&wpb->wlock);
1171         lwkt_reltoken(&wpb->rlock);
1172         lwkt_reltoken(&rpb->wlock);
1173         lwkt_reltoken(&rpb->rlock);
1174
1175         return (error);
1176 }
1177
1178 /*
1179  * Destroy the pipe buffer.
1180  */
1181 static void
1182 pipe_free_kmem(struct pipebuf *pb)
1183 {
1184         if (pb->buffer != NULL) {
1185                 kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
1186                 pb->buffer = NULL;
1187                 pb->object = NULL;
1188         }
1189 }
1190
1191 /*
1192  * Close one half of the pipe.  We are closing the pipe for reading on rpb
1193  * and writing on wpb.  This routine must be called twice with the pipebufs
1194  * reversed to close both directions.
1195  */
1196 static void
1197 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1198 {
1199         globaldata_t gd;
1200
1201         if (pipe == NULL)
1202                 return;
1203
1204         /*
1205          * We need both the read and write tokens to modify pipe_state.
1206          */
1207         lwkt_gettoken(&rpb->rlock);
1208         lwkt_gettoken(&rpb->wlock);
1209
1210         /*
1211          * Set our state, wakeup anyone waiting in select/poll/kq, and
1212          * wakeup anyone blocked on our pipe.  No action if our side
1213          * is already closed.
1214          */
1215         if (rpb->state & PIPE_CLOSED) {
1216                 lwkt_reltoken(&rpb->wlock);
1217                 lwkt_reltoken(&rpb->rlock);
1218                 return;
1219         }
1220
1221         atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1222         pipewakeup(rpb, 1);
1223         if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1224                 rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1225                 wakeup(rpb);
1226         }
1227         lwkt_reltoken(&rpb->wlock);
1228         lwkt_reltoken(&rpb->rlock);
1229
1230         /*
1231          * Disconnect from peer.
1232          */
1233         lwkt_gettoken(&wpb->rlock);
1234         lwkt_gettoken(&wpb->wlock);
1235
1236         atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1237         pipewakeup(wpb, 1);
1238         if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1239                 wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1240                 wakeup(wpb);
1241         }
1242         if (SLIST_FIRST(&wpb->kq.ki_note))
1243                 KNOTE(&wpb->kq.ki_note, 0);
1244         lwkt_reltoken(&wpb->wlock);
1245         lwkt_reltoken(&wpb->rlock);
1246
1247         /*
1248          * Free resources once both sides are closed.  We maintain a pcpu
1249          * cache to improve performance, so the actual tear-down case is
1250          * limited to bulk situations.
1251          *
1252          * However, the bulk tear-down case can cause intense contention
1253          * on the kernel_map when, e.g. hundreds to hundreds of thousands
1254          * of processes are killed at the same time.  To deal with this we
1255          * use a pcpu mutex to maintain concurrency but also limit the
1256          * number of threads banging on the map and pmap.
1257          *
1258          * We use the mtx mechanism instead of the lockmgr mechanism because
1259          * the mtx mechanism utilizes a queued design which will not break
1260          * down in the face of thousands to hundreds of thousands of
1261          * processes trying to free pipes simultaneously.  The lockmgr
1262          * mechanism will wind up waking them all up each time a lock
1263          * cycles.
1264          */
1265         if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1266                 gd = mycpu;
1267                 if (gd->gd_pipeqcount >= pipe_maxcache) {
1268                         mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1269                         pipe_free_kmem(rpb);
1270                         pipe_free_kmem(wpb);
1271                         mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1272                         kfree(pipe, M_PIPE);
1273                 } else {
1274                         rpb->state = 0;
1275                         wpb->state = 0;
1276                         pipe->next = gd->gd_pipeq;
1277                         gd->gd_pipeq = pipe;
1278                         ++gd->gd_pipeqcount;
1279                 }
1280         }
1281 }
1282
1283 static int
1284 pipe_kqfilter(struct file *fp, struct knote *kn)
1285 {
1286         struct pipebuf *rpb;
1287         struct pipebuf *wpb;
1288         struct pipe *pipe;
1289
1290         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1291         if ((intptr_t)fp->f_data & 1) {
1292                 rpb = &pipe->bufferB;
1293                 wpb = &pipe->bufferA;
1294         } else {
1295                 rpb = &pipe->bufferA;
1296                 wpb = &pipe->bufferB;
1297         }
1298
1299         switch (kn->kn_filter) {
1300         case EVFILT_READ:
1301                 kn->kn_fop = &pipe_rfiltops;
1302                 break;
1303         case EVFILT_WRITE:
1304                 kn->kn_fop = &pipe_wfiltops;
1305                 if (wpb->state & PIPE_CLOSED) {
1306                         /* other end of pipe has been closed */
1307                         return (EPIPE);
1308                 }
1309                 break;
1310         default:
1311                 return (EOPNOTSUPP);
1312         }
1313
1314         if (rpb == &pipe->bufferA)
1315                 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1316         else
1317                 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1318
1319         knote_insert(&rpb->kq.ki_note, kn);
1320
1321         return (0);
1322 }
1323
1324 static void
1325 filt_pipedetach(struct knote *kn)
1326 {
1327         struct pipebuf *rpb;
1328         struct pipebuf *wpb;
1329         struct pipe *pipe;
1330
1331         pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1332         if ((intptr_t)kn->kn_hook & 1) {
1333                 rpb = &pipe->bufferB;
1334                 wpb = &pipe->bufferA;
1335         } else {
1336                 rpb = &pipe->bufferA;
1337                 wpb = &pipe->bufferB;
1338         }
1339         knote_remove(&rpb->kq.ki_note, kn);
1340 }
1341
1342 /*ARGSUSED*/
1343 static int
1344 filt_piperead(struct knote *kn, long hint)
1345 {
1346         struct pipebuf *rpb;
1347         struct pipebuf *wpb;
1348         struct pipe *pipe;
1349         int ready = 0;
1350
1351         pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1352         if ((intptr_t)kn->kn_fp->f_data & 1) {
1353                 rpb = &pipe->bufferB;
1354                 wpb = &pipe->bufferA;
1355         } else {
1356                 rpb = &pipe->bufferA;
1357                 wpb = &pipe->bufferB;
1358         }
1359
1360         /*
1361          * We shouldn't need the pipe locks because the knote itself is
1362          * locked via KN_PROCESSING.  If we lose a race against the writer,
1363          * the writer will just issue a KNOTE() after us.
1364          */
1365 #if 0
1366         lwkt_gettoken(&rpb->rlock);
1367         lwkt_gettoken(&rpb->wlock);
1368 #endif
1369
1370         kn->kn_data = rpb->windex - rpb->rindex;
1371         if (kn->kn_data < 0)
1372                 kn->kn_data = 0;
1373
1374         if (rpb->state & PIPE_REOF) {
1375                 /*
1376                  * Only set NODATA if all data has been exhausted
1377                  */
1378                 if (kn->kn_data == 0)
1379                         kn->kn_flags |= EV_NODATA;
1380                 kn->kn_flags |= EV_EOF; 
1381                 ready = 1;
1382         }
1383
1384 #if 0
1385         lwkt_reltoken(&rpb->wlock);
1386         lwkt_reltoken(&rpb->rlock);
1387 #endif
1388
1389         if (!ready)
1390                 ready = kn->kn_data > 0;
1391
1392         return (ready);
1393 }
1394
1395 /*ARGSUSED*/
1396 static int
1397 filt_pipewrite(struct knote *kn, long hint)
1398 {
1399         struct pipebuf *rpb;
1400         struct pipebuf *wpb;
1401         struct pipe *pipe;
1402         int ready = 0;
1403
1404         pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1405         if ((intptr_t)kn->kn_fp->f_data & 1) {
1406                 rpb = &pipe->bufferB;
1407                 wpb = &pipe->bufferA;
1408         } else {
1409                 rpb = &pipe->bufferA;
1410                 wpb = &pipe->bufferB;
1411         }
1412
1413         kn->kn_data = 0;
1414         if (wpb->state & PIPE_CLOSED) {
1415                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1416                 return (1);
1417         }
1418
1419         /*
1420          * We shouldn't need the pipe locks because the knote itself is
1421          * locked via KN_PROCESSING.  If we lose a race against the reader,
1422          * the writer will just issue a KNOTE() after us.
1423          */
1424 #if 0
1425         lwkt_gettoken(&wpb->rlock);
1426         lwkt_gettoken(&wpb->wlock);
1427 #endif
1428
1429         if (wpb->state & PIPE_WEOF) {
1430                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1431                 ready = 1;
1432         }
1433
1434         if (!ready) {
1435                 kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1436                 if (kn->kn_data < 0)
1437                         kn->kn_data = 0;
1438         }
1439
1440 #if 0
1441         lwkt_reltoken(&wpb->wlock);
1442         lwkt_reltoken(&wpb->rlock);
1443 #endif
1444
1445         if (!ready)
1446                 ready = kn->kn_data >= PIPE_BUF;
1447
1448         return (ready);
1449 }