kernel - Add reapctl() system call for managing sub-processes
[dragonfly.git] / sys / kern / sys_pipe.c
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  */
21
22 /*
23  * This file contains a high-performance replacement for the socket-based
24  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
25  * all features of sockets, but does do everything that pipes normally
26  * do.
27  */
28 #include <sys/param.h>
29 #include <sys/systm.h>
30 #include <sys/kernel.h>
31 #include <sys/proc.h>
32 #include <sys/fcntl.h>
33 #include <sys/file.h>
34 #include <sys/filedesc.h>
35 #include <sys/filio.h>
36 #include <sys/ttycom.h>
37 #include <sys/stat.h>
38 #include <sys/signalvar.h>
39 #include <sys/sysproto.h>
40 #include <sys/pipe.h>
41 #include <sys/vnode.h>
42 #include <sys/uio.h>
43 #include <sys/event.h>
44 #include <sys/globaldata.h>
45 #include <sys/module.h>
46 #include <sys/malloc.h>
47 #include <sys/sysctl.h>
48 #include <sys/socket.h>
49
50 #include <vm/vm.h>
51 #include <vm/vm_param.h>
52 #include <sys/lock.h>
53 #include <vm/vm_object.h>
54 #include <vm/vm_kern.h>
55 #include <vm/vm_extern.h>
56 #include <vm/pmap.h>
57 #include <vm/vm_map.h>
58 #include <vm/vm_page.h>
59 #include <vm/vm_zone.h>
60
61 #include <sys/file2.h>
62 #include <sys/signal2.h>
63
64 #include <machine/cpufunc.h>
65
66 /*
67  * interfaces to the outside world
68  */
69 static int pipe_read (struct file *fp, struct uio *uio, 
70                 struct ucred *cred, int flags);
71 static int pipe_write (struct file *fp, struct uio *uio, 
72                 struct ucred *cred, int flags);
73 static int pipe_close (struct file *fp);
74 static int pipe_shutdown (struct file *fp, int how);
75 static int pipe_kqfilter (struct file *fp, struct knote *kn);
76 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
77 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
78                 struct ucred *cred, struct sysmsg *msg);
79
80 static struct fileops pipeops = {
81         .fo_read = pipe_read, 
82         .fo_write = pipe_write,
83         .fo_ioctl = pipe_ioctl,
84         .fo_kqfilter = pipe_kqfilter,
85         .fo_stat = pipe_stat,
86         .fo_close = pipe_close,
87         .fo_shutdown = pipe_shutdown
88 };
89
90 static void     filt_pipedetach(struct knote *kn);
91 static int      filt_piperead(struct knote *kn, long hint);
92 static int      filt_pipewrite(struct knote *kn, long hint);
93
94 static struct filterops pipe_rfiltops =
95         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
96 static struct filterops pipe_wfiltops =
97         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
98
99 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
100
101 /*
102  * Default pipe buffer size(s), this can be kind-of large now because pipe
103  * space is pageable.  The pipe code will try to maintain locality of
104  * reference for performance reasons, so small amounts of outstanding I/O
105  * will not wipe the cache.
106  */
107 #define MINPIPESIZE (PIPE_SIZE/3)
108 #define MAXPIPESIZE (2*PIPE_SIZE/3)
109
110 /*
111  * Limit the number of "big" pipes
112  */
113 #define LIMITBIGPIPES   64
114 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
115
116 static int pipe_maxbig = LIMITBIGPIPES;
117 static int pipe_maxcache = PIPEQ_MAX_CACHE;
118 static int pipe_bigcount;
119 static int pipe_nbig;
120 static int pipe_bcache_alloc;
121 static int pipe_bkmem_alloc;
122 static int pipe_rblocked_count;
123 static int pipe_wblocked_count;
124
125 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
126 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
127         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
128 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
129         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
130 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
131         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe expanded");
132 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
133         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe expanded");
134 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
135         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
136 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
137         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
138 static int pipe_delay = 5000;   /* 5uS default */
139 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
140         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
141 #if !defined(NO_PIPE_SYSCTL_STATS)
142 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
143         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
144 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
145         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
146 #endif
147
148 /*
149  * Auto-size pipe cache to reduce kmem allocations and frees.
150  */
151 static
152 void
153 pipeinit(void *dummy)
154 {
155         size_t mbytes = kmem_lim_size();
156
157         if (pipe_maxbig == LIMITBIGPIPES) {
158                 if (mbytes >= 7 * 1024)
159                         pipe_maxbig *= 2;
160                 if (mbytes >= 15 * 1024)
161                         pipe_maxbig *= 2;
162         }
163         if (pipe_maxcache == PIPEQ_MAX_CACHE) {
164                 if (mbytes >= 7 * 1024)
165                         pipe_maxcache *= 2;
166                 if (mbytes >= 15 * 1024)
167                         pipe_maxcache *= 2;
168         }
169 }
170 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL)
171
/* Internal helpers, defined further down in this file. */
static int pipe_create(struct pipe **cpipep);
static int pipespace(struct pipe *cpipe, int size);
static void pipe_free_kmem(struct pipe *cpipe);
static void pipeclose(struct pipe *cpipe);
176
177 static __inline void
178 pipewakeup(struct pipe *cpipe, int dosigio)
179 {
180         if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
181                 lwkt_gettoken(&sigio_token);
182                 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
183                 lwkt_reltoken(&sigio_token);
184         }
185         KNOTE(&cpipe->pipe_kq.ki_note, 0);
186 }
187
188 /*
189  * These routines are called before and after a UIO.  The UIO
190  * may block, causing our held tokens to be lost temporarily.
191  *
192  * We use these routines to serialize reads against other reads
193  * and writes against other writes.
194  *
195  * The read token is held on entry so *ipp does not race.
196  */
197 static __inline int
198 pipe_start_uio(struct pipe *cpipe, int *ipp)
199 {
200         int error;
201
202         while (*ipp) {
203                 *ipp = -1;
204                 error = tsleep(ipp, PCATCH, "pipexx", 0);
205                 if (error)
206                         return (error);
207         }
208         *ipp = 1;
209         return (0);
210 }
211
/*
 * Release the in-progress flag obtained with pipe_start_uio().
 *
 * A negative value means another thread went to sleep on ipp while we
 * owned the flag, so it is woken after the flag is cleared.  Otherwise
 * the flag must be positive; releasing a flag that is not owned trips
 * the assertion.
 */
