fifo statistics.
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
- * $DragonFly: src/sys/kern/vfs_jops.c,v 1.16 2005/07/05 06:19:29 dillon Exp $
+ * $DragonFly: src/sys/kern/vfs_jops.c,v 1.17 2005/07/06 06:02:22 dillon Exp $
*/
/*
* Each mount point may have zero or more independently configured journals
error = ENOMEM;
/*
- * Create the worker thread and generate the association record.
+ * Create the worker threads and generate the association record.
*/
if (error) {
free(jo, M_JOURNAL);
journal_destroy(struct mount *mp, struct journal *jo, int flags)
{
struct jrecord jrec;
+ int wcount;
TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
wakeup(&jo->fifo);
+ wcount = 0;
while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
- tsleep(jo, 0, "jwait", 0);
+ tsleep(jo, 0, "jwait", hz);
+ if (++wcount % 10 == 0) {
+ printf("Warning: journal %s waiting for descriptors to close\n",
+ jo->id);
+ }
}
lwkt_free_thread(&jo->wthread); /* XXX SMP */
+ if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX)
+ lwkt_free_thread(&jo->rthread); /* XXX SMP */
if (jo->fp)
fdrop(jo->fp, curthread);
if (jo->fifo.membase)
bcopy(jo->id, rstat->id, sizeof(jo->id));
rstat->index = index;
rstat->membufsize = jo->fifo.size;
- rstat->membufused = jo->fifo.xindex - jo->fifo.rindex;
- rstat->membufiopend = jo->fifo.windex - jo->fifo.rindex;
+ rstat->membufused = jo->fifo.windex - jo->fifo.xindex;
+ rstat->membufunacked = jo->fifo.rindex - jo->fifo.xindex;
rstat->bytessent = jo->total_acked;
+ rstat->fifostalls = jo->fifostalls;
++rstat;
++index;
*res += sizeof(*rstat);
/*
* Skip any pad records. We do not write out pad records if we can
* help it.
- *
- * If xindex is caught up to rindex it gets incremented along with
- * rindex. XXX SMP
*/
if (rawp->streamid == JREC_STREAMID_PAD) {
- if (jo->fifo.rindex == jo->fifo.xindex)
- jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+ if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
+ if (jo->fifo.rindex == jo->fifo.xindex) {
+ jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+ jo->total_acked += (rawp->recsize + 15) & ~15;
+ }
+ }
jo->fifo.rindex += (rawp->recsize + 15) & ~15;
jo->total_acked += bytes;
KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
* For now assume blocking I/O. Since we are record-aware the
* code cannot yet handle partial writes.
*
+ * We bump rindex prior to issuing the write to avoid racing
+ * the acknowledgement coming back (which could prevent the ack
+ * from bumping xindex). Restarts are always based on xindex so
+ * we do not try to undo the rindex if an error occurs.
+ *
* XXX EWOULDBLOCK/NBIO
* XXX notification on failure
* XXX permanent versus temporary failures
* XXX two-way acknowledgement stream in the return direction / xindex
*/
bytes = res;
+ jo->fifo.rindex += bytes;
error = fp_write(jo->fp,
- jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
+ jo->fifo.membase + ((jo->fifo.rindex - bytes) & jo->fifo.mask),
bytes, &res);
if (error) {
printf("journal_thread(%s) write, error %d\n", jo->id, error);
* advance xindex, otherwise the rjournal thread is responsible for
* advancing xindex.
*/
- jo->fifo.rindex += bytes;
- if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0)
+ if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
jo->fifo.xindex += bytes;
- jo->total_acked += bytes;
+ jo->total_acked += bytes;
+ }
KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
if (jo->flags & MC_JOURNAL_WWAIT) {
int error;
int count;
int bytes;
- int index;
transid = 0;
error = 0;
* stream.
*/
if (transid == 0) {
- for (index = 0; index < sizeof(ack); index += count) {
- error = fp_read(jo->fp, &ack, sizeof(ack), &count);
- if (error)
- break;
- if (count == 0)
- tsleep(&jo->fifo.xindex, 0, "jread", hz);
- }
+ error = fp_read(jo->fp, &ack, sizeof(ack), &count, 1);
+#if 0
+ printf("fp_read ack error %d count %d\n", error, count);
+#endif
+ if (error || count != sizeof(ack))
+ break;
if (error) {
printf("read error %d on receive stream\n", error);
break;
bytes = jo->fifo.rindex - jo->fifo.xindex;
if (bytes == 0) {
- printf("warning: unsent data acknowledged\n");
+ printf("warning: unsent data acknowledged transid %08llx\n", transid);
tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
transid = 0;
continue;
}
/*
- * Since rindex has advanceted, the record pointed to by xindex
+ * Since rindex has advanced, the record pointed to by xindex
* must be a valid record.
*/
rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
* The target can acknowledge several records at once.
*/
if (rawp->transid < transid) {
+#if 1
printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+#endif
jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+ jo->total_acked += (rawp->recsize + 15) & ~15;
if (jo->flags & MC_JOURNAL_WWAIT) {
jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
wakeup(&jo->fifo.windex);
continue;
}
if (rawp->transid == transid) {
+#if 1
printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+#endif
jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+ jo->total_acked += (rawp->recsize + 15) & ~15;
if (jo->flags & MC_JOURNAL_WWAIT) {
jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
wakeup(&jo->fifo.windex);
transid = 0;
continue;
}
- printf("warning: unsent data(2) acknowledged\n");
+ printf("warning: unsent data(2) acknowledged transid %08llx\n", transid);
transid = 0;
}
jo->flags &= ~MC_JOURNAL_RACTIVE;
if (avail < req) {
/* XXX MC_JOURNAL_STOP_IMM */
jo->flags |= MC_JOURNAL_WWAIT;
+ ++jo->fifostalls;
tsleep(&jo->fifo.windex, 0, "jwrite", 0);
continue;
}
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
- * $DragonFly: src/sys/kern/vfs_journal.c,v 1.16 2005/07/05 06:19:29 dillon Exp $
+ * $DragonFly: src/sys/kern/vfs_journal.c,v 1.17 2005/07/06 06:02:22 dillon Exp $
*/
/*
* Each mount point may have zero or more independently configured journals
error = ENOMEM;
/*
- * Create the worker thread and generate the association record.
+ * Create the worker threads and generate the association record.
*/
if (error) {
free(jo, M_JOURNAL);
journal_destroy(struct mount *mp, struct journal *jo, int flags)
{
struct jrecord jrec;
+ int wcount;
TAILQ_REMOVE(&mp->mnt_jlist, jo, jentry);
jo->flags |= MC_JOURNAL_STOP_REQ | (flags & MC_JOURNAL_STOP_IMM);
wakeup(&jo->fifo);
+ wcount = 0;
while (jo->flags & (MC_JOURNAL_WACTIVE | MC_JOURNAL_RACTIVE)) {
- tsleep(jo, 0, "jwait", 0);
+ tsleep(jo, 0, "jwait", hz);
+ if (++wcount % 10 == 0) {
+ printf("Warning: journal %s waiting for descriptors to close\n",
+ jo->id);
+ }
}
lwkt_free_thread(&jo->wthread); /* XXX SMP */
+ if (jo->flags & MC_JOURNAL_WANT_FULLDUPLEX)
+ lwkt_free_thread(&jo->rthread); /* XXX SMP */
if (jo->fp)
fdrop(jo->fp, curthread);
if (jo->fifo.membase)
bcopy(jo->id, rstat->id, sizeof(jo->id));
rstat->index = index;
rstat->membufsize = jo->fifo.size;
- rstat->membufused = jo->fifo.xindex - jo->fifo.rindex;
- rstat->membufiopend = jo->fifo.windex - jo->fifo.rindex;
+ rstat->membufused = jo->fifo.windex - jo->fifo.xindex;
+ rstat->membufunacked = jo->fifo.rindex - jo->fifo.xindex;
rstat->bytessent = jo->total_acked;
+ rstat->fifostalls = jo->fifostalls;
++rstat;
++index;
*res += sizeof(*rstat);
/*
* Skip any pad records. We do not write out pad records if we can
* help it.
- *
- * If xindex is caught up to rindex it gets incremented along with
- * rindex. XXX SMP
*/
if (rawp->streamid == JREC_STREAMID_PAD) {
- if (jo->fifo.rindex == jo->fifo.xindex)
- jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+ if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
+ if (jo->fifo.rindex == jo->fifo.xindex) {
+ jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+ jo->total_acked += (rawp->recsize + 15) & ~15;
+ }
+ }
jo->fifo.rindex += (rawp->recsize + 15) & ~15;
jo->total_acked += bytes;
KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
* For now assume blocking I/O. Since we are record-aware the
* code cannot yet handle partial writes.
*
+ * We bump rindex prior to issuing the write to avoid racing
+ * the acknowledgement coming back (which could prevent the ack
+ * from bumping xindex). Restarts are always based on xindex so
+ * we do not try to undo the rindex if an error occurs.
+ *
* XXX EWOULDBLOCK/NBIO
* XXX notification on failure
* XXX permanent versus temporary failures
* XXX two-way acknowledgement stream in the return direction / xindex
*/
bytes = res;
+ jo->fifo.rindex += bytes;
error = fp_write(jo->fp,
- jo->fifo.membase + (jo->fifo.rindex & jo->fifo.mask),
+ jo->fifo.membase + ((jo->fifo.rindex - bytes) & jo->fifo.mask),
bytes, &res);
if (error) {
printf("journal_thread(%s) write, error %d\n", jo->id, error);
* advance xindex, otherwise the rjournal thread is responsible for
* advancing xindex.
*/
- jo->fifo.rindex += bytes;
- if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0)
+ if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
jo->fifo.xindex += bytes;
- jo->total_acked += bytes;
+ jo->total_acked += bytes;
+ }
KKASSERT(jo->fifo.windex - jo->fifo.rindex >= 0);
if ((jo->flags & MC_JOURNAL_WANT_FULLDUPLEX) == 0) {
if (jo->flags & MC_JOURNAL_WWAIT) {
int error;
int count;
int bytes;
- int index;
transid = 0;
error = 0;
* stream.
*/
if (transid == 0) {
- for (index = 0; index < sizeof(ack); index += count) {
- error = fp_read(jo->fp, &ack, sizeof(ack), &count);
- if (error)
- break;
- if (count == 0)
- tsleep(&jo->fifo.xindex, 0, "jread", hz);
- }
+ error = fp_read(jo->fp, &ack, sizeof(ack), &count, 1);
+#if 0
+ printf("fp_read ack error %d count %d\n", error, count);
+#endif
+ if (error || count != sizeof(ack))
+ break;
if (error) {
printf("read error %d on receive stream\n", error);
break;
bytes = jo->fifo.rindex - jo->fifo.xindex;
if (bytes == 0) {
- printf("warning: unsent data acknowledged\n");
+ printf("warning: unsent data acknowledged transid %08llx\n", transid);
tsleep(&jo->fifo.xindex, 0, "jrseq", hz);
transid = 0;
continue;
}
/*
- * Since rindex has advanceted, the record pointed to by xindex
+ * Since rindex has advanced, the record pointed to by xindex
* must be a valid record.
*/
rawp = (void *)(jo->fifo.membase + (jo->fifo.xindex & jo->fifo.mask));
* The target can acknowledge several records at once.
*/
if (rawp->transid < transid) {
+#if 1
printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+#endif
jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+ jo->total_acked += (rawp->recsize + 15) & ~15;
if (jo->flags & MC_JOURNAL_WWAIT) {
jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
wakeup(&jo->fifo.windex);
continue;
}
if (rawp->transid == transid) {
+#if 1
printf("ackskip %08llx/%08llx\n", rawp->transid, transid);
+#endif
jo->fifo.xindex += (rawp->recsize + 15) & ~15;
+ jo->total_acked += (rawp->recsize + 15) & ~15;
if (jo->flags & MC_JOURNAL_WWAIT) {
jo->flags &= ~MC_JOURNAL_WWAIT; /* XXX hysteresis */
wakeup(&jo->fifo.windex);
transid = 0;
continue;
}
- printf("warning: unsent data(2) acknowledged\n");
+ printf("warning: unsent data(2) acknowledged transid %08llx\n", transid);
transid = 0;
}
jo->flags &= ~MC_JOURNAL_RACTIVE;
if (avail < req) {
/* XXX MC_JOURNAL_STOP_IMM */
jo->flags |= MC_JOURNAL_WWAIT;
+ ++jo->fifostalls;
tsleep(&jo->fifo.windex, 0, "jwrite", 0);
continue;
}
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
- * $DragonFly: src/sys/sys/journal.h,v 1.4 2005/07/04 21:05:54 dillon Exp $
+ * $DragonFly: src/sys/sys/journal.h,v 1.5 2005/07/06 06:02:23 dillon Exp $
*/
#ifndef _SYS_JOURNAL_H_
#define JREC_STREAMID_PAD (JREC_STREAMCTL_BEGIN|JREC_STREAMCTL_END|0x0001)
#define JREC_STREAMID_DISCONT 0x0002 /* discontinuity */
#define JREC_STREAMID_ANNOTATE 0x0003 /* annotation */
- /* 0x0004-0x007F reserved by DragonFly */
+#define JREC_STREAMID_ACK 0x0004 /* acknowledgement */
+ /* 0x0005-0x007F reserved by DragonFly */
/* 0x0080-0x00FF for third party use */
#define JREC_STREAMID_JMIN 0x0100 /* lowest allowed general id */
#define JREC_STREAMID_JMAX 0x2000 /* (one past the highest allowed id) */
* OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
- * $DragonFly: src/sys/sys/mountctl.h,v 1.7 2005/03/22 22:13:33 dillon Exp $
+ * $DragonFly: src/sys/sys/mountctl.h,v 1.8 2005/07/06 06:02:23 dillon Exp $
*/
#ifndef _SYS_MOUNTCTL_H_
int flags;
int64_t membufsize;
int64_t membufused;
- int64_t membufiopend;
+ int64_t membufunacked;
int64_t swapbufsize;
int64_t swapbufused;
- int64_t swapbufiopend;
+ int64_t swapbufunacked;
int64_t transidstart;
int64_t transidcurrent;
- int64_t transidiopend;
+ int64_t transidunacked;
int64_t transidacked;
int64_t bytessent;
int64_t bytesacked;
+ int64_t fifostalls;
+ int64_t reserved[4];
struct timeval lastack;
};
int flags; /* journaling flags */
int64_t transid;
int64_t total_acked;
+ int64_t fifostalls;
struct journal_memfifo fifo;
struct thread wthread;
struct thread rthread;