kernel - Refactor sys_pipe
[dragonfly.git] / sys / kern / sys_pipe.c
CommitLineData
984263bc
MD
1/*
2 * Copyright (c) 1996 John S. Dyson
3 * All rights reserved.
1dfdffca
MD
4 * Copyright (c) 2003-2017 The DragonFly Project. All rights reserved.
5 *
6 * This code is derived from software contributed to The DragonFly Project
7 * by Matthew Dillon <dillon@backplane.com>
984263bc
MD
8 *
9 * Redistribution and use in source and binary forms, with or without
10 * modification, are permitted provided that the following conditions
11 * are met:
12 * 1. Redistributions of source code must retain the above copyright
13 * notice immediately at the beginning of the file, without modification,
14 * this list of conditions, and the following disclaimer.
15 * 2. Redistributions in binary form must reproduce the above copyright
16 * notice, this list of conditions and the following disclaimer in the
17 * documentation and/or other materials provided with the distribution.
18 * 3. Absolutely no warranty of function or purpose is made by the author
19 * John S. Dyson.
20 * 4. Modifications may be freely made to this file if the above conditions
21 * are met.
984263bc
MD
22 */
23
24/*
25 * This file contains a high-performance replacement for the socket-based
26 * pipes scheme originally used in FreeBSD/4.4Lite. It does not support
27 * all features of sockets, but does do everything that pipes normally
28 * do.
29 */
984263bc
MD
30#include <sys/param.h>
31#include <sys/systm.h>
fc7d5181 32#include <sys/kernel.h>
984263bc
MD
33#include <sys/proc.h>
34#include <sys/fcntl.h>
35#include <sys/file.h>
36#include <sys/filedesc.h>
37#include <sys/filio.h>
38#include <sys/ttycom.h>
39#include <sys/stat.h>
984263bc
MD
40#include <sys/signalvar.h>
41#include <sys/sysproto.h>
42#include <sys/pipe.h>
43#include <sys/vnode.h>
44#include <sys/uio.h>
45#include <sys/event.h>
fc7d5181
MD
46#include <sys/globaldata.h>
47#include <sys/module.h>
48#include <sys/malloc.h>
49#include <sys/sysctl.h>
004d2de5 50#include <sys/socket.h>
ca1161c6 51#include <sys/kern_syscall.h>
5eab490e
MD
52#include <sys/lock.h>
53#include <sys/mutex.h>
984263bc
MD
54
55#include <vm/vm.h>
56#include <vm/vm_param.h>
984263bc
MD
57#include <vm/vm_object.h>
58#include <vm/vm_kern.h>
59#include <vm/vm_extern.h>
60#include <vm/pmap.h>
61#include <vm/vm_map.h>
62#include <vm/vm_page.h>
63#include <vm/vm_zone.h>
64
dadab5e9 65#include <sys/file2.h>
607b0ed9 66#include <sys/signal2.h>
5eab490e 67#include <sys/mutex2.h>
dadab5e9 68
8100156a 69#include <machine/cpufunc.h>
984263bc 70
1dfdffca
MD
71struct pipegdlock {
72 struct mtx mtx;
73} __cachealign;
74
984263bc
MD
75/*
76 * interfaces to the outside world
77 */
402ed7e1 78static int pipe_read (struct file *fp, struct uio *uio,
87de5057 79 struct ucred *cred, int flags);
402ed7e1 80static int pipe_write (struct file *fp, struct uio *uio,
87de5057
MD
81 struct ucred *cred, int flags);
82static int pipe_close (struct file *fp);
83static int pipe_shutdown (struct file *fp, int how);
402ed7e1 84static int pipe_kqfilter (struct file *fp, struct knote *kn);
87de5057 85static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
87baaf0c
MD
86static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
87 struct ucred *cred, struct sysmsg *msg);
984263bc
MD
88
89static struct fileops pipeops = {
b2d248cb
MD
90 .fo_read = pipe_read,
91 .fo_write = pipe_write,
92 .fo_ioctl = pipe_ioctl,
b2d248cb
MD
93 .fo_kqfilter = pipe_kqfilter,
94 .fo_stat = pipe_stat,
95 .fo_close = pipe_close,
96 .fo_shutdown = pipe_shutdown
984263bc
MD
97};
98
99static void filt_pipedetach(struct knote *kn);
100static int filt_piperead(struct knote *kn, long hint);
101static int filt_pipewrite(struct knote *kn, long hint);
102
103static struct filterops pipe_rfiltops =
a081e067 104 { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
984263bc 105static struct filterops pipe_wfiltops =
a081e067 106 { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
984263bc 107
fc7d5181 108MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
984263bc 109
fc7d5181
MD
110#define PIPEQ_MAX_CACHE 16 /* per-cpu pipe structure cache */
111
fc7d5181 112static int pipe_maxcache = PIPEQ_MAX_CACHE;
1dfdffca 113static struct pipegdlock *pipe_gdlocks;
fc7d5181
MD
114
115SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
fc7d5181
MD
116SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
117 CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
1dfdffca
MD
118static int pipe_size = 32768;
119SYSCTL_INT(_kern_pipe, OID_AUTO, size,
120 CTLFLAG_RW, &pipe_size, 0, "Pipe buffer size (16384 minimum)");
880ffa3a
MD
121static int pipe_delay = 5000; /* 5uS default */
122SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
123 CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
984263bc 124
b12defdc
MD
125/*
126 * Auto-size pipe cache to reduce kmem allocations and frees.
127 */
128static
129void
130pipeinit(void *dummy)
131{
132 size_t mbytes = kmem_lim_size();
5eab490e 133 int n;
b12defdc 134
b12defdc
MD
135 if (pipe_maxcache == PIPEQ_MAX_CACHE) {
136 if (mbytes >= 7 * 1024)
137 pipe_maxcache *= 2;
138 if (mbytes >= 15 * 1024)
139 pipe_maxcache *= 2;
140 }
5eab490e
MD
141 pipe_gdlocks = kmalloc(sizeof(*pipe_gdlocks) * ncpus,
142 M_PIPE, M_WAITOK | M_ZERO);
143 for (n = 0; n < ncpus; ++n)
1dfdffca 144 mtx_init(&pipe_gdlocks[n].mtx, "pipekm");
b12defdc 145}
f3f3eadb 146SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL);
b12defdc 147
static void pipeclose (struct pipe *pipe,
		struct pipebuf *pbr, struct pipebuf *pbw);
static void pipe_free_kmem (struct pipebuf *buf);
static int pipe_create (struct pipe **pipep);
1ae37239 153static __inline void
1dfdffca 154pipewakeup(struct pipebuf *pb, int dosigio)
1ae37239 155{
1dfdffca 156 if (dosigio && (pb->state & PIPE_ASYNC) && pb->sigio) {
a8d3ab53 157 lwkt_gettoken(&sigio_token);
1dfdffca 158 pgsigio(pb->sigio, SIGIO, 0);
a8d3ab53 159 lwkt_reltoken(&sigio_token);
1ae37239 160 }
1dfdffca 161 KNOTE(&pb->kq.ki_note, 0);
1ae37239
MD
162}
163
164/*
165 * These routines are called before and after a UIO. The UIO
166 * may block, causing our held tokens to be lost temporarily.
167 *
168 * We use these routines to serialize reads against other reads
169 * and writes against other writes.
170 *
171 * The read token is held on entry so *ipp does not race.
172 */
173static __inline int
1dfdffca 174pipe_start_uio(int *ipp)
1ae37239
MD
175{
176 int error;
177
178 while (*ipp) {
179 *ipp = -1;
180 error = tsleep(ipp, PCATCH, "pipexx", 0);
181 if (error)
182 return (error);
183 }
184 *ipp = 1;
185 return (0);
186}
187
/*
 * Release the uio-in-progress flag set by pipe_start_uio().  A negative
 * value indicates another thread is sleeping in pipe_start_uio() and
 * must be woken.
 */
static __inline void
pipe_end_uio(int *ipp)
{
	if (*ipp < 0) {
		*ipp = 0;
		wakeup(ipp);
	} else {
		KKASSERT(*ipp > 0);
		*ipp = 0;
	}
}
984263bc
MD
200/*
201 * The pipe system call for the DTYPE_PIPE type of pipes
41c20dac 202 *
3919ced0
MD
203 * pipe_args(int dummy)
204 *
205 * MPSAFE
984263bc 206 */
984263bc 207int
753fd850 208sys_pipe(struct pipe_args *uap)
ca1161c6
MD
209{
210 return kern_pipe(uap->sysmsg_fds, 0);
211}
212
213int
214sys_pipe2(struct pipe2_args *uap)
215{
216 return kern_pipe(uap->sysmsg_fds, uap->flags);
217}
218
219int
220kern_pipe(long *fds, int flags)
984263bc 221{
dadab5e9 222 struct thread *td = curthread;
f3a2d8c4 223 struct filedesc *fdp = td->td_proc->p_fd;
984263bc 224 struct file *rf, *wf;
1dfdffca 225 struct pipe *pipe;
90b9818c 226 int fd1, fd2, error;
984263bc 227
1dfdffca
MD
228 pipe = NULL;
229 if (pipe_create(&pipe)) {
230 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
231 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
984263bc
MD
232 return (ENFILE);
233 }
234
f3a2d8c4 235 error = falloc(td->td_lwp, &rf, &fd1);
984263bc 236 if (error) {
1dfdffca
MD
237 pipeclose(pipe, &pipe->bufferA, &pipe->bufferB);
238 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
984263bc
MD
239 return (error);
240 }
ca1161c6 241 fds[0] = fd1;
984263bc
MD
242
243 /*
244 * Warning: once we've gotten past allocation of the fd for the
245 * read-side, we can only drop the read side via fdrop() in order
246 * to avoid races against processes which manage to dup() the read
247 * side while we are blocked trying to allocate the write side.
248 */
984263bc 249 rf->f_type = DTYPE_PIPE;
fbb4eeab 250 rf->f_flag = FREAD | FWRITE;
984263bc 251 rf->f_ops = &pipeops;
1dfdffca 252 rf->f_data = (void *)((intptr_t)pipe | 0);
ca1161c6
MD
253 if (flags & O_NONBLOCK)
254 rf->f_flag |= O_NONBLOCK;
255 if (flags & O_CLOEXEC)
256 fdp->fd_files[fd1].fileflags |= UF_EXCLOSE;
257
f3a2d8c4 258 error = falloc(td->td_lwp, &wf, &fd2);
984263bc 259 if (error) {
f3a2d8c4 260 fsetfd(fdp, NULL, fd1);
9f87144f 261 fdrop(rf);
1dfdffca
MD
262 /* pipeA has been closed by fdrop() */
263 /* close pipeB here */
264 pipeclose(pipe, &pipe->bufferB, &pipe->bufferA);
984263bc
MD
265 return (error);
266 }
984263bc 267 wf->f_type = DTYPE_PIPE;
fbb4eeab 268 wf->f_flag = FREAD | FWRITE;
984263bc 269 wf->f_ops = &pipeops;
1dfdffca 270 wf->f_data = (void *)((intptr_t)pipe | 1);
ca1161c6
MD
271 if (flags & O_NONBLOCK)
272 wf->f_flag |= O_NONBLOCK;
273 if (flags & O_CLOEXEC)
274 fdp->fd_files[fd2].fileflags |= UF_EXCLOSE;
275
276 fds[1] = fd2;
984263bc 277
1ae37239
MD
278 /*
279 * Once activated the peer relationship remains valid until
280 * both sides are closed.
281 */
f3a2d8c4
MD
282 fsetfd(fdp, rf, fd1);
283 fsetfd(fdp, wf, fd2);
9f87144f
MD
284 fdrop(rf);
285 fdrop(wf);
984263bc
MD
286
287 return (0);
288}
289
290/*
1dfdffca
MD
291 * [re]allocates KVA for the pipe's circular buffer. The space is
292 * pageable. Called twice to setup full-duplex communications.
293 *
294 * NOTE: Independent vm_object's are used to improve performance.
295 *
296 * Returns 0 on success, ENOMEM on failure.
984263bc
MD
297 */
298static int
1dfdffca 299pipespace(struct pipe *pipe, struct pipebuf *pb, size_t size)
984263bc
MD
300{
301 struct vm_object *object;
302 caddr_t buffer;
1dfdffca
MD
303 vm_pindex_t npages;
304 int error;
305
306 size = (size + PAGE_MASK) & ~(size_t)PAGE_MASK;
307 if (size < 16384)
308 size = 16384;
309 if (size > 1024*1024)
310 size = 1024*1024;
984263bc 311
fc7d5181 312 npages = round_page(size) / PAGE_SIZE;
1dfdffca 313 object = pb->object;
984263bc
MD
314
315 /*
fc7d5181
MD
316 * [re]create the object if necessary and reserve space for it
317 * in the kernel_map. The object and memory are pageable. On
318 * success, free the old resources before assigning the new
319 * ones.
984263bc 320 */
fc7d5181
MD
321 if (object == NULL || object->size != npages) {
322 object = vm_object_allocate(OBJT_DEFAULT, npages);
e4846942 323 buffer = (caddr_t)vm_map_min(&kernel_map);
984263bc 324
0adbcbd6
MD
325 error = vm_map_find(&kernel_map, object, NULL,
326 0, (vm_offset_t *)&buffer, size,
3091de50
MD
327 PAGE_SIZE, TRUE,
328 VM_MAPTYPE_NORMAL, VM_SUBSYS_PIPE,
0adbcbd6 329 VM_PROT_ALL, VM_PROT_ALL, 0);
984263bc 330
fc7d5181
MD
331 if (error != KERN_SUCCESS) {
332 vm_object_deallocate(object);
333 return (ENOMEM);
334 }
1dfdffca
MD
335 pipe_free_kmem(pb);
336 pb->object = object;
337 pb->buffer = buffer;
338 pb->size = size;
fc7d5181 339 }
1dfdffca
MD
340 pb->rindex = 0;
341 pb->windex = 0;
342
984263bc
MD
343 return (0);
344}
345
346/*
fc7d5181 347 * Initialize and allocate VM and memory for pipe, pulling the pipe from
1dfdffca
MD
348 * our per-cpu cache if possible.
349 *
350 * Returns 0 on success, else an error code (typically ENOMEM). Caller
351 * must still deallocate the pipe on failure.
984263bc
MD
352 */
353static int
1dfdffca 354pipe_create(struct pipe **pipep)
984263bc 355{
fc7d5181 356 globaldata_t gd = mycpu;
1dfdffca 357 struct pipe *pipe;
984263bc
MD
358 int error;
359
1dfdffca
MD
360 if ((pipe = gd->gd_pipeq) != NULL) {
361 gd->gd_pipeq = pipe->next;
fc7d5181 362 --gd->gd_pipeqcount;
1dfdffca 363 pipe->next = NULL;
fc7d5181 364 } else {
1dfdffca
MD
365 pipe = kmalloc(sizeof(*pipe), M_PIPE, M_WAITOK | M_ZERO);
366 lwkt_token_init(&pipe->bufferA.rlock, "piper");
367 lwkt_token_init(&pipe->bufferA.wlock, "pipew");
368 lwkt_token_init(&pipe->bufferB.rlock, "piper");
369 lwkt_token_init(&pipe->bufferB.wlock, "pipew");
fc7d5181 370 }
1dfdffca
MD
371 *pipep = pipe;
372 if ((error = pipespace(pipe, &pipe->bufferA, pipe_size)) != 0) {
984263bc 373 return (error);
1dfdffca
MD
374 }
375 if ((error = pipespace(pipe, &pipe->bufferB, pipe_size)) != 0) {
376 return (error);
377 }
378 vfs_timestamp(&pipe->ctime);
379 pipe->bufferA.atime = pipe->ctime;
380 pipe->bufferA.mtime = pipe->ctime;
381 pipe->bufferB.atime = pipe->ctime;
382 pipe->bufferB.mtime = pipe->ctime;
383 pipe->open_count = 2;
384
984263bc
MD
385 return (0);
386}
387
1dfdffca
MD
388/*
389 * Read data from a pipe
390 */
984263bc 391static int
9ba76b73 392pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
984263bc 393{
1dfdffca
MD
394 struct pipebuf *rpb;
395 struct pipebuf *wpb;
396 struct pipe *pipe;
607b0ed9 397 size_t nread = 0;
1dfdffca
MD
398 size_t size; /* total bytes available */
399 size_t nsize; /* total bytes to read */
400 size_t rindex; /* contiguous bytes available */
b20720b5 401 int notify_writer;
607b0ed9
MD
402 int bigread;
403 int bigcount;
1dfdffca
MD
404 int error;
405 int nbio;
984263bc 406
1dfdffca
MD
407 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
408 if ((intptr_t)fp->f_data & 1) {
409 rpb = &pipe->bufferB;
410 wpb = &pipe->bufferA;
411 } else {
412 rpb = &pipe->bufferA;
413 wpb = &pipe->bufferB;
414 }
a3ef5f2e
MD
415 atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
416
607b0ed9 417 if (uio->uio_resid == 0)
1ae37239
MD
418 return(0);
419
420 /*
1dfdffca 421 * Calculate nbio
1ae37239 422 */
9ba76b73
MD
423 if (fflags & O_FBLOCKING)
424 nbio = 0;
425 else if (fflags & O_FNONBLOCKING)
426 nbio = 1;
427 else if (fp->f_flag & O_NONBLOCK)
428 nbio = 1;
429 else
430 nbio = 0;
431
1ae37239 432 /*
1dfdffca
MD
433 * 'quick' NBIO test before things get expensive.
434 */
435 if (nbio && rpb->rindex == rpb->windex)
436 return EAGAIN;
437
438 /*
439 * Reads are serialized. Note however that buffer.buffer and
440 * buffer.size can change out from under us when the number
1ae37239
MD
441 * of bytes in the buffer are zero due to the write-side doing a
442 * pipespace().
443 */
1dfdffca
MD
444 lwkt_gettoken(&rpb->rlock);
445 error = pipe_start_uio(&rpb->rip);
1ae37239 446 if (error) {
1dfdffca 447 lwkt_reltoken(&rpb->rlock);
1ae37239
MD
448 return (error);
449 }
b20720b5 450 notify_writer = 0;
607b0ed9
MD
451
452 bigread = (uio->uio_resid > 10 * 1024 * 1024);
453 bigcount = 10;
454
984263bc 455 while (uio->uio_resid) {
607b0ed9
MD
456 /*
457 * Don't hog the cpu.
458 */
459 if (bigread && --bigcount == 0) {
460 lwkt_user_yield();
461 bigcount = 10;
462 if (CURSIG(curthread->td_lwp)) {
463 error = EINTR;
464 break;
465 }
466 }
467
1dfdffca 468 size = rpb->windex - rpb->rindex;
1ae37239 469 cpu_lfence();
c600838f 470 if (size) {
1dfdffca 471 rindex = rpb->rindex & (rpb->size - 1);
1ae37239 472 nsize = size;
1dfdffca
MD
473 if (nsize > rpb->size - rindex)
474 nsize = rpb->size - rindex;
607b0ed9 475 nsize = szmin(nsize, uio->uio_resid);
c600838f 476
1dfdffca 477 error = uiomove(&rpb->buffer[rindex], nsize, uio);
984263bc
MD
478 if (error)
479 break;
1ae37239 480 cpu_mfence();
1dfdffca 481 rpb->rindex += nsize;
1ae37239 482 nread += nsize;
984263bc
MD
483
484 /*
880ffa3a
MD
485 * If the FIFO is still over half full just continue
486 * and do not try to notify the writer yet.
984263bc 487 */
1dfdffca 488 if (size - nsize >= (rpb->size >> 1)) {
b20720b5 489 notify_writer = 0;
1ae37239 490 continue;
984263bc 491 }
b20720b5
MD
492
493 /*
880ffa3a
MD
494 * When the FIFO is less then half full notify any
495 * waiting writer. WANTW can be checked while
496 * holding just the rlock.
b20720b5
MD
497 */
498 notify_writer = 1;
1dfdffca 499 if ((rpb->state & PIPE_WANTW) == 0)
b20720b5 500 continue;
1ae37239 501 }
984263bc 502
1ae37239
MD
503 /*
504 * If the "write-side" was blocked we wake it up. This code
505 * is reached either when the buffer is completely emptied
506 * or if it becomes more then half-empty.
507 *
508 * Pipe_state can only be modified if both the rlock and
509 * wlock are held.
510 */
1dfdffca
MD
511 if (rpb->state & PIPE_WANTW) {
512 lwkt_gettoken(&rpb->wlock);
513 if (rpb->state & PIPE_WANTW) {
514 rpb->state &= ~PIPE_WANTW;
515 lwkt_reltoken(&rpb->wlock);
516 wakeup(rpb);
1ae37239 517 } else {
1dfdffca 518 lwkt_reltoken(&rpb->wlock);
984263bc 519 }
1ae37239 520 }
984263bc 521
1ae37239
MD
522 /*
523 * Pick up our copy loop again if the writer sent data to
880ffa3a
MD
524 * us while we were messing around.
525 *
526 * On a SMP box poll up to pipe_delay nanoseconds for new
527 * data. Typically a value of 2000 to 4000 is sufficient
528 * to eradicate most IPIs/tsleeps/wakeups when a pipe
529 * is used for synchronous communications with small packets,
530 * and 8000 or so (8uS) will pipeline large buffer xfers
531 * between cpus over a pipe.
532 *
533 * For synchronous communications a hit means doing a
534 * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS,
535 * where as miss requiring a tsleep/wakeup sequence
536 * will take 7uS or more.
1ae37239 537 */
1dfdffca 538 if (rpb->windex != rpb->rindex)
1ae37239 539 continue;
984263bc 540
1918fc5c 541#ifdef _RDTSC_SUPPORTED_
880ffa3a
MD
542 if (pipe_delay) {
543 int64_t tsc_target;
544 int good = 0;
545
546 tsc_target = tsc_get_target(pipe_delay);
547 while (tsc_test_target(tsc_target) == 0) {
1dfdffca 548 if (rpb->windex != rpb->rindex) {
880ffa3a
MD
549 good = 1;
550 break;
551 }
552 }
553 if (good)
554 continue;
555 }
556#endif
557
1ae37239
MD
558 /*
559 * Detect EOF condition, do not set error.
560 */
1dfdffca 561 if (rpb->state & PIPE_REOF)
1ae37239 562 break;
984263bc 563
1ae37239
MD
564 /*
565 * Break if some data was read, or if this was a non-blocking
566 * read.
567 */
568 if (nread > 0)
569 break;
570
571 if (nbio) {
572 error = EAGAIN;
573 break;
574 }
575
576 /*
577 * Last chance, interlock with WANTR.
578 */
1dfdffca
MD
579 lwkt_gettoken(&rpb->wlock);
580 size = rpb->windex - rpb->rindex;
1ae37239 581 if (size) {
1dfdffca 582 lwkt_reltoken(&rpb->wlock);
1ae37239 583 continue;
984263bc 584 }
1ae37239 585
1bfdcce2
MD
586 /*
587 * Retest EOF - acquiring a new token can temporarily release
588 * tokens already held.
589 */
1dfdffca
MD
590 if (rpb->state & PIPE_REOF) {
591 lwkt_reltoken(&rpb->wlock);
1bfdcce2 592 break;
9aff1d71 593 }
1bfdcce2 594
1ae37239
MD
595 /*
596 * If there is no more to read in the pipe, reset its
597 * pointers to the beginning. This improves cache hit
598 * stats.
599 *
600 * We need both locks to modify both pointers, and there
601 * must also not be a write in progress or the uiomove()
602 * in the write might block and temporarily release
603 * its wlock, then reacquire and update windex. We are
604 * only serialized against reads, not writes.
605 *
606 * XXX should we even bother resetting the indices? It
607 * might actually be more cache efficient not to.
608 */
1dfdffca
MD
609 if (rpb->rindex == rpb->windex && rpb->wip == 0) {
610 rpb->rindex = 0;
611 rpb->windex = 0;
1ae37239
MD
612 }
613
614 /*
615 * Wait for more data.
616 *
617 * Pipe_state can only be set if both the rlock and wlock
618 * are held.
619 */
1dfdffca
MD
620 rpb->state |= PIPE_WANTR;
621 tsleep_interlock(rpb, PCATCH);
622 lwkt_reltoken(&rpb->wlock);
623 error = tsleep(rpb, PCATCH | PINTERLOCKED, "piperd", 0);
1ae37239
MD
624 if (error)
625 break;
984263bc 626 }
1dfdffca 627 pipe_end_uio(&rpb->rip);
984263bc 628
1ae37239
MD
629 /*
630 * Uptime last access time
631 */
632 if (error == 0 && nread)
1dfdffca 633 vfs_timestamp(&rpb->atime);
984263bc
MD
634
635 /*
b20720b5
MD
636 * If we drained the FIFO more then half way then handle
637 * write blocking hysteresis.
1ae37239 638 *
b20720b5
MD
639 * Note that PIPE_WANTW cannot be set by the writer without
640 * it holding both rlock and wlock, so we can test it
641 * while holding just rlock.
984263bc 642 */
b20720b5 643 if (notify_writer) {
8315ba5b
MD
644 /*
645 * Synchronous blocking is done on the pipe involved
646 */
1dfdffca
MD
647 if (rpb->state & PIPE_WANTW) {
648 lwkt_gettoken(&rpb->wlock);
649 if (rpb->state & PIPE_WANTW) {
650 rpb->state &= ~PIPE_WANTW;
651 lwkt_reltoken(&rpb->wlock);
652 wakeup(rpb);
1ae37239 653 } else {
1dfdffca 654 lwkt_reltoken(&rpb->wlock);
1ae37239 655 }
984263bc 656 }
8315ba5b
MD
657
658 /*
659 * But we may also have to deal with a kqueue which is
660 * stored on the same pipe as its descriptor, so a
661 * EVFILT_WRITE event waiting for our side to drain will
662 * be on the other side.
663 */
1dfdffca
MD
664 lwkt_gettoken(&wpb->wlock);
665 pipewakeup(wpb, 0);
666 lwkt_reltoken(&wpb->wlock);
984263bc 667 }
1dfdffca
MD
668 /*size = rpb->windex - rpb->rindex;*/
669 lwkt_reltoken(&rpb->rlock);
984263bc 670
984263bc
MD
671 return (error);
672}
673
984263bc 674static int
9ba76b73 675pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
984263bc 676{
1dfdffca
MD
677 struct pipebuf *rpb;
678 struct pipebuf *wpb;
679 struct pipe *pipe;
680 size_t windex;
681 size_t space;
682 size_t wcount;
683 size_t orig_resid;
607b0ed9
MD
684 int bigwrite;
685 int bigcount;
1dfdffca
MD
686 int error;
687 int nbio;
688
689 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
690 if ((intptr_t)fp->f_data & 1) {
691 rpb = &pipe->bufferB;
692 wpb = &pipe->bufferA;
693 } else {
694 rpb = &pipe->bufferA;
695 wpb = &pipe->bufferB;
696 }
697
698 /*
699 * Calculate nbio
700 */
701 if (fflags & O_FBLOCKING)
702 nbio = 0;
703 else if (fflags & O_FNONBLOCKING)
704 nbio = 1;
705 else if (fp->f_flag & O_NONBLOCK)
706 nbio = 1;
707 else
708 nbio = 0;
709
710 /*
711 * 'quick' NBIO test before things get expensive.
712 */
713 if (nbio && wpb->size == (wpb->windex - wpb->rindex) &&
714 uio->uio_resid && (wpb->state & PIPE_WEOF) == 0) {
715 return EAGAIN;
716 }
984263bc 717
1ae37239
MD
718 /*
719 * Writes go to the peer. The peer will always exist.
720 */
1dfdffca
MD
721 lwkt_gettoken(&wpb->wlock);
722 if (wpb->state & PIPE_WEOF) {
723 lwkt_reltoken(&wpb->wlock);
1ae37239
MD
724 return (EPIPE);
725 }
984263bc
MD
726
727 /*
1ae37239 728 * Degenerate case (EPIPE takes prec)
984263bc 729 */
1ae37239 730 if (uio->uio_resid == 0) {
1dfdffca 731 lwkt_reltoken(&wpb->wlock);
1ae37239
MD
732 return(0);
733 }
734
735 /*
736 * Writes are serialized (start_uio must be called with wlock)
737 */
1dfdffca 738 error = pipe_start_uio(&wpb->wip);
1ae37239 739 if (error) {
1dfdffca 740 lwkt_reltoken(&wpb->wlock);
1ae37239 741 return (error);
984263bc 742 }
984263bc 743
984263bc 744 orig_resid = uio->uio_resid;
1ae37239 745 wcount = 0;
984263bc 746
607b0ed9
MD
747 bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
748 bigcount = 10;
749
984263bc 750 while (uio->uio_resid) {
1dfdffca 751 if (wpb->state & PIPE_WEOF) {
984263bc
MD
752 error = EPIPE;
753 break;
754 }
755
607b0ed9
MD
756 /*
757 * Don't hog the cpu.
758 */
759 if (bigwrite && --bigcount == 0) {
760 lwkt_user_yield();
761 bigcount = 10;
762 if (CURSIG(curthread->td_lwp)) {
763 error = EINTR;
764 break;
765 }
766 }
767
1dfdffca
MD
768 windex = wpb->windex & (wpb->size - 1);
769 space = wpb->size - (wpb->windex - wpb->rindex);
1ae37239 770 cpu_lfence();
984263bc
MD
771
772 /* Writes of size <= PIPE_BUF must be atomic. */
773 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
774 space = 0;
775
c617bada
MD
776 /*
777 * Write to fill, read size handles write hysteresis. Also
778 * additional restrictions can cause select-based non-blocking
779 * writes to spin.
780 */
781 if (space > 0) {
1dfdffca 782 size_t segsize;
984263bc 783
984263bc 784 /*
1ae37239
MD
785 * Transfer size is minimum of uio transfer
786 * and free space in pipe buffer.
787 *
1dfdffca 788 * Limit each uiocopy to no more then wpb->size
1ae37239 789 * so we can keep the gravy train going on a
1dfdffca
MD
790 * SMP box. This significantly increases write
791 * performance. Otherwise large writes wind up doing
792 * an inefficient synchronous ping-pong.
984263bc 793 */
607b0ed9 794 space = szmin(space, uio->uio_resid);
1dfdffca
MD
795 if (space > (wpb->size >> 1))
796 space = (wpb->size >> 1);
984263bc
MD
797
798 /*
1ae37239
MD
799 * First segment to transfer is minimum of
800 * transfer size and contiguous space in
801 * pipe buffer. If first segment to transfer
802 * is less than the transfer size, we've got
803 * a wraparound in the buffer.
984263bc 804 */
1dfdffca 805 segsize = wpb->size - windex;
1ae37239
MD
806 if (segsize > space)
807 segsize = space;
984263bc
MD
808
809 /*
1ae37239
MD
810 * If this is the first loop and the reader is
811 * blocked, do a preemptive wakeup of the reader.
812 *
39055880
MD
813 * On SMP the IPI latency plus the wlock interlock
814 * on the reader side is the fastest way to get the
815 * reader going. (The scheduler will hard loop on
816 * lock tokens).
1ae37239
MD
817 *
818 * NOTE: We can't clear WANTR here without acquiring
819 * the rlock, which we don't want to do here!
984263bc 820 */
1dfdffca
MD
821 if ((wpb->state & PIPE_WANTR))
822 wakeup(wpb);
984263bc 823
984263bc 824 /*
880ffa3a
MD
825 * Transfer segment, which may include a wrap-around.
826 * Update windex to account for both all in one go
827 * so the reader can read() the data atomically.
984263bc 828 */
1dfdffca 829 error = uiomove(&wpb->buffer[windex], segsize, uio);
1ae37239 830 if (error == 0 && segsize < space) {
1ae37239 831 segsize = space - segsize;
1dfdffca 832 error = uiomove(&wpb->buffer[0], segsize, uio);
1ae37239
MD
833 }
834 if (error)
984263bc 835 break;
880ffa3a 836 cpu_mfence();
1dfdffca 837 wpb->windex += space;
1ae37239
MD
838 wcount += space;
839 continue;
984263bc 840 }
984263bc 841
1ae37239
MD
842 /*
843 * We need both the rlock and the wlock to interlock against
844 * the EOF, WANTW, and size checks, and to modify pipe_state.
845 *
846 * These are token locks so we do not have to worry about
847 * deadlocks.
848 */
1dfdffca 849 lwkt_gettoken(&wpb->rlock);
984263bc 850
984263bc 851 /*
1ae37239
MD
852 * If the "read-side" has been blocked, wake it up now
853 * and yield to let it drain synchronously rather
854 * then block.
984263bc 855 */
1dfdffca
MD
856 if (wpb->state & PIPE_WANTR) {
857 wpb->state &= ~PIPE_WANTR;
858 wakeup(wpb);
984263bc 859 }
1ae37239
MD
860
861 /*
862 * don't block on non-blocking I/O
863 */
864 if (nbio) {
1dfdffca 865 lwkt_reltoken(&wpb->rlock);
1ae37239
MD
866 error = EAGAIN;
867 break;
868 }
869
b20720b5
MD
870 /*
871 * re-test whether we have to block in the writer after
872 * acquiring both locks, in case the reader opened up
873 * some space.
874 */
1dfdffca 875 space = wpb->size - (wpb->windex - wpb->rindex);
b20720b5
MD
876 cpu_lfence();
877 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
878 space = 0;
879
1bfdcce2
MD
880 /*
881 * Retest EOF - acquiring a new token can temporarily release
882 * tokens already held.
883 */
1dfdffca
MD
884 if (wpb->state & PIPE_WEOF) {
885 lwkt_reltoken(&wpb->rlock);
1bfdcce2
MD
886 error = EPIPE;
887 break;
888 }
889
1ae37239
MD
890 /*
891 * We have no more space and have something to offer,
5b22f1a7 892 * wake up select/poll/kq.
1ae37239 893 */
b20720b5 894 if (space == 0) {
1dfdffca
MD
895 wpb->state |= PIPE_WANTW;
896 pipewakeup(wpb, 1);
897 if (wpb->state & PIPE_WANTW)
898 error = tsleep(wpb, PCATCH, "pipewr", 0);
b20720b5 899 }
1dfdffca 900 lwkt_reltoken(&wpb->rlock);
1ae37239
MD
901
902 /*
903 * Break out if we errored or the read side wants us to go
904 * away.
905 */
906 if (error)
907 break;
1dfdffca 908 if (wpb->state & PIPE_WEOF) {
1ae37239
MD
909 error = EPIPE;
910 break;
911 }
912 }
1dfdffca 913 pipe_end_uio(&wpb->wip);
1ae37239
MD
914
915 /*
916 * If we have put any characters in the buffer, we wake up
917 * the reader.
918 *
919 * Both rlock and wlock are required to be able to modify pipe_state.
920 */
1dfdffca
MD
921 if (wpb->windex != wpb->rindex) {
922 if (wpb->state & PIPE_WANTR) {
923 lwkt_gettoken(&wpb->rlock);
924 if (wpb->state & PIPE_WANTR) {
925 wpb->state &= ~PIPE_WANTR;
926 lwkt_reltoken(&wpb->rlock);
927 wakeup(wpb);
1ae37239 928 } else {
1dfdffca 929 lwkt_reltoken(&wpb->rlock);
1ae37239
MD
930 }
931 }
1dfdffca
MD
932 lwkt_gettoken(&wpb->rlock);
933 pipewakeup(wpb, 1);
934 lwkt_reltoken(&wpb->rlock);
984263bc
MD
935 }
936
937 /*
938 * Don't return EPIPE if I/O was successful
939 */
1dfdffca 940 if ((wpb->rindex == wpb->windex) &&
984263bc
MD
941 (uio->uio_resid == 0) &&
942 (error == EPIPE)) {
943 error = 0;
944 }
945
946 if (error == 0)
1dfdffca 947 vfs_timestamp(&wpb->mtime);
984263bc
MD
948
949 /*
950 * We have something to offer,
5b22f1a7 951 * wake up select/poll/kq.
984263bc 952 */
1dfdffca
MD
953 /*space = wpb->windex - wpb->rindex;*/
954 lwkt_reltoken(&wpb->wlock);
955
984263bc
MD
956 return (error);
957}
958
959/*
960 * we implement a very minimal set of ioctls for compatibility with sockets.
961 */
59b728a7 962static int
87baaf0c
MD
963pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
964 struct ucred *cred, struct sysmsg *msg)
984263bc 965{
1dfdffca
MD
966 struct pipebuf *rpb;
967 struct pipe *pipe;
d9b2033e 968 int error;
984263bc 969
1dfdffca
MD
970 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
971 if ((intptr_t)fp->f_data & 1) {
972 rpb = &pipe->bufferB;
973 } else {
974 rpb = &pipe->bufferA;
975 }
984263bc 976
1dfdffca
MD
977 lwkt_gettoken(&rpb->rlock);
978 lwkt_gettoken(&rpb->wlock);
880ffa3a 979
d9b2033e 980 switch (cmd) {
984263bc
MD
981 case FIOASYNC:
982 if (*(int *)data) {
1dfdffca 983 rpb->state |= PIPE_ASYNC;
984263bc 984 } else {
1dfdffca 985 rpb->state &= ~PIPE_ASYNC;
984263bc 986 }
d9b2033e
MD
987 error = 0;
988 break;
984263bc 989 case FIONREAD:
1dfdffca 990 *(int *)data = (int)(rpb->windex - rpb->rindex);
d9b2033e
MD
991 error = 0;
992 break;
984263bc 993 case FIOSETOWN:
1dfdffca 994 error = fsetown(*(int *)data, &rpb->sigio);
d9b2033e 995 break;
984263bc 996 case FIOGETOWN:
1dfdffca 997 *(int *)data = fgetown(&rpb->sigio);
d9b2033e
MD
998 error = 0;
999 break;
984263bc 1000 case TIOCSPGRP:
d9b2033e 1001 /* This is deprecated, FIOSETOWN should be used instead. */
1dfdffca 1002 error = fsetown(-(*(int *)data), &rpb->sigio);
d9b2033e 1003 break;
984263bc 1004
984263bc 1005 case TIOCGPGRP:
d9b2033e 1006 /* This is deprecated, FIOGETOWN should be used instead. */
1dfdffca 1007 *(int *)data = -fgetown(&rpb->sigio);
d9b2033e
MD
1008 error = 0;
1009 break;
1010 default:
1011 error = ENOTTY;
1012 break;
984263bc 1013 }
1dfdffca
MD
1014 lwkt_reltoken(&rpb->wlock);
1015 lwkt_reltoken(&rpb->rlock);
880ffa3a 1016
d9b2033e 1017 return (error);
984263bc
MD
1018}
1019
d9b2033e 1020/*
1ee6e3c6 1021 * MPSAFE
d9b2033e 1022 */
984263bc 1023static int
87de5057 1024pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
984263bc 1025{
1dfdffca 1026 struct pipebuf *rpb;
d9b2033e
MD
1027 struct pipe *pipe;
1028
1dfdffca
MD
1029 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1030 if ((intptr_t)fp->f_data & 1) {
1031 rpb = &pipe->bufferB;
1032 } else {
1033 rpb = &pipe->bufferA;
1034 }
984263bc
MD
1035
1036 bzero((caddr_t)ub, sizeof(*ub));
1037 ub->st_mode = S_IFIFO;
1dfdffca
MD
1038 ub->st_blksize = rpb->size;
1039 ub->st_size = rpb->windex - rpb->rindex;
984263bc 1040 ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1dfdffca
MD
1041 ub->st_atimespec = rpb->atime;
1042 ub->st_mtimespec = rpb->mtime;
1043 ub->st_ctimespec = pipe->ctime;
984263bc
MD
1044 /*
1045 * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1046 * st_flags, st_gen.
1047 * XXX (st_dev, st_ino) should be unique.
1048 */
1dfdffca 1049
984263bc
MD
1050 return (0);
1051}
1052
984263bc 1053static int
87de5057 1054pipe_close(struct file *fp)
984263bc 1055{
1dfdffca
MD
1056 struct pipebuf *rpb;
1057 struct pipebuf *wpb;
1058 struct pipe *pipe;
1059
1060 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1061 if ((intptr_t)fp->f_data & 1) {
1062 rpb = &pipe->bufferB;
1063 wpb = &pipe->bufferA;
1064 } else {
1065 rpb = &pipe->bufferA;
1066 wpb = &pipe->bufferB;
1067 }
984263bc
MD
1068
1069 fp->f_ops = &badfileops;
1070 fp->f_data = NULL;
1dfdffca
MD
1071 funsetown(&rpb->sigio);
1072 pipeclose(pipe, rpb, wpb);
1073
984263bc
MD
1074 return (0);
1075}
1076
/*
 * Shutdown one or both directions of a full-duplex pipe.
 *
 * rpb is our read side, wpb our write side (decoded from the low bit
 * of f_data).  SHUT_RD marks our reads and the peer's writes EOF;
 * SHUT_WR marks our writes and the peer's reads EOF; SHUT_RDWR does
 * both via fallthrough.  Returns 0 on success, EPIPE if 'how' is not
 * one of the SHUT_* values.
 */
