kernel - Add callout debugging
[dragonfly.git] / sys / kern / sys_pipe.c
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice immediately at the beginning of the file, without modification,
14  *    this list of conditions, and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  * 3. Absolutely no warranty of function or purpose is made by the author
19  *    John S. Dyson.
20  * 4. Modifications may be freely made to this file if the above conditions
21  *    are met.
22  */
23
24 /*
25  * This file contains a high-performance replacement for the socket-based
26  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
27  * all features of sockets, but does do everything that pipes normally
28  * do.
29  */
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysproto.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68
69 #include <machine/cpufunc.h>
70
71 struct pipegdlock {
72         struct mtx      mtx;
73 } __cachealign;
74
75 /*
76  * interfaces to the outside world
77  */
78 static int pipe_read (struct file *fp, struct uio *uio, 
79                 struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio, 
81                 struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87                 struct ucred *cred, struct sysmsg *msg);
88
89 static struct fileops pipeops = {
90         .fo_read = pipe_read, 
91         .fo_write = pipe_write,
92         .fo_ioctl = pipe_ioctl,
93         .fo_kqfilter = pipe_kqfilter,
94         .fo_stat = pipe_stat,
95         .fo_close = pipe_close,
96         .fo_shutdown = pipe_shutdown
97 };
98
99 static void     filt_pipedetach(struct knote *kn);
100 static int      filt_piperead(struct knote *kn, long hint);
101 static int      filt_pipewrite(struct knote *kn, long hint);
102
103 static struct filterops pipe_rfiltops =
104         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
105 static struct filterops pipe_wfiltops =
106         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
107
108 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
109
110 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
111
112 static int pipe_maxcache = PIPEQ_MAX_CACHE;
113 static struct pipegdlock *pipe_gdlocks;
114
115 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
116 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
118
119 /*
120  * The pipe buffer size can be changed at any time.  Only new pipe()s
121  * are affected.  Note that due to cpu cache effects, you do not want
122  * to make this value too large.
123  */
124 static int pipe_size = 32768;
125 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
126         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
127
128 /*
129  * Reader/writer delay loop.  When the reader exhausts the pipe buffer
130  * or the write completely fills the pipe buffer and would otherwise sleep,
131  * it first busy-loops for a few microseconds waiting for data or buffer
132  * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
133  * and also helps when the user program uses a large data buffer in its
134  * UIOs.
135  *
136  * This defaults to 4uS.
137  */
138 #ifdef _RDTSC_SUPPORTED_
139 static int pipe_delay = 4000;   /* 4uS default */
140 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
141         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
142 #endif
143
144 /*
145  * Auto-size pipe cache to reduce kmem allocations and frees.
146  */
147 static
148 void
149 pipeinit(void *dummy)
150 {
151         size_t mbytes = kmem_lim_size();
152         int n;
153
154         if (pipe_maxcache == PIPEQ_MAX_CACHE) {
155                 if (mbytes >= 7 * 1024)
156                         pipe_maxcache *= 2;
157                 if (mbytes >= 15 * 1024)
158                         pipe_maxcache *= 2;
159         }
160         pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
161                              M_PIPE, M_WAITOK | M_ZERO);
162         for (n = 0; n < ncpus; ++n)
163                 mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
164 }
165 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
166
167 static void pipeclose (struct pipe *pipe,
168                 struct pipebuf *pbr, struct pipebuf *pbw);
169 static void pipe_free_kmem (struct pipebuf *buf);
170 static int pipe_create (struct pipe **pipep);
171
172 /*
173  * Test and clear the specified flag, wakeup(pb) if it was set.
174  * This function must also act as a memory barrier.
175  */
176 static __inline void
177 pipesignal(struct pipebuf *pb, uint32_t flags)
178 {
179         uint32_t oflags;
180         uint32_t nflags;
181
182         for (;;) {
183                 oflags = pb->state;
184                 cpu_ccfence();
185                 nflags = oflags & ~flags;
186                 if (atomic_cmpset_int(&pb->state, oflags, nflags))
187                         break;
188         }
189         if (oflags & flags)
190                 wakeup(pb);
191 }
192
193 /*
194  *
195  */
196 static __inline void
197 pipewakeup(struct pipebuf *pb, int dosigio)
198 {
199         if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
200                 lwkt_gettoken(&sigio_token);
201                 pgsigio(pb->sigio, SIGIO, 0);
202                 lwkt_reltoken(&sigio_token);
203         }
204         KNOTE(&pb->kq.ki_note, 0);
205 }
206
207 /*
208  * These routines are called before and after a UIO.  The UIO
209  * may block, causing our held tokens to be lost temporarily.
210  *
211  * We use these routines to serialize reads against other reads
212  * and writes against other writes.
213  *
214  * The appropriate token is held on entry so *ipp does not race.
215  */
216 static __inline int
217 pipe_start_uio(int *ipp)
218 {
219         int error;
220
221         while (*ipp) {
222                 *ipp = -1;
223                 error = tsleep(ipp, PCATCH, "pipexx", 0);
224                 if (error)
225                         return (error);
226         }
227         *ipp = 1;
228         return (0);
229 }
230
231 static __inline void
232 pipe_end_uio(int *ipp)
233 {
234         if (*ipp < 0) {
235                 *ipp = 0;
236                 wakeup(ipp);
237         } else {
238                 KKASSERT(*ipp > 0);
239                 *ipp = 0;
240         }
241 }
242
243 /*
244  * The pipe system call for the DTYPE_PIPE type of pipes
245  *
246  * pipe_args(int dummy)
247  *
248  * MPSAFE
249  */
250 int
251 sys_pipe(struct pipe_args *uap)
252 {
253         return kern_pipe(uap->sysmsg_fds, 0);
254 }
255
256 int
257 sys_pipe2(struct pipe2_args *uap)
258 {
259         return kern_pipe(uap->sysmsg_fds, uap->flags);
260 }
261
262 int
263 kern_pipe(long *fds, int flags)
264 {
265         struct thread *td = curthread;
266         struct filedesc *fdp = td->td_proc->p_fd;
267         struct file *rf, *wf;
268         struct pipe *pipe;
269         int fd1, fd2, error;
270
271         pipe = NULL;
272         if (pipe_create(&pipe)) {
273                 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
274                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
275                 return (ENFILE);
276         }
277         
278         error = falloc(td->td_lwp, &rf, &fd1);
279         if (error) {
280                 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
281                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
282                 return (error);
283         }
284         fds[0] = fd1;
285
286         /*
287          * Warning: once we've gotten past allocation of the fd for the
288          * read-side, we can only drop the read side via fdrop() in order
289          * to avoid races against processes which manage to dup() the read
290          * side while we are blocked trying to allocate the write side.
291          */
292         rf->f_type = DTYPE_PIPE;
293         rf->f_flag = FREAD | FWRITE;
294         rf->f_ops = &pipeops;
295         rf->f_data = (void *)((intptr_t)pipe | 0);
296         if (flags & O_NONBLOCK)
297                 rf->f_flag |= O_NONBLOCK;
298         if (flags & O_CLOEXEC)
299                 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
300
301         error = falloc(td->td_lwp, &wf, &fd2);
302         if (error) {
303                 fsetfd(fdp, NULL, fd1);
304                 fdrop(rf);
305                 /* pipeA has been closed by fdrop() */
306                 /* close pipeB here */
307                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
308                 return (error);
309         }
310         wf->f_type = DTYPE_PIPE;
311         wf->f_flag = FREAD | FWRITE;
312         wf->f_ops = &pipeops;
313         wf->f_data = (void *)((intptr_t)pipe | 1);
314         if (flags & O_NONBLOCK)
315                 wf->f_flag |= O_NONBLOCK;
316         if (flags & O_CLOEXEC)
317                 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
318
319         fds[1] = fd2;
320
321         /*
322          * Once activated the peer relationship remains valid until
323          * both sides are closed.
324          */
325         fsetfd(fdp, rf, fd1);
326         fsetfd(fdp, wf, fd2);
327         fdrop(rf);
328         fdrop(wf);
329
330         return (0);
331 }
332
333 /*
334  * [re]allocates KVA for the pipe's circular buffer.  The space is
335  * pageable.  Called twice to setup full-duplex communications.
336  *
337  * NOTE: Independent vm_object's are used to improve performance.
338  *
339  * Returns 0 on success, ENOMEM on failure.
340  */
341 static int
342 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
343 {
344         struct vm_object *object;
345         caddr_t buffer;
346         vm_pindex_t npages;
347         int error;
348
349         size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
350         if (size < 16384)
351                 size = 16384;
352         if (size > 1024*1024)
353                 size = 1024*1024;
354
355         npages = round_page(size) / PAGE_SIZE;
356         object = pb->object;
357
358         /*
359          * [re]create the object if necessary and reserve space for it
360          * in the kernel_map.  The object and memory are pageable.  On
361          * success, free the old resources before assigning the new
362          * ones.
363          */
364         if (object == NULL || object->size != npages) {
365                 object = vm_object_allocate(OBJT_DEFAULT, npages);
366                 buffer = (caddr_t)vm_map_min(&kernel_map);
367
368                 error = vm_map_find(&kernel_map, object, NULL,
369                                     0, (vm_offset_t *)&buffer, size,
370                                     PAGE_SIZE, TRUE,
371                                     VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
372                                     VM_PROT_ALL, VM_PROT_ALL, 0);
373
374                 if (error != KERN_SUCCESS) {
375                         vm_object_deallocate(object);
376                         return (ENOMEM);
377                 }
378                 pipe_free_kmem(pb);
379                 pb->object = object;
380                 pb->buffer = buffer;
381                 pb->size = size;
382         }
383         pb->rindex = 0;
384         pb->windex = 0;
385
386         return (0);
387 }
388
389 /*
390  * Initialize and allocate VM and memory for pipe, pulling the pipe from
391  * our per-cpu cache if possible.
392  *
393  * Returns 0 on success, else an error code (typically ENOMEM).  Caller
394  * must still deallocate the pipe on failure.
395  */
396 static int
397 pipe_create(struct pipe **pipep)
398 {
399         globaldata_t gd = mycpu;
400         struct pipe *pipe;
401         int error;
402
403         if ((pipe = gd->gd_pipeq) != NULL) {
404                 gd->gd_pipeq = pipe->next;
405                 --gd->gd_pipeqcount;
406                 pipe->next = NULL;
407         } else {
408                 pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
409                 pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
410                 lwkt_token_init(&pipe->bufferA.rlock, "piper");
411                 lwkt_token_init(&pipe->bufferA.wlock, "pipew");
412                 lwkt_token_init(&pipe->bufferB.rlock, "piper");
413                 lwkt_token_init(&pipe->bufferB.wlock, "pipew");
414         }
415         *pipep = pipe;
416         if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
417                 return (error);
418         }
419         if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
420                 return (error);
421         }
422         vfs_timestamp(&pipe->ctime);
423         pipe->bufferA.atime = pipe->ctime;
424         pipe->bufferA.mtime = pipe->ctime;
425         pipe->bufferB.atime = pipe->ctime;
426         pipe->bufferB.mtime = pipe->ctime;
427         pipe->open_count = 2;
428
429         return (0);
430 }
431
432 /*
433  * Read data from a pipe
434  */
435 static int
436 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
437 {
438         struct pipebuf *rpb;
439         struct pipebuf *wpb;
440         struct pipe *pipe;
441         size_t nread = 0;
442         size_t size;    /* total bytes available */
443         size_t nsize;   /* total bytes to read */
444         size_t rindex;  /* contiguous bytes available */
445         int notify_writer;
446         int bigread;
447         int bigcount;
448         int error;
449         int nbio;
450
451         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
452         if ((intptr_t)fp->f_data & 1) {
453                 rpb = &pipe->bufferB;
454                 wpb = &pipe->bufferA;
455         } else {
456                 rpb = &pipe->bufferA;
457                 wpb = &pipe->bufferB;
458         }
459         atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
460
461         if (uio->uio_resid == 0)
462                 return(0);
463
464         /*
465          * Calculate nbio
466          */
467         if (fflags & O_FBLOCKING)
468                 nbio = 0;
469         else if (fflags & O_FNONBLOCKING)
470                 nbio = 1;
471         else if (fp->f_flag & O_NONBLOCK)
472                 nbio = 1;
473         else
474                 nbio = 0;
475
476         /*
477          * 'quick' NBIO test before things get expensive.
478          */
479         if (nbio && rpb->rindex == rpb->windex &&
480             (rpb->state & PIPE_REOF) == 0) {
481                 return EAGAIN;
482         }
483
484         /*
485          * Reads are serialized.  Note however that buffer.buffer and
486          * buffer.size can change out from under us when the number
487          * of bytes in the buffer are zero due to the write-side doing a
488          * pipespace().
489          */
490         lwkt_gettoken(&rpb->rlock);
491         error = pipe_start_uio(&rpb->rip);
492         if (error) {
493                 lwkt_reltoken(&rpb->rlock);
494                 return (error);
495         }
496         notify_writer = 0;
497
498         bigread = (uio->uio_resid > 10 * 1024 * 1024);
499         bigcount = 10;
500
501         while (uio->uio_resid) {
502                 /*
503                  * Don't hog the cpu.
504                  */
505                 if (bigread && --bigcount == 0) {
506                         lwkt_user_yield();
507                         bigcount = 10;
508                         if (CURSIG(curthread->td_lwp)) {
509                                 error = EINTR;
510                                 break;
511                         }
512                 }
513
514                 /*
515                  * lfence required to avoid read-reordering of buffer
516                  * contents prior to validation of size.
517                  */
518                 size = rpb->windex - rpb->rindex;
519                 cpu_lfence();
520                 if (size) {
521                         rindex = rpb->rindex & (rpb->size - 1);
522                         nsize = size;
523                         if (nsize > rpb->size - rindex)
524                                 nsize = rpb->size - rindex;
525                         nsize = szmin(nsize, uio->uio_resid);
526
527                         /*
528                          * Limit how much we move in one go so we have a
529                          * chance to kick the writer while data is still
530                          * available in the pipe.  This avoids getting into
531                          * a ping-pong with the writer.
532                          */
533                         if (nsize > (rpb->size >> 1))
534                                 nsize = rpb->size >> 1;
535
536                         error = uiomove(&rpb->buffer[rindex], nsize, uio);
537                         if (error)
538                                 break;
539                         rpb->rindex += nsize;
540                         nread += nsize;
541
542                         /*
543                          * If the FIFO is still over half full just continue
544                          * and do not try to notify the writer yet.  If
545                          * less than half full notify any waiting writer.
546                          */
547                         if (size - nsize > (rpb->size >> 1)) {
548                                 notify_writer = 0;
549                         } else {
550                                 notify_writer = 1;
551                                 pipesignal(rpb, PIPE_WANTW);
552                         }
553                         continue;
554                 }
555
556                 /*
557                  * If the "write-side" was blocked we wake it up.  This code
558                  * is reached when the buffer is completely emptied.
559                  */
560                 pipesignal(rpb, PIPE_WANTW);
561
562                 /*
563                  * Pick up our copy loop again if the writer sent data to
564                  * us while we were messing around.
565                  *
566                  * On a SMP box poll up to pipe_delay nanoseconds for new
567                  * data.  Typically a value of 2000 to 4000 is sufficient
568                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
569                  * is used for synchronous communications with small packets,
570                  * and 8000 or so (8uS) will pipeline large buffer xfers
571                  * between cpus over a pipe.
572                  *
573                  * For synchronous communications a hit means doing a
574                  * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS,
575                  * where as miss requiring a tsleep/wakeup sequence
576                  * will take 7uS or more.
577                  */
578                 if (rpb->windex != rpb->rindex)
579                         continue;
580
581 #ifdef _RDTSC_SUPPORTED_
582                 if (pipe_delay) {
583                         int64_t tsc_target;
584                         int good = 0;
585
586                         tsc_target = tsc_get_target(pipe_delay);
587                         while (tsc_test_target(tsc_target) == 0) {
588                                 cpu_lfence();
589                                 if (rpb->windex != rpb->rindex) {
590                                         good = 1;
591                                         break;
592                                 }
593                                 cpu_pause();
594                         }
595                         if (good)
596                                 continue;
597                 }
598 #endif
599
600                 /*
601                  * Detect EOF condition, do not set error.
602                  */
603                 if (rpb->state & PIPE_REOF)
604                         break;
605
606                 /*
607                  * Break if some data was read, or if this was a non-blocking
608                  * read.
609                  */
610                 if (nread > 0)
611                         break;
612
613                 if (nbio) {
614                         error = EAGAIN;
615                         break;
616                 }
617
618                 /*
619                  * Last chance, interlock with WANTR
620                  */
621                 tsleep_interlock(rpb, PCATCH);
622                 atomic_set_int(&rpb->state, PIPE_WANTR);
623
624                 /*
625                  * Retest bytes available after memory barrier above.
626                  */
627                 size = rpb->windex - rpb->rindex;
628                 if (size)
629                         continue;
630
631                 /*
632                  * Retest EOF after memory barrier above.
633                  */
634                 if (rpb->state & PIPE_REOF)
635                         break;
636
637                 /*
638                  * Wait for more data or state change
639                  */
640                 error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
641                 if (error)
642                         break;
643         }
644         pipe_end_uio(&rpb->rip);
645
646         /*
647          * Uptime last access time
648          */
649         if (error == 0 && nread)
650                 vfs_timestamp(&rpb->atime);
651
652         /*
653          * If we drained the FIFO more then half way then handle
654          * write blocking hysteresis.
655          *
656          * Note that PIPE_WANTW cannot be set by the writer without
657          * it holding both rlock and wlock, so we can test it
658          * while holding just rlock.
659          */
660         if (notify_writer) {
661                 /*
662                  * Synchronous blocking is done on the pipe involved
663                  */
664                 pipesignal(rpb, PIPE_WANTW);
665
666                 /*
667                  * But we may also have to deal with a kqueue which is
668                  * stored on the same pipe as its descriptor, so a
669                  * EVFILT_WRITE event waiting for our side to drain will
670                  * be on the other side.
671                  */
672                 pipewakeup(wpb, 0);
673         }
674         /*size = rpb->windex - rpb->rindex;*/
675         lwkt_reltoken(&rpb->rlock);
676
677         return (error);
678 }
679
680 static int
681 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
682 {
683         struct pipebuf *rpb;
684         struct pipebuf *wpb;
685         struct pipe *pipe;
686         size_t windex;
687         size_t space;
688         size_t wcount;
689         size_t orig_resid;
690         int bigwrite;
691         int bigcount;
692         int error;
693         int nbio;
694
695         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
696         if ((intptr_t)fp->f_data & 1) {
697                 rpb = &pipe->bufferB;
698                 wpb = &pipe->bufferA;
699         } else {
700                 rpb = &pipe->bufferA;
701                 wpb = &pipe->bufferB;
702         }
703
704         /*
705          * Calculate nbio
706          */
707         if (fflags & O_FBLOCKING)
708                 nbio = 0;
709         else if (fflags & O_FNONBLOCKING)
710                 nbio = 1;
711         else if (fp->f_flag & O_NONBLOCK)
712                 nbio = 1;
713         else
714                 nbio = 0;
715
716         /*
717          * 'quick' NBIO test before things get expensive.
718          */
719         if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
720             uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
721                 return EAGAIN;
722         }
723
724         /*
725          * Writes go to the peer.  The peer will always exist.
726          */
727         lwkt_gettoken(&wpb->wlock);
728         if (wpb->state & PIPE_WEOF) {
729                 lwkt_reltoken(&wpb->wlock);
730                 return (EPIPE);
731         }
732
733         /*
734          * Degenerate case (EPIPE takes prec)
735          */
736         if (uio->uio_resid == 0) {
737                 lwkt_reltoken(&wpb->wlock);
738                 return(0);
739         }
740
741         /*
742          * Writes are serialized (start_uio must be called with wlock)
743          */
744         error = pipe_start_uio(&wpb->wip);
745         if (error) {
746                 lwkt_reltoken(&wpb->wlock);
747                 return (error);
748         }
749
750         orig_resid = uio->uio_resid;
751         wcount = 0;
752
753         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
754         bigcount = 10;
755
756         while (uio->uio_resid) {
757                 if (wpb->state & PIPE_WEOF) {
758                         error = EPIPE;
759                         break;
760                 }
761
762                 /*
763                  * Don't hog the cpu.
764                  */
765                 if (bigwrite && --bigcount == 0) {
766                         lwkt_user_yield();
767                         bigcount = 10;
768                         if (CURSIG(curthread->td_lwp)) {
769                                 error = EINTR;
770                                 break;
771                         }
772                 }
773
774                 windex = wpb->windex & (wpb->size - 1);
775                 space = wpb->size - (wpb->windex - wpb->rindex);
776
777                 /*
778                  * Writes of size <= PIPE_BUF must be atomic.
779                  */
780                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
781                         space = 0;
782
783                 /* 
784                  * Write to fill, read size handles write hysteresis.  Also
785                  * additional restrictions can cause select-based non-blocking
786                  * writes to spin.
787                  */
788                 if (space > 0) {
789                         size_t segsize;
790
791                         /*
792                          * We want to notify a potentially waiting reader
793                          * before we exhaust the write buffer for SMP
794                          * pipelining.  Otherwise the write/read will begin
795                          * to ping-pong.
796                          */
797                         space = szmin(space, uio->uio_resid);
798                         if (space > (wpb->size >> 1))
799                                 space = (wpb->size >> 1);
800
801                         /*
802                          * First segment to transfer is minimum of
803                          * transfer size and contiguous space in
804                          * pipe buffer.  If first segment to transfer
805                          * is less than the transfer size, we've got
806                          * a wraparound in the buffer.
807                          */
808                         segsize = wpb->size - windex;
809                         if (segsize > space)
810                                 segsize = space;
811
812                         /*
813                          * If this is the first loop and the reader is
814                          * blocked, do a preemptive wakeup of the reader.
815                          *
816                          * On SMP the IPI latency plus the wlock interlock
817                          * on the reader side is the fastest way to get the
818                          * reader going.  (The scheduler will hard loop on
819                          * lock tokens).
820                          */
821                         if (wcount == 0)
822                                 pipesignal(wpb, PIPE_WANTR);
823
824                         /*
825                          * Transfer segment, which may include a wrap-around.
826                          * Update windex to account for both all in one go
827                          * so the reader can read() the data atomically.
828                          */
829                         error = uiomove(&wpb->buffer[windex], segsize, uio);
830                         if (error == 0 && segsize < space) {
831                                 segsize = space - segsize;
832                                 error = uiomove(&wpb->buffer[0], segsize, uio);
833                         }
834                         if (error)
835                                 break;
836
837                         /*
838                          * Memory fence prior to windex updating (note: not
839                          * needed so this is a NOP on Intel).
840                          */
841                         cpu_sfence();
842                         wpb->windex += space;
843
844                         /*
845                          * Signal reader
846                          */
847                         if (wcount != 0)
848                                 pipesignal(wpb, PIPE_WANTR);
849                         wcount += space;
850                         continue;
851                 }
852
853                 /*
854                  * Wakeup any pending reader
855                  */
856                 pipesignal(wpb, PIPE_WANTR);
857
858                 /*
859                  * don't block on non-blocking I/O
860                  */
861                 if (nbio) {
862                         error = EAGAIN;
863                         break;
864                 }
865
866 #ifdef _RDTSC_SUPPORTED_
867                 if (pipe_delay) {
868                         int64_t tsc_target;
869                         int good = 0;
870
871                         tsc_target = tsc_get_target(pipe_delay);
872                         while (tsc_test_target(tsc_target) == 0) {
873                                 cpu_lfence();
874                                 space = wpb->size - (wpb->windex - wpb->rindex);
875                                 if ((space < uio->uio_resid) &&
876                                     (orig_resid <= PIPE_BUF)) {
877                                         space = 0;
878                                 }
879                                 if (space) {
880                                         good = 1;
881                                         break;
882                                 }
883                                 cpu_pause();
884                         }
885                         if (good)
886                                 continue;
887                 }
888 #endif
889
890                 /*
891                  * Interlocked test.   Atomic op enforces the memory barrier.
892                  */
893                 tsleep_interlock(wpb, PCATCH);
894                 atomic_set_int(&wpb->state, PIPE_WANTW);
895
896                 /*
897                  * Retest space available after memory barrier above.
898                  * Writes of size <= PIPE_BUF must be atomic.
899                  */
900                 space = wpb->size - (wpb->windex - wpb->rindex);
901                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
902                         space = 0;
903
904                 /*
905                  * Retest EOF after memory barrier above.
906                  */
907                 if (wpb->state & PIPE_WEOF) {
908                         error = EPIPE;
909                         break;
910                 }
911
912                 /*
913                  * We have no more space and have something to offer,
914                  * wake up select/poll/kq.
915                  */
916                 if (space == 0) {
917                         pipewakeup(wpb, 1);
918                         error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
919                 }
920
921                 /*
922                  * Break out if we errored or the read side wants us to go
923                  * away.
924                  */
925                 if (error)
926                         break;
927                 if (wpb->state & PIPE_WEOF) {
928                         error = EPIPE;
929                         break;
930                 }
931         }
932         pipe_end_uio(&wpb->wip);
933
934         /*
935          * If we have put any characters in the buffer, we wake up
936          * the reader.
937          *
938          * Both rlock and wlock are required to be able to modify pipe_state.
939          */
940         if (wpb->windex != wpb->rindex) {
941                 pipesignal(wpb, PIPE_WANTR);
942                 pipewakeup(wpb, 1);
943         }
944
945         /*
946          * Don't return EPIPE if I/O was successful
947          */
948         if ((wpb->rindex == wpb->windex) &&
949             (uio->uio_resid == 0) &&
950             (error == EPIPE)) {
951                 error = 0;
952         }
953
954         if (error == 0)
955                 vfs_timestamp(&wpb->mtime);
956
957         /*
958          * We have something to offer,
959          * wake up select/poll/kq.
960          */
961         /*space = wpb->windex - wpb->rindex;*/
962         lwkt_reltoken(&wpb->wlock);
963
964         return (error);
965 }
966
967 /*
968  * we implement a very minimal set of ioctls for compatibility with sockets.
969  */
970 static int
971 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
972            struct ucred *cred, struct sysmsg *msg)
973 {
974         struct pipebuf *rpb;
975         struct pipe *pipe;
976         int error;
977
978         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
979         if ((intptr_t)fp->f_data & 1) {
980                 rpb = &pipe->bufferB;
981         } else {
982                 rpb = &pipe->bufferA;
983         }
984
985         lwkt_gettoken(&rpb->rlock);
986         lwkt_gettoken(&rpb->wlock);
987
988         switch (cmd) {
989         case FIOASYNC:
990                 if (*(int *)data) {
991                         atomic_set_int(&rpb->state, PIPE_ASYNC);
992                 } else {
993                         atomic_clear_int(&rpb->state, PIPE_ASYNC);
994                 }
995                 error = 0;
996                 break;
997         case FIONREAD:
998                 *(int *)data = (int)(rpb->windex - rpb->rindex);
999                 error = 0;
1000                 break;
1001         case FIOSETOWN:
1002                 error = fsetown(*(int *)data, &rpb->sigio);
1003                 break;
1004         case FIOGETOWN:
1005                 *(int *)data = fgetown(&rpb->sigio);
1006                 error = 0;
1007                 break;
1008         case TIOCSPGRP:
1009                 /* This is deprecated, FIOSETOWN should be used instead. */
1010                 error = fsetown(-(*(int *)data), &rpb->sigio);
1011                 break;
1012
1013         case TIOCGPGRP:
1014                 /* This is deprecated, FIOGETOWN should be used instead. */
1015                 *(int *)data = -fgetown(&rpb->sigio);
1016                 error = 0;
1017                 break;
1018         default:
1019                 error = ENOTTY;
1020                 break;
1021         }
1022         lwkt_reltoken(&rpb->wlock);
1023         lwkt_reltoken(&rpb->rlock);
1024
1025         return (error);
1026 }
1027
1028 /*
1029  * MPSAFE
1030  */
1031 static int
1032 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1033 {
1034         struct pipebuf *rpb;
1035         struct pipe *pipe;
1036
1037         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1038         if ((intptr_t)fp->f_data & 1) {
1039                 rpb = &pipe->bufferB;
1040         } else {
1041                 rpb = &pipe->bufferA;
1042         }
1043
1044         bzero((caddr_t)ub, sizeof(*ub));
1045         ub->st_mode = S_IFIFO;
1046         ub->st_blksize = rpb->size;
1047         ub->st_size = rpb->windex - rpb->rindex;
1048         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1049         ub->st_atimespec = rpb->atime;
1050         ub->st_mtimespec = rpb->mtime;
1051         ub->st_ctimespec = pipe->ctime;
1052         ub->st_uid = fp->f_cred->cr_uid;
1053         ub->st_gid = fp->f_cred->cr_gid;
1054         ub->st_ino = pipe->inum;
1055         /*
1056          * Left as 0: st_dev, st_nlink, st_rdev,
1057          * st_flags, st_gen.
1058          * XXX (st_dev, st_ino) should be unique.
1059          */
1060
1061         return (0);
1062 }
1063
1064 static int
1065 pipe_close(struct file *fp)
1066 {
1067         struct pipebuf *rpb;
1068         struct pipebuf *wpb;
1069         struct pipe *pipe;
1070
1071         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1072         if ((intptr_t)fp->f_data & 1) {
1073                 rpb = &pipe->bufferB;
1074                 wpb = &pipe->bufferA;
1075         } else {
1076                 rpb = &pipe->bufferA;
1077                 wpb = &pipe->bufferB;
1078         }
1079
1080         fp->f_ops = &badfileops;
1081         fp->f_data = NULL;
1082         funsetown(&rpb->sigio);
1083         pipeclose(pipe, rpb, wpb);
1084
1085         return (0);
1086 }
1087
1088 /*
1089  * Shutdown one or both directions of a full-duplex pipe.
1090  */
1091 static int
1092 pipe_shutdown(struct file *fp, int how)
1093 {
1094         struct pipebuf *rpb;
1095         struct pipebuf *wpb;
1096         struct pipe *pipe;
1097         int error = EPIPE;
1098
1099         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1100         if ((intptr_t)fp->f_data & 1) {
1101                 rpb = &pipe->bufferB;
1102                 wpb = &pipe->bufferA;
1103         } else {
1104                 rpb = &pipe->bufferA;
1105                 wpb = &pipe->bufferB;
1106         }
1107
1108         /*
1109          * We modify pipe_state on both pipes, which means we need
1110          * all four tokens!
1111          */
1112         lwkt_gettoken(&rpb->rlock);
1113         lwkt_gettoken(&rpb->wlock);
1114         lwkt_gettoken(&wpb->rlock);
1115         lwkt_gettoken(&wpb->wlock);
1116
1117         switch(how) {
1118         case SHUT_RDWR:
1119         case SHUT_RD:
1120                 /*
1121                  * EOF on my reads and peer writes
1122                  */
1123                 atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1124                 if (rpb->state & PIPE_WANTR) {
1125                         rpb->state &= ~PIPE_WANTR;
1126                         wakeup(rpb);
1127                 }
1128                 if (rpb->state & PIPE_WANTW) {
1129                         rpb->state &= ~PIPE_WANTW;
1130                         wakeup(rpb);
1131                 }
1132                 error = 0;
1133                 if (how == SHUT_RD)
1134                         break;
1135                 /* fall through */
1136         case SHUT_WR:
1137                 /*
1138                  * EOF on peer reads and my writes
1139                  */
1140                 atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1141                 if (wpb->state & PIPE_WANTR) {
1142                         wpb->state &= ~PIPE_WANTR;
1143                         wakeup(wpb);
1144                 }
1145                 if (wpb->state & PIPE_WANTW) {
1146                         wpb->state &= ~PIPE_WANTW;
1147                         wakeup(wpb);
1148                 }
1149                 error = 0;
1150                 break;
1151         }
1152         pipewakeup(rpb, 1);
1153         pipewakeup(wpb, 1);
1154
1155         lwkt_reltoken(&wpb->wlock);
1156         lwkt_reltoken(&wpb->rlock);
1157         lwkt_reltoken(&rpb->wlock);
1158         lwkt_reltoken(&rpb->rlock);
1159
1160         return (error);
1161 }
1162
1163 /*
1164  * Destroy the pipe buffer.
1165  */
1166 static void
1167 pipe_free_kmem(struct pipebuf *pb)
1168 {
1169         if (pb->buffer != NULL) {
1170                 kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
1171                 pb->buffer = NULL;
1172                 pb->object = NULL;
1173         }
1174 }
1175
1176 /*
1177  * Close one half of the pipe.  We are closing the pipe for reading on rpb
1178  * and writing on wpb.  This routine must be called twice with the pipebufs
1179  * reversed to close both directions.
1180  */
1181 static void
1182 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1183 {
1184         globaldata_t gd;
1185
1186         if (pipe == NULL)
1187                 return;
1188
1189         /*
1190          * We need both the read and write tokens to modify pipe_state.
1191          */
1192         lwkt_gettoken(&rpb->rlock);
1193         lwkt_gettoken(&rpb->wlock);
1194
1195         /*
1196          * Set our state, wakeup anyone waiting in select/poll/kq, and
1197          * wakeup anyone blocked on our pipe.  No action if our side
1198          * is already closed.
1199          */
1200         if (rpb->state & PIPE_CLOSED) {
1201                 lwkt_reltoken(&rpb->wlock);
1202                 lwkt_reltoken(&rpb->rlock);
1203                 return;
1204         }
1205
1206         atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1207         pipewakeup(rpb, 1);
1208         if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1209                 rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1210                 wakeup(rpb);
1211         }
1212         lwkt_reltoken(&rpb->wlock);
1213         lwkt_reltoken(&rpb->rlock);
1214
1215         /*
1216          * Disconnect from peer.
1217          */
1218         lwkt_gettoken(&wpb->rlock);
1219         lwkt_gettoken(&wpb->wlock);
1220
1221         atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1222         pipewakeup(wpb, 1);
1223         if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1224                 wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1225                 wakeup(wpb);
1226         }
1227         if (SLIST_FIRST(&wpb->kq.ki_note))
1228                 KNOTE(&wpb->kq.ki_note, 0);
1229         lwkt_reltoken(&wpb->wlock);
1230         lwkt_reltoken(&wpb->rlock);
1231
1232         /*
1233          * Free resources once both sides are closed.  We maintain a pcpu
1234          * cache to improve performance, so the actual tear-down case is
1235          * limited to bulk situations.
1236          *
1237          * However, the bulk tear-down case can cause intense contention
1238          * on the kernel_map when, e.g. hundreds to hundreds of thousands
1239          * of processes are killed at the same time.  To deal with this we
1240          * use a pcpu mutex to maintain concurrency but also limit the
1241          * number of threads banging on the map and pmap.
1242          *
1243          * We use the mtx mechanism instead of the lockmgr mechanism because
1244          * the mtx mechanism utilizes a queued design which will not break
1245          * down in the face of thousands to hundreds of thousands of
1246          * processes trying to free pipes simultaneously.  The lockmgr
1247          * mechanism will wind up waking them all up each time a lock
1248          * cycles.
1249          */
1250         if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1251                 gd = mycpu;
1252                 if (gd->gd_pipeqcount >= pipe_maxcache) {
1253                         mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1254                         pipe_free_kmem(rpb);
1255                         pipe_free_kmem(wpb);
1256                         mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1257                         kfree(pipe, M_PIPE);
1258                 } else {
1259                         rpb->state = 0;
1260                         wpb->state = 0;
1261                         pipe->next = gd->gd_pipeq;
1262                         gd->gd_pipeq = pipe;
1263                         ++gd->gd_pipeqcount;
1264                 }
1265         }
1266 }
1267
1268 static int
1269 pipe_kqfilter(struct file *fp, struct knote *kn)
1270 {
1271         struct pipebuf *rpb;
1272         struct pipebuf *wpb;
1273         struct pipe *pipe;
1274
1275         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1276         if ((intptr_t)fp->f_data & 1) {
1277                 rpb = &pipe->bufferB;
1278                 wpb = &pipe->bufferA;
1279         } else {
1280                 rpb = &pipe->bufferA;
1281                 wpb = &pipe->bufferB;
1282         }
1283
1284         switch (kn->kn_filter) {
1285         case EVFILT_READ:
1286                 kn->kn_fop = &pipe_rfiltops;
1287                 break;
1288         case EVFILT_WRITE:
1289                 kn->kn_fop = &pipe_wfiltops;
1290                 if (wpb->state & PIPE_CLOSED) {
1291                         /* other end of pipe has been closed */
1292                         return (EPIPE);
1293                 }
1294                 break;
1295         default:
1296                 return (EOPNOTSUPP);
1297         }
1298
1299         if (rpb == &pipe->bufferA)
1300                 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1301         else
1302                 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1303
1304         knote_insert(&rpb->kq.ki_note, kn);
1305
1306         return (0);
1307 }
1308
1309 static void
1310 filt_pipedetach(struct knote *kn)
1311 {
1312         struct pipebuf *rpb;
1313         struct pipebuf *wpb;
1314         struct pipe *pipe;
1315
1316         pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1317         if ((intptr_t)kn->kn_hook & 1) {
1318                 rpb = &pipe->bufferB;
1319                 wpb = &pipe->bufferA;
1320         } else {
1321                 rpb = &pipe->bufferA;
1322                 wpb = &pipe->bufferB;
1323         }
1324         knote_remove(&rpb->kq.ki_note, kn);
1325 }
1326
1327 /*ARGSUSED*/
1328 static int
1329 filt_piperead(struct knote *kn, long hint)
1330 {
1331         struct pipebuf *rpb;
1332         struct pipebuf *wpb;
1333         struct pipe *pipe;
1334         int ready = 0;
1335
1336         pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1337         if ((intptr_t)kn->kn_fp->f_data & 1) {
1338                 rpb = &pipe->bufferB;
1339                 wpb = &pipe->bufferA;
1340         } else {
1341                 rpb = &pipe->bufferA;
1342                 wpb = &pipe->bufferB;
1343         }
1344
1345         lwkt_gettoken(&rpb->rlock);
1346         lwkt_gettoken(&rpb->wlock);
1347
1348         kn->kn_data = rpb->windex - rpb->rindex;
1349
1350         if (rpb->state & PIPE_REOF) {
1351                 /*
1352                  * Only set NODATA if all data has been exhausted
1353                  */
1354                 if (kn->kn_data == 0)
1355                         kn->kn_flags |= EV_NODATA;
1356                 kn->kn_flags |= EV_EOF; 
1357                 ready = 1;
1358         }
1359
1360         lwkt_reltoken(&rpb->wlock);
1361         lwkt_reltoken(&rpb->rlock);
1362
1363         if (!ready)
1364                 ready = kn->kn_data > 0;
1365
1366         return (ready);
1367 }
1368
1369 /*ARGSUSED*/
1370 static int
1371 filt_pipewrite(struct knote *kn, long hint)
1372 {
1373         struct pipebuf *rpb;
1374         struct pipebuf *wpb;
1375         struct pipe *pipe;
1376         int ready = 0;
1377
1378         pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1379         if ((intptr_t)kn->kn_fp->f_data & 1) {
1380                 rpb = &pipe->bufferB;
1381                 wpb = &pipe->bufferA;
1382         } else {
1383                 rpb = &pipe->bufferA;
1384                 wpb = &pipe->bufferB;
1385         }
1386
1387         kn->kn_data = 0;
1388         if (wpb->state & PIPE_CLOSED) {
1389                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1390                 return (1);
1391         }
1392
1393         lwkt_gettoken(&wpb->rlock);
1394         lwkt_gettoken(&wpb->wlock);
1395
1396         if (wpb->state & PIPE_WEOF) {
1397                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1398                 ready = 1;
1399         }
1400
1401         if (!ready)
1402                 kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1403
1404         lwkt_reltoken(&wpb->wlock);
1405         lwkt_reltoken(&wpb->rlock);
1406
1407         if (!ready)
1408                 ready = kn->kn_data >= PIPE_BUF;
1409
1410         return (ready);
1411 }