1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
21  */
22
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/kernel.h>
32 #include <sys/proc.h>
33 #include <sys/fcntl.h>
34 #include <sys/file.h>
35 #include <sys/filedesc.h>
36 #include <sys/filio.h>
37 #include <sys/ttycom.h>
38 #include <sys/stat.h>
39 #include <sys/signalvar.h>
40 #include <sys/sysproto.h>
41 #include <sys/pipe.h>
42 #include <sys/vnode.h>
43 #include <sys/uio.h>
44 #include <sys/event.h>
45 #include <sys/globaldata.h>
46 #include <sys/module.h>
47 #include <sys/malloc.h>
48 #include <sys/sysctl.h>
49 #include <sys/socket.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <sys/lock.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_zone.h>
61
62 #include <sys/file2.h>
63 #include <sys/signal2.h>
64 #include <sys/mplock2.h>
65
66 #include <machine/cpufunc.h>
67
68 /*
69  * interfaces to the outside world
70  */
71 static int pipe_read (struct file *fp, struct uio *uio, 
72                 struct ucred *cred, int flags);
73 static int pipe_write (struct file *fp, struct uio *uio, 
74                 struct ucred *cred, int flags);
75 static int pipe_close (struct file *fp);
76 static int pipe_shutdown (struct file *fp, int how);
77 static int pipe_kqfilter (struct file *fp, struct knote *kn);
78 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
79 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
80                 struct ucred *cred, struct sysmsg *msg);
81
82 static struct fileops pipeops = {
83         .fo_read = pipe_read, 
84         .fo_write = pipe_write,
85         .fo_ioctl = pipe_ioctl,
86         .fo_kqfilter = pipe_kqfilter,
87         .fo_stat = pipe_stat,
88         .fo_close = pipe_close,
89         .fo_shutdown = pipe_shutdown
90 };
91
92 static void     filt_pipedetach(struct knote *kn);
93 static int      filt_piperead(struct knote *kn, long hint);
94 static int      filt_pipewrite(struct knote *kn, long hint);
95
96 static struct filterops pipe_rfiltops =
97         { FILTEROP_ISFD, NULL, filt_pipedetach, filt_piperead };
98 static struct filterops pipe_wfiltops =
99         { FILTEROP_ISFD, NULL, filt_pipedetach, filt_pipewrite };
100
101 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
102
103 /*
104  * Default pipe buffer size(s); these can be fairly large now because pipe
105  * space is pageable.  The pipe code will try to maintain locality of
106  * reference for performance reasons, so small amounts of outstanding I/O
107  * will not wipe the cache.
108  */
109 #define MINPIPESIZE (PIPE_SIZE/3)
110 #define MAXPIPESIZE (2*PIPE_SIZE/3)
111
112 /*
113  * Limit the number of "big" pipes
114  */
115 #define LIMITBIGPIPES   64
116 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
117
118 static int pipe_maxbig = LIMITBIGPIPES;
119 static int pipe_maxcache = PIPEQ_MAX_CACHE;
120 static int pipe_bigcount;
121 static int pipe_nbig;
122 static int pipe_bcache_alloc;
123 static int pipe_bkmem_alloc;
124 static int pipe_rblocked_count;
125 static int pipe_wblocked_count;
126
127 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
128 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
129         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
130 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
131         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
132 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
133         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times the read side blocked");
134 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
135         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times the write side blocked");
136 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
137         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
138 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
139         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
140 #ifdef SMP
141 static int pipe_delay = 5000;   /* 5uS default */
142 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
143         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
144 static int pipe_mpsafe = 1;
145 SYSCTL_INT(_kern_pipe, OID_AUTO, mpsafe,
146         CTLFLAG_RW, &pipe_mpsafe, 0, "allow MPSAFE (giant-free) pipe operation");
147 #endif
148 #if !defined(NO_PIPE_SYSCTL_STATS)
149 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
150         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
151 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
152         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
153 #endif
154
155 static void pipeclose (struct pipe *cpipe);
156 static void pipe_free_kmem (struct pipe *cpipe);
157 static int pipe_create (struct pipe **cpipep);
158 static __inline void pipewakeup (struct pipe *cpipe);
159 static int pipespace (struct pipe *cpipe, int size);
160
161 static __inline void
162 pipewakeup(struct pipe *cpipe)
163 {
164         if ((cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
165                 get_mplock();
166                 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
167                 rel_mplock();
168         }
169         KNOTE(&cpipe->pipe_kq.ki_note, 0);
170 }
171
172 /*
173  * These routines are called before and after a UIO.  The UIO
174  * may block, causing our held tokens to be lost temporarily.
175  *
176  * We use these routines to serialize reads against other reads
177  * and writes against other writes.
178  *
179  * The read token is held on entry so *ipp does not race.
180  */
181 static __inline int
182 pipe_start_uio(struct pipe *cpipe, int *ipp)
183 {
184         int error;
185
186         while (*ipp) {
187                 *ipp = -1;
188                 error = tsleep(ipp, PCATCH, "pipexx", 0);
189                 if (error)
190                         return (error);
191         }
192         *ipp = 1;
193         return (0);
194 }
195
196 static __inline void
197 pipe_end_uio(struct pipe *cpipe, int *ipp)
198 {
199         if (*ipp < 0) {
200                 *ipp = 0;
201                 wakeup(ipp);
202         } else {
203                 KKASSERT(*ipp > 0);
204                 *ipp = 0;
205         }
206 }
207
208 static __inline void
209 pipe_get_mplock(int *save)
210 {
211 #ifdef SMP
212         if (pipe_mpsafe == 0) {
213                 get_mplock();
214                 *save = 1;
215         } else
216 #endif
217         {
218                 *save = 0;
219         }
220 }
221
222 static __inline void
223 pipe_rel_mplock(int *save)
224 {
225 #ifdef SMP
226         if (*save)
227                 rel_mplock();
228 #endif
229 }
230
231
232 /*
233  * The pipe system call for the DTYPE_PIPE type of pipes
234  *
235  * pipe_args(int dummy)
236  *
237  * MPSAFE
238  */
239 int
240 sys_pipe(struct pipe_args *uap)
241 {
242         struct thread *td = curthread;
243         struct filedesc *fdp = td->td_proc->p_fd;
244         struct file *rf, *wf;
245         struct pipe *rpipe, *wpipe;
246         int fd1, fd2, error;
247
248         rpipe = wpipe = NULL;
249         if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
250                 pipeclose(rpipe); 
251                 pipeclose(wpipe); 
252                 return (ENFILE);
253         }
254         
255         error = falloc(td->td_lwp, &rf, &fd1);
256         if (error) {
257                 pipeclose(rpipe);
258                 pipeclose(wpipe);
259                 return (error);
260         }
261         uap->sysmsg_fds[0] = fd1;
262
263         /*
264          * Warning: once we've gotten past allocation of the fd for the
265          * read-side, we can only drop the read side via fdrop() in order
266          * to avoid races against processes which manage to dup() the read
267          * side while we are blocked trying to allocate the write side.
268          */
269         rf->f_type = DTYPE_PIPE;
270         rf->f_flag = FREAD | FWRITE;
271         rf->f_ops = &pipeops;
272         rf->f_data = rpipe;
273         error = falloc(td->td_lwp, &wf, &fd2);
274         if (error) {
275                 fsetfd(fdp, NULL, fd1);
276                 fdrop(rf);
277                 /* rpipe has been closed by fdrop(). */
278                 pipeclose(wpipe);
279                 return (error);
280         }
281         wf->f_type = DTYPE_PIPE;
282         wf->f_flag = FREAD | FWRITE;
283         wf->f_ops = &pipeops;
284         wf->f_data = wpipe;
285         uap->sysmsg_fds[1] = fd2;
286
287         rpipe->pipe_slock = kmalloc(sizeof(struct lock),
288                                     M_PIPE, M_WAITOK|M_ZERO);
289         wpipe->pipe_slock = rpipe->pipe_slock;
290         rpipe->pipe_peer = wpipe;
291         wpipe->pipe_peer = rpipe;
292         lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
293
294         /*
295          * Once activated the peer relationship remains valid until
296          * both sides are closed.
297          */
298         fsetfd(fdp, rf, fd1);
299         fsetfd(fdp, wf, fd2);
300         fdrop(rf);
301         fdrop(wf);
302
303         return (0);
304 }
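/*
 * Illustrative userland sketch (compiled out; the program below is an
 * illustrative assumption, not kernel code): how the two descriptors
 * returned in sysmsg_fds[] by sys_pipe() are typically consumed via pipe(2).
 */
#if 0
#include <sys/wait.h>
#include <stdio.h>
#include <unistd.h>

int
main(void)
{
        int fds[2];             /* fds[0] = read side, fds[1] = write side */
        char buf[64];
        ssize_t n;

        if (pipe(fds) < 0) {
                perror("pipe");
                return (1);
        }
        if (fork() == 0) {
                close(fds[0]);
                write(fds[1], "hello", 5);      /* <= PIPE_BUF, atomic */
                _exit(0);
        }
        close(fds[1]);
        n = read(fds[0], buf, sizeof(buf));
        if (n > 0)
                printf("read %zd bytes: %.*s\n", n, (int)n, buf);
        wait(NULL);
        return (0);
}
#endif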
305
306 /*
307  * Allocate kva for the pipe circular buffer; the space is pageable.
308  * This routine will 'realloc' the size of a pipe safely; if the
309  * reallocation fails it will retain the old buffer and
310  * return ENOMEM.
311  */
312 static int
313 pipespace(struct pipe *cpipe, int size)
314 {
315         struct vm_object *object;
316         caddr_t buffer;
317         int npages, error;
318
319         npages = round_page(size) / PAGE_SIZE;
320         object = cpipe->pipe_buffer.object;
321
322         /*
323          * [re]create the object if necessary and reserve space for it
324          * in the kernel_map.  The object and memory are pageable.  On
325          * success, free the old resources before assigning the new
326          * ones.
327          */
328         if (object == NULL || object->size != npages) {
329                 get_mplock();
330                 object = vm_object_allocate(OBJT_DEFAULT, npages);
331                 buffer = (caddr_t)vm_map_min(&kernel_map);
332
333                 error = vm_map_find(&kernel_map, object, 0,
334                                     (vm_offset_t *)&buffer,
335                                     size, PAGE_SIZE,
336                                     1, VM_MAPTYPE_NORMAL,
337                                     VM_PROT_ALL, VM_PROT_ALL,
338                                     0);
339
340                 if (error != KERN_SUCCESS) {
341                         vm_object_deallocate(object);
342                         rel_mplock();
343                         return (ENOMEM);
344                 }
345                 pipe_free_kmem(cpipe);
346                 rel_mplock();
347                 cpipe->pipe_buffer.object = object;
348                 cpipe->pipe_buffer.buffer = buffer;
349                 cpipe->pipe_buffer.size = size;
350                 ++pipe_bkmem_alloc;
351         } else {
352                 ++pipe_bcache_alloc;
353         }
354         cpipe->pipe_buffer.rindex = 0;
355         cpipe->pipe_buffer.windex = 0;
356         return (0);
357 }
358
359 /*
360  * Initialize and allocate VM and memory for pipe, pulling the pipe from
361  * our per-cpu cache if possible.  For now make sure it is sized for the
362  * smaller PIPE_SIZE default.
363  */
364 static int
365 pipe_create(struct pipe **cpipep)
366 {
367         globaldata_t gd = mycpu;
368         struct pipe *cpipe;
369         int error;
370
371         if ((cpipe = gd->gd_pipeq) != NULL) {
372                 gd->gd_pipeq = cpipe->pipe_peer;
373                 --gd->gd_pipeqcount;
374                 cpipe->pipe_peer = NULL;
375                 cpipe->pipe_wantwcnt = 0;
376         } else {
377                 cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
378         }
379         *cpipep = cpipe;
380         if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
381                 return (error);
382         vfs_timestamp(&cpipe->pipe_ctime);
383         cpipe->pipe_atime = cpipe->pipe_ctime;
384         cpipe->pipe_mtime = cpipe->pipe_ctime;
385         lwkt_token_init(&cpipe->pipe_rlock, 1);
386         lwkt_token_init(&cpipe->pipe_wlock, 1);
387         return (0);
388 }
389
390 /*
391  * MPALMOSTSAFE (acquires mplock)
392  */
393 static int
394 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
395 {
396         struct pipe *rpipe;
397         int error;
398         size_t nread = 0;
399         int nbio;
400         u_int size;     /* total bytes available */
401         u_int nsize;    /* total bytes to read */
402         u_int rindex;   /* contiguous bytes available */
403         int notify_writer;
404         int mpsave;
405         int bigread;
406         int bigcount;
407
408         if (uio->uio_resid == 0)
409                 return(0);
410
411         /*
412          * Setup locks, calculate nbio
413          */
414         pipe_get_mplock(&mpsave);
415         rpipe = (struct pipe *)fp->f_data;
416         lwkt_gettoken(&rpipe->pipe_rlock);
417
418         if (fflags & O_FBLOCKING)
419                 nbio = 0;
420         else if (fflags & O_FNONBLOCKING)
421                 nbio = 1;
422         else if (fp->f_flag & O_NONBLOCK)
423                 nbio = 1;
424         else
425                 nbio = 0;
426
427         /*
428          * Reads are serialized.  Note however that pipe_buffer.buffer and
429          * pipe_buffer.size can change out from under us when the number
430          * of bytes in the buffer is zero due to the write-side doing a
431          * pipespace().
432          */
433         error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
434         if (error) {
435                 pipe_rel_mplock(&mpsave);
436                 lwkt_reltoken(&rpipe->pipe_rlock);
437                 return (error);
438         }
439         notify_writer = 0;
440
441         bigread = (uio->uio_resid > 10 * 1024 * 1024);
442         bigcount = 10;
443
444         while (uio->uio_resid) {
445                 /*
446                  * Don't hog the cpu.
447                  */
448                 if (bigread && --bigcount == 0) {
449                         lwkt_user_yield();
450                         bigcount = 10;
451                         if (CURSIG(curthread->td_lwp)) {
452                                 error = EINTR;
453                                 break;
454                         }
455                 }
456
457                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
458                 cpu_lfence();
459                 if (size) {
460                         rindex = rpipe->pipe_buffer.rindex &
461                                  (rpipe->pipe_buffer.size - 1);
462                         nsize = size;
463                         if (nsize > rpipe->pipe_buffer.size - rindex)
464                                 nsize = rpipe->pipe_buffer.size - rindex;
465                         nsize = szmin(nsize, uio->uio_resid);
466
467                         error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
468                                         nsize, uio);
469                         if (error)
470                                 break;
471                         cpu_mfence();
472                         rpipe->pipe_buffer.rindex += nsize;
473                         nread += nsize;
474
475                         /*
476                          * If the FIFO is still over half full just continue
477                          * and do not try to notify the writer yet.
478                          */
479                         if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
480                                 notify_writer = 0;
481                                 continue;
482                         }
483
484                         /*
485                          * When the FIFO is less than half full notify any
486                          * waiting writer.  WANTW can be checked while
487                          * holding just the rlock.
488                          */
489                         notify_writer = 1;
490                         if ((rpipe->pipe_state & PIPE_WANTW) == 0)
491                                 continue;
492                 }
493
494                 /*
495                  * If the "write-side" was blocked we wake it up.  This code
496                  * is reached either when the buffer is completely emptied
497                  * or if it becomes more than half-empty.
498                  *
499                  * Pipe_state can only be modified if both the rlock and
500                  * wlock are held.
501                  */
502                 if (rpipe->pipe_state & PIPE_WANTW) {
503                         lwkt_gettoken(&rpipe->pipe_wlock);
504                         if (rpipe->pipe_state & PIPE_WANTW) {
505                                 notify_writer = 0;
506                                 rpipe->pipe_state &= ~PIPE_WANTW;
507                                 lwkt_reltoken(&rpipe->pipe_wlock);
508                                 wakeup(rpipe);
509                         } else {
510                                 lwkt_reltoken(&rpipe->pipe_wlock);
511                         }
512                 }
513
514                 /*
515                  * Pick up our copy loop again if the writer sent data to
516                  * us while we were messing around.
517                  *
518                  * On an SMP box, poll up to pipe_delay nanoseconds for new
519                  * data.  Typically a value of 2000 to 4000 is sufficient
520                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
521                  * is used for synchronous communications with small packets,
522                  * and 8000 or so (8uS) will pipeline large buffer xfers
523                  * between cpus over a pipe.
524                  *
525                  * For synchronous communications a hit means doing a
526                  * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
527                  * whereas a miss requiring a tsleep/wakeup sequence
528                  * will take 7uS or more.
529                  */
530                 if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
531                         continue;
532
533 #if defined(SMP) && defined(_RDTSC_SUPPORTED_)
534                 if (pipe_delay) {
535                         int64_t tsc_target;
536                         int good = 0;
537
538                         tsc_target = tsc_get_target(pipe_delay);
539                         while (tsc_test_target(tsc_target) == 0) {
540                                 if (rpipe->pipe_buffer.windex !=
541                                     rpipe->pipe_buffer.rindex) {
542                                         good = 1;
543                                         break;
544                                 }
545                         }
546                         if (good)
547                                 continue;
548                 }
549 #endif
550
551                 /*
552                  * Detect EOF condition, do not set error.
553                  */
554                 if (rpipe->pipe_state & PIPE_REOF)
555                         break;
556
557                 /*
558                  * Break if some data was read, or if this was a non-blocking
559                  * read.
560                  */
561                 if (nread > 0)
562                         break;
563
564                 if (nbio) {
565                         error = EAGAIN;
566                         break;
567                 }
568
569                 /*
570                  * Last chance, interlock with WANTR.
571                  */
572                 lwkt_gettoken(&rpipe->pipe_wlock);
573                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
574                 if (size) {
575                         lwkt_reltoken(&rpipe->pipe_wlock);
576                         continue;
577                 }
578
579                 /*
580                  * Retest EOF - acquiring a new token can temporarily release
581                  * tokens already held.
582                  */
583                 if (rpipe->pipe_state & PIPE_REOF) {
584                         lwkt_reltoken(&rpipe->pipe_wlock);
585                         break;
586                 }
587
588                 /*
589                  * If there is no more to read in the pipe, reset its
590                  * pointers to the beginning.  This improves cache hit
591                  * stats.
592                  *
593                  * We need both locks to modify both pointers, and there
594                  * must also not be a write in progress or the uiomove()
595                  * in the write might block and temporarily release
596                  * its wlock, then reacquire and update windex.  We are
597                  * only serialized against reads, not writes.
598                  *
599                  * XXX should we even bother resetting the indices?  It
600                  *     might actually be more cache efficient not to.
601                  */
602                 if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
603                     rpipe->pipe_wip == 0) {
604                         rpipe->pipe_buffer.rindex = 0;
605                         rpipe->pipe_buffer.windex = 0;
606                 }
607
608                 /*
609                  * Wait for more data.
610                  *
611                  * Pipe_state can only be set if both the rlock and wlock
612                  * are held.
613                  */
614                 rpipe->pipe_state |= PIPE_WANTR;
615                 tsleep_interlock(rpipe, PCATCH);
616                 lwkt_reltoken(&rpipe->pipe_wlock);
617                 error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
618                 ++pipe_rblocked_count;
619                 if (error)
620                         break;
621         }
622         pipe_end_uio(rpipe, &rpipe->pipe_rip);
623
624         /*
625          * Update the last access time
626          */
627         if (error == 0 && nread)
628                 vfs_timestamp(&rpipe->pipe_atime);
629
630         /*
631          * If we drained the FIFO more than half way then handle
632          * write blocking hysteresis.
633          *
634          * Note that PIPE_WANTW cannot be set by the writer without
635          * it holding both rlock and wlock, so we can test it
636          * while holding just rlock.
637          */
638         if (notify_writer) {
639                 if (rpipe->pipe_state & PIPE_WANTW) {
640                         lwkt_gettoken(&rpipe->pipe_wlock);
641                         if (rpipe->pipe_state & PIPE_WANTW) {
642                                 rpipe->pipe_state &= ~PIPE_WANTW;
643                                 lwkt_reltoken(&rpipe->pipe_wlock);
644                                 wakeup(rpipe);
645                         } else {
646                                 lwkt_reltoken(&rpipe->pipe_wlock);
647                         }
648                 }
649                 lwkt_gettoken(&rpipe->pipe_wlock);
650                 pipewakeup(rpipe);
651                 lwkt_reltoken(&rpipe->pipe_wlock);
652         }
653         /*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
654         lwkt_reltoken(&rpipe->pipe_rlock);
655
656         pipe_rel_mplock(&mpsave);
657         return (error);
658 }
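/*
 * Illustrative sketch (compiled out): the rindex/windex arithmetic used by
 * pipe_read()/pipe_write() above.  Both indices only ever increase and are
 * masked by (size - 1) when addressing the buffer, so windex - rindex is the
 * byte count even across unsigned wrap, provided size is a power of 2.
 */
#if 0
#include <assert.h>

static void
ring_index_model(void)
{
        unsigned int size = 16;                 /* power of 2, like PIPE_SIZE */
        unsigned int rindex = 0xfffffff8;       /* reader index near wrap */
        unsigned int windex = rindex + 12;      /* writer ran 12 bytes ahead */

        assert(windex - rindex == 12);          /* bytes available */
        assert((rindex & (size - 1)) == 8);     /* physical read offset */
        assert((windex & (size - 1)) == 4);     /* physical write offset */
}
#endif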
659
660 /*
661  * MPALMOSTSAFE - acquires mplock
662  */
663 static int
664 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
665 {
666         int error;
667         int orig_resid;
668         int nbio;
669         struct pipe *wpipe, *rpipe;
670         u_int windex;
671         u_int space;
672         u_int wcount;
673         int mpsave;
674         int bigwrite;
675         int bigcount;
676
677         pipe_get_mplock(&mpsave);
678
679         /*
680          * Writes go to the peer.  The peer will always exist.
681          */
682         rpipe = (struct pipe *) fp->f_data;
683         wpipe = rpipe->pipe_peer;
684         lwkt_gettoken(&wpipe->pipe_wlock);
685         if (wpipe->pipe_state & PIPE_WEOF) {
686                 pipe_rel_mplock(&mpsave);
687                 lwkt_reltoken(&wpipe->pipe_wlock);
688                 return (EPIPE);
689         }
690
691         /*
692          * Degenerate case (EPIPE takes precedence)
693          */
694         if (uio->uio_resid == 0) {
695                 pipe_rel_mplock(&mpsave);
696                 lwkt_reltoken(&wpipe->pipe_wlock);
697                 return(0);
698         }
699
700         /*
701          * Writes are serialized (start_uio must be called with wlock)
702          */
703         error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
704         if (error) {
705                 pipe_rel_mplock(&mpsave);
706                 lwkt_reltoken(&wpipe->pipe_wlock);
707                 return (error);
708         }
709
710         if (fflags & O_FBLOCKING)
711                 nbio = 0;
712         else if (fflags & O_FNONBLOCKING)
713                 nbio = 1;
714         else if (fp->f_flag & O_NONBLOCK)
715                 nbio = 1;
716         else
717                 nbio = 0;
718
719         /*
720          * If it is advantageous to resize the pipe buffer, do
721          * so.  We are write-serialized so we can block safely.
722          */
723         if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
724             (pipe_nbig < pipe_maxbig) &&
725             wpipe->pipe_wantwcnt > 4 &&
726             (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
727                 /* 
728                  * Recheck after lock.
729                  */
730                 lwkt_gettoken(&wpipe->pipe_rlock);
731                 if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
732                     (pipe_nbig < pipe_maxbig) &&
733                     (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
734                         atomic_add_int(&pipe_nbig, 1);
735                         if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
736                                 ++pipe_bigcount;
737                         else
738                                 atomic_subtract_int(&pipe_nbig, 1);
739                 }
740                 lwkt_reltoken(&wpipe->pipe_rlock);
741         }
742
743         orig_resid = uio->uio_resid;
744         wcount = 0;
745
746         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
747         bigcount = 10;
748
749         while (uio->uio_resid) {
750                 if (wpipe->pipe_state & PIPE_WEOF) {
751                         error = EPIPE;
752                         break;
753                 }
754
755                 /*
756                  * Don't hog the cpu.
757                  */
758                 if (bigwrite && --bigcount == 0) {
759                         lwkt_user_yield();
760                         bigcount = 10;
761                         if (CURSIG(curthread->td_lwp)) {
762                                 error = EINTR;
763                                 break;
764                         }
765                 }
766
767                 windex = wpipe->pipe_buffer.windex &
768                          (wpipe->pipe_buffer.size - 1);
769                 space = wpipe->pipe_buffer.size -
770                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
771                 cpu_lfence();
772
773                 /* Writes of size <= PIPE_BUF must be atomic. */
774                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
775                         space = 0;
776
777                 /* 
778                  * Write to fill, read size handles write hysteresis.  Also
779                  * additional restrictions can cause select-based non-blocking
780                  * writes to spin.
781                  */
782                 if (space > 0) {
783                         u_int segsize;
784
785                         /*
786                          * Transfer size is minimum of uio transfer
787                          * and free space in pipe buffer.
788                          *
789                          * Limit each uiocopy to no more than PIPE_SIZE
790                          * so we can keep the gravy train going on a
791                          * SMP box.  This doubles the performance for
792                          * write sizes > 16K.  Otherwise large writes
793                          * wind up doing an inefficient synchronous
794                          * ping-pong.
795                          */
796                         space = szmin(space, uio->uio_resid);
797                         if (space > PIPE_SIZE)
798                                 space = PIPE_SIZE;
799
800                         /*
801                          * First segment to transfer is minimum of
802                          * transfer size and contiguous space in
803                          * pipe buffer.  If first segment to transfer
804                          * is less than the transfer size, we've got
805                          * a wraparound in the buffer.
806                          */
807                         segsize = wpipe->pipe_buffer.size - windex;
808                         if (segsize > space)
809                                 segsize = space;
810
811 #ifdef SMP
812                         /*
813                          * If this is the first loop and the reader is
814                          * blocked, do a preemptive wakeup of the reader.
815                          *
816                          * On SMP the IPI latency plus the wlock interlock
817                          * on the reader side is the fastest way to get the
818                          * reader going.  (The scheduler will hard loop on
819                          * lock tokens).
820                          *
821                          * NOTE: We can't clear WANTR here without acquiring
822                          * the rlock, which we don't want to do here!
823                          */
824                         if ((wpipe->pipe_state & PIPE_WANTR) && pipe_mpsafe > 1)
825                                 wakeup(wpipe);
826 #endif
827
828                         /*
829                          * Transfer segment, which may include a wrap-around.
830                          * Update windex to account for both all in one go
831                          * so the reader can read() the data atomically.
832                          */
833                         error = uiomove(&wpipe->pipe_buffer.buffer[windex],
834                                         segsize, uio);
835                         if (error == 0 && segsize < space) {
836                                 segsize = space - segsize;
837                                 error = uiomove(&wpipe->pipe_buffer.buffer[0],
838                                                 segsize, uio);
839                         }
840                         if (error)
841                                 break;
842                         cpu_mfence();
843                         wpipe->pipe_buffer.windex += space;
844                         wcount += space;
845                         continue;
846                 }
847
848                 /*
849                  * We need both the rlock and the wlock to interlock against
850                  * the EOF, WANTW, and size checks, and to modify pipe_state.
851                  *
852                  * These are token locks so we do not have to worry about
853                  * deadlocks.
854                  */
855                 lwkt_gettoken(&wpipe->pipe_rlock);
856
857                 /*
858                  * If the "read-side" has been blocked, wake it up now
859                  * and yield to let it drain synchronously rather
860                  * than block.
861                  */
862                 if (wpipe->pipe_state & PIPE_WANTR) {
863                         wpipe->pipe_state &= ~PIPE_WANTR;
864                         wakeup(wpipe);
865                 }
866
867                 /*
868                  * don't block on non-blocking I/O
869                  */
870                 if (nbio) {
871                         lwkt_reltoken(&wpipe->pipe_rlock);
872                         error = EAGAIN;
873                         break;
874                 }
875
876                 /*
877                  * re-test whether we have to block in the writer after
878                  * acquiring both locks, in case the reader opened up
879                  * some space.
880                  */
881                 space = wpipe->pipe_buffer.size -
882                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
883                 cpu_lfence();
884                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
885                         space = 0;
886
887                 /*
888                  * Retest EOF - acquiring a new token can temporarily release
889                  * tokens already held.
890                  */
891                 if (wpipe->pipe_state & PIPE_WEOF) {
892                         lwkt_reltoken(&wpipe->pipe_rlock);
893                         error = EPIPE;
894                         break;
895                 }
896
897                 /*
898                  * We have no more space and have something to offer,
899                  * wake up select/poll/kq.
900                  */
901                 if (space == 0) {
902                         wpipe->pipe_state |= PIPE_WANTW;
903                         ++wpipe->pipe_wantwcnt;
904                         pipewakeup(wpipe);
905                         if (wpipe->pipe_state & PIPE_WANTW)
906                                 error = tsleep(wpipe, PCATCH, "pipewr", 0);
907                         ++pipe_wblocked_count;
908                 }
909                 lwkt_reltoken(&wpipe->pipe_rlock);
910
911                 /*
912                  * Break out if we errored or the read side wants us to go
913                  * away.
914                  */
915                 if (error)
916                         break;
917                 if (wpipe->pipe_state & PIPE_WEOF) {
918                         error = EPIPE;
919                         break;
920                 }
921         }
922         pipe_end_uio(wpipe, &wpipe->pipe_wip);
923
924         /*
925          * If we have put any characters in the buffer, we wake up
926          * the reader.
927          *
928          * Both rlock and wlock are required to be able to modify pipe_state.
929          */
930         if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
931                 if (wpipe->pipe_state & PIPE_WANTR) {
932                         lwkt_gettoken(&wpipe->pipe_rlock);
933                         if (wpipe->pipe_state & PIPE_WANTR) {
934                                 wpipe->pipe_state &= ~PIPE_WANTR;
935                                 lwkt_reltoken(&wpipe->pipe_rlock);
936                                 wakeup(wpipe);
937                         } else {
938                                 lwkt_reltoken(&wpipe->pipe_rlock);
939                         }
940                 }
941                 lwkt_gettoken(&wpipe->pipe_rlock);
942                 pipewakeup(wpipe);
943                 lwkt_reltoken(&wpipe->pipe_rlock);
944         }
945
946         /*
947          * Don't return EPIPE if I/O was successful
948          */
949         if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
950             (uio->uio_resid == 0) &&
951             (error == EPIPE)) {
952                 error = 0;
953         }
954
955         if (error == 0)
956                 vfs_timestamp(&wpipe->pipe_mtime);
957
958         /*
959          * We have something to offer,
960          * wake up select/poll/kq.
961          */
962         /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
963         lwkt_reltoken(&wpipe->pipe_wlock);
964         pipe_rel_mplock(&mpsave);
965         return (error);
966 }
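/*
 * Illustrative userland sketch (compiled out; helper name is an assumption):
 * the "space = 0" shortcut above is what gives write(2) its PIPE_BUF
 * atomicity.  A write of at most PIPE_BUF bytes is either copied in whole
 * or the writer sleeps, so records from concurrent writers never interleave
 * mid-record.
 */
#if 0
#include <limits.h>             /* PIPE_BUF */
#include <unistd.h>

static ssize_t
write_record(int wfd, const void *rec, size_t len)
{
        /* Safe from interleaving with other writers while len <= PIPE_BUF. */
        return (len <= PIPE_BUF ? write(wfd, rec, len) : -1);
}
#endif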
967
968 /*
969  * MPALMOSTSAFE - acquires mplock
970  *
971  * we implement a very minimal set of ioctls for compatibility with sockets.
972  */
973 int
974 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
975            struct ucred *cred, struct sysmsg *msg)
976 {
977         struct pipe *mpipe;
978         int error;
979         int mpsave;
980
981         pipe_get_mplock(&mpsave);
982         mpipe = (struct pipe *)fp->f_data;
983
984         lwkt_gettoken(&mpipe->pipe_rlock);
985         lwkt_gettoken(&mpipe->pipe_wlock);
986
987         switch (cmd) {
988         case FIOASYNC:
989                 if (*(int *)data) {
990                         mpipe->pipe_state |= PIPE_ASYNC;
991                 } else {
992                         mpipe->pipe_state &= ~PIPE_ASYNC;
993                 }
994                 error = 0;
995                 break;
996         case FIONREAD:
997                 *(int *)data = mpipe->pipe_buffer.windex -
998                                 mpipe->pipe_buffer.rindex;
999                 error = 0;
1000                 break;
1001         case FIOSETOWN:
1002                 get_mplock();
1003                 error = fsetown(*(int *)data, &mpipe->pipe_sigio);
1004                 rel_mplock();
1005                 break;
1006         case FIOGETOWN:
1007                 *(int *)data = fgetown(mpipe->pipe_sigio);
1008                 error = 0;
1009                 break;
1010         case TIOCSPGRP:
1011                 /* This is deprecated, FIOSETOWN should be used instead. */
1012                 get_mplock();
1013                 error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
1014                 rel_mplock();
1015                 break;
1016
1017         case TIOCGPGRP:
1018                 /* This is deprecated, FIOGETOWN should be used instead. */
1019                 *(int *)data = -fgetown(mpipe->pipe_sigio);
1020                 error = 0;
1021                 break;
1022         default:
1023                 error = ENOTTY;
1024                 break;
1025         }
1026         lwkt_reltoken(&mpipe->pipe_wlock);
1027         lwkt_reltoken(&mpipe->pipe_rlock);
1028         pipe_rel_mplock(&mpsave);
1029
1030         return (error);
1031 }
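/*
 * Illustrative userland sketch (compiled out): the FIONREAD case above simply
 * reports windex - rindex, i.e. the bytes currently buffered in the pipe.
 */
#if 0
#include <sys/ioctl.h>
#include <stdio.h>
#include <unistd.h>

static void
show_pending(void)
{
        int fds[2];
        int pending = 0;

        pipe(fds);
        write(fds[1], "abc", 3);
        if (ioctl(fds[0], FIONREAD, &pending) == 0)
                printf("%d bytes buffered\n", pending); /* prints 3 */
        close(fds[0]);
        close(fds[1]);
}
#endif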
1032
1033 /*
1034  * MPSAFE
1035  */
1036 static int
1037 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1038 {
1039         struct pipe *pipe;
1040         int mpsave;
1041
1042         pipe_get_mplock(&mpsave);
1043         pipe = (struct pipe *)fp->f_data;
1044
1045         bzero((caddr_t)ub, sizeof(*ub));
1046         ub->st_mode = S_IFIFO;
1047         ub->st_blksize = pipe->pipe_buffer.size;
1048         ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1049         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1050         ub->st_atimespec = pipe->pipe_atime;
1051         ub->st_mtimespec = pipe->pipe_mtime;
1052         ub->st_ctimespec = pipe->pipe_ctime;
1053         /*
1054          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1055          * st_flags, st_gen.
1056          * XXX (st_dev, st_ino) should be unique.
1057          */
1058         pipe_rel_mplock(&mpsave);
1059         return (0);
1060 }
1061
1062 /*
1063  * MPALMOSTSAFE - acquires mplock
1064  */
1065 static int
1066 pipe_close(struct file *fp)
1067 {
1068         struct pipe *cpipe;
1069
1070         get_mplock();
1071         cpipe = (struct pipe *)fp->f_data;
1072         fp->f_ops = &badfileops;
1073         fp->f_data = NULL;
1074         funsetown(cpipe->pipe_sigio);
1075         pipeclose(cpipe);
1076         rel_mplock();
1077         return (0);
1078 }
1079
1080 /*
1081  * Shutdown one or both directions of a full-duplex pipe.
1082  *
1083  * MPALMOSTSAFE - acquires mplock
1084  */
1085 static int
1086 pipe_shutdown(struct file *fp, int how)
1087 {
1088         struct pipe *rpipe;
1089         struct pipe *wpipe;
1090         int error = EPIPE;
1091         int mpsave;
1092
1093         pipe_get_mplock(&mpsave);
1094         rpipe = (struct pipe *)fp->f_data;
1095         wpipe = rpipe->pipe_peer;
1096
1097         /*
1098          * We modify pipe_state on both pipes, which means we need
1099          * all four tokens!
1100          */
1101         lwkt_gettoken(&rpipe->pipe_rlock);
1102         lwkt_gettoken(&rpipe->pipe_wlock);
1103         lwkt_gettoken(&wpipe->pipe_rlock);
1104         lwkt_gettoken(&wpipe->pipe_wlock);
1105
1106         switch(how) {
1107         case SHUT_RDWR:
1108         case SHUT_RD:
1109                 rpipe->pipe_state |= PIPE_REOF;         /* my reads */
1110                 rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
1111                 if (rpipe->pipe_state & PIPE_WANTR) {
1112                         rpipe->pipe_state &= ~PIPE_WANTR;
1113                         wakeup(rpipe);
1114                 }
1115                 if (rpipe->pipe_state & PIPE_WANTW) {
1116                         rpipe->pipe_state &= ~PIPE_WANTW;
1117                         wakeup(rpipe);
1118                 }
1119                 error = 0;
1120                 if (how == SHUT_RD)
1121                         break;
1122                 /* fall through */
1123         case SHUT_WR:
1124                 wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
1125                 wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
1126                 if (wpipe->pipe_state & PIPE_WANTR) {
1127                         wpipe->pipe_state &= ~PIPE_WANTR;
1128                         wakeup(wpipe);
1129                 }
1130                 if (wpipe->pipe_state & PIPE_WANTW) {
1131                         wpipe->pipe_state &= ~PIPE_WANTW;
1132                         wakeup(wpipe);
1133                 }
1134                 error = 0;
1135                 break;
1136         }
1137         pipewakeup(rpipe);
1138         pipewakeup(wpipe);
1139
1140         lwkt_reltoken(&wpipe->pipe_wlock);
1141         lwkt_reltoken(&wpipe->pipe_rlock);
1142         lwkt_reltoken(&rpipe->pipe_wlock);
1143         lwkt_reltoken(&rpipe->pipe_rlock);
1144
1145         pipe_rel_mplock(&mpsave);
1146         return (error);
1147 }
1148
1149 static void
1150 pipe_free_kmem(struct pipe *cpipe)
1151 {
1152         if (cpipe->pipe_buffer.buffer != NULL) {
1153                 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1154                         atomic_subtract_int(&pipe_nbig, 1);
1155                 kmem_free(&kernel_map,
1156                         (vm_offset_t)cpipe->pipe_buffer.buffer,
1157                         cpipe->pipe_buffer.size);
1158                 cpipe->pipe_buffer.buffer = NULL;
1159                 cpipe->pipe_buffer.object = NULL;
1160         }
1161 }
1162
1163 /*
1164  * Close the pipe.  The slock must be held to interlock against simultaneous
1165  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1166  */
1167 static void
1168 pipeclose(struct pipe *cpipe)
1169 {
1170         globaldata_t gd;
1171         struct pipe *ppipe;
1172
1173         if (cpipe == NULL)
1174                 return;
1175
1176         /*
1177          * The slock may not have been allocated yet (close during
1178          * initialization)
1179          *
1180          * We need both the read and write tokens to modify pipe_state.
1181          */
1182         if (cpipe->pipe_slock)
1183                 lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1184         lwkt_gettoken(&cpipe->pipe_rlock);
1185         lwkt_gettoken(&cpipe->pipe_wlock);
1186
1187         /*
1188          * Set our state, wakeup anyone waiting in select/poll/kq, and
1189          * wakeup anyone blocked on our pipe.
1190          */
1191         cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1192         pipewakeup(cpipe);
1193         if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1194                 cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1195                 wakeup(cpipe);
1196         }
1197
1198         /*
1199          * Disconnect from peer.
1200          */
1201         if ((ppipe = cpipe->pipe_peer) != NULL) {
1202                 lwkt_gettoken(&ppipe->pipe_rlock);
1203                 lwkt_gettoken(&ppipe->pipe_wlock);
1204                 ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1205                 pipewakeup(ppipe);
1206                 if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1207                         ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1208                         wakeup(ppipe);
1209                 }
1210                 if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1211                         KNOTE(&ppipe->pipe_kq.ki_note, 0);
1212                 lwkt_reltoken(&ppipe->pipe_wlock);
1213                 lwkt_reltoken(&ppipe->pipe_rlock);
1214         }
1215
1216         /*
1217          * If the peer is also closed we can free resources for both
1218          * sides, otherwise we leave our side intact to deal with any
1219          * races (since we only have the slock).
1220          */
1221         if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1222                 cpipe->pipe_peer = NULL;
1223                 ppipe->pipe_peer = NULL;
1224                 ppipe->pipe_slock = NULL;       /* we will free the slock */
1225                 pipeclose(ppipe);
1226                 ppipe = NULL;
1227         }
1228
1229         lwkt_reltoken(&cpipe->pipe_wlock);
1230         lwkt_reltoken(&cpipe->pipe_rlock);
1231         if (cpipe->pipe_slock)
1232                 lockmgr(cpipe->pipe_slock, LK_RELEASE);
1233
1234         /*
1235          * If we disassociated from our peer we can free resources
1236          */
1237         if (ppipe == NULL) {
1238                 gd = mycpu;
1239                 if (cpipe->pipe_slock) {
1240                         kfree(cpipe->pipe_slock, M_PIPE);
1241                         cpipe->pipe_slock = NULL;
1242                 }
1243                 if (gd->gd_pipeqcount >= pipe_maxcache ||
1244                     cpipe->pipe_buffer.size != PIPE_SIZE
1245                 ) {
1246                         pipe_free_kmem(cpipe);
1247                         kfree(cpipe, M_PIPE);
1248                 } else {
1249                         cpipe->pipe_state = 0;
1250                         cpipe->pipe_peer = gd->gd_pipeq;
1251                         gd->gd_pipeq = cpipe;
1252                         ++gd->gd_pipeqcount;
1253                 }
1254         }
1255 }
1256
1257 /*
1258  * MPALMOSTSAFE - acquires mplock
1259  */
1260 static int
1261 pipe_kqfilter(struct file *fp, struct knote *kn)
1262 {
1263         struct pipe *cpipe;
1264
1265         cpipe = (struct pipe *)kn->kn_fp->f_data;
1266
1267         switch (kn->kn_filter) {
1268         case EVFILT_READ:
1269                 kn->kn_fop = &pipe_rfiltops;
1270                 break;
1271         case EVFILT_WRITE:
1272                 kn->kn_fop = &pipe_wfiltops;
1273                 if (cpipe->pipe_peer == NULL) {
1274                         /* other end of pipe has been closed */
1275                         return (EPIPE);
1276                 }
1277                 break;
1278         default:
1279                 return (EOPNOTSUPP);
1280         }
1281         kn->kn_hook = (caddr_t)cpipe;
1282
1283         knote_insert(&cpipe->pipe_kq.ki_note, kn);
1284
1285         return (0);
1286 }
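/*
 * Illustrative userland sketch (compiled out): registering the EVFILT_READ
 * filter installed by pipe_kqfilter() above.  filt_piperead() reports the
 * readable byte count back through kn_data (kev.data in userland).
 */
#if 0
#include <sys/types.h>
#include <sys/event.h>
#include <sys/time.h>
#include <stdio.h>
#include <unistd.h>

static void
wait_readable(void)
{
        struct kevent kev;
        int fds[2];
        int kq;

        pipe(fds);
        kq = kqueue();
        EV_SET(&kev, fds[0], EVFILT_READ, EV_ADD, 0, 0, NULL);
        kevent(kq, &kev, 1, NULL, 0, NULL);     /* register */

        write(fds[1], "x", 1);
        if (kevent(kq, NULL, 0, &kev, 1, NULL) == 1)
                printf("fd %d readable, %ld bytes\n",
                       (int)kev.ident, (long)kev.data);
        close(kq);
        close(fds[0]);
        close(fds[1]);
}
#endif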
1287
1288 static void
1289 filt_pipedetach(struct knote *kn)
1290 {
1291         struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1292
1293         knote_remove(&cpipe->pipe_kq.ki_note, kn);
1294 }
1295
1296 /*ARGSUSED*/
1297 static int
1298 filt_piperead(struct knote *kn, long hint)
1299 {
1300         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1301         int ready = 0;
1302
1303         lwkt_gettoken(&rpipe->pipe_rlock);
1304         lwkt_gettoken(&rpipe->pipe_wlock);
1305
1306         kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1307         if (rpipe->pipe_state & PIPE_REOF) {
1308                 kn->kn_flags |= EV_EOF; 
1309                 ready = 1;
1310         }
1311
1312         lwkt_reltoken(&rpipe->pipe_wlock);
1313         lwkt_reltoken(&rpipe->pipe_rlock);
1314
1315         if (!ready)
1316                 ready = kn->kn_data > 0;
1317
1318         return (ready);
1319 }
1320
1321 /*ARGSUSED*/
1322 static int
1323 filt_pipewrite(struct knote *kn, long hint)
1324 {
1325         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1326         struct pipe *wpipe = rpipe->pipe_peer;
1327         int ready = 0;
1328
1329         kn->kn_data = 0;
1330         if (wpipe == NULL) {
1331                 kn->kn_flags |= EV_EOF;
1332                 return (1);
1333         }
1334
1335         lwkt_gettoken(&wpipe->pipe_rlock);
1336         lwkt_gettoken(&wpipe->pipe_wlock);
1337
1338         if (wpipe->pipe_state & PIPE_WEOF) {
1339                 kn->kn_flags |= EV_EOF; 
1340                 ready = 1;
1341         }
1342
1343         if (!ready)
1344                 kn->kn_data = wpipe->pipe_buffer.size -
1345                               (wpipe->pipe_buffer.windex -
1346                                wpipe->pipe_buffer.rindex);
1347
1348         lwkt_reltoken(&wpipe->pipe_wlock);
1349         lwkt_reltoken(&wpipe->pipe_rlock);
1350
1351         if (!ready)
1352                 ready = kn->kn_data >= PIPE_BUF;
1353
1354         return (ready);
1355 }