static int
pipe_shutdown(struct file *fp, int how)
{
	struct pipebuf *rpb;
	struct pipebuf *wpb;
	struct pipe *pipe;
	int error = EPIPE;		/* default: unrecognized 'how' */

	pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
	if ((intptr_t)fp->f_data & 1) {
		rpb = &pipe->bufferB;
		wpb = &pipe->bufferA;
	} else {
		rpb = &pipe->bufferA;
		wpb = &pipe->bufferB;
	}

	/*
	 * We modify pipe_state on both pipes, which means we need
	 * all four tokens!
	 */
	lwkt_gettoken(&rpb->rlock);
	lwkt_gettoken(&rpb->wlock);
	lwkt_gettoken(&wpb->rlock);
	lwkt_gettoken(&wpb->wlock);

	switch(how) {
	case SHUT_RDWR:
	case SHUT_RD:
		rpb->state |= PIPE_REOF;		/* my reads */
		rpb->state |= PIPE_WEOF;		/* peer writes */
		/* unblock a sleeping reader on our side */
		if (rpb->state & PIPE_WANTR) {
			rpb->state &= ~PIPE_WANTR;
			wakeup(rpb);
		}
		/* unblock the peer's sleeping writer */
		if (rpb->state & PIPE_WANTW) {
			rpb->state &= ~PIPE_WANTW;
			wakeup(rpb);
		}
		error = 0;
		if (how == SHUT_RD)
			break;
		/* fall through */
	case SHUT_WR:
		wpb->state |= PIPE_REOF;		/* peer reads */
		wpb->state |= PIPE_WEOF;		/* my writes */
		/* unblock the peer's sleeping reader */
		if (wpb->state & PIPE_WANTR) {
			wpb->state &= ~PIPE_WANTR;
			wakeup(wpb);
		}
		/* unblock a sleeping writer on our side */
		if (wpb->state & PIPE_WANTW) {
			wpb->state &= ~PIPE_WANTW;
			wakeup(wpb);
		}
		error = 0;
		break;
	}
	/* notify select/poll/kqueue waiters on both sides */
	pipewakeup(rpb, 1);
	pipewakeup(wpb, 1);

	/* release in reverse acquisition order */
	lwkt_reltoken(&wpb->wlock);
	lwkt_reltoken(&wpb->rlock);
	lwkt_reltoken(&rpb->wlock);
	lwkt_reltoken(&rpb->rlock);

	return (error);
}
1147
5eab490e
MD
1148/*
1149 * Destroy the pipe buffer.
1150 */
984263bc 1151static void
1dfdffca 1152pipe_free_kmem(struct pipebuf *pb)
984263bc 1153{
1dfdffca
MD
1154 if (pb->buffer != NULL) {
1155 kmem_free(&kernel_map, (vm_offset_t)pb->buffer, pb->size);
1156 pb->buffer = NULL;
1157 pb->object = NULL;
984263bc 1158 }
984263bc
MD
1159}
1160
1161/*
1dfdffca
MD
1162 * Close one half of the pipe. We are closing the pipe for reading on rpb
1163 * and writing on wpb. This routine must be called twice with the pipebufs
1164 * reversed to close both directions.
984263bc
MD
1165 */
1166static void
1dfdffca 1167pipeclose(struct pipe *pipe, struct pipebuf *rpb, struct pipebuf *wpb)
984263bc 1168{
fc7d5181 1169 globaldata_t gd;
984263bc 1170
1dfdffca 1171 if (pipe == NULL)
fc7d5181 1172 return;
984263bc 1173
1ae37239 1174 /*
1ae37239
MD
1175 * We need both the read and write tokens to modify pipe_state.
1176 */
1dfdffca
MD
1177 lwkt_gettoken(&rpb->rlock);
1178 lwkt_gettoken(&rpb->wlock);
984263bc 1179
fc7d5181 1180 /*
5b22f1a7 1181 * Set our state, wakeup anyone waiting in select/poll/kq, and
1dfdffca
MD
1182 * wakeup anyone blocked on our pipe. No action if our side
1183 * is already closed.
fc7d5181 1184 */
1dfdffca
MD
1185 if (rpb->state & PIPE_CLOSED) {
1186 lwkt_reltoken(&rpb->wlock);
1187 lwkt_reltoken(&rpb->rlock);
1188 return;
fc7d5181 1189 }
984263bc 1190
1dfdffca
MD
1191 rpb->state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1192 pipewakeup(rpb, 1);
1193 if (rpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1194 rpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1195 wakeup(rpb);
1ae37239 1196 }
1dfdffca
MD
1197 lwkt_reltoken(&rpb->wlock);
1198 lwkt_reltoken(&rpb->rlock);
fc7d5181 1199
1ae37239 1200 /*
1dfdffca 1201 * Disconnect from peer.
1ae37239 1202 */
1dfdffca
MD
1203 lwkt_gettoken(&wpb->rlock);
1204 lwkt_gettoken(&wpb->wlock);
1205
1206 wpb->state |= PIPE_REOF | PIPE_WEOF;
1207 pipewakeup(wpb, 1);
1208 if (wpb->state & (PIPE_WANTR | PIPE_WANTW)) {
1209 wpb->state &= ~(PIPE_WANTR | PIPE_WANTW);
1210 wakeup(wpb);
fc7d5181 1211 }
1dfdffca
MD
1212 if (SLIST_FIRST(&wpb->kq.ki_note))
1213 KNOTE(&wpb->kq.ki_note, 0);
1214 lwkt_reltoken(&wpb->wlock);
1215 lwkt_reltoken(&wpb->rlock);
8100156a 1216
fc7d5181 1217 /*
1dfdffca
MD
1218 * Free resources once both sides are closed. We maintain a pcpu
1219 * cache to improve performance, so the actual tear-down case is
1220 * limited to bulk situations.
5eab490e
MD
1221 *
1222 * However, the bulk tear-down case can cause intense contention
1223 * on the kernel_map when, e.g. hundreds to hundreds of thousands
1224 * of processes are killed at the same time. To deal with this we
1225 * use a pcpu mutex to maintain concurrency but also limit the
1226 * number of threads banging on the map and pmap.
1227 *
1228 * We use the mtx mechanism instead of the lockmgr mechanism because
1229 * the mtx mechanism utilizes a queued design which will not break
1230 * down in the face of thousands to hundreds of thousands of
1231 * processes trying to free pipes simultaneously. The lockmgr
1232 * mechanism will wind up waking them all up each time a lock
1233 * cycles.
fc7d5181 1234 */
1dfdffca 1235 if (atomic_fetchadd_int(&pipe->open_count, -1) == 1) {
1ae37239 1236 gd = mycpu;
1dfdffca
MD
1237 if (gd->gd_pipeqcount >= pipe_maxcache) {
1238 mtx_lock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1239 pipe_free_kmem(rpb);
1240 pipe_free_kmem(wpb);
1241 mtx_unlock(&pipe_gdlocks[gd->gd_cpuid].mtx);
1242 kfree(pipe, M_PIPE);
1ae37239 1243 } else {
1dfdffca
MD
1244 rpb->state = 0;
1245 wpb->state = 0;
1246 pipe->next = gd->gd_pipeq;
1247 gd->gd_pipeq = pipe;
1ae37239
MD
1248 ++gd->gd_pipeqcount;
1249 }
984263bc
MD
1250 }
1251}
1252
984263bc
MD
1253static int
1254pipe_kqfilter(struct file *fp, struct knote *kn)
1255{
1dfdffca
MD
1256 struct pipebuf *rpb;
1257 struct pipebuf *wpb;
1258 struct pipe *pipe;
d9b2033e 1259
1dfdffca
MD
1260 pipe = (struct pipe *)((intptr_t)fp->f_data & ~(intptr_t)1);
1261 if ((intptr_t)fp->f_data & 1) {
1262 rpb = &pipe->bufferB;
1263 wpb = &pipe->bufferA;
1264 } else {
1265 rpb = &pipe->bufferA;
1266 wpb = &pipe->bufferB;
1267 }
984263bc
MD
1268
1269 switch (kn->kn_filter) {
1270 case EVFILT_READ:
1271 kn->kn_fop = &pipe_rfiltops;
1272 break;
1273 case EVFILT_WRITE:
1274 kn->kn_fop = &pipe_wfiltops;
1dfdffca 1275 if (wpb->state & PIPE_CLOSED) {
984263bc 1276 /* other end of pipe has been closed */
41f57d15 1277 return (EPIPE);
d9b2033e 1278 }
984263bc
MD
1279 break;
1280 default:
b287d649 1281 return (EOPNOTSUPP);
984263bc 1282 }
984263bc 1283
1dfdffca
MD
1284 if (rpb == &pipe->bufferA)
1285 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 0);
1286 else
1287 kn->kn_hook = (caddr_t)(void *)((intptr_t)pipe | 1);
1288
1289 knote_insert(&rpb->kq.ki_note, kn);
3720cffa 1290
984263bc
MD
1291 return (0);
1292}
1293
1294static void
1295filt_pipedetach(struct knote *kn)
1296{
1dfdffca
MD
1297 struct pipebuf *rpb;
1298 struct pipebuf *wpb;
1299 struct pipe *pipe;
984263bc 1300
1dfdffca
MD
1301 pipe = (struct pipe *)((intptr_t)kn->kn_hook & ~(intptr_t)1);
1302 if ((intptr_t)kn->kn_hook & 1) {
1303 rpb = &pipe->bufferB;
1304 wpb = &pipe->bufferA;
1305 } else {
1306 rpb = &pipe->bufferA;
1307 wpb = &pipe->bufferB;
1308 }
1309 knote_remove(&rpb->kq.ki_note, kn);
984263bc
MD
1310}
1311
1312/*ARGSUSED*/
1313static int
1314filt_piperead(struct knote *kn, long hint)
1315{
1dfdffca
MD
1316 struct pipebuf *rpb;
1317 struct pipebuf *wpb;
1318 struct pipe *pipe;
13b050b0
SG
1319 int ready = 0;
1320
1dfdffca
MD
1321 pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1322 if ((intptr_t)kn->kn_fp->f_data & 1) {
1323 rpb = &pipe->bufferB;
1324 wpb = &pipe->bufferA;
1325 } else {
1326 rpb = &pipe->bufferA;
1327 wpb = &pipe->bufferB;
1328 }
984263bc 1329
1dfdffca
MD
1330 lwkt_gettoken(&rpb->rlock);
1331 lwkt_gettoken(&rpb->wlock);
8c4ed426 1332
1dfdffca
MD
1333 kn->kn_data = rpb->windex - rpb->rindex;
1334
1335 if (rpb->state & PIPE_REOF) {
3bcb6e5e
SZ
1336 /*
1337 * Only set NODATA if all data has been exhausted
1338 */
1339 if (kn->kn_data == 0)
1340 kn->kn_flags |= EV_NODATA;
984263bc 1341 kn->kn_flags |= EV_EOF;
13b050b0 1342 ready = 1;
984263bc 1343 }
3720cffa 1344
1dfdffca
MD
1345 lwkt_reltoken(&rpb->wlock);
1346 lwkt_reltoken(&rpb->rlock);
13b050b0
SG
1347
1348 if (!ready)
1349 ready = kn->kn_data > 0;
1350
1351 return (ready);
984263bc
MD
1352}
1353
1354/*ARGSUSED*/
1355static int
1356filt_pipewrite(struct knote *kn, long hint)
1357{
1dfdffca
MD
1358 struct pipebuf *rpb;
1359 struct pipebuf *wpb;
1360 struct pipe *pipe;
13b050b0
SG
1361 int ready = 0;
1362
1dfdffca
MD
1363 pipe = (struct pipe *)((intptr_t)kn->kn_fp->f_data & ~(intptr_t)1);
1364 if ((intptr_t)kn->kn_fp->f_data & 1) {
1365 rpb = &pipe->bufferB;
1366 wpb = &pipe->bufferA;
1367 } else {
1368 rpb = &pipe->bufferA;
1369 wpb = &pipe->bufferB;
1370 }
1371
045ef32c 1372 kn->kn_data = 0;
1dfdffca 1373 if (wpb->state & PIPE_CLOSED) {
3bcb6e5e 1374 kn->kn_flags |= (EV_EOF | EV_NODATA);
045ef32c
SG
1375 return (1);
1376 }
1377
1dfdffca
MD
1378 lwkt_gettoken(&wpb->rlock);
1379 lwkt_gettoken(&wpb->wlock);
13b050b0 1380
1dfdffca 1381 if (wpb->state & PIPE_WEOF) {
3bcb6e5e 1382 kn->kn_flags |= (EV_EOF | EV_NODATA);
13b050b0 1383 ready = 1;
984263bc 1384 }
3720cffa 1385
13b050b0 1386 if (!ready)
1dfdffca 1387 kn->kn_data = wpb->size - (wpb->windex - wpb->rindex);
13b050b0 1388
1dfdffca
MD
1389 lwkt_reltoken(&wpb->wlock);
1390 lwkt_reltoken(&wpb->rlock);
13b050b0
SG
1391
1392 if (!ready)
1393 ready = kn->kn_data >= PIPE_BUF;
1394
1395 return (ready);
984263bc 1396}