kernel - Refactor sys_pipe
[dragonfly.git] / sys / kern / sys_pipe.c
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice immediately at the beginning of the file, without modification,
14  *    this list of conditions, and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  * 3. Absolutely no warranty of function or purpose is made by the author
19  *    John S. Dyson.
20  * 4. Modifications may be freely made to this file if the above conditions
21  *    are met.
22  */
23
24 /*
25  * This file contains a high-performance replacement for the socket-based
26  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
27  * all features of sockets, but does do everything that pipes normally
28  * do.
29  */
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysproto.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68
69 #include <machine/cpufunc.h>
70
71 struct pipegdlock {
72         struct mtx      mtx;
73 } __cachealign;
74
75 /*
76  * interfaces to the outside world
77  */
78 static int pipe_read (struct file *fp, struct uio *uio, 
79                 struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio, 
81                 struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87                 struct ucred *cred, struct sysmsg *msg);
88
89 static struct fileops pipeops = {
90         .fo_read = pipe_read, 
91         .fo_write = pipe_write,
92         .fo_ioctl = pipe_ioctl,
93         .fo_kqfilter = pipe_kqfilter,
94         .fo_stat = pipe_stat,
95         .fo_close = pipe_close,
96         .fo_shutdown = pipe_shutdown
97 };
98
99 static void     filt_pipedetach(struct knote *kn);
100 static int      filt_piperead(struct knote *kn, long hint);
101 static int      filt_pipewrite(struct knote *kn, long hint);
102
103 static struct filterops pipe_rfiltops =
104         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
105 static struct filterops pipe_wfiltops =
106         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
107
108 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
109
110 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
111
112 static int pipe_maxcache = PIPEQ_MAX_CACHE;
113 static struct pipegdlock *pipe_gdlocks;
114
115 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
116 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
118 static int pipe_size = 32768;
119 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
120         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
121 static int pipe_delay = 5000;   /* 5uS default */
122 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
123         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
124
125 /*
126  * Auto-size pipe cache to reduce kmem allocations and frees.
127  */
128 static
129 void
130 pipeinit(void *dummy)
131 {
132         size_t mbytes = kmem_lim_size();
133         int n;
134
135         if (pipe_maxcache == PIPEQ_MAX_CACHE) {
136                 if (mbytes >= 7 * 1024)
137                         pipe_maxcache *= 2;
138                 if (mbytes >= 15 * 1024)
139                         pipe_maxcache *= 2;
140         }
141         pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
142                              M_PIPE, M_WAITOK | M_ZERO);
143         for (n = 0; n < ncpus; ++n)
144                 mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
145 }
146 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
147
148 static void pipeclose (struct pipe *pipe,
149                 struct pipebuf *pbr, struct pipebuf *pbw);
150 static void pipe_free_kmem (struct pipebuf *buf);
151 static int pipe_create (struct pipe **pipep);
152
153 static __inline void
154 pipewakeup(struct pipebuf *pb, int dosigio)
155 {
156         if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
157                 lwkt_gettoken(&sigio_token);
158                 pgsigio(pb->sigio, SIGIO, 0);
159                 lwkt_reltoken(&sigio_token);
160         }
161         KNOTE(&pb->kq.ki_note, 0);
162 }
163
164 /*
165  * These routines are called before and after a UIO.  The UIO
166  * may block, causing our held tokens to be lost temporarily.
167  *
168  * We use these routines to serialize reads against other reads
169  * and writes against other writes.
170  *
171  * The read token is held on entry so *ipp does not race.
172  */
173 static __inline int
174 pipe_start_uio(int *ipp)
175 {
176         int error;
177
178         while (*ipp) {
179                 *ipp = -1;
180                 error = tsleep(ipp, PCATCH, "pipexx", 0);
181                 if (error)
182                         return (error);
183         }
184         *ipp = 1;
185         return (0);
186 }
187
188 static __inline void
189 pipe_end_uio(int *ipp)
190 {
191         if (*ipp < 0) {
192                 *ipp = 0;
193                 wakeup(ipp);
194         } else {
195                 KKASSERT(*ipp > 0);
196                 *ipp = 0;
197         }
198 }
199
200 /*
201  * The pipe system call for the DTYPE_PIPE type of pipes
202  *
203  * pipe_args(int dummy)
204  *
205  * MPSAFE
206  */
207 int
208 sys_pipe(struct pipe_args *uap)
209 {
210         return kern_pipe(uap->sysmsg_fds, 0);
211 }
212
213 int
214 sys_pipe2(struct pipe2_args *uap)
215 {
216         return kern_pipe(uap->sysmsg_fds, uap->flags);
217 }
218
219 int
220 kern_pipe(long *fds, int flags)
221 {
222         struct thread *td = curthread;
223         struct filedesc *fdp = td->td_proc->p_fd;
224         struct file *rf, *wf;
225         struct pipe *pipe;
226         int fd1, fd2, error;
227
228         pipe = NULL;
229         if (pipe_create(&pipe)) {
230                 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
231                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
232                 return (ENFILE);
233         }
234         
235         error = falloc(td->td_lwp, &rf, &fd1);
236         if (error) {
237                 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
238                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
239                 return (error);
240         }
241         fds[0] = fd1;
242
243         /*
244          * Warning: once we've gotten past allocation of the fd for the
245          * read-side, we can only drop the read side via fdrop() in order
246          * to avoid races against processes which manage to dup() the read
247          * side while we are blocked trying to allocate the write side.
248          */
249         rf->f_type = DTYPE_PIPE;
250         rf->f_flag = FREAD | FWRITE;
251         rf->f_ops = &pipeops;
252         rf->f_data = (void *)((intptr_t)pipe | 0);
253         if (flags & O_NONBLOCK)
254                 rf->f_flag |= O_NONBLOCK;
255         if (flags & O_CLOEXEC)
256                 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
257
258         error = falloc(td->td_lwp, &wf, &fd2);
259         if (error) {
260                 fsetfd(fdp, NULL, fd1);
261                 fdrop(rf);
262                 /* pipeA has been closed by fdrop() */
263                 /* close pipeB here */
264                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
265                 return (error);
266         }
267         wf->f_type = DTYPE_PIPE;
268         wf->f_flag = FREAD | FWRITE;
269         wf->f_ops = &pipeops;
270         wf->f_data = (void *)((intptr_t)pipe | 1);
271         if (flags & O_NONBLOCK)
272                 wf->f_flag |= O_NONBLOCK;
273         if (flags & O_CLOEXEC)
274                 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
275
276         fds[1] = fd2;
277
278         /*
279          * Once activated the peer relationship remains valid until
280          * both sides are closed.
281          */
282         fsetfd(fdp, rf, fd1);
283         fsetfd(fdp, wf, fd2);
284         fdrop(rf);
285         fdrop(wf);
286
287         return (0);
288 }
289
290 /*
291  * [re]allocates KVA for the pipe's circular buffer.  The space is
292  * pageable.  Called twice to setup full-duplex communications.
293  *
294  * NOTE: Independent vm_object's are used to improve performance.
295  *
296  * Returns 0 on success, ENOMEM on failure.
297  */
298 static int
299 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
300 {
301         struct vm_object *object;
302         caddr_t buffer;
303         vm_pindex_t npages;
304         int error;
305
306         size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
307         if (size < 16384)
308                 size = 16384;
309         if (size > 1024*1024)
310                 size = 1024*1024;
311
312         npages = round_page(size) / PAGE_SIZE;
313         object = pb->object;
314
315         /*
316          * [re]create the object if necessary and reserve space for it
317          * in the kernel_map.  The object and memory are pageable.  On
318          * success, free the old resources before assigning the new
319          * ones.
320          */
321         if (object == NULL || object->size != npages) {
322                 object = vm_object_allocate(OBJT_DEFAULT, npages);
323                 buffer = (caddr_t)vm_map_min(&kernel_map);
324
325                 error = vm_map_find(&kernel_map, object, NULL,
326                                     0, (vm_offset_t *)&buffer, size,
327                                     PAGE_SIZE, TRUE,
328                                     VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
329                                     VM_PROT_ALL, VM_PROT_ALL, 0);
330
331                 if (error != KERN_SUCCESS) {
332                         vm_object_deallocate(object);
333                         return (ENOMEM);
334                 }
335                 pipe_free_kmem(pb);
336                 pb->object = object;
337                 pb->buffer = buffer;
338                 pb->size = size;
339         }
340         pb->rindex = 0;
341         pb->windex = 0;
342
343         return (0);
344 }
345
346 /*
347  * Initialize and allocate VM and memory for pipe, pulling the pipe from
348  * our per-cpu cache if possible.
349  *
350  * Returns 0 on success, else an error code (typically ENOMEM).  Caller
351  * must still deallocate the pipe on failure.
352  */
353 static int
354 pipe_create(struct pipe **pipep)
355 {
356         globaldata_t gd = mycpu;
357         struct pipe *pipe;
358         int error;
359
360         if ((pipe = gd->gd_pipeq) != NULL) {
361                 gd->gd_pipeq = pipe->next;
362                 --gd->gd_pipeqcount;
363                 pipe->next = NULL;
364         } else {
365                 pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
366                 lwkt_token_init(&pipe->bufferA.rlock, "piper");
367                 lwkt_token_init(&pipe->bufferA.wlock, "pipew");
368                 lwkt_token_init(&pipe->bufferB.rlock, "piper");
369                 lwkt_token_init(&pipe->bufferB.wlock, "pipew");
370         }
371         *pipep = pipe;
372         if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
373                 return (error);
374         }
375         if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
376                 return (error);
377         }
378         vfs_timestamp(&pipe->ctime);
379         pipe->bufferA.atime = pipe->ctime;
380         pipe->bufferA.mtime = pipe->ctime;
381         pipe->bufferB.atime = pipe->ctime;
382         pipe->bufferB.mtime = pipe->ctime;
383         pipe->open_count = 2;
384
385         return (0);
386 }
387
388 /*
389  * Read data from a pipe
390  */
391 static int
392 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
393 {
394         struct pipebuf *rpb;
395         struct pipebuf *wpb;
396         struct pipe *pipe;
397         size_t nread = 0;
398         size_t size;    /* total bytes available */
399         size_t nsize;   /* total bytes to read */
400         size_t rindex;  /* contiguous bytes available */
401         int notify_writer;
402         int bigread;
403         int bigcount;
404         int error;
405         int nbio;
406
407         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
408         if ((intptr_t)fp->f_data & 1) {
409                 rpb = &pipe->bufferB;
410                 wpb = &pipe->bufferA;
411         } else {
412                 rpb = &pipe->bufferA;
413                 wpb = &pipe->bufferB;
414         }
415         atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
416
417         if (uio->uio_resid == 0)
418                 return(0);
419
420         /*
421          * Calculate nbio
422          */
423         if (fflags & O_FBLOCKING)
424                 nbio = 0;
425         else if (fflags & O_FNONBLOCKING)
426                 nbio = 1;
427         else if (fp->f_flag & O_NONBLOCK)
428                 nbio = 1;
429         else
430                 nbio = 0;
431
432         /*
433          * 'quick' NBIO test before things get expensive.
434          */
435         if (nbio && rpb->rindex == rpb->windex)
436                 return EAGAIN;
437
438         /*
439          * Reads are serialized.  Note however that buffer.buffer and
440          * buffer.size can change out from under us when the number
441          * of bytes in the buffer are zero due to the write-side doing a
442          * pipespace().
443          */
444         lwkt_gettoken(&rpb->rlock);
445         error = pipe_start_uio(&rpb->rip);
446         if (error) {
447                 lwkt_reltoken(&rpb->rlock);
448                 return (error);
449         }
450         notify_writer = 0;
451
452         bigread = (uio->uio_resid > 10 * 1024 * 1024);
453         bigcount = 10;
454
455         while (uio->uio_resid) {
456                 /*
457                  * Don't hog the cpu.
458                  */
459                 if (bigread && --bigcount == 0) {
460                         lwkt_user_yield();
461                         bigcount = 10;
462                         if (CURSIG(curthread->td_lwp)) {
463                                 error = EINTR;
464                                 break;
465                         }
466                 }
467
468                 size = rpb->windex - rpb->rindex;
469                 cpu_lfence();
470                 if (size) {
471                         rindex = rpb->rindex & (rpb->size - 1);
472                         nsize = size;
473                         if (nsize > rpb->size - rindex)
474                                 nsize = rpb->size - rindex;
475                         nsize = szmin(nsize, uio->uio_resid);
476
477                         error = uiomove(&rpb->buffer[rindex], nsize, uio);
478                         if (error)
479                                 break;
480                         cpu_mfence();
481                         rpb->rindex += nsize;
482                         nread += nsize;
483
484                         /*
485                          * If the FIFO is still over half full just continue
486                          * and do not try to notify the writer yet.
487                          */
488                         if (size - nsize >= (rpb->size >> 1)) {
489                                 notify_writer = 0;
490                                 continue;
491                         }
492
493                         /*
494                          * When the FIFO is less then half full notify any
495                          * waiting writer.  WANTW can be checked while
496                          * holding just the rlock.
497                          */
498                         notify_writer = 1;
499                         if ((rpb->state & PIPE_WANTW) == 0)
500                                 continue;
501                 }
502
503                 /*
504                  * If the "write-side" was blocked we wake it up.  This code
505                  * is reached either when the buffer is completely emptied
506                  * or if it becomes more then half-empty.
507                  *
508                  * Pipe_state can only be modified if both the rlock and
509                  * wlock are held.
510                  */
511                 if (rpb->state & PIPE_WANTW) {
512                         lwkt_gettoken(&rpb->wlock);
513                         if (rpb->state & PIPE_WANTW) {
514                                 rpb->state &= ~PIPE_WANTW;
515                                 lwkt_reltoken(&rpb->wlock);
516                                 wakeup(rpb);
517                         } else {
518                                 lwkt_reltoken(&rpb->wlock);
519                         }
520                 }
521
522                 /*
523                  * Pick up our copy loop again if the writer sent data to
524                  * us while we were messing around.
525                  *
526                  * On a SMP box poll up to pipe_delay nanoseconds for new
527                  * data.  Typically a value of 2000 to 4000 is sufficient
528                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
529                  * is used for synchronous communications with small packets,
530                  * and 8000 or so (8uS) will pipeline large buffer xfers
531                  * between cpus over a pipe.
532                  *
533                  * For synchronous communications a hit means doing a
534                  * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS,
535                  * where as miss requiring a tsleep/wakeup sequence
536                  * will take 7uS or more.
537                  */
538                 if (rpb->windex != rpb->rindex)
539                         continue;
540
541 #ifdef _RDTSC_SUPPORTED_
542                 if (pipe_delay) {
543                         int64_t tsc_target;
544                         int good = 0;
545
546                         tsc_target = tsc_get_target(pipe_delay);
547                         while (tsc_test_target(tsc_target) == 0) {
548                                 if (rpb->windex != rpb->rindex) {
549                                         good = 1;
550                                         break;
551                                 }
552                         }
553                         if (good)
554                                 continue;
555                 }
556 #endif
557
558                 /*
559                  * Detect EOF condition, do not set error.
560                  */
561                 if (rpb->state & PIPE_REOF)
562                         break;
563
564                 /*
565                  * Break if some data was read, or if this was a non-blocking
566                  * read.
567                  */
568                 if (nread > 0)
569                         break;
570
571                 if (nbio) {
572                         error = EAGAIN;
573                         break;
574                 }
575
576                 /*
577                  * Last chance, interlock with WANTR.
578                  */
579                 lwkt_gettoken(&rpb->wlock);
580                 size = rpb->windex - rpb->rindex;
581                 if (size) {
582                         lwkt_reltoken(&rpb->wlock);
583                         continue;
584                 }
585
586                 /*
587                  * Retest EOF - acquiring a new token can temporarily release
588                  * tokens already held.
589                  */
590                 if (rpb->state & PIPE_REOF) {
591                         lwkt_reltoken(&rpb->wlock);
592                         break;
593                 }
594
595                 /*
596                  * If there is no more to read in the pipe, reset its
597                  * pointers to the beginning.  This improves cache hit
598                  * stats.
599                  *
600                  * We need both locks to modify both pointers, and there
601                  * must also not be a write in progress or the uiomove()
602                  * in the write might block and temporarily release
603                  * its wlock, then reacquire and update windex.  We are
604                  * only serialized against reads, not writes.
605                  *
606                  * XXX should we even bother resetting the indices?  It
607                  *     might actually be more cache efficient not to.
608                  */
609                 if (rpb->rindex == rpb->windex && rpb->wip == 0) {
610                         rpb->rindex = 0;
611                         rpb->windex = 0;
612                 }
613
614                 /*
615                  * Wait for more data.
616                  *
617                  * Pipe_state can only be set if both the rlock and wlock
618                  * are held.
619                  */
620                 rpb->state |= PIPE_WANTR;
621                 tsleep_interlock(rpb, PCATCH);
622                 lwkt_reltoken(&rpb->wlock);
623                 error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
624                 if (error)
625                         break;
626         }
627         pipe_end_uio(&rpb->rip);
628
629         /*
630          * Uptime last access time
631          */
632         if (error == 0 && nread)
633                 vfs_timestamp(&rpb->atime);
634
635         /*
636          * If we drained the FIFO more then half way then handle
637          * write blocking hysteresis.
638          *
639          * Note that PIPE_WANTW cannot be set by the writer without
640          * it holding both rlock and wlock, so we can test it
641          * while holding just rlock.
642          */
643         if (notify_writer) {
644                 /*
645                  * Synchronous blocking is done on the pipe involved
646                  */
647                 if (rpb->state & PIPE_WANTW) {
648                         lwkt_gettoken(&rpb->wlock);
649                         if (rpb->state & PIPE_WANTW) {
650                                 rpb->state &= ~PIPE_WANTW;
651                                 lwkt_reltoken(&rpb->wlock);
652                                 wakeup(rpb);
653                         } else {
654                                 lwkt_reltoken(&rpb->wlock);
655                         }
656                 }
657
658                 /*
659                  * But we may also have to deal with a kqueue which is
660                  * stored on the same pipe as its descriptor, so a
661                  * EVFILT_WRITE event waiting for our side to drain will
662                  * be on the other side.
663                  */
664                 lwkt_gettoken(&wpb->wlock);
665                 pipewakeup(wpb, 0);
666                 lwkt_reltoken(&wpb->wlock);
667         }
668         /*size = rpb->windex - rpb->rindex;*/
669         lwkt_reltoken(&rpb->rlock);
670
671         return (error);
672 }
673
674 static int
675 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
676 {
677         struct pipebuf *rpb;
678         struct pipebuf *wpb;
679         struct pipe *pipe;
680         size_t windex;
681         size_t space;
682         size_t wcount;
683         size_t orig_resid;
684         int bigwrite;
685         int bigcount;
686         int error;
687         int nbio;
688
689         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
690         if ((intptr_t)fp->f_data & 1) {
691                 rpb = &pipe->bufferB;
692                 wpb = &pipe->bufferA;
693         } else {
694                 rpb = &pipe->bufferA;
695                 wpb = &pipe->bufferB;
696         }
697
698         /*
699          * Calculate nbio
700          */
701         if (fflags & O_FBLOCKING)
702                 nbio = 0;
703         else if (fflags & O_FNONBLOCKING)
704                 nbio = 1;
705         else if (fp->f_flag & O_NONBLOCK)
706                 nbio = 1;
707         else
708                 nbio = 0;
709
710         /*
711          * 'quick' NBIO test before things get expensive.
712          */
713         if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
714             uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
715                 return EAGAIN;
716         }
717
718         /*
719          * Writes go to the peer.  The peer will always exist.
720          */
721         lwkt_gettoken(&wpb->wlock);
722         if (wpb->state & PIPE_WEOF) {
723                 lwkt_reltoken(&wpb->wlock);
724                 return (EPIPE);
725         }
726
727         /*
728          * Degenerate case (EPIPE takes prec)
729          */
730         if (uio->uio_resid == 0) {
731                 lwkt_reltoken(&wpb->wlock);
732                 return(0);
733         }
734
735         /*
736          * Writes are serialized (start_uio must be called with wlock)
737          */
738         error = pipe_start_uio(&wpb->wip);
739         if (error) {
740                 lwkt_reltoken(&wpb->wlock);
741                 return (error);
742         }
743
744         orig_resid = uio->uio_resid;
745         wcount = 0;
746
747         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
748         bigcount = 10;
749
750         while (uio->uio_resid) {
751                 if (wpb->state & PIPE_WEOF) {
752                         error = EPIPE;
753                         break;
754                 }
755
756                 /*
757                  * Don't hog the cpu.
758                  */
759                 if (bigwrite && --bigcount == 0) {
760                         lwkt_user_yield();
761                         bigcount = 10;
762                         if (CURSIG(curthread->td_lwp)) {
763                                 error = EINTR;
764                                 break;
765                         }
766                 }
767
768                 windex = wpb->windex & (wpb->size - 1);
769                 space = wpb->size - (wpb->windex - wpb->rindex);
770                 cpu_lfence();
771
772                 /* Writes of size <= PIPE_BUF must be atomic. */
773                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
774                         space = 0;
775
776                 /* 
777                  * Write to fill, read size handles write hysteresis.  Also
778                  * additional restrictions can cause select-based non-blocking
779                  * writes to spin.
780                  */
781                 if (space > 0) {
782                         size_t segsize;
783
784                         /*
785                          * Transfer size is minimum of uio transfer
786                          * and free space in pipe buffer.
787                          *
788                          * Limit each uiocopy to no more then wpb->size
789                          * so we can keep the gravy train going on a
790                          * SMP box.  This significantly increases write
791                          * performance.  Otherwise large writes wind up doing
792                          * an inefficient synchronous ping-pong.
793                          */
794                         space = szmin(space, uio->uio_resid);
795                         if (space > (wpb->size >> 1))
796                                 space = (wpb->size >> 1);
797
798                         /*
799                          * First segment to transfer is minimum of
800                          * transfer size and contiguous space in
801                          * pipe buffer.  If first segment to transfer
802                          * is less than the transfer size, we've got
803                          * a wraparound in the buffer.
804                          */
805                         segsize = wpb->size - windex;
806                         if (segsize > space)
807                                 segsize = space;
808
809                         /*
810                          * If this is the first loop and the reader is
811                          * blocked, do a preemptive wakeup of the reader.
812                          *
813                          * On SMP the IPI latency plus the wlock interlock
814                          * on the reader side is the fastest way to get the
815                          * reader going.  (The scheduler will hard loop on
816                          * lock tokens).
817                          *
818                          * NOTE: We can't clear WANTR here without acquiring
819                          * the rlock, which we don't want to do here!
820                          */
821                         if ((wpb->state & PIPE_WANTR))
822                                 wakeup(wpb);
823
824                         /*
825                          * Transfer segment, which may include a wrap-around.
826                          * Update windex to account for both all in one go
827                          * so the reader can read() the data atomically.
828                          */
829                         error = uiomove(&wpb->buffer[windex], segsize, uio);
830                         if (error == 0 && segsize < space) {
831                                 segsize = space - segsize;
832                                 error = uiomove(&wpb->buffer[0], segsize, uio);
833                         }
834                         if (error)
835                                 break;
836                         cpu_mfence();
837                         wpb->windex += space;
838                         wcount += space;
839                         continue;
840                 }
841
842                 /*
843                  * We need both the rlock and the wlock to interlock against
844                  * the EOF, WANTW, and size checks, and to modify pipe_state.
845                  *
846                  * These are token locks so we do not have to worry about
847                  * deadlocks.
848                  */
849                 lwkt_gettoken(&wpb->rlock);
850
851                 /*
852                  * If the "read-side" has been blocked, wake it up now
853                  * and yield to let it drain synchronously rather
854                  * then block.
855                  */
856                 if (wpb->state & PIPE_WANTR) {
857                         wpb->state &= ~PIPE_WANTR;
858                         wakeup(wpb);
859                 }
860
861                 /*
862                  * don't block on non-blocking I/O
863                  */
864                 if (nbio) {
865                         lwkt_reltoken(&wpb->rlock);
866                         error = EAGAIN;
867                         break;
868                 }
869
870                 /*
871                  * re-test whether we have to block in the writer after
872                  * acquiring both locks, in case the reader opened up
873                  * some space.
874                  */
875                 space = wpb->size - (wpb->windex - wpb->rindex);
876                 cpu_lfence();
877                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
878                         space = 0;
879
880                 /*
881                  * Retest EOF - acquiring a new token can temporarily release
882                  * tokens already held.
883                  */
884                 if (wpb->state & PIPE_WEOF) {
885                         lwkt_reltoken(&wpb->rlock);
886                         error = EPIPE;
887                         break;
888                 }
889
890                 /*
891                  * We have no more space and have something to offer,
892                  * wake up select/poll/kq.
893                  */
894                 if (space == 0) {
895                         wpb->state |= PIPE_WANTW;
896                         pipewakeup(wpb, 1);
897                         if (wpb->state & PIPE_WANTW)
898                                 error = tsleep(wpb, PCATCH, "pipewr", 0);
899                 }
900                 lwkt_reltoken(&wpb->rlock);
901
902                 /*
903                  * Break out if we errored or the read side wants us to go
904                  * away.
905                  */
906                 if (error)
907                         break;
908                 if (wpb->state & PIPE_WEOF) {
909                         error = EPIPE;
910                         break;
911                 }
912         }
913         pipe_end_uio(&wpb->wip);
914
915         /*
916          * If we have put any characters in the buffer, we wake up
917          * the reader.
918          *
919          * Both rlock and wlock are required to be able to modify pipe_state.
920          */
921         if (wpb->windex != wpb->rindex) {
922                 if (wpb->state & PIPE_WANTR) {
923                         lwkt_gettoken(&wpb->rlock);
924                         if (wpb->state & PIPE_WANTR) {
925                                 wpb->state &= ~PIPE_WANTR;
926                                 lwkt_reltoken(&wpb->rlock);
927                                 wakeup(wpb);
928                         } else {
929                                 lwkt_reltoken(&wpb->rlock);
930                         }
931                 }
932                 lwkt_gettoken(&wpb->rlock);
933                 pipewakeup(wpb, 1);
934                 lwkt_reltoken(&wpb->rlock);
935         }
936
937         /*
938          * Don't return EPIPE if I/O was successful
939          */
940         if ((wpb->rindex == wpb->windex) &&
941             (uio->uio_resid == 0) &&
942             (error == EPIPE)) {
943                 error = 0;
944         }
945
946         if (error == 0)
947                 vfs_timestamp(&wpb->mtime);
948
949         /*
950          * We have something to offer,
951          * wake up select/poll/kq.
952          */
953         /*space = wpb->windex - wpb->rindex;*/
954         lwkt_reltoken(&wpb->wlock);
955
956         return (error);
957 }
958
959 /*
960  * we implement a very minimal set of ioctls for compatibility with sockets.
961  */
962 static int
963 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
964            struct ucred *cred, struct sysmsg *msg)
965 {
966         struct pipebuf *rpb;
967         struct pipe *pipe;
968         int error;
969
970         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
971         if ((intptr_t)fp->f_data & 1) {
972                 rpb = &pipe->bufferB;
973         } else {
974                 rpb = &pipe->bufferA;
975         }
976
977         lwkt_gettoken(&rpb->rlock);
978         lwkt_gettoken(&rpb->wlock);
979
980         switch (cmd) {
981         case FIOASYNC:
982                 if (*(int *)data) {
983                         rpb->state |= PIPE_ASYNC;
984                 } else {
985                         rpb->state &= ~PIPE_ASYNC;
986                 }
987                 error = 0;
988                 break;
989         case FIONREAD:
990                 *(int *)data = (int)(rpb->windex - rpb->rindex);
991                 error = 0;
992                 break;
993         case FIOSETOWN:
994                 error = fsetown(*(int *)data, &rpb->sigio);
995                 break;
996         case FIOGETOWN:
997                 *(int *)data = fgetown(&rpb->sigio);
998                 error = 0;
999                 break;
1000         case TIOCSPGRP:
1001                 /* This is deprecated, FIOSETOWN should be used instead. */
1002                 error = fsetown(-(*(int *)data), &rpb->sigio);
1003                 break;
1004
1005         case TIOCGPGRP:
1006                 /* This is deprecated, FIOGETOWN should be used instead. */
1007                 *(int *)data = -fgetown(&rpb->sigio);
1008                 error = 0;
1009                 break;
1010         default:
1011                 error = ENOTTY;
1012                 break;
1013         }
1014         lwkt_reltoken(&rpb->wlock);
1015         lwkt_reltoken(&rpb->rlock);
1016
1017         return (error);
1018 }
1019
1020 /*
1021  * MPSAFE
1022  */
1023 static int
1024 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1025 {
1026         struct pipebuf *rpb;
1027         struct pipe *pipe;
1028
1029         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1030         if ((intptr_t)fp->f_data & 1) {
1031                 rpb = &pipe->bufferB;
1032         } else {
1033                 rpb = &pipe->bufferA;
1034         }
1035
1036         bzero((caddr_t)ub, sizeof(*ub));
1037         ub->st_mode = S_IFIFO;
1038         ub->st_blksize = rpb->size;
1039         ub->st_size = rpb->windex - rpb->rindex;
1040         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1041         ub->st_atimespec = rpb->atime;
1042         ub->st_mtimespec = rpb->mtime;
1043         ub->st_ctimespec = pipe->ctime;
1044         /*
1045          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1046          * st_flags, st_gen.
1047          * XXX (st_dev, st_ino) should be unique.
1048          */
1049
1050         return (0);
1051 }
1052
1053 static int
1054 pipe_close(struct file *fp)
1055 {
1056         struct pipebuf *rpb;
1057         struct pipebuf *wpb;
1058         struct pipe *pipe;
1059
1060         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1061         if ((intptr_t)fp->f_data & 1) {
1062                 rpb = &pipe->bufferB;
1063                 wpb = &pipe->bufferA;
1064         } else {
1065                 rpb = &pipe->bufferA;
1066                 wpb = &pipe->bufferB;
1067         }
1068
1069         fp->f_ops = &badfileops;
1070         fp->f_data = NULL;
1071         funsetown(&rpb->sigio);
1072         pipeclose(pipe, rpb, wpb);
1073
1074         return (0);
1075 }
1076
1077 /*
1078  * Shutdown one or both directions of a full-duplex pipe.
1079  */
1080 static int
1081 pipe_shutdown(struct file *fp, int how)
1082 {
1083         struct pipebuf *rpb;
1084         struct pipebuf *wpb;
1085         struct pipe *pipe;
1086         int error = EPIPE;
1087
1088         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1089         if ((intptr_t)fp->f_data & 1) {
1090                 rpb = &pipe->bufferB;
1091                 wpb = &pipe->bufferA;
1092         } else {
1093                 rpb = &pipe->bufferA;
1094                 wpb = &pipe->bufferB;
1095         }
1096
1097         /*
1098          * We modify pipe_state on both pipes, which means we need
1099          * all four tokens!
1100          */
1101         lwkt_gettoken(&rpb->rlock);
1102         lwkt_gettoken(&rpb->wlock);
1103         lwkt_gettoken(&wpb->rlock);
1104         lwkt_gettoken(&wpb->wlock);
1105
1106         switch(how) {
1107         case SHUT_RDWR:
1108         case SHUT_RD:
1109                 rpb->state |= PIPE_REOF;                /* my reads */
1110                 rpb->state |= PIPE_WEOF;                /* peer writes */
1111                 if (rpb->state & PIPE_WANTR) {
1112                         rpb->state &= ~PIPE_WANTR;
1113                         wakeup(rpb);
1114                 }
1115                 if (rpb->state & PIPE_WANTW) {
1116                         rpb->state &= ~PIPE_WANTW;
1117                         wakeup(rpb);
1118                 }
1119                 error = 0;
1120                 if (how == SHUT_RD)
1121                         break;
1122                 /* fall through */
1123         case SHUT_WR:
1124                 wpb->state |= PIPE_REOF;                /* peer reads */
1125                 wpb->state |= PIPE_WEOF;                /* my writes */
1126                 if (wpb->state & PIPE_WANTR) {
1127                         wpb->state &= ~PIPE_WANTR;
1128                         wakeup(wpb);
1129                 }
1130                 if (wpb->state & PIPE_WANTW) {
1131                         wpb->state &= ~PIPE_WANTW;
1132                         wakeup(wpb);
1133                 }
1134                 error = 0;
1135                 break;
1136         }
1137         pipewakeup(rpb, 1);
1138         pipewakeup(wpb, 1);
1139
1140         lwkt_reltoken(&wpb->wlock);
1141         lwkt_reltoken(&wpb->rlock);
1142         lwkt_reltoken(&rpb->wlock);
1143         lwkt_reltoken(&rpb->rlock);
1144
1145         return (error);
1146 }
1147
1148 /*
1149  * Destroy the pipe buffer.
1150  */
1151 static void
1152 pipe_free_kmem(struct pipebuf *pb)
1153 {
1154         if (pb->buffer != NULL) {
1155                 kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
1156                 pb->buffer = NULL;
1157                 pb->object = NULL;
1158         }
1159 }
1160
1161 /*
1162  * Close one half of the pipe.  We are closing the pipe for reading on rpb
1163  * and writing on wpb.  This routine must be called twice with the pipebufs
1164  * reversed to close both directions.
1165  */
1166 static void
1167 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1168 {
1169         globaldata_t gd;
1170
1171         if (pipe == NULL)
1172                 return;
1173
1174         /*
1175          * We need both the read and write tokens to modify pipe_state.
1176          */
1177         lwkt_gettoken(&rpb->rlock);
1178         lwkt_gettoken(&rpb->wlock);
1179
1180         /*
1181          * Set our state, wakeup anyone waiting in select/poll/kq, and
1182          * wakeup anyone blocked on our pipe.  No action if our side
1183          * is already closed.
1184          */
1185         if (rpb->state & PIPE_CLOSED) {
1186                 lwkt_reltoken(&rpb->wlock);
1187                 lwkt_reltoken(&rpb->rlock);
1188                 return;
1189         }
1190
1191         rpb->state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1192         pipewakeup(rpb, 1);
1193         if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1194                 rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1195                 wakeup(rpb);
1196         }
1197         lwkt_reltoken(&rpb->wlock);
1198         lwkt_reltoken(&rpb->rlock);
1199
1200         /*
1201          * Disconnect from peer.
1202          */
1203         lwkt_gettoken(&wpb->rlock);
1204         lwkt_gettoken(&wpb->wlock);
1205
1206         wpb->state |= PIPE_REOF | PIPE_WEOF;
1207         pipewakeup(wpb, 1);
1208         if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1209                 wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1210                 wakeup(wpb);
1211         }
1212         if (SLIST_FIRST(&wpb->kq.ki_note))
1213                 KNOTE(&wpb->kq.ki_note, 0);
1214         lwkt_reltoken(&wpb->wlock);
1215         lwkt_reltoken(&wpb->rlock);
1216
1217         /*
1218          * Free resources once both sides are closed.  We maintain a pcpu
1219          * cache to improve performance, so the actual tear-down case is
1220          * limited to bulk situations.
1221          *
1222          * However, the bulk tear-down case can cause intense contention
1223          * on the kernel_map when, e.g. hundreds to hundreds of thousands
1224          * of processes are killed at the same time.  To deal with this we
1225          * use a pcpu mutex to maintain concurrency but also limit the
1226          * number of threads banging on the map and pmap.
1227          *
1228          * We use the mtx mechanism instead of the lockmgr mechanism because
1229          * the mtx mechanism utilizes a queued design which will not break
1230          * down in the face of thousands to hundreds of thousands of
1231          * processes trying to free pipes simultaneously.  The lockmgr
1232          * mechanism will wind up waking them all up each time a lock
1233          * cycles.
1234          */
1235         if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1236                 gd = mycpu;
1237                 if (gd->gd_pipeqcount >= pipe_maxcache) {
1238                         mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1239                         pipe_free_kmem(rpb);
1240                         pipe_free_kmem(wpb);
1241                         mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1242                         kfree(pipe, M_PIPE);
1243                 } else {
1244                         rpb->state = 0;
1245                         wpb->state = 0;
1246                         pipe->next = gd->gd_pipeq;
1247                         gd->gd_pipeq = pipe;
1248                         ++gd->gd_pipeqcount;
1249                 }
1250         }
1251 }
1252
1253 static int
1254 pipe_kqfilter(struct file *fp, struct knote *kn)
1255 {
1256         struct pipebuf *rpb;
1257         struct pipebuf *wpb;
1258         struct pipe *pipe;
1259
1260         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1261         if ((intptr_t)fp->f_data & 1) {
1262                 rpb = &pipe->bufferB;
1263                 wpb = &pipe->bufferA;
1264         } else {
1265                 rpb = &pipe->bufferA;
1266                 wpb = &pipe->bufferB;
1267         }
1268
1269         switch (kn->kn_filter) {
1270         case EVFILT_READ:
1271                 kn->kn_fop = &pipe_rfiltops;
1272                 break;
1273         case EVFILT_WRITE:
1274                 kn->kn_fop = &pipe_wfiltops;
1275                 if (wpb->state & PIPE_CLOSED) {
1276                         /* other end of pipe has been closed */
1277                         return (EPIPE);
1278                 }
1279                 break;
1280         default:
1281                 return (EOPNOTSUPP);
1282         }
1283
1284         if (rpb == &pipe->bufferA)
1285                 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1286         else
1287                 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1288
1289         knote_insert(&rpb->kq.ki_note, kn);
1290
1291         return (0);
1292 }
1293
1294 static void
1295 filt_pipedetach(struct knote *kn)
1296 {
1297         struct pipebuf *rpb;
1298         struct pipebuf *wpb;
1299         struct pipe *pipe;
1300
1301         pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1302         if ((intptr_t)kn->kn_hook & 1) {
1303                 rpb = &pipe->bufferB;
1304                 wpb = &pipe->bufferA;
1305         } else {
1306                 rpb = &pipe->bufferA;
1307                 wpb = &pipe->bufferB;
1308         }
1309         knote_remove(&rpb->kq.ki_note, kn);
1310 }
1311
1312 /*ARGSUSED*/
1313 static int
1314 filt_piperead(struct knote *kn, long hint)
1315 {
1316         struct pipebuf *rpb;
1317         struct pipebuf *wpb;
1318         struct pipe *pipe;
1319         int ready = 0;
1320
1321         pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1322         if ((intptr_t)kn->kn_fp->f_data & 1) {
1323                 rpb = &pipe->bufferB;
1324                 wpb = &pipe->bufferA;
1325         } else {
1326                 rpb = &pipe->bufferA;
1327                 wpb = &pipe->bufferB;
1328         }
1329
1330         lwkt_gettoken(&rpb->rlock);
1331         lwkt_gettoken(&rpb->wlock);
1332
1333         kn->kn_data = rpb->windex - rpb->rindex;
1334
1335         if (rpb->state & PIPE_REOF) {
1336                 /*
1337                  * Only set NODATA if all data has been exhausted
1338                  */
1339                 if (kn->kn_data == 0)
1340                         kn->kn_flags |= EV_NODATA;
1341                 kn->kn_flags |= EV_EOF; 
1342                 ready = 1;
1343         }
1344
1345         lwkt_reltoken(&rpb->wlock);
1346         lwkt_reltoken(&rpb->rlock);
1347
1348         if (!ready)
1349                 ready = kn->kn_data > 0;
1350
1351         return (ready);
1352 }
1353
1354 /*ARGSUSED*/
1355 static int
1356 filt_pipewrite(struct knote *kn, long hint)
1357 {
1358         struct pipebuf *rpb;
1359         struct pipebuf *wpb;
1360         struct pipe *pipe;
1361         int ready = 0;
1362
1363         pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1364         if ((intptr_t)kn->kn_fp->f_data & 1) {
1365                 rpb = &pipe->bufferB;
1366                 wpb = &pipe->bufferA;
1367         } else {
1368                 rpb = &pipe->bufferA;
1369                 wpb = &pipe->bufferB;
1370         }
1371
1372         kn->kn_data = 0;
1373         if (wpb->state & PIPE_CLOSED) {
1374                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1375                 return (1);
1376         }
1377
1378         lwkt_gettoken(&wpb->rlock);
1379         lwkt_gettoken(&wpb->wlock);
1380
1381         if (wpb->state & PIPE_WEOF) {
1382                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1383                 ready = 1;
1384         }
1385
1386         if (!ready)
1387                 kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1388
1389         lwkt_reltoken(&wpb->wlock);
1390         lwkt_reltoken(&wpb->rlock);
1391
1392         if (!ready)
1393                 ready = kn->kn_data >= PIPE_BUF;
1394
1395         return (ready);
1396 }