1 /*
2  * Copyright (c) 1996 John S. Dyson
3  * All rights reserved.
4  *
5  * Redistribution and use in source and binary forms, with or without
6  * modification, are permitted provided that the following conditions
7  * are met:
8  * 1. Redistributions of source code must retain the above copyright
9  *    notice immediately at the beginning of the file, without modification,
10  *    this list of conditions, and the following disclaimer.
11  * 2. Redistributions in binary form must reproduce the above copyright
12  *    notice, this list of conditions and the following disclaimer in the
13  *    documentation and/or other materials provided with the distribution.
14  * 3. Absolutely no warranty of function or purpose is made by the author
15  *    John S. Dyson.
16  * 4. Modifications may be freely made to this file if the above conditions
17  *    are met.
18  *
19  * $FreeBSD: src/sys/kern/sys_pipe.c,v 1.60.2.13 2002/08/05 15:05:15 des Exp $
20  * $DragonFly: src/sys/kern/sys_pipe.c,v 1.50 2008/09/09 04:06:13 dillon Exp $
21  */
22
23 /*
24  * This file contains a high-performance replacement for the socket-based
25  * pipes scheme originally used in FreeBSD/4.4Lite.  It does not support
26  * all features of sockets, but does do everything that pipes normally
27  * do.
28  */
29 #include <sys/param.h>
30 #include <sys/systm.h>
31 #include <sys/kernel.h>
32 #include <sys/proc.h>
33 #include <sys/fcntl.h>
34 #include <sys/file.h>
35 #include <sys/filedesc.h>
36 #include <sys/filio.h>
37 #include <sys/ttycom.h>
38 #include <sys/stat.h>
39 #include <sys/signalvar.h>
40 #include <sys/sysproto.h>
41 #include <sys/pipe.h>
42 #include <sys/vnode.h>
43 #include <sys/uio.h>
44 #include <sys/event.h>
45 #include <sys/globaldata.h>
46 #include <sys/module.h>
47 #include <sys/malloc.h>
48 #include <sys/sysctl.h>
49 #include <sys/socket.h>
50
51 #include <vm/vm.h>
52 #include <vm/vm_param.h>
53 #include <sys/lock.h>
54 #include <vm/vm_object.h>
55 #include <vm/vm_kern.h>
56 #include <vm/vm_extern.h>
57 #include <vm/pmap.h>
58 #include <vm/vm_map.h>
59 #include <vm/vm_page.h>
60 #include <vm/vm_zone.h>
61
62 #include <sys/file2.h>
63 #include <sys/signal2.h>
64
65 #include <machine/cpufunc.h>
66
67 /*
68  * interfaces to the outside world
69  */
70 static int pipe_read (struct file *fp, struct uio *uio, 
71                 struct ucred *cred, int flags);
72 static int pipe_write (struct file *fp, struct uio *uio, 
73                 struct ucred *cred, int flags);
74 static int pipe_close (struct file *fp);
75 static int pipe_shutdown (struct file *fp, int how);
76 static int pipe_kqfilter (struct file *fp, struct knote *kn);
77 static int pipe_stat (struct file *fp, struct stat *sb, struct ucred *cred);
78 static int pipe_ioctl (struct file *fp, u_long cmd, caddr_t data,
79                 struct ucred *cred, struct sysmsg *msg);
80
81 static struct fileops pipeops = {
82         .fo_read = pipe_read, 
83         .fo_write = pipe_write,
84         .fo_ioctl = pipe_ioctl,
85         .fo_kqfilter = pipe_kqfilter,
86         .fo_stat = pipe_stat,
87         .fo_close = pipe_close,
88         .fo_shutdown = pipe_shutdown
89 };
90
91 static void     filt_pipedetach(struct knote *kn);
92 static int      filt_piperead(struct knote *kn, long hint);
93 static int      filt_pipewrite(struct knote *kn, long hint);
94
95 static struct filterops pipe_rfiltops =
96         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_piperead };
97 static struct filterops pipe_wfiltops =
98         { FILTEROP_ISFD|FILTEROP_MPSAFE, NULL, filt_pipedetach, filt_pipewrite };
99
100 MALLOC_DEFINE(M_PIPE, "pipe", "pipe structures");
101
102 /*
103  * Default pipe buffer size(s).  This can be kind of large now because pipe
104  * space is pageable.  The pipe code will try to maintain locality of
105  * reference for performance reasons, so small amounts of outstanding I/O
106  * will not wipe the cache.
107  */
108 #define MINPIPESIZE (PIPE_SIZE/3)
109 #define MAXPIPESIZE (2*PIPE_SIZE/3)
110
111 /*
112  * Limit the number of "big" pipes
113  */
114 #define LIMITBIGPIPES   64
115 #define PIPEQ_MAX_CACHE 16      /* per-cpu pipe structure cache */
116
117 static int pipe_maxbig = LIMITBIGPIPES;
118 static int pipe_maxcache = PIPEQ_MAX_CACHE;
119 static int pipe_bigcount;
120 static int pipe_nbig;
121 static int pipe_bcache_alloc;
122 static int pipe_bkmem_alloc;
123 static int pipe_rblocked_count;
124 static int pipe_wblocked_count;
125
126 SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
127 SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
128         CTLFLAG_RD, &pipe_nbig, 0, "number of big pipes allocated");
129 SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
130         CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
131 SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
132         CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times read blocked");
133 SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
134         CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times write blocked");
135 SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
136         CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
137 SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
138         CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
139 #ifdef SMP
140 static int pipe_delay = 5000;   /* 5uS default */
141 SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
142         CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
143 #endif
144 #if !defined(NO_PIPE_SYSCTL_STATS)
145 SYSCTL_INT(_kern_pipe, OID_AUTO, bcache_alloc,
146         CTLFLAG_RW, &pipe_bcache_alloc, 0, "pipe buffer from pcpu cache");
147 SYSCTL_INT(_kern_pipe, OID_AUTO, bkmem_alloc,
148         CTLFLAG_RW, &pipe_bkmem_alloc, 0, "pipe buffer from kmem");
149 #endif
150
151 /*
152  * Auto-size pipe cache to reduce kmem allocations and frees.
153  */
154 static
155 void
156 pipeinit(void *dummy)
157 {
158         size_t mbytes = kmem_lim_size();
159
160         if (pipe_maxbig == LIMITBIGPIPES) {
161                 if (mbytes >= 7 * 1024)
162                         pipe_maxbig *= 2;
163                 if (mbytes >= 15 * 1024)
164                         pipe_maxbig *= 2;
165         }
166         if (pipe_maxcache == PIPEQ_MAX_CACHE) {
167                 if (mbytes >= 7 * 1024)
168                         pipe_maxcache *= 2;
169                 if (mbytes >= 15 * 1024)
170                         pipe_maxcache *= 2;
171         }
172 }
173 SYSINIT(kmem, SI_BOOT2_MACHDEP, SI_ORDER_ANY, pipeinit, NULL)
174
175 static void pipeclose (struct pipe *cpipe);
176 static void pipe_free_kmem (struct pipe *cpipe);
177 static int pipe_create (struct pipe **cpipep);
178 static int pipespace (struct pipe *cpipe, int size);
179
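/*
 * Wake up anyone interested in the pipe: post SIGIO to the registered
 * owner if the pipe is in async mode (and dosigio is set), and post a
 * knote for select/poll/kqueue waiters.
 */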
180 static __inline void
181 pipewakeup(struct pipe *cpipe, int dosigio)
182 {
183         if (dosigio && (cpipe->pipe_state & PIPE_ASYNC) && cpipe->pipe_sigio) {
184                 lwkt_gettoken(&proc_token);
185                 pgsigio(cpipe->pipe_sigio, SIGIO, 0);
186                 lwkt_reltoken(&proc_token);
187         }
188         KNOTE(&cpipe->pipe_kq.ki_note, 0);
189 }
190
191 /*
192  * These routines are called before and after a UIO.  The UIO
193  * may block, causing our held tokens to be lost temporarily.
194  *
195  * We use these routines to serialize reads against other reads
196  * and writes against other writes.
197  *
198  * The read token is held on entry so *ipp does not race.
199  */
200 static __inline int
201 pipe_start_uio(struct pipe *cpipe, int *ipp)
202 {
203         int error;
204
205         while (*ipp) {
206                 *ipp = -1;
207                 error = tsleep(ipp, PCATCH, "pipexx", 0);
208                 if (error)
209                         return (error);
210         }
211         *ipp = 1;
212         return (0);
213 }
214
215 static __inline void
216 pipe_end_uio(struct pipe *cpipe, int *ipp)
217 {
218         if (*ipp < 0) {
219                 *ipp = 0;
220                 wakeup(ipp);
221         } else {
222                 KKASSERT(*ipp > 0);
223                 *ipp = 0;
224         }
225 }
226
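/*
 * Illustrative userland usage (a sketch for orientation only, not part
 * of the kernel code path below):
 *
 *        int fds[2];
 *        char buf[16];
 *
 *        if (pipe(fds) == 0) {
 *                write(fds[1], "hello", 5);        (serviced by pipe_write())
 *                read(fds[0], buf, sizeof(buf));   (serviced by pipe_read())
 *        }
 *
 * By convention fds[0] is used for reading and fds[1] for writing,
 * although both descriptors are created FREAD|FWRITE below.
 */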
227 /*
228  * The pipe system call for the DTYPE_PIPE type of pipes
229  *
230  * pipe_args(int dummy)
231  *
232  * MPSAFE
233  */
234 int
235 sys_pipe(struct pipe_args *uap)
236 {
237         struct thread *td = curthread;
238         struct filedesc *fdp = td->td_proc->p_fd;
239         struct file *rf, *wf;
240         struct pipe *rpipe, *wpipe;
241         int fd1, fd2, error;
242
243         rpipe = wpipe = NULL;
244         if (pipe_create(&rpipe) || pipe_create(&wpipe)) {
245                 pipeclose(rpipe); 
246                 pipeclose(wpipe); 
247                 return (ENFILE);
248         }
249         
250         error = falloc(td->td_lwp, &rf, &fd1);
251         if (error) {
252                 pipeclose(rpipe);
253                 pipeclose(wpipe);
254                 return (error);
255         }
256         uap->sysmsg_fds[0] = fd1;
257
258         /*
259          * Warning: once we've gotten past allocation of the fd for the
260          * read-side, we can only drop the read side via fdrop() in order
261          * to avoid races against processes which manage to dup() the read
262          * side while we are blocked trying to allocate the write side.
263          */
264         rf->f_type = DTYPE_PIPE;
265         rf->f_flag = FREAD | FWRITE;
266         rf->f_ops = &pipeops;
267         rf->f_data = rpipe;
268         error = falloc(td->td_lwp, &wf, &fd2);
269         if (error) {
270                 fsetfd(fdp, NULL, fd1);
271                 fdrop(rf);
272                 /* rpipe has been closed by fdrop(). */
273                 pipeclose(wpipe);
274                 return (error);
275         }
276         wf->f_type = DTYPE_PIPE;
277         wf->f_flag = FREAD | FWRITE;
278         wf->f_ops = &pipeops;
279         wf->f_data = wpipe;
280         uap->sysmsg_fds[1] = fd2;
281
282         rpipe->pipe_slock = kmalloc(sizeof(struct lock),
283                                     M_PIPE, M_WAITOK|M_ZERO);
284         wpipe->pipe_slock = rpipe->pipe_slock;
285         rpipe->pipe_peer = wpipe;
286         wpipe->pipe_peer = rpipe;
287         lockinit(rpipe->pipe_slock, "pipecl", 0, 0);
288
289         /*
290          * Once activated the peer relationship remains valid until
291          * both sides are closed.
292          */
293         fsetfd(fdp, rf, fd1);
294         fsetfd(fdp, wf, fd2);
295         fdrop(rf);
296         fdrop(wf);
297
298         return (0);
299 }
300
301 /*
302  * Allocate kva for the pipe circular buffer; the space is pageable.
303  * This routine will 'realloc' the size of a pipe safely; if that fails
304  * it will retain the old buffer and return ENOMEM, leaving the pipe
305  * usable at its previous size.
306  */
307 static int
308 pipespace(struct pipe *cpipe, int size)
309 {
310         struct vm_object *object;
311         caddr_t buffer;
312         int npages, error;
313
314         npages = round_page(size) / PAGE_SIZE;
315         object = cpipe->pipe_buffer.object;
316
317         /*
318          * [re]create the object if necessary and reserve space for it
319          * in the kernel_map.  The object and memory are pageable.  On
320          * success, free the old resources before assigning the new
321          * ones.
322          */
323         if (object == NULL || object->size != npages) {
324                 object = vm_object_allocate(OBJT_DEFAULT, npages);
325                 buffer = (caddr_t)vm_map_min(&kernel_map);
326
327                 error = vm_map_find(&kernel_map, object, 0,
328                                     (vm_offset_t *)&buffer,
329                                     size, PAGE_SIZE,
330                                     1, VM_MAPTYPE_NORMAL,
331                                     VM_PROT_ALL, VM_PROT_ALL,
332                                     0);
333
334                 if (error != KERN_SUCCESS) {
335                         vm_object_deallocate(object);
336                         return (ENOMEM);
337                 }
338                 pipe_free_kmem(cpipe);
339                 cpipe->pipe_buffer.object = object;
340                 cpipe->pipe_buffer.buffer = buffer;
341                 cpipe->pipe_buffer.size = size;
342                 ++pipe_bkmem_alloc;
343         } else {
344                 ++pipe_bcache_alloc;
345         }
346         cpipe->pipe_buffer.rindex = 0;
347         cpipe->pipe_buffer.windex = 0;
348         return (0);
349 }
350
351 /*
352  * Initialize and allocate VM and memory for pipe, pulling the pipe from
353  * our per-cpu cache if possible.  For now make sure it is sized for the
354  * smaller PIPE_SIZE default.
355  */
356 static int
357 pipe_create(struct pipe **cpipep)
358 {
359         globaldata_t gd = mycpu;
360         struct pipe *cpipe;
361         int error;
362
363         if ((cpipe = gd->gd_pipeq) != NULL) {
364                 gd->gd_pipeq = cpipe->pipe_peer;
365                 --gd->gd_pipeqcount;
366                 cpipe->pipe_peer = NULL;
367                 cpipe->pipe_wantwcnt = 0;
368         } else {
369                 cpipe = kmalloc(sizeof(struct pipe), M_PIPE, M_WAITOK|M_ZERO);
370         }
371         *cpipep = cpipe;
372         if ((error = pipespace(cpipe, PIPE_SIZE)) != 0)
373                 return (error);
374         vfs_timestamp(&cpipe->pipe_ctime);
375         cpipe->pipe_atime = cpipe->pipe_ctime;
376         cpipe->pipe_mtime = cpipe->pipe_ctime;
377         lwkt_token_init(&cpipe->pipe_rlock, "piper");
378         lwkt_token_init(&cpipe->pipe_wlock, "pipew");
379         return (0);
380 }
381
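/*
 * fileops read entry point.  Reads are serialized against other readers
 * via pipe_start_uio().  Data is copied out of the circular buffer, the
 * writer is woken once the FIFO drains below the half-full point, and
 * the caller either blocks or returns EAGAIN (non-blocking I/O) when no
 * data is available.
 */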
382 static int
383 pipe_read(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
384 {
385         struct pipe *rpipe;
386         struct pipe *wpipe;
387         int error;
388         size_t nread = 0;
389         int nbio;
390         u_int size;     /* total bytes available */
391         u_int nsize;    /* total bytes to read */
392         u_int rindex;   /* contiguous bytes available */
393         int notify_writer;
394         int bigread;
395         int bigcount;
396
397         atomic_set_int(&curthread->td_mpflags, TDF_MP_BATCH_DEMARC);
398
399         if (uio->uio_resid == 0)
400                 return(0);
401
402         /*
403          * Setup locks, calculate nbio
404          */
405         rpipe = (struct pipe *)fp->f_data;
406         wpipe = rpipe->pipe_peer;
407         lwkt_gettoken(&rpipe->pipe_rlock);
408
409         if (fflags & O_FBLOCKING)
410                 nbio = 0;
411         else if (fflags & O_FNONBLOCKING)
412                 nbio = 1;
413         else if (fp->f_flag & O_NONBLOCK)
414                 nbio = 1;
415         else
416                 nbio = 0;
417
418         /*
419          * Reads are serialized.  Note however that pipe_buffer.buffer and
420          * pipe_buffer.size can change out from under us when the number
421          * of bytes in the buffer is zero due to the write-side doing a
422          * pipespace().
423          */
424         error = pipe_start_uio(rpipe, &rpipe->pipe_rip);
425         if (error) {
426                 lwkt_reltoken(&rpipe->pipe_rlock);
427                 return (error);
428         }
429         notify_writer = 0;
430
431         bigread = (uio->uio_resid > 10 * 1024 * 1024);
432         bigcount = 10;
433
434         while (uio->uio_resid) {
435                 /*
436                  * Don't hog the cpu.
437                  */
438                 if (bigread && --bigcount == 0) {
439                         lwkt_user_yield();
440                         bigcount = 10;
441                         if (CURSIG(curthread->td_lwp)) {
442                                 error = EINTR;
443                                 break;
444                         }
445                 }
446
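                /*
                 * The indices only ever increase; (windex - rindex) is the
                 * number of bytes pending and (rindex & (size - 1)) is the
                 * buffer offset, relying on pipe_buffer.size being a power
                 * of 2.  For example, with size 16384, rindex 16380 and
                 * windex 16390, 10 bytes are pending: 4 contiguous at
                 * offset 16380 and 6 wrapped around to offset 0.
                 */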
447                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
448                 cpu_lfence();
449                 if (size) {
450                         rindex = rpipe->pipe_buffer.rindex &
451                                  (rpipe->pipe_buffer.size - 1);
452                         nsize = size;
453                         if (nsize > rpipe->pipe_buffer.size - rindex)
454                                 nsize = rpipe->pipe_buffer.size - rindex;
455                         nsize = szmin(nsize, uio->uio_resid);
456
457                         error = uiomove(&rpipe->pipe_buffer.buffer[rindex],
458                                         nsize, uio);
459                         if (error)
460                                 break;
461                         cpu_mfence();
462                         rpipe->pipe_buffer.rindex += nsize;
463                         nread += nsize;
464
465                         /*
466                          * If the FIFO is still over half full just continue
467                          * and do not try to notify the writer yet.
468                          */
469                         if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
470                                 notify_writer = 0;
471                                 continue;
472                         }
473
474                         /*
475                          * When the FIFO is less than half full, notify any
476                          * waiting writer.  WANTW can be checked while
477                          * holding just the rlock.
478                          */
479                         notify_writer = 1;
480                         if ((rpipe->pipe_state & PIPE_WANTW) == 0)
481                                 continue;
482                 }
483
484                 /*
485                  * If the "write-side" was blocked we wake it up.  This code
486                  * is reached either when the buffer is completely emptied
487                  * or if it becomes more than half-empty.
488                  *
489                  * Pipe_state can only be modified if both the rlock and
490                  * wlock are held.
491                  */
492                 if (rpipe->pipe_state & PIPE_WANTW) {
493                         lwkt_gettoken(&rpipe->pipe_wlock);
494                         if (rpipe->pipe_state & PIPE_WANTW) {
495                                 rpipe->pipe_state &= ~PIPE_WANTW;
496                                 lwkt_reltoken(&rpipe->pipe_wlock);
497                                 wakeup(rpipe);
498                         } else {
499                                 lwkt_reltoken(&rpipe->pipe_wlock);
500                         }
501                 }
502
503                 /*
504                  * Pick up our copy loop again if the writer sent data to
505                  * us while we were messing around.
506                  *
507                  * On an SMP box poll up to pipe_delay nanoseconds for new
508                  * data.  Typically a value of 2000 to 4000 is sufficient
509                  * to eradicate most IPIs/tsleeps/wakeups when a pipe
510                  * is used for synchronous communications with small packets,
511                  * and 8000 or so (8uS) will pipeline large buffer xfers
512                  * between cpus over a pipe.
513                  *
514                  * For synchronous communications a hit means doing a
515                  * full Awrite-Bread-Bwrite-Aread cycle in less than 2uS,
516                  * whereas a miss requiring a tsleep/wakeup sequence
517                  * will take 7uS or more.
518                  */
519                 if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
520                         continue;
521
522 #if defined(SMP) && defined(_RDTSC_SUPPORTED_)
523                 if (pipe_delay) {
524                         int64_t tsc_target;
525                         int good = 0;
526
527                         tsc_target = tsc_get_target(pipe_delay);
528                         while (tsc_test_target(tsc_target) == 0) {
529                                 if (rpipe->pipe_buffer.windex !=
530                                     rpipe->pipe_buffer.rindex) {
531                                         good = 1;
532                                         break;
533                                 }
534                         }
535                         if (good)
536                                 continue;
537                 }
538 #endif
539
540                 /*
541                  * Detect EOF condition, do not set error.
542                  */
543                 if (rpipe->pipe_state & PIPE_REOF)
544                         break;
545
546                 /*
547                  * Break if some data was read, or if this was a non-blocking
548                  * read.
549                  */
550                 if (nread > 0)
551                         break;
552
553                 if (nbio) {
554                         error = EAGAIN;
555                         break;
556                 }
557
558                 /*
559                  * Last chance, interlock with WANTR.
560                  */
561                 lwkt_gettoken(&rpipe->pipe_wlock);
562                 size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
563                 if (size) {
564                         lwkt_reltoken(&rpipe->pipe_wlock);
565                         continue;
566                 }
567
568                 /*
569                  * Retest EOF - acquiring a new token can temporarily release
570                  * tokens already held.
571                  */
572                 if (rpipe->pipe_state & PIPE_REOF) {
573                         lwkt_reltoken(&rpipe->pipe_wlock);
574                         break;
575                 }
576
577                 /*
578                  * If there is no more to read in the pipe, reset its
579                  * pointers to the beginning.  This improves cache hit
580                  * stats.
581                  *
582                  * We need both locks to modify both pointers, and there
583                  * must also not be a write in progress or the uiomove()
584                  * in the write might block and temporarily release
585                  * its wlock, then reacquire and update windex.  We are
586                  * only serialized against reads, not writes.
587                  *
588                  * XXX should we even bother resetting the indices?  It
589                  *     might actually be more cache efficient not to.
590                  */
591                 if (rpipe->pipe_buffer.rindex == rpipe->pipe_buffer.windex &&
592                     rpipe->pipe_wip == 0) {
593                         rpipe->pipe_buffer.rindex = 0;
594                         rpipe->pipe_buffer.windex = 0;
595                 }
596
597                 /*
598                  * Wait for more data.
599                  *
600                  * Pipe_state can only be set if both the rlock and wlock
601                  * are held.
602                  */
603                 rpipe->pipe_state |= PIPE_WANTR;
604                 tsleep_interlock(rpipe, PCATCH);
605                 lwkt_reltoken(&rpipe->pipe_wlock);
606                 error = tsleep(rpipe, PCATCH | PINTERLOCKED, "piperd", 0);
607                 ++pipe_rblocked_count;
608                 if (error)
609                         break;
610         }
611         pipe_end_uio(rpipe, &rpipe->pipe_rip);
612
613         /*
614          * Update last access time
615          */
616         if (error == 0 && nread)
617                 vfs_timestamp(&rpipe->pipe_atime);
618
619         /*
620          * If we drained the FIFO more than half way then handle
621          * write blocking hysteresis.
622          *
623          * Note that PIPE_WANTW cannot be set by the writer without
624          * it holding both rlock and wlock, so we can test it
625          * while holding just rlock.
626          */
627         if (notify_writer) {
628                 /*
629                  * Synchronous blocking is done on the pipe involved
630                  */
631                 if (rpipe->pipe_state & PIPE_WANTW) {
632                         lwkt_gettoken(&rpipe->pipe_wlock);
633                         if (rpipe->pipe_state & PIPE_WANTW) {
634                                 rpipe->pipe_state &= ~PIPE_WANTW;
635                                 lwkt_reltoken(&rpipe->pipe_wlock);
636                                 wakeup(rpipe);
637                         } else {
638                                 lwkt_reltoken(&rpipe->pipe_wlock);
639                         }
640                 }
641
642                 /*
643                  * But we may also have to deal with a kqueue which is
644                  * stored on the same pipe as its descriptor, so a
645                  * stored on the same pipe as its descriptor, so an
646                  * be on the other side.
647                  */
648                 lwkt_gettoken(&wpipe->pipe_wlock);
649                 pipewakeup(wpipe, 0);
650                 lwkt_reltoken(&wpipe->pipe_wlock);
651         }
652         /*size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;*/
653         lwkt_reltoken(&rpipe->pipe_rlock);
654
655         return (error);
656 }
657
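/*
 * fileops write entry point.  Writes are serialized against other
 * writers and always target the peer pipe's buffer.  The buffer may be
 * grown to BIG_PIPE_SIZE for busy pipes, writes of PIPE_BUF bytes or
 * less are never interleaved with other writes, and the caller either
 * blocks or returns EAGAIN (non-blocking I/O) when no space is
 * available.
 */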
658 static int
659 pipe_write(struct file *fp, struct uio *uio, struct ucred *cred, int fflags)
660 {
661         int error;
662         int orig_resid;
663         int nbio;
664         struct pipe *wpipe;
665         struct pipe *rpipe;
666         u_int windex;
667         u_int space;
668         u_int wcount;
669         int bigwrite;
670         int bigcount;
671
672         /*
673          * Writes go to the peer.  The peer will always exist.
674          */
675         rpipe = (struct pipe *) fp->f_data;
676         wpipe = rpipe->pipe_peer;
677         lwkt_gettoken(&wpipe->pipe_wlock);
678         if (wpipe->pipe_state & PIPE_WEOF) {
679                 lwkt_reltoken(&wpipe->pipe_wlock);
680                 return (EPIPE);
681         }
682
683         /*
684          * Degenerate case (EPIPE takes prec)
685          */
686         if (uio->uio_resid == 0) {
687                 lwkt_reltoken(&wpipe->pipe_wlock);
688                 return(0);
689         }
690
691         /*
692          * Writes are serialized (start_uio must be called with wlock)
693          */
694         error = pipe_start_uio(wpipe, &wpipe->pipe_wip);
695         if (error) {
696                 lwkt_reltoken(&wpipe->pipe_wlock);
697                 return (error);
698         }
699
700         if (fflags & O_FBLOCKING)
701                 nbio = 0;
702         else if (fflags & O_FNONBLOCKING)
703                 nbio = 1;
704         else if (fp->f_flag & O_NONBLOCK)
705                 nbio = 1;
706         else
707                 nbio = 0;
708
709         /*
710          * If it is advantageous to resize the pipe buffer, do
711          * so.  We are write-serialized so we can block safely.
712          */
713         if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
714             (pipe_nbig < pipe_maxbig) &&
715             wpipe->pipe_wantwcnt > 4 &&
716             (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
717                 /* 
718                  * Recheck after lock.
719                  */
720                 lwkt_gettoken(&wpipe->pipe_rlock);
721                 if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
722                     (pipe_nbig < pipe_maxbig) &&
723                     (wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
724                         atomic_add_int(&pipe_nbig, 1);
725                         if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
726                                 ++pipe_bigcount;
727                         else
728                                 atomic_subtract_int(&pipe_nbig, 1);
729                 }
730                 lwkt_reltoken(&wpipe->pipe_rlock);
731         }
732
733         orig_resid = uio->uio_resid;
734         wcount = 0;
735
736         bigwrite = (uio->uio_resid > 10 * 1024 * 1024);
737         bigcount = 10;
738
739         while (uio->uio_resid) {
740                 if (wpipe->pipe_state & PIPE_WEOF) {
741                         error = EPIPE;
742                         break;
743                 }
744
745                 /*
746                  * Don't hog the cpu.
747                  */
748                 if (bigwrite && --bigcount == 0) {
749                         lwkt_user_yield();
750                         bigcount = 10;
751                         if (CURSIG(curthread->td_lwp)) {
752                                 error = EINTR;
753                                 break;
754                         }
755                 }
756
757                 windex = wpipe->pipe_buffer.windex &
758                          (wpipe->pipe_buffer.size - 1);
759                 space = wpipe->pipe_buffer.size -
760                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
761                 cpu_lfence();
762
763                 /* Writes of size <= PIPE_BUF must be atomic. */
764                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
765                         space = 0;
766
767                 /* 
768                  * Write to fill, read size handles write hysteresis.  Also
769                  * additional restrictions can cause select-based non-blocking
770                  * writes to spin.
771                  */
772                 if (space > 0) {
773                         u_int segsize;
774
775                         /*
776                          * Transfer size is minimum of uio transfer
777                          * and free space in pipe buffer.
778                          *
779                          * Limit each uiocopy to no more than PIPE_SIZE
780                          * so we can keep the gravy train going on a
781                          * SMP box.  This doubles the performance for
782                          * write sizes > 16K.  Otherwise large writes
783                          * wind up doing an inefficient synchronous
784                          * ping-pong.
785                          */
786                         space = szmin(space, uio->uio_resid);
787                         if (space > PIPE_SIZE)
788                                 space = PIPE_SIZE;
789
790                         /*
791                          * First segment to transfer is minimum of
792                          * transfer size and contiguous space in
793                          * pipe buffer.  If first segment to transfer
794                          * is less than the transfer size, we've got
795                          * a wraparound in the buffer.
796                          */
797                         segsize = wpipe->pipe_buffer.size - windex;
798                         if (segsize > space)
799                                 segsize = space;
800
801 #ifdef SMP
802                         /*
803                          * If this is the first loop and the reader is
804                          * blocked, do a preemptive wakeup of the reader.
805                          *
806                          * On SMP the IPI latency plus the wlock interlock
807                          * on the reader side is the fastest way to get the
808                          * reader going.  (The scheduler will hard loop on
809                          * lock tokens).
810                          *
811                          * NOTE: We can't clear WANTR here without acquiring
812                          * the rlock, which we don't want to do here!
813                          */
814                         if ((wpipe->pipe_state & PIPE_WANTR))
815                                 wakeup(wpipe);
816 #endif
817
818                         /*
819                          * Transfer segment, which may include a wrap-around.
820                          * Update windex to account for both all in one go
821                          * so the reader can read() the data atomically.
822                          */
823                         error = uiomove(&wpipe->pipe_buffer.buffer[windex],
824                                         segsize, uio);
825                         if (error == 0 && segsize < space) {
826                                 segsize = space - segsize;
827                                 error = uiomove(&wpipe->pipe_buffer.buffer[0],
828                                                 segsize, uio);
829                         }
830                         if (error)
831                                 break;
832                         cpu_mfence();
833                         wpipe->pipe_buffer.windex += space;
834                         wcount += space;
835                         continue;
836                 }
837
838                 /*
839                  * We need both the rlock and the wlock to interlock against
840                  * the EOF, WANTW, and size checks, and to modify pipe_state.
841                  *
842                  * These are token locks so we do not have to worry about
843                  * deadlocks.
844                  */
845                 lwkt_gettoken(&wpipe->pipe_rlock);
846
847                 /*
848                  * If the "read-side" has been blocked, wake it up now
849                  * and yield to let it drain synchronously rather
850                  * than block.
851                  */
852                 if (wpipe->pipe_state & PIPE_WANTR) {
853                         wpipe->pipe_state &= ~PIPE_WANTR;
854                         wakeup(wpipe);
855                 }
856
857                 /*
858                  * don't block on non-blocking I/O
859                  */
860                 if (nbio) {
861                         lwkt_reltoken(&wpipe->pipe_rlock);
862                         error = EAGAIN;
863                         break;
864                 }
865
866                 /*
867                  * re-test whether we have to block in the writer after
868                  * acquiring both locks, in case the reader opened up
869                  * some space.
870                  */
871                 space = wpipe->pipe_buffer.size -
872                         (wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex);
873                 cpu_lfence();
874                 if ((space < uio->uio_resid) && (orig_resid <= PIPE_BUF))
875                         space = 0;
876
877                 /*
878                  * Retest EOF - acquiring a new token can temporarily release
879                  * tokens already held.
880                  */
881                 if (wpipe->pipe_state & PIPE_WEOF) {
882                         lwkt_reltoken(&wpipe->pipe_rlock);
883                         error = EPIPE;
884                         break;
885                 }
886
887                 /*
888                  * We have no more space and have something to offer,
889                  * wake up select/poll/kq.
890                  */
891                 if (space == 0) {
892                         wpipe->pipe_state |= PIPE_WANTW;
893                         ++wpipe->pipe_wantwcnt;
894                         pipewakeup(wpipe, 1);
895                         if (wpipe->pipe_state & PIPE_WANTW)
896                                 error = tsleep(wpipe, PCATCH, "pipewr", 0);
897                         ++pipe_wblocked_count;
898                 }
899                 lwkt_reltoken(&wpipe->pipe_rlock);
900
901                 /*
902                  * Break out if we errored or the read side wants us to go
903                  * away.
904                  */
905                 if (error)
906                         break;
907                 if (wpipe->pipe_state & PIPE_WEOF) {
908                         error = EPIPE;
909                         break;
910                 }
911         }
912         pipe_end_uio(wpipe, &wpipe->pipe_wip);
913
914         /*
915          * If we have put any characters in the buffer, we wake up
916          * the reader.
917          *
918          * Both rlock and wlock are required to be able to modify pipe_state.
919          */
920         if (wpipe->pipe_buffer.windex != wpipe->pipe_buffer.rindex) {
921                 if (wpipe->pipe_state & PIPE_WANTR) {
922                         lwkt_gettoken(&wpipe->pipe_rlock);
923                         if (wpipe->pipe_state & PIPE_WANTR) {
924                                 wpipe->pipe_state &= ~PIPE_WANTR;
925                                 lwkt_reltoken(&wpipe->pipe_rlock);
926                                 wakeup(wpipe);
927                         } else {
928                                 lwkt_reltoken(&wpipe->pipe_rlock);
929                         }
930                 }
931                 lwkt_gettoken(&wpipe->pipe_rlock);
932                 pipewakeup(wpipe, 1);
933                 lwkt_reltoken(&wpipe->pipe_rlock);
934         }
935
936         /*
937          * Don't return EPIPE if I/O was successful
938          */
939         if ((wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex) &&
940             (uio->uio_resid == 0) &&
941             (error == EPIPE)) {
942                 error = 0;
943         }
944
945         if (error == 0)
946                 vfs_timestamp(&wpipe->pipe_mtime);
947
948         /*
949          * We have something to offer,
950          * wake up select/poll/kq.
951          */
952         /*space = wpipe->pipe_buffer.windex - wpipe->pipe_buffer.rindex;*/
953         lwkt_reltoken(&wpipe->pipe_wlock);
954         return (error);
955 }
956
957 /*
958  * we implement a very minimal set of ioctls for compatibility with sockets.
959  */
960 int
961 pipe_ioctl(struct file *fp, u_long cmd, caddr_t data,
962            struct ucred *cred, struct sysmsg *msg)
963 {
964         struct pipe *mpipe;
965         int error;
966
967         mpipe = (struct pipe *)fp->f_data;
968
969         lwkt_gettoken(&mpipe->pipe_rlock);
970         lwkt_gettoken(&mpipe->pipe_wlock);
971
972         switch (cmd) {
973         case FIOASYNC:
974                 if (*(int *)data) {
975                         mpipe->pipe_state |= PIPE_ASYNC;
976                 } else {
977                         mpipe->pipe_state &= ~PIPE_ASYNC;
978                 }
979                 error = 0;
980                 break;
981         case FIONREAD:
982                 *(int *)data = mpipe->pipe_buffer.windex -
983                                 mpipe->pipe_buffer.rindex;
984                 error = 0;
985                 break;
986         case FIOSETOWN:
987                 error = fsetown(*(int *)data, &mpipe->pipe_sigio);
988                 break;
989         case FIOGETOWN:
990                 *(int *)data = fgetown(&mpipe->pipe_sigio);
991                 error = 0;
992                 break;
993         case TIOCSPGRP:
994                 /* This is deprecated, FIOSETOWN should be used instead. */
995                 error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
996                 break;
997
998         case TIOCGPGRP:
999                 /* This is deprecated, FIOGETOWN should be used instead. */
1000                 *(int *)data = -fgetown(&mpipe->pipe_sigio);
1001                 error = 0;
1002                 break;
1003         default:
1004                 error = ENOTTY;
1005                 break;
1006         }
1007         lwkt_reltoken(&mpipe->pipe_wlock);
1008         lwkt_reltoken(&mpipe->pipe_rlock);
1009
1010         return (error);
1011 }
1012
1013 /*
1014  * MPSAFE
1015  */
1016 static int
1017 pipe_stat(struct file *fp, struct stat *ub, struct ucred *cred)
1018 {
1019         struct pipe *pipe;
1020
1021         pipe = (struct pipe *)fp->f_data;
1022
1023         bzero((caddr_t)ub, sizeof(*ub));
1024         ub->st_mode = S_IFIFO;
1025         ub->st_blksize = pipe->pipe_buffer.size;
1026         ub->st_size = pipe->pipe_buffer.windex - pipe->pipe_buffer.rindex;
1027         ub->st_blocks = (ub->st_size + ub->st_blksize - 1) / ub->st_blksize;
1028         ub->st_atimespec = pipe->pipe_atime;
1029         ub->st_mtimespec = pipe->pipe_mtime;
1030         ub->st_ctimespec = pipe->pipe_ctime;
1031         /*
1032          * Left as 0: st_dev, st_ino, st_nlink, st_uid, st_gid, st_rdev,
1033          * st_flags, st_gen.
1034          * XXX (st_dev, st_ino) should be unique.
1035          */
1036         return (0);
1037 }
1038
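/*
 * fileops close entry point.  Disassociate the pipe from the file
 * pointer and release our side via pipeclose().
 */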
1039 static int
1040 pipe_close(struct file *fp)
1041 {
1042         struct pipe *cpipe;
1043
1044         cpipe = (struct pipe *)fp->f_data;
1045         fp->f_ops = &badfileops;
1046         fp->f_data = NULL;
1047         funsetown(&cpipe->pipe_sigio);
1048         pipeclose(cpipe);
1049         return (0);
1050 }
1051
1052 /*
1053  * Shutdown one or both directions of a full-duplex pipe.
1054  */
1055 static int
1056 pipe_shutdown(struct file *fp, int how)
1057 {
1058         struct pipe *rpipe;
1059         struct pipe *wpipe;
1060         int error = EPIPE;
1061
1062         rpipe = (struct pipe *)fp->f_data;
1063         wpipe = rpipe->pipe_peer;
1064
1065         /*
1066          * We modify pipe_state on both pipes, which means we need
1067          * all four tokens!
1068          */
1069         lwkt_gettoken(&rpipe->pipe_rlock);
1070         lwkt_gettoken(&rpipe->pipe_wlock);
1071         lwkt_gettoken(&wpipe->pipe_rlock);
1072         lwkt_gettoken(&wpipe->pipe_wlock);
1073
1074         switch(how) {
1075         case SHUT_RDWR:
1076         case SHUT_RD:
1077                 rpipe->pipe_state |= PIPE_REOF;         /* my reads */
1078                 rpipe->pipe_state |= PIPE_WEOF;         /* peer writes */
1079                 if (rpipe->pipe_state & PIPE_WANTR) {
1080                         rpipe->pipe_state &= ~PIPE_WANTR;
1081                         wakeup(rpipe);
1082                 }
1083                 if (rpipe->pipe_state & PIPE_WANTW) {
1084                         rpipe->pipe_state &= ~PIPE_WANTW;
1085                         wakeup(rpipe);
1086                 }
1087                 error = 0;
1088                 if (how == SHUT_RD)
1089                         break;
1090                 /* fall through */
1091         case SHUT_WR:
1092                 wpipe->pipe_state |= PIPE_REOF;         /* peer reads */
1093                 wpipe->pipe_state |= PIPE_WEOF;         /* my writes */
1094                 if (wpipe->pipe_state & PIPE_WANTR) {
1095                         wpipe->pipe_state &= ~PIPE_WANTR;
1096                         wakeup(wpipe);
1097                 }
1098                 if (wpipe->pipe_state & PIPE_WANTW) {
1099                         wpipe->pipe_state &= ~PIPE_WANTW;
1100                         wakeup(wpipe);
1101                 }
1102                 error = 0;
1103                 break;
1104         }
1105         pipewakeup(rpipe, 1);
1106         pipewakeup(wpipe, 1);
1107
1108         lwkt_reltoken(&wpipe->pipe_wlock);
1109         lwkt_reltoken(&wpipe->pipe_rlock);
1110         lwkt_reltoken(&rpipe->pipe_wlock);
1111         lwkt_reltoken(&rpipe->pipe_rlock);
1112
1113         return (error);
1114 }
1115
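/*
 * Release the kernel virtual memory backing the pipe's circular buffer,
 * if any, and adjust the big-pipe count when an enlarged buffer is torn
 * down.
 */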
1116 static void
1117 pipe_free_kmem(struct pipe *cpipe)
1118 {
1119         if (cpipe->pipe_buffer.buffer != NULL) {
1120                 if (cpipe->pipe_buffer.size > PIPE_SIZE)
1121                         atomic_subtract_int(&pipe_nbig, 1);
1122                 kmem_free(&kernel_map,
1123                         (vm_offset_t)cpipe->pipe_buffer.buffer,
1124                         cpipe->pipe_buffer.size);
1125                 cpipe->pipe_buffer.buffer = NULL;
1126                 cpipe->pipe_buffer.object = NULL;
1127         }
1128 }
1129
1130 /*
1131  * Close the pipe.  The slock must be held to interlock against simultaneous
1132  * closes.  The rlock and wlock must be held to adjust the pipe_state.
1133  */
1134 static void
1135 pipeclose(struct pipe *cpipe)
1136 {
1137         globaldata_t gd;
1138         struct pipe *ppipe;
1139
1140         if (cpipe == NULL)
1141                 return;
1142
1143         /*
1144          * The slock may not have been allocated yet (close during
1145          * initialization)
1146          *
1147          * We need both the read and write tokens to modify pipe_state.
1148          */
1149         if (cpipe->pipe_slock)
1150                 lockmgr(cpipe->pipe_slock, LK_EXCLUSIVE);
1151         lwkt_gettoken(&cpipe->pipe_rlock);
1152         lwkt_gettoken(&cpipe->pipe_wlock);
1153
1154         /*
1155          * Set our state, wakeup anyone waiting in select/poll/kq, and
1156          * wakeup anyone blocked on our pipe.
1157          */
1158         cpipe->pipe_state |= PIPE_CLOSED | PIPE_REOF | PIPE_WEOF;
1159         pipewakeup(cpipe, 1);
1160         if (cpipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1161                 cpipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1162                 wakeup(cpipe);
1163         }
1164
1165         /*
1166          * Disconnect from peer.
1167          */
1168         if ((ppipe = cpipe->pipe_peer) != NULL) {
1169                 lwkt_gettoken(&ppipe->pipe_rlock);
1170                 lwkt_gettoken(&ppipe->pipe_wlock);
1171                 ppipe->pipe_state |= PIPE_REOF | PIPE_WEOF;
1172                 pipewakeup(ppipe, 1);
1173                 if (ppipe->pipe_state & (PIPE_WANTR | PIPE_WANTW)) {
1174                         ppipe->pipe_state &= ~(PIPE_WANTR | PIPE_WANTW);
1175                         wakeup(ppipe);
1176                 }
1177                 if (SLIST_FIRST(&ppipe->pipe_kq.ki_note))
1178                         KNOTE(&ppipe->pipe_kq.ki_note, 0);
1179                 lwkt_reltoken(&ppipe->pipe_wlock);
1180                 lwkt_reltoken(&ppipe->pipe_rlock);
1181         }
1182
1183         /*
1184          * If the peer is also closed we can free resources for both
1185          * sides, otherwise we leave our side intact to deal with any
1186          * races (since we only have the slock).
1187          */
1188         if (ppipe && (ppipe->pipe_state & PIPE_CLOSED)) {
1189                 cpipe->pipe_peer = NULL;
1190                 ppipe->pipe_peer = NULL;
1191                 ppipe->pipe_slock = NULL;       /* we will free the slock */
1192                 pipeclose(ppipe);
1193                 ppipe = NULL;
1194         }
1195
1196         lwkt_reltoken(&cpipe->pipe_wlock);
1197         lwkt_reltoken(&cpipe->pipe_rlock);
1198         if (cpipe->pipe_slock)
1199                 lockmgr(cpipe->pipe_slock, LK_RELEASE);
1200
1201         /*
1202          * If we disassociated from our peer we can free resources
1203          */
1204         if (ppipe == NULL) {
1205                 gd = mycpu;
1206                 if (cpipe->pipe_slock) {
1207                         kfree(cpipe->pipe_slock, M_PIPE);
1208                         cpipe->pipe_slock = NULL;
1209                 }
1210                 if (gd->gd_pipeqcount >= pipe_maxcache ||
1211                     cpipe->pipe_buffer.size != PIPE_SIZE
1212                 ) {
1213                         pipe_free_kmem(cpipe);
1214                         kfree(cpipe, M_PIPE);
1215                 } else {
1216                         cpipe->pipe_state = 0;
1217                         cpipe->pipe_peer = gd->gd_pipeq;
1218                         gd->gd_pipeq = cpipe;
1219                         ++gd->gd_pipeqcount;
1220                 }
1221         }
1222 }
1223
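/*
 * fileops kqfilter entry point.  Attach an EVFILT_READ or EVFILT_WRITE
 * knote to the pipe.  EVFILT_WRITE fails with EPIPE if the peer side
 * has already been disassociated.
 */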
1224 static int
1225 pipe_kqfilter(struct file *fp, struct knote *kn)
1226 {
1227         struct pipe *cpipe;
1228
1229         cpipe = (struct pipe *)kn->kn_fp->f_data;
1230
1231         switch (kn->kn_filter) {
1232         case EVFILT_READ:
1233                 kn->kn_fop = &pipe_rfiltops;
1234                 break;
1235         case EVFILT_WRITE:
1236                 kn->kn_fop = &pipe_wfiltops;
1237                 if (cpipe->pipe_peer == NULL) {
1238                         /* other end of pipe has been closed */
1239                         return (EPIPE);
1240                 }
1241                 break;
1242         default:
1243                 return (EOPNOTSUPP);
1244         }
1245         kn->kn_hook = (caddr_t)cpipe;
1246
1247         knote_insert(&cpipe->pipe_kq.ki_note, kn);
1248
1249         return (0);
1250 }
1251
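/*
 * Remove a previously attached knote from the pipe's note list.
 */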
1252 static void
1253 filt_pipedetach(struct knote *kn)
1254 {
1255         struct pipe *cpipe = (struct pipe *)kn->kn_hook;
1256
1257         knote_remove(&cpipe->pipe_kq.ki_note, kn);
1258 }
1259
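/*
 * Indicate readability: data is pending in the buffer, or EOF has been
 * detected (EV_EOF, plus EV_NODATA once the pending data is exhausted).
 */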
1260 /*ARGSUSED*/
1261 static int
1262 filt_piperead(struct knote *kn, long hint)
1263 {
1264         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1265         int ready = 0;
1266
1267         lwkt_gettoken(&rpipe->pipe_rlock);
1268         lwkt_gettoken(&rpipe->pipe_wlock);
1269
1270         kn->kn_data = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
1271
1272         if (rpipe->pipe_state & PIPE_REOF) {
1273                 /*
1274                  * Only set NODATA if all data has been exhausted
1275                  */
1276                 if (kn->kn_data == 0)
1277                         kn->kn_flags |= EV_NODATA;
1278                 kn->kn_flags |= EV_EOF; 
1279                 ready = 1;
1280         }
1281
1282         lwkt_reltoken(&rpipe->pipe_wlock);
1283         lwkt_reltoken(&rpipe->pipe_rlock);
1284
1285         if (!ready)
1286                 ready = kn->kn_data > 0;
1287
1288         return (ready);
1289 }
1290
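/*
 * Indicate writability: at least PIPE_BUF bytes of space remain in the
 * peer's buffer, or the write side has been shut down or closed (EV_EOF
 * and EV_NODATA).
 */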
1291 /*ARGSUSED*/
1292 static int
1293 filt_pipewrite(struct knote *kn, long hint)
1294 {
1295         struct pipe *rpipe = (struct pipe *)kn->kn_fp->f_data;
1296         struct pipe *wpipe = rpipe->pipe_peer;
1297         int ready = 0;
1298
1299         kn->kn_data = 0;
1300         if (wpipe == NULL) {
1301                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1302                 return (1);
1303         }
1304
1305         lwkt_gettoken(&wpipe->pipe_rlock);
1306         lwkt_gettoken(&wpipe->pipe_wlock);
1307
1308         if (wpipe->pipe_state & PIPE_WEOF) {
1309                 kn->kn_flags |= (EV_EOF | EV_NODATA);
1310                 ready = 1;
1311         }
1312
1313         if (!ready)
1314                 kn->kn_data = wpipe->pipe_buffer.size -
1315                               (wpipe->pipe_buffer.windex -
1316                                wpipe->pipe_buffer.rindex);
1317
1318         lwkt_reltoken(&wpipe->pipe_wlock);
1319         lwkt_reltoken(&wpipe->pipe_rlock);
1320
1321         if (!ready)
1322                 ready = kn->kn_data >= PIPE_BUF;
1323
1324         return (ready);
1325 }