HAMMER Utilities: Streaming mirroring!
authorMatthew Dillon <dillon@dragonflybsd.org>
Thu, 31 Jul 2008 06:01:32 +0000 (06:01 +0000)
committerMatthew Dillon <dillon@dragonflybsd.org>
Thu, 31 Jul 2008 06:01:32 +0000 (06:01 +0000)
* Add a new streaming mirroring feature called 'mirror-stream' which works
  like mirror-copy but does not exit unless the pipe is broken.  A simple
  script to sleep and re-run the command (to deal with the occassional
  broken tcp connection) is all that is needed.

* Add new options -b <bw> and -i <delay> to support bandwidth-limiting
  mirroring streams.  The bandwidth may be specified in bytes/sec or suffixed
  with 'k', 'm', or 'g' to specify kilobytes, megabytes, or gigabytes per
  second.

sbin/hammer/cmd_mirror.c
sbin/hammer/cmd_pseudofs.c
sbin/hammer/hammer.8
sbin/hammer/hammer.c
sbin/hammer/hammer.h

index 07092f7..045366b 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  * 
- * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.11 2008/07/19 18:48:14 dillon Exp $
+ * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.12 2008/07/31 06:01:31 dillon Exp $
  */
 
 #include "hammer.h"
@@ -46,9 +46,12 @@ static void write_mrecord(int fdout, u_int32_t type,
                         hammer_ioc_mrecord_any_t mrec, int bytes);
 static void generate_mrec_header(int fd, int fdout, int pfs_id,
                         hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
-static void validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
+static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
+                        struct hammer_ioc_mrecord_head *pickup,
                         hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
+static ssize_t writebw(int fd, const void *buf, size_t nbytes,
+                       u_int64_t *bwcount, struct timeval *tv1);
 static void mirror_usage(int code);
 
 /*
@@ -60,11 +63,12 @@ static void mirror_usage(int code);
  * stream made it to the destination.
  */
 void
-hammer_cmd_mirror_read(char **av, int ac)
+hammer_cmd_mirror_read(char **av, int ac, int streaming)
 {
        struct hammer_ioc_mirror_rw mirror;
        struct hammer_ioc_pseudofs_rw pfs;
        union hammer_ioc_mrecord_any mrec_tmp;
+       struct hammer_ioc_mrecord_head pickup;
        hammer_ioc_mrecord_any_t mrec;
        hammer_tid_t sync_tid;
        const char *filesystem;
@@ -73,27 +77,49 @@ hammer_cmd_mirror_read(char **av, int ac)
        int error;
        int fd;
        int n;
+       int didwork;
+       int64_t total_bytes;
        time_t base_t = time(NULL);
+       struct timeval bwtv;
+       u_int64_t bwcount;
 
        if (ac > 2)
                mirror_usage(1);
        filesystem = av[0];
 
+       pickup.signature = 0;
+       pickup.type = 0;
+
+again:
        bzero(&mirror, sizeof(mirror));
        hammer_key_beg_init(&mirror.key_beg);
        hammer_key_end_init(&mirror.key_end);
 
        fd = getpfs(&pfs, filesystem);
 
+       if (streaming && VerboseOpt) {
+               fprintf(stderr, "\nRunning");
+               fflush(stderr);
+       }
+       total_bytes = 0;
+       gettimeofday(&bwtv, NULL);
+       bwcount = 0;
+
        /*
         * In 2-way mode the target will send us a PFS info packet
         * first.  Use the target's current snapshot TID as our default
         * begin TID.
         */
        mirror.tid_beg = 0;
-       if (TwoWayPipeOpt)
-               validate_mrec_header(fd, 0, 0, pfs.pfs_id,
-                                    NULL, &mirror.tid_beg);
+       if (TwoWayPipeOpt) {
+               n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup,
+                                        NULL, &mirror.tid_beg);
+               if (n < 0) {    /* got TERM record */
+                       relpfs(fd, &pfs);
+                       return;
+               }
+               ++mirror.tid_beg;
+       }
 
        /*
         * Write out the PFS header, tid_beg will be updated if our PFS
@@ -103,6 +129,7 @@ hammer_cmd_mirror_read(char **av, int ac)
        generate_mrec_header(fd, 1, pfs.pfs_id,
                             &mirror.tid_beg, &mirror.tid_end);
 
+       /* XXX streaming mode support w/ cycle or command line arg */
        /*
         * A cycle file overrides the beginning TID
         */
@@ -111,8 +138,11 @@ hammer_cmd_mirror_read(char **av, int ac)
        if (ac == 2)
                mirror.tid_beg = strtoull(av[1], NULL, 0);
 
-       fprintf(stderr, "Mirror-read: Mirror from %016llx to %016llx\n",
-               mirror.tid_beg, mirror.tid_end);
+       if (streaming == 0 || VerboseOpt >= 2) {
+               fprintf(stderr,
+                       "Mirror-read: Mirror from %016llx to %016llx\n",
+                       mirror.tid_beg, mirror.tid_end);
+       }
        if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
                fprintf(stderr, "Mirror-read: Resuming at object %016llx\n",
                        mirror.key_beg.obj_id);
@@ -121,12 +151,13 @@ hammer_cmd_mirror_read(char **av, int ac)
        /*
         * Nothing to do if begin equals end.
         */
-       if (mirror.tid_beg == mirror.tid_end) {
-               fprintf(stderr, "Mirror-read: No work to do, stopping\n");
-               write_mrecord(1, HAMMER_MREC_TYPE_TERM,
-                             &mrec_tmp, sizeof(mrec_tmp.sync));
+       if (mirror.tid_beg >= mirror.tid_end) {
+               if (streaming == 0 || VerboseOpt >= 2)
+                       fprintf(stderr, "Mirror-read: No work to do\n");
+               didwork = 0;
                goto done;
        }
+       didwork = 1;
 
        /*
         * Write out bulk records
@@ -150,7 +181,12 @@ hammer_cmd_mirror_read(char **av, int ac)
                        exit(1);
                }
                if (mirror.count) {
-                       n = write(1, mirror.ubuf, mirror.count);
+                       if (BandwidthOpt) {
+                               n = writebw(1, mirror.ubuf, mirror.count,
+                                           &bwcount, &bwtv);
+                       } else {
+                               n = write(1, mirror.ubuf, mirror.count);
+                       }
                        if (n != mirror.count) {
                                fprintf(stderr, "Mirror-read %s failed: "
                                                "short write\n",
@@ -158,6 +194,13 @@ hammer_cmd_mirror_read(char **av, int ac)
                                exit(1);
                        }
                }
+               total_bytes += mirror.count;
+               if (streaming && VerboseOpt) {
+                       fprintf(stderr, "\r%016llx %11lld",
+                               mirror.key_cur.obj_id,
+                               total_bytes);
+                       fflush(stderr);
+               }
                mirror.key_beg = mirror.key_cur;
                if (TimeoutOpt &&
                    (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
@@ -171,11 +214,19 @@ hammer_cmd_mirror_read(char **av, int ac)
                }
        } while (mirror.count != 0);
 
+done:
        /*
-        * Write out the termination sync record
+        * Write out the termination sync record - only if not interrupted
         */
-       write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
-                     &mrec_tmp, sizeof(mrec_tmp.sync));
+       if (interrupted == 0) {
+               if (didwork) {
+                       write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
+                                     &mrec_tmp, sizeof(mrec_tmp.sync));
+               } else {
+                       write_mrecord(1, HAMMER_MREC_TYPE_IDLE,
+                                     &mrec_tmp, sizeof(mrec_tmp.sync));
+               }
+       }
 
        /*
         * If the -2 option was given (automatic when doing mirror-copy),
@@ -183,7 +234,7 @@ hammer_cmd_mirror_read(char **av, int ac)
         * the target.
         */
        if (TwoWayPipeOpt) {
-               mrec = read_mrecord(0, &error, NULL);
+               mrec = read_mrecord(0, &error, &pickup);
                if (mrec == NULL || 
                    mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
                    mrec->head.rec_size != sizeof(mrec->update)) {
@@ -211,7 +262,38 @@ hammer_cmd_mirror_read(char **av, int ac)
                                "fully updated unless you use mirror-copy\n");
                hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
        }
