/*
 * Copyright (c) 1996 John S. Dyson
 * All rights reserved.
 * Copyright (c) 2003-2017 The DragonFly Project.  All rights reserved.
 *
 * This code is derived from software contributed to The DragonFly Project
 * by Matthew Dillon <dillon@backplane.com>
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 * 1. Redistributions of source code must retain the above copyright
 *    notice immediately at the beginning of the file, without modification,
 *    this list of conditions, and the following disclaimer.
 * 2. Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 * 3. Absolutely no warranty of function or purpose is made by the author
 *    John S. Dyson.
 * 4. Modifications may be freely made to this file if the above conditions
 *    are met.
 */

/*
 * This file contains a high-performance replacement for the socket-based
 * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
 * all features of sockets, but does do everything that pipes normally
 * do.
 */
#include <sys/param.h>
#include <sys/systm.h>
#include <sys/kernel.h>
#include <sys/proc.h>
#include <sys/fcntl.h>
#include <sys/file.h>
#include <sys/filedesc.h>
#include <sys/filio.h>
#include <sys/ttycom.h>
#include <sys/stat.h>
#include <sys/signalvar.h>
#include <sys/sysmsg.h>
#include <sys/pipe.h>
#include <sys/vnode.h>
#include <sys/uio.h>
#include <sys/event.h>
#include <sys/globaldata.h>
#include <sys/module.h>
#include <sys/malloc.h>
#include <sys/sysctl.h>
#include <sys/socket.h>
#include <sys/kern_syscall.h>
#include <sys/lock.h>
#include <sys/mutex.h>

#include <vm/vm.h>
#include <vm/vm_param.h>
#include <vm/vm_object.h>
#include <vm/vm_kern.h>
#include <vm/vm_extern.h>
#include <vm/pmap.h>
#include <vm/vm_map.h>
#include <vm/vm_page.h>
#include <vm/vm_zone.h>

#include <sys/file2.h>
#include <sys/signal2.h>
#include <sys/mutex2.h>

#include <machine/cpufunc.h>

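/*
 * Per-cpu mutex used by pipeclose() to limit the number of threads
 * concurrently tearing down pipe KVA in the kernel_map.
 */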
struct pipegdlock {
        struct mtx mtx;
} __cachealign;

/*
 * interfaces to the outside world
 */
static int pipe_read (struct file *fp, struct uio *uio,
                struct ucred *cred, int flags);
static int pipe_write (struct file *fp, struct uio *uio,
                struct ucred *cred, int flags);
static int pipe_close (struct file *fp);
static int pipe_shutdown (struct file *fp, int how);
static int pipe_kqfilter (struct file *fp, struct knote *kn);
static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
                struct ucred *cred, struct sysmsg *msg);

__read_mostly static struct fileops pipeops = {
        .fo_read = pipe_read,
        .fo_write = pipe_write,
        .fo_ioctl = pipe_ioctl,
        .fo_kqfilter = pipe_kqfilter,
        .fo_stat = pipe_stat,
        .fo_close = pipe_close,
        .fo_shutdown = pipe_shutdown
};

static void filt_pipedetach(struct knote *kn);
static int filt_piperead(struct knote *kn, long hint);
static int filt_pipewrite(struct knote *kn, long hint);

__read_mostly static struct filterops pipe_rfiltops =
        { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
__read_mostly static struct filterops pipe_wfiltops =
        { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };

MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");

#define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */

__read_mostly static int pipe_maxcache = PIPEQ_MAX_CACHE;
__read_mostly static struct pipegdlock *pipe_gdlocks;

SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
        CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");

/*
 * The pipe buffer size can be changed at any time.  Only new pipe()s
 * are affected.  Note that due to cpu cache effects, you do not want
 * to make this value too large.
 */
__read_mostly static int pipe_size = 32768;
SYSCTL_INT(_kern_pipe, OID_AUTO, size,
        CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");

/*
 * Reader/writer delay loop.  When the reader exhausts the pipe buffer
 * or the write completely fills the pipe buffer and would otherwise sleep,
 * it first busy-loops for a few microseconds waiting for data or buffer
 * space.  This eliminates IPIs for most high-bandwidth writer/reader pipes
 * and also helps when the user program uses a large data buffer in its
 * UIOs.
 *
 * This defaults to 4uS.
 */
#ifdef _RDTSC_SUPPORTED_
__read_mostly static int pipe_delay = 4000;     /* 4uS default */
SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
        CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
#endif

/*
 * Auto-size pipe cache to reduce kmem allocations and frees.
 */
static
void
pipeinit(void *dummy)
{
        size_t mbytes = kmem_lim_size();
        int n;

        if (pipe_maxcache == PIPEQ_MAX_CACHE) {
                if (mbytes >= 7 * 1024)
                        pipe_maxcache *= 2;
                if (mbytes >= 15 * 1024)
                        pipe_maxcache *= 2;
        }

        /*
         * Detune the pcpu caching a bit on systems with an insane number
         * of cpu threads to reduce memory waste.
         */
        if (ncpus > 64) {
                pipe_maxcache = pipe_maxcache * 64 / ncpus;
                if (pipe_maxcache < PIPEQ_MAX_CACHE)
                        pipe_maxcache = PIPEQ_MAX_CACHE;
        }

        pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
                               M_PIPE, M_WAITOK | M_ZERO);
        for (n = 0; n < ncpus; ++n)
                mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
}
SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);

static void pipeclose (struct pipe *pipe,
                struct pipebuf *pbr, struct pipebuf *pbw);
static void pipe_free_kmem (struct pipebuf *buf);
static int pipe_create (struct pipe **pipep);

/*
 * Test and clear the specified flag, wakeup(pb) if it was set.
 * This function must also act as a memory barrier.
 */
static __inline void
pipesignal(struct pipebuf *pb, uint32_t flags)
{
        uint32_t oflags;
        uint32_t nflags;

        for (;;) {
                oflags = pb->state;
                cpu_ccfence();
                nflags = oflags & ~flags;
                if (atomic_cmpset_int(&pb->state, oflags, nflags))
                        break;
        }
        if (oflags & flags)
                wakeup(pb);
}
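
/*
 * NOTE: pipesignal() pairs with the tsleep_interlock() +
 * atomic_set_int(PIPE_WANTR/PIPE_WANTW) sequences in pipe_read() and
 * pipe_write().  The sleeper publishes its WANT flag before re-testing
 * the FIFO, so the atomic test-and-clear above cannot miss a waiter.
 */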

/*
 * Post a SIGIO to the async I/O owner, if requested, and issue a
 * KNOTE so kqueue/select/poll waiters are notified.
 */
static __inline void
pipewakeup(struct pipebuf *pb, int dosigio)
{
        if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
                lwkt_gettoken(&sigio_token);
                pgsigio(pb->sigio, SIGIO, 0);
                lwkt_reltoken(&sigio_token);
        }
        KNOTE(&pb->kq.ki_note, 0);
}

/*
 * These routines are called before and after a UIO.  The UIO
 * may block, causing our held tokens to be lost temporarily.
 *
 * We use these routines to serialize reads against other reads
 * and writes against other writes.
 *
 * The appropriate token is held on entry so *ipp does not race.
 * *ipp is 0 when idle, 1 while a UIO is in progress, and -1 while
 * a UIO is in progress with other threads waiting for their turn.
 */
static __inline int
pipe_start_uio(int *ipp)
{
        int error;

        while (*ipp) {
                *ipp = -1;
                error = tsleep(ipp, PCATCH, "pipexx", 0);
                if (error)
                        return (error);
        }
        *ipp = 1;
        return (0);
}

static __inline void
pipe_end_uio(int *ipp)
{
        if (*ipp < 0) {
                *ipp = 0;
                wakeup(ipp);
        } else {
                KKASSERT(*ipp > 0);
                *ipp = 0;
        }
}

/*
 * The pipe system call for the DTYPE_PIPE type of pipes
 *
 * pipe_args(int dummy)
 *
 * MPSAFE
 */
int
sys_pipe(struct sysmsg *sysmsg, const struct pipe_args *uap)
{
        return kern_pipe(sysmsg->sysmsg_fds, 0);
}

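/*
 * The pipe2() variant; kern_pipe() applies O_NONBLOCK and O_CLOEXEC
 * from uap->flags to both descriptors.
 */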
int
sys_pipe2(struct sysmsg *sysmsg, const struct pipe2_args *uap)
{
        return kern_pipe(sysmsg->sysmsg_fds, uap->flags);
}

int
kern_pipe(long *fds, int flags)
{
        struct thread *td = curthread;
        struct filedesc *fdp = td->td_proc->p_fd;
        struct file *rf, *wf;
        struct pipe *pipe;
        int fd1, fd2, error;

        pipe = NULL;
        if (pipe_create(&pipe)) {
                pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
                pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
                return (ENFILE);
        }

        error = falloc(td->td_lwp, &rf, &fd1);
        if (error) {
                pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
                pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
                return (error);
        }
        fds[0] = fd1;

        /*
         * Warning: once we've gotten past allocation of the fd for the
         * read-side, we can only drop the read side via fdrop() in order
         * to avoid races against processes which manage to dup() the read
         * side while we are blocked trying to allocate the write side.
         */
        rf->f_type = DTYPE_PIPE;
        rf->f_flag = FREAD | FWRITE;
        rf->f_ops = &pipeops;
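        /*
         * Both descriptors share the same struct pipe; bit 0 of f_data
         * selects which pipebuf serves as that descriptor's read side
         * (0 = bufferA, 1 = bufferB).
         */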
        rf->f_data = (void *)((intptr_t)pipe | 0);
        if (flags & O_NONBLOCK)
                rf->f_flag |= O_NONBLOCK;
        if (flags & O_CLOEXEC)
                fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;

        error = falloc(td->td_lwp, &wf, &fd2);
        if (error) {
                fsetfd(fdp, NULL, fd1);
                fdrop(rf);
                /* pipeA has been closed by fdrop() */
                /* close pipeB here */
                pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
                return (error);
        }
        wf->f_type = DTYPE_PIPE;
        wf->f_flag = FREAD | FWRITE;
        wf->f_ops = &pipeops;
        wf->f_data = (void *)((intptr_t)pipe | 1);
        if (flags & O_NONBLOCK)
                wf->f_flag |= O_NONBLOCK;
        if (flags & O_CLOEXEC)
                fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;

        fds[1] = fd2;

        /*
         * Once activated the peer relationship remains valid until
         * both sides are closed.
         */
        fsetfd(fdp, rf, fd1);
        fsetfd(fdp, wf, fd2);
        fdrop(rf);
        fdrop(wf);

        return (0);
}

/*
 * [re]allocates KVA for the pipe's circular buffer.  The space is
 * pageable.  Called twice to setup full-duplex communications.
 *
 * NOTE: Independent vm_object's are used to improve performance.
 *
 * Returns 0 on success, ENOMEM on failure.
 */
static int
pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
{
        struct vm_object *object;
        caddr_t buffer;
        vm_pindex_t npages;
        int error;

        size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
        if (size < 16384)
                size = 16384;
        if (size > 1024*1024)
                size = 1024*1024;

        npages = round_page(size) / PAGE_SIZE;
        object = pb->object;

        /*
         * [re]create the object if necessary and reserve space for it
         * in the kernel_map.  The object and memory are pageable.  On
         * success, free the old resources before assigning the new
         * ones.
         */
        if (object == NULL || object->size != npages) {
                object = vm_object_allocate(OBJT_DEFAULT, npages);
                buffer = (caddr_t)vm_map_min(kernel_map);

                error = vm_map_find(kernel_map, object, NULL,
                                    0, (vm_offset_t *)&buffer, size,
                                    PAGE_SIZE, TRUE,
                                    VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
                                    VM_PROT_ALL, VM_PROT_ALL, 0);

                if (error != KERN_SUCCESS) {
                        vm_object_deallocate(object);
                        return (ENOMEM);
                }
                pipe_free_kmem(pb);
                pb->object = object;
                pb->buffer = buffer;
                pb->size = size;
        }
        pb->rindex = 0;
        pb->windex = 0;

        return (0);
}
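
/*
 * NOTE: rindex and windex are free-running counters.  At any instant
 * (windex - rindex) is the number of bytes in the FIFO and each index
 * is masked with (size - 1) on access, so pipe_read() and pipe_write()
 * depend on the buffer size being a power of 2.
 */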

/*
 * Initialize and allocate VM and memory for pipe, pulling the pipe from
 * our per-cpu cache if possible.
 *
 * Returns 0 on success, else an error code (typically ENOMEM).  Caller
 * must still deallocate the pipe on failure.
 */
static int
pipe_create(struct pipe **pipep)
{
        globaldata_t gd = mycpu;
        struct pipe *pipe;
        int error;

        if ((pipe = gd->gd_pipeq) != NULL) {
                gd->gd_pipeq = pipe->next;
                --gd->gd_pipeqcount;
                pipe->next = NULL;
        } else {
                pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
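                /*
                 * Hand out anonymous inode numbers striped across the
                 * cpus (per-cpu counter * ncpus + cpuid), offset by 2
                 * so they stay unique and never collide with 0 or 1.
                 */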
                pipe->inum = gd->gd_anoninum++ * ncpus + gd->gd_cpuid + 2;
                lwkt_token_init(&pipe->bufferA.rlock, "piper");
                lwkt_token_init(&pipe->bufferA.wlock, "pipew");
                lwkt_token_init(&pipe->bufferB.rlock, "piper");
                lwkt_token_init(&pipe->bufferB.wlock, "pipew");
        }
        *pipep = pipe;
        if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
                return (error);
        }
        if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
                return (error);
        }
        vfs_timestamp(&pipe->ctime);
        pipe->bufferA.atime = pipe->ctime;
        pipe->bufferA.mtime = pipe->ctime;
        pipe->bufferB.atime = pipe->ctime;
        pipe->bufferB.mtime = pipe->ctime;
        pipe->open_count = 2;

        return (0);
}

/*
 * Read data from a pipe
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        size_t nread = 0;
        size_t size;    /* total bytes available */
        size_t nsize;   /* total bytes to read */
        size_t rindex;  /* contiguous bytes available */
        int notify_writer;
        int bigread;
        int bigcount;
        int error;
        int nbio;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }
        atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);

        if (uio->uio_resid == 0)
                return(0);

        /*
         * Calculate nbio
         */
        if (fflags & O_FBLOCKING)
                nbio = 0;
        else if (fflags & O_FNONBLOCKING)
                nbio = 1;
        else if (fp->f_flag & O_NONBLOCK)
                nbio = 1;
        else
                nbio = 0;

        /*
         * 'quick' NBIO test before things get expensive.
         */
        if (nbio && rpb->rindex == rpb->windex &&
            (rpb->state & PIPE_REOF) == 0) {
                return EAGAIN;
        }

        /*
         * Reads are serialized.  Note however that buffer.buffer and
         * buffer.size can change out from under us when the number
         * of bytes in the buffer are zero due to the write-side doing a
         * pipespace().
         */
        lwkt_gettoken(&rpb->rlock);
        error = pipe_start_uio(&rpb->rip);
        if (error) {
                lwkt_reltoken(&rpb->rlock);
                return (error);
        }
        notify_writer = 0;

        bigread = (uio->uio_resid > 10 * 1024 * 1024);
        bigcount = 10;

        while (uio->uio_resid) {
                /*
                 * Don't hog the cpu.
                 */
                if (bigread && --bigcount == 0) {
                        lwkt_user_yield();
                        bigcount = 10;
                        if (CURSIG(curthread->td_lwp)) {
                                error = EINTR;
                                break;
                        }
                }

                /*
                 * lfence required to avoid read-reordering of buffer
                 * contents prior to validation of size.
                 */
                size = rpb->windex - rpb->rindex;
                cpu_lfence();
                if (size) {
                        rindex = rpb->rindex & (rpb->size - 1);
                        nsize = size;
                        if (nsize > rpb->size - rindex)
                                nsize = rpb->size - rindex;
                        nsize = szmin(nsize, uio->uio_resid);

                        /*
                         * Limit how much we move in one go so we have a
                         * chance to kick the writer while data is still
                         * available in the pipe.  This avoids getting into
                         * a ping-pong with the writer.
                         */
                        if (nsize > (rpb->size >> 1))
                                nsize = rpb->size >> 1;

                        error = uiomove(&rpb->buffer[rindex], nsize, uio);
                        if (error)
                                break;
                        rpb->rindex += nsize;
                        nread += nsize;

                        /*
                         * If the FIFO is still over half full just continue
                         * and do not try to notify the writer yet.  If
                         * less than half full notify any waiting writer.
                         */
                        if (size - nsize > (rpb->size >> 1)) {
                                notify_writer = 0;
                        } else {
                                notify_writer = 1;
                                pipesignal(rpb, PIPE_WANTW);
                        }
                        continue;
                }

                /*
                 * If the "write-side" was blocked we wake it up.  This code
                 * is reached when the buffer is completely emptied.
                 */
                pipesignal(rpb, PIPE_WANTW);

                /*
                 * Pick up our copy loop again if the writer sent data to
                 * us while we were messing around.
                 *
                 * On a SMP box poll up to pipe_delay nanoseconds for new
                 * data.  Typically a value of 2000 to 4000 is sufficient
                 * to eradicate most IPIs/tsleeps/wakeups when a pipe
                 * is used for synchronous communications with small packets,
                 * and 8000 or so (8uS) will pipeline large buffer xfers
                 * between cpus over a pipe.
                 *
                 * For synchronous communications a hit means doing a
                 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
                 * whereas a miss requiring a tsleep/wakeup sequence
                 * will take 7uS or more.
                 */
                if (rpb->windex != rpb->rindex)
                        continue;

#ifdef _RDTSC_SUPPORTED_
                if (pipe_delay) {
                        int64_t tsc_target;
                        int good = 0;

                        tsc_target = tsc_get_target(pipe_delay);
                        while (tsc_test_target(tsc_target) == 0) {
                                cpu_lfence();
                                if (rpb->windex != rpb->rindex) {
                                        good = 1;
                                        break;
                                }
                                cpu_pause();
                        }
                        if (good)
                                continue;
                }
#endif

                /*
                 * Detect EOF condition, do not set error.
                 */
                if (rpb->state & PIPE_REOF)
                        break;

                /*
                 * Break if some data was read, or if this was a non-blocking
                 * read.
                 */
                if (nread > 0)
                        break;

                if (nbio) {
                        error = EAGAIN;
                        break;
                }

                /*
                 * Last chance, interlock with WANTR
                 */
                tsleep_interlock(rpb, PCATCH);
                atomic_set_int(&rpb->state, PIPE_WANTR);

                /*
                 * Retest bytes available after memory barrier above.
                 */
                size = rpb->windex - rpb->rindex;
                if (size)
                        continue;

                /*
                 * Retest EOF after memory barrier above.
                 */
                if (rpb->state & PIPE_REOF)
                        break;

                /*
                 * Wait for more data or state change
                 */
                error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
                if (error)
                        break;
        }
        pipe_end_uio(&rpb->rip);

        /*
         * Update last access time.
         */
        if (error == 0 && nread && rpb->lticks != ticks) {
                vfs_timestamp(&rpb->atime);
                rpb->lticks = ticks;
        }

        /*
         * If we drained the FIFO more than half way then handle
         * write blocking hysteresis.
         *
         * Note that PIPE_WANTW cannot be set by the writer without
         * it holding both rlock and wlock, so we can test it
         * while holding just rlock.
         */
        if (notify_writer) {
                /*
                 * Synchronous blocking is done on the pipe involved
                 */
                pipesignal(rpb, PIPE_WANTW);

                /*
                 * But we may also have to deal with a kqueue which is
                 * stored on the same pipe as its descriptor, so an
                 * EVFILT_WRITE event waiting for our side to drain will
                 * be on the other side.
                 */
                pipewakeup(wpb, 0);
        }
        /*size = rpb->windex - rpb->rindex;*/
        lwkt_reltoken(&rpb->rlock);

        return (error);
}

static int
pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        size_t windex;
        size_t space;
        size_t wcount;
        size_t orig_resid;
        int bigwrite;
        int bigcount;
        int error;
        int nbio;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        /*
         * Calculate nbio
         */
        if (fflags & O_FBLOCKING)
                nbio = 0;
        else if (fflags & O_FNONBLOCKING)
                nbio = 1;
        else if (fp->f_flag & O_NONBLOCK)
                nbio = 1;
        else
                nbio = 0;

        /*
         * 'quick' NBIO test before things get expensive.
         */
        if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
            uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
                return EAGAIN;
        }

        /*
         * Writes go to the peer.  The peer will always exist.
         */
        lwkt_gettoken(&wpb->wlock);
        if (wpb->state & PIPE_WEOF) {
                lwkt_reltoken(&wpb->wlock);
                return (EPIPE);
        }

        /*
         * Degenerate case (EPIPE takes precedence)
         */
        if (uio->uio_resid == 0) {
                lwkt_reltoken(&wpb->wlock);
                return(0);
        }

        /*
         * Writes are serialized (start_uio must be called with wlock)
         */
        error = pipe_start_uio(&wpb->wip);
        if (error) {
                lwkt_reltoken(&wpb->wlock);
                return (error);
        }

        orig_resid = uio->uio_resid;
        wcount = 0;

        bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
        bigcount = 10;

        while (uio->uio_resid) {
                if (wpb->state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }

                /*
                 * Don't hog the cpu.
                 */
                if (bigwrite && --bigcount == 0) {
                        lwkt_user_yield();
                        bigcount = 10;
                        if (CURSIG(curthread->td_lwp)) {
                                error = EINTR;
                                break;
                        }
                }

                windex = wpb->windex & (wpb->size - 1);
                space = wpb->size - (wpb->windex - wpb->rindex);

                /*
                 * Writes of size <= PIPE_BUF must be atomic.
                 */
                if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
                        space = 0;

                /*
                 * Write to fill, read size handles write hysteresis.  Also
                 * additional restrictions can cause select-based non-blocking
                 * writes to spin.
                 */
                if (space > 0) {
                        size_t segsize;

                        /*
                         * We want to notify a potentially waiting reader
                         * before we exhaust the write buffer for SMP
                         * pipelining.  Otherwise the write/read will begin
                         * to ping-pong.
                         */
                        space = szmin(space, uio->uio_resid);
                        if (space > (wpb->size >> 1))
                                space = (wpb->size >> 1);

                        /*
                         * First segment to transfer is minimum of
                         * transfer size and contiguous space in
                         * pipe buffer.  If first segment to transfer
                         * is less than the transfer size, we've got
                         * a wraparound in the buffer.
                         */
                        segsize = wpb->size - windex;
                        if (segsize > space)
                                segsize = space;

                        /*
                         * If this is the first loop and the reader is
                         * blocked, do a preemptive wakeup of the reader.
                         *
                         * On SMP the IPI latency plus the wlock interlock
                         * on the reader side is the fastest way to get the
                         * reader going.  (The scheduler will hard loop on
                         * lock tokens).
                         */
                        if (wcount == 0)
                                pipesignal(wpb, PIPE_WANTR);

                        /*
                         * Transfer segment, which may include a wrap-around.
                         * Update windex to account for both all in one go
                         * so the reader can read() the data atomically.
                         */
                        error = uiomove(&wpb->buffer[windex], segsize, uio);
                        if (error == 0 && segsize < space) {
                                segsize = space - segsize;
                                error = uiomove(&wpb->buffer[0], segsize, uio);
                        }
                        if (error)
                                break;

                        /*
                         * Memory fence prior to windex updating (note: not
                         * needed so this is a NOP on Intel).
                         */
                        cpu_sfence();
                        wpb->windex += space;

                        /*
                         * Signal reader
                         */
                        if (wcount != 0)
                                pipesignal(wpb, PIPE_WANTR);
                        wcount += space;
                        continue;
                }

                /*
                 * Wakeup any pending reader
                 */
                pipesignal(wpb, PIPE_WANTR);

                /*
                 * don't block on non-blocking I/O
                 */
                if (nbio) {
                        error = EAGAIN;
                        break;
                }

#ifdef _RDTSC_SUPPORTED_
                if (pipe_delay) {
                        int64_t tsc_target;
                        int good = 0;

                        tsc_target = tsc_get_target(pipe_delay);
                        while (tsc_test_target(tsc_target) == 0) {
                                cpu_lfence();
                                space = wpb->size - (wpb->windex - wpb->rindex);
                                if ((space < uio->uio_resid) &&
                                    (orig_resid <= PIPE_BUF)) {
                                        space = 0;
                                }
                                if (space) {
                                        good = 1;
                                        break;
                                }
                                cpu_pause();
                        }
                        if (good)
                                continue;
                }
#endif

                /*
                 * Interlocked test.  Atomic op enforces the memory barrier.
                 */
                tsleep_interlock(wpb, PCATCH);
                atomic_set_int(&wpb->state, PIPE_WANTW);

                /*
                 * Retest space available after memory barrier above.
                 * Writes of size <= PIPE_BUF must be atomic.
                 */
                space = wpb->size - (wpb->windex - wpb->rindex);
                if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
                        space = 0;

                /*
                 * Retest EOF after memory barrier above.
                 */
                if (wpb->state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }

                /*
                 * We have no more space and have something to offer,
                 * wake up select/poll/kq.
                 */
                if (space == 0) {
                        pipewakeup(wpb, 1);
                        error = tsleep(wpb, PCATCH | PINTERLOCKED, "pipewr", 0);
                }

                /*
                 * Break out if we errored or the read side wants us to go
                 * away.
                 */
                if (error)
                        break;
                if (wpb->state & PIPE_WEOF) {
                        error = EPIPE;
                        break;
                }
        }
        pipe_end_uio(&wpb->wip);

        /*
         * If we have put any characters in the buffer, we wake up
         * the reader.
         *
         * Both rlock and wlock are required to be able to modify pipe_state.
         */
        if (wpb->windex != wpb->rindex) {
                pipesignal(wpb, PIPE_WANTR);
                pipewakeup(wpb, 1);
        }

        /*
         * Don't return EPIPE if I/O was successful
         */
        if ((wpb->rindex == wpb->windex) &&
            (uio->uio_resid == 0) &&
            (error == EPIPE)) {
                error = 0;
        }

        if (error == 0 && wpb->lticks != ticks) {
                vfs_timestamp(&wpb->mtime);
                wpb->lticks = ticks;
        }

        /*
         * We have something to offer,
         * wake up select/poll/kq.
         */
        /*space = wpb->windex - wpb->rindex;*/
        lwkt_reltoken(&wpb->wlock);

        return (error);
}

/*
 * We implement a very minimal set of ioctls for compatibility with sockets.
 */
static int
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
           struct ucred *cred, struct sysmsg *msg)
{
        struct pipebuf *rpb;
        struct pipe *pipe;
        int error;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
        } else {
                rpb = &pipe->bufferA;
        }

        lwkt_gettoken(&rpb->rlock);
        lwkt_gettoken(&rpb->wlock);

        switch (cmd) {
        case FIOASYNC:
                if (*(int *)data) {
                        atomic_set_int(&rpb->state, PIPE_ASYNC);
                } else {
                        atomic_clear_int(&rpb->state, PIPE_ASYNC);
                }
                error = 0;
                break;
        case FIONREAD:
                *(int *)data = (int)(rpb->windex - rpb->rindex);
                error = 0;
                break;
        case FIOSETOWN:
                error = fsetown(*(int *)data, &rpb->sigio);
                break;
        case FIOGETOWN:
                *(int *)data = fgetown(&rpb->sigio);
                error = 0;
                break;
        case TIOCSPGRP:
                /* This is deprecated, FIOSETOWN should be used instead. */
                error = fsetown(-(*(int *)data), &rpb->sigio);
                break;

        case TIOCGPGRP:
                /* This is deprecated, FIOGETOWN should be used instead. */
                *(int *)data = -fgetown(&rpb->sigio);
                error = 0;
                break;
        default:
                error = ENOTTY;
                break;
        }
        lwkt_reltoken(&rpb->wlock);
        lwkt_reltoken(&rpb->rlock);

        return (error);
}

/*
 * MPSAFE
 */
static int
pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
{
        struct pipebuf *rpb;
        struct pipe *pipe;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
        } else {
                rpb = &pipe->bufferA;
        }

        bzero((caddr_t)ub, sizeof(*ub));
        ub->st_mode = S_IFIFO;
        ub->st_blksize = rpb->size;
        ub->st_size = rpb->windex - rpb->rindex;
        ub->st_blocks = howmany(ub->st_size, ub->st_blksize);
        ub->st_atimespec = rpb->atime;
        ub->st_mtimespec = rpb->mtime;
        ub->st_ctimespec = pipe->ctime;
        ub->st_uid = fp->f_cred->cr_uid;
        ub->st_gid = fp->f_cred->cr_gid;
        ub->st_ino = pipe->inum;
        /*
         * Left as 0: st_dev, st_nlink, st_rdev,
         * st_flags, st_gen.
         * XXX (st_dev, st_ino) should be unique.
         */

        return (0);
}

static int
pipe_close(struct file *fp)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        fp->f_ops = &badfileops;
        fp->f_data = NULL;
        funsetown(&rpb->sigio);
        pipeclose(pipe, rpb, wpb);

        return (0);
}

/*
 * Shutdown one or both directions of a full-duplex pipe.
 */
static int
pipe_shutdown(struct file *fp, int how)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        int error = EPIPE;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        /*
         * We modify pipe_state on both pipes, which means we need
         * all four tokens!
         */
        lwkt_gettoken(&rpb->rlock);
        lwkt_gettoken(&rpb->wlock);
        lwkt_gettoken(&wpb->rlock);
        lwkt_gettoken(&wpb->wlock);

        switch(how) {
        case SHUT_RDWR:
        case SHUT_RD:
                /*
                 * EOF on my reads and peer writes
                 */
                atomic_set_int(&rpb->state, PIPE_REOF | PIPE_WEOF);
                if (rpb->state & PIPE_WANTR) {
                        rpb->state &= ~PIPE_WANTR;
                        wakeup(rpb);
                }
                if (rpb->state & PIPE_WANTW) {
                        rpb->state &= ~PIPE_WANTW;
                        wakeup(rpb);
                }
                error = 0;
                if (how == SHUT_RD)
                        break;
                /* fall through */
        case SHUT_WR:
                /*
                 * EOF on peer reads and my writes
                 */
                atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
                if (wpb->state & PIPE_WANTR) {
                        wpb->state &= ~PIPE_WANTR;
                        wakeup(wpb);
                }
                if (wpb->state & PIPE_WANTW) {
                        wpb->state &= ~PIPE_WANTW;
                        wakeup(wpb);
                }
                error = 0;
                break;
        }
        pipewakeup(rpb, 1);
        pipewakeup(wpb, 1);

        lwkt_reltoken(&wpb->wlock);
        lwkt_reltoken(&wpb->rlock);
        lwkt_reltoken(&rpb->wlock);
        lwkt_reltoken(&rpb->rlock);

        return (error);
}

/*
 * Destroy the pipe buffer.
 */
static void
pipe_free_kmem(struct pipebuf *pb)
{
        if (pb->buffer != NULL) {
                kmem_free(kernel_map, (vm_offset_t)pb->buffer, pb->size);
                pb->buffer = NULL;
                pb->object = NULL;
        }
}

/*
 * Close one half of the pipe.  We are closing the pipe for reading on rpb
 * and writing on wpb.  This routine must be called twice with the pipebufs
 * reversed to close both directions.
 */
static void
pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
{
        globaldata_t gd;

        if (pipe == NULL)
                return;

        /*
         * We need both the read and write tokens to modify pipe_state.
         */
        lwkt_gettoken(&rpb->rlock);
        lwkt_gettoken(&rpb->wlock);

        /*
         * Set our state, wakeup anyone waiting in select/poll/kq, and
         * wakeup anyone blocked on our pipe.  No action if our side
         * is already closed.
         */
        if (rpb->state & PIPE_CLOSED) {
                lwkt_reltoken(&rpb->wlock);
                lwkt_reltoken(&rpb->rlock);
                return;
        }

        atomic_set_int(&rpb->state, PIPE_CLOSED | PIPE_REOF | PIPE_WEOF);
        pipewakeup(rpb, 1);
        if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
                rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
                wakeup(rpb);
        }
        lwkt_reltoken(&rpb->wlock);
        lwkt_reltoken(&rpb->rlock);

        /*
         * Disconnect from peer.
         */
        lwkt_gettoken(&wpb->rlock);
        lwkt_gettoken(&wpb->wlock);

        atomic_set_int(&wpb->state, PIPE_REOF | PIPE_WEOF);
        pipewakeup(wpb, 1);
        if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
                wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
                wakeup(wpb);
        }
        if (SLIST_FIRST(&wpb->kq.ki_note))
                KNOTE(&wpb->kq.ki_note, 0);
        lwkt_reltoken(&wpb->wlock);
        lwkt_reltoken(&wpb->rlock);

        /*
         * Free resources once both sides are closed.  We maintain a pcpu
         * cache to improve performance, so the actual tear-down case is
         * limited to bulk situations.
         *
         * However, the bulk tear-down case can cause intense contention
         * on the kernel_map when, e.g. hundreds to hundreds of thousands
         * of processes are killed at the same time.  To deal with this we
         * use a pcpu mutex to maintain concurrency but also limit the
         * number of threads banging on the map and pmap.
         *
         * We use the mtx mechanism instead of the lockmgr mechanism because
         * the mtx mechanism utilizes a queued design which will not break
         * down in the face of thousands to hundreds of thousands of
         * processes trying to free pipes simultaneously.  The lockmgr
         * mechanism will wind up waking them all up each time a lock
         * cycles.
         */
        if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
                gd = mycpu;
                if (gd->gd_pipeqcount >= pipe_maxcache) {
                        mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
                        pipe_free_kmem(rpb);
                        pipe_free_kmem(wpb);
                        mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
                        kfree(pipe, M_PIPE);
                } else {
                        rpb->state = 0;
                        wpb->state = 0;
                        pipe->next = gd->gd_pipeq;
                        gd->gd_pipeq = pipe;
                        ++gd->gd_pipeqcount;
                }
        }
}

static int
pipe_kqfilter(struct file *fp, struct knote *kn)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;

        pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
        if ((intptr_t)fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        switch (kn->kn_filter) {
        case EVFILT_READ:
                kn->kn_fop = &pipe_rfiltops;
                break;
        case EVFILT_WRITE:
                kn->kn_fop = &pipe_wfiltops;
                break;
        default:
                return (EOPNOTSUPP);
        }

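        /*
         * Mirror the f_data convention in kn_hook (bit 0 selects the
         * pipebuf) so filt_pipedetach() can find the side the knote
         * was attached to.
         */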
        if (rpb == &pipe->bufferA)
                kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
        else
                kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);

        knote_insert(&rpb->kq.ki_note, kn);

        return (0);
}

static void
filt_pipedetach(struct knote *kn)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;

        pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
        if ((intptr_t)kn->kn_hook & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }
        knote_remove(&rpb->kq.ki_note, kn);
}

/*ARGSUSED*/
static int
filt_piperead(struct knote *kn, long hint)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        int ready = 0;

        pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
        if ((intptr_t)kn->kn_fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        /*
         * We shouldn't need the pipe locks because the knote itself is
         * locked via KN_PROCESSING.  If we lose a race against the writer,
         * the writer will just issue a KNOTE() after us.
         */
#if 0
        lwkt_gettoken(&rpb->rlock);
        lwkt_gettoken(&rpb->wlock);
#endif

        kn->kn_data = rpb->windex - rpb->rindex;
        if (kn->kn_data < 0)
                kn->kn_data = 0;

        if (rpb->state & PIPE_REOF) {
                /*
                 * Only set NODATA if all data has been exhausted
                 */
                if (kn->kn_data == 0)
                        kn->kn_flags |= EV_NODATA;
                kn->kn_flags |= EV_EOF;

                /*
                 * Only set HUP if the pipe is completely closed.
                 * half-closed does not count (to make the behavior
                 * the same as linux).
                 */
                if (wpb->state & PIPE_CLOSED) {
                        kn->kn_flags |= EV_HUP;
                        ready = 1;
                }
        }

#if 0
        lwkt_reltoken(&rpb->wlock);
        lwkt_reltoken(&rpb->rlock);
#endif

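        /*
         * With NOTE_HUPONLY the caller only wants the hangup event
         * (EV_HUP above); plain data availability does not make the
         * knote ready.
         */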
        if (!ready && (kn->kn_sfflags & NOTE_HUPONLY) == 0)
                ready = kn->kn_data > 0;

        return (ready);
}

/*ARGSUSED*/
static int
filt_pipewrite(struct knote *kn, long hint)
{
        struct pipebuf *rpb;
        struct pipebuf *wpb;
        struct pipe *pipe;
        int ready = 0;

        pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
        if ((intptr_t)kn->kn_fp->f_data & 1) {
                rpb = &pipe->bufferB;
                wpb = &pipe->bufferA;
        } else {
                rpb = &pipe->bufferA;
                wpb = &pipe->bufferB;
        }

        kn->kn_data = 0;
        if (wpb->state & PIPE_CLOSED) {
                kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
                return (1);
        }

        /*
         * We shouldn't need the pipe locks because the knote itself is
         * locked via KN_PROCESSING.  If we lose a race against the reader,
         * the writer will just issue a KNOTE() after us.
         */
#if 0
        lwkt_gettoken(&wpb->rlock);
        lwkt_gettoken(&wpb->wlock);
#endif

        if (wpb->state & PIPE_WEOF) {
                kn->kn_flags |= EV_EOF | EV_HUP | EV_NODATA;
                ready = 1;
        }

        if (!ready) {
                kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
                if (kn->kn_data < 0)
                        kn->kn_data = 0;
        }

#if 0
        lwkt_reltoken(&wpb->wlock);
        lwkt_reltoken(&wpb->rlock);
#endif

        if (!ready)
                ready = kn->kn_data >= PIPE_BUF;

        return (ready);
}