1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
21  */
22
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/kernel.h>
32 #include <sys/proc.h>
33 #include <sys/fcntl.h>
34 #include <sys/file.h>
35 #include <sys/filedesc.h>
36 #include <sys/filio.h>
37 #include <sys/ttycom.h>
38 #include <sys/stat.h>
39 #include <sys/signalvar.h>
40 #include <sys/sysproto.h>
41 #include <sys/pipe.h>
42 #include <sys/vnode.h>
43 #include <sys/uio.h>
44 #include <sys/event.h>
45 #include <sys/globaldata.h>
46 #include <sys/module.h>
47 #include <sys/malloc.h>
48 #include <sys/sysctl.h>
49 #include <sys/socket.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <sys/lock.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_zone.h>
61
62 #include <sys/file2.h>
63 #include <sys/signal2.h>
64
65 #include <machine/cpufunc.h>
66
67 /*
68  * interfaces to the outside world
69  */
70 static int pipe_read (struct file *fp, struct uio *uio, 
71                 struct ucred *cred, int flags);
72 static int pipe_write (struct file *fp, struct uio *uio, 
73                 struct ucred *cred, int flags);
74 static int pipe_close (struct file *fp);
75 static int pipe_shutdown (struct file *fp, int how);
76 static int pipe_kqfilter (struct file *fp, struct knote *kn);
77 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
78 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
79                 struct ucred *cred, struct sysmsg *msg);
80
81 static struct fileops pipeops = {
82         .fo_read = pipe_read, 
83         .fo_write = pipe_write,
84         .fo_ioctl = pipe_ioctl,
85         .fo_kqfilter = pipe_kqfilter,
86         .fo_stat = pipe_stat,
87         .fo_close = pipe_close,
88         .fo_shutdown = pipe_shutdown
89 };
90
91 static void     filt_pipedetach(struct knote *kn);
92 static int      filt_piperead(struct knote *kn, long hint);
93 static int      filt_pipewrite(struct knote *kn, long hint);
94
95 static struct filterops pipe_rfiltops =
96         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
97 static struct filterops pipe_wfiltops =
98         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
99
100 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
101
102 /*
103  * Default pipe buffer size(s); this can be kind-of large now because pipe
104  * space is pageable.  The pipe code will try to maintain locality of
105  * reference for performance reasons, so small amounts of outstanding I/O
106  * will not wipe the cache.
107  */
108 #define MINPIPESIZE (PIPE_SIZE/3)
109 #define MAXPIPESIZE (2*PIPE_SIZE/3)
110
111 /*
112  * Limit the number of "big" pipes
113  */
114 #define LIMITBIGPIPES   64
115 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
116
117 static int pipe_maxbig = LIMITBIGPIPES;
118 static int pipe_maxcache = PIPEQ_MAX_CACHE;
119 static int pipe_bigcount;
120 static int pipe_nbig;
121 static int pipe_bcache_alloc;
122 static int pipe_bkmem_alloc;
123 static int pipe_rblocked_count;
124 static int pipe_wblocked_count;
125
126 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
127 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
128         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
129 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
130         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
131 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
132         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times the read side blocked");
133 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
134         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times the write side blocked");
135 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
136         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
137 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
138         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
139 #ifdef SMP
140 static int pipe_delay = 5000;   /* 5uS default */
141 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
142         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
143 #endif
144 #if !defined(NO_PIPE_SYSCTL_STATS)
145 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
146         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
147 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
148         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
149 #endif
150
151 /*
152  * Auto-size pipe cache to reduce kmem allocations and frees.
153  */
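/*
 * Sizing note (inferred from the comparisons below): kmem_lim_size() is
 * treated as a value in megabytes, so machines with roughly 7GB or more
 * of memory double the defaults and machines with 15GB or more double
 * them again.
 */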
154 static
155 void
156 pipeinit(void *dummy)
157 {
158         size_t mbytes = kmem_lim_size();
159
160         if (pipe_maxbig == LIMITBIGPIPES) {
161                 if (mbytes >= 7 * 1024)
162                         pipe_maxbig *= 2;
163                 if (mbytes >= 15 * 1024)
164                         pipe_maxbig *= 2;
165         }
166         if (pipe_maxcache == PIPEQ_MAX_CACHE) {
167                 if (mbytes >= 7 * 1024)
168                         pipe_maxcache *= 2;
169                 if (mbytes >= 15 * 1024)
170                         pipe_maxcache *= 2;
171         }
172 }
173 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
174
175 static void pipeclose (struct pipe *cpipe);
176 static void pipe_free_kmem (struct pipe *cpipe);
177 static int pipe_create (struct pipe **cpipep);
178 static int pipespace (struct pipe *cpipe, int size);
179
180 static __inline void
181 pipewakeup(struct pipe *cpipe, int dosigio)
182 {
183         if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
184                 lwkt_gettoken(&proc_token);
185                 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
186                 lwkt_reltoken(&proc_token);
187         }
188         KNOTE(&cpipe->pipe_kq.ki_note, 0);
189 }
190
191 /*
192  * These routines are called before and after a UIO.  The UIO
193  * may block, causing our held tokens to be lost temporarily.
194  *
195  * We use these routines to serialize reads against other reads
196  * and writes against other writes.
197  *
198  * The appropriate token (rlock or wlock) is held on entry so *ipp does not race.
199  */
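/*
 * A descriptive note on the serialization word itself (inferred from the
 * code below): *ipp is 0 when no uio is in progress, 1 while a uio is in
 * progress, and -1 while a uio is in progress with at least one thread
 * sleeping on the pointer waiting for it to complete.
 */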
200 static __inline int
201 pipe_start_uio(struct pipe *cpipe, int *ipp)
202 {
203         int error;
204
205         while (*ipp) {
206                 *ipp = -1;
207                 error = tsleep(ipp, PCATCH, "pipexx", 0);
208                 if (error)
209                         return (error);
210         }
211         *ipp = 1;
212         return (0);
213 }
214
215 static __inline void
216 pipe_end_uio(struct pipe *cpipe, int *ipp)
217 {
218         if (*ipp < 0) {
219                 *ipp = 0;
220                 wakeup(ipp);
221         } else {
222                 KKASSERT(*ipp > 0);
223                 *ipp = 0;
224         }
225 }
226
227 /*
228  * The pipe system call for the DTYPE_PIPE type of pipes
229  *
230  * pipe_args(int dummy)
231  *
232  * MPSAFE
233  */
234 int
235 sys_pipe(struct pipe_args *uap)
236 {
237         struct thread *td = curthread;
238         struct filedesc *fdp = td->td_proc->p_fd;
239         struct file *rf, *wf;
240         struct pipe *rpipe, *wpipe;
241         int fd1, fd2, error;
242
243         rpipe = wpipe = NULL;
244         if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
245                 pipeclose(rpipe); 
246                 pipeclose(wpipe); 
247                 return (ENFILE);
248         }
249         
250         error = falloc(td->td_lwp, &rf, &fd1);
251         if (error) {
252                 pipeclose(rpipe);
253                 pipeclose(wpipe);
254                 return (error);
255         }
256         uap->sysmsg_fds[0] = fd1;
257
258         /*
259          * Warning: once we've gotten past allocation of the fd for the
260          * read-side, we can only drop the read side via fdrop() in order
261          * to avoid races against processes which manage to dup() the read
262          * side while we are blocked trying to allocate the write side.
263          */
264         rf->f_type = DTYPE_PIPE;
265         rf->f_flag = FREAD | FWRITE;
266         rf->f_ops = &pipeops;
267         rf->f_data = rpipe;
268         error = falloc(td->td_lwp, &wf, &fd2);
269         if (error) {
270                 fsetfd(fdp, NULL, fd1);
271                 fdrop(rf);
272                 /* rpipe has been closed by fdrop(). */
273                 pipeclose(wpipe);
274                 return (error);
275         }
276         wf->f_type = DTYPE_PIPE;
277         wf->f_flag = FREAD | FWRITE;
278         wf->f_ops = &pipeops;
279         wf->f_data = wpipe;
280         uap->sysmsg_fds[1] = fd2;
281
282         rpipe->pipe_slock = kmalloc(sizeof(struct lock),
283                                     M_PIPE, M_WAITOK|M_ZERO);
284         wpipe->pipe_slock = rpipe->pipe_slock;
285         rpipe->pipe_peer = wpipe;
286         wpipe->pipe_peer = rpipe;
287         lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
288
289         /*
290          * Once activated the peer relationship remains valid until
291          * both sides are closed.
292          */
293         fsetfd(fdp, rf, fd1);
294         fsetfd(fdp, wf, fd2);
295         fdrop(rf);
296         fdrop(wf);
297
298         return (0);
299 }
300
301 /*
302  * Allocate kva for the pipe circular buffer; the space is pageable.
303  * This routine will 'realloc' the size of a pipe safely: if the
304  * reallocation fails it retains the old buffer and returns ENOMEM,
305  * leaving the pipe usable at its previous size.
306  */
307 static int
308 pipespace(struct pipe *cpipe, int size)
309 {
310         struct vm_object *object;
311         caddr_t buffer;
312         int npages, error;
313
314         npages = round_page(size) / PAGE_SIZE;
315         object = cpipe->pipe_buffer.object;
316
317         /*
318          * [re]create the object if necessary and reserve space for it
319          * in the kernel_map.  The object and memory are pageable.  On
320          * success, free the old resources before assigning the new
321          * ones.
322          */
323         if (object == NULL || object->size != npages) {
324                 object = vm_object_allocate(OBJT_DEFAULT, npages);
325                 buffer = (caddr_t)vm_map_min(&kernel_map);
326
327                 error = vm_map_find(&kernel_map, object, 0,
328                                     (vm_offset_t *)&buffer,
329                                     size, PAGE_SIZE,
330                                     1, VM_MAPTYPE_NORMAL,
331                                     VM_PROT_ALL, VM_PROT_ALL,
332                                     0);
333
334                 if (error != KERN_SUCCESS) {
335                         vm_object_deallocate(object);
336                         return (ENOMEM);
337                 }
338                 pipe_free_kmem(cpipe);
339                 cpipe->pipe_buffer.object = object;
340                 cpipe->pipe_buffer.buffer = buffer;
341                 cpipe->pipe_buffer.size = size;
342                 ++pipe_bkmem_alloc;
343         } else {
344                 ++pipe_bcache_alloc;
345         }
346         cpipe->pipe_buffer.rindex = 0;
347         cpipe->pipe_buffer.windex = 0;
348         return (0);
349 }
350
351 /*
352  * Initialize and allocate VM and memory for pipe, pulling the pipe from
353  * our per-cpu cache if possible.  For now make sure it is sized for the
354  * smaller PIPE_SIZE default.
355  */
356 static int
357 pipe_create(struct pipe **cpipep)
358 {
359         globaldata_t gd = mycpu;
360         struct pipe *cpipe;
361         int error;
362
363         if ((cpipe = gd->gd_pipeq) != NULL) {
364                 gd->gd_pipeq = cpipe->pipe_peer;
365                 --gd->gd_pipeqcount;
366                 cpipe->pipe_peer = NULL;
367                 cpipe->pipe_wantwcnt = 0;
368         } else {
369                 cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
370         }
371         *cpipep = cpipe;
372         if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
373                 return (error);
374         vfs_timestamp(&cpipe->pipe_ctime);
375         cpipe->pipe_atime = cpipe->pipe_ctime;
376         cpipe->pipe_mtime = cpipe->pipe_ctime;
377         lwkt_token_init(&cpipe->pipe_rlock, "piper");
378         lwkt_token_init(&cpipe->pipe_wlock, "pipew");
379         return (0);
380 }
381
382 static int
383 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
384 {
385         struct pipe *rpipe;
386         struct pipe *wpipe;
387         int error;
388         size_t nread = 0;
389         int nbio;
390         u_int size;     /* total bytes available */
391         u_int nsize;    /* total bytes to read */
392         u_int rindex;   /* contiguous bytes available */
393         int notify_writer;
394         int bigread;
395         int bigcount;
396
397         if (uio->uio_resid == 0)
398                 return(0);
399
400         /*
401          * Setup locks, calculate nbio
402          */
403         rpipe = (struct pipe *)fp->f_data;
404         wpipe = rpipe->pipe_peer;
405         lwkt_gettoken(&rpipe->pipe_rlock);
406
407         if (fflags & O_FBLOCKING)
408                 nbio = 0;
409         else if (fflags & O_FNONBLOCKING)
410                 nbio = 1;
411         else if (fp->f_flag & O_NONBLOCK)
412                 nbio = 1;
413         else
414                 nbio = 0;
415
416         /*
417          * Reads are serialized.  Note however that pipe_buffer.buffer and
418          * pipe_buffer.size can change out from under us when the number
419  * of bytes in the buffer is zero due to the write-side doing a
420          * pipespace().
421          */
422         error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
423         if (error) {
424                 lwkt_reltoken(&rpipe->pipe_rlock);
425                 return (error);
426         }
427         notify_writer = 0;
428
429         bigread = (uio->uio_resid > 10 * 1024 * 1024);
430         bigcount = 10;
431
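        /*
         * Note (inferred from the index arithmetic used below): windex and
         * rindex are free-running unsigned counters.  Their difference is
         * the number of bytes in the FIFO, and (index & (size - 1)) is the
         * offset into the buffer, which relies on pipe_buffer.size being a
         * power of 2.
         */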
432         while (uio->uio_resid) {
433                 /*
434                  * Don't hog the cpu.
435                  */
436                 if (bigread && --bigcount == 0) {
437                         lwkt_user_yield();
438                         bigcount = 10;
439                         if (CURSIG(curthread->td_lwp)) {
440                                 error = EINTR;
441                                 break;
442                         }
443                 }
444
445                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
446                 cpu_lfence();
447                 if (size) {
448                         rindex = rpipe->pipe_buffer.rindex &
449                                  (rpipe->pipe_buffer.size - 1);
450                         nsize = size;
451                         if (nsize > rpipe->pipe_buffer.size - rindex)
452                                 nsize = rpipe->pipe_buffer.size - rindex;
453                         nsize = szmin(nsize, uio->uio_resid);
454
455                         error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
456                                         nsize, uio);
457                         if (error)
458                                 break;
459                         cpu_mfence();
460                         rpipe->pipe_buffer.rindex += nsize;
461                         nread += nsize;
462
463                         /*
464                          * If the FIFO is still over half full just continue
465                          * and do not try to notify the writer yet.
466                          */
467                         if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
468                                 notify_writer = 0;
469                                 continue;
470                         }
471
472                         /*
473                          * When the FIFO is less than half full notify any
474                          * waiting writer.  WANTW can be checked while
475                          * holding just the rlock.
476                          */
477                         notify_writer = 1;
478                         if ((rpipe->pipe_state & PIPE_WANTW) == 0)
479                                 continue;
480                 }
481
482                 /*
483                  * If the "write-side" was blocked we wake it up.  This code
484                  * is reached either when the buffer is completely emptied
485                  * or if it becomes more than half-empty.
486                  *
487                  * Pipe_state can only be modified if both the rlock and
488                  * wlock are held.
489                  */
490                 if (rpipe->pipe_state & PIPE_WANTW) {
491                         lwkt_gettoken(&rpipe->pipe_wlock);
492                         if (rpipe->pipe_state & PIPE_WANTW) {
493                                 rpipe->pipe_state &= ~PIPE_WANTW;
494                                 lwkt_reltoken(&rpipe->pipe_wlock);
495                                 wakeup(rpipe);
496                         } else {
497                                 lwkt_reltoken(&rpipe->pipe_wlock);
498                         }
499                 }
500
501                 /*
502                  * Pick up our copy loop again if the writer sent data to
503                  * us while we were messing around.
504                  *
505                  * On a SMP box poll up to pipe_delay nanoseconds for new
506                  * data.  Typically a value of 2000 to 4000 is sufficient
507                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
508                  * is used for synchronous communications with small packets,
509                  * and 8000 or so (8uS) will pipeline large buffer xfers
510                  * between cpus over a pipe.
511                  *
512                  * For synchronous communications a hit means doing a
513                  * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
514                  * whereas a miss requiring a tsleep/wakeup sequence
515                  * will take 7uS or more.
516                  */
517                 if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
518                         continue;
519
520 #if defined(SMP) && defined(_RDTSC_SUPPORTED_)
521                 if (pipe_delay) {
522                         int64_t tsc_target;
523                         int good = 0;
524
525                         tsc_target = tsc_get_target(pipe_delay);
526                         while (tsc_test_target(tsc_target) == 0) {
527                                 if (rpipe->pipe_buffer.windex !=
528                                     rpipe->pipe_buffer.rindex) {
529                                         good = 1;
530                                         break;
531                                 }
532                         }
533                         if (good)
534                                 continue;
535                 }
536 #endif
537
538                 /*
539                  * Detect EOF condition, do not set error.
540                  */
541                 if (rpipe->pipe_state & PIPE_REOF)
542                         break;
543
544                 /*
545                  * Break if some data was read, or if this was a non-blocking
546                  * read.
547                  */
548                 if (nread > 0)
549                         break;
550
551                 if (nbio) {
552                         error = EAGAIN;
553                         break;
554                 }
555
556                 /*
557                  * Last chance, interlock with WANTR.
558                  */
559                 lwkt_gettoken(&rpipe->pipe_wlock);
560                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
561                 if (size) {
562                         lwkt_reltoken(&rpipe->pipe_wlock);
563                         continue;
564                 }
565
566                 /*
567                  * Retest EOF - acquiring a new token can temporarily release
568                  * tokens already held.
569                  */
570                 if (rpipe->pipe_state & PIPE_REOF) {
571                         lwkt_reltoken(&rpipe->pipe_wlock);
572                         break;
573                 }
574
575                 /*
576                  * If there is no more to read in the pipe, reset its
577                  * pointers to the beginning.  This improves cache hit
578                  * stats.
579                  *
580                  * We need both locks to modify both pointers, and there
581                  * must also not be a write in progress or the uiomove()
582                  * in the write might block and temporarily release
583                  * its wlock, then reacquire and update windex.  We are
584                  * only serialized against reads, not writes.
585                  *
586                  * XXX should we even bother resetting the indices?  It
587                  *     might actually be more cache efficient not to.
588                  */
589                 if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
590                     rpipe->pipe_wip == 0) {
591                         rpipe->pipe_buffer.rindex = 0;
592                         rpipe->pipe_buffer.windex = 0;
593                 }
594
595                 /*
596                  * Wait for more data.
597                  *
598                  * Pipe_state can only be set if both the rlock and wlock
599                  * are held.
600                  */
601                 rpipe->pipe_state |= PIPE_WANTR;
602                 tsleep_interlock(rpipe, PCATCH);
603                 lwkt_reltoken(&rpipe->pipe_wlock);
604                 error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
605                 ++pipe_rblocked_count;
606                 if (error)
607                         break;
608         }
609         pipe_end_uio(rpipe, &rpipe->pipe_rip);
610
611         /*
612          * Update the last access time
613          */
614         if (error == 0 && nread)
615                 vfs_timestamp(&rpipe->pipe_atime);
616
617         /*
618          * If we drained the FIFO more than half way then handle
619          * write blocking hysteresis.
620          *
621          * Note that PIPE_WANTW cannot be set by the writer without
622          * it holding both rlock and wlock, so we can test it
623          * while holding just rlock.
624          */
625         if (notify_writer) {
626                 /*
627                  * Synchronous blocking is done on the pipe involved
628                  */
629                 if (rpipe->pipe_state & PIPE_WANTW) {
630                         lwkt_gettoken(&rpipe->pipe_wlock);
631                         if (rpipe->pipe_state & PIPE_WANTW) {
632                                 rpipe->pipe_state &= ~PIPE_WANTW;
633                                 lwkt_reltoken(&rpipe->pipe_wlock);
634                                 wakeup(rpipe);
635                         } else {
636                                 lwkt_reltoken(&rpipe->pipe_wlock);
637                         }
638                 }
639
640                 /*
641                  * But we may also have to deal with a kqueue which is
642                  * stored on the same pipe as its descriptor, so an
643                  * EVFILT_WRITE event waiting for our side to drain will
644                  * be on the other side.
645                  */
646                 lwkt_gettoken(&wpipe->pipe_wlock);
647                 pipewakeup(wpipe, 0);
648                 lwkt_reltoken(&wpipe->pipe_wlock);
649         }
650         /*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
651         lwkt_reltoken(&rpipe->pipe_rlock);
652
653         return (error);
654 }
655
656 static int
657 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
658 {
659         int error;
660         int orig_resid;
661         int nbio;
662         struct pipe *wpipe;
663         struct pipe *rpipe;
664         u_int windex;
665         u_int space;
666         u_int wcount;
667         int bigwrite;
668         int bigcount;
669
670         /*
671          * Writes go to the peer.  The peer will always exist.
672          */
673         rpipe = (struct pipe *) fp->f_data;
674         wpipe = rpipe->pipe_peer;
675         lwkt_gettoken(&wpipe->pipe_wlock);
676         if (wpipe->pipe_state & PIPE_WEOF) {
677                 lwkt_reltoken(&wpipe->pipe_wlock);
678                 return (EPIPE);
679         }
680
681         /*
682          * Degenerate case (EPIPE takes prec)
683          */
684         if (uio->uio_resid == 0) {
685                 lwkt_reltoken(&wpipe->pipe_wlock);
686                 return(0);
687         }
688
689         /*
690          * Writes are serialized (start_uio must be called with wlock)
691          */
692         error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
693         if (error) {
694                 lwkt_reltoken(&wpipe->pipe_wlock);
695                 return (error);
696         }
697
698         if (fflags & O_FBLOCKING)
699                 nbio = 0;
700         else if (fflags & O_FNONBLOCKING)
701                 nbio = 1;
702         else if (fp->f_flag & O_NONBLOCK)
703                 nbio = 1;
704         else
705                 nbio = 0;
706
707         /*
708          * If it is advantageous to resize the pipe buffer, do
709          * so.  We are write-serialized so we can block safely.
710          */
711         if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
712             (pipe_nbig < pipe_maxbig) &&
713             wpipe->pipe_wantwcnt > 4 &&
714             (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
715                 /* 
716                  * Recheck after lock.
717                  */
718                 lwkt_gettoken(&wpipe->pipe_rlock);
719                 if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
720                     (pipe_nbig < pipe_maxbig) &&
721                     (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
722                         atomic_add_int(&pipe_nbig, 1);
723                         if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
724                                 ++pipe_bigcount;
725                         else
726                                 atomic_subtract_int(&pipe_nbig, 1);
727                 }
728                 lwkt_reltoken(&wpipe->pipe_rlock);
729         }
730
731         orig_resid = uio->uio_resid;
732         wcount = 0;
733
734         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
735         bigcount = 10;
736
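        /*
         * Note on ordering (inferred from the fences used below): the data
         * is copied into the buffer before windex is advanced, separated by
         * a cpu_mfence(), and the reader issues cpu_lfence() after sampling
         * the indices, so a reader never observes an index that covers
         * bytes which have not yet been written.
         */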
737         while (uio->uio_resid) {
738                 if (wpipe->pipe_state & PIPE_WEOF) {
739                         error = EPIPE;
740                         break;
741                 }
742
743                 /*
744                  * Don't hog the cpu.
745                  */
746                 if (bigwrite && --bigcount == 0) {
747                         lwkt_user_yield();
748                         bigcount = 10;
749                         if (CURSIG(curthread->td_lwp)) {
750                                 error = EINTR;
751                                 break;
752                         }
753                 }
754
755                 windex = wpipe->pipe_buffer.windex &
756                          (wpipe->pipe_buffer.size - 1);
757                 space = wpipe->pipe_buffer.size -
758                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
759                 cpu_lfence();
760
761                 /* Writes of size <= PIPE_BUF must be atomic. */
762                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
763                         space = 0;
764
765                 /* 
766                  * Write to fill, read size handles write hysteresis.  Also
767                  * additional restrictions can cause select-based non-blocking
768                  * writes to spin.
769                  */
770                 if (space > 0) {
771                         u_int segsize;
772
773                         /*
774                          * Transfer size is minimum of uio transfer
775                          * and free space in pipe buffer.
776                          *
777                          * Limit each uiocopy to no more than PIPE_SIZE
778                          * so we can keep the gravy train going on a
779                          * SMP box.  This doubles the performance for
780                          * write sizes > 16K.  Otherwise large writes
781                          * wind up doing an inefficient synchronous
782                          * ping-pong.
783                          */
784                         space = szmin(space, uio->uio_resid);
785                         if (space > PIPE_SIZE)
786                                 space = PIPE_SIZE;
787
788                         /*
789                          * First segment to transfer is minimum of
790                          * transfer size and contiguous space in
791                          * pipe buffer.  If first segment to transfer
792                          * is less than the transfer size, we've got
793                          * a wraparound in the buffer.
794                          */
795                         segsize = wpipe->pipe_buffer.size - windex;
796                         if (segsize > space)
797                                 segsize = space;
798
799 #ifdef SMP
800                         /*
801                          * If this is the first loop and the reader is
802                          * blocked, do a preemptive wakeup of the reader.
803                          *
804                          * On SMP the IPI latency plus the wlock interlock
805                          * on the reader side is the fastest way to get the
806                          * reader going.  (The scheduler will hard loop on
807                          * lock tokens).
808                          *
809                          * NOTE: We can't clear WANTR here without acquiring
810                          * the rlock, which we don't want to do here!
811                          */
812                         if ((wpipe->pipe_state & PIPE_WANTR))
813                                 wakeup(wpipe);
814 #endif
815
816                         /*
817                          * Transfer segment, which may include a wrap-around.
818                          * Update windex to account for both all in one go
819                          * so the reader can read() the data atomically.
820                          */
821                         error = uiomove(&wpipe->pipe_buffer.buffer[windex],
822                                         segsize, uio);
823                         if (error == 0 && segsize < space) {
824                                 segsize = space - segsize;
825                                 error = uiomove(&wpipe->pipe_buffer.buffer[0],
826                                                 segsize, uio);
827                         }
828                         if (error)
829                                 break;
830                         cpu_mfence();
831                         wpipe->pipe_buffer.windex += space;
832                         wcount += space;
833                         continue;
834                 }
835
836                 /*
837                  * We need both the rlock and the wlock to interlock against
838                  * the EOF, WANTW, and size checks, and to modify pipe_state.
839                  *
840                  * These are token locks so we do not have to worry about
841                  * deadlocks.
842                  */
843                 lwkt_gettoken(&wpipe->pipe_rlock);
844
845                 /*
846                  * If the "read-side" has been blocked, wake it up now
847                  * and yield to let it drain synchronously rather
848                  * than block.
849                  */
850                 if (wpipe->pipe_state & PIPE_WANTR) {
851                         wpipe->pipe_state &= ~PIPE_WANTR;
852                         wakeup(wpipe);
853                 }
854
855                 /*
856                  * don't block on non-blocking I/O
857                  */
858                 if (nbio) {
859                         lwkt_reltoken(&wpipe->pipe_rlock);
860                         error = EAGAIN;
861                         break;
862                 }
863
864                 /*
865                  * re-test whether we have to block in the writer after
866                  * acquiring both locks, in case the reader opened up
867                  * some space.
868                  */
869                 space = wpipe->pipe_buffer.size -
870                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
871                 cpu_lfence();
872                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
873                         space = 0;
874
875                 /*
876                  * Retest EOF - acquiring a new token can temporarily release
877                  * tokens already held.
878                  */
879                 if (wpipe->pipe_state & PIPE_WEOF) {
880                         lwkt_reltoken(&wpipe->pipe_rlock);
881                         error = EPIPE;
882                         break;
883                 }
884
885                 /*
886                  * We have no more space and have something to offer,
887                  * wake up select/poll/kq.
888                  */
889                 if (space == 0) {
890                         wpipe->pipe_state |= PIPE_WANTW;
891                         ++wpipe->pipe_wantwcnt;
892                         pipewakeup(wpipe, 1);
893                         if (wpipe->pipe_state & PIPE_WANTW)
894                                 error = tsleep(wpipe, PCATCH, "pipewr", 0);
895                         ++pipe_wblocked_count;
896                 }
897                 lwkt_reltoken(&wpipe->pipe_rlock);
898
899                 /*
900                  * Break out if we errored or the read side wants us to go
901                  * away.
902                  */
903                 if (error)
904                         break;
905                 if (wpipe->pipe_state & PIPE_WEOF) {
906                         error = EPIPE;
907                         break;
908                 }
909         }
910         pipe_end_uio(wpipe, &wpipe->pipe_wip);
911
912         /*
913          * If we have put any characters in the buffer, we wake up
914          * the reader.
915          *
916          * Both rlock and wlock are required to be able to modify pipe_state.
917          */
918         if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
919                 if (wpipe->pipe_state & PIPE_WANTR) {
920                         lwkt_gettoken(&wpipe->pipe_rlock);
921                         if (wpipe->pipe_state & PIPE_WANTR) {
922                                 wpipe->pipe_state &= ~PIPE_WANTR;
923                                 lwkt_reltoken(&wpipe->pipe_rlock);
924                                 wakeup(wpipe);
925                         } else {
926                                 lwkt_reltoken(&wpipe->pipe_rlock);
927                         }
928                 }
929                 lwkt_gettoken(&wpipe->pipe_rlock);
930                 pipewakeup(wpipe, 1);
931                 lwkt_reltoken(&wpipe->pipe_rlock);
932         }
933
934         /*
935          * Don't return EPIPE if I/O was successful
936          */
937         if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
938             (uio->uio_resid == 0) &&
939             (error == EPIPE)) {
940                 error = 0;
941         }
942
943         if (error == 0)
944                 vfs_timestamp(&wpipe->pipe_mtime);
945
946         /*
947          * We have something to offer,
948          * wake up select/poll/kq.
949          */
950         /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
951         lwkt_reltoken(&wpipe->pipe_wlock);
952         return (error);
953 }
954
955 /*
956  * we implement a very minimal set of ioctls for compatibility with sockets.
957  */
958 int
959 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
960            struct ucred *cred, struct sysmsg *msg)
961 {
962         struct pipe *mpipe;
963         int error;
964
965         mpipe = (struct pipe *)fp->f_data;
966
967         lwkt_gettoken(&mpipe->pipe_rlock);
968         lwkt_gettoken(&mpipe->pipe_wlock);
969
970         switch (cmd) {
971         case FIOASYNC:
972                 if (*(int *)data) {
973                         mpipe->pipe_state |= PIPE_ASYNC;
974                 } else {
975                         mpipe->pipe_state &= ~PIPE_ASYNC;
976                 }
977                 error = 0;
978                 break;
979         case FIONREAD:
980                 *(int *)data = mpipe->pipe_buffer.windex -
981                                 mpipe->pipe_buffer.rindex;
982                 error = 0;
983                 break;
984         case FIOSETOWN:
985                 error = fsetown(*(int *)data, &mpipe->pipe_sigio);
986                 break;
987         case FIOGETOWN:
988                 *(int *)data = fgetown(&mpipe->pipe_sigio);
989                 error = 0;
990                 break;
991         case TIOCSPGRP:
992                 /* This is deprecated, FIOSETOWN should be used instead. */
993                 error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
994                 break;
995
996         case TIOCGPGRP:
997                 /* This is deprecated, FIOGETOWN should be used instead. */
998                 *(int *)data = -fgetown(&mpipe->pipe_sigio);
999                 error = 0;
1000                 break;
1001         default:
1002                 error = ENOTTY;
1003                 break;
1004         }
1005         lwkt_reltoken(&mpipe->pipe_wlock);
1006         lwkt_reltoken(&mpipe->pipe_rlock);
1007
1008         return (error);
1009 }
1010
1011 /*
1012  * MPSAFE
1013  */
1014 static int
1015 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1016 {
1017         struct pipe *pipe;
1018
1019         pipe = (struct pipe *)fp->f_data;
1020
1021         bzero((caddr_t)ub, sizeof(*ub));
1022         ub->st_mode = S_IFIFO;
1023         ub->st_blksize = pipe->pipe_buffer.size;
1024         ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1025         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1026         ub->st_atimespec = pipe->pipe_atime;
1027         ub->st_mtimespec = pipe->pipe_mtime;
1028         ub->st_ctimespec = pipe->pipe_ctime;
1029         /*
1030          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1031          * st_flags, st_gen.
1032          * XXX (st_dev, st_ino) should be unique.
1033          */
1034         return (0);
1035 }
1036
1037 static int
1038 pipe_close(struct file *fp)
1039 {
1040         struct pipe *cpipe;
1041
1042         cpipe = (struct pipe *)fp->f_data;
1043         fp->f_ops = &badfileops;
1044         fp->f_data = NULL;
1045         funsetown(&cpipe->pipe_sigio);
1046         pipeclose(cpipe);
1047         return (0);
1048 }
1049
1050 /*
1051  * Shutdown one or both directions of a full-duplex pipe.
1052  */
1053 static int
1054 pipe_shutdown(struct file *fp, int how)
1055 {
1056         struct pipe *rpipe;
1057         struct pipe *wpipe;
1058         int error = EPIPE;
1059
1060         rpipe = (struct pipe *)fp->f_data;
1061         wpipe = rpipe->pipe_peer;
1062
1063         /*
1064          * We modify pipe_state on both pipes, which means we need
1065          * all four tokens!
1066          */
1067         lwkt_gettoken(&rpipe->pipe_rlock);
1068         lwkt_gettoken(&rpipe->pipe_wlock);
1069         lwkt_gettoken(&wpipe->pipe_rlock);
1070         lwkt_gettoken(&wpipe->pipe_wlock);
1071
1072         switch(how) {
1073         case SHUT_RDWR:
1074         case SHUT_RD:
1075                 rpipe->pipe_state |= PIPE_REOF;         /* my reads */
1076                 rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
1077                 if (rpipe->pipe_state & PIPE_WANTR) {
1078                         rpipe->pipe_state &= ~PIPE_WANTR;
1079                         wakeup(rpipe);
1080                 }
1081                 if (rpipe->pipe_state & PIPE_WANTW) {
1082                         rpipe->pipe_state &= ~PIPE_WANTW;
1083                         wakeup(rpipe);
1084                 }
1085                 error = 0;
1086                 if (how == SHUT_RD)
1087                         break;
1088                 /* fall through */
1089         case SHUT_WR:
1090                 wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
1091                 wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
1092                 if (wpipe->pipe_state & PIPE_WANTR) {
1093                         wpipe->pipe_state &= ~PIPE_WANTR;
1094                         wakeup(wpipe);
1095                 }
1096                 if (wpipe->pipe_state & PIPE_WANTW) {
1097                         wpipe->pipe_state &= ~PIPE_WANTW;
1098                         wakeup(wpipe);
1099                 }
1100                 error = 0;
1101                 break;
1102         }
1103         pipewakeup(rpipe, 1);
1104         pipewakeup(wpipe, 1);
1105
1106         lwkt_reltoken(&wpipe->pipe_wlock);
1107         lwkt_reltoken(&wpipe->pipe_rlock);
1108         lwkt_reltoken(&rpipe->pipe_wlock);
1109         lwkt_reltoken(&rpipe->pipe_rlock);
1110
1111         return (error);
1112 }
1113
1114 static void
1115 pipe_free_kmem(struct pipe *cpipe)
1116 {
1117         if (cpipe->pipe_buffer.buffer != NULL) {
1118                 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1119                         atomic_subtract_int(&pipe_nbig, 1);
1120                 kmem_free(&kernel_map,
1121                         (vm_offset_t)cpipe->pipe_buffer.buffer,
1122                         cpipe->pipe_buffer.size);
1123                 cpipe->pipe_buffer.buffer = NULL;
1124                 cpipe->pipe_buffer.object = NULL;
1125         }
1126 }
1127
1128 /*
1129  * Close the pipe.  The slock must be held to interlock against simultaneous
1130  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1131  */
1132 static void
1133 pipeclose(struct pipe *cpipe)
1134 {
1135         globaldata_t gd;
1136         struct pipe *ppipe;
1137
1138         if (cpipe == NULL)
1139                 return;
1140
1141         /*
1142          * The slock may not have been allocated yet (close during
1143          * initialization)
1144          *
1145          * We need both the read and write tokens to modify pipe_state.
1146          */
1147         if (cpipe->pipe_slock)
1148                 lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1149         lwkt_gettoken(&cpipe->pipe_rlock);
1150         lwkt_gettoken(&cpipe->pipe_wlock);
1151
1152         /*
1153          * Set our state, wakeup anyone waiting in select/poll/kq, and
1154          * wakeup anyone blocked on our pipe.
1155          */
1156         cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1157         pipewakeup(cpipe, 1);
1158         if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1159                 cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1160                 wakeup(cpipe);
1161         }
1162
1163         /*
1164          * Disconnect from peer.
1165          */
1166         if ((ppipe = cpipe->pipe_peer) != NULL) {
1167                 lwkt_gettoken(&ppipe->pipe_rlock);
1168                 lwkt_gettoken(&ppipe->pipe_wlock);
1169                 ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1170                 pipewakeup(ppipe, 1);
1171                 if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1172                         ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1173                         wakeup(ppipe);
1174                 }
1175                 if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1176                         KNOTE(&ppipe->pipe_kq.ki_note, 0);
1177                 lwkt_reltoken(&ppipe->pipe_wlock);
1178                 lwkt_reltoken(&ppipe->pipe_rlock);
1179         }
1180
1181         /*
1182          * If the peer is also closed we can free resources for both
1183          * sides, otherwise we leave our side intact to deal with any
1184          * races (since we only have the slock).
1185          */
1186         if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1187                 cpipe->pipe_peer = NULL;
1188                 ppipe->pipe_peer = NULL;
1189                 ppipe->pipe_slock = NULL;       /* we will free the slock */
1190                 pipeclose(ppipe);
1191                 ppipe = NULL;
1192         }
1193
1194         lwkt_reltoken(&cpipe->pipe_wlock);
1195         lwkt_reltoken(&cpipe->pipe_rlock);
1196         if (cpipe->pipe_slock)
1197                 lockmgr(cpipe->pipe_slock, LK_RELEASE);
1198
1199         /*
1200          * If we disassociated from our peer we can free resources
1201          */
1202         if (ppipe == NULL) {
1203                 gd = mycpu;
1204                 if (cpipe->pipe_slock) {
1205                         kfree(cpipe->pipe_slock, M_PIPE);
1206                         cpipe->pipe_slock = NULL;
1207                 }
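                /*
                 * Either recycle the pipe into this cpu's small free cache
                 * (only default PIPE_SIZE pipes, up to pipe_maxcache
                 * entries) or release the buffer kva and the structure.
                 */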
1208                 if (gd->gd_pipeqcount >= pipe_maxcache ||
1209                     cpipe->pipe_buffer.size != PIPE_SIZE
1210                 ) {
1211                         pipe_free_kmem(cpipe);
1212                         kfree(cpipe, M_PIPE);
1213                 } else {
1214                         cpipe->pipe_state = 0;
1215                         cpipe->pipe_peer = gd->gd_pipeq;
1216                         gd->gd_pipeq = cpipe;
1217                         ++gd->gd_pipeqcount;
1218                 }
1219         }
1220 }
1221
1222 static int
1223 pipe_kqfilter(struct file *fp, struct knote *kn)
1224 {
1225         struct pipe *cpipe;
1226
1227         cpipe = (struct pipe *)kn->kn_fp->f_data;
1228
1229         switch (kn->kn_filter) {
1230         case EVFILT_READ:
1231                 kn->kn_fop = &pipe_rfiltops;
1232                 break;
1233         case EVFILT_WRITE:
1234                 kn->kn_fop = &pipe_wfiltops;
1235                 if (cpipe->pipe_peer == NULL) {
1236                         /* other end of pipe has been closed */
1237                         return (EPIPE);
1238                 }
1239                 break;
1240         default:
1241                 return (EOPNOTSUPP);
1242         }
1243         kn->kn_hook = (caddr_t)cpipe;
1244
1245         knote_insert(&cpipe->pipe_kq.ki_note, kn);
1246
1247         return (0);
1248 }
1249
1250 static void
1251 filt_pipedetach(struct knote *kn)
1252 {
1253         struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1254
1255         knote_remove(&cpipe->pipe_kq.ki_note, kn);
1256 }
1257
1258 /*ARGSUSED*/
1259 static int
1260 filt_piperead(struct knote *kn, long hint)
1261 {
1262         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1263         int ready = 0;
1264
1265         lwkt_gettoken(&rpipe->pipe_rlock);
1266         lwkt_gettoken(&rpipe->pipe_wlock);
1267
1268         kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1269
1270         if (rpipe->pipe_state & PIPE_REOF) {
1271                 /*
1272                  * Only set NODATA if all data has been exhausted
1273                  */
1274                 if (kn->kn_data == 0)
1275                         kn->kn_flags |= EV_NODATA;
1276                 kn->kn_flags |= EV_EOF; 
1277                 ready = 1;
1278         }
1279
1280         lwkt_reltoken(&rpipe->pipe_wlock);
1281         lwkt_reltoken(&rpipe->pipe_rlock);
1282
1283         if (!ready)
1284                 ready = kn->kn_data > 0;
1285
1286         return (ready);
1287 }
1288
1289 /*ARGSUSED*/
1290 static int
1291 filt_pipewrite(struct knote *kn, long hint)
1292 {
1293         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1294         struct pipe *wpipe = rpipe->pipe_peer;
1295         int ready = 0;
1296
1297         kn->kn_data = 0;
1298         if (wpipe == NULL) {
1299                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1300                 return (1);
1301         }
1302
1303         lwkt_gettoken(&wpipe->pipe_rlock);
1304         lwkt_gettoken(&wpipe->pipe_wlock);
1305
1306         if (wpipe->pipe_state & PIPE_WEOF) {
1307                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1308                 ready = 1;
1309         }
1310
1311         if (!ready)
1312                 kn->kn_data = wpipe->pipe_buffer.size -
1313                               (wpipe->pipe_buffer.windex -
1314                                wpipe->pipe_buffer.rindex);
1315
1316         lwkt_reltoken(&wpipe->pipe_wlock);
1317         lwkt_reltoken(&wpipe->pipe_rlock);
1318
1319         if (!ready)
1320                 ready = kn->kn_data >= PIPE_BUF;
1321
1322         return (ready);
1323 }