-done:
+       if (streaming && interrupted == 0) {
+               time_t t1 = time(NULL);
+               time_t t2;
+
+               if (VerboseOpt) {
+                       fprintf(stderr, " W");
+                       fflush(stderr);
+               }
+               pfs.ondisk->sync_end_tid = mirror.tid_end;
+               if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) {
+                       fprintf(stderr, "Mirror-read %s: cannot stream: %s\n",
+                               filesystem, strerror(errno));
+               } else {
+                       t2 = time(NULL) - t1;
+                       if (t2 >= 0 && t2 < DelayOpt) {
+                               if (VerboseOpt) {
+                                       fprintf(stderr, "\bD");
+                                       fflush(stderr);
+                               }
+                               sleep(DelayOpt - t2);
+                       }
+                       if (VerboseOpt) {
+                               fprintf(stderr, "\b ");
+                               fflush(stderr);
+                       }
+                       relpfs(fd, &pfs);
+                       goto again;
+               }
+       }
+       write_mrecord(1, HAMMER_MREC_TYPE_TERM,
+                     &mrec_tmp, sizeof(mrec_tmp.sync));
+       relpfs(fd, &pfs);
        fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
 }
 
@@ -253,11 +335,16 @@ hammer_cmd_mirror_write(char **av, int ac)
        hammer_ioc_mrecord_any_t mrec;
        int error;
        int fd;
