kernel - Make the wdog.h and gpio.h includes conditional in kern_shutdown.c.
[dragonfly.git] / sys / kern / sys_pipe.c
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
5  *
6  * This code is derived from software contributed to The DragonFly Project
7  * by Matthew Dillon <dillon@backplane.com>
8  *
9  * Redistribution and use in source and binary forms, with or without
10  * modification, are permitted provided that the following conditions
11  * are met:
12  * 1. Redistributions of source code must retain the above copyright
13  *    notice immediately at the beginning of the file, without modification,
14  *    this list of conditions, and the following disclaimer.
15  * 2. Redistributions in binary form must reproduce the above copyright
16  *    notice, this list of conditions and the following disclaimer in the
17  *    documentation and/or other materials provided with the distribution.
18  * 3. Absolutely no warranty of function or purpose is made by the author
19  *    John S. Dyson.
20  * 4. Modifications may be freely made to this file if the above conditions
21  *    are met.
22  */
23
24 /*
25  * This file contains a high-performance replacement for the socket-based
26  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
27  * all features of sockets, but does do everything that pipes normally
28  * do.
29  */
30 #include <sys/param.h>
31 #include <sys/systm.h>
32 #include <sys/kernel.h>
33 #include <sys/proc.h>
34 #include <sys/fcntl.h>
35 #include <sys/file.h>
36 #include <sys/filedesc.h>
37 #include <sys/filio.h>
38 #include <sys/ttycom.h>
39 #include <sys/stat.h>
40 #include <sys/signalvar.h>
41 #include <sys/sysproto.h>
42 #include <sys/pipe.h>
43 #include <sys/vnode.h>
44 #include <sys/uio.h>
45 #include <sys/event.h>
46 #include <sys/globaldata.h>
47 #include <sys/module.h>
48 #include <sys/malloc.h>
49 #include <sys/sysctl.h>
50 #include <sys/socket.h>
51 #include <sys/kern_syscall.h>
52 #include <sys/lock.h>
53 #include <sys/mutex.h>
54
55 #include <vm/vm.h>
56 #include <vm/vm_param.h>
57 #include <vm/vm_object.h>
58 #include <vm/vm_kern.h>
59 #include <vm/vm_extern.h>
60 #include <vm/pmap.h>
61 #include <vm/vm_map.h>
62 #include <vm/vm_page.h>
63 #include <vm/vm_zone.h>
64
65 #include <sys/file2.h>
66 #include <sys/signal2.h>
67 #include <sys/mutex2.h>
68
69 #include <machine/cpufunc.h>
70
71 struct pipegdlock {
72         struct mtx      mtx;
73 } __cachealign;
74
75 /*
76  * interfaces to the outside world
77  */
78 static int pipe_read (struct file *fp, struct uio *uio, 
79                 struct ucred *cred, int flags);
80 static int pipe_write (struct file *fp, struct uio *uio, 
81                 struct ucred *cred, int flags);
82 static int pipe_close (struct file *fp);
83 static int pipe_shutdown (struct file *fp, int how);
84 static int pipe_kqfilter (struct file *fp, struct knote *kn);
85 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87                 struct ucred *cred, struct sysmsg *msg);
88
89 static struct fileops pipeops = {
90         .fo_read = pipe_read, 
91         .fo_write = pipe_write,
92         .fo_ioctl = pipe_ioctl,
93         .fo_kqfilter = pipe_kqfilter,
94         .fo_stat = pipe_stat,
95         .fo_close = pipe_close,
96         .fo_shutdown = pipe_shutdown
97 };
98
99 static void     filt_pipedetach(struct knote *kn);
100 static int      filt_piperead(struct knote *kn, long hint);
101 static int      filt_pipewrite(struct knote *kn, long hint);
102
103 static struct filterops pipe_rfiltops =
104         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
105 static struct filterops pipe_wfiltops =
106         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
107
108 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
109
110 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
111
112 static int pipe_maxcache = PIPEQ_MAX_CACHE;
113 static struct pipegdlock *pipe_gdlocks;
114
115 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
116 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
118
119 /*
120  * The pipe buffer size can be changed at any time.  Only new pipe()s
121  * are affected.  Note that due to cpu cache effects, you do not want
122  * to make this value too large.
123  */
124 static int pipe_size = 32768;
125 SYSCTL_INT(_kern_pipe, OID_AUTO, size,
126         CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
127
128 /*
129  * Reader/writer delay loop.  When the reader exhausts the pipe buffer
130  * or the write completely fills the pipe buffer and would otherwise sleep,
131  * it first busy-loops for a few microseconds waiting for data or buffer
132  * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
133  * and also helps when the user program uses a large data buffer in its
134  * UIOs.
135  *
136  * This defaults to 4uS.
137  */
138 #ifdef _RDTSC_SUPPORTED_
139 static int pipe_delay = 4000;   /* 4uS default */
140 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
141         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
142 #endif
143
144 /*
145  * Auto-size pipe cache to reduce kmem allocations and frees.
146  */
147 static
148 void
149 pipeinit(void *dummy)
150 {
151         size_t mbytes = kmem_lim_size();
152         int n;
153
154         if (pipe_maxcache == PIPEQ_MAX_CACHE) {
155                 if (mbytes >= 7 * 1024)
156                         pipe_maxcache *= 2;
157                 if (mbytes >= 15 * 1024)
158                         pipe_maxcache *= 2;
159         }
160         pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
161                              M_PIPE, M_WAITOK | M_ZERO);
162         for (n = 0; n < ncpus; ++n)
163                 mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
164 }
165 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
166
167 static void pipeclose (struct pipe *pipe,
168                 struct pipebuf *pbr, struct pipebuf *pbw);
169 static void pipe_free_kmem (struct pipebuf *buf);
170 static int pipe_create (struct pipe **pipep);
171
172 /*
173  * Test and clear the specified flag, wakeup(pb) if it was set.
174  * This function must also act as a memory barrier.
175  */
176 static __inline void
177 pipesignal(struct pipebuf *pb, uint32_t flags)
178 {
179         uint32_t oflags;
180         uint32_t nflags;
181
182         for (;;) {
183                 oflags = pb->state;
184                 cpu_ccfence();
185                 nflags = oflags & ~flags;
186                 if (atomic_cmpset_int(&pb->state, oflags, nflags))
187                         break;
188         }
189         if (oflags & flags)
190                 wakeup(pb);
191 }
192
193 /*
194  *
195  */
196 static __inline void
197 pipewakeup(struct pipebuf *pb, int dosigio)
198 {
199         if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
200                 lwkt_gettoken(&sigio_token);
201                 pgsigio(pb->sigio, SIGIO, 0);
202                 lwkt_reltoken(&sigio_token);
203         }
204         KNOTE(&pb->kq.ki_note, 0);
205 }
206
207 /*
208  * These routines are called before and after a UIO.  The UIO
209  * may block, causing our held tokens to be lost temporarily.
210  *
211  * We use these routines to serialize reads against other reads
212  * and writes against other writes.
213  *
214  * The appropriate token is held on entry so *ipp does not race.
215  */
216 static __inline int
217 pipe_start_uio(int *ipp)
218 {
219         int error;
220
221         while (*ipp) {
222                 *ipp = -1;
223                 error = tsleep(ipp, PCATCH, "pipexx", 0);
224                 if (error)
225                         return (error);
226         }
227         *ipp = 1;
228         return (0);
229 }
230
231 static __inline void
232 pipe_end_uio(int *ipp)
233 {
234         if (*ipp < 0) {
235                 *ipp = 0;
236                 wakeup(ipp);
237         } else {
238                 KKASSERT(*ipp > 0);
239                 *ipp = 0;
240         }
241 }
242
243 /*
244  * The pipe system call for the DTYPE_PIPE type of pipes
245  *
246  * pipe_args(int dummy)
247  *
248  * MPSAFE
249  */
250 int
251 sys_pipe(struct pipe_args *uap)
252 {
253         return kern_pipe(uap->sysmsg_fds, 0);
254 }
255
256 int
257 sys_pipe2(struct pipe2_args *uap)
258 {
259         return kern_pipe(uap->sysmsg_fds, uap->flags);
260 }
261
262 int
263 kern_pipe(long *fds, int flags)
264 {
265         struct thread *td = curthread;
266         struct filedesc *fdp = td->td_proc->p_fd;
267         struct file *rf, *wf;
268         struct pipe *pipe;
269         int fd1, fd2, error;
270
271         pipe = NULL;
272         if (pipe_create(&pipe)) {
273                 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
274                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
275                 return (ENFILE);
276         }
277         
278         error = falloc(td->td_lwp, &rf, &fd1);
279         if (error) {
280                 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
281                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
282                 return (error);
283         }
284         fds[0] = fd1;
285
286         /*
287          * Warning: once we've gotten past allocation of the fd for the
288          * read-side, we can only drop the read side via fdrop() in order
289          * to avoid races against processes which manage to dup() the read
290          * side while we are blocked trying to allocate the write side.
291          */
292         rf->f_type = DTYPE_PIPE;
293         rf->f_flag = FREAD | FWRITE;
294         rf->f_ops = &pipeops;
295         rf->f_data = (void *)((intptr_t)pipe | 0);
296         if (flags & O_NONBLOCK)
297                 rf->f_flag |= O_NONBLOCK;
298         if (flags & O_CLOEXEC)
299                 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
300
301         error = falloc(td->td_lwp, &wf, &fd2);
302         if (error) {
303                 fsetfd(fdp, NULL, fd1);
304                 fdrop(rf);
305                 /* pipeA has been closed by fdrop() */
306                 /* close pipeB here */
307                 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
308                 return (error);
309         }
310         wf->f_type = DTYPE_PIPE;
311         wf->f_flag = FREAD | FWRITE;
312         wf->f_ops = &pipeops;
313         wf->f_data = (void *)((intptr_t)pipe | 1);
314         if (flags & O_NONBLOCK)
315                 wf->f_flag |= O_NONBLOCK;
316         if (flags & O_CLOEXEC)
317                 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
318
319         fds[1] = fd2;
320
321         /*
322          * Once activated the peer relationship remains valid until
323          * both sides are closed.
324          */
325         fsetfd(fdp, rf, fd1);
326         fsetfd(fdp, wf, fd2);
327         fdrop(rf);
328         fdrop(wf);
329
330         return (0);
331 }
332
333 /*
334  * [re]allocates KVA for the pipe's circular buffer.  The space is
335  * pageable.  Called twice to setup full-duplex communications.
336  *
337  * NOTE: Independent vm_object's are used to improve performance.
338  *
339  * Returns 0 on success, ENOMEM on failure.
340  */
341 static int
342 pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
343 {
344         struct vm_object *object;
345         caddr_t buffer;
346         vm_pindex_t npages;
347         int error;
348
349         size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
350         if (size < 16384)
351                 size = 16384;
352         if (size > 1024*1024)
353                 size = 1024*1024;
354
355         npages = round_page(size) / PAGE_SIZE;
356         object = pb->object;
357
358         /*
359          * [re]create the object if necessary and reserve space for it
360          * in the kernel_map.  The object and memory are pageable.  On
361          * success, free the old resources before assigning the new
362          * ones.
363          */
364         if (object == NULL || object->size != npages) {
365                 object = vm_object_allocate(OBJT_DEFAULT, npages);
366                 buffer = (caddr_t)vm_map_min(&kernel_map);
367
368                 error = vm_map_find(&kernel_map, object, NULL,
369                                     0, (vm_offset_t *)&buffer, size,
370                                     PAGE_SIZE, TRUE,
371                                     VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
372                                     VM_PROT_ALL, VM_PROT_ALL, 0);
373
374                 if (error != KERN_SUCCESS) {
375                         vm_object_deallocate(object);
376                         return (ENOMEM);
377                 }
378                 pipe_free_kmem(pb);
379                 pb->object = object;
380                 pb->buffer = buffer;
381                 pb->size = size;
382         }
383         pb->rindex = 0;
384         pb->windex = 0;
385
386         return (0);
387 }
388
389 /*
390  * Initialize and allocate VM and memory for pipe, pulling the pipe from
391  * our per-cpu cache if possible.
392  *
393  * Returns 0 on success, else an error code (typically ENOMEM).  Caller
394  * must still deallocate the pipe on failure.
395  */
396 static int
397 pipe_create(struct pipe **pipep)
398 {
399         globaldata_t gd = mycpu;
400         struct pipe *pipe;
401         int error;
402
403         if ((pipe = gd->gd_pipeq) != NULL) {
404                 gd->gd_pipeq = pipe->next;
405                 --gd->gd_pipeqcount;
406                 pipe->next = NULL;
407         } else {
408                 pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
409                 lwkt_token_init(&pipe->bufferA.rlock, "piper");
410                 lwkt_token_init(&pipe->bufferA.wlock, "pipew");
411                 lwkt_token_init(&pipe->bufferB.rlock, "piper");
412                 lwkt_token_init(&pipe->bufferB.wlock, "pipew");
413         }
414         *pipep = pipe;
415         if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
416                 return (error);
417         }
418         if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
419                 return (error);
420         }
421         vfs_timestamp(&pipe->ctime);
422         pipe->bufferA.atime = pipe->ctime;
423         pipe->bufferA.mtime = pipe->ctime;
424         pipe->bufferB.atime = pipe->ctime;
425         pipe->bufferB.mtime = pipe->ctime;
426         pipe->open_count = 2;
427
428         return (0);
429 }
430
431 /*
432  * Read data from a pipe
433  */
434 static int
435 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
436 {
437         struct pipebuf *rpb;
438         struct pipebuf *wpb;
439         struct pipe *pipe;
440         size_t nread = 0;
441         size_t size;    /* total bytes available */
442         size_t nsize;   /* total bytes to read */
443         size_t rindex;  /* contiguous bytes available */
444         int notify_writer;
445         int bigread;
446         int bigcount;
447         int error;
448         int nbio;
449
450         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
451         if ((intptr_t)fp->f_data & 1) {
452                 rpb = &pipe->bufferB;
453                 wpb = &pipe->bufferA;
454         } else {
455                 rpb = &pipe->bufferA;
456                 wpb = &pipe->bufferB;
457         }
458         atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
459
460         if (uio->uio_resid == 0)
461                 return(0);
462
463         /*
464          * Calculate nbio
465          */
466         if (fflags & O_FBLOCKING)
467                 nbio = 0;
468         else if (fflags & O_FNONBLOCKING)
469                 nbio = 1;
470         else if (fp->f_flag & O_NONBLOCK)
471                 nbio = 1;
472         else
473                 nbio = 0;
474
475         /*
476          * 'quick' NBIO test before things get expensive.
477          */
478         if (nbio && rpb->rindex == rpb->windex &&
479             (rpb->state & PIPE_REOF) == 0) {
480                 return EAGAIN;
481         }
482
483         /*
484          * Reads are serialized.  Note however that buffer.buffer and
485          * buffer.size can change out from under us when the number
486          * of bytes in the buffer are zero due to the write-side doing a
487          * pipespace().
488          */
489         lwkt_gettoken(&rpb->rlock);
490         error = pipe_start_uio(&rpb->rip);
491         if (error) {
492                 lwkt_reltoken(&rpb->rlock);
493                 return (error);
494         }
495         notify_writer = 0;
496
497         bigread = (uio->uio_resid > 10 * 1024 * 1024);
498         bigcount = 10;
499
500         while (uio->uio_resid) {
501                 /*
502                  * Don't hog the cpu.
503                  */
504                 if (bigread && --bigcount == 0) {
505                         lwkt_user_yield();
506                         bigcount = 10;
507                         if (CURSIG(curthread->td_lwp)) {
508                                 error = EINTR;
509                                 break;
510                         }
511                 }
512
513                 /*
514                  * lfence required to avoid read-reordering of buffer
515                  * contents prior to validation of size.
516                  */
517                 size = rpb->windex - rpb->rindex;
518                 cpu_lfence();
519                 if (size) {
520                         rindex = rpb->rindex & (rpb->size - 1);
521                         nsize = size;
522                         if (nsize > rpb->size - rindex)
523                                 nsize = rpb->size - rindex;
524                         nsize = szmin(nsize, uio->uio_resid);
525
526                         /*
527                          * Limit how much we move in one go so we have a
528                          * chance to kick the writer while data is still
529                          * available in the pipe.  This avoids getting into
530                          * a ping-pong with the writer.
531                          */
532                         if (nsize > (rpb->size >> 1))
533                                 nsize = rpb->size >> 1;
534
535                         error = uiomove(&rpb->buffer[rindex], nsize, uio);
536                         if (error)
537                                 break;
538                         rpb->rindex += nsize;
539                         nread += nsize;
540
541                         /*
542                          * If the FIFO is still over half full just continue
543                          * and do not try to notify the writer yet.  If
544                          * less than half full notify any waiting writer.
545                          */
546                         if (size - nsize > (rpb->size >> 1)) {
547                                 notify_writer = 0;
548                         } else {
549                                 notify_writer = 1;
550                                 pipesignal(rpb, PIPE_WANTW);
551                         }
552                         continue;
553                 }
554
555                 /*
556                  * If the "write-side" was blocked we wake it up.  This code
557                  * is reached when the buffer is completely emptied.
558                  */
559                 pipesignal(rpb, PIPE_WANTW);
560
561                 /*
562                  * Pick up our copy loop again if the writer sent data to
563                  * us while we were messing around.
564                  *
565                  * On a SMP box poll up to pipe_delay nanoseconds for new
566                  * data.  Typically a value of 2000 to 4000 is sufficient
567                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
568                  * is used for synchronous communications with small packets,
569                  * and 8000 or so (8uS) will pipeline large buffer xfers
570                  * between cpus over a pipe.
571                  *
572                  * For synchronous communications a hit means doing a
573                  * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS,
574                  * where as miss requiring a tsleep/wakeup sequence
575                  * will take 7uS or more.
576                  */
577                 if (rpb->windex != rpb->rindex)
578                         continue;
579
580 #ifdef _RDTSC_SUPPORTED_
581                 if (pipe_delay) {
582                         int64_t tsc_target;
583                         int good = 0;
584
585                         tsc_target = tsc_get_target(pipe_delay);
586                         while (tsc_test_target(tsc_target) == 0) {
587                                 cpu_lfence();
588                                 if (rpb->windex != rpb->rindex) {
589                                         good = 1;
590                                         break;
591                                 }
592                                 cpu_pause();
593                         }
594                         if (good)
595                                 continue;
596                 }
597 #endif
598
599                 /*
600                  * Detect EOF condition, do not set error.
601                  */
602                 if (rpb->state & PIPE_REOF)
603                         break;
604
605                 /*
606                  * Break if some data was read, or if this was a non-blocking
607                  * read.
608                  */
609                 if (nread > 0)
610                         break;
611
612                 if (nbio) {
613                         error = EAGAIN;
614                         break;
615                 }
616
617                 /*
618                  * Last chance, interlock with WANTR
619                  */
620                 tsleep_interlock(rpb, PCATCH);
621                 atomic_set_int(&rpb->state, PIPE_WANTR);
622
623                 /*
624                  * Retest bytes available after memory barrier above.
625                  */
626                 size = rpb->windex - rpb->rindex;
627                 if (size)
628                         continue;
629
630                 /*
631                  * Retest EOF after memory barrier above.
632                  */
633                 if (rpb->state & PIPE_REOF)
634                         break;
635
636                 /*
637                  * Wait for more data or state change
638                  */
639                 error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
640                 if (error)
641                         break;
642         }
643         pipe_end_uio(&rpb->rip);
644
645         /*
646          * Uptime last access time
647          */
648         if (error == 0 && nread)
649                 vfs_timestamp(&rpb->atime);
650
651         /*
652          * If we drained the FIFO more then half way then handle
653          * write blocking hysteresis.
654          *
655          * Note that PIPE_WANTW cannot be set by the writer without
656          * it holding both rlock and wlock, so we can test it
657          * while holding just rlock.
658          */
659         if (notify_writer) {
660                 /*
661                  * Synchronous blocking is done on the pipe involved
662                  */
663                 pipesignal(rpb, PIPE_WANTW);
664
665                 /*
666                  * But we may also have to deal with a kqueue which is
667                  * stored on the same pipe as its descriptor, so a
668                  * EVFILT_WRITE event waiting for our side to drain will
669                  * be on the other side.
670                  */
671                 pipewakeup(wpb, 0);
672         }
673         /*size = rpb->windex - rpb->rindex;*/
674         lwkt_reltoken(&rpb->rlock);
675
676         return (error);
677 }
678
679 static int
680 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
681 {
682         struct pipebuf *rpb;
683         struct pipebuf *wpb;
684         struct pipe *pipe;
685         size_t windex;
686         size_t space;
687         size_t wcount;
688         size_t orig_resid;
689         int bigwrite;
690         int bigcount;
691         int error;
692         int nbio;
693
694         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
695         if ((intptr_t)fp->f_data & 1) {
696                 rpb = &pipe->bufferB;
697                 wpb = &pipe->bufferA;
698         } else {
699                 rpb = &pipe->bufferA;
700                 wpb = &pipe->bufferB;
701         }
702
703         /*
704          * Calculate nbio
705          */
706         if (fflags & O_FBLOCKING)
707                 nbio = 0;
708         else if (fflags & O_FNONBLOCKING)
709                 nbio = 1;
710         else if (fp->f_flag & O_NONBLOCK)
711                 nbio = 1;
712         else
713                 nbio = 0;
714
715         /*
716          * 'quick' NBIO test before things get expensive.
717          */
718         if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
719             uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
720                 return EAGAIN;
721         }
722
723         /*
724          * Writes go to the peer.  The peer will always exist.
725          */
726         lwkt_gettoken(&wpb->wlock);
727         if (wpb->state & PIPE_WEOF) {
728                 lwkt_reltoken(&wpb->wlock);
729                 return (EPIPE);
730         }
731
732         /*
733          * Degenerate case (EPIPE takes prec)
734          */
735         if (uio->uio_resid == 0) {
736                 lwkt_reltoken(&wpb->wlock);
737                 return(0);
738         }
739
740         /*
741          * Writes are serialized (start_uio must be called with wlock)
742          */
743         error = pipe_start_uio(&wpb->wip);
744         if (error) {
745                 lwkt_reltoken(&wpb->wlock);
746                 return (error);
747         }
748
749         orig_resid = uio->uio_resid;
750         wcount = 0;
751
752         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
753         bigcount = 10;
754
755         while (uio->uio_resid) {
756                 if (wpb->state & PIPE_WEOF) {
757                         error = EPIPE;
758                         break;
759                 }
760
761                 /*
762                  * Don't hog the cpu.
763                  */
764                 if (bigwrite && --bigcount == 0) {
765                         lwkt_user_yield();
766                         bigcount = 10;
767                         if (CURSIG(curthread->td_lwp)) {
768                                 error = EINTR;
769                                 break;
770                         }
771                 }
772
773                 windex = wpb->windex & (wpb->size - 1);
774                 space = wpb->size - (wpb->windex - wpb->rindex);
775
776                 /*
777                  * Writes of size <= PIPE_BUF must be atomic.
778                  */
779                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
780                         space = 0;
781
782                 /* 
783                  * Write to fill, read size handles write hysteresis.  Also
784                  * additional restrictions can cause select-based non-blocking
785                  * writes to spin.
786                  */
787                 if (space > 0) {
788                         size_t segsize;
789
790                         /*
791                          * We want to notify a potentially waiting reader
792                          * before we exhaust the write buffer for SMP
793                          * pipelining.  Otherwise the write/read will begin
794                          * to ping-pong.
795                          */
796                         space = szmin(space, uio->uio_resid);
797                         if (space > (wpb->size >> 1))
798                                 space = (wpb->size >> 1);
799
800                         /*
801                          * First segment to transfer is minimum of
802                          * transfer size and contiguous space in
803                          * pipe buffer.  If first segment to transfer
804                          * is less than the transfer size, we've got
805                          * a wraparound in the buffer.
806                          */
807                         segsize = wpb->size - windex;
808                         if (segsize > space)
809                                 segsize = space;
810
811                         /*
812                          * If this is the first loop and the reader is
813                          * blocked, do a preemptive wakeup of the reader.
814                          *
815                          * On SMP the IPI latency plus the wlock interlock
816                          * on the reader side is the fastest way to get the
817                          * reader going.  (The scheduler will hard loop on
818                          * lock tokens).
819                          */
820                         if (wcount == 0)
821                                 pipesignal(wpb, PIPE_WANTR);
822
823                         /*
824                          * Transfer segment, which may include a wrap-around.
825                          * Update windex to account for both all in one go
826                          * so the reader can read() the data atomically.
827                          */
828                         error = uiomove(&wpb->buffer[windex], segsize, uio);
829                         if (error == 0 && segsize < space) {
830                                 segsize = space - segsize;
831                                 error = uiomove(&wpb->buffer[0], segsize, uio);
832                         }
833                         if (error)
834                                 break;
835
836                         /*
837                          * Memory fence prior to windex updating (note: not
838                          * needed so this is a NOP on Intel).
839                          */
840                         cpu_sfence();
841                         wpb->windex += space;
842
843                         /*
844                          * Signal reader
845                          */
846                         if (wcount != 0)
847                                 pipesignal(wpb, PIPE_WANTR);
848                         wcount += space;
849                         continue;
850                 }
851
852                 /*
853                  * Wakeup any pending reader
854                  */
855                 pipesignal(wpb, PIPE_WANTR);
856
857                 /*
858                  * don't block on non-blocking I/O
859                  */
860                 if (nbio) {
861                         error = EAGAIN;
862                         break;
863                 }
864
865 #ifdef _RDTSC_SUPPORTED_
866                 if (pipe_delay) {
867                         int64_t tsc_target;
868                         int good = 0;
869
870                         tsc_target = tsc_get_target(pipe_delay);
871                         while (tsc_test_target(tsc_target) == 0) {
872                                 cpu_lfence();
873                                 space = wpb->size - (wpb->windex - wpb->rindex);
874                                 if ((space < uio->uio_resid) &&
875                                     (orig_resid <= PIPE_BUF)) {
876                                         space = 0;
877                                 }
878                                 if (space) {
879                                         good = 1;
880                                         break;
881                                 }
882                                 cpu_pause();
883                         }
884                         if (good)
885                                 continue;
886                 }
887 #endif
888
889                 /*
890                  * Interlocked test.   Atomic op enforces the memory barrier.
891                  */
892                 tsleep_interlock(wpb, PCATCH);
893                 atomic_set_int(&wpb->state, PIPE_WANTW);
894
895                 /*
896                  * Retest space available after memory barrier above.
897                  * Writes of size <= PIPE_BUF must be atomic.
898                  */
899                 space = wpb->size - (wpb->windex - wpb->rindex);
900                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
901                         space = 0;
902
903                 /*
904                  * Retest EOF after memory barrier above.
905                  */
906                 if (wpb->state & PIPE_WEOF) {
907                         error = EPIPE;
908                         break;
909                 }
910
911                 /*
912                  * We have no more space and have something to offer,
913                  * wake up select/poll/kq.
914                  */
915                 if (space == 0) {
916                         pipewakeup(wpb, 1);
917                         error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
918                 }
919
920                 /*
921                  * Break out if we errored or the read side wants us to go
922                  * away.
923                  */
924                 if (error)
925                         break;
926                 if (wpb->state & PIPE_WEOF) {
927                         error = EPIPE;
928                         break;
929                 }
930         }
931         pipe_end_uio(&wpb->wip);
932
933         /*
934          * If we have put any characters in the buffer, we wake up
935          * the reader.
936          *
937          * Both rlock and wlock are required to be able to modify pipe_state.
938          */
939         if (wpb->windex != wpb->rindex) {
940                 pipesignal(wpb, PIPE_WANTR);
941                 pipewakeup(wpb, 1);
942         }
943
944         /*
945          * Don't return EPIPE if I/O was successful
946          */
947         if ((wpb->rindex == wpb->windex) &&
948             (uio->uio_resid == 0) &&
949             (error == EPIPE)) {
950                 error = 0;
951         }
952
953         if (error == 0)
954                 vfs_timestamp(&wpb->mtime);
955
956         /*
957          * We have something to offer,
958          * wake up select/poll/kq.
959          */
960         /*space = wpb->windex - wpb->rindex;*/
961         lwkt_reltoken(&wpb->wlock);
962
963         return (error);
964 }
965
966 /*
967  * we implement a very minimal set of ioctls for compatibility with sockets.
968  */
969 static int
970 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
971            struct ucred *cred, struct sysmsg *msg)
972 {
973         struct pipebuf *rpb;
974         struct pipe *pipe;
975         int error;
976
977         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
978         if ((intptr_t)fp->f_data & 1) {
979                 rpb = &pipe->bufferB;
980         } else {
981                 rpb = &pipe->bufferA;
982         }
983
984         lwkt_gettoken(&rpb->rlock);
985         lwkt_gettoken(&rpb->wlock);
986
987         switch (cmd) {
988         case FIOASYNC:
989                 if (*(int *)data) {
990                         atomic_set_int(&rpb->state, PIPE_ASYNC);
991                 } else {
992                         atomic_clear_int(&rpb->state, PIPE_ASYNC);
993                 }
994                 error = 0;
995                 break;
996         case FIONREAD:
997                 *(int *)data = (int)(rpb->windex - rpb->rindex);
998                 error = 0;
999                 break;
1000         case FIOSETOWN:
1001                 error = fsetown(*(int *)data, &rpb->sigio);
1002                 break;
1003         case FIOGETOWN:
1004                 *(int *)data = fgetown(&rpb->sigio);
1005                 error = 0;
1006                 break;
1007         case TIOCSPGRP:
1008                 /* This is deprecated, FIOSETOWN should be used instead. */
1009                 error = fsetown(-(*(int *)data), &rpb->sigio);
1010                 break;
1011
1012         case TIOCGPGRP:
1013                 /* This is deprecated, FIOGETOWN should be used instead. */
1014                 *(int *)data = -fgetown(&rpb->sigio);
1015                 error = 0;
1016                 break;
1017         default:
1018                 error = ENOTTY;
1019                 break;
1020         }
1021         lwkt_reltoken(&rpb->wlock);
1022         lwkt_reltoken(&rpb->rlock);
1023
1024         return (error);
1025 }
1026
1027 /*
1028  * MPSAFE
1029  */
1030 static int
1031 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1032 {
1033         struct pipebuf *rpb;
1034         struct pipe *pipe;
1035
1036         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1037         if ((intptr_t)fp->f_data & 1) {
1038                 rpb = &pipe->bufferB;
1039         } else {
1040                 rpb = &pipe->bufferA;
1041         }
1042
1043         bzero((caddr_t)ub, sizeof(*ub));
1044         ub->st_mode = S_IFIFO;
1045         ub->st_blksize = rpb->size;
1046         ub->st_size = rpb->windex - rpb->rindex;
1047         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1048         ub->st_atimespec = rpb->atime;
1049         ub->st_mtimespec = rpb->mtime;
1050         ub->st_ctimespec = pipe->ctime;
1051         /*
1052          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1053          * st_flags, st_gen.
1054          * XXX (st_dev, st_ino) should be unique.
1055          */
1056
1057         return (0);
1058 }
1059
1060 static int
1061 pipe_close(struct file *fp)
1062 {
1063         struct pipebuf *rpb;
1064         struct pipebuf *wpb;
1065         struct pipe *pipe;
1066
1067         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1068         if ((intptr_t)fp->f_data & 1) {
1069                 rpb = &pipe->bufferB;
1070                 wpb = &pipe->bufferA;
1071         } else {
1072                 rpb = &pipe->bufferA;
1073                 wpb = &pipe->bufferB;
1074         }
1075
1076         fp->f_ops = &badfileops;
1077         fp->f_data = NULL;
1078         funsetown(&rpb->sigio);
1079         pipeclose(pipe, rpb, wpb);
1080
1081         return (0);
1082 }
1083
1084 /*
1085  * Shutdown one or both directions of a full-duplex pipe.
1086  */
1087 static int
1088 pipe_shutdown(struct file *fp, int how)
1089 {
1090         struct pipebuf *rpb;
1091         struct pipebuf *wpb;
1092         struct pipe *pipe;
1093         int error = EPIPE;
1094
1095         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1096         if ((intptr_t)fp->f_data & 1) {
1097                 rpb = &pipe->bufferB;
1098                 wpb = &pipe->bufferA;
1099         } else {
1100                 rpb = &pipe->bufferA;
1101                 wpb = &pipe->bufferB;
1102         }
1103
1104         /*
1105          * We modify pipe_state on both pipes, which means we need
1106          * all four tokens!
1107          */
1108         lwkt_gettoken(&rpb->rlock);
1109         lwkt_gettoken(&rpb->wlock);
1110         lwkt_gettoken(&wpb->rlock);
1111         lwkt_gettoken(&wpb->wlock);
1112
1113         switch(how) {
1114         case SHUT_RDWR:
1115         case SHUT_RD:
1116                 /*
1117                  * EOF on my reads and peer writes
1118                  */
1119                 atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1120                 if (rpb->state & PIPE_WANTR) {
1121                         rpb->state &= ~PIPE_WANTR;
1122                         wakeup(rpb);
1123                 }
1124                 if (rpb->state & PIPE_WANTW) {
1125                         rpb->state &= ~PIPE_WANTW;
1126                         wakeup(rpb);
1127                 }
1128                 error = 0;
1129                 if (how == SHUT_RD)
1130                         break;
1131                 /* fall through */
1132         case SHUT_WR:
1133                 /*
1134                  * EOF on peer reads and my writes
1135                  */
1136                 atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1137                 if (wpb->state & PIPE_WANTR) {
1138                         wpb->state &= ~PIPE_WANTR;
1139                         wakeup(wpb);
1140                 }
1141                 if (wpb->state & PIPE_WANTW) {
1142                         wpb->state &= ~PIPE_WANTW;
1143                         wakeup(wpb);
1144                 }
1145                 error = 0;
1146                 break;
1147         }
1148         pipewakeup(rpb, 1);
1149         pipewakeup(wpb, 1);
1150
1151         lwkt_reltoken(&wpb->wlock);
1152         lwkt_reltoken(&wpb->rlock);
1153         lwkt_reltoken(&rpb->wlock);
1154         lwkt_reltoken(&rpb->rlock);
1155
1156         return (error);
1157 }
1158
1159 /*
1160  * Destroy the pipe buffer.
1161  */
1162 static void
1163 pipe_free_kmem(struct pipebuf *pb)
1164 {
1165         if (pb->buffer != NULL) {
1166                 kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
1167                 pb->buffer = NULL;
1168                 pb->object = NULL;
1169         }
1170 }
1171
1172 /*
1173  * Close one half of the pipe.  We are closing the pipe for reading on rpb
1174  * and writing on wpb.  This routine must be called twice with the pipebufs
1175  * reversed to close both directions.
1176  */
1177 static void
1178 pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1179 {
1180         globaldata_t gd;
1181
1182         if (pipe == NULL)
1183                 return;
1184
1185         /*
1186          * We need both the read and write tokens to modify pipe_state.
1187          */
1188         lwkt_gettoken(&rpb->rlock);
1189         lwkt_gettoken(&rpb->wlock);
1190
1191         /*
1192          * Set our state, wakeup anyone waiting in select/poll/kq, and
1193          * wakeup anyone blocked on our pipe.  No action if our side
1194          * is already closed.
1195          */
1196         if (rpb->state & PIPE_CLOSED) {
1197                 lwkt_reltoken(&rpb->wlock);
1198                 lwkt_reltoken(&rpb->rlock);
1199                 return;
1200         }
1201
1202         atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1203         pipewakeup(rpb, 1);
1204         if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1205                 rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1206                 wakeup(rpb);
1207         }
1208         lwkt_reltoken(&rpb->wlock);
1209         lwkt_reltoken(&rpb->rlock);
1210
1211         /*
1212          * Disconnect from peer.
1213          */
1214         lwkt_gettoken(&wpb->rlock);
1215         lwkt_gettoken(&wpb->wlock);
1216
1217         atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1218         pipewakeup(wpb, 1);
1219         if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1220                 wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1221                 wakeup(wpb);
1222         }
1223         if (SLIST_FIRST(&wpb->kq.ki_note))
1224                 KNOTE(&wpb->kq.ki_note, 0);
1225         lwkt_reltoken(&wpb->wlock);
1226         lwkt_reltoken(&wpb->rlock);
1227
1228         /*
1229          * Free resources once both sides are closed.  We maintain a pcpu
1230          * cache to improve performance, so the actual tear-down case is
1231          * limited to bulk situations.
1232          *
1233          * However, the bulk tear-down case can cause intense contention
1234          * on the kernel_map when, e.g. hundreds to hundreds of thousands
1235          * of processes are killed at the same time.  To deal with this we
1236          * use a pcpu mutex to maintain concurrency but also limit the
1237          * number of threads banging on the map and pmap.
1238          *
1239          * We use the mtx mechanism instead of the lockmgr mechanism because
1240          * the mtx mechanism utilizes a queued design which will not break
1241          * down in the face of thousands to hundreds of thousands of
1242          * processes trying to free pipes simultaneously.  The lockmgr
1243          * mechanism will wind up waking them all up each time a lock
1244          * cycles.
1245          */
1246         if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1247                 gd = mycpu;
1248                 if (gd->gd_pipeqcount >= pipe_maxcache) {
1249                         mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1250                         pipe_free_kmem(rpb);
1251                         pipe_free_kmem(wpb);
1252                         mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1253                         kfree(pipe, M_PIPE);
1254                 } else {
1255                         rpb->state = 0;
1256                         wpb->state = 0;
1257                         pipe->next = gd->gd_pipeq;
1258                         gd->gd_pipeq = pipe;
1259                         ++gd->gd_pipeqcount;
1260                 }
1261         }
1262 }
1263
1264 static int
1265 pipe_kqfilter(struct file *fp, struct knote *kn)
1266 {
1267         struct pipebuf *rpb;
1268         struct pipebuf *wpb;
1269         struct pipe *pipe;
1270
1271         pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1272         if ((intptr_t)fp->f_data & 1) {
1273                 rpb = &pipe->bufferB;
1274                 wpb = &pipe->bufferA;
1275         } else {
1276                 rpb = &pipe->bufferA;
1277                 wpb = &pipe->bufferB;
1278         }
1279
1280         switch (kn->kn_filter) {
1281         case EVFILT_READ:
1282                 kn->kn_fop = &pipe_rfiltops;
1283                 break;
1284         case EVFILT_WRITE:
1285                 kn->kn_fop = &pipe_wfiltops;
1286                 if (wpb->state & PIPE_CLOSED) {
1287                         /* other end of pipe has been closed */
1288                         return (EPIPE);
1289                 }
1290                 break;
1291         default:
1292                 return (EOPNOTSUPP);
1293         }
1294
1295         if (rpb == &pipe->bufferA)
1296                 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1297         else
1298                 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1299
1300         knote_insert(&rpb->kq.ki_note, kn);
1301
1302         return (0);
1303 }
1304
1305 static void
1306 filt_pipedetach(struct knote *kn)
1307 {
1308         struct pipebuf *rpb;
1309         struct pipebuf *wpb;
1310         struct pipe *pipe;
1311
1312         pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1313         if ((intptr_t)kn->kn_hook & 1) {
1314                 rpb = &pipe->bufferB;
1315                 wpb = &pipe->bufferA;
1316         } else {
1317                 rpb = &pipe->bufferA;
1318                 wpb = &pipe->bufferB;
1319         }
1320         knote_remove(&rpb->kq.ki_note, kn);
1321 }
1322
1323 /*ARGSUSED*/
1324 static int
1325 filt_piperead(struct knote *kn, long hint)
1326 {
1327         struct pipebuf *rpb;
1328         struct pipebuf *wpb;
1329         struct pipe *pipe;
1330         int ready = 0;
1331
1332         pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1333         if ((intptr_t)kn->kn_fp->f_data & 1) {
1334                 rpb = &pipe->bufferB;
1335                 wpb = &pipe->bufferA;
1336         } else {
1337                 rpb = &pipe->bufferA;
1338                 wpb = &pipe->bufferB;
1339         }
1340
1341         lwkt_gettoken(&rpb->rlock);
1342         lwkt_gettoken(&rpb->wlock);
1343
1344         kn->kn_data = rpb->windex - rpb->rindex;
1345
1346         if (rpb->state & PIPE_REOF) {
1347                 /*
1348                  * Only set NODATA if all data has been exhausted
1349                  */
1350                 if (kn->kn_data == 0)
1351                         kn->kn_flags |= EV_NODATA;
1352                 kn->kn_flags |= EV_EOF; 
1353                 ready = 1;
1354         }
1355
1356         lwkt_reltoken(&rpb->wlock);
1357         lwkt_reltoken(&rpb->rlock);
1358
1359         if (!ready)
1360                 ready = kn->kn_data > 0;
1361
1362         return (ready);
1363 }
1364
1365 /*ARGSUSED*/
1366 static int
1367 filt_pipewrite(struct knote *kn, long hint)
1368 {
1369         struct pipebuf *rpb;
1370         struct pipebuf *wpb;
1371         struct pipe *pipe;
1372         int ready = 0;
1373
1374         pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1375         if ((intptr_t)kn->kn_fp->f_data & 1) {
1376                 rpb = &pipe->bufferB;
1377                 wpb = &pipe->bufferA;
1378         } else {
1379                 rpb = &pipe->bufferA;
1380                 wpb = &pipe->bufferB;
1381         }
1382
1383         kn->kn_data = 0;
1384         if (wpb->state & PIPE_CLOSED) {
1385                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1386                 return (1);
1387         }
1388
1389         lwkt_gettoken(&wpb->rlock);
1390         lwkt_gettoken(&wpb->wlock);
1391
1392         if (wpb->state & PIPE_WEOF) {
1393                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1394                 ready = 1;
1395         }
1396
1397         if (!ready)
1398                 kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1399
1400         lwkt_reltoken(&wpb->wlock);
1401         lwkt_reltoken(&wpb->rlock);
1402
1403         if (!ready)
1404                 ready = kn->kn_data >= PIPE_BUF;
1405
1406         return (ready);
1407 }