2fbc592e8bef39397c00273301759c34f9b16d62
[dragonfly.git] / sys / kern / sys_pipe.c
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
21  */
22
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/kernel.h>
32 #include <sys/proc.h>
33 #include <sys/fcntl.h>
34 #include <sys/file.h>
35 #include <sys/filedesc.h>
36 #include <sys/filio.h>
37 #include <sys/ttycom.h>
38 #include <sys/stat.h>
39 #include <sys/signalvar.h>
40 #include <sys/sysproto.h>
41 #include <sys/pipe.h>
42 #include <sys/vnode.h>
43 #include <sys/uio.h>
44 #include <sys/event.h>
45 #include <sys/globaldata.h>
46 #include <sys/module.h>
47 #include <sys/malloc.h>
48 #include <sys/sysctl.h>
49 #include <sys/socket.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <sys/lock.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_zone.h>
61
62 #include <sys/file2.h>
63 #include <sys/signal2.h>
64 #include <sys/mplock2.h>
65
66 #include <machine/cpufunc.h>
67
68 /*
69  * interfaces to the outside world
70  */
71 static int pipe_read (struct file *fp, struct uio *uio, 
72                 struct ucred *cred, int flags);
73 static int pipe_write (struct file *fp, struct uio *uio, 
74                 struct ucred *cred, int flags);
75 static int pipe_close (struct file *fp);
76 static int pipe_shutdown (struct file *fp, int how);
77 static int pipe_kqfilter (struct file *fp, struct knote *kn);
78 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
79 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
80                 struct ucred *cred, struct sysmsg *msg);
81
82 static struct fileops pipeops = {
83         .fo_read = pipe_read, 
84         .fo_write = pipe_write,
85         .fo_ioctl = pipe_ioctl,
86         .fo_kqfilter = pipe_kqfilter,
87         .fo_stat = pipe_stat,
88         .fo_close = pipe_close,
89         .fo_shutdown = pipe_shutdown
90 };
91
92 static void     filt_pipedetach(struct knote *kn);
93 static int      filt_piperead(struct knote *kn, long hint);
94 static int      filt_pipewrite(struct knote *kn, long hint);
95
96 static struct filterops pipe_rfiltops =
97         { FILTEROP_ISFD, NULL, filt_pipedetach, filt_piperead };
98 static struct filterops pipe_wfiltops =
99         { FILTEROP_ISFD, NULL, filt_pipedetach, filt_pipewrite };
100
101 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
102
103 /*
104  * Default pipe buffer size(s), this can be kind-of large now because pipe
105  * space is pageable.  The pipe code will try to maintain locality of
106  * reference for performance reasons, so small amounts of outstanding I/O
107  * will not wipe the cache.
108  */
109 #define MINPIPESIZE (PIPE_SIZE/3)
110 #define MAXPIPESIZE (2*PIPE_SIZE/3)
111
112 /*
113  * Limit the number of "big" pipes
114  */
115 #define LIMITBIGPIPES   64
116 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
117
118 static int pipe_maxbig = LIMITBIGPIPES;
119 static int pipe_maxcache = PIPEQ_MAX_CACHE;
120 static int pipe_bigcount;
121 static int pipe_nbig;
122 static int pipe_bcache_alloc;
123 static int pipe_bkmem_alloc;
124 static int pipe_rblocked_count;
125 static int pipe_wblocked_count;
126
127 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
128 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
129         CTLFLAG_RD, &pipe_nbig, 0, "numer of big pipes allocated");
130 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
131         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
132 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
133         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe expanded");
134 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
135         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe expanded");
136 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
137         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
138 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
139         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
140 #ifdef SMP
141 static int pipe_delay = 5000;   /* 5uS default */
142 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
143         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
144 static int pipe_mpsafe = 1;
145 SYSCTL_INT(_kern_pipe, OID_AUTO, mpsafe,
146         CTLFLAG_RW, &pipe_mpsafe, 0, "");
147 #endif
148 #if !defined(NO_PIPE_SYSCTL_STATS)
149 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
150         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
151 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
152         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
153 #endif
154
/*
 * Internal helpers: pipe_create() and pipespace() are defined below,
 * pipeclose() and pipe_free_kmem() further down in the file.
 */
static int pipe_create(struct pipe **cpipep);
static int pipespace(struct pipe *cpipe, int size);
static void pipe_free_kmem(struct pipe *cpipe);
static void pipeclose(struct pipe *cpipe);
159
160 static __inline void
161 pipewakeup(struct pipe *cpipe, int dosigio)
162 {
163         if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
164                 get_mplock();
165                 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
166                 rel_mplock();
167         }
168         KNOTE(&cpipe->pipe_kq.ki_note, 0);
169 }
170
171 /*
172  * These routines are called before and after a UIO.  The UIO
173  * may block, causing our held tokens to be lost temporarily.
174  *
175  * We use these routines to serialize reads against other reads
176  * and writes against other writes.
177  *
178  * The read token is held on entry so *ipp does not race.
179  */
180 static __inline int
181 pipe_start_uio(struct pipe *cpipe, int *ipp)
182 {
183         int error;
184
185         while (*ipp) {
186                 *ipp = -1;
187                 error = tsleep(ipp, PCATCH, "pipexx", 0);
188                 if (error)
189                         return (error);
190         }
191         *ipp = 1;
192         return (0);
193 }
194
/*
 * Release the direction taken by pipe_start_uio().
 *
 * A negative *ipp means another thread went to sleep on ipp while we
 * owned the direction, so it is woken after the word is cleared.
 * Otherwise the word must still hold the positive "in progress" value.
 */