+       int n;
 
        if (ac > 2)
                mirror_usage(1);
        filesystem = av[0];
 
+       pickup.signature = 0;
+       pickup.type = 0;
+
+again:
        bzero(&mirror, sizeof(mirror));
        hammer_key_beg_init(&mirror.key_beg);
        hammer_key_end_init(&mirror.key_end);
@@ -280,15 +367,16 @@ hammer_cmd_mirror_write(char **av, int ac)
         * Read and process the PFS header.  The source informs us of
         * the TID range the stream represents.
         */
-       validate_mrec_header(fd, 0, 1, pfs.pfs_id,
-                            &mirror.tid_beg, &mirror.tid_end);
+       n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup,
+                                &mirror.tid_beg, &mirror.tid_end);
+       if (n < 0) {    /* got TERM record */
+               relpfs(fd, &pfs);
+               return;
+       }
 
        mirror.ubuf = buf;
        mirror.size = SERIALBUF_SIZE;
 
-       pickup.signature = 0;
-       pickup.type = 0;
-
        /*
         * Read and process bulk records (REC, PASS, and SKIP types).
         *
@@ -331,35 +419,40 @@ hammer_cmd_mirror_write(char **av, int ac)
        mrec = read_mrecord(0, &error, &pickup);
 
        if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
-               fprintf(stderr, "Mirror-write: No work to do, stopping\n");
+               fprintf(stderr, "Mirror-write: received termination request\n");
+               free(mrec);
                return;
        }
 
        if (mrec == NULL || 
-           mrec->head.type != HAMMER_MREC_TYPE_SYNC ||
+           (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
+            mrec->head.type != HAMMER_MREC_TYPE_IDLE) ||
            mrec->head.rec_size != sizeof(mrec->sync)) {
                fprintf(stderr, "Mirror-write %s: Did not get termination "
                                "sync record, or rec_size is wrong rt=%d\n",
                                filesystem, mrec->head.type);
                exit(1);
        }
-       free(mrec);
-       mrec = NULL;
 
        /*
         * Update the PFS info on the target so the user has visibility
-        * into the new snapshot.
+        * into the new snapshot, and sync the target filesystem.
         */