static __inline void
pipe_end_uio(struct pipe *cpipe, int *ipp)
{
	int have_waiters = (*ipp < 0);

	if (!have_waiters) {
		KKASSERT(*ipp > 0);
	}
	*ipp = 0;
	if (have_waiters) {
		wakeup(ipp);
	}
}
223
224 /*
225  * The pipe system call for the DTYPE_PIPE type of pipes
226  *
227  * pipe_args(int dummy)
228  *
229  * MPSAFE
230  */
231 int
232 sys_pipe(struct pipe_args *uap)
233 {
234         struct thread *td = curthread;
235         struct filedesc *fdp = td->td_proc->p_fd;
236         struct file *rf, *wf;
237         struct pipe *rpipe, *wpipe;
238         int fd1, fd2, error;
239
240         rpipe = wpipe = NULL;
241         if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
242                 pipeclose(rpipe); 
243                 pipeclose(wpipe); 
244                 return (ENFILE);
245         }
246         
247         error = falloc(td->td_lwp, &rf, &fd1);
248         if (error) {
249                 pipeclose(rpipe);
250                 pipeclose(wpipe);
251                 return (error);
252         }
253         uap->sysmsg_fds[0] = fd1;
254
255         /*
256          * Warning: once we've gotten past allocation of the fd for the
257          * read-side, we can only drop the read side via fdrop() in order
258          * to avoid races against processes which manage to dup() the read
259          * side while we are blocked trying to allocate the write side.
260          */
261         rf->f_type = DTYPE_PIPE;
262         rf->f_flag = FREAD | FWRITE;
263         rf->f_ops = &pipeops;
264         rf->f_data = rpipe;
265         error = falloc(td->td_lwp, &wf, &fd2);
266         if (error) {
267                 fsetfd(fdp, NULL, fd1);
268                 fdrop(rf);
269                 /* rpipe has been closed by fdrop(). */
270                 pipeclose(wpipe);
271                 return (error);
272         }
273         wf->f_type = DTYPE_PIPE;
274         wf->f_flag = FREAD | FWRITE;
275         wf->f_ops = &pipeops;
276         wf->f_data = wpipe;
277         uap->sysmsg_fds[1] = fd2;
278
279         rpipe->pipe_slock = kmalloc(sizeof(struct lock),
280                                     M_PIPE, M_WAITOK|M_ZERO);
281         wpipe->pipe_slock = rpipe->pipe_slock;
282         rpipe->pipe_peer = wpipe;
283         wpipe->pipe_peer = rpipe;
284         lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
285
286         /*
287          * Once activated the peer relationship remains valid until
288          * both sides are closed.
289          */
290         fsetfd(fdp, rf, fd1);
291         fsetfd(fdp, wf, fd2);
292         fdrop(rf);
293         fdrop(wf);
294
295         return (0);
296 }
297
298 /*
299  * Allocate kva for pipe circular buffer, the space is pageable
300  * This routine will 'realloc' the size of a pipe safely, if it fails
301  * it will retain the old buffer.
302  * If it fails it will return ENOMEM.
303  */
304 static int
305 pipespace(struct pipe *cpipe, int size)
306 {
307         struct vm_object *object;
308         caddr_t buffer;
309         int npages, error;
310
311         npages = round_page(size) / PAGE_SIZE;
312         object = cpipe->pipe_buffer.object;
313
314         /*
315          * [re]create the object if necessary and reserve space for it
316          * in the kernel_map.  The object and memory are pageable.  On
317          * success, free the old resources before assigning the new
318          * ones.
319          */
320         if (object == NULL || object->size != npages) {
321                 object = vm_object_allocate(OBJT_DEFAULT, npages);
322                 buffer = (caddr_t)vm_map_min(&kernel_map);
323
324                 error = vm_map_find(&kernel_map, object, NULL,
325                                     0, (vm_offset_t *)&buffer, size,
326                                     PAGE_SIZE,
327                                     1, VM_MAPTYPE_NORMAL,
328                                     VM_PROT_ALL, VM_PROT_ALL, 0);
329
330                 if (error != KERN_SUCCESS) {
331                         vm_object_deallocate(object);
332                         return (ENOMEM);
333                 }
334                 pipe_free_kmem(cpipe);
335                 cpipe->pipe_buffer.object = object;
336                 cpipe->pipe_buffer.buffer = buffer;
337                 cpipe->pipe_buffer.size = size;
338                 ++pipe_bkmem_alloc;
339         } else {
340                 ++pipe_bcache_alloc;
341         }
342         cpipe->pipe_buffer.rindex = 0;
343         cpipe->pipe_buffer.windex = 0;
344         return (0);
345 }
346
347 /*
348  * Initialize and allocate VM and memory for pipe, pulling the pipe from
349  * our per-cpu cache if possible.  For now make sure it is sized for the
350  * smaller PIPE_SIZE default.
351  */
352 static int
353 pipe_create(struct pipe **cpipep)
354 {
355         globaldata_t gd = mycpu;
356         struct pipe *cpipe;
357         int error;
358
359         if ((cpipe = gd->gd_pipeq) != NULL) {
360                 gd->gd_pipeq = cpipe->pipe_peer;
361                 --gd->gd_pipeqcount;
362                 cpipe->pipe_peer = NULL;
363                 cpipe->pipe_wantwcnt = 0;
364         } else {
365                 cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
366         }
367         *cpipep = cpipe;
368         if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
369                 return (error);
370         vfs_timestamp(&cpipe->pipe_ctime);
371         cpipe->pipe_atime = cpipe->pipe_ctime;
372         cpipe->pipe_mtime = cpipe->pipe_ctime;
373         lwkt_token_init(&cpipe->pipe_rlock, "piper");
374         lwkt_token_init(&cpipe->pipe_wlock, "pipew");
375         return (0);
376 }
377
378 static int
379 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
380 {
381         struct pipe *rpipe;
382         struct pipe *wpipe;
383         int error;
384         size_t nread = 0;
385         int nbio;
386         u_int size;     /* total bytes available */
387         u_int nsize;    /* total bytes to read */
388         u_int rindex;   /* contiguous bytes available */
389         int notify_writer;
390         int bigread;
391         int bigcount;
392
393         atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
394
395         if (uio->uio_resid == 0)
396                 return(0);
397
398         /*
399          * Setup locks, calculate nbio
400          */
401         rpipe = (struct pipe *)fp->f_data;
402         wpipe = rpipe->pipe_peer;
403         lwkt_gettoken(&rpipe->pipe_rlock);
404
405         if (fflags & O_FBLOCKING)
406                 nbio = 0;
407         else if (fflags & O_FNONBLOCKING)
408                 nbio = 1;
409         else if (fp->f_flag & O_NONBLOCK)
410                 nbio = 1;
411         else
412                 nbio = 0;
413
414         /*
415          * Reads are serialized.  Note however that pipe_buffer.buffer and
416          * pipe_buffer.size can change out from under us when the number
417          * of bytes in the buffer are zero due to the write-side doing a
418          * pipespace().
419          */
420         error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
421         if (error) {
422                 lwkt_reltoken(&rpipe->pipe_rlock);
423                 return (error);
424         }
425         notify_writer = 0;
426
427         bigread = (uio->uio_resid > 10 * 1024 * 1024);
428         bigcount = 10;
429
430         while (uio->uio_resid) {
431                 /*
432                  * Don't hog the cpu.
433                  */
434                 if (bigread && --bigcount == 0) {
435                         lwkt_user_yield();
436                         bigcount = 10;
437                         if (CURSIG(curthread->td_lwp)) {
438                                 error = EINTR;
439                                 break;
440                         }
441                 }
442
443                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
444                 cpu_lfence();
445                 if (size) {
446                         rindex = rpipe->pipe_buffer.rindex &
447                                  (rpipe->pipe_buffer.size - 1);
448                         nsize = size;
449                         if (nsize > rpipe->pipe_buffer.size - rindex)
450                                 nsize = rpipe->pipe_buffer.size - rindex;
451                         nsize = szmin(nsize, uio->uio_resid);
452
453                         error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
454                                         nsize, uio);
455                         if (error)
456                                 break;
457                         cpu_mfence();
458                         rpipe->pipe_buffer.rindex += nsize;
459                         nread += nsize;
460
461                         /*
462                          * If the FIFO is still over half full just continue
463                          * and do not try to notify the writer yet.
464                          */
465                         if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
466                                 notify_writer = 0;
467                                 continue;
468                         }
469
470                         /*
471                          * When the FIFO is less then half full notify any
472                          * waiting writer.  WANTW can be checked while
473                          * holding just the rlock.
474                          */
475                         notify_writer = 1;
476                         if ((rpipe->pipe_state & PIPE_WANTW) == 0)
477                                 continue;
478                 }
479
480                 /*
481                  * If the "write-side" was blocked we wake it up.  This code
482                  * is reached either when the buffer is completely emptied
483                  * or if it becomes more then half-empty.
484                  *
485                  * Pipe_state can only be modified if both the rlock and
486                  * wlock are held.
487                  */
488                 if (rpipe->pipe_state & PIPE_WANTW) {
489                         lwkt_gettoken(&rpipe->pipe_wlock);
490                         if (rpipe->pipe_state & PIPE_WANTW) {
491                                 rpipe->pipe_state &= ~PIPE_WANTW;
492                                 lwkt_reltoken(&rpipe->pipe_wlock);
493                                 wakeup(rpipe);
494                         } else {
495                                 lwkt_reltoken(&rpipe->pipe_wlock);
496                         }
497                 }
498
499                 /*
500                  * Pick up our copy loop again if the writer sent data to
501                  * us while we were messing around.
502                  *
503                  * On a SMP box poll up to pipe_delay nanoseconds for new
504                  * data.  Typically a value of 2000 to 4000 is sufficient
505                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
506                  * is used for synchronous communications with small packets,
507                  * and 8000 or so (8uS) will pipeline large buffer xfers
508                  * between cpus over a pipe.
509                  *
510                  * For synchronous communications a hit means doing a
511                  * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS,
512                  * where as miss requiring a tsleep/wakeup sequence
513                  * will take 7uS or more.
514                  */
515                 if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
516                         continue;
517
518 #ifdef _RDTSC_SUPPORTED_
519                 if (pipe_delay) {
520                         int64_t tsc_target;
521                         int good = 0;
522
523                         tsc_target = tsc_get_target(pipe_delay);
524                         while (tsc_test_target(tsc_target) == 0) {
525                                 if (rpipe->pipe_buffer.windex !=
526                                     rpipe->pipe_buffer.rindex) {
527                                         good = 1;
528                                         break;
529                                 }
530                         }
531                         if (good)
532                                 continue;
533                 }
534 #endif
535
536                 /*
537                  * Detect EOF condition, do not set error.
538                  */
539                 if (rpipe->pipe_state & PIPE_REOF)
540                         break;
541
542                 /*
543                  * Break if some data was read, or if this was a non-blocking
544                  * read.
545                  */
546                 if (nread > 0)
547                         break;
548
549                 if (nbio) {
550                         error = EAGAIN;
551                         break;
552                 }
553
554                 /*
555                  * Last chance, interlock with WANTR.
556                  */
557                 lwkt_gettoken(&rpipe->pipe_wlock);
558                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
559                 if (size) {
560                         lwkt_reltoken(&rpipe->pipe_wlock);
561                         continue;
562                 }
563
564                 /*
565                  * Retest EOF - acquiring a new token can temporarily release
566                  * tokens already held.
567                  */
568                 if (rpipe->pipe_state & PIPE_REOF) {
569                         lwkt_reltoken(&rpipe->pipe_wlock);
570                         break;
571                 }
572
573                 /*
574                  * If there is no more to read in the pipe, reset its
575                  * pointers to the beginning.  This improves cache hit
576                  * stats.
577                  *
578                  * We need both locks to modify both pointers, and there
579                  * must also not be a write in progress or the uiomove()
580                  * in the write might block and temporarily release
581                  * its wlock, then reacquire and update windex.  We are
582                  * only serialized against reads, not writes.
583                  *
584                  * XXX should we even bother resetting the indices?  It
585                  *     might actually be more cache efficient not to.
586                  */
587                 if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
588                     rpipe->pipe_wip == 0) {
589                         rpipe->pipe_buffer.rindex = 0;
590                         rpipe->pipe_buffer.windex = 0;
591                 }
592
593                 /*
594                  * Wait for more data.
595                  *
596                  * Pipe_state can only be set if both the rlock and wlock
597                  * are held.
598                  */
599                 rpipe->pipe_state |= PIPE_WANTR;
600                 tsleep_interlock(rpipe, PCATCH);
601                 lwkt_reltoken(&rpipe->pipe_wlock);
602                 error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
603                 ++pipe_rblocked_count;
604                 if (error)
605                         break;
606         }
607         pipe_end_uio(rpipe, &rpipe->pipe_rip);
608
609         /*
610          * Uptime last access time
611          */
612         if (error == 0 && nread)
613                 vfs_timestamp(&rpipe->pipe_atime);
614
615         /*
616          * If we drained the FIFO more then half way then handle
617          * write blocking hysteresis.
618          *
619          * Note that PIPE_WANTW cannot be set by the writer without
620          * it holding both rlock and wlock, so we can test it
621          * while holding just rlock.
622          */
623         if (notify_writer) {
624                 /*
625                  * Synchronous blocking is done on the pipe involved
626                  */
627                 if (rpipe->pipe_state & PIPE_WANTW) {
628                         lwkt_gettoken(&rpipe->pipe_wlock);
629                         if (rpipe->pipe_state & PIPE_WANTW) {
630                                 rpipe->pipe_state &= ~PIPE_WANTW;
631                                 lwkt_reltoken(&rpipe->pipe_wlock);
632                                 wakeup(rpipe);
633                         } else {
634                                 lwkt_reltoken(&rpipe->pipe_wlock);
635                         }
636                 }
637
638                 /*
639                  * But we may also have to deal with a kqueue which is
640                  * stored on the same pipe as its descriptor, so a
641                  * EVFILT_WRITE event waiting for our side to drain will
642                  * be on the other side.
643                  */
644                 lwkt_gettoken(&wpipe->pipe_wlock);
645                 pipewakeup(wpipe, 0);
646                 lwkt_reltoken(&wpipe->pipe_wlock);
647         }
648         /*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
649         lwkt_reltoken(&rpipe->pipe_rlock);
650
651         return (error);
652 }
653
654 static int
655 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
656 {
657         int error;
658         int orig_resid;
659         int nbio;
660         struct pipe *wpipe;
661         struct pipe *rpipe;
662         u_int windex;
663         u_int space;
664         u_int wcount;
665         int bigwrite;
666         int bigcount;
667
668         /*
669          * Writes go to the peer.  The peer will always exist.
670          */
671         rpipe = (struct pipe *) fp->f_data;
672         wpipe = rpipe->pipe_peer;
673         lwkt_gettoken(&wpipe->pipe_wlock);
674         if (wpipe->pipe_state & PIPE_WEOF) {
675                 lwkt_reltoken(&wpipe->pipe_wlock);
676                 return (EPIPE);
677         }
678
679         /*
680          * Degenerate case (EPIPE takes prec)
681          */
682         if (uio->uio_resid == 0) {
683                 lwkt_reltoken(&wpipe->pipe_wlock);
684                 return(0);
685         }
686
687         /*
688          * Writes are serialized (start_uio must be called with wlock)
689          */
690         error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
691         if (error) {
692                 lwkt_reltoken(&wpipe->pipe_wlock);
693                 return (error);
694         }
695
696         if (fflags & O_FBLOCKING)
697                 nbio = 0;
698         else if (fflags & O_FNONBLOCKING)
699                 nbio = 1;
700         else if (fp->f_flag & O_NONBLOCK)
701                 nbio = 1;
702         else
703                 nbio = 0;
704
705         /*
706          * If it is advantageous to resize the pipe buffer, do
707          * so.  We are write-serialized so we can block safely.
708          */
709         if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
710             (pipe_nbig < pipe_maxbig) &&
711             wpipe->pipe_wantwcnt > 4 &&
712             (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
713                 /* 
714                  * Recheck after lock.
715                  */
716                 lwkt_gettoken(&wpipe->pipe_rlock);
717                 if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
718                     (pipe_nbig < pipe_maxbig) &&
719                     (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
720                         atomic_add_int(&pipe_nbig, 1);
721                         if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
722                                 ++pipe_bigcount;
723                         else
724                                 atomic_subtract_int(&pipe_nbig, 1);
725                 }
726                 lwkt_reltoken(&wpipe->pipe_rlock);
727         }
728
729         orig_resid = uio->uio_resid;
730         wcount = 0;
731
732         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
733         bigcount = 10;
734
735         while (uio->uio_resid) {
736                 if (wpipe->pipe_state & PIPE_WEOF) {
737                         error = EPIPE;
738                         break;
739                 }
740
741                 /*
742                  * Don't hog the cpu.
743                  */
744                 if (bigwrite && --bigcount == 0) {
745                         lwkt_user_yield();
746                         bigcount = 10;
747                         if (CURSIG(curthread->td_lwp)) {
748                                 error = EINTR;
749                                 break;
750                         }
751                 }
752
753                 windex = wpipe->pipe_buffer.windex &
754                          (wpipe->pipe_buffer.size - 1);
755                 space = wpipe->pipe_buffer.size -
756                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
757                 cpu_lfence();
758
759                 /* Writes of size <= PIPE_BUF must be atomic. */
760                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
761                         space = 0;
762
763                 /* 
764                  * Write to fill, read size handles write hysteresis.  Also
765                  * additional restrictions can cause select-based non-blocking
766                  * writes to spin.
767                  */
768                 if (space > 0) {
769                         u_int segsize;
770
771                         /*
772                          * Transfer size is minimum of uio transfer
773                          * and free space in pipe buffer.
774                          *
775                          * Limit each uiocopy to no more then PIPE_SIZE
776                          * so we can keep the gravy train going on a
777                          * SMP box.  This doubles the performance for
778                          * write sizes > 16K.  Otherwise large writes
779                          * wind up doing an inefficient synchronous
780                          * ping-pong.
781                          */
782                         space = szmin(space, uio->uio_resid);
783                         if (space > PIPE_SIZE)
784                                 space = PIPE_SIZE;
785
786                         /*
787                          * First segment to transfer is minimum of
788                          * transfer size and contiguous space in
789                          * pipe buffer.  If first segment to transfer
790                          * is less than the transfer size, we've got
791                          * a wraparound in the buffer.
792                          */
793                         segsize = wpipe->pipe_buffer.size - windex;
794                         if (segsize > space)
795                                 segsize = space;
796
797                         /*
798                          * If this is the first loop and the reader is
799                          * blocked, do a preemptive wakeup of the reader.
800                          *
801                          * On SMP the IPI latency plus the wlock interlock
802                          * on the reader side is the fastest way to get the
803                          * reader going.  (The scheduler will hard loop on
804                          * lock tokens).
805                          *
806                          * NOTE: We can't clear WANTR here without acquiring
807                          * the rlock, which we don't want to do here!
808                          */
809                         if ((wpipe->pipe_state & PIPE_WANTR))
810                                 wakeup(wpipe);
811
812                         /*
813                          * Transfer segment, which may include a wrap-around.
814                          * Update windex to account for both all in one go
815                          * so the reader can read() the data atomically.
816                          */
817                         error = uiomove(&wpipe->pipe_buffer.buffer[windex],
818                                         segsize, uio);
819                         if (error == 0 && segsize < space) {
820                                 segsize = space - segsize;
821                                 error = uiomove(&wpipe->pipe_buffer.buffer[0],
822                                                 segsize, uio);
823                         }
824                         if (error)
825                                 break;
826                         cpu_mfence();
827                         wpipe->pipe_buffer.windex += space;
828                         wcount += space;
829                         continue;
830                 }
831
832                 /*
833                  * We need both the rlock and the wlock to interlock against
834                  * the EOF, WANTW, and size checks, and to modify pipe_state.
835                  *
836                  * These are token locks so we do not have to worry about
837                  * deadlocks.
838                  */
839                 lwkt_gettoken(&wpipe->pipe_rlock);
840
841                 /*
842                  * If the "read-side" has been blocked, wake it up now
843                  * and yield to let it drain synchronously rather
                 * than block.
845                  */
846                 if (wpipe->pipe_state & PIPE_WANTR) {
847                         wpipe->pipe_state &= ~PIPE_WANTR;
848                         wakeup(wpipe);
849                 }
850
851                 /*
852                  * don't block on non-blocking I/O
853                  */
854                 if (nbio) {
855                         lwkt_reltoken(&wpipe->pipe_rlock);
856                         error = EAGAIN;
857                         break;
858                 }
859
860                 /*
861                  * re-test whether we have to block in the writer after
862                  * acquiring both locks, in case the reader opened up
863                  * some space.
864                  */
865                 space = wpipe->pipe_buffer.size -
866                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
867                 cpu_lfence();
868                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
869                         space = 0;
870
871                 /*
872                  * Retest EOF - acquiring a new token can temporarily release
873                  * tokens already held.
874                  */
875                 if (wpipe->pipe_state & PIPE_WEOF) {
876                         lwkt_reltoken(&wpipe->pipe_rlock);
877                         error = EPIPE;
878                         break;
879                 }
880
881                 /*
882                  * We have no more space and have something to offer,
883                  * wake up select/poll/kq.
884                  */
885                 if (space == 0) {
886                         wpipe->pipe_state |= PIPE_WANTW;
887                         ++wpipe->pipe_wantwcnt;
888                         pipewakeup(wpipe, 1);
889                         if (wpipe->pipe_state & PIPE_WANTW)
890                                 error = tsleep(wpipe, PCATCH, "pipewr", 0);
891                         ++pipe_wblocked_count;
892                 }
893                 lwkt_reltoken(&wpipe->pipe_rlock);
894
895                 /*
896                  * Break out if we errored or the read side wants us to go
897                  * away.
898                  */
899                 if (error)
900                         break;
901                 if (wpipe->pipe_state & PIPE_WEOF) {
902                         error = EPIPE;
903                         break;
904                 }
905         }
906         pipe_end_uio(wpipe, &wpipe->pipe_wip);
907
908         /*
909          * If we have put any characters in the buffer, we wake up
910          * the reader.
911          *
912          * Both rlock and wlock are required to be able to modify pipe_state.
913          */
914         if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
915                 if (wpipe->pipe_state & PIPE_WANTR) {
916                         lwkt_gettoken(&wpipe->pipe_rlock);
917                         if (wpipe->pipe_state & PIPE_WANTR) {
918                                 wpipe->pipe_state &= ~PIPE_WANTR;
919                                 lwkt_reltoken(&wpipe->pipe_rlock);
920                                 wakeup(wpipe);
921                         } else {
922                                 lwkt_reltoken(&wpipe->pipe_rlock);
923                         }
924                 }
925                 lwkt_gettoken(&wpipe->pipe_rlock);
926                 pipewakeup(wpipe, 1);
927                 lwkt_reltoken(&wpipe->pipe_rlock);
928         }
929
930         /*
931          * Don't return EPIPE if I/O was successful
932          */
933         if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
934             (uio->uio_resid == 0) &&
935             (error == EPIPE)) {
936                 error = 0;
937         }
938
939         if (error == 0)
940                 vfs_timestamp(&wpipe->pipe_mtime);
941
942         /*
943          * We have something to offer,
944          * wake up select/poll/kq.
945          */
946         /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
947         lwkt_reltoken(&wpipe->pipe_wlock);
948         return (error);
949 }
950
951 /*
952  * we implement a very minimal set of ioctls for compatibility with sockets.
953  */
954 int
955 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
956            struct ucred *cred, struct sysmsg *msg)
957 {
958         struct pipe *mpipe;
959         int error;
960
961         mpipe = (struct pipe *)fp->f_data;
962
963         lwkt_gettoken(&mpipe->pipe_rlock);
964         lwkt_gettoken(&mpipe->pipe_wlock);
965
966         switch (cmd) {
967         case FIOASYNC:
968                 if (*(int *)data) {
969                         mpipe->pipe_state |= PIPE_ASYNC;
970                 } else {
971                         mpipe->pipe_state &= ~PIPE_ASYNC;
972                 }
973                 error = 0;
974                 break;
975         case FIONREAD:
976                 *(int *)data = mpipe->pipe_buffer.windex -
977                                 mpipe->pipe_buffer.rindex;
978                 error = 0;
979                 break;
980         case FIOSETOWN:
981                 error = fsetown(*(int *)data, &mpipe->pipe_sigio);
982                 break;
983         case FIOGETOWN:
984                 *(int *)data = fgetown(&mpipe->pipe_sigio);
985                 error = 0;
986                 break;
987         case TIOCSPGRP:
988                 /* This is deprecated, FIOSETOWN should be used instead. */
989                 error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
990                 break;
991
992         case TIOCGPGRP:
993                 /* This is deprecated, FIOGETOWN should be used instead. */
994                 *(int *)data = -fgetown(&mpipe->pipe_sigio);
995                 error = 0;
996                 break;
997         default:
998                 error = ENOTTY;
999                 break;
1000         }
1001         lwkt_reltoken(&mpipe->pipe_wlock);
1002         lwkt_reltoken(&mpipe->pipe_rlock);
1003
1004         return (error);
1005 }
1006
1007 /*
1008  * MPSAFE
1009  */
1010 static int
1011 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1012 {
1013         struct pipe *pipe;
1014
1015         pipe = (struct pipe *)fp->f_data;
1016
1017         bzero((caddr_t)ub, sizeof(*ub));
1018         ub->st_mode = S_IFIFO;
1019         ub->st_blksize = pipe->pipe_buffer.size;
1020         ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1021         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1022         ub->st_atimespec = pipe->pipe_atime;
1023         ub->st_mtimespec = pipe->pipe_mtime;
1024         ub->st_ctimespec = pipe->pipe_ctime;
1025         /*
1026          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1027          * st_flags, st_gen.
1028          * XXX (st_dev, st_ino) should be unique.
1029          */
1030         return (0);
1031 }
1032
1033 static int
1034 pipe_close(struct file *fp)
1035 {
1036         struct pipe *cpipe;
1037
1038         cpipe = (struct pipe *)fp->f_data;
1039         fp->f_ops = &badfileops;
1040         fp->f_data = NULL;
1041         funsetown(&cpipe->pipe_sigio);
1042         pipeclose(cpipe);
1043         return (0);
1044 }
1045
1046 /*
1047  * Shutdown one or both directions of a full-duplex pipe.
1048  */
1049 static int
1050 pipe_shutdown(struct file *fp, int how)
1051 {
1052         struct pipe *rpipe;
1053         struct pipe *wpipe;
1054         int error = EPIPE;
1055
1056         rpipe = (struct pipe *)fp->f_data;
1057         wpipe = rpipe->pipe_peer;
1058
1059         /*
1060          * We modify pipe_state on both pipes, which means we need
1061          * all four tokens!
1062          */
1063         lwkt_gettoken(&rpipe->pipe_rlock);
1064         lwkt_gettoken(&rpipe->pipe_wlock);
1065         lwkt_gettoken(&wpipe->pipe_rlock);
1066         lwkt_gettoken(&wpipe->pipe_wlock);
1067
1068         switch(how) {
1069         case SHUT_RDWR:
1070         case SHUT_RD:
1071                 rpipe->pipe_state |= PIPE_REOF;         /* my reads */
1072                 rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
1073                 if (rpipe->pipe_state & PIPE_WANTR) {
1074                         rpipe->pipe_state &= ~PIPE_WANTR;
1075                         wakeup(rpipe);
1076                 }
1077                 if (rpipe->pipe_state & PIPE_WANTW) {
1078                         rpipe->pipe_state &= ~PIPE_WANTW;
1079                         wakeup(rpipe);
1080                 }
1081                 error = 0;
1082                 if (how == SHUT_RD)
1083                         break;
1084                 /* fall through */
1085         case SHUT_WR:
1086                 wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
1087                 wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
1088                 if (wpipe->pipe_state & PIPE_WANTR) {
1089                         wpipe->pipe_state &= ~PIPE_WANTR;
1090                         wakeup(wpipe);
1091                 }
1092                 if (wpipe->pipe_state & PIPE_WANTW) {
1093                         wpipe->pipe_state &= ~PIPE_WANTW;
1094                         wakeup(wpipe);
1095                 }
1096                 error = 0;
1097                 break;
1098         }
1099         pipewakeup(rpipe, 1);
1100         pipewakeup(wpipe, 1);
1101
1102         lwkt_reltoken(&wpipe->pipe_wlock);
1103         lwkt_reltoken(&wpipe->pipe_rlock);
1104         lwkt_reltoken(&rpipe->pipe_wlock);
1105         lwkt_reltoken(&rpipe->pipe_rlock);
1106
1107         return (error);
1108 }
1109
1110 static void
1111 pipe_free_kmem(struct pipe *cpipe)
1112 {
1113         if (cpipe->pipe_buffer.buffer != NULL) {
1114                 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1115                         atomic_subtract_int(&pipe_nbig, 1);
1116                 kmem_free(&kernel_map,
1117                         (vm_offset_t)cpipe->pipe_buffer.buffer,
1118                         cpipe->pipe_buffer.size);
1119                 cpipe->pipe_buffer.buffer = NULL;
1120                 cpipe->pipe_buffer.object = NULL;
1121         }
1122 }
1123
1124 /*
1125  * Close the pipe.  The slock must be held to interlock against simultanious
1126  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1127  */
1128 static void
1129 pipeclose(struct pipe *cpipe)
1130 {
1131         globaldata_t gd;
1132         struct pipe *ppipe;
1133
1134         if (cpipe == NULL)
1135                 return;
1136
1137         /*
1138          * The slock may not have been allocated yet (close during
1139          * initialization)
1140          *
1141          * We need both the read and write tokens to modify pipe_state.
1142          */
1143         if (cpipe->pipe_slock)
1144                 lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1145         lwkt_gettoken(&cpipe->pipe_rlock);
1146         lwkt_gettoken(&cpipe->pipe_wlock);
1147
1148         /*
1149          * Set our state, wakeup anyone waiting in select/poll/kq, and
1150          * wakeup anyone blocked on our pipe.
1151          */
1152         cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1153         pipewakeup(cpipe, 1);
1154         if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1155                 cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1156                 wakeup(cpipe);
1157         }
1158
1159         /*
1160          * Disconnect from peer.
1161          */
1162         if ((ppipe = cpipe->pipe_peer) != NULL) {
1163                 lwkt_gettoken(&ppipe->pipe_rlock);
1164                 lwkt_gettoken(&ppipe->pipe_wlock);
1165                 ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1166                 pipewakeup(ppipe, 1);
1167                 if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1168                         ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1169                         wakeup(ppipe);
1170                 }
1171                 if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1172                         KNOTE(&ppipe->pipe_kq.ki_note, 0);
1173                 lwkt_reltoken(&ppipe->pipe_wlock);
1174                 lwkt_reltoken(&ppipe->pipe_rlock);
1175         }
1176
1177         /*
1178          * If the peer is also closed we can free resources for both
1179          * sides, otherwise we leave our side intact to deal with any
1180          * races (since we only have the slock).
1181          */
1182         if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1183                 cpipe->pipe_peer = NULL;
1184                 ppipe->pipe_peer = NULL;
1185                 ppipe->pipe_slock = NULL;       /* we will free the slock */
1186                 pipeclose(ppipe);
1187                 ppipe = NULL;
1188         }
1189
1190         lwkt_reltoken(&cpipe->pipe_wlock);
1191         lwkt_reltoken(&cpipe->pipe_rlock);
1192         if (cpipe->pipe_slock)
1193                 lockmgr(cpipe->pipe_slock, LK_RELEASE);
1194
1195         /*
1196          * If we disassociated from our peer we can free resources
1197          */
1198         if (ppipe == NULL) {
1199                 gd = mycpu;
1200                 if (cpipe->pipe_slock) {
1201                         kfree(cpipe->pipe_slock, M_PIPE);
1202                         cpipe->pipe_slock = NULL;
1203                 }
1204                 if (gd->gd_pipeqcount >= pipe_maxcache ||
1205                     cpipe->pipe_buffer.size != PIPE_SIZE
1206                 ) {
1207                         pipe_free_kmem(cpipe);
1208                         kfree(cpipe, M_PIPE);
1209                 } else {
1210                         cpipe->pipe_state = 0;
1211                         cpipe->pipe_peer = gd->gd_pipeq;
1212                         gd->gd_pipeq = cpipe;
1213                         ++gd->gd_pipeqcount;
1214                 }
1215         }
1216 }
1217
1218 static int
1219 pipe_kqfilter(struct file *fp, struct knote *kn)
1220 {
1221         struct pipe *cpipe;
1222
1223         cpipe = (struct pipe *)kn->kn_fp->f_data;
1224
1225         switch (kn->kn_filter) {
1226         case EVFILT_READ:
1227                 kn->kn_fop = &pipe_rfiltops;
1228                 break;
1229         case EVFILT_WRITE:
1230                 kn->kn_fop = &pipe_wfiltops;
1231                 if (cpipe->pipe_peer == NULL) {
1232                         /* other end of pipe has been closed */
1233                         return (EPIPE);
1234                 }
1235                 break;
1236         default:
1237                 return (EOPNOTSUPP);
1238         }
1239         kn->kn_hook = (caddr_t)cpipe;
1240
1241         knote_insert(&cpipe->pipe_kq.ki_note, kn);
1242
1243         return (0);
1244 }
1245
1246 static void
1247 filt_pipedetach(struct knote *kn)
1248 {
1249         struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1250
1251         knote_remove(&cpipe->pipe_kq.ki_note, kn);
1252 }
1253
1254 /*ARGSUSED*/
1255 static int
1256 filt_piperead(struct knote *kn, long hint)
1257 {
1258         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1259         int ready = 0;
1260
1261         lwkt_gettoken(&rpipe->pipe_rlock);
1262         lwkt_gettoken(&rpipe->pipe_wlock);
1263
1264         kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1265
1266         if (rpipe->pipe_state & PIPE_REOF) {
1267                 /*
1268                  * Only set NODATA if all data has been exhausted
1269                  */
1270                 if (kn->kn_data == 0)
1271                         kn->kn_flags |= EV_NODATA;
1272                 kn->kn_flags |= EV_EOF; 
1273                 ready = 1;
1274         }
1275
1276         lwkt_reltoken(&rpipe->pipe_wlock);
1277         lwkt_reltoken(&rpipe->pipe_rlock);
1278
1279         if (!ready)
1280                 ready = kn->kn_data > 0;
1281
1282         return (ready);
1283 }
1284
1285 /*ARGSUSED*/
1286 static int
1287 filt_pipewrite(struct knote *kn, long hint)
1288 {
1289         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1290         struct pipe *wpipe = rpipe->pipe_peer;
1291         int ready = 0;
1292
1293         kn->kn_data = 0;
1294         if (wpipe == NULL) {
1295                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1296                 return (1);
1297         }
1298
1299         lwkt_gettoken(&wpipe->pipe_rlock);
1300         lwkt_gettoken(&wpipe->pipe_wlock);
1301
1302         if (wpipe->pipe_state & PIPE_WEOF) {
1303                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1304                 ready = 1;
1305         }
1306
1307         if (!ready)
1308                 kn->kn_data = wpipe->pipe_buffer.size -
1309                               (wpipe->pipe_buffer.windex -
1310                                wpipe->pipe_buffer.rindex);
1311
1312         lwkt_reltoken(&wpipe->pipe_wlock);
1313         lwkt_reltoken(&wpipe->pipe_rlock);
1314
1315         if (!ready)
1316                 ready = kn->kn_data >= PIPE_BUF;
1317
1318         return (ready);
1319 }