static int pipe_nbig;
static int pipe_bcache_alloc;
static int pipe_bkmem_alloc;
+static int pipe_rblocked_count;
+static int pipe_wblocked_count;
SYSCTL_NODE(_kern, OID_AUTO, pipe, CTLFLAG_RW, 0, "Pipe operation");
SYSCTL_INT(_kern_pipe, OID_AUTO, nbig,
CTLFLAG_RD, &pipe_nbig, 0, "numer of big pipes allocated");
SYSCTL_INT(_kern_pipe, OID_AUTO, bigcount,
CTLFLAG_RW, &pipe_bigcount, 0, "number of times pipe expanded");
+SYSCTL_INT(_kern_pipe, OID_AUTO, rblocked,
+ CTLFLAG_RW, &pipe_rblocked_count, 0, "number of times pipe expanded");
+SYSCTL_INT(_kern_pipe, OID_AUTO, wblocked,
+ CTLFLAG_RW, &pipe_wblocked_count, 0, "number of times pipe expanded");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxcache,
CTLFLAG_RW, &pipe_maxcache, 0, "max pipes cached per-cpu");
SYSCTL_INT(_kern_pipe, OID_AUTO, maxbig,
CTLFLAG_RW, &pipe_maxbig, 0, "max number of big pipes");
#ifdef SMP
+static int pipe_delay = 5000; /* 5uS default */
+SYSCTL_INT(_kern_pipe, OID_AUTO, delay,
+ CTLFLAG_RW, &pipe_delay, 0, "SMP delay optimization in ns");
static int pipe_mpsafe = 0;
SYSCTL_INT(_kern_pipe, OID_AUTO, mpsafe,
CTLFLAG_RW, &pipe_mpsafe, 0, "");
{
struct pipe *rpipe;
int error;
+ int orig_resid;
int nread = 0;
int nbio;
u_int size; /* total bytes available */
/*
* Degenerate case
*/
- if (uio->uio_resid == 0)
+ orig_resid = uio->uio_resid;
+ if (orig_resid == 0)
return(0);
/*
nread += nsize;
/*
- * If the FIFO has not been drained past the half-way
- * mark then just continue and do not try to notify
- * the writer yet.
+ * If the FIFO is still over half full just continue
+ * and do not try to notify the writer yet.
*/
if (size - nsize >= (rpipe->pipe_buffer.size >> 1)) {
notify_writer = 0;
}
/*
- * If the FIFO has been drained past the half-way
- * mark we have to check the writer at some point,
- * but for now continue if the writer is not yet
- * blocked.
+ * When the FIFO is less then half full notify any
+ * waiting writer. WANTW can be checked while
+ * holding just the rlock.
*/
notify_writer = 1;
if ((rpipe->pipe_state & PIPE_WANTW) == 0)
/*
* Pick up our copy loop again if the writer sent data to
- * us.
+ * us while we were messing around.
+ *
+ * On a SMP box poll up to pipe_delay nanoseconds for new
+ * data. Typically a value of 2000 to 4000 is sufficient
+ * to eradicate most IPIs/tsleeps/wakeups when a pipe
+ * is used for synchronous communications with small packets,
+ * and 8000 or so (8uS) will pipeline large buffer xfers
+ * between cpus over a pipe.
+ *
+ * For synchronous communications a hit means doing a
+ * full Awrite-Bread-Bwrite-Aread cycle in less then 2uS,
+ * where as miss requiring a tsleep/wakeup sequence
+ * will take 7uS or more.
*/
- size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
- if (size)
+ if (rpipe->pipe_buffer.windex != rpipe->pipe_buffer.rindex)
continue;
+#if defined(SMP) && defined(_RDTSC_SUPPORTED_)
+ if (pipe_delay) {
+ int64_t tsc_target;
+ int good = 0;
+
+ tsc_target = tsc_get_target(pipe_delay);
+ while (tsc_test_target(tsc_target) == 0) {
+ if (rpipe->pipe_buffer.windex !=
+ rpipe->pipe_buffer.rindex) {
+ good = 1;
+ break;
+ }
+ }
+ if (good)
+ continue;
+ }
+#endif
+
/*
* Detect EOF condition, do not set error.
*/
if (rpipe->pipe_state & PIPE_REOF)
break;
-#ifdef SMP
- /*
- * Gravy train if SMP box. This saves a ton of IPIs and
- * allows two cpus to operate in lockstep.
- *
- * XXX check pipe_wip also?
- */
- DELAY(1);
- size = rpipe->pipe_buffer.windex - rpipe->pipe_buffer.rindex;
- if (size)
- continue;
-#endif
-
/*
* Break if some data was read, or if this was a non-blocking
* read.
tsleep_interlock(rpipe);
lwkt_reltoken(&wlock);
error = tsleep(rpipe, PCATCH, "piperd", 0);
+ ++pipe_rblocked_count;
if (error)
break;
}
if ((wpipe->pipe_buffer.size <= PIPE_SIZE) &&
(pipe_nbig < pipe_maxbig) &&
(wpipe->pipe_buffer.rindex == wpipe->pipe_buffer.windex)) {
- if (pipespace(wpipe, BIG_PIPE_SIZE) == 0) {
+ atomic_add_int(&pipe_nbig, 1);
+ if (pipespace(wpipe, BIG_PIPE_SIZE) == 0)
++pipe_bigcount;
- pipe_nbig++;
- }
+ else
+ atomic_subtract_int(&pipe_nbig, 1);
}
lwkt_reltoken(&rlock);
}
* NOTE: We can't clear WANTR here without acquiring
* the rlock, which we don't want to do here!
*/
- if (wpipe->pipe_state & PIPE_WANTR)
+ if ((wpipe->pipe_state & PIPE_WANTR) && pipe_mpsafe > 1)
wakeup(wpipe);
/*
- * Transfer first segment
+ * Transfer segment, which may include a wrap-around.
+ * Update windex to account for both all in one go
+ * so the reader can read() the data atomically.
*/
error = uiomove(&wpipe->pipe_buffer.buffer[windex],
segsize, uio);
- cpu_mfence();
- wpipe->pipe_buffer.windex += segsize;
-
if (error == 0 && segsize < space) {
- /*
- * Transfer remaining part now, to
- * support atomic writes. Wraparound
- * happened.
- */
segsize = space - segsize;
error = uiomove(&wpipe->pipe_buffer.buffer[0],
segsize, uio);
- cpu_mfence();
- wpipe->pipe_buffer.windex += segsize;
}
if (error)
break;
+ cpu_mfence();
+ wpipe->pipe_buffer.windex += space;
wcount += space;
continue;
}
++wpipe->pipe_wantwcnt;
wpipe->pipe_state |= PIPE_WANTW;
error = tsleep(wpipe, PCATCH, "pipewr", 0);
+ ++pipe_wblocked_count;
}
lwkt_reltoken(&rlock);
pipe_ioctl(struct file *fp, u_long cmd, caddr_t data, struct ucred *cred)
{
struct pipe *mpipe;
+ lwkt_tokref rlock;
+ lwkt_tokref wlock;
int error;
+ int mpsave;
- get_mplock();
+ pipe_get_mplock(&mpsave);
mpipe = (struct pipe *)fp->f_data;
+ lwkt_gettoken(&rlock, &mpipe->pipe_rlock);
+ lwkt_gettoken(&wlock, &mpipe->pipe_wlock);
+
switch (cmd) {
case FIOASYNC:
if (*(int *)data) {
error = 0;
break;
case FIOSETOWN:
+ get_mplock();
error = fsetown(*(int *)data, &mpipe->pipe_sigio);
+ rel_mplock();
break;
case FIOGETOWN:
*(int *)data = fgetown(mpipe->pipe_sigio);
break;
case TIOCSPGRP:
/* This is deprecated, FIOSETOWN should be used instead. */
+ get_mplock();
error = fsetown(-(*(int *)data), &mpipe->pipe_sigio);
+ rel_mplock();
break;
case TIOCGPGRP:
error = ENOTTY;
break;
}
- rel_mplock();
+ lwkt_reltoken(&rlock);
+ lwkt_reltoken(&wlock);
+ pipe_rel_mplock(&mpsave);
+
return (error);
}
{
if (cpipe->pipe_buffer.buffer != NULL) {
if (cpipe->pipe_buffer.size > PIPE_SIZE)
- --pipe_nbig;
+ atomic_subtract_int(&pipe_nbig, 1);
kmem_free(&kernel_map,
(vm_offset_t)cpipe->pipe_buffer.buffer,
cpipe->pipe_buffer.size);