-       update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
+       if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) {
+               update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
 
-       /*
-        * Sync the target filesystem
-        */
-       bzero(&synctid, sizeof(synctid));
-       synctid.op = HAMMER_SYNCTID_SYNC2;
-       ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
+               bzero(&synctid, sizeof(synctid));
+               synctid.op = HAMMER_SYNCTID_SYNC2;
+               ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
 
-       fprintf(stderr, "Mirror-write %s: succeeded\n", filesystem);
+               if (VerboseOpt >= 2) {
+                       fprintf(stderr, "Mirror-write %s: succeeded\n",
+                               filesystem);
+               }
+       }
+
+       free(mrec);
+       mrec = NULL;
 
        /*
         * Report back to the originator.
@@ -372,6 +465,8 @@ hammer_cmd_mirror_write(char **av, int ac)
                printf("Source can update synctid to 0x%016llx\n",
                       mirror.tid_end);
        }
+       relpfs(fd, &pfs);
+       goto again;
 }
 
 void
@@ -454,14 +549,17 @@ hammer_cmd_mirror_dump(void)
         * Read and process the termination sync record.
         */
        mrec = read_mrecord(0, &error, &pickup);
-       if (mrec == NULL || mrec->head.type != HAMMER_MREC_TYPE_SYNC) {
+       if (mrec == NULL || 
+           (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
+            mrec->head.type != HAMMER_MREC_TYPE_IDLE)
+        ) {
                fprintf(stderr, "Mirror-dump: Did not get termination "
                                "sync record\n");
        }
 }
 
 void
-hammer_cmd_mirror_copy(char **av, int ac)
+hammer_cmd_mirror_copy(char **av, int ac, int streaming)
 {
        pid_t pid1;
        pid_t pid2;
@@ -495,20 +593,35 @@ hammer_cmd_mirror_copy(char **av, int ac)
                        xav[xac++] = "ssh";
                        xav[xac++] = av[0];
                        xav[xac++] = "hammer";
-                       if (VerboseOpt)
+
+                       switch(VerboseOpt) {
+                       case 0:
+                               break;
+                       case 1:
                                xav[xac++] = "-v";
+                               break;
+                       case 2:
+                               xav[xac++] = "-vv";
+                               break;
+                       default:
+                               xav[xac++] = "-vvv";
+                               break;
+                       }
                        xav[xac++] = "-2";
                        if (TimeoutOpt) {
                                snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
                                xav[xac++] = "-t";
                                xav[xac++] = tbuf;
                        }
-                       xav[xac++] = "mirror-read";
+                       if (streaming)
+                               xav[xac++] = "mirror-read-streaming";
+                       else
+                               xav[xac++] = "mirror-read";
                        xav[xac++] = ptr;
                        xav[xac++] = NULL;
                        execv("/usr/bin/ssh", (void *)xav);
                } else {
-                       hammer_cmd_mirror_read(av, 1);
+                       hammer_cmd_mirror_read(av, 1, streaming);
                        fflush(stdout);
                        fflush(stderr);
                }
@@ -529,8 +642,21 @@ hammer_cmd_mirror_copy(char **av, int ac)
                        xav[xac++] = "ssh";
                        xav[xac++] = av[1];
                        xav[xac++] = "hammer";
-                       if (VerboseOpt)
+
+                       switch(VerboseOpt) {
+                       case 0:
+                               break;
+                       case 1:
                                xav[xac++] = "-v";
+                               break;
+                       case 2:
+                               xav[xac++] = "-vv";
+                               break;
+                       default:
+                               xav[xac++] = "-vvv";
+                               break;
+                       }
+
                        xav[xac++] = "-2";
                        xav[xac++] = "mirror-write";
                        xav[xac++] = ptr;
@@ -815,9 +941,12 @@ generate_mrec_header(int fd, int fdout, int pfs_id,
 /*
  * Validate the pfs information from the originating filesystem
  * against the target filesystem.  shared_uuid must match.
+ *
+ * return -1 if we got a TERM record
  */
-static void
+static int
 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
+                    struct hammer_ioc_mrecord_head *pickup,
                     hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
 {
        struct hammer_ioc_pseudofs_rw pfs;
@@ -842,12 +971,17 @@ validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
                exit(1);
        }
 
-       mrec = read_mrecord(fdin, &error, NULL);
+       mrec = read_mrecord(fdin, &error, pickup);
        if (mrec == NULL) {
                if (error == 0)
                        fprintf(stderr, "validate_mrec_header: short read\n");
                exit(1);
        }
+       if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
+               free(mrec);
+               return(-1);
+       }
+
        if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
                fprintf(stderr, "validate_mrec_header: did not get expected "
                                "PFSD record type\n");