static __inline void
pipe_end_uio(struct pipe *cpipe, int *ipp)
{
        int have_waiters = (*ipp < 0);

        if (!have_waiters) {
                KKASSERT(*ipp > 0);
        }
        *ipp = 0;
        if (have_waiters) {
                wakeup(ipp);
        }
}
206
/*
 * Conditionally acquire the MP lock for a pipe operation.
 *
 * On SMP kernels with the pipe_mpsafe tunable set to 0 the MP lock is
 * taken and *save is set to 1 so pipe_rel_mplock() knows to drop it.
 * In every other case no lock is taken and *save is set to 0.
 */
static __inline void
pipe_get_mplock(int *save)
{
        *save = 0;
#ifdef SMP
        if (pipe_mpsafe == 0) {
                get_mplock();
                *save = 1;
        }
#endif
}
220
/*
 * Undo pipe_get_mplock(): on SMP kernels drop the MP lock if *save says
 * it was taken.  *save itself is left untouched.
 */
static __inline void
pipe_rel_mplock(int *save)
{
#ifdef SMP
        if (*save != 0)
                rel_mplock();
#else
        (void)save;
#endif
}
229
230
231 /*
232  * The pipe system call for the DTYPE_PIPE type of pipes
233  *
234  * pipe_args(int dummy)
235  *
236  * MPSAFE
237  */
238 int
239 sys_pipe(struct pipe_args *uap)
240 {
241         struct thread *td = curthread;
242         struct filedesc *fdp = td->td_proc->p_fd;
243         struct file *rf, *wf;
244         struct pipe *rpipe, *wpipe;
245         int fd1, fd2, error;
246
247         rpipe = wpipe = NULL;
248         if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
249                 pipeclose(rpipe); 
250                 pipeclose(wpipe); 
251                 return (ENFILE);
252         }
253         
254         error = falloc(td->td_lwp, &rf, &fd1);
255         if (error) {
256                 pipeclose(rpipe);
257                 pipeclose(wpipe);
258                 return (error);
259         }
260         uap->sysmsg_fds[0] = fd1;
261
262         /*
263          * Warning: once we've gotten past allocation of the fd for the
264          * read-side, we can only drop the read side via fdrop() in order
265          * to avoid races against processes which manage to dup() the read
266          * side while we are blocked trying to allocate the write side.
267          */
268         rf->f_type = DTYPE_PIPE;
269         rf->f_flag = FREAD | FWRITE;
270         rf->f_ops = &pipeops;
271         rf->f_data = rpipe;
272         error = falloc(td->td_lwp, &wf, &fd2);
273         if (error) {
274                 fsetfd(fdp, NULL, fd1);
275                 fdrop(rf);
276                 /* rpipe has been closed by fdrop(). */
277                 pipeclose(wpipe);
278                 return (error);
279         }
280         wf->f_type = DTYPE_PIPE;
281         wf->f_flag = FREAD | FWRITE;
282         wf->f_ops = &pipeops;
283         wf->f_data = wpipe;
284         uap->sysmsg_fds[1] = fd2;
285
286         rpipe->pipe_slock = kmalloc(sizeof(struct lock),
287                                     M_PIPE, M_WAITOK|M_ZERO);
288         wpipe->pipe_slock = rpipe->pipe_slock;
289         rpipe->pipe_peer = wpipe;
290         wpipe->pipe_peer = rpipe;
291         lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
292
293         /*
294          * Once activated the peer relationship remains valid until
295          * both sides are closed.
296          */
297         fsetfd(fdp, rf, fd1);
298         fsetfd(fdp, wf, fd2);
299         fdrop(rf);
300         fdrop(wf);
301
302         return (0);
303 }
304
305 /*
306  * Allocate kva for pipe circular buffer, the space is pageable
307  * This routine will 'realloc' the size of a pipe safely, if it fails
308  * it will retain the old buffer.
309  * If it fails it will return ENOMEM.
310  */
311 static int
312 pipespace(struct pipe *cpipe, int size)
313 {
314         struct vm_object *object;
315         caddr_t buffer;
316         int npages, error;
317
318         npages = round_page(size) / PAGE_SIZE;
319         object = cpipe->pipe_buffer.object;
320
321         /*
322          * [re]create the object if necessary and reserve space for it
323          * in the kernel_map.  The object and memory are pageable.  On
324          * success, free the old resources before assigning the new
325          * ones.
326          */
327         if (object == NULL || object->size != npages) {
328                 get_mplock();
329                 object = vm_object_allocate(OBJT_DEFAULT, npages);
330                 buffer = (caddr_t)vm_map_min(&kernel_map);
331
332                 error = vm_map_find(&kernel_map, object, 0,
333                                     (vm_offset_t *)&buffer,
334                                     size, PAGE_SIZE,
335                                     1, VM_MAPTYPE_NORMAL,
336                                     VM_PROT_ALL, VM_PROT_ALL,
337                                     0);
338
339                 if (error != KERN_SUCCESS) {
340                         vm_object_deallocate(object);
341                         rel_mplock();
342                         return (ENOMEM);
343                 }
344                 pipe_free_kmem(cpipe);
345                 rel_mplock();
346                 cpipe->pipe_buffer.object = object;
347                 cpipe->pipe_buffer.buffer = buffer;
348                 cpipe->pipe_buffer.size = size;
349                 ++pipe_bkmem_alloc;
350         } else {
351                 ++pipe_bcache_alloc;
352         }
353         cpipe->pipe_buffer.rindex = 0;
354         cpipe->pipe_buffer.windex = 0;
355         return (0);
356 }
357
358 /*
359  * Initialize and allocate VM and memory for pipe, pulling the pipe from
360  * our per-cpu cache if possible.  For now make sure it is sized for the
361  * smaller PIPE_SIZE default.
362  */
363 static int
364 pipe_create(struct pipe **cpipep)
365 {
366         globaldata_t gd = mycpu;
367         struct pipe *cpipe;
368         int error;
369
370         if ((cpipe = gd->gd_pipeq) != NULL) {
371                 gd->gd_pipeq = cpipe->pipe_peer;
372                 --gd->gd_pipeqcount;
373                 cpipe->pipe_peer = NULL;
374                 cpipe->pipe_wantwcnt = 0;
375         } else {
376                 cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
377         }
378         *cpipep = cpipe;
379         if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
380                 return (error);
381         vfs_timestamp(&cpipe->pipe_ctime);
382         cpipe->pipe_atime = cpipe->pipe_ctime;
383         cpipe->pipe_mtime = cpipe->pipe_ctime;
384         lwkt_token_init(&cpipe->pipe_rlock, 1);
385         lwkt_token_init(&cpipe->pipe_wlock, 1);
386         return (0);
387 }
388
389 /*
390  * MPALMOSTSAFE (acquires mplock)
391  */
392 static int
393 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
394 {
395         struct pipe *rpipe;
396         struct pipe *wpipe;
397         int error;
398         size_t nread = 0;
399         int nbio;
400         u_int size;     /* total bytes available */
401         u_int nsize;    /* total bytes to read */
402         u_int rindex;   /* contiguous bytes available */
403         int notify_writer;
404         int mpsave;
405         int bigread;
406         int bigcount;
407
408         if (uio->uio_resid == 0)
409                 return(0);
410
411         /*
412          * Setup locks, calculate nbio
413          */
414         pipe_get_mplock(&mpsave);
415         rpipe = (struct pipe *)fp->f_data;
416         wpipe = rpipe->pipe_peer;
417         lwkt_gettoken(&rpipe->pipe_rlock);
418
419         if (fflags & O_FBLOCKING)
420                 nbio = 0;
421         else if (fflags & O_FNONBLOCKING)
422                 nbio = 1;
423         else if (fp->f_flag & O_NONBLOCK)
424                 nbio = 1;
425         else
426                 nbio = 0;
427
428         /*
429          * Reads are serialized.  Note however that pipe_buffer.buffer and
430          * pipe_buffer.size can change out from under us when the number
431          * of bytes in the buffer are zero due to the write-side doing a
432          * pipespace().
433          */
434         error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
435         if (error) {
436                 pipe_rel_mplock(&mpsave);
437                 lwkt_reltoken(&rpipe->pipe_rlock);
438                 return (error);
439         }
440         notify_writer = 0;
441
442         bigread = (uio->uio_resid > 10 * 1024 * 1024);
443         bigcount = 10;
444
445         while (uio->uio_resid) {
446                 /*
447                  * Don't hog the cpu.
448                  */
449                 if (bigread && --bigcount == 0) {
450                         lwkt_user_yield();
451                         bigcount = 10;
452                         if (CURSIG(curthread->td_lwp)) {
453                                 error = EINTR;
454                                 break;
455                         }
456                 }
457
458                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
459                 cpu_lfence();
460                 if (size) {
461                         rindex = rpipe->pipe_buffer.rindex &
462                                  (rpipe->pipe_buffer.size - 1);
463                         nsize = size;
464                         if (nsize > rpipe->pipe_buffer.size - rindex)
465                                 nsize = rpipe->pipe_buffer.size - rindex;
466                         nsize = szmin(nsize, uio->uio_resid);
467
468                         error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
469                                         nsize, uio);
470                         if (error)
471                                 break;
472                         cpu_mfence();
473                         rpipe->pipe_buffer.rindex += nsize;
474                         nread += nsize;
475
476                         /*
477                          * If the FIFO is still over half full just continue
478                          * and do not try to notify the writer yet.
479                          */
480                         if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
481                                 notify_writer = 0;
482                                 continue;
483                         }
484
485                         /*
486                          * When the FIFO is less then half full notify any
487                          * waiting writer.  WANTW can be checked while
488                          * holding just the rlock.
489                          */
490                         notify_writer = 1;
491                         if ((rpipe->pipe_state & PIPE_WANTW) == 0)
492                                 continue;
493                 }
494
495                 /*
496                  * If the "write-side" was blocked we wake it up.  This code
497                  * is reached either when the buffer is completely emptied
498                  * or if it becomes more then half-empty.
499                  *
500                  * Pipe_state can only be modified if both the rlock and
501                  * wlock are held.
502                  */
503                 if (rpipe->pipe_state & PIPE_WANTW) {
504                         lwkt_gettoken(&rpipe->pipe_wlock);
505                         if (rpipe->pipe_state & PIPE_WANTW) {
506                                 rpipe->pipe_state &= ~PIPE_WANTW;
507                                 lwkt_reltoken(&rpipe->pipe_wlock);
508                                 wakeup(rpipe);
509                         } else {
510                                 lwkt_reltoken(&rpipe->pipe_wlock);
511                         }
512                 }
513
514                 /*
515                  * Pick up our copy loop again if the writer sent data to
516                  * us while we were messing around.
517                  *
518                  * On a SMP box poll up to pipe_delay nanoseconds for new
519                  * data.  Typically a value of 2000 to 4000 is sufficient
520                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
521                  * is used for synchronous communications with small packets,
522                  * and 8000 or so (8uS) will pipeline large buffer xfers
523                  * between cpus over a pipe.
524                  *
525                  * For synchronous communications a hit means doing a
526                  * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS,
527                  * where as miss requiring a tsleep/wakeup sequence
528                  * will take 7uS or more.
529                  */
530                 if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
531                         continue;
532
533 #if defined(SMP) && defined(_RDTSC_SUPPORTED_)
534                 if (pipe_delay) {
535                         int64_t tsc_target;
536                         int good = 0;
537
538                         tsc_target = tsc_get_target(pipe_delay);
539                         while (tsc_test_target(tsc_target) == 0) {
540                                 if (rpipe->pipe_buffer.windex !=
541                                     rpipe->pipe_buffer.rindex) {
542                                         good = 1;
543                                         break;
544                                 }
545                         }
546                         if (good)
547                                 continue;
548                 }
549 #endif
550
551                 /*
552                  * Detect EOF condition, do not set error.
553                  */
554                 if (rpipe->pipe_state & PIPE_REOF)
555                         break;
556
557                 /*
558                  * Break if some data was read, or if this was a non-blocking
559                  * read.
560                  */
561                 if (nread > 0)
562                         break;
563
564                 if (nbio) {
565                         error = EAGAIN;
566                         break;
567                 }
568
569                 /*
570                  * Last chance, interlock with WANTR.
571                  */
572                 lwkt_gettoken(&rpipe->pipe_wlock);
573                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
574                 if (size) {
575                         lwkt_reltoken(&rpipe->pipe_wlock);
576                         continue;
577                 }
578
579                 /*
580                  * Retest EOF - acquiring a new token can temporarily release
581                  * tokens already held.
582                  */
583                 if (rpipe->pipe_state & PIPE_REOF) {
584                         lwkt_reltoken(&rpipe->pipe_wlock);
585                         break;
586                 }
587
588                 /*
589                  * If there is no more to read in the pipe, reset its
590                  * pointers to the beginning.  This improves cache hit
591                  * stats.
592                  *
593                  * We need both locks to modify both pointers, and there
594                  * must also not be a write in progress or the uiomove()
595                  * in the write might block and temporarily release
596                  * its wlock, then reacquire and update windex.  We are
597                  * only serialized against reads, not writes.
598                  *
599                  * XXX should we even bother resetting the indices?  It
600                  *     might actually be more cache efficient not to.
601                  */
602                 if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
603                     rpipe->pipe_wip == 0) {
604                         rpipe->pipe_buffer.rindex = 0;
605                         rpipe->pipe_buffer.windex = 0;
606                 }
607
608                 /*
609                  * Wait for more data.
610                  *
611                  * Pipe_state can only be set if both the rlock and wlock
612                  * are held.
613                  */
614                 rpipe->pipe_state |= PIPE_WANTR;
615                 tsleep_interlock(rpipe, PCATCH);
616                 lwkt_reltoken(&rpipe->pipe_wlock);
617                 error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
618                 ++pipe_rblocked_count;
619                 if (error)
620                         break;
621         }
622         pipe_end_uio(rpipe, &rpipe->pipe_rip);
623
624         /*
625          * Uptime last access time
626          */
627         if (error == 0 && nread)
628                 vfs_timestamp(&rpipe->pipe_atime);
629
630         /*
631          * If we drained the FIFO more then half way then handle
632          * write blocking hysteresis.
633          *
634          * Note that PIPE_WANTW cannot be set by the writer without
635          * it holding both rlock and wlock, so we can test it
636          * while holding just rlock.
637          */
638         if (notify_writer) {
639                 /*
640                  * Synchronous blocking is done on the pipe involved
641                  */
642                 if (rpipe->pipe_state & PIPE_WANTW) {
643                         lwkt_gettoken(&rpipe->pipe_wlock);
644                         if (rpipe->pipe_state & PIPE_WANTW) {
645                                 rpipe->pipe_state &= ~PIPE_WANTW;
646                                 lwkt_reltoken(&rpipe->pipe_wlock);
647                                 wakeup(rpipe);
648                         } else {
649                                 lwkt_reltoken(&rpipe->pipe_wlock);
650                         }
651                 }
652
653                 /*
654                  * But we may also have to deal with a kqueue which is
655                  * stored on the same pipe as its descriptor, so a
656                  * EVFILT_WRITE event waiting for our side to drain will
657                  * be on the other side.
658                  */
659                 lwkt_gettoken(&wpipe->pipe_wlock);
660                 pipewakeup(wpipe, 0);
661                 lwkt_reltoken(&wpipe->pipe_wlock);
662         }
663         /*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
664         lwkt_reltoken(&rpipe->pipe_rlock);
665
666         pipe_rel_mplock(&mpsave);
667         return (error);
668 }
669
670 /*
671  * MPALMOSTSAFE - acquires mplock
672  */
673 static int
674 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
675 {
676         int error;
677         int orig_resid;
678         int nbio;
679         struct pipe *wpipe;
680         struct pipe *rpipe;
681         u_int windex;
682         u_int space;
683         u_int wcount;
684         int mpsave;
685         int bigwrite;
686         int bigcount;
687
688         pipe_get_mplock(&mpsave);
689
690         /*
691          * Writes go to the peer.  The peer will always exist.
692          */
693         rpipe = (struct pipe *) fp->f_data;
694         wpipe = rpipe->pipe_peer;
695         lwkt_gettoken(&wpipe->pipe_wlock);
696         if (wpipe->pipe_state & PIPE_WEOF) {
697                 pipe_rel_mplock(&mpsave);
698                 lwkt_reltoken(&wpipe->pipe_wlock);
699                 return (EPIPE);
700         }
701
702         /*
703          * Degenerate case (EPIPE takes prec)
704          */
705         if (uio->uio_resid == 0) {
706                 pipe_rel_mplock(&mpsave);
707                 lwkt_reltoken(&wpipe->pipe_wlock);
708                 return(0);
709         }
710
711         /*
712          * Writes are serialized (start_uio must be called with wlock)
713          */
714         error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
715         if (error) {
716                 pipe_rel_mplock(&mpsave);
717                 lwkt_reltoken(&wpipe->pipe_wlock);
718                 return (error);
719         }
720
721         if (fflags & O_FBLOCKING)
722                 nbio = 0;
723         else if (fflags & O_FNONBLOCKING)
724                 nbio = 1;
725         else if (fp->f_flag & O_NONBLOCK)
726                 nbio = 1;
727         else
728                 nbio = 0;
729
730         /*
731          * If it is advantageous to resize the pipe buffer, do
732          * so.  We are write-serialized so we can block safely.
733          */
734         if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
735             (pipe_nbig < pipe_maxbig) &&
736             wpipe->pipe_wantwcnt > 4 &&
737             (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
738                 /* 
739                  * Recheck after lock.
740                  */
741                 lwkt_gettoken(&wpipe->pipe_rlock);
742                 if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
743                     (pipe_nbig < pipe_maxbig) &&
744                     (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
745                         atomic_add_int(&pipe_nbig, 1);
746                         if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
747                                 ++pipe_bigcount;
748                         else
749                                 atomic_subtract_int(&pipe_nbig, 1);
750                 }
751                 lwkt_reltoken(&wpipe->pipe_rlock);
752         }
753
754         orig_resid = uio->uio_resid;
755         wcount = 0;
756
757         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
758         bigcount = 10;
759
760         while (uio->uio_resid) {
761                 if (wpipe->pipe_state & PIPE_WEOF) {
762                         error = EPIPE;
763                         break;
764                 }
765
766                 /*
767                  * Don't hog the cpu.
768                  */
769                 if (bigwrite && --bigcount == 0) {
770                         lwkt_user_yield();
771                         bigcount = 10;
772                         if (CURSIG(curthread->td_lwp)) {
773                                 error = EINTR;
774                                 break;
775                         }
776                 }
777
778                 windex = wpipe->pipe_buffer.windex &
779                          (wpipe->pipe_buffer.size - 1);
780                 space = wpipe->pipe_buffer.size -
781                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
782                 cpu_lfence();
783
784                 /* Writes of size <= PIPE_BUF must be atomic. */
785                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
786                         space = 0;
787
788                 /* 
789                  * Write to fill, read size handles write hysteresis.  Also
790                  * additional restrictions can cause select-based non-blocking
791                  * writes to spin.
792                  */
793                 if (space > 0) {
794                         u_int segsize;
795
796                         /*
797                          * Transfer size is minimum of uio transfer
798                          * and free space in pipe buffer.
799                          *
800                          * Limit each uiocopy to no more then PIPE_SIZE
801                          * so we can keep the gravy train going on a
802                          * SMP box.  This doubles the performance for
803                          * write sizes > 16K.  Otherwise large writes
804                          * wind up doing an inefficient synchronous
805                          * ping-pong.
806                          */
807                         space = szmin(space, uio->uio_resid);
808                         if (space > PIPE_SIZE)
809                                 space = PIPE_SIZE;
810
811                         /*
812                          * First segment to transfer is minimum of
813                          * transfer size and contiguous space in
814                          * pipe buffer.  If first segment to transfer
815                          * is less than the transfer size, we've got
816                          * a wraparound in the buffer.
817                          */
818                         segsize = wpipe->pipe_buffer.size - windex;
819                         if (segsize > space)
820                                 segsize = space;
821
822 #ifdef SMP
823                         /*
824                          * If this is the first loop and the reader is
825                          * blocked, do a preemptive wakeup of the reader.
826                          *
827                          * On SMP the IPI latency plus the wlock interlock
828                          * on the reader side is the fastest way to get the
829                          * reader going.  (The scheduler will hard loop on
830                          * lock tokens).
831                          *
832                          * NOTE: We can't clear WANTR here without acquiring
833                          * the rlock, which we don't want to do here!
834                          */
835                         if ((wpipe->pipe_state & PIPE_WANTR) && pipe_mpsafe > 1)
836                                 wakeup(wpipe);
837 #endif
838
839                         /*
840                          * Transfer segment, which may include a wrap-around.
841                          * Update windex to account for both all in one go
842                          * so the reader can read() the data atomically.
843                          */
844                         error = uiomove(&wpipe->pipe_buffer.buffer[windex],
845                                         segsize, uio);
846                         if (error == 0 && segsize < space) {
847                                 segsize = space - segsize;
848                                 error = uiomove(&wpipe->pipe_buffer.buffer[0],
849                                                 segsize, uio);
850                         }
851                         if (error)
852                                 break;
853                         cpu_mfence();
854                         wpipe->pipe_buffer.windex += space;
855                         wcount += space;
856                         continue;
857                 }
858
859                 /*
860                  * We need both the rlock and the wlock to interlock against
861                  * the EOF, WANTW, and size checks, and to modify pipe_state.
862                  *
863                  * These are token locks so we do not have to worry about
864                  * deadlocks.
865                  */
866                 lwkt_gettoken(&wpipe->pipe_rlock);
867
868                 /*
869                  * If the "read-side" has been blocked, wake it up now
870                  * and yield to let it drain synchronously rather
871                  * then block.
872                  */
873                 if (wpipe->pipe_state & PIPE_WANTR) {
874                         wpipe->pipe_state &= ~PIPE_WANTR;
875                         wakeup(wpipe);
876                 }
877
878                 /*
879                  * don't block on non-blocking I/O
880                  */
881                 if (nbio) {
882                         lwkt_reltoken(&wpipe->pipe_rlock);
883                         error = EAGAIN;
884                         break;
885                 }
886
887                 /*
888                  * re-test whether we have to block in the writer after
889                  * acquiring both locks, in case the reader opened up
890                  * some space.
891                  */
892                 space = wpipe->pipe_buffer.size -
893                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
894                 cpu_lfence();
895                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
896                         space = 0;
897
898                 /*
899                  * Retest EOF - acquiring a new token can temporarily release
900                  * tokens already held.
901                  */
902                 if (wpipe->pipe_state & PIPE_WEOF) {
903                         lwkt_reltoken(&wpipe->pipe_rlock);
904                         error = EPIPE;
905                         break;
906                 }
907
908                 /*
909                  * We have no more space and have something to offer,
910                  * wake up select/poll/kq.
911                  */
912                 if (space == 0) {
913                         wpipe->pipe_state |= PIPE_WANTW;
914                         ++wpipe->pipe_wantwcnt;
915                         pipewakeup(wpipe, 1);
916                         if (wpipe->pipe_state & PIPE_WANTW)
917                                 error = tsleep(wpipe, PCATCH, "pipewr", 0);
918                         ++pipe_wblocked_count;
919                 }
920                 lwkt_reltoken(&wpipe->pipe_rlock);
921
922                 /*
923                  * Break out if we errored or the read side wants us to go
924                  * away.
925                  */
926                 if (error)
927                         break;
928                 if (wpipe->pipe_state & PIPE_WEOF) {
929                         error = EPIPE;
930                         break;
931                 }
932         }
933         pipe_end_uio(wpipe, &wpipe->pipe_wip);
934
935         /*
936          * If we have put any characters in the buffer, we wake up
937          * the reader.
938          *
939          * Both rlock and wlock are required to be able to modify pipe_state.
940          */
941         if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
942                 if (wpipe->pipe_state & PIPE_WANTR) {
943                         lwkt_gettoken(&wpipe->pipe_rlock);
944                         if (wpipe->pipe_state & PIPE_WANTR) {
945                                 wpipe->pipe_state &= ~PIPE_WANTR;
946                                 lwkt_reltoken(&wpipe->pipe_rlock);
947                                 wakeup(wpipe);
948                         } else {
949                                 lwkt_reltoken(&wpipe->pipe_rlock);
950                         }
951                 }
952                 lwkt_gettoken(&wpipe->pipe_rlock);
953                 pipewakeup(wpipe, 1);
954                 lwkt_reltoken(&wpipe->pipe_rlock);
955         }
956
957         /*
958          * Don't return EPIPE if I/O was successful
959          */
960         if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
961             (uio->uio_resid == 0) &&
962             (error == EPIPE)) {
963                 error = 0;
964         }
965
966         if (error == 0)
967                 vfs_timestamp(&wpipe->pipe_mtime);
968
969         /*
970          * We have something to offer,
971          * wake up select/poll/kq.
972          */
973         /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
974         lwkt_reltoken(&wpipe->pipe_wlock);
975         pipe_rel_mplock(&mpsave);
976         return (error);
977 }
978
979 /*
980  * MPALMOSTSAFE - acquires mplock
981  *
982  * we implement a very minimal set of ioctls for compatibility with sockets.
983  */
984 int
985 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
986            struct ucred *cred, struct sysmsg *msg)
987 {
988         struct pipe *mpipe;
989         int error;
990         int mpsave;
991
992         pipe_get_mplock(&mpsave);
993         mpipe = (struct pipe *)fp->f_data;
994
995         lwkt_gettoken(&mpipe->pipe_rlock);
996         lwkt_gettoken(&mpipe->pipe_wlock);
997
998         switch (cmd) {
999         case FIOASYNC:
1000                 if (*(int *)data) {
1001                         mpipe->pipe_state |= PIPE_ASYNC;
1002                 } else {
1003                         mpipe->pipe_state &= ~PIPE_ASYNC;
1004                 }
1005                 error = 0;
1006                 break;
1007         case FIONREAD:
1008                 *(int *)data = mpipe->pipe_buffer.windex -
1009                                 mpipe->pipe_buffer.rindex;
1010                 error = 0;
1011                 break;
1012         case FIOSETOWN:
1013                 get_mplock();
1014                 error = fsetown(*(int *)data, &mpipe->pipe_sigio);
1015                 rel_mplock();
1016                 break;
1017         case FIOGETOWN:
1018                 *(int *)data = fgetown(mpipe->pipe_sigio);
1019                 error = 0;
1020                 break;
1021         case TIOCSPGRP:
1022                 /* This is deprecated, FIOSETOWN should be used instead. */
1023                 get_mplock();
1024                 error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
1025                 rel_mplock();
1026                 break;
1027
1028         case TIOCGPGRP:
1029                 /* This is deprecated, FIOGETOWN should be used instead. */
1030                 *(int *)data = -fgetown(mpipe->pipe_sigio);
1031                 error = 0;
1032                 break;
1033         default:
1034                 error = ENOTTY;
1035                 break;
1036         }
1037         lwkt_reltoken(&mpipe->pipe_wlock);
1038         lwkt_reltoken(&mpipe->pipe_rlock);
1039         pipe_rel_mplock(&mpsave);
1040
1041         return (error);
1042 }
1043
1044 /*
1045  * MPSAFE
1046  */
1047 static int
1048 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1049 {
1050         struct pipe *pipe;
1051         int mpsave;
1052
1053         pipe_get_mplock(&mpsave);
1054         pipe = (struct pipe *)fp->f_data;
1055
1056         bzero((caddr_t)ub, sizeof(*ub));
1057         ub->st_mode = S_IFIFO;
1058         ub->st_blksize = pipe->pipe_buffer.size;
1059         ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1060         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1061         ub->st_atimespec = pipe->pipe_atime;
1062         ub->st_mtimespec = pipe->pipe_mtime;
1063         ub->st_ctimespec = pipe->pipe_ctime;
1064         /*
1065          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1066          * st_flags, st_gen.
1067          * XXX (st_dev, st_ino) should be unique.
1068          */
1069         pipe_rel_mplock(&mpsave);
1070         return (0);
1071 }
1072
1073 /*
1074  * MPALMOSTSAFE - acquires mplock
1075  */
1076 static int
1077 pipe_close(struct file *fp)
1078 {
1079         struct pipe *cpipe;
1080
1081         get_mplock();
1082         cpipe = (struct pipe *)fp->f_data;
1083         fp->f_ops = &badfileops;
1084         fp->f_data = NULL;
1085         funsetown(cpipe->pipe_sigio);
1086         pipeclose(cpipe);
1087         rel_mplock();
1088         return (0);
1089 }
1090
1091 /*
1092  * Shutdown one or both directions of a full-duplex pipe.
1093  *
1094  * MPALMOSTSAFE - acquires mplock
1095  */
1096 static int
1097 pipe_shutdown(struct file *fp, int how)
1098 {
1099         struct pipe *rpipe;
1100         struct pipe *wpipe;
1101         int error = EPIPE;
1102         int mpsave;
1103
1104         pipe_get_mplock(&mpsave);
1105         rpipe = (struct pipe *)fp->f_data;
1106         wpipe = rpipe->pipe_peer;
1107
1108         /*
1109          * We modify pipe_state on both pipes, which means we need
1110          * all four tokens!
1111          */
1112         lwkt_gettoken(&rpipe->pipe_rlock);
1113         lwkt_gettoken(&rpipe->pipe_wlock);
1114         lwkt_gettoken(&wpipe->pipe_rlock);
1115         lwkt_gettoken(&wpipe->pipe_wlock);
1116
1117         switch(how) {
1118         case SHUT_RDWR:
1119         case SHUT_RD:
1120                 rpipe->pipe_state |= PIPE_REOF;         /* my reads */
1121                 rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
1122                 if (rpipe->pipe_state & PIPE_WANTR) {
1123                         rpipe->pipe_state &= ~PIPE_WANTR;
1124                         wakeup(rpipe);
1125                 }
1126                 if (rpipe->pipe_state & PIPE_WANTW) {
1127                         rpipe->pipe_state &= ~PIPE_WANTW;
1128                         wakeup(rpipe);
1129                 }
1130                 error = 0;
1131                 if (how == SHUT_RD)
1132                         break;
1133                 /* fall through */
1134         case SHUT_WR:
1135                 wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
1136                 wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
1137                 if (wpipe->pipe_state & PIPE_WANTR) {
1138                         wpipe->pipe_state &= ~PIPE_WANTR;
1139                         wakeup(wpipe);
1140                 }
1141                 if (wpipe->pipe_state & PIPE_WANTW) {
1142                         wpipe->pipe_state &= ~PIPE_WANTW;
1143                         wakeup(wpipe);
1144                 }
1145                 error = 0;
1146                 break;
1147         }
1148         pipewakeup(rpipe, 1);
1149         pipewakeup(wpipe, 1);
1150
1151         lwkt_reltoken(&wpipe->pipe_wlock);
1152         lwkt_reltoken(&wpipe->pipe_rlock);
1153         lwkt_reltoken(&rpipe->pipe_wlock);
1154         lwkt_reltoken(&rpipe->pipe_rlock);
1155
1156         pipe_rel_mplock(&mpsave);
1157         return (error);
1158 }
1159
1160 static void
1161 pipe_free_kmem(struct pipe *cpipe)
1162 {
1163         if (cpipe->pipe_buffer.buffer != NULL) {
1164                 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1165                         atomic_subtract_int(&pipe_nbig, 1);
1166                 kmem_free(&kernel_map,
1167                         (vm_offset_t)cpipe->pipe_buffer.buffer,
1168                         cpipe->pipe_buffer.size);
1169                 cpipe->pipe_buffer.buffer = NULL;
1170                 cpipe->pipe_buffer.object = NULL;
1171         }
1172 }
1173
1174 /*
1175  * Close the pipe.  The slock must be held to interlock against simultanious
1176  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1177  */
1178 static void
1179 pipeclose(struct pipe *cpipe)
1180 {
1181         globaldata_t gd;
1182         struct pipe *ppipe;
1183
1184         if (cpipe == NULL)
1185                 return;
1186
1187         /*
1188          * The slock may not have been allocated yet (close during
1189          * initialization)
1190          *
1191          * We need both the read and write tokens to modify pipe_state.
1192          */
1193         if (cpipe->pipe_slock)
1194                 lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1195         lwkt_gettoken(&cpipe->pipe_rlock);
1196         lwkt_gettoken(&cpipe->pipe_wlock);
1197
1198         /*
1199          * Set our state, wakeup anyone waiting in select/poll/kq, and
1200          * wakeup anyone blocked on our pipe.
1201          */
1202         cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1203         pipewakeup(cpipe, 1);
1204         if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1205                 cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1206                 wakeup(cpipe);
1207         }
1208
1209         /*
1210          * Disconnect from peer.
1211          */
1212         if ((ppipe = cpipe->pipe_peer) != NULL) {
1213                 lwkt_gettoken(&ppipe->pipe_rlock);
1214                 lwkt_gettoken(&ppipe->pipe_wlock);
1215                 ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1216                 pipewakeup(ppipe, 1);
1217                 if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1218                         ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1219                         wakeup(ppipe);
1220                 }
1221                 if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1222                         KNOTE(&ppipe->pipe_kq.ki_note, 0);
1223                 lwkt_reltoken(&ppipe->pipe_wlock);
1224                 lwkt_reltoken(&ppipe->pipe_rlock);
1225         }
1226
1227         /*
1228          * If the peer is also closed we can free resources for both
1229          * sides, otherwise we leave our side intact to deal with any
1230          * races (since we only have the slock).
1231          */
1232         if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1233                 cpipe->pipe_peer = NULL;
1234                 ppipe->pipe_peer = NULL;
1235                 ppipe->pipe_slock = NULL;       /* we will free the slock */
1236                 pipeclose(ppipe);
1237                 ppipe = NULL;
1238         }
1239
1240         lwkt_reltoken(&cpipe->pipe_wlock);
1241         lwkt_reltoken(&cpipe->pipe_rlock);
1242         if (cpipe->pipe_slock)
1243                 lockmgr(cpipe->pipe_slock, LK_RELEASE);
1244
1245         /*
1246          * If we disassociated from our peer we can free resources
1247          */
1248         if (ppipe == NULL) {
1249                 gd = mycpu;
1250                 if (cpipe->pipe_slock) {
1251                         kfree(cpipe->pipe_slock, M_PIPE);
1252                         cpipe->pipe_slock = NULL;
1253                 }
1254                 if (gd->gd_pipeqcount >= pipe_maxcache ||
1255                     cpipe->pipe_buffer.size != PIPE_SIZE
1256                 ) {
1257                         pipe_free_kmem(cpipe);
1258                         kfree(cpipe, M_PIPE);
1259                 } else {
1260                         cpipe->pipe_state = 0;
1261                         cpipe->pipe_peer = gd->gd_pipeq;
1262                         gd->gd_pipeq = cpipe;
1263                         ++gd->gd_pipeqcount;
1264                 }
1265         }
1266 }
1267
1268 /*
1269  * MPALMOSTSAFE - acquires mplock
1270  */
1271 static int
1272 pipe_kqfilter(struct file *fp, struct knote *kn)
1273 {
1274         struct pipe *cpipe;
1275
1276         cpipe = (struct pipe *)kn->kn_fp->f_data;
1277
1278         switch (kn->kn_filter) {
1279         case EVFILT_READ:
1280                 kn->kn_fop = &pipe_rfiltops;
1281                 break;
1282         case EVFILT_WRITE:
1283                 kn->kn_fop = &pipe_wfiltops;
1284                 if (cpipe->pipe_peer == NULL) {
1285                         /* other end of pipe has been closed */
1286                         return (EPIPE);
1287                 }
1288                 break;
1289         default:
1290                 return (EOPNOTSUPP);
1291         }
1292         kn->kn_hook = (caddr_t)cpipe;
1293
1294         knote_insert(&cpipe->pipe_kq.ki_note, kn);
1295
1296         return (0);
1297 }
1298
1299 static void
1300 filt_pipedetach(struct knote *kn)
1301 {
1302         struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1303
1304         knote_remove(&cpipe->pipe_kq.ki_note, kn);
1305 }
1306
1307 /*ARGSUSED*/
1308 static int
1309 filt_piperead(struct knote *kn, long hint)
1310 {
1311         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1312         int ready = 0;
1313
1314         lwkt_gettoken(&rpipe->pipe_rlock);
1315         lwkt_gettoken(&rpipe->pipe_wlock);
1316
1317         kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1318
1319         /*
1320          * Only set EOF if all data has been exhausted
1321          */
1322         if ((rpipe->pipe_state & PIPE_REOF) && kn->kn_data == 0) {
1323                 kn->kn_flags |= EV_EOF; 
1324                 ready = 1;
1325         }
1326
1327         lwkt_reltoken(&rpipe->pipe_wlock);
1328         lwkt_reltoken(&rpipe->pipe_rlock);
1329
1330         if (!ready)
1331                 ready = kn->kn_data > 0;
1332
1333         return (ready);
1334 }
1335
1336 /*ARGSUSED*/
1337 static int
1338 filt_pipewrite(struct knote *kn, long hint)
1339 {
1340         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1341         struct pipe *wpipe = rpipe->pipe_peer;
1342         int ready = 0;
1343
1344         kn->kn_data = 0;
1345         if (wpipe == NULL) {
1346                 kn->kn_flags |= EV_EOF;
1347                 return (1);
1348         }
1349
1350         lwkt_gettoken(&wpipe->pipe_rlock);
1351         lwkt_gettoken(&wpipe->pipe_wlock);
1352
1353         if (wpipe->pipe_state & PIPE_WEOF) {
1354                 kn->kn_flags |= EV_EOF; 
1355                 ready = 1;
1356         }
1357
1358         if (!ready)
1359                 kn->kn_data = wpipe->pipe_buffer.size -
1360                               (wpipe->pipe_buffer.windex -
1361                                wpipe->pipe_buffer.rindex);
1362
1363         lwkt_reltoken(&wpipe->pipe_wlock);
1364         lwkt_reltoken(&wpipe->pipe_rlock);
1365
1366         if (!ready)
1367                 ready = kn->kn_data >= PIPE_BUF;
1368
1369         return (ready);
1370 }