1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
4 * Copyright (c) 2003-2017 The DragonFly Project. All rights reserved.
5 *
6 * This code is derived from software contributed to The DragonFly Project
7 * by Matthew Dillon <dillon@backplane.com>
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice immediately at the beginning of the file, without modification,
14 * this list of conditions, and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution.
18 * 3. Absolutely no warranty of function or purpose is made by the author
19 * John S. Dyson.
20 * 4. Modifications may be freely made to this file if the above conditions
21 * are met.
22 */
23
24/*
25 * This file contains a high-performance replacement for the socket-based
26 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
27 * all features of sockets, but does do everything that pipes normally
28 * do.
29 */
30#include <sys/param.h>
31#include <sys/systm.h>
32#include <sys/kernel.h>
33#include <sys/proc.h>
34#include <sys/fcntl.h>
35#include <sys/file.h>
36#include <sys/filedesc.h>
37#include <sys/filio.h>
38#include <sys/ttycom.h>
39#include <sys/stat.h>
40#include <sys/signalvar.h>
41#include <sys/sysmsg.h>
42#include <sys/pipe.h>
43#include <sys/vnode.h>
44#include <sys/uio.h>
45#include <sys/event.h>
46#include <sys/globaldata.h>
47#include <sys/module.h>
48#include <sys/malloc.h>
49#include <sys/sysctl.h>
50#include <sys/socket.h>
51#include <sys/kern_syscall.h>
52#include <sys/lock.h>
53#include <sys/mutex.h>
54
55#include <vm/vm.h>
56#include <vm/vm_param.h>
57#include <vm/vm_object.h>
58#include <vm/vm_kern.h>
59#include <vm/vm_extern.h>
60#include <vm/pmap.h>
61#include <vm/vm_map.h>
62#include <vm/vm_page.h>
63#include <vm/vm_zone.h>
64
65#include <sys/file2.h>
66#include <sys/signal2.h>
67#include <sys/mutex2.h>
68
69#include <machine/cpufunc.h>
70
71struct pipegdlock {
72 struct mtx mtx;
73} __cachealign;
74
75/*
76 * interfaces to the outside world
77 */
78static int pipe_read (struct file *fp, struct uio *uio,
79 struct ucred *cred, int flags);
80static int pipe_write (struct file *fp, struct uio *uio,
81 struct ucred *cred, int flags);
82static int pipe_close (struct file *fp);
83static int pipe_shutdown (struct file *fp, int how);
84static int pipe_kqfilter (struct file *fp, struct knote *kn);
85static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
86static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87 struct ucred *cred, struct sysmsg *msg);
88
89__read_mostly static struct fileops pipeops = {
90 .fo_read = pipe_read,
91 .fo_write = pipe_write,
92 .fo_ioctl = pipe_ioctl,
93 .fo_kqfilter = pipe_kqfilter,
94 .fo_stat = pipe_stat,
95 .fo_close = pipe_close,
96 .fo_shutdown = pipe_shutdown
97};
98
99static void filt_pipedetach(struct knote *kn);
100static int filt_piperead(struct knote *kn, long hint);
101static int filt_pipewrite(struct knote *kn, long hint);
102
103__read_mostly static struct filterops pipe_rfiltops =
104 { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
105__read_mostly static struct filterops pipe_wfiltops =
106 { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
107
108MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
109
110#define PIPEQ_MAX_CACHE 16 /* per-cpu pipe structure cache */
111
112__read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
113__read_mostly static struct pipegdlock *pipe_gdlocks;
114
115SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
116SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117 CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
118
119/*
120 * The pipe buffer size can be changed at any time. Only new pipe()s
121 * are affected. Note that due to cpu cache effects, you do not want
122 * to make this value too large.
123 */
124__read_mostly static int pipe_size = 32768;
125SYSCTL_INT(_kern_pipe, OID_AUTO, size,
126 CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
127
128/*
129 * Reader/writer delay loop. When the reader exhausts the pipe buffer
 130 * or the writer completely fills the pipe buffer and would otherwise sleep,
131 * it first busy-loops for a few microseconds waiting for data or buffer
132 * space. This eliminates IPIs for most high-bandwidth writer/reader pipes
133 * and also helps when the user program uses a large data buffer in its
134 * UIOs.
135 *
136 * This defaults to 4uS.
137 */
138#ifdef _RDTSC_SUPPORTED_
139__read_mostly static int pipe_delay = 4000; /* 4uS default */
140SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
141 CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
142#endif
143
144/*
145 * Auto-size pipe cache to reduce kmem allocations and frees.
146 */
147static
148void
149pipeinit(void *dummy)
150{
151 size_t mbytes = kmem_lim_size();
152 int n;
153
154 if (pipe_maxcache == PIPEQ_MAX_CACHE) {
155 if (mbytes >= 7 * 1024)
156 pipe_maxcache *= 2;
157 if (mbytes >= 15 * 1024)
158 pipe_maxcache *= 2;
159 }
160
161 /*
162 * Detune the pcpu caching a bit on systems with an insane number
163 * of cpu threads to reduce memory waste.
164 */
165 if (ncpus > 64) {
166 pipe_maxcache = pipe_maxcache * 64 / ncpus;
167 if (pipe_maxcache < PIPEQ_MAX_CACHE)
168 pipe_maxcache = PIPEQ_MAX_CACHE;
169 }
170
171 pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
172 M_PIPE, M_WAITOK | M_ZERO);
173 for (n = 0; n < ncpus; ++n)
174 mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
175}
176SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
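/*
 * Illustrative sizing only, assuming kmem_lim_size() reports the memory
 * limit in megabytes: with the default PIPEQ_MAX_CACHE of 16, a machine
 * with >= 7GB caches 32 pipes per cpu and one with >= 15GB caches 64.
 * On systems with more than 64 cpu threads that figure is then scaled
 * down by 64/ncpus, but never below PIPEQ_MAX_CACHE.
 */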
177
178static void pipeclose (struct pipe *pipe,
179 struct pipebuf *pbr, struct pipebuf *pbw);
180static void pipe_free_kmem (struct pipebuf *buf);
181static int pipe_create (struct pipe **pipep);
182
183/*
184 * Test and clear the specified flag, wakeup(pb) if it was set.
185 * This function must also act as a memory barrier.
186 */
187static __inline void
188pipesignal(struct pipebuf *pb, uint32_t flags)
189{
190 uint32_t oflags;
191 uint32_t nflags;
192
193 for (;;) {
194 oflags = pb->state;
195 cpu_ccfence();
196 nflags = oflags & ~flags;
197 if (atomic_cmpset_int(&pb->state, oflags, nflags))
198 break;
199 }
200 if (oflags & flags)
201 wakeup(pb);
202}
203
204/*
205 * Post SIGIO if requested and PIPE_ASYNC is set, then deliver kqueue events.
206 */
207static __inline void
208pipewakeup(struct pipebuf *pb, int dosigio)
209{
210 if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
211 lwkt_gettoken(&sigio_token);
212 pgsigio(pb->sigio, SIGIO, 0);
213 lwkt_reltoken(&sigio_token);
214 }
215 KNOTE(&pb->kq.ki_note, 0);
216}
217
218/*
219 * These routines are called before and after a UIO. The UIO
220 * may block, causing our held tokens to be lost temporarily.
221 *
222 * We use these routines to serialize reads against other reads
223 * and writes against other writes.
224 *
225 * The appropriate token is held on entry so *ipp does not race.
226 */
227static __inline int
228pipe_start_uio(int *ipp)
229{
230 int error;
231
232 while (*ipp) {
233 *ipp = -1;
234 error = tsleep(ipp, PCATCH, "pipexx", 0);
235 if (error)
236 return (error);
237 }
238 *ipp = 1;
239 return (0);
240}
241
242static __inline void
243pipe_end_uio(int *ipp)
244{
245 if (*ipp < 0) {
246 *ipp = 0;
247 wakeup(ipp);
248 } else {
249 KKASSERT(*ipp > 0);
250 *ipp = 0;
251 }
252}
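/*
 * Sketch of the serialization protocol above, derived from the code for
 * illustration: *ipp is 0 when no uio is in progress, 1 while a uio is in
 * progress with no waiters, and -1 while a uio is in progress and at least
 * one other thread is sleeping in pipe_start_uio().  pipe_end_uio() only
 * issues a wakeup() in the -1 case, keeping the uncontended path cheap.
 */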
253
254/*
255 * The pipe system call for the DTYPE_PIPE type of pipes
256 *
257 * pipe_args(int dummy)
258 *
259 * MPSAFE
260 */
261int
262sys_pipe(struct sysmsg *sysmsg, const struct pipe_args *uap)
263{
264 return kern_pipe(sysmsg->sysmsg_fds, 0);
265}
266
267int
268sys_pipe2(struct sysmsg *sysmsg, const struct pipe2_args *uap)
269{
270 return kern_pipe(sysmsg->sysmsg_fds, uap->flags);
271}
272
273int
274kern_pipe(long *fds, int flags)
275{
276 struct thread *td = curthread;
277 struct filedesc *fdp = td->td_proc->p_fd;
278 struct file *rf, *wf;
279 struct pipe *pipe;
280 int fd1, fd2, error;
281
282 pipe = NULL;
283 if (pipe_create(&pipe)) {
284 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
285 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
286 return (ENFILE);
287 }
288
289 error = falloc(td->td_lwp, &rf, &fd1);
290 if (error) {
291 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
292 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
293 return (error);
294 }
295 fds[0] = fd1;
296
297 /*
298 * Warning: once we've gotten past allocation of the fd for the
299 * read-side, we can only drop the read side via fdrop() in order
300 * to avoid races against processes which manage to dup() the read
301 * side while we are blocked trying to allocate the write side.
302 */
303 rf->f_type = DTYPE_PIPE;
304 rf->f_flag = FREAD | FWRITE;
305 rf->f_ops = &pipeops;
306 rf->f_data = (void *)((intptr_t)pipe | 0);
307 if (flags & O_NONBLOCK)
308 rf->f_flag |= O_NONBLOCK;
309 if (flags & O_CLOEXEC)
310 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
311
312 error = falloc(td->td_lwp, &wf, &fd2);
313 if (error) {
314 fsetfd(fdp, NULL, fd1);
315 fdrop(rf);
316 /* pipeA has been closed by fdrop() */
317 /* close pipeB here */
318 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
319 return (error);
320 }
321 wf->f_type = DTYPE_PIPE;
322 wf->f_flag = FREAD | FWRITE;
323 wf->f_ops = &pipeops;
324 wf->f_data = (void *)((intptr_t)pipe | 1);
325 if (flags & O_NONBLOCK)
326 wf->f_flag |= O_NONBLOCK;
327 if (flags & O_CLOEXEC)
328 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
329
330 fds[1] = fd2;
331
332 /*
333 * Once activated the peer relationship remains valid until
334 * both sides are closed.
335 */
336 fsetfd(fdp, rf, fd1);
337 fsetfd(fdp, wf, fd2);
338 fdrop(rf);
339 fdrop(wf);
340
341 return (0);
342}
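/*
 * Hedged userland sketch (illustrative, not part of this file): the flags
 * accepted here mirror pipe2(2).  For example:
 *
 *	int fds[2];
 *
 *	if (pipe2(fds, O_CLOEXEC | O_NONBLOCK) == 0) {
 *		// fds[0] is the read end, fds[1] the write end; both are
 *		// close-on-exec (UF_EXCLOSE) and non-blocking (O_NONBLOCK
 *		// copied into f_flag), matching the handling above.
 *	}
 */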
343
344/*
345 * [re]allocates KVA for the pipe's circular buffer. The space is
 346 * pageable. Called twice to set up full-duplex communications.
347 *
348 * NOTE: Independent vm_object's are used to improve performance.
349 *
350 * Returns 0 on success, ENOMEM on failure.
351 */
352static int
353pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
354{
355 struct vm_object *object;
356 caddr_t buffer;
357 vm_pindex_t npages;
358 int error;
359
360 size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
361 if (size < 16384)
362 size = 16384;
363 if (size > 1024*1024)
364 size = 1024*1024;
365
366 npages = round_page(size) / PAGE_SIZE;
367 object = pb->object;
368
369 /*
370 * [re]create the object if necessary and reserve space for it
371 * in the kernel_map. The object and memory are pageable. On
372 * success, free the old resources before assigning the new
373 * ones.
374 */
375 if (object == NULL || object->size != npages) {
376 object = vm_object_allocate(OBJT_DEFAULT, npages);
377 buffer = (caddr_t)vm_map_min(kernel_map);
378
379 error = vm_map_find(kernel_map, object, NULL,
380 0, (vm_offset_t *)&buffer, size,
381 PAGE_SIZE, TRUE,
382 VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
383 VM_PROT_ALL, VM_PROT_ALL, 0);
384
385 if (error != KERN_SUCCESS) {
386 vm_object_deallocate(object);
387 return (ENOMEM);
388 }
389 pipe_free_kmem(pb);
390 pb->object = object;
391 pb->buffer = buffer;
392 pb->size = size;
393 }
394 pb->rindex = 0;
395 pb->windex = 0;
396
397 return (0);
398}
399
400/*
401 * Initialize and allocate VM and memory for pipe, pulling the pipe from
402 * our per-cpu cache if possible.
403 *
404 * Returns 0 on success, else an error code (typically ENOMEM). Caller
405 * must still deallocate the pipe on failure.
406 */
407static int
408pipe_create(struct pipe **pipep)
409{
410 globaldata_t gd = mycpu;
411 struct pipe *pipe;
412 int error;
413
414 if ((pipe = gd->gd_pipeq) != NULL) {
415 gd->gd_pipeq = pipe->next;
416 --gd->gd_pipeqcount;
417 pipe->next = NULL;
418 } else {
419 pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
420 pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
421 lwkt_token_init(&pipe->bufferA.rlock, "piper");
422 lwkt_token_init(&pipe->bufferA.wlock, "pipew");
423 lwkt_token_init(&pipe->bufferB.rlock, "piper");
424 lwkt_token_init(&pipe->bufferB.wlock, "pipew");
425 }
426 *pipep = pipe;
427 if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
428 return (error);
429 }
430 if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
431 return (error);
432 }
433 vfs_timestamp(&pipe->ctime);
434 pipe->bufferA.atime = pipe->ctime;
435 pipe->bufferA.mtime = pipe->ctime;
436 pipe->bufferB.atime = pipe->ctime;
437 pipe->bufferB.mtime = pipe->ctime;
438 pipe->open_count = 2;
439
440 return (0);
441}
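/*
 * Illustration of the inode numbering above: a freshly allocated pipe gets
 * inum = gd_anoninum++ * ncpus + gd_cpuid + 2, so with ncpus == 4, cpu 0
 * hands out 2, 6, 10, ... and cpu 1 hands out 3, 7, 11, ...  Each pipe
 * therefore receives an st_ino unique across cpus without global locking
 * (the +2 keeps 0 and 1 out of the range).
 */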
442
443/*
444 * Read data from a pipe
445 */
446static int
447pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
448{
449 struct pipebuf *rpb;
450 struct pipebuf *wpb;
451 struct pipe *pipe;
452 size_t nread = 0;
453 size_t size; /* total bytes available */
454 size_t nsize; /* total bytes to read */
455 size_t rindex; /* contiguous bytes available */
456 int notify_writer;
457 int bigread;
458 int bigcount;
459 int error;
460 int nbio;
461
462 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
463 if ((intptr_t)fp->f_data & 1) {
464 rpb = &pipe->bufferB;
465 wpb = &pipe->bufferA;
466 } else {
467 rpb = &pipe->bufferA;
468 wpb = &pipe->bufferB;
469 }
470 atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
471
472 if (uio->uio_resid == 0)
473 return(0);
474
475 /*
476 * Calculate nbio
477 */
478 if (fflags & O_FBLOCKING)
479 nbio = 0;
480 else if (fflags & O_FNONBLOCKING)
481 nbio = 1;
482 else if (fp->f_flag & O_NONBLOCK)
483 nbio = 1;
484 else
485 nbio = 0;
486
487 /*
488 * 'quick' NBIO test before things get expensive.
489 */
490 if (nbio && rpb->rindex == rpb->windex &&
491 (rpb->state & PIPE_REOF) == 0) {
492 return EAGAIN;
493 }
494
495 /*
496 * Reads are serialized. Note however that buffer.buffer and
497 * buffer.size can change out from under us when the number
 498 * of bytes in the buffer is zero due to the write-side doing a
499 * pipespace().
500 */
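/*
 * A note on the FIFO arithmetic below, derived from the code: rindex and
 * windex are free-running counters, so the number of pending bytes is
 * simply (windex - rindex) and the buffer offset is (index & (size - 1)),
 * which assumes the buffer size is a power of 2.  For example, with a
 * 32768-byte buffer, windex 40000 and rindex 35000 there are 5000 bytes
 * pending starting at offset 35000 & 32767 == 2232.
 */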
501 lwkt_gettoken(&rpb->rlock);
502 error = pipe_start_uio(&rpb->rip);
503 if (error) {
504 lwkt_reltoken(&rpb->rlock);
505 return (error);
506 }
507 notify_writer = 0;
508
509 bigread = (uio->uio_resid > 10 * 1024 * 1024);
510 bigcount = 10;
511
512 while (uio->uio_resid) {
513 /*
514 * Don't hog the cpu.
515 */
516 if (bigread && --bigcount == 0) {
517 lwkt_user_yield();
518 bigcount = 10;
519 if (CURSIG(curthread->td_lwp)) {
520 error = EINTR;
521 break;
522 }
523 }
524
525 /*
526 * lfence required to avoid read-reordering of buffer
527 * contents prior to validation of size.
528 */
529 size = rpb->windex - rpb->rindex;
530 cpu_lfence();
531 if (size) {
532 rindex = rpb->rindex & (rpb->size - 1);
533 nsize = size;
534 if (nsize > rpb->size - rindex)
535 nsize = rpb->size - rindex;
536 nsize = szmin(nsize, uio->uio_resid);
537
538 /*
539 * Limit how much we move in one go so we have a
540 * chance to kick the writer while data is still
541 * available in the pipe. This avoids getting into
542 * a ping-pong with the writer.
543 */
544 if (nsize > (rpb->size >> 1))
545 nsize = rpb->size >> 1;
546
547 error = uiomove(&rpb->buffer[rindex], nsize, uio);
548 if (error)
549 break;
550 rpb->rindex += nsize;
551 nread += nsize;
552
553 /*
554 * If the FIFO is still over half full just continue
555 * and do not try to notify the writer yet. If
556 * less than half full notify any waiting writer.
557 */
558 if (size - nsize > (rpb->size >> 1)) {
559 notify_writer = 0;
560 } else {
561 notify_writer = 1;
562 pipesignal(rpb, PIPE_WANTW);
563 }
564 continue;
565 }
566
567 /*
568 * If the "write-side" was blocked we wake it up. This code
569 * is reached when the buffer is completely emptied.
570 */
571 pipesignal(rpb, PIPE_WANTW);
572
573 /*
574 * Pick up our copy loop again if the writer sent data to
575 * us while we were messing around.
576 *
 577 * On an SMP box poll up to pipe_delay nanoseconds for new
578 * data. Typically a value of 2000 to 4000 is sufficient
579 * to eradicate most IPIs/tsleeps/wakeups when a pipe
580 * is used for synchronous communications with small packets,
581 * and 8000 or so (8uS) will pipeline large buffer xfers
582 * between cpus over a pipe.
583 *
584 * For synchronous communications a hit means doing a
 585 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
 586 * whereas a miss requiring a tsleep/wakeup sequence
587 * will take 7uS or more.
588 */
589 if (rpb->windex != rpb->rindex)
590 continue;
591
592#ifdef _RDTSC_SUPPORTED_
593 if (pipe_delay) {
594 int64_t tsc_target;
595 int good = 0;
596
597 tsc_target = tsc_get_target(pipe_delay);
598 while (tsc_test_target(tsc_target) == 0) {
599 cpu_lfence();
600 if (rpb->windex != rpb->rindex) {
601 good = 1;
602 break;
603 }
604 cpu_pause();
605 }
606 if (good)
607 continue;
608 }
609#endif
610
611 /*
612 * Detect EOF condition, do not set error.
613 */
614 if (rpb->state & PIPE_REOF)
615 break;
616
617 /*
618 * Break if some data was read, or if this was a non-blocking
619 * read.
620 */
621 if (nread > 0)
622 break;
623
624 if (nbio) {
625 error = EAGAIN;
626 break;
627 }
628
629 /*
630 * Last chance, interlock with WANTR
631 */
632 tsleep_interlock(rpb, PCATCH);
633 atomic_set_int(&rpb->state, PIPE_WANTR);
634
635 /*
636 * Retest bytes available after memory barrier above.
637 */
638 size = rpb->windex - rpb->rindex;
639 if (size)
640 continue;
641
642 /*
643 * Retest EOF after memory barrier above.
644 */
645 if (rpb->state & PIPE_REOF)
646 break;
647
648 /*
649 * Wait for more data or state change
650 */
651 error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
652 if (error)
653 break;
654 }
655 pipe_end_uio(&rpb->rip);
656
657 /*
 658 * Update the last access time
659 */
660 if (error == 0 && nread && rpb->lticks != ticks) {
661 vfs_timestamp(&rpb->atime);
662 rpb->lticks = ticks;
663 }
664
665 /*
 666 * If we drained the FIFO more than half way then handle
667 * write blocking hysteresis.
668 *
669 * Note that PIPE_WANTW cannot be set by the writer without
670 * it holding both rlock and wlock, so we can test it
671 * while holding just rlock.
672 */
673 if (notify_writer) {
674 /*
675 * Synchronous blocking is done on the pipe involved
676 */
677 pipesignal(rpb, PIPE_WANTW);
678
679 /*
680 * But we may also have to deal with a kqueue which is
681 * stored on the same pipe as its descriptor, so a
 682 * stored on the same pipe as its descriptor, so an
683 * be on the other side.
684 */
685 pipewakeup(wpb, 0);
686 }
687 /*size = rpb->windex - rpb->rindex;*/
688 lwkt_reltoken(&rpb->rlock);
689
690 return (error);
691}
692
693static int
694pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
695{
696 struct pipebuf *rpb;
697 struct pipebuf *wpb;
698 struct pipe *pipe;
699 size_t windex;
700 size_t space;
701 size_t wcount;
702 size_t orig_resid;
703 int bigwrite;
704 int bigcount;
705 int error;
706 int nbio;
707
708 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
709 if ((intptr_t)fp->f_data & 1) {
710 rpb = &pipe->bufferB;
711 wpb = &pipe->bufferA;
712 } else {
713 rpb = &pipe->bufferA;
714 wpb = &pipe->bufferB;
715 }
716
717 /*
718 * Calculate nbio
719 */
720 if (fflags & O_FBLOCKING)
721 nbio = 0;
722 else if (fflags & O_FNONBLOCKING)
723 nbio = 1;
724 else if (fp->f_flag & O_NONBLOCK)
725 nbio = 1;
726 else
727 nbio = 0;
728
729 /*
730 * 'quick' NBIO test before things get expensive.
731 */
732 if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
733 uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
734 return EAGAIN;
735 }
736
737 /*
738 * Writes go to the peer. The peer will always exist.
739 */
740 lwkt_gettoken(&wpb->wlock);
741 if (wpb->state & PIPE_WEOF) {
742 lwkt_reltoken(&wpb->wlock);
743 return (EPIPE);
744 }
745
746 /*
 747 * Degenerate case (EPIPE takes precedence)
748 */
749 if (uio->uio_resid == 0) {
750 lwkt_reltoken(&wpb->wlock);
751 return(0);
752 }
753
754 /*
755 * Writes are serialized (start_uio must be called with wlock)
756 */
757 error = pipe_start_uio(&wpb->wip);
758 if (error) {
759 lwkt_reltoken(&wpb->wlock);
760 return (error);
761 }
762
763 orig_resid = uio->uio_resid;
764 wcount = 0;
765
766 bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
767 bigcount = 10;
768
769 while (uio->uio_resid) {
770 if (wpb->state & PIPE_WEOF) {
771 error = EPIPE;
772 break;
773 }
774
775 /*
776 * Don't hog the cpu.
777 */
778 if (bigwrite && --bigcount == 0) {
779 lwkt_user_yield();
780 bigcount = 10;
781 if (CURSIG(curthread->td_lwp)) {
782 error = EINTR;
783 break;
784 }
785 }
786
787 windex = wpb->windex & (wpb->size - 1);
788 space = wpb->size - (wpb->windex - wpb->rindex);
789
790 /*
791 * Writes of size <= PIPE_BUF must be atomic.
792 */
793 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
794 space = 0;
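		/*
		 * Illustration of the PIPE_BUF rule enforced above: POSIX
		 * requires that writes of at most PIPE_BUF bytes not be
		 * interleaved with other writers' data.  By forcing 'space'
		 * to 0 when such a write cannot fit in its entirety, the
		 * writer blocks (or returns EAGAIN for non-blocking I/O)
		 * instead of splitting the request across two copies.
		 */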
795
796 /*
797 * Write to fill, read size handles write hysteresis. Also
798 * additional restrictions can cause select-based non-blocking
799 * writes to spin.
800 */
801 if (space > 0) {
802 size_t segsize;
803
804 /*
805 * We want to notify a potentially waiting reader
806 * before we exhaust the write buffer for SMP
807 * pipelining. Otherwise the write/read will begin
808 * to ping-pong.
809 */
810 space = szmin(space, uio->uio_resid);
811 if (space > (wpb->size >> 1))
812 space = (wpb->size >> 1);
813
814 /*
815 * First segment to transfer is minimum of
816 * transfer size and contiguous space in
817 * pipe buffer. If first segment to transfer
818 * is less than the transfer size, we've got
819 * a wraparound in the buffer.
820 */
821 segsize = wpb->size - windex;
822 if (segsize > space)
823 segsize = space;
824
825 /*
826 * If this is the first loop and the reader is
827 * blocked, do a preemptive wakeup of the reader.
828 *
829 * On SMP the IPI latency plus the wlock interlock
830 * on the reader side is the fastest way to get the
831 * reader going. (The scheduler will hard loop on
832 * lock tokens).
833 */
834 if (wcount == 0)
835 pipesignal(wpb, PIPE_WANTR);
836
837 /*
838 * Transfer segment, which may include a wrap-around.
 839 * Update windex to account for both segments in one go
840 * so the reader can read() the data atomically.
841 */
842 error = uiomove(&wpb->buffer[windex], segsize, uio);
843 if (error == 0 && segsize < space) {
844 segsize = space - segsize;
845 error = uiomove(&wpb->buffer[0], segsize, uio);
846 }
847 if (error)
848 break;
849
850 /*
 851 * Memory fence prior to the windex update (note: not
 852 * strictly needed on Intel, where this is a NOP).
853 */
854 cpu_sfence();
855 wpb->windex += space;
856
857 /*
858 * Signal reader
859 */
860 if (wcount != 0)
861 pipesignal(wpb, PIPE_WANTR);
862 wcount += space;
863 continue;
864 }
865
866 /*
867 * Wakeup any pending reader
868 */
869 pipesignal(wpb, PIPE_WANTR);
870
871 /*
872 * don't block on non-blocking I/O
873 */
874 if (nbio) {
875 error = EAGAIN;
876 break;
877 }
878
879#ifdef _RDTSC_SUPPORTED_
880 if (pipe_delay) {
881 int64_t tsc_target;
882 int good = 0;
883
884 tsc_target = tsc_get_target(pipe_delay);
885 while (tsc_test_target(tsc_target) == 0) {
886 cpu_lfence();
887 space = wpb->size - (wpb->windex - wpb->rindex);
888 if ((space < uio->uio_resid) &&
889 (orig_resid <= PIPE_BUF)) {
890 space = 0;
891 }
892 if (space) {
893 good = 1;
894 break;
895 }
896 cpu_pause();
897 }
898 if (good)
899 continue;
900 }
901#endif
902
903 /*
904 * Interlocked test. Atomic op enforces the memory barrier.
905 */
906 tsleep_interlock(wpb, PCATCH);
907 atomic_set_int(&wpb->state, PIPE_WANTW);
908
909 /*
910 * Retest space available after memory barrier above.
911 * Writes of size <= PIPE_BUF must be atomic.
912 */
913 space = wpb->size - (wpb->windex - wpb->rindex);
914 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
915 space = 0;
916
917 /*
918 * Retest EOF after memory barrier above.
919 */
920 if (wpb->state & PIPE_WEOF) {
921 error = EPIPE;
922 break;
923 }
924
925 /*
926 * We have no more space and have something to offer,
927 * wake up select/poll/kq.
928 */
929 if (space == 0) {
930 pipewakeup(wpb, 1);
931 error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
932 }
933
934 /*
935 * Break out if we errored or the read side wants us to go
936 * away.
937 */
938 if (error)
939 break;
940 if (wpb->state & PIPE_WEOF) {
941 error = EPIPE;
942 break;
943 }
944 }
945 pipe_end_uio(&wpb->wip);
946
947 /*
948 * If we have put any characters in the buffer, we wake up
949 * the reader.
950 *
951 * Both rlock and wlock are required to be able to modify pipe_state.
952 */
953 if (wpb->windex != wpb->rindex) {
954 pipesignal(wpb, PIPE_WANTR);
955 pipewakeup(wpb, 1);
956 }
957
958 /*
959 * Don't return EPIPE if I/O was successful
960 */
961 if ((wpb->rindex == wpb->windex) &&
962 (uio->uio_resid == 0) &&
963 (error == EPIPE)) {
964 error = 0;
965 }
966
967 if (error == 0 && wpb->lticks != ticks) {
968 vfs_timestamp(&wpb->mtime);
969 wpb->lticks = ticks;
970 }
971
972 /*
973 * We have something to offer,
974 * wake up select/poll/kq.
975 */
976 /*space = wpb->windex - wpb->rindex;*/
977 lwkt_reltoken(&wpb->wlock);
978
979 return (error);
980}
981
982/*
 983 * We implement a very minimal set of ioctls for compatibility with sockets.
984 */
985static int
986pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
987 struct ucred *cred, struct sysmsg *msg)
988{
989 struct pipebuf *rpb;
990 struct pipe *pipe;
991 int error;
992
993 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
994 if ((intptr_t)fp->f_data & 1) {
995 rpb = &pipe->bufferB;
996 } else {
997 rpb = &pipe->bufferA;
998 }
999
1000 lwkt_gettoken(&rpb->rlock);
1001 lwkt_gettoken(&rpb->wlock);
1002
1003 switch (cmd) {
1004 case FIOASYNC:
1005 if (*(int *)data) {
1006 atomic_set_int(&rpb->state, PIPE_ASYNC);
1007 } else {
1008 atomic_clear_int(&rpb->state, PIPE_ASYNC);
1009 }
1010 error = 0;
1011 break;
1012 case FIONREAD:
1013 *(int *)data = (int)(rpb->windex - rpb->rindex);
1014 error = 0;
1015 break;
1016 case FIOSETOWN:
1017 error = fsetown(*(int *)data, &rpb->sigio);
1018 break;
1019 case FIOGETOWN:
1020 *(int *)data = fgetown(&rpb->sigio);
1021 error = 0;
1022 break;
1023 case TIOCSPGRP:
1024 /* This is deprecated, FIOSETOWN should be used instead. */
1025 error = fsetown(-(*(int *)data), &rpb->sigio);
1026 break;
1027
1028 case TIOCGPGRP:
1029 /* This is deprecated, FIOGETOWN should be used instead. */
1030 *(int *)data = -fgetown(&rpb->sigio);
1031 error = 0;
1032 break;
1033 default:
1034 error = ENOTTY;
1035 break;
1036 }
1037 lwkt_reltoken(&rpb->wlock);
1038 lwkt_reltoken(&rpb->rlock);
1039
1040 return (error);
1041}
1042
1043/*
1044 * MPSAFE
1045 */
1046static int
1047pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1048{
1049 struct pipebuf *rpb;
1050 struct pipe *pipe;
1051
1052 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1053 if ((intptr_t)fp->f_data & 1) {
1054 rpb = &pipe->bufferB;
1055 } else {
1056 rpb = &pipe->bufferA;
1057 }
1058
1059 bzero((caddr_t)ub, sizeof(*ub));
1060 ub->st_mode = S_IFIFO;
1061 ub->st_blksize = rpb->size;
1062 ub->st_size = rpb->windex - rpb->rindex;
1063 ub->st_blocks = howmany(ub->st_size, ub->st_blksize);
1064 ub->st_atimespec = rpb->atime;
1065 ub->st_mtimespec = rpb->mtime;
1066 ub->st_ctimespec = pipe->ctime;
1067 ub->st_uid = fp->f_cred->cr_uid;
1068 ub->st_gid = fp->f_cred->cr_gid;
1069 ub->st_ino = pipe->inum;
1070 /*
1071 * Left as 0: st_dev, st_nlink, st_rdev,
1072 * st_flags, st_gen.
1073 * XXX (st_dev, st_ino) should be unique.
1074 */
1075
1076 return (0);
1077}
1078
1079static int
1080pipe_close(struct file *fp)
1081{
1082 struct pipebuf *rpb;
1083 struct pipebuf *wpb;
1084 struct pipe *pipe;
1085
1086 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1087 if ((intptr_t)fp->f_data & 1) {
1088 rpb = &pipe->bufferB;
1089 wpb = &pipe->bufferA;
1090 } else {
1091 rpb = &pipe->bufferA;
1092 wpb = &pipe->bufferB;
1093 }
1094
1095 fp->f_ops = &badfileops;
1096 fp->f_data = NULL;
1097 funsetown(&rpb->sigio);
1098 pipeclose(pipe, rpb, wpb);
1099
1100 return (0);
1101}
1102
1103/*
1104 * Shutdown one or both directions of a full-duplex pipe.
1105 */
1106static int
1107pipe_shutdown(struct file *fp, int how)
1108{
1109 struct pipebuf *rpb;
1110 struct pipebuf *wpb;
1111 struct pipe *pipe;
1112 int error = EPIPE;
1113
1114 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1115 if ((intptr_t)fp->f_data & 1) {
1116 rpb = &pipe->bufferB;
1117 wpb = &pipe->bufferA;
1118 } else {
1119 rpb = &pipe->bufferA;
1120 wpb = &pipe->bufferB;
1121 }
1122
1123 /*
1124 * We modify pipe_state on both pipes, which means we need
1125 * all four tokens!
1126 */
1127 lwkt_gettoken(&rpb->rlock);
1128 lwkt_gettoken(&rpb->wlock);
1129 lwkt_gettoken(&wpb->rlock);
1130 lwkt_gettoken(&wpb->wlock);
1131
1132 switch(how) {
1133 case SHUT_RDWR:
1134 case SHUT_RD:
1135 /*
1136 * EOF on my reads and peer writes
1137 */
1138 atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
1139 if (rpb->state & PIPE_WANTR) {
1140 rpb->state &= ~PIPE_WANTR;
1141 wakeup(rpb);
1142 }
1143 if (rpb->state & PIPE_WANTW) {
1144 rpb->state &= ~PIPE_WANTW;
1145 wakeup(rpb);
1146 }
1147 error = 0;
1148 if (how == SHUT_RD)
1149 break;
1150 /* fall through */
1151 case SHUT_WR:
1152 /*
1153 * EOF on peer reads and my writes
1154 */
1155 atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1156 if (wpb->state & PIPE_WANTR) {
1157 wpb->state &= ~PIPE_WANTR;
1158 wakeup(wpb);
1159 }
1160 if (wpb->state & PIPE_WANTW) {
1161 wpb->state &= ~PIPE_WANTW;
1162 wakeup(wpb);
1163 }
1164 error = 0;
1165 break;
1166 }
1167 pipewakeup(rpb, 1);
1168 pipewakeup(wpb, 1);
1169
1170 lwkt_reltoken(&wpb->wlock);
1171 lwkt_reltoken(&wpb->rlock);
1172 lwkt_reltoken(&rpb->wlock);
1173 lwkt_reltoken(&rpb->rlock);
1174
1175 return (error);
1176}
1177
1178/*
1179 * Destroy the pipe buffer.
1180 */
1181static void
1182pipe_free_kmem(struct pipebuf *pb)
1183{
1184 if (pb->buffer != NULL) {
1185 kmem_free(kernel_map, (vm_offset_t)pb->buffer, pb->size);
1186 pb->buffer = NULL;
1187 pb->object = NULL;
1188 }
1189}
1190
1191/*
1192 * Close one half of the pipe. We are closing the pipe for reading on rpb
1193 * and writing on wpb. This routine must be called twice with the pipebufs
1194 * reversed to close both directions.
1195 */
1196static void
1197pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
1198{
1199 globaldata_t gd;
1200
1201 if (pipe == NULL)
1202 return;
1203
1204 /*
1205 * We need both the read and write tokens to modify pipe_state.
1206 */
1207 lwkt_gettoken(&rpb->rlock);
1208 lwkt_gettoken(&rpb->wlock);
1209
1210 /*
1211 * Set our state, wakeup anyone waiting in select/poll/kq, and
1212 * wakeup anyone blocked on our pipe. No action if our side
1213 * is already closed.
1214 */
1215 if (rpb->state & PIPE_CLOSED) {
1216 lwkt_reltoken(&rpb->wlock);
1217 lwkt_reltoken(&rpb->rlock);
1218 return;
1219 }
1220
1221 atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
1222 pipewakeup(rpb, 1);
1223 if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1224 rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1225 wakeup(rpb);
1226 }
1227 lwkt_reltoken(&rpb->wlock);
1228 lwkt_reltoken(&rpb->rlock);
1229
1230 /*
1231 * Disconnect from peer.
1232 */
1233 lwkt_gettoken(&wpb->rlock);
1234 lwkt_gettoken(&wpb->wlock);
1235
1236 atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
1237 pipewakeup(wpb, 1);
1238 if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1239 wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1240 wakeup(wpb);
1241 }
1242 if (SLIST_FIRST(&wpb->kq.ki_note))
1243 KNOTE(&wpb->kq.ki_note, 0);
1244 lwkt_reltoken(&wpb->wlock);
1245 lwkt_reltoken(&wpb->rlock);
1246
1247 /*
1248 * Free resources once both sides are closed. We maintain a pcpu
1249 * cache to improve performance, so the actual tear-down case is
1250 * limited to bulk situations.
1251 *
1252 * However, the bulk tear-down case can cause intense contention
1253 * on the kernel_map when, e.g. hundreds to hundreds of thousands
1254 * of processes are killed at the same time. To deal with this we
1255 * use a pcpu mutex to maintain concurrency but also limit the
1256 * number of threads banging on the map and pmap.
1257 *
1258 * We use the mtx mechanism instead of the lockmgr mechanism because
1259 * the mtx mechanism utilizes a queued design which will not break
1260 * down in the face of thousands to hundreds of thousands of
1261 * processes trying to free pipes simultaneously. The lockmgr
1262 * mechanism will wind up waking them all up each time a lock
1263 * cycles.
1264 */
1265 if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1266 gd = mycpu;
1267 if (gd->gd_pipeqcount >= pipe_maxcache) {
1268 mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1269 pipe_free_kmem(rpb);
1270 pipe_free_kmem(wpb);
1271 mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1272 kfree(pipe, M_PIPE);
1273 } else {
1274 rpb->state = 0;
1275 wpb->state = 0;
1276 pipe->next = gd->gd_pipeq;
1277 gd->gd_pipeq = pipe;
1278 ++gd->gd_pipeqcount;
1279 }
1280 }
1281}
1282
1283static int
1284pipe_kqfilter(struct file *fp, struct knote *kn)
1285{
1286 struct pipebuf *rpb;
1287 struct pipebuf *wpb;
1288 struct pipe *pipe;
1289
1290 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1291 if ((intptr_t)fp->f_data & 1) {
1292 rpb = &pipe->bufferB;
1293 wpb = &pipe->bufferA;
1294 } else {
1295 rpb = &pipe->bufferA;
1296 wpb = &pipe->bufferB;
1297 }
1298
1299 switch (kn->kn_filter) {
1300 case EVFILT_READ:
1301 kn->kn_fop = &pipe_rfiltops;
1302 break;
1303 case EVFILT_WRITE:
1304 kn->kn_fop = &pipe_wfiltops;
1305 break;
1306 default:
1307 return (EOPNOTSUPP);
1308 }
1309
1310 if (rpb == &pipe->bufferA)
1311 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1312 else
1313 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1314
1315 knote_insert(&rpb->kq.ki_note, kn);
1316
1317 return (0);
1318}
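/*
 * Hedged userland sketch (illustrative only; 'rfd' is assumed to be the
 * read end of a pipe): waiting for data with kqueue ends up in
 * pipe_kqfilter() above with EVFILT_READ:
 *
 *	struct kevent kev;
 *	int kq = kqueue();
 *
 *	EV_SET(&kev, rfd, EVFILT_READ, EV_ADD, 0, 0, NULL);
 *	kevent(kq, &kev, 1, NULL, 0, NULL);	// register the filter
 *	kevent(kq, NULL, 0, &kev, 1, NULL);	// wait; kev.data = bytes ready
 *
 * filt_piperead() below fills kn_data with (windex - rindex) and sets
 * EV_EOF/EV_HUP as the peer shuts down or closes.
 */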
1319
1320static void
1321filt_pipedetach(struct knote *kn)
1322{
1323 struct pipebuf *rpb;
1324 struct pipebuf *wpb;
1325 struct pipe *pipe;
1326
1327 pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1328 if ((intptr_t)kn->kn_hook & 1) {
1329 rpb = &pipe->bufferB;
1330 wpb = &pipe->bufferA;
1331 } else {
1332 rpb = &pipe->bufferA;
1333 wpb = &pipe->bufferB;
1334 }
1335 knote_remove(&rpb->kq.ki_note, kn);
1336}
1337
1338/*ARGSUSED*/
1339static int
1340filt_piperead(struct knote *kn, long hint)
1341{
1342 struct pipebuf *rpb;
1343 struct pipebuf *wpb;
1344 struct pipe *pipe;
1345 int ready = 0;
1346
1347 pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1348 if ((intptr_t)kn->kn_fp->f_data & 1) {
1349 rpb = &pipe->bufferB;
1350 wpb = &pipe->bufferA;
1351 } else {
1352 rpb = &pipe->bufferA;
1353 wpb = &pipe->bufferB;
1354 }
1355
1356 /*
1357 * We shouldn't need the pipe locks because the knote itself is
1358 * locked via KN_PROCESSING. If we lose a race against the writer,
1359 * the writer will just issue a KNOTE() after us.
1360 */
1361#if 0
1362 lwkt_gettoken(&rpb->rlock);
1363 lwkt_gettoken(&rpb->wlock);
1364#endif
1365
1366 kn->kn_data = rpb->windex - rpb->rindex;
1367 if (kn->kn_data < 0)
1368 kn->kn_data = 0;
1369
1370 if (rpb->state & PIPE_REOF) {
1371 /*
1372 * Only set NODATA if all data has been exhausted
1373 */
1374 if (kn->kn_data == 0)
1375 kn->kn_flags |= EV_NODATA;
1376 kn->kn_flags |= EV_EOF;
1377
1378 /*
1379 * Only set HUP if the pipe is completely closed.
1380 * half-closed does not count (to make the behavior
 1381 * the same as Linux).
1382 */
1383 if (wpb->state & PIPE_CLOSED) {
1384 kn->kn_flags |= EV_HUP;
1385 ready = 1;
1386 }
1387 }
1388
1389#if 0
1390 lwkt_reltoken(&rpb->wlock);
1391 lwkt_reltoken(&rpb->rlock);
1392#endif
1393
1394 if (!ready && (kn->kn_sfflags & NOTE_HUPONLY) == 0)
1395 ready = kn->kn_data > 0;
1396
1397 return (ready);
1398}
1399
1400/*ARGSUSED*/
1401static int
1402filt_pipewrite(struct knote *kn, long hint)
1403{
1404 struct pipebuf *rpb;
1405 struct pipebuf *wpb;
1406 struct pipe *pipe;
1407 int ready = 0;
1408
1409 pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1410 if ((intptr_t)kn->kn_fp->f_data & 1) {
1411 rpb = &pipe->bufferB;
1412 wpb = &pipe->bufferA;
1413 } else {
1414 rpb = &pipe->bufferA;
1415 wpb = &pipe->bufferB;
1416 }
1417
1418 kn->kn_data = 0;
1419 if (wpb->state & PIPE_CLOSED) {
1420 kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
1421 return (1);
1422 }
1423
1424 /*
1425 * We shouldn't need the pipe locks because the knote itself is
1426 * locked via KN_PROCESSING. If we lose a race against the reader,
 1427 * the reader will just issue a KNOTE() after us.
1428 */
1429#if 0
1430 lwkt_gettoken(&wpb->rlock);
1431 lwkt_gettoken(&wpb->wlock);
1432#endif
1433
1434 if (wpb->state & PIPE_WEOF) {
1435 kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
1436 ready = 1;
1437 }
1438
1439 if (!ready) {
1440 kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
1441 if (kn->kn_data < 0)
1442 kn->kn_data = 0;
1443 }
1444
1445#if 0
1446 lwkt_reltoken(&wpb->wlock);
1447 lwkt_reltoken(&wpb->rlock);
1448#endif
1449
1450 if (!ready)
1451 ready = kn->kn_data >= PIPE_BUF;
1452
1453 return (ready);
1454}