@@ -883,6 +1017,7 @@ validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
        if (tid_endp)
                *tid_endp = mrec->pfs.pfsd.sync_end_tid;
        free(mrec);
+       return(0);
 }
 
 static void
@@ -906,13 +1041,58 @@ update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
                        perror("update_pfs_snapshot (rewrite)");
                        exit(1);
                }
-               fprintf(stderr,
-                       "Mirror-write: Completed, updated snapshot "
-                       "to %016llx\n",
-                       snapshot_tid);
+               if (VerboseOpt >= 2) {
+                       fprintf(stderr,
+                               "Mirror-write: Completed, updated snapshot "
+                               "to %016llx\n",
+                               snapshot_tid);
+               }
        }
 }
 
+/*
+ * Bandwidth-limited write in chunks
+ */
+static
+ssize_t
+writebw(int fd, const void *buf, size_t nbytes,
+       u_int64_t *bwcount, struct timeval *tv1)
+{
+       struct timeval tv2;
+       size_t n;
+       ssize_t r;
+       ssize_t a;
+       int usec;
+
+       a = 0;
+       r = 0;
+       while (nbytes) {
+               if (*bwcount + nbytes > BandwidthOpt)
+                       n = BandwidthOpt - *bwcount;
+               else
+                       n = nbytes;
+               if (n)
+                       r = write(fd, buf, n);
+               if (r >= 0) {
+                       a += r;
+                       nbytes -= r;
+                       buf = (const char *)buf + r;
+               }
+               if ((size_t)r != n)
+                       break;
+               *bwcount += n;
+               if (*bwcount >= BandwidthOpt) {
+                       gettimeofday(&tv2, NULL);
+                       usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 +
+                               (int)(tv2.tv_usec - tv1->tv_usec);
+                       if (usec >= 0 && usec < 1000000)
+                               usleep(1000000 - usec);
+                       gettimeofday(tv1, NULL);
+                       *bwcount -= BandwidthOpt;
+               }
+       }
+       return(a ? a : r);
+}
 
 static void
 mirror_usage(int code)
index 619331e..0fda2f5 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  * 
- * $DragonFly: src/sbin/hammer/cmd_pseudofs.c,v 1.7 2008/07/19 18:48:14 dillon Exp $
+ * $DragonFly: src/sbin/hammer/cmd_pseudofs.c,v 1.8 2008/07/31 06:01:31 dillon Exp $
  */
 
 #include "hammer.h"
@@ -65,10 +65,12 @@ getpfs(struct hammer_ioc_pseudofs_rw *pfs, const char *path)
         * Calculate the directory containing the softlink
         */
        dirpath = strdup(path);
-       if (strrchr(dirpath, '/'))
+       if (strrchr(dirpath, '/')) {
                *strrchr(dirpath, '/') = 0;
-       else
+       } else {
+               free(dirpath);
                dirpath = strdup(".");
+       }
 
        if (lstat(path, &st) == 0 && S_ISLNK(st.st_mode)) {
                /*
@@ -143,6 +145,16 @@ done:
        return(fd);
 }
 
+void
+relpfs(int fd, struct hammer_ioc_pseudofs_rw *pfs)
+{
+       close(fd);
+       if (pfs->ondisk) {
+               free(pfs->ondisk);
+               pfs->ondisk = NULL;
+       }
+}
+
 void
 hammer_cmd_pseudofs_status(char **av, int ac)
 {
@@ -189,10 +201,12 @@ hammer_cmd_pseudofs_create(char **av, int ac, int is_slave)
        }
 
        dirpath = strdup(path);
-       if (strrchr(dirpath, '/') != NULL)
+       if (strrchr(dirpath, '/') != NULL) {
                *strrchr(dirpath, '/') = 0;
-       else
+       } else {
+               free(dirpath);
                dirpath = strdup(".");
+       }
        fd = open(dirpath, O_RDONLY);
        if (fd < 0) {
                fprintf(stderr, "Cannot open directory %s\n", dirpath);
index 5df4a8b..c3c23bf 100644 (file)
@@ -30,7 +30,7 @@
 .\" OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
 .\" SUCH DAMAGE.
 .\" 
-.\" $DragonFly: src/sbin/hammer/hammer.8,v 1.43 2008/07/27 21:59:44 thomas Exp $
+.\" $DragonFly: src/sbin/hammer/hammer.8,v 1.44 2008/07/31 06:01:31 dillon Exp $
 .Dd July 27, 2008
 .Dt HAMMER 8
 .Os
 .Sh SYNOPSIS
 .Nm
 .Op Fl h2rv
+.Op Fl b Ar bandwidth
 .Op Fl c Ar cyclefile
 .Op Fl f Ar blkdev[:blkdev]*
 .\" .Op Fl s Ar linkpath
+.Op Fl i Ar delay
 .Op Fl t Ar seconds
 .Ar command
 .Ar [argument ...]
@@ -70,6 +72,12 @@ automatically enabled by the
 command.
 .It Fl r
 Specify recursion for those commands which support it.
+.It Fl b Ar bandwidth
+Specify a bandwidth limit in bytes per second for mirroring streams.
+This option is typically used to prevent batch mirroring operations from
+loading down the machine.
+The bandwidth may be suffixed with 'k', 'm', or 'g' to specify
+values in kilobytes, megabytes, and gigabytes per second.
 .It Fl c Ar cyclefile
 When pruning and reblocking you can instruction
 .Nm
@@ -95,6 +103,11 @@ file system.
 .\" When pruning a filesystem you can instruct
 .\" .Nm to create softlinks
 .\" to available snapshots.
+.It Fl i Ar delay
+When maintaining a streaming mirroring this option specifies the
+minimum delay after a batch ends before the next batch is allowed
+to start.
+The default is five seconds.
 .It Fl t Ar seconds
 When pruning and reblocking you can tell the utility to stop after a
 certain period of time.  This option is used along with the cycle file
@@ -410,6 +423,12 @@ Set a descriptive label for this file system.
 .El
 .It Ar mirror-read Ar filesystem Op Ar <begin-tid>
 Generate a mirroring stream to stdout.
+The stream ends when the transaction id space has been exhausted.
+.It Ar mirror-read-stream Ar filesystem Op Ar <begin-tid>
+Generate a mirroring stream to stdout.
+Upon completion the stream is paused until new data is synced to the
+master, then resumed.
+Operation continues until the pipe is broken.
 .It Ar mirror-write Ar filesystem Op Ar
 Take a mirroring stream on stdin and output it to stdout.
 .Pp
@@ -441,6 +460,17 @@ If the operation completes successfully the target PFS's
 will
 be updated.  Note that you must re-chdir into the target PFS to see the
 updated information.  If you do not you will still be in the previous snapshot.
+.It Ar mirror-stream Ar [[user@]host:]filesystem Ar [[user@]host:]filesystem
+This command works similarly to
+.Ar mirror-copy
+but does not exit unless the pipe is broken.
+This command will resume the mirroring operation whenever the master is
+synced.  The command is commonly used with
+.Fl i Ar delay
+and
+.Fl b Ar bandwidth
+options to keep the mirroring target in sync with the source on a continuing
+basis.
 .El
 .\".Sh EXAMPLES
 .Sh PSEUDO FILESYSTEM (PFS) NOTES
index 7e31e8c..34c5c01 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  * 
- * $DragonFly: src/sbin/hammer/hammer.c,v 1.34 2008/07/16 00:53:48 thomas Exp $
+ * $DragonFly: src/sbin/hammer/hammer.c,v 1.35 2008/07/31 06:01:32 dillon Exp $
  */
 
 #include "hammer.h"
@@ -47,21 +47,43 @@ int VerboseOpt;
 int NoSyncOpt;
 int TwoWayPipeOpt;
 int TimeoutOpt;
+int DelayOpt = 5;
+u_int64_t BandwidthOpt;
 const char *CyclePath;
 const char *LinkPath;
 
 int
 main(int ac, char **av)
 {
-       int ch;
-       u_int32_t status;
        char *blkdevs = NULL;
+       char *ptr;
+       u_int32_t status;
+       int ch;
 
-       while ((ch = getopt(ac, av, "c:dhf:rs:t:v2")) != -1) {
+       while ((ch = getopt(ac, av, "b:c:dhf:i:rs:t:v2")) != -1) {
                switch(ch) {
                case '2':
                        TwoWayPipeOpt = 1;
                        break;
+               case 'b':
+                       BandwidthOpt = strtoull(optarg, &ptr, 0);
+                       switch(*ptr) {
+                       case 'g':
+                       case 'G':
+                               BandwidthOpt *= 1024;
+                               /* fall through */
+                       case 'm':
+                       case 'M':
+                               BandwidthOpt *= 1024;
+                               /* fall through */
+                       case 'k':
+                       case 'K':
+                               BandwidthOpt *= 1024;
+                               break;
+                       default:
+                               usage(1);
+                       }
+                       break;
                case 'c':
                        CyclePath = optarg;
                        break;
@@ -71,6 +93,9 @@ main(int ac, char **av)
                case 'h':
                        usage(0);
                        /* not reached */
+               case 'i':
+                       DelayOpt = strtol(optarg, NULL, 0);
+                       break;
                case 'r':
                        RecurseOpt = 1;
                        break;
@@ -200,11 +225,15 @@ main(int ac, char **av)
        }
        if (strncmp(av[0], "mirror", 6) == 0) {
                if (strcmp(av[0], "mirror-read") == 0)
-                       hammer_cmd_mirror_read(av + 1, ac - 1);
+                       hammer_cmd_mirror_read(av + 1, ac - 1, 0);
+               if (strcmp(av[0], "mirror-read-stream") == 0)
+                       hammer_cmd_mirror_read(av + 1, ac - 1, 1);
                else if (strcmp(av[0], "mirror-write") == 0)
                        hammer_cmd_mirror_write(av + 1, ac - 1);
                else if (strcmp(av[0], "mirror-copy") == 0)
-                       hammer_cmd_mirror_copy(av + 1, ac - 1);
+                       hammer_cmd_mirror_copy(av + 1, ac - 1, 0);
+               else if (strcmp(av[0], "mirror-stream") == 0)
+                       hammer_cmd_mirror_copy(av + 1, ac - 1, 1);
                else if (strcmp(av[0], "mirror-dump") == 0)
                        hammer_cmd_mirror_dump();
                else
index 802d31d..8f446c4 100644 (file)
@@ -31,7 +31,7 @@
  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  * 
- * $DragonFly: src/sbin/hammer/hammer.h,v 1.23 2008/07/12 02:48:46 dillon Exp $
+ * $DragonFly: src/sbin/hammer/hammer.h,v 1.24 2008/07/31 06:01:32 dillon Exp $
  */
 
 #include <sys/types.h>
@@ -64,6 +64,8 @@ extern int RecurseOpt;
 extern int VerboseOpt;
 extern int TwoWayPipeOpt;
 extern int TimeoutOpt;
+extern int DelayOpt;
+extern u_int64_t BandwidthOpt;
 extern const char *LinkPath;
 extern const char *CyclePath;
 
@@ -74,9 +76,9 @@ void hammer_cmd_softprune(char **av, int ac, int everything_opt);
 void hammer_cmd_bstats(char **av, int ac);
 void hammer_cmd_iostats(char **av, int ac);
 void hammer_cmd_synctid(char **av, int ac);
-void hammer_cmd_mirror_read(char **av, int ac);
+void hammer_cmd_mirror_read(char **av, int ac, int streaming);
 void hammer_cmd_mirror_write(char **av, int ac);
-void hammer_cmd_mirror_copy(char **av, int ac);
+void hammer_cmd_mirror_copy(char **av, int ac, int streaming);
 void hammer_cmd_mirror_dump(void);
 void hammer_cmd_history(const char *offset_str, char **av, int ac);
 void hammer_cmd_blockmap(void);
@@ -95,4 +97,5 @@ void hammer_set_cycle(hammer_base_elm_t base, hammer_tid_t tid);
 void hammer_reset_cycle(void);
 
 int getpfs(struct hammer_ioc_pseudofs_rw *pfs, const char *path);
+void relpfs(int fd, struct hammer_ioc_pseudofs_rw *pfs);