kernel - Make numerous proc accesses use p->p_token instead of proc_token.
[dragonfly.git] / sys / kern / sys_pipe.c
1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
21  */
22
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/kernel.h>
32 #include <sys/proc.h>
33 #include <sys/fcntl.h>
34 #include <sys/file.h>
35 #include <sys/filedesc.h>
36 #include <sys/filio.h>
37 #include <sys/ttycom.h>
38 #include <sys/stat.h>
39 #include <sys/signalvar.h>
40 #include <sys/sysproto.h>
41 #include <sys/pipe.h>
42 #include <sys/vnode.h>
43 #include <sys/uio.h>
44 #include <sys/event.h>
45 #include <sys/globaldata.h>
46 #include <sys/module.h>
47 #include <sys/malloc.h>
48 #include <sys/sysctl.h>
49 #include <sys/socket.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <sys/lock.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_zone.h>
61
62 #include <sys/file2.h>
63 #include <sys/signal2.h>
64
65 #include <machine/cpufunc.h>
66
67 /*
68  * interfaces to the outside world
69  */
70 static int pipe_read (struct file *fp, struct uio *uio, 
71                 struct ucred *cred, int flags);
72 static int pipe_write (struct file *fp, struct uio *uio, 
73                 struct ucred *cred, int flags);
74 static int pipe_close (struct file *fp);
75 static int pipe_shutdown (struct file *fp, int how);
76 static int pipe_kqfilter (struct file *fp, struct knote *kn);
77 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
78 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
79                 struct ucred *cred, struct sysmsg *msg);
80
81 static struct fileops pipeops = {
82         .fo_read = pipe_read, 
83         .fo_write = pipe_write,
84         .fo_ioctl = pipe_ioctl,
85         .fo_kqfilter = pipe_kqfilter,
86         .fo_stat = pipe_stat,
87         .fo_close = pipe_close,
88         .fo_shutdown = pipe_shutdown
89 };
90
91 static void     filt_pipedetach(struct knote *kn);
92 static int      filt_piperead(struct knote *kn, long hint);
93 static int      filt_pipewrite(struct knote *kn, long hint);
94
95 static struct filterops pipe_rfiltops =
96         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
97 static struct filterops pipe_wfiltops =
98         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
99
100 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
101
102 /*
103  * Default pipe buffer size(s), this can be kind-of large now because pipe
104  * space is pageable.  The pipe code will try to maintain locality of
105  * reference for performance reasons, so small amounts of outstanding I/O
106  * will not wipe the cache.
107  */
108 #define MINPIPESIZE (PIPE_SIZE/3)
109 #define MAXPIPESIZE (2*PIPE_SIZE/3)
110
111 /*
112  * Limit the number of "big" pipes
113  */
114 #define LIMITBIGPIPES   64
115 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
116
117 static int pipe_maxbig = LIMITBIGPIPES;
118 static int pipe_maxcache = PIPEQ_MAX_CACHE;
119 static int pipe_bigcount;
120 static int pipe_nbig;
121 static int pipe_bcache_alloc;
122 static int pipe_bkmem_alloc;
123 static int pipe_rblocked_count;
124 static int pipe_wblocked_count;
125
126 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
127 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
128         CTLFLAG_RD, &pipe_nbig, 0, "numer of big pipes allocated");
129 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
130         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
131 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
132         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe expanded");
133 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
134         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe expanded");
135 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
136         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
137 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
138         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
139 #ifdef SMP
140 static int pipe_delay = 5000;   /* 5uS default */
141 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
142         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
143 #endif
144 #if !defined(NO_PIPE_SYSCTL_STATS)
145 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
146         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
147 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
148         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
149 #endif
150
151 static void pipeclose (struct pipe *cpipe);
152 static void pipe_free_kmem (struct pipe *cpipe);
153 static int pipe_create (struct pipe **cpipep);
154 static int pipespace (struct pipe *cpipe, int size);
155
156 static __inline void
157 pipewakeup(struct pipe *cpipe, int dosigio)
158 {
159         if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
160                 lwkt_gettoken(&proc_token);
161                 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
162                 lwkt_reltoken(&proc_token);
163         }
164         KNOTE(&cpipe->pipe_kq.ki_note, 0);
165 }
166
167 /*
168  * These routines are called before and after a UIO.  The UIO
169  * may block, causing our held tokens to be lost temporarily.
170  *
171  * We use these routines to serialize reads against other reads
172  * and writes against other writes.
173  *
174  * The read token is held on entry so *ipp does not race.
175  */
176 static __inline int
177 pipe_start_uio(struct pipe *cpipe, int *ipp)
178 {
179         int error;
180
181         while (*ipp) {
182                 *ipp = -1;
183                 error = tsleep(ipp, PCATCH, "pipexx", 0);
184                 if (error)
185                         return (error);
186         }
187         *ipp = 1;
188         return (0);
189 }
190
/*
 * Release ownership of the in-progress word *ipp taken by
 * pipe_start_uio().  If another thread marked it contended (-1), wake the
 * sleepers on ipp; otherwise assert that we were the sole owner (1).
 */
static __inline void
pipe_end_uio(struct pipe *cpipe, int *ipp)
{
	int had_waiters;

	had_waiters = (*ipp < 0);
	if (had_waiters == 0) {
		KKASSERT(*ipp > 0);
	}
	*ipp = 0;
	if (had_waiters)
		wakeup(ipp);
}
202
203 /*
204  * The pipe system call for the DTYPE_PIPE type of pipes
205  *
206  * pipe_args(int dummy)
207  *
208  * MPSAFE
209  */
210 int
211 sys_pipe(struct pipe_args *uap)
212 {
213         struct thread *td = curthread;
214         struct filedesc *fdp = td->td_proc->p_fd;
215         struct file *rf, *wf;
216         struct pipe *rpipe, *wpipe;
217         int fd1, fd2, error;
218
219         rpipe = wpipe = NULL;
220         if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
221                 pipeclose(rpipe); 
222                 pipeclose(wpipe); 
223                 return (ENFILE);
224         }
225         
226         error = falloc(td->td_lwp, &rf, &fd1);
227         if (error) {
228                 pipeclose(rpipe);
229                 pipeclose(wpipe);
230                 return (error);
231         }
232         uap->sysmsg_fds[0] = fd1;
233
234         /*
235          * Warning: once we've gotten past allocation of the fd for the
236          * read-side, we can only drop the read side via fdrop() in order
237          * to avoid races against processes which manage to dup() the read
238          * side while we are blocked trying to allocate the write side.
239          */
240         rf->f_type = DTYPE_PIPE;
241         rf->f_flag = FREAD | FWRITE;
242         rf->f_ops = &pipeops;
243         rf->f_data = rpipe;
244         error = falloc(td->td_lwp, &wf, &fd2);
245         if (error) {
246                 fsetfd(fdp, NULL, fd1);
247                 fdrop(rf);
248                 /* rpipe has been closed by fdrop(). */
249                 pipeclose(wpipe);
250                 return (error);
251         }
252         wf->f_type = DTYPE_PIPE;
253         wf->f_flag = FREAD | FWRITE;
254         wf->f_ops = &pipeops;
255         wf->f_data = wpipe;
256         uap->sysmsg_fds[1] = fd2;
257
258         rpipe->pipe_slock = kmalloc(sizeof(struct lock),
259                                     M_PIPE, M_WAITOK|M_ZERO);
260         wpipe->pipe_slock = rpipe->pipe_slock;
261         rpipe->pipe_peer = wpipe;
262         wpipe->pipe_peer = rpipe;
263         lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
264
265         /*
266          * Once activated the peer relationship remains valid until
267          * both sides are closed.
268          */
269         fsetfd(fdp, rf, fd1);
270         fsetfd(fdp, wf, fd2);
271         fdrop(rf);
272         fdrop(wf);
273
274         return (0);
275 }
276
277 /*
278  * Allocate kva for pipe circular buffer, the space is pageable
279  * This routine will 'realloc' the size of a pipe safely, if it fails
280  * it will retain the old buffer.
281  * If it fails it will return ENOMEM.
282  */
283 static int
284 pipespace(struct pipe *cpipe, int size)
285 {
286         struct vm_object *object;
287         caddr_t buffer;
288         int npages, error;
289
290         npages = round_page(size) / PAGE_SIZE;
291         object = cpipe->pipe_buffer.object;
292
293         /*
294          * [re]create the object if necessary and reserve space for it
295          * in the kernel_map.  The object and memory are pageable.  On
296          * success, free the old resources before assigning the new
297          * ones.
298          */
299         if (object == NULL || object->size != npages) {
300                 object = vm_object_allocate(OBJT_DEFAULT, npages);
301                 buffer = (caddr_t)vm_map_min(&kernel_map);
302
303                 error = vm_map_find(&kernel_map, object, 0,
304                                     (vm_offset_t *)&buffer,
305                                     size, PAGE_SIZE,
306                                     1, VM_MAPTYPE_NORMAL,
307                                     VM_PROT_ALL, VM_PROT_ALL,
308                                     0);
309
310                 if (error != KERN_SUCCESS) {
311                         vm_object_deallocate(object);
312                         return (ENOMEM);
313                 }
314                 pipe_free_kmem(cpipe);
315                 cpipe->pipe_buffer.object = object;
316                 cpipe->pipe_buffer.buffer = buffer;
317                 cpipe->pipe_buffer.size = size;
318                 ++pipe_bkmem_alloc;
319         } else {
320                 ++pipe_bcache_alloc;
321         }
322         cpipe->pipe_buffer.rindex = 0;
323         cpipe->pipe_buffer.windex = 0;
324         return (0);
325 }
326
327 /*
328  * Initialize and allocate VM and memory for pipe, pulling the pipe from
329  * our per-cpu cache if possible.  For now make sure it is sized for the
330  * smaller PIPE_SIZE default.
331  */
332 static int
333 pipe_create(struct pipe **cpipep)
334 {
335         globaldata_t gd = mycpu;
336         struct pipe *cpipe;
337         int error;
338
339         if ((cpipe = gd->gd_pipeq) != NULL) {
340                 gd->gd_pipeq = cpipe->pipe_peer;
341                 --gd->gd_pipeqcount;
342                 cpipe->pipe_peer = NULL;
343                 cpipe->pipe_wantwcnt = 0;
344         } else {
345                 cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
346         }
347         *cpipep = cpipe;
348         if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
349                 return (error);
350         vfs_timestamp(&cpipe->pipe_ctime);
351         cpipe->pipe_atime = cpipe->pipe_ctime;
352         cpipe->pipe_mtime = cpipe->pipe_ctime;
353         lwkt_token_init(&cpipe->pipe_rlock, "piper");
354         lwkt_token_init(&cpipe->pipe_wlock, "pipew");
355         return (0);
356 }
357
/*
 * fo_read handler for pipes: copy data out of the pipe attached to fp
 * into uio.
 *
 * Reads are serialized against each other by pipe_rlock plus the
 * pipe_rip in-progress word (pipe_start_uio/pipe_end_uio).  pipe_wlock
 * is additionally taken whenever pipe_state must be checked or changed
 * consistently with the writer.  cred is not used here.
 *
 * Non-blocking mode: O_FBLOCKING in fflags forces blocking,
 * O_FNONBLOCKING forces non-blocking, otherwise O_NONBLOCK in fp->f_flag
 * decides.
 *
 * Returns 0 once any data has been read (possibly a short read), at EOF
 * (PIPE_REOF), or for a zero-length request.  Otherwise returns EAGAIN
 * for a non-blocking read with no data, EINTR if a large (>10MB) read
 * notices a pending signal, or an error from pipe_start_uio(), uiomove()
 * or tsleep().
 */
static int
pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
{
	struct pipe *rpipe;
	struct pipe *wpipe;
	int error;
	size_t nread = 0;
	int nbio;
	u_int size;	/* total bytes available */
	u_int nsize;	/* bytes to copy this pass (contiguous, <= resid) */
	u_int rindex;	/* read offset within the circular buffer */
	int notify_writer;
	int bigread;
	int bigcount;

	if (uio->uio_resid == 0)
		return(0);

	/*
	 * Setup locks, calculate nbio
	 */
	rpipe = (struct pipe *)fp->f_data;
	wpipe = rpipe->pipe_peer;
	lwkt_gettoken(&rpipe->pipe_rlock);

	if (fflags & O_FBLOCKING)
		nbio = 0;
	else if (fflags & O_FNONBLOCKING)
		nbio = 1;
	else if (fp->f_flag & O_NONBLOCK)
		nbio = 1;
	else
		nbio = 0;

	/*
	 * Reads are serialized.  Note however that pipe_buffer.buffer and
	 * pipe_buffer.size can change out from under us when the number
	 * of bytes in the buffer is zero due to the write side doing a
	 * pipespace().
	 */
	error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
	if (error) {
		lwkt_reltoken(&rpipe->pipe_rlock);
		return (error);
	}
	notify_writer = 0;

	/*
	 * Reads larger than 10MB periodically yield the cpu and check for
	 * pending signals (every 10th loop iteration).
	 */
	bigread = (uio->uio_resid > 10 * 1024 * 1024);
	bigcount = 10;

	while (uio->uio_resid) {
		/*
		 * Don't hog the cpu.
		 */
		if (bigread && --bigcount == 0) {
			lwkt_user_yield();
			bigcount = 10;
			if (CURSIG(curthread->td_lwp)) {
				error = EINTR;
				break;
			}
		}

		/*
		 * Sample the fill level, fencing so the buffer contents are
		 * not read ahead of the index load.
		 */
		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
		cpu_lfence();
		if (size) {
			/*
			 * Masking with (size - 1) assumes the buffer size is
			 * a power of two.  Copy at most up to the end of the
			 * buffer (no wrap) and at most uio_resid bytes.
			 */
			rindex = rpipe->pipe_buffer.rindex &
				 (rpipe->pipe_buffer.size - 1);
			nsize = size;
			if (nsize > rpipe->pipe_buffer.size - rindex)
				nsize = rpipe->pipe_buffer.size - rindex;
			nsize = szmin(nsize, uio->uio_resid);

			error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
					nsize, uio);
			if (error)
				break;
			/* Finish the copy before publishing the new rindex. */
			cpu_mfence();
			rpipe->pipe_buffer.rindex += nsize;
			nread += nsize;

			/*
			 * If the FIFO is still over half full just continue
			 * and do not try to notify the writer yet.
			 */
			if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
				notify_writer = 0;
				continue;
			}

			/*
			 * When the FIFO is less than half full notify any
			 * waiting writer.  WANTW can be checked while
			 * holding just the rlock.
			 */
			notify_writer = 1;
			if ((rpipe->pipe_state & PIPE_WANTW) == 0)
				continue;
		}

		/*
		 * If the "write-side" was blocked we wake it up.  This code
		 * is reached either when the buffer is completely emptied
		 * or if it becomes more than half-empty.
		 *
		 * Pipe_state can only be modified if both the rlock and
		 * wlock are held.
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			lwkt_gettoken(&rpipe->pipe_wlock);
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				lwkt_reltoken(&rpipe->pipe_wlock);
				wakeup(rpipe);
			} else {
				lwkt_reltoken(&rpipe->pipe_wlock);
			}
		}

		/*
		 * Pick up our copy loop again if the writer sent data to
		 * us while we were messing around.
		 *
		 * On a SMP box poll up to pipe_delay nanoseconds for new
		 * data.  Typically a value of 2000 to 4000 is sufficient
		 * to eradicate most IPIs/tsleeps/wakeups when a pipe
		 * is used for synchronous communications with small packets,
		 * and 8000 or so (8uS) will pipeline large buffer xfers
		 * between cpus over a pipe.
		 *
		 * For synchronous communications a hit means doing a
		 * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
		 * whereas a miss requiring a tsleep/wakeup sequence
		 * will take 7uS or more.
		 */
		if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
			continue;

#if defined(SMP) && defined(_RDTSC_SUPPORTED_)
		if (pipe_delay) {
			int64_t tsc_target;
			int good = 0;

			/* Busy-poll the indices until the TSC deadline. */
			tsc_target = tsc_get_target(pipe_delay);
			while (tsc_test_target(tsc_target) == 0) {
				if (rpipe->pipe_buffer.windex !=
				    rpipe->pipe_buffer.rindex) {
					good = 1;
					break;
				}
			}
			if (good)
				continue;
		}
#endif

		/*
		 * Detect EOF condition, do not set error.
		 */
		if (rpipe->pipe_state & PIPE_REOF)
			break;

		/*
		 * Break if some data was read, or if this was a non-blocking
		 * read.
		 */
		if (nread > 0)
			break;

		if (nbio) {
			error = EAGAIN;
			break;
		}

		/*
		 * Last chance, interlock with WANTR.
		 */
		lwkt_gettoken(&rpipe->pipe_wlock);
		size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
		if (size) {
			lwkt_reltoken(&rpipe->pipe_wlock);
			continue;
		}

		/*
		 * Retest EOF - acquiring a new token can temporarily release
		 * tokens already held.
		 */
		if (rpipe->pipe_state & PIPE_REOF) {
			lwkt_reltoken(&rpipe->pipe_wlock);
			break;
		}

		/*
		 * If there is no more to read in the pipe, reset its
		 * pointers to the beginning.  This improves cache hit
		 * stats.
		 *
		 * We need both locks to modify both pointers, and there
		 * must also not be a write in progress or the uiomove()
		 * in the write might block and temporarily release
		 * its wlock, then reacquire and update windex.  We are
		 * only serialized against reads, not writes.
		 *
		 * XXX should we even bother resetting the indices?  It
		 *     might actually be more cache efficient not to.
		 */
		if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
		    rpipe->pipe_wip == 0) {
			rpipe->pipe_buffer.rindex = 0;
			rpipe->pipe_buffer.windex = 0;
		}

		/*
		 * Wait for more data.
		 *
		 * Pipe_state can only be set if both the rlock and wlock
		 * are held.  tsleep_interlock() is armed before wlock is
		 * dropped, and the sleep uses PINTERLOCKED.
		 */
		rpipe->pipe_state |= PIPE_WANTR;
		tsleep_interlock(rpipe, PCATCH);
		lwkt_reltoken(&rpipe->pipe_wlock);
		error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
		++pipe_rblocked_count;
		if (error)
			break;
	}
	pipe_end_uio(rpipe, &rpipe->pipe_rip);

	/*
	 * Update last access time
	 */
	if (error == 0 && nread)
		vfs_timestamp(&rpipe->pipe_atime);

	/*
	 * If we drained the FIFO more than halfway then handle
	 * write blocking hysteresis.
	 *
	 * Note that PIPE_WANTW cannot be set by the writer without
	 * it holding both rlock and wlock, so we can test it
	 * while holding just rlock.
	 */
	if (notify_writer) {
		/*
		 * Synchronous blocking is done on the pipe involved
		 */
		if (rpipe->pipe_state & PIPE_WANTW) {
			lwkt_gettoken(&rpipe->pipe_wlock);
			if (rpipe->pipe_state & PIPE_WANTW) {
				rpipe->pipe_state &= ~PIPE_WANTW;
				lwkt_reltoken(&rpipe->pipe_wlock);
				wakeup(rpipe);
			} else {
				lwkt_reltoken(&rpipe->pipe_wlock);
			}
		}

		/*
		 * But we may also have to deal with a kqueue which is
		 * stored on the same pipe as its descriptor, so a
		 * EVFILT_WRITE event waiting for our side to drain will
		 * be on the other side.
		 */
		lwkt_gettoken(&wpipe->pipe_wlock);
		pipewakeup(wpipe, 0);
		lwkt_reltoken(&wpipe->pipe_wlock);
	}
	lwkt_reltoken(&rpipe->pipe_rlock);

	return (error);
}
631
632 static int
633 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
634 {
635         int error;
636         int orig_resid;
637         int nbio;
638         struct pipe *wpipe;
639         struct pipe *rpipe;
640         u_int windex;
641         u_int space;
642         u_int wcount;
643         int bigwrite;
644         int bigcount;
645
646         /*
647          * Writes go to the peer.  The peer will always exist.
648          */
649         rpipe = (struct pipe *) fp->f_data;
650         wpipe = rpipe->pipe_peer;
651         lwkt_gettoken(&wpipe->pipe_wlock);
652         if (wpipe->pipe_state & PIPE_WEOF) {
653                 lwkt_reltoken(&wpipe->pipe_wlock);
654                 return (EPIPE);
655         }
656
657         /*
658          * Degenerate case (EPIPE takes prec)
659          */
660         if (uio->uio_resid == 0) {
661                 lwkt_reltoken(&wpipe->pipe_wlock);
662                 return(0);
663         }
664
665         /*
666          * Writes are serialized (start_uio must be called with wlock)
667          */
668         error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
669         if (error) {
670                 lwkt_reltoken(&wpipe->pipe_wlock);
671                 return (error);
672         }
673
674         if (fflags & O_FBLOCKING)
675                 nbio = 0;
676         else if (fflags & O_FNONBLOCKING)
677                 nbio = 1;
678         else if (fp->f_flag & O_NONBLOCK)
679                 nbio = 1;
680         else
681                 nbio = 0;
682
683         /*
684          * If it is advantageous to resize the pipe buffer, do
685          * so.  We are write-serialized so we can block safely.
686          */
687         if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
688             (pipe_nbig < pipe_maxbig) &&
689             wpipe->pipe_wantwcnt > 4 &&
690             (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
691                 /* 
692                  * Recheck after lock.
693                  */
694                 lwkt_gettoken(&wpipe->pipe_rlock);
695                 if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
696                     (pipe_nbig < pipe_maxbig) &&
697                     (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
698                         atomic_add_int(&pipe_nbig, 1);
699                         if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
700                                 ++pipe_bigcount;
701                         else
702                                 atomic_subtract_int(&pipe_nbig, 1);
703                 }
704                 lwkt_reltoken(&wpipe->pipe_rlock);
705         }
706
707         orig_resid = uio->uio_resid;
708         wcount = 0;
709
710         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
711         bigcount = 10;
712
713         while (uio->uio_resid) {
714                 if (wpipe->pipe_state & PIPE_WEOF) {
715                         error = EPIPE;
716                         break;
717                 }
718
719                 /*
720                  * Don't hog the cpu.
721                  */
722                 if (bigwrite && --bigcount == 0) {
723                         lwkt_user_yield();
724                         bigcount = 10;
725                         if (CURSIG(curthread->td_lwp)) {
726                                 error = EINTR;
727                                 break;
728                         }
729                 }
730
731                 windex = wpipe->pipe_buffer.windex &
732                          (wpipe->pipe_buffer.size - 1);
733                 space = wpipe->pipe_buffer.size -
734                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
735                 cpu_lfence();
736
737                 /* Writes of size <= PIPE_BUF must be atomic. */
738                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
739                         space = 0;
740
741                 /* 
742                  * Write to fill, read size handles write hysteresis.  Also
743                  * additional restrictions can cause select-based non-blocking
744                  * writes to spin.
745                  */
746                 if (space > 0) {
747                         u_int segsize;
748
749                         /*
750                          * Transfer size is minimum of uio transfer
751                          * and free space in pipe buffer.
752                          *
753                          * Limit each uiocopy to no more then PIPE_SIZE
754                          * so we can keep the gravy train going on a
755                          * SMP box.  This doubles the performance for
756                          * write sizes > 16K.  Otherwise large writes
757                          * wind up doing an inefficient synchronous
758                          * ping-pong.
759                          */
760                         space = szmin(space, uio->uio_resid);
761                         if (space > PIPE_SIZE)
762                                 space = PIPE_SIZE;
763
764                         /*
765                          * First segment to transfer is minimum of
766                          * transfer size and contiguous space in
767                          * pipe buffer.  If first segment to transfer
768                          * is less than the transfer size, we've got
769                          * a wraparound in the buffer.
770                          */
771                         segsize = wpipe->pipe_buffer.size - windex;
772                         if (segsize > space)
773                                 segsize = space;
774
775 #ifdef SMP
776                         /*
777                          * If this is the first loop and the reader is
778                          * blocked, do a preemptive wakeup of the reader.
779                          *
780                          * On SMP the IPI latency plus the wlock interlock
781                          * on the reader side is the fastest way to get the
782                          * reader going.  (The scheduler will hard loop on
783                          * lock tokens).
784                          *
785                          * NOTE: We can't clear WANTR here without acquiring
786                          * the rlock, which we don't want to do here!
787                          */
788                         if ((wpipe->pipe_state & PIPE_WANTR))
789                                 wakeup(wpipe);
790 #endif
791
792                         /*
793                          * Transfer segment, which may include a wrap-around.
794                          * Update windex to account for both all in one go
795                          * so the reader can read() the data atomically.
796                          */
797                         error = uiomove(&wpipe->pipe_buffer.buffer[windex],
798                                         segsize, uio);
799                         if (error == 0 && segsize < space) {
800                                 segsize = space - segsize;
801                                 error = uiomove(&wpipe->pipe_buffer.buffer[0],
802                                                 segsize, uio);
803                         }
804                         if (error)
805                                 break;
806                         cpu_mfence();
807                         wpipe->pipe_buffer.windex += space;
808                         wcount += space;
809                         continue;
810                 }
811
812                 /*
813                  * We need both the rlock and the wlock to interlock against
814                  * the EOF, WANTW, and size checks, and to modify pipe_state.
815                  *
816                  * These are token locks so we do not have to worry about
817                  * deadlocks.
818                  */
819                 lwkt_gettoken(&wpipe->pipe_rlock);
820
821                 /*
822                  * If the "read-side" has been blocked, wake it up now
823                  * and yield to let it drain synchronously rather
824                  * then block.
825                  */
826                 if (wpipe->pipe_state & PIPE_WANTR) {
827                         wpipe->pipe_state &= ~PIPE_WANTR;
828                         wakeup(wpipe);
829                 }
830
831                 /*
832                  * don't block on non-blocking I/O
833                  */
834                 if (nbio) {
835                         lwkt_reltoken(&wpipe->pipe_rlock);
836                         error = EAGAIN;
837                         break;
838                 }
839
840                 /*
841                  * re-test whether we have to block in the writer after
842                  * acquiring both locks, in case the reader opened up
843                  * some space.
844                  */
845                 space = wpipe->pipe_buffer.size -
846                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
847                 cpu_lfence();
848                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
849                         space = 0;
850
851                 /*
852                  * Retest EOF - acquiring a new token can temporarily release
853                  * tokens already held.
854                  */
855                 if (wpipe->pipe_state & PIPE_WEOF) {
856                         lwkt_reltoken(&wpipe->pipe_rlock);
857                         error = EPIPE;
858                         break;
859                 }
860
861                 /*
862                  * We have no more space and have something to offer,
863                  * wake up select/poll/kq.
864                  */
865                 if (space == 0) {
866                         wpipe->pipe_state |= PIPE_WANTW;
867                         ++wpipe->pipe_wantwcnt;
868                         pipewakeup(wpipe, 1);
869                         if (wpipe->pipe_state & PIPE_WANTW)
870                                 error = tsleep(wpipe, PCATCH, "pipewr", 0);
871                         ++pipe_wblocked_count;
872                 }
873                 lwkt_reltoken(&wpipe->pipe_rlock);
874
875                 /*
876                  * Break out if we errored or the read side wants us to go
877                  * away.
878                  */
879                 if (error)
880                         break;
881                 if (wpipe->pipe_state & PIPE_WEOF) {
882                         error = EPIPE;
883                         break;
884                 }
885         }
886         pipe_end_uio(wpipe, &wpipe->pipe_wip);
887
888         /*
889          * If we have put any characters in the buffer, we wake up
890          * the reader.
891          *
892          * Both rlock and wlock are required to be able to modify pipe_state.
893          */
894         if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
895                 if (wpipe->pipe_state & PIPE_WANTR) {
896                         lwkt_gettoken(&wpipe->pipe_rlock);
897                         if (wpipe->pipe_state & PIPE_WANTR) {
898                                 wpipe->pipe_state &= ~PIPE_WANTR;
899                                 lwkt_reltoken(&wpipe->pipe_rlock);
900                                 wakeup(wpipe);
901                         } else {
902                                 lwkt_reltoken(&wpipe->pipe_rlock);
903                         }
904                 }
905                 lwkt_gettoken(&wpipe->pipe_rlock);
906                 pipewakeup(wpipe, 1);
907                 lwkt_reltoken(&wpipe->pipe_rlock);
908         }
909
910         /*
911          * Don't return EPIPE if I/O was successful
912          */
913         if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
914             (uio->uio_resid == 0) &&
915             (error == EPIPE)) {
916                 error = 0;
917         }
918
919         if (error == 0)
920                 vfs_timestamp(&wpipe->pipe_mtime);
921
922         /*
923          * We have something to offer,
924          * wake up select/poll/kq.
925          */
926         /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
927         lwkt_reltoken(&wpipe->pipe_wlock);
928         return (error);
929 }
930
931 /*
932  * we implement a very minimal set of ioctls for compatibility with sockets.
933  */
934 int
935 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
936            struct ucred *cred, struct sysmsg *msg)
937 {
938         struct pipe *mpipe;
939         int error;
940
941         mpipe = (struct pipe *)fp->f_data;
942
943         lwkt_gettoken(&mpipe->pipe_rlock);
944         lwkt_gettoken(&mpipe->pipe_wlock);
945
946         switch (cmd) {
947         case FIOASYNC:
948                 if (*(int *)data) {
949                         mpipe->pipe_state |= PIPE_ASYNC;
950                 } else {
951                         mpipe->pipe_state &= ~PIPE_ASYNC;
952                 }
953                 error = 0;
954                 break;
955         case FIONREAD:
956                 *(int *)data = mpipe->pipe_buffer.windex -
957                                 mpipe->pipe_buffer.rindex;
958                 error = 0;
959                 break;
960         case FIOSETOWN:
961                 error = fsetown(*(int *)data, &mpipe->pipe_sigio);
962                 break;
963         case FIOGETOWN:
964                 *(int *)data = fgetown(&mpipe->pipe_sigio);
965                 error = 0;
966                 break;
967         case TIOCSPGRP:
968                 /* This is deprecated, FIOSETOWN should be used instead. */
969                 error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
970                 break;
971
972         case TIOCGPGRP:
973                 /* This is deprecated, FIOGETOWN should be used instead. */
974                 *(int *)data = -fgetown(&mpipe->pipe_sigio);
975                 error = 0;
976                 break;
977         default:
978                 error = ENOTTY;
979                 break;
980         }
981         lwkt_reltoken(&mpipe->pipe_wlock);
982         lwkt_reltoken(&mpipe->pipe_rlock);
983
984         return (error);
985 }
986
987 /*
988  * MPSAFE
989  */
990 static int
991 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
992 {
993         struct pipe *pipe;
994
995         pipe = (struct pipe *)fp->f_data;
996
997         bzero((caddr_t)ub, sizeof(*ub));
998         ub->st_mode = S_IFIFO;
999         ub->st_blksize = pipe->pipe_buffer.size;
1000         ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1001         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1002         ub->st_atimespec = pipe->pipe_atime;
1003         ub->st_mtimespec = pipe->pipe_mtime;
1004         ub->st_ctimespec = pipe->pipe_ctime;
1005         /*
1006          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1007          * st_flags, st_gen.
1008          * XXX (st_dev, st_ino) should be unique.
1009          */
1010         return (0);
1011 }
1012
1013 static int
1014 pipe_close(struct file *fp)
1015 {
1016         struct pipe *cpipe;
1017
1018         cpipe = (struct pipe *)fp->f_data;
1019         fp->f_ops = &badfileops;
1020         fp->f_data = NULL;
1021         funsetown(&cpipe->pipe_sigio);
1022         pipeclose(cpipe);
1023         return (0);
1024 }
1025
1026 /*
1027  * Shutdown one or both directions of a full-duplex pipe.
1028  */
1029 static int
1030 pipe_shutdown(struct file *fp, int how)
1031 {
1032         struct pipe *rpipe;
1033         struct pipe *wpipe;
1034         int error = EPIPE;
1035
1036         rpipe = (struct pipe *)fp->f_data;
1037         wpipe = rpipe->pipe_peer;
1038
1039         /*
1040          * We modify pipe_state on both pipes, which means we need
1041          * all four tokens!
1042          */
1043         lwkt_gettoken(&rpipe->pipe_rlock);
1044         lwkt_gettoken(&rpipe->pipe_wlock);
1045         lwkt_gettoken(&wpipe->pipe_rlock);
1046         lwkt_gettoken(&wpipe->pipe_wlock);
1047
1048         switch(how) {
1049         case SHUT_RDWR:
1050         case SHUT_RD:
1051                 rpipe->pipe_state |= PIPE_REOF;         /* my reads */
1052                 rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
1053                 if (rpipe->pipe_state & PIPE_WANTR) {
1054                         rpipe->pipe_state &= ~PIPE_WANTR;
1055                         wakeup(rpipe);
1056                 }
1057                 if (rpipe->pipe_state & PIPE_WANTW) {
1058                         rpipe->pipe_state &= ~PIPE_WANTW;
1059                         wakeup(rpipe);
1060                 }
1061                 error = 0;
1062                 if (how == SHUT_RD)
1063                         break;
1064                 /* fall through */
1065         case SHUT_WR:
1066                 wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
1067                 wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
1068                 if (wpipe->pipe_state & PIPE_WANTR) {
1069                         wpipe->pipe_state &= ~PIPE_WANTR;
1070                         wakeup(wpipe);
1071                 }
1072                 if (wpipe->pipe_state & PIPE_WANTW) {
1073                         wpipe->pipe_state &= ~PIPE_WANTW;
1074                         wakeup(wpipe);
1075                 }
1076                 error = 0;
1077                 break;
1078         }
1079         pipewakeup(rpipe, 1);
1080         pipewakeup(wpipe, 1);
1081
1082         lwkt_reltoken(&wpipe->pipe_wlock);
1083         lwkt_reltoken(&wpipe->pipe_rlock);
1084         lwkt_reltoken(&rpipe->pipe_wlock);
1085         lwkt_reltoken(&rpipe->pipe_rlock);
1086
1087         return (error);
1088 }
1089
1090 static void
1091 pipe_free_kmem(struct pipe *cpipe)
1092 {
1093         if (cpipe->pipe_buffer.buffer != NULL) {
1094                 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1095                         atomic_subtract_int(&pipe_nbig, 1);
1096                 kmem_free(&kernel_map,
1097                         (vm_offset_t)cpipe->pipe_buffer.buffer,
1098                         cpipe->pipe_buffer.size);
1099                 cpipe->pipe_buffer.buffer = NULL;
1100                 cpipe->pipe_buffer.object = NULL;
1101         }
1102 }
1103
/*
 * Close one half of a pipe.
 *
 * The slock (shared by both halves) is acquired here, when allocated, to
 * interlock against simultaneous closes of both ends.  The rlock and
 * wlock tokens are held while adjusting pipe_state.
 *
 * The closed half is marked PIPE_CLOSED|PIPE_REOF|PIPE_WEOF and the peer
 * is marked PIPE_REOF|PIPE_WEOF; sleepers and select/poll/kq waiters on
 * both halves are woken.  Memory is reclaimed only once both halves are
 * closed (or there never was a peer).  A reclaimed pipe whose buffer is
 * exactly PIPE_SIZE goes onto the per-cpu gd_pipeq cache (linked through
 * pipe_peer) while gd_pipeqcount < pipe_maxcache; otherwise it is freed.
 *
 * cpipe may be NULL, in which case this is a no-op.
 */
static void
pipeclose(struct pipe *cpipe)
{
	globaldata_t gd;
	struct pipe *ppipe;

	if (cpipe == NULL)
		return;

	/*
	 * The slock may not have been allocated yet (close during
	 * initialization)
	 *
	 * We need both the read and write tokens to modify pipe_state.
	 */
	if (cpipe->pipe_slock)
		lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
	lwkt_gettoken(&cpipe->pipe_rlock);
	lwkt_gettoken(&cpipe->pipe_wlock);

	/*
	 * Set our state, wakeup anyone waiting in select/poll/kq, and
	 * wakeup anyone blocked on our pipe.
	 */
	cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
	pipewakeup(cpipe, 1);
	if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
		cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
		wakeup(cpipe);
	}

	/*
	 * Disconnect from peer: mark it EOF in both directions and wake
	 * its sleepers and any knotes attached to it.
	 */
	if ((ppipe = cpipe->pipe_peer) != NULL) {
		lwkt_gettoken(&ppipe->pipe_rlock);
		lwkt_gettoken(&ppipe->pipe_wlock);
		ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
		pipewakeup(ppipe, 1);
		if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
			ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
			wakeup(ppipe);
		}
		if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
			KNOTE(&ppipe->pipe_kq.ki_note, 0);
		lwkt_reltoken(&ppipe->pipe_wlock);
		lwkt_reltoken(&ppipe->pipe_rlock);
	}

	/*
	 * If the peer is also closed we can free resources for both
	 * sides, otherwise we leave our side intact to deal with any
	 * races (since we only have the slock).
	 *
	 * The recursive call sees a NULL peer and a NULL slock, so it
	 * neither touches the shared slock nor recurses again; it just
	 * reclaims the peer.  The slock itself is freed below, by us.
	 */
	if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
		cpipe->pipe_peer = NULL;
		ppipe->pipe_peer = NULL;
		ppipe->pipe_slock = NULL;	/* we will free the slock */
		pipeclose(ppipe);
		ppipe = NULL;
	}

	lwkt_reltoken(&cpipe->pipe_wlock);
	lwkt_reltoken(&cpipe->pipe_rlock);
	if (cpipe->pipe_slock)
		lockmgr(cpipe->pipe_slock, LK_RELEASE);

	/*
	 * If we disassociated from our peer (or never had one) we can
	 * free resources: release the slock, then either cache the pipe
	 * on this cpu's gd_pipeq or free its buffer and the pipe itself.
	 */
	if (ppipe == NULL) {
		gd = mycpu;
		if (cpipe->pipe_slock) {
			kfree(cpipe->pipe_slock, M_PIPE);
			cpipe->pipe_slock = NULL;
		}
		/* Cache full, or non-standard buffer size: really free. */
		if (gd->gd_pipeqcount >= pipe_maxcache ||
		    cpipe->pipe_buffer.size != PIPE_SIZE
		) {
			pipe_free_kmem(cpipe);
			kfree(cpipe, M_PIPE);
		} else {
			/* Push onto the per-cpu cache, reusing pipe_peer as the link. */
			cpipe->pipe_state = 0;
			cpipe->pipe_peer = gd->gd_pipeq;
			gd->gd_pipeq = cpipe;
			++gd->gd_pipeqcount;
		}
	}
}
1197
1198 static int
1199 pipe_kqfilter(struct file *fp, struct knote *kn)
1200 {
1201         struct pipe *cpipe;
1202
1203         cpipe = (struct pipe *)kn->kn_fp->f_data;
1204
1205         switch (kn->kn_filter) {
1206         case EVFILT_READ:
1207                 kn->kn_fop = &pipe_rfiltops;
1208                 break;
1209         case EVFILT_WRITE:
1210                 kn->kn_fop = &pipe_wfiltops;
1211                 if (cpipe->pipe_peer == NULL) {
1212                         /* other end of pipe has been closed */
1213                         return (EPIPE);
1214                 }
1215                 break;
1216         default:
1217                 return (EOPNOTSUPP);
1218         }
1219         kn->kn_hook = (caddr_t)cpipe;
1220
1221         knote_insert(&cpipe->pipe_kq.ki_note, kn);
1222
1223         return (0);
1224 }
1225
1226 static void
1227 filt_pipedetach(struct knote *kn)
1228 {
1229         struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1230
1231         knote_remove(&cpipe->pipe_kq.ki_note, kn);
1232 }
1233
1234 /*ARGSUSED*/
1235 static int
1236 filt_piperead(struct knote *kn, long hint)
1237 {
1238         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1239         int ready = 0;
1240
1241         lwkt_gettoken(&rpipe->pipe_rlock);
1242         lwkt_gettoken(&rpipe->pipe_wlock);
1243
1244         kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1245
1246         /*
1247          * Only set EOF if all data has been exhausted
1248          */
1249         if ((rpipe->pipe_state & PIPE_REOF) && kn->kn_data == 0) {
1250                 kn->kn_flags |= EV_EOF; 
1251                 ready = 1;
1252         }
1253
1254         lwkt_reltoken(&rpipe->pipe_wlock);
1255         lwkt_reltoken(&rpipe->pipe_rlock);
1256
1257         if (!ready)
1258                 ready = kn->kn_data > 0;
1259
1260         return (ready);
1261 }
1262
1263 /*ARGSUSED*/
1264 static int
1265 filt_pipewrite(struct knote *kn, long hint)
1266 {
1267         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1268         struct pipe *wpipe = rpipe->pipe_peer;
1269         int ready = 0;
1270
1271         kn->kn_data = 0;
1272         if (wpipe == NULL) {
1273                 kn->kn_flags |= EV_EOF;
1274                 return (1);
1275         }
1276
1277         lwkt_gettoken(&wpipe->pipe_rlock);
1278         lwkt_gettoken(&wpipe->pipe_wlock);
1279
1280         if (wpipe->pipe_state & PIPE_WEOF) {
1281                 kn->kn_flags |= EV_EOF; 
1282                 ready = 1;
1283         }
1284
1285         if (!ready)
1286                 kn->kn_data = wpipe->pipe_buffer.size -
1287                               (wpipe->pipe_buffer.windex -
1288                                wpipe->pipe_buffer.rindex);
1289
1290         lwkt_reltoken(&wpipe->pipe_wlock);
1291         lwkt_reltoken(&wpipe->pipe_rlock);
1292
1293         if (!ready)
1294                 ready = kn->kn_data >= PIPE_BUF;
1295
1296         return (ready);
1297 }