sys/vfs/hammer: Add typedef hammer_uuid_t
[dragonfly.git] / sbin / hammer / cmd_mirror.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34
35 #include "hammer.h"
36
/*
 * Scoreboard regions.  Each macro expands to the first two arguments
 * (i, w) of score_printf(), so a call reads score_printf(LINE1, fmt, ...).
 */
#define LINE1		0,20
#define LINE2		20,78
#define LINE3		90,70

/*
 * Size in bytes of the user buffer handed to the mirror-read ioctl.
 */
#define SERIALBUF_SIZE	(512 * 1024)
42
43 typedef struct histogram {
44         hammer_tid_t    tid;
45         uint64_t        bytes;
46 } *histogram_t;
47
48 const char *ScoreBoardFile;
49 const char *RestrictTarget;
50
51 static int read_mrecords(int fd, char *buf, u_int size,
52                          struct hammer_ioc_mrecord_head *pickup);
53 static int generate_histogram(int fd, const char *filesystem,
54                          histogram_t *histogram_ary,
55                          struct hammer_ioc_mirror_rw *mirror_base,
56                          int *repeatp);
57 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
58                          struct hammer_ioc_mrecord_head *pickup);
59 static void write_mrecord(int fdout, uint32_t type,
60                          hammer_ioc_mrecord_any_t mrec, int bytes);
61 static void generate_mrec_header(int fd, int pfs_id,
62                          union hammer_ioc_mrecord_any *mrec_tmp);
63 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
64                          struct hammer_ioc_mrecord_head *pickup,
65                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
66 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
67 static ssize_t writebw(int fd, const void *buf, size_t nbytes,
68                         uint64_t *bwcount, struct timeval *tv1);
69 static int getyntty(void);
70 static void score_printf(size_t i, size_t w, const char *ctl, ...);
71 static void hammer_check_restrict(const char *filesystem);
72 static void mirror_usage(int code);
73
74 /*
75  * Generate a mirroring data stream from the specific source over the
76  * entire key range, but restricted to the specified transaction range.
77  *
78  * The HAMMER VFS does most of the work, we add a few new mrecord
79  * types to negotiate the TID ranges and verify that the entire
80  * stream made it to the destination.
81  *
82  * streaming will be 0 for mirror-read, 1 for mirror-stream.  The code will
83  * set up a fake value of -1 when running the histogram for mirror-read.
84  */
85 void
86 hammer_cmd_mirror_read(char **av, int ac, int streaming)
87 {
88         struct hammer_ioc_mirror_rw mirror;
89         struct hammer_ioc_pseudofs_rw pfs;
90         union hammer_ioc_mrecord_any mrec_tmp;
91         struct hammer_ioc_mrecord_head pickup;
92         hammer_ioc_mrecord_any_t mrec;
93         hammer_tid_t sync_tid;
94         histogram_t histogram_ary;
95         const char *filesystem;
96         char *buf = malloc(SERIALBUF_SIZE);
97         int interrupted = 0;
98         int error;
99         int fd;
100         int n;
101         int didwork;
102         int histogram;
103         int histindex;
104         int histmax;
105         int repeat = 0;
106         int sameline;
107         int64_t total_bytes;
108         time_t base_t = time(NULL);
109         struct timeval bwtv;
110         uint64_t bwcount;
111         uint64_t estbytes;
112
113         if (ac == 0 || ac > 2) {
114                 mirror_usage(1);
115                 /* not reached */
116         }
117         filesystem = av[0];
118         hammer_check_restrict(filesystem);
119
120         pickup.signature = 0;
121         pickup.type = 0;
122         histogram = 0;
123         histindex = 0;
124         histmax = 0;
125         histogram_ary = NULL;
126         sameline = 0;
127
128 again:
129         bzero(&mirror, sizeof(mirror));
130         hammer_key_beg_init(&mirror.key_beg);
131         hammer_key_end_init(&mirror.key_end);
132
133         fd = getpfs(&pfs, filesystem);
134
135         if (streaming >= 0)
136                 score_printf(LINE1, "Running");
137
138         if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) {
139                 fprintf(stderr, "%cRunning  \b\b", (sameline ? '\r' : '\n'));
140                 fflush(stderr);
141                 sameline = 1;
142         }
143         sameline = 1;
144         total_bytes = 0;
145         gettimeofday(&bwtv, NULL);
146         bwcount = 0;
147
148         /*
149          * Send initial header for the purpose of determining the
150          * shared-uuid.
151          */
152         generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp);
153         write_mrecord(1, HAMMER_MREC_TYPE_PFSD,
154                       &mrec_tmp, sizeof(mrec_tmp.pfs));
155
156         /*
157          * In 2-way mode the target will send us a PFS info packet
158          * first.  Use the target's current snapshot TID as our default
159          * begin TID.
160          */
161         if (TwoWayPipeOpt) {
162                 mirror.tid_beg = 0;
163                 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup,
164                                          NULL, &mirror.tid_beg);
165                 if (n < 0) {    /* got TERM record */
166                         relpfs(fd, &pfs);
167                         free(buf);
168                         free(histogram_ary);
169                         return;
170                 }
171                 ++mirror.tid_beg;
172         } else if (streaming && histogram) {
173                 mirror.tid_beg = histogram_ary[histindex].tid + 1;
174         } else {
175                 mirror.tid_beg = 0;
176         }
177
178         /*
179          * Write out the PFS header, tid_beg will be updated if our PFS
180          * has a larger begin sync.  tid_end is set to the latest source
181          * TID whos flush cycle has completed.
182          */
183         generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp);
184         if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid)
185                 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid;
186         mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid;
187         mirror.ubuf = buf;
188         mirror.size = SERIALBUF_SIZE;
189         mirror.pfs_id = pfs.pfs_id;
190         mirror.shared_uuid = pfs.ondisk->shared_uuid;
191
192         /*
193          * XXX If the histogram is exhausted and the TID delta is large
194          *     the stream might have been offline for a while and is
195          *     now picking it up again.  Do another histogram.
196          */
197 #if 0
198         if (streaming && histogram && histindex == histend) {
199                 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM)
200                         histogram = 0;
201         }
202 #endif
203
204         /*
205          * Initial bulk startup control, try to do some incremental
206          * mirroring in order to allow the stream to be killed and
207          * restarted without having to start over.
208          */
209         if (histogram == 0 && BulkOpt == 0) {
210                 if (VerboseOpt && repeat == 0) {
211                         fprintf(stderr, "\n");
212                         sameline = 0;
213                 }
214                 histmax = generate_histogram(fd, filesystem,
215                                              &histogram_ary, &mirror,
216                                              &repeat);
217                 histindex = 0;
218                 histogram = 1;
219
220                 /*
221                  * Just stream the histogram, then stop
222                  */
223                 if (streaming == 0)
224                         streaming = -1;
225         }
226
227         if (streaming && histogram) {
228                 ++histindex;
229                 mirror.tid_end = histogram_ary[histindex].tid;
230                 estbytes = histogram_ary[histindex-1].bytes;
231                 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end;
232         } else {
233                 estbytes = 0;
234         }
235
236         write_mrecord(1, HAMMER_MREC_TYPE_PFSD,
237                       &mrec_tmp, sizeof(mrec_tmp.pfs));
238
239         /*
240          * A cycle file overrides the beginning TID only if we are
241          * not operating in two-way or histogram mode.
242          */
243         if (TwoWayPipeOpt == 0 && histogram == 0)
244                 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
245
246         /*
247          * An additional argument overrides the beginning TID regardless
248          * of what mode we are in.  This is not recommending if operating
249          * in two-way mode.
250          */
251         if (ac == 2)
252                 mirror.tid_beg = strtoull(av[1], NULL, 0);
253
254         if (streaming == 0 || VerboseOpt >= 2) {
255                 fprintf(stderr,
256                         "Mirror-read: Mirror %016jx to %016jx",
257                         (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end);
258                 if (histogram)
259                         fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes);
260                 fprintf(stderr, "\n");
261                 fflush(stderr);
262         }
263         if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
264                 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n",
265                         (uintmax_t)mirror.key_beg.obj_id);
266         }
267
268         /*
269          * Nothing to do if begin equals end.
270          */
271         if (mirror.tid_beg >= mirror.tid_end) {
272                 if (streaming == 0 || VerboseOpt >= 2)
273                         fprintf(stderr, "Mirror-read: No work to do\n");
274                 sleep(DelayOpt);
275                 didwork = 0;
276                 histogram = 0;
277                 goto done;
278         }
279         didwork = 1;
280
281         /*
282          * Write out bulk records
283          */
284         mirror.ubuf = buf;
285         mirror.size = SERIALBUF_SIZE;
286
287         do {
288                 mirror.count = 0;
289                 mirror.pfs_id = pfs.pfs_id;
290                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
291                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
292                         score_printf(LINE3, "Mirror-read %s failed: %s",
293                                      filesystem, strerror(errno));
294                         err(1, "Mirror-read %s failed", filesystem);
295                         /* not reached */
296                 }
297                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
298                         score_printf(LINE3, "Mirror-read %s fatal error %d",
299                                      filesystem, mirror.head.error);
300                         errx(1, "Mirror-read %s fatal error %d",
301                                 filesystem, mirror.head.error);
302                         /* not reached */
303                 }
304                 if (mirror.count) {
305                         if (BandwidthOpt) {
306                                 n = writebw(1, mirror.ubuf, mirror.count,
307                                             &bwcount, &bwtv);
308                         } else {
309                                 n = write(1, mirror.ubuf, mirror.count);
310                         }
311                         if (n != mirror.count) {
312                                 score_printf(LINE3,
313                                              "Mirror-read %s failed: "
314                                              "short write",
315                                              filesystem);
316                                 errx(1, "Mirror-read %s failed: short write",
317                                         filesystem);
318                                 /* not reached */
319                         }
320                 }
321                 total_bytes += mirror.count;
322                 if (streaming && VerboseOpt) {
323                         fprintf(stderr,
324                                 "\rscan obj=%016jx tids=%016jx:%016jx %11jd",
325                                 (uintmax_t)mirror.key_cur.obj_id,
326                                 (uintmax_t)mirror.tid_beg,
327                                 (uintmax_t)mirror.tid_end,
328                                 (intmax_t)total_bytes);
329                         fflush(stderr);
330                         sameline = 0;
331                 } else if (streaming) {
332                         score_printf(LINE2,
333                                 "obj=%016jx tids=%016jx:%016jx %11jd",
334                                 (uintmax_t)mirror.key_cur.obj_id,
335                                 (uintmax_t)mirror.tid_beg,
336                                 (uintmax_t)mirror.tid_end,
337                                 (intmax_t)total_bytes);
338                 }
339                 mirror.key_beg = mirror.key_cur;
340
341                 /*
342                  * Deal with time limit option
343                  */
344                 if (TimeoutOpt &&
345                     (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
346                         score_printf(LINE3,
347                                 "Mirror-read %s interrupted by timer at"
348                                 " %016jx",
349                                 filesystem,
350                                 (uintmax_t)mirror.key_cur.obj_id);
351                         fprintf(stderr,
352                                 "Mirror-read %s interrupted by timer at"
353                                 " %016jx\n",
354                                 filesystem,
355                                 (uintmax_t)mirror.key_cur.obj_id);
356                         interrupted = 1;
357                         break;
358                 }
359         } while (mirror.count != 0);
360
361 done:
362         if (streaming && VerboseOpt && sameline == 0) {
363                 fprintf(stderr, "\n");
364                 fflush(stderr);
365                 sameline = 1;
366         }
367
368         /*
369          * Write out the termination sync record - only if not interrupted
370          */
371         if (interrupted == 0) {
372                 if (didwork) {
373                         write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
374                                       &mrec_tmp, sizeof(mrec_tmp.sync));
375                 } else {
376                         write_mrecord(1, HAMMER_MREC_TYPE_IDLE,
377                                       &mrec_tmp, sizeof(mrec_tmp.sync));
378                 }
379         }
380
381         /*
382          * If the -2 option was given (automatic when doing mirror-copy),
383          * a two-way pipe is assumed and we expect a response mrec from
384          * the target.
385          */
386         if (TwoWayPipeOpt) {
387                 mrec = read_mrecord(0, &error, &pickup);
388                 if (mrec == NULL ||
389                     mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
390                     mrec->head.rec_size != sizeof(mrec->update)) {
391                         errx(1, "mirror_read: Did not get final "
392                                 "acknowledgement packet from target");
393                         /* not reached */
394                 }
395                 if (interrupted) {
396                         if (CyclePath) {
397                                 hammer_set_cycle(&mirror.key_cur,
398                                                  mirror.tid_beg);
399                                 fprintf(stderr, "Cyclefile %s updated for "
400                                         "continuation\n", CyclePath);
401                         }
402                 } else {
403                         sync_tid = mrec->update.tid;
404                         if (CyclePath) {
405                                 hammer_key_beg_init(&mirror.key_beg);
406                                 hammer_set_cycle(&mirror.key_beg, sync_tid);
407                                 fprintf(stderr,
408                                         "Cyclefile %s updated to 0x%016jx\n",
409                                         CyclePath, (uintmax_t)sync_tid);
410                         }
411                 }
412                 free(mrec);
413         } else if (CyclePath) {
414                 /* NOTE! mirror.tid_beg cannot be updated */
415                 fprintf(stderr, "Warning: cycle file (-c option) cannot be "
416                                 "fully updated unless you use mirror-copy\n");
417                 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
418         }
419         if (streaming && interrupted == 0) {
420                 time_t t1 = time(NULL);
421                 time_t t2;
422
423                 /*
424                  * Try to break down large bulk transfers into smaller ones
425                  * so it can sync the transaction id on the slave.  This
426                  * way if we get interrupted a restart doesn't have to
427                  * start from scratch.
428                  */
429                 if (streaming && histogram) {
430                         if (histindex != histmax) {
431                                 if (VerboseOpt && VerboseOpt < 2 &&
432                                     streaming >= 0) {
433                                         fprintf(stderr, " (bulk incremental)");
434                                 }
435                                 relpfs(fd, &pfs);
436                                 goto again;
437                         }
438                 }
439
440                 if (VerboseOpt && streaming >= 0) {
441                         fprintf(stderr, " W");
442                         fflush(stderr);
443                 } else if (streaming >= 0) {
444                         score_printf(LINE1, "Waiting");
445                 }
446                 pfs.ondisk->sync_end_tid = mirror.tid_end;
447                 if (streaming < 0) {
448                         /*
449                          * Fake streaming mode when using a histogram to
450                          * break up a mirror-read, do not wait on source.
451                          */
452                         streaming = 0;
453                 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) {
454                         score_printf(LINE3,
455                                      "Mirror-read %s: cannot stream: %s\n",
456                                      filesystem, strerror(errno));
457                         fprintf(stderr,
458                                 "Mirror-read %s: cannot stream: %s\n",
459                                 filesystem, strerror(errno));
460                 } else {
461                         t2 = time(NULL) - t1;
462                         if (t2 >= 0 && t2 < DelayOpt) {
463                                 if (VerboseOpt) {
464                                         fprintf(stderr, "\bD");
465                                         fflush(stderr);
466                                 }
467                                 sleep(DelayOpt - t2);
468                         }
469                         if (VerboseOpt) {
470                                 fprintf(stderr, "\b ");
471                                 fflush(stderr);
472                         }
473                         relpfs(fd, &pfs);
474                         goto again;
475                 }
476         }
477         write_mrecord(1, HAMMER_MREC_TYPE_TERM,
478                       &mrec_tmp, sizeof(mrec_tmp.sync));
479         relpfs(fd, &pfs);
480         free(buf);
481         free(histogram_ary);
482         fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
483 }
484
485 /*
486  * What we are trying to do here is figure out how much data is
487  * going to be sent for the TID range and to break the TID range
488  * down into reasonably-sized slices (from the point of view of
489  * data sent) so a lost connection can restart at a reasonable
490  * place and not all the way back at the beginning.
491  *
492  * An entry's TID serves as the end_tid for the prior entry
493  * So we have to offset the calculation by 1 so that TID falls into
494  * the previous entry when populating entries.
495  *
496  * Because the transaction id space is bursty we need a relatively
497  * large number of buckets (like a million) to do a reasonable job
498  * for things like an initial bulk mirrors on a very large filesystem.
499  */
500 #define HIST_COUNT      (1024 * 1024)
501
502 static
503 int
504 generate_histogram(int fd, const char *filesystem,
505                    histogram_t *histogram_ary,
506                    struct hammer_ioc_mirror_rw *mirror_base,
507                    int *repeatp)
508 {
509         struct hammer_ioc_mirror_rw mirror;
510         union hammer_ioc_mrecord_any *mrec;
511         hammer_tid_t tid_beg;
512         hammer_tid_t tid_end;
513         hammer_tid_t tid;
514         hammer_tid_t tidx;
515         uint64_t *tid_bytes;
516         uint64_t total;
517         uint64_t accum;
518         int chunkno;
519         int i;
520         int res;
521         int off;
522         int len;
523
524         mirror = *mirror_base;
525         tid_beg = mirror.tid_beg;
526         tid_end = mirror.tid_end;
527         mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA;
528
529         if (*histogram_ary == NULL) {
530                 *histogram_ary = malloc(sizeof(struct histogram) *
531                                         (HIST_COUNT + 2));
532         }
533         if (tid_beg >= tid_end)
534                 return(0);
535
536         /* needs 2 extra */
537         tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2));
538         bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2));
539
540         if (*repeatp == 0) {
541                 fprintf(stderr, "Prescan to break up bulk transfer");
542                 if (VerboseOpt > 1) {
543                         fprintf(stderr, " (%juMB chunks)",
544                                 (uintmax_t)(SplitupOpt / (1024 * 1024)));
545                 }
546                 fprintf(stderr, "\n");
547         }
548
549         /*
550          * Note: (tid_beg,tid_end), range is inclusive of both beg & end.
551          *
552          * Note: Estimates can be off when the mirror is way behind due
553          *       to skips.
554          */
555         total = 0;
556         accum = 0;
557         chunkno = 0;
558         for (;;) {
559                 mirror.count = 0;
560                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
561                         err(1, "Mirror-read %s failed", filesystem);
562                         /* not reached */
563                 }
564                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
565                         errx(1, "Mirror-read %s fatal error %d",
566                                 filesystem, mirror.head.error);
567                         /* not reached */
568                 }
569                 for (off = 0;
570                      off < mirror.count;
571                      off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) {
572                         mrec = (void *)((char *)mirror.ubuf + off);
573
574                         /*
575                          * We only care about general RECs and PASS
576                          * records.  We ignore SKIPs.
577                          */
578                         switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) {
579                         case HAMMER_MREC_TYPE_REC:
580                         case HAMMER_MREC_TYPE_PASS:
581                                 break;
582                         default:
583                                 continue;
584                         }
585
586                         /*
587                          * Calculate for two indices, create_tid and
588                          * delete_tid.  Record data only applies to
589                          * the create_tid.
590                          *
591                          * When tid is exactly on the boundary it really
592                          * belongs to the previous entry because scans
593                          * are inclusive of the ending entry.
594                          */
595                         tid = mrec->rec.leaf.base.delete_tid;
596                         if (tid && tid >= tid_beg && tid <= tid_end) {
597                                 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
598                                 if (mrec->head.type ==
599                                     HAMMER_MREC_TYPE_REC) {
600                                         len -= HAMMER_HEAD_DOALIGN(
601                                                     mrec->rec.leaf.data_len);
602                                         assert(len > 0);
603                                 }
604                                 i = (tid - tid_beg) * HIST_COUNT /
605                                     (tid_end - tid_beg);
606                                 tidx = tid_beg + i * (tid_end - tid_beg) /
607                                                  HIST_COUNT;
608                                 if (tid == tidx && i)
609                                         --i;
610                                 assert(i >= 0 && i < HIST_COUNT);
611                                 tid_bytes[i] += len;
612                                 total += len;
613                                 accum += len;
614                         }
615
616                         tid = mrec->rec.leaf.base.create_tid;
617                         if (tid && tid >= tid_beg && tid <= tid_end) {
618                                 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
619                                 if (mrec->head.type ==
620                                     HAMMER_MREC_TYPE_REC_NODATA) {
621                                         len += HAMMER_HEAD_DOALIGN(
622                                                     mrec->rec.leaf.data_len);
623                                 }
624                                 i = (tid - tid_beg) * HIST_COUNT /
625                                     (tid_end - tid_beg);
626                                 tidx = tid_beg + i * (tid_end - tid_beg) /
627                                                  HIST_COUNT;
628                                 if (tid == tidx && i)
629                                         --i;
630                                 assert(i >= 0 && i < HIST_COUNT);
631                                 tid_bytes[i] += len;
632                                 total += len;
633                                 accum += len;
634                         }
635                 }
636                 if (*repeatp == 0 && accum > SplitupOpt) {
637                         if (VerboseOpt > 1) {
638                                 fprintf(stderr, ".");
639                                 fflush(stderr);
640                         }
641                         ++chunkno;
642                         score_printf(LINE2, "Prescan chunk %d", chunkno);
643                         accum = 0;
644                 }
645                 if (mirror.count == 0)
646                         break;
647                 mirror.key_beg = mirror.key_cur;
648         }
649
650         /*
651          * Reduce to SplitupOpt (default 4GB) chunks.  This code may
652          * use up to two additional elements.  Do the array in-place.
653          *
654          * Inefficient degenerate cases can occur if we do not accumulate
655          * at least the requested split amount, so error on the side of
656          * going over a bit.
657          */
658         res = 0;
659         (*histogram_ary)[res].tid = tid_beg;
660         (*histogram_ary)[res].bytes = tid_bytes[0];
661         for (i = 1; i < HIST_COUNT; ++i) {
662                 if ((*histogram_ary)[res].bytes >= SplitupOpt) {
663                         ++res;
664                         (*histogram_ary)[res].tid = tid_beg +
665                                         i * (tid_end - tid_beg) /
666                                         HIST_COUNT;
667                         (*histogram_ary)[res].bytes = 0;
668
669                 }
670                 (*histogram_ary)[res].bytes += tid_bytes[i];
671         }
672         ++res;
673         (*histogram_ary)[res].tid = tid_end;
674         (*histogram_ary)[res].bytes = -1;
675
676         if (*repeatp == 0) {
677                 if (VerboseOpt > 1)
678                         fprintf(stderr, "\n");  /* newline after ... */
679                 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes",
680                         res, (uintmax_t)total / (1024 * 1024));
681                 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (",
682                         res, (uintmax_t)total / (1024 * 1024));
683                 for (i = 0; i < res && i < 3; ++i) {
684                         if (i)
685                                 fprintf(stderr, ", ");
686                         fprintf(stderr, "%ju",
687                                 (uintmax_t)(*histogram_ary)[i].bytes);
688                 }
689                 if (i < res)
690                         fprintf(stderr, ", ...");
691                 fprintf(stderr, ")\n");
692         }
693         assert(res <= HIST_COUNT);
694         *repeatp = 1;
695
696         free(tid_bytes);
697         return(res);
698 }
699
700 static
701 void
702 create_pfs(const char *filesystem, hammer_uuid_t *s_uuid)
703 {
704         if (ForceYesOpt == 1) {
705                 fprintf(stderr, "PFS slave %s does not exist. "
706                         "Auto create new slave PFS!\n", filesystem);
707
708         } else {
709                 fprintf(stderr, "PFS slave %s does not exist.\n"
710                         "Do you want to create a new slave PFS? [y/n] ",
711                         filesystem);
712                 fflush(stderr);
713                 if (getyntty() != 1) {
714                         errx(1, "Aborting operation");
715                         /* not reached */
716                 }
717         }
718
719         char *shared_uuid = NULL;
720         hammer_uuid_to_string(s_uuid, &shared_uuid);
721
722         char *cmd = NULL;
723         asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2",
724                  filesystem, shared_uuid);
725         free(shared_uuid);
726
727         if (cmd == NULL) {
728                 errx(1, "Failed to alloc memory");
729                 /* not reached */
730         }
731         if (system(cmd) != 0)
732                 fprintf(stderr, "Failed to create PFS\n");
733         free(cmd);
734 }
735
736 /*
737  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
738  * some additional packet types to negotiate TID ranges and to verify
739  * completion.  The HAMMER VFS does most of the work.
740  *
741  * It is important to note that the mirror.key_{beg,end} range must
742  * match the ranged used by the original.  For now both sides use
743  * range the entire key space.
744  *
745  * It is even more important that the records in the stream conform
746  * to the TID range also supplied in the stream.  The HAMMER VFS will
747  * use the REC, PASS, and SKIP record types to track the portions of
748  * the B-Tree being scanned in order to be able to proactively delete
749  * records on the target within those active areas that are not mentioned
750  * by the source.
751  *
752  * The mirror.key_cur field is used by the VFS to do this tracking.  It
753  * must be initialized to key_beg but then is persistently updated by
754  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
755  * field you will blow up the mirror target, possibly to the point of
756  * deleting everything.  As a safety measure the HAMMER VFS simply marks
757  * the records that the source has destroyed as deleted on the target,
758  * and normal pruning operations will deal with their final disposition
759  * at some later time.
760  */
761 void
762 hammer_cmd_mirror_write(char **av, int ac)
763 {
764         struct hammer_ioc_mirror_rw mirror;
765         const char *filesystem;
766         char *buf = malloc(SERIALBUF_SIZE);
767         struct hammer_ioc_pseudofs_rw pfs;
768         struct hammer_ioc_mrecord_head pickup;
769         struct hammer_ioc_synctid synctid;
770         union hammer_ioc_mrecord_any mrec_tmp;
771         hammer_ioc_mrecord_any_t mrec;
772         struct stat st;
773         int error;
774         int fd;
775         int n;
776
777         if (ac != 1) {
778                 mirror_usage(1);
779                 /* not reached */
780         }
781         filesystem = av[0];
782         hammer_check_restrict(filesystem);
783
784         pickup.signature = 0;
785         pickup.type = 0;
786
787 again:
788         bzero(&mirror, sizeof(mirror));
789         hammer_key_beg_init(&mirror.key_beg);
790         hammer_key_end_init(&mirror.key_end);
791         mirror.key_end = mirror.key_beg;
792
793         /*
794          * Read initial packet
795          */
796         mrec = read_mrecord(0, &error, &pickup);
797         if (mrec == NULL) {
798                 if (error == 0) {
799                         errx(1, "validate_mrec_header: short read");
800                         /* not reached */
801                 }
802                 exit(1);
803         }
804         /*
805          * Validate packet
806          */
807         if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
808                 free(buf);
809                 return;
810         }
811         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
812                 errx(1, "validate_mrec_header: did not get expected "
813                         "PFSD record type");
814                 /* not reached */
815         }
816         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
817                 errx(1, "validate_mrec_header: unexpected payload size");
818                 /* not reached */
819         }
820
821         /*
822          * Create slave PFS if it doesn't yet exist
823          */
824         if (lstat(filesystem, &st) != 0)
825                 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid);
826         free(mrec);
827         mrec = NULL;
828
829         fd = getpfs(&pfs, filesystem);
830
831         /*
832          * In two-way mode the target writes out a PFS packet first.
833          * The source uses our tid_end as its tid_beg by default,
834          * picking up where it left off.
835          */
836         mirror.tid_beg = 0;
837         if (TwoWayPipeOpt) {
838                 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp);
839                 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid)
840                         mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid;
841                 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid;
842                 write_mrecord(1, HAMMER_MREC_TYPE_PFSD,
843                               &mrec_tmp, sizeof(mrec_tmp.pfs));
844         }
845
846         /*
847          * Read and process the PFS header.  The source informs us of
848          * the TID range the stream represents.
849          */
850         n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup,
851                                  &mirror.tid_beg, &mirror.tid_end);
852         if (n < 0) {    /* got TERM record */
853                 relpfs(fd, &pfs);
854                 free(buf);
855                 return;
856         }
857
858         mirror.ubuf = buf;
859         mirror.size = SERIALBUF_SIZE;
860
861         /*
862          * Read and process bulk records (REC, PASS, and SKIP types).
863          *
864          * On your life, do NOT mess with mirror.key_cur or your mirror
865          * target may become history.
866          */
867         for (;;) {
868                 mirror.count = 0;
869                 mirror.pfs_id = pfs.pfs_id;
870                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
871                 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
872                 if (mirror.size <= 0)
873                         break;
874                 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
875                         err(1, "Mirror-write %s failed", filesystem);
876                         /* not reached */
877                 }
878                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
879                         errx(1, "Mirror-write %s fatal error %d",
880                                 filesystem, mirror.head.error);
881                         /* not reached */
882                 }
883 #if 0
884                 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
885                         errx(1, "Mirror-write %s interrupted by timer at"
886                                 " %016llx",
887                                 filesystem,
888                                 mirror.key_cur.obj_id);
889                         /* not reached */
890                 }
891 #endif
892         }
893
894         /*
895          * Read and process the termination sync record.
896          */
897         mrec = read_mrecord(0, &error, &pickup);
898
899         if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
900                 fprintf(stderr, "Mirror-write: received termination request\n");
901                 relpfs(fd, &pfs);
902                 free(mrec);
903                 free(buf);
904                 return;
905         }
906
907         if (mrec == NULL ||
908             (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
909              mrec->head.type != HAMMER_MREC_TYPE_IDLE) ||
910             mrec->head.rec_size != sizeof(mrec->sync)) {
911                 errx(1, "Mirror-write %s: Did not get termination "
912                         "sync record, or rec_size is wrong rt=%d",
913                         filesystem, (mrec ? (int)mrec->head.type : -1));
914                 /* not reached */
915         }
916
917         /*
918          * Update the PFS info on the target so the user has visibility
919          * into the new snapshot, and sync the target filesystem.
920          */
921         if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) {
922                 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
923
924                 bzero(&synctid, sizeof(synctid));
925                 synctid.op = HAMMER_SYNCTID_SYNC2;
926                 ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
927
928                 if (VerboseOpt >= 2) {
929                         fprintf(stderr, "Mirror-write %s: succeeded\n",
930                                 filesystem);
931                 }
932         }
933
934         free(mrec);
935         mrec = NULL;
936
937         /*
938          * Report back to the originator.
939          */
940         if (TwoWayPipeOpt) {
941                 mrec_tmp.update.tid = mirror.tid_end;
942                 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
943                               &mrec_tmp, sizeof(mrec_tmp.update));
944         } else {
945                 printf("Source can update synctid to 0x%016jx\n",
946                        (uintmax_t)mirror.tid_end);
947         }
948         relpfs(fd, &pfs);
949         goto again;
950 }
951
952 void
953 hammer_cmd_mirror_dump(char **av, int ac)
954 {
955         char *buf = malloc(SERIALBUF_SIZE);
956         struct hammer_ioc_mrecord_head pickup;
957         hammer_ioc_mrecord_any_t mrec;
958         int error;
959         int size;
960         int offset;
961         int bytes;
962         int header_only = 0;
963
964         if (ac == 1 && strcmp(*av, "header") == 0) {
965                 header_only = 1;
966         } else if (ac != 0) {
967                 mirror_usage(1);
968                 /* not reached */
969         }
970
971         /*
972          * Read and process the PFS header
973          */
974         pickup.signature = 0;
975         pickup.type = 0;
976
977         mrec = read_mrecord(0, &error, &pickup);
978
979         /*
980          * Dump the PFS header. mirror-dump takes its input from the output
981          * of a mirror-read so getpfs() can't be used to get a fd to be passed
982          * to dump_pfsd().
983          */
984         if (header_only && mrec != NULL) {
985                 dump_pfsd(&mrec->pfs.pfsd, -1);
986                 free(mrec);
987                 free(buf);
988                 return;
989         }
990         free(mrec);
991
992 again:
993         /*
994          * Read and process bulk records
995          */
996         for (;;) {
997                 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
998                 if (size <= 0)
999                         break;
1000                 offset = 0;
1001                 while (offset < size) {
1002                         mrec = (void *)((char *)buf + offset);
1003                         bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
1004                         if (offset + bytes > size) {
1005                                 errx(1, "Misaligned record");
1006                                 /* not reached */
1007                         }
1008
1009                         switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) {
1010                         case HAMMER_MREC_TYPE_REC_BADCRC:
1011                         case HAMMER_MREC_TYPE_REC:
1012                                 printf("Record lo=%08x obj=%016jx key=%016jx "
1013                                        "rt=%02x ot=%02x",
1014                                         mrec->rec.leaf.base.localization,
1015                                         (uintmax_t)mrec->rec.leaf.base.obj_id,
1016                                         (uintmax_t)mrec->rec.leaf.base.key,
1017                                         mrec->rec.leaf.base.rec_type,
1018                                         mrec->rec.leaf.base.obj_type);
1019                                 if (mrec->head.type ==
1020                                     HAMMER_MREC_TYPE_REC_BADCRC) {
1021                                         printf(" (BAD CRC)");
1022                                 }
1023                                 printf("\n");
1024                                 printf("       tids %016jx:%016jx data=%d\n",
1025                                     (uintmax_t)mrec->rec.leaf.base.create_tid,
1026                                     (uintmax_t)mrec->rec.leaf.base.delete_tid,
1027                                     mrec->rec.leaf.data_len);
1028                                 break;
1029                         case HAMMER_MREC_TYPE_PASS:
1030                                 printf("Pass   lo=%08x obj=%016jx key=%016jx "
1031                                        "rt=%02x ot=%02x\n",
1032                                         mrec->rec.leaf.base.localization,
1033                                         (uintmax_t)mrec->rec.leaf.base.obj_id,
1034                                         (uintmax_t)mrec->rec.leaf.base.key,
1035                                         mrec->rec.leaf.base.rec_type,
1036                                         mrec->rec.leaf.base.obj_type);
1037                                 printf("       tids %016jx:%016jx data=%d\n",
1038                                     (uintmax_t)mrec->rec.leaf.base.create_tid,
1039                                     (uintmax_t)mrec->rec.leaf.base.delete_tid,
1040                                         mrec->rec.leaf.data_len);
1041                                 break;
1042                         case HAMMER_MREC_TYPE_SKIP:
1043                                 printf("Skip   lo=%08x obj=%016jx key=%016jx rt=%02x to\n"
1044                                        "       lo=%08x obj=%016jx key=%016jx rt=%02x\n",
1045                                        mrec->skip.skip_beg.localization,
1046                                        (uintmax_t)mrec->skip.skip_beg.obj_id,
1047                                        (uintmax_t)mrec->skip.skip_beg.key,
1048                                        mrec->skip.skip_beg.rec_type,
1049                                        mrec->skip.skip_end.localization,
1050                                        (uintmax_t)mrec->skip.skip_end.obj_id,
1051                                        (uintmax_t)mrec->skip.skip_end.key,
1052                                        mrec->skip.skip_end.rec_type);
1053                         default:
1054                                 break;
1055                         }
1056                         offset += bytes;
1057                 }
1058         }
1059
1060         /*
1061          * Read and process the termination sync record.
1062          */
1063         mrec = read_mrecord(0, &error, &pickup);
1064         if (mrec == NULL ||
1065             (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
1066              mrec->head.type != HAMMER_MREC_TYPE_IDLE)) {
1067                 fprintf(stderr, "Mirror-dump: Did not get termination "
1068                                 "sync record\n");
1069         }
1070         free(mrec);
1071
1072         /*
1073          * Continue with more batches until EOF.
1074          */
1075         mrec = read_mrecord(0, &error, &pickup);
1076         if (mrec) {
1077                 free(mrec);
1078                 goto again;
1079         }
1080         free(buf);
1081 }
1082
1083 void
1084 hammer_cmd_mirror_copy(char **av, int ac, int streaming)
1085 {
1086         pid_t pid1;
1087         pid_t pid2;
1088         int fds[2];
1089         const char *xav[32];
1090         char tbuf[16];
1091         char *sh, *user, *host, *rfs;
1092         int xac;
1093
1094         if (ac != 2) {
1095                 mirror_usage(1);
1096                 /* not reached */
1097         }
1098
1099         TwoWayPipeOpt = 1;
1100         signal(SIGPIPE, SIG_IGN);
1101
1102 again:
1103         if (pipe(fds) < 0) {
1104                 err(1, "pipe");
1105                 /* not reached */
1106         }
1107
1108         /*
1109          * Source
1110          */
1111         if ((pid1 = fork()) == 0) {
1112                 signal(SIGPIPE, SIG_DFL);
1113                 dup2(fds[0], 0);
1114                 dup2(fds[0], 1);
1115                 close(fds[0]);
1116                 close(fds[1]);
1117                 if ((rfs = strchr(av[0], ':')) != NULL) {
1118                         xac = 0;
1119
1120                         if((sh = getenv("HAMMER_RSH")) == NULL)
1121                                 xav[xac++] = "ssh";
1122                         else
1123                                 xav[xac++] = sh;
1124
1125                         if (CompressOpt)
1126                                 xav[xac++] = "-C";
1127
1128                         if ((host = strchr(av[0], '@')) != NULL) {
1129                                 user = strndup( av[0], (host++ - av[0]));
1130                                 host = strndup( host, (rfs++ - host));
1131                                 xav[xac++] = "-l";
1132                                 xav[xac++] = user;
1133                                 xav[xac++] = host;
1134                         } else {
1135                                 host = strndup( av[0], (rfs++ - av[0]));
1136                                 user = NULL;
1137                                 xav[xac++] = host;
1138                         }
1139
1140
1141                         if (SshPort) {
1142                                 xav[xac++] = "-p";
1143                                 xav[xac++] = SshPort;
1144                         }
1145
1146                         xav[xac++] = "hammer";
1147
1148                         switch(VerboseOpt) {
1149                         case 0:
1150                                 break;
1151                         case 1:
1152                                 xav[xac++] = "-v";
1153                                 break;
1154                         case 2:
1155                                 xav[xac++] = "-vv";
1156                                 break;
1157                         default:
1158                                 xav[xac++] = "-vvv";
1159                                 break;
1160                         }
1161                         if (ForceYesOpt)
1162                                 xav[xac++] = "-y";
1163                         xav[xac++] = "-2";
1164                         if (TimeoutOpt) {
1165                                 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
1166                                 xav[xac++] = "-t";
1167                                 xav[xac++] = tbuf;
1168                         }
1169                         if (SplitupOptStr) {
1170                                 xav[xac++] = "-S";
1171                                 xav[xac++] = SplitupOptStr;
1172                         }
1173                         if (streaming)
1174                                 xav[xac++] = "mirror-read-stream";
1175                         else
1176                                 xav[xac++] = "mirror-read";
1177                         xav[xac++] = rfs;
1178                         xav[xac++] = NULL;
1179                         execvp(*xav, (void *)xav);
1180                 } else {
1181                         hammer_cmd_mirror_read(av, 1, streaming);
1182                         fflush(stdout);
1183                         fflush(stderr);
1184                 }
1185                 _exit(1);
1186         }
1187
1188         /*
1189          * Target
1190          */
1191         if ((pid2 = fork()) == 0) {
1192                 signal(SIGPIPE, SIG_DFL);
1193                 dup2(fds[1], 0);
1194                 dup2(fds[1], 1);
1195                 close(fds[0]);
1196                 close(fds[1]);
1197                 if ((rfs = strchr(av[1], ':')) != NULL) {
1198                         xac = 0;
1199
1200                         if((sh = getenv("HAMMER_RSH")) == NULL)
1201                                 xav[xac++] = "ssh";
1202                         else
1203                                 xav[xac++] = sh;
1204
1205                         if (CompressOpt)
1206                                 xav[xac++] = "-C";
1207
1208                         if ((host = strchr(av[1], '@')) != NULL) {
1209                                 user = strndup( av[1], (host++ - av[1]));
1210                                 host = strndup( host, (rfs++ - host));
1211                                 xav[xac++] = "-l";
1212                                 xav[xac++] = user;
1213                                 xav[xac++] = host;
1214                         } else {
1215                                 host = strndup( av[1], (rfs++ - av[1]));
1216                                 user = NULL;
1217                                 xav[xac++] = host;
1218                         }
1219
1220                         if (SshPort) {
1221                                 xav[xac++] = "-p";
1222                                 xav[xac++] = SshPort;
1223                         }
1224
1225                         xav[xac++] = "hammer";
1226
1227                         switch(VerboseOpt) {
1228                         case 0:
1229                                 break;
1230                         case 1:
1231                                 xav[xac++] = "-v";
1232                                 break;
1233                         case 2:
1234                                 xav[xac++] = "-vv";
1235                                 break;
1236                         default:
1237                                 xav[xac++] = "-vvv";
1238                                 break;
1239                         }
1240                         if (ForceYesOpt)
1241                                 xav[xac++] = "-y";
1242                         xav[xac++] = "-2";
1243                         xav[xac++] = "mirror-write";
1244                         xav[xac++] = rfs;
1245                         xav[xac++] = NULL;
1246                         execvp(*xav, (void *)xav);
1247                 } else {
1248                         hammer_cmd_mirror_write(av + 1, 1);
1249                         fflush(stdout);
1250                         fflush(stderr);
1251                 }
1252                 _exit(1);
1253         }
1254         close(fds[0]);
1255         close(fds[1]);
1256
1257         while (waitpid(pid1, NULL, 0) <= 0)
1258                 ;
1259         while (waitpid(pid2, NULL, 0) <= 0)
1260                 ;
1261
1262         /*
1263          * If the link is lost restart
1264          */
1265         if (streaming) {
1266                 if (VerboseOpt) {
1267                         fprintf(stderr, "\nLost Link\n");
1268                         fflush(stderr);
1269                 }
1270                 sleep(15 + DelayOpt);
1271                 goto again;
1272         }
1273
1274 }
1275
1276 /*
1277  * Read and return multiple mrecords
1278  */
1279 static
1280 int
1281 read_mrecords(int fd, char *buf, u_int size, struct hammer_ioc_mrecord_head *pickup)
1282 {
1283         hammer_ioc_mrecord_any_t mrec;
1284         u_int count;
1285         size_t n;
1286         size_t i;
1287         size_t bytes;
1288         int type;
1289
1290         count = 0;
1291         while (size - count >= HAMMER_MREC_HEADSIZE) {
1292                 /*
1293                  * Cached the record header in case we run out of buffer
1294                  * space.
1295                  */
1296                 fflush(stdout);
1297                 if (pickup->signature == 0) {
1298                         for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
1299                                 i = read(fd, (char *)pickup + n,
1300                                          HAMMER_MREC_HEADSIZE - n);
1301                                 if (i <= 0)
1302                                         break;
1303                         }
1304                         if (n == 0)
1305                                 break;
1306                         if (n != HAMMER_MREC_HEADSIZE) {
1307                                 errx(1, "read_mrecords: short read on pipe");
1308                                 /* not reached */
1309                         }
1310                         if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
1311                                 errx(1, "read_mrecords: malformed record on pipe, "
1312                                         "bad signature");
1313                                 /* not reached */
1314                         }
1315                 }
1316                 if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
1317                     pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
1318                         errx(1, "read_mrecords: malformed record on pipe, "
1319                                 "illegal rec_size");
1320                         /* not reached */
1321                 }
1322
1323                 /*
1324                  * Stop if we have insufficient space for the record and data.
1325                  */
1326                 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
1327                 if (size - count < bytes)
1328                         break;
1329
1330                 /*
1331                  * Stop if the record type is not a REC, SKIP, or PASS,
1332                  * which are the only types the ioctl supports.  Other types
1333                  * are used only by the userland protocol.
1334                  *
1335                  * Ignore all flags.
1336                  */
1337                 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK;
1338                 if (type != HAMMER_MREC_TYPE_PFSD &&
1339                     type != HAMMER_MREC_TYPE_REC &&
1340                     type != HAMMER_MREC_TYPE_SKIP &&
1341                     type != HAMMER_MREC_TYPE_PASS) {
1342                         break;
1343                 }
1344
1345                 /*
1346                  * Read the remainder and clear the pickup signature.
1347                  */
1348                 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
1349                         i = read(fd, buf + count + n, bytes - n);
1350                         if (i <= 0)
1351                                 break;
1352                 }
1353                 if (n != bytes) {
1354                         errx(1, "read_mrecords: short read on pipe");
1355                         /* not reached */
1356                 }
1357
1358                 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
1359                 pickup->signature = 0;
1360                 pickup->type = 0;
1361                 mrec = (void *)(buf + count);
1362
1363                 /*
1364                  * Validate the completed record
1365                  */
1366                 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) {
1367                         errx(1, "read_mrecords: malformed record on pipe, bad crc");
1368                         /* not reached */
1369                 }
1370
1371                 /*
1372                  * If its a B-Tree record validate the data crc.
1373                  *
1374                  * NOTE: If the VFS passes us an explicitly errorde mrec
1375                  *       we just pass it through.
1376                  */
1377                 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK;
1378
1379                 if (type == HAMMER_MREC_TYPE_REC) {
1380                         if (mrec->head.rec_size <
1381                             sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
1382                                 errx(1, "read_mrecords: malformed record on "
1383                                         "pipe, illegal element data_len");
1384                                 /* not reached */
1385                         }
1386                         if (mrec->rec.leaf.data_len &&
1387                             mrec->rec.leaf.data_offset &&
1388                             hammer_crc_test_leaf(HammerVersion, &mrec->rec + 1, &mrec->rec.leaf) == 0) {
1389                                 fprintf(stderr,
1390                                         "read_mrecords: data_crc did not "
1391                                         "match data! obj=%016jx key=%016jx\n",
1392                                         (uintmax_t)mrec->rec.leaf.base.obj_id,
1393                                         (uintmax_t)mrec->rec.leaf.base.key);
1394                                 fprintf(stderr,
1395                                         "continuing, but there are problems\n");
1396                         }
1397                 }
1398                 count += bytes;
1399         }
1400         return(count);
1401 }
1402
1403 /*
1404  * Read and return a single mrecord.
1405  */
1406 static
1407 hammer_ioc_mrecord_any_t
1408 read_mrecord(int fdin, int *errorp, struct hammer_ioc_mrecord_head *pickup)
1409 {
1410         hammer_ioc_mrecord_any_t mrec;
1411         struct hammer_ioc_mrecord_head mrechd;
1412         size_t bytes;
1413         size_t n;
1414         size_t i;
1415
1416         if (pickup && pickup->type != 0) {
1417                 mrechd = *pickup;
1418                 pickup->signature = 0;
1419                 pickup->type = 0;
1420                 n = HAMMER_MREC_HEADSIZE;
1421         } else {
1422                 /*
1423                  * Read in the PFSD header from the sender.
1424                  */
1425                 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
1426                         i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
1427                         if (i <= 0)
1428                                 break;
1429                 }
1430                 if (n == 0) {
1431                         *errorp = 0;    /* EOF */
1432                         return(NULL);
1433                 }
1434                 if (n != HAMMER_MREC_HEADSIZE) {
1435                         fprintf(stderr, "short read of mrecord header\n");
1436                         *errorp = EPIPE;
1437                         return(NULL);
1438                 }
1439         }
1440         if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
1441                 fprintf(stderr, "read_mrecord: bad signature\n");
1442                 *errorp = EINVAL;
1443                 return(NULL);
1444         }
1445         bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
1446         assert(bytes >= sizeof(mrechd));
1447         mrec = malloc(bytes);
1448         mrec->head = mrechd;
1449
1450         while (n < bytes) {
1451                 i = read(fdin, (char *)mrec + n, bytes - n);
1452                 if (i <= 0)
1453                         break;
1454                 n += i;
1455         }
1456         if (n != bytes) {
1457                 fprintf(stderr, "read_mrecord: short read on payload\n");
1458                 *errorp = EPIPE;
1459                 return(NULL);
1460         }
1461         if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) {
1462                 fprintf(stderr, "read_mrecord: bad CRC\n");
1463                 *errorp = EINVAL;
1464                 return(NULL);
1465         }
1466         *errorp = 0;
1467         return(mrec);
1468 }
1469
1470 static
1471 void
1472 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec,
1473               int bytes)
1474 {
1475         char zbuf[HAMMER_HEAD_ALIGN];
1476         int pad;
1477
1478         pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
1479
1480         assert(bytes >= (int)sizeof(mrec->head));
1481         bzero(&mrec->head, sizeof(mrec->head));
1482         mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
1483         mrec->head.type = type;
1484         mrec->head.rec_size = bytes;
1485         hammer_crc_set_mrec_head(&mrec->head, bytes);
1486         if (write(fdout, mrec, bytes) != bytes) {
1487                 err(1, "write_mrecord");
1488                 /* not reached */
1489         }
1490         if (pad) {
1491                 bzero(zbuf, pad);
1492                 if (write(fdout, zbuf, pad) != pad) {
1493                         err(1, "write_mrecord");
1494                         /* not reached */
1495                 }
1496         }
1497 }
1498
1499 /*
1500  * Generate a mirroring header with the pfs information of the
1501  * originating filesytem.
1502  */
1503 static
1504 void
1505 generate_mrec_header(int fd, int pfs_id,
1506                      union hammer_ioc_mrecord_any *mrec_tmp)
1507 {
1508         struct hammer_ioc_pseudofs_rw pfs;
1509
1510         bzero(mrec_tmp, sizeof(*mrec_tmp));
1511         clrpfs(&pfs, &mrec_tmp->pfs.pfsd, pfs_id);
1512
1513         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1514                 err(1, "Mirror-read: not a HAMMER fs/pseudofs!");
1515                 /* not reached */
1516         }
1517         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
1518                 errx(1, "Mirror-read: HAMMER PFS version mismatch!");
1519                 /* not reached */
1520         }
1521         mrec_tmp->pfs.version = pfs.version;
1522 }
1523
1524 /*
1525  * Validate the pfs information from the originating filesystem
1526  * against the target filesystem.  shared_uuid must match.
1527  *
1528  * return -1 if we got a TERM record
1529  */
1530 static
1531 int
1532 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
1533                      struct hammer_ioc_mrecord_head *pickup,
1534                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
1535 {
1536         struct hammer_ioc_pseudofs_rw pfs;
1537         struct hammer_pseudofs_data pfsd;
1538         hammer_ioc_mrecord_any_t mrec;
1539         int error;
1540
1541         /*
1542          * Get the PFSD info from the target filesystem.
1543          */
1544         clrpfs(&pfs, &pfsd, pfs_id);
1545         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1546                 err(1, "mirror-write: not a HAMMER fs/pseudofs!");
1547                 /* not reached */
1548         }
1549         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
1550                 errx(1, "mirror-write: HAMMER PFS version mismatch!");
1551                 /* not reached */
1552         }
1553
1554         mrec = read_mrecord(fdin, &error, pickup);
1555         if (mrec == NULL) {
1556                 if (error == 0) {
1557                         errx(1, "validate_mrec_header: short read");
1558                         /* not reached */
1559                 }
1560                 exit(1);
1561         }
1562         if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
1563                 free(mrec);
1564                 return(-1);
1565         }
1566
1567         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
1568                 errx(1, "validate_mrec_header: did not get expected "
1569                         "PFSD record type");
1570                 /* not reached */
1571         }
1572         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
1573                 errx(1, "validate_mrec_header: unexpected payload size");
1574                 /* not reached */
1575         }
1576         if (mrec->pfs.version != pfs.version) {
1577                 errx(1, "validate_mrec_header: Version mismatch");
1578                 /* not reached */
1579         }
1580
1581         /*
1582          * Whew.  Ok, is the read PFS info compatible with the target?
1583          */
1584         if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
1585                 sizeof(pfsd.shared_uuid)) != 0) {
1586                 errx(1, "mirror-write: source and target have "
1587                         "different shared-uuid's!");
1588                 /* not reached */
1589         }
1590         if (is_target && hammer_is_pfs_master(&pfsd)) {
1591                 errx(1, "mirror-write: target must be in slave mode");
1592                 /* not reached */
1593         }
1594         if (tid_begp)
1595                 *tid_begp = mrec->pfs.pfsd.sync_beg_tid;
1596         if (tid_endp)
1597                 *tid_endp = mrec->pfs.pfsd.sync_end_tid;
1598         free(mrec);
1599         return(0);
1600 }
1601
1602 static
1603 void
1604 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
1605 {
1606         struct hammer_ioc_pseudofs_rw pfs;
1607         struct hammer_pseudofs_data pfsd;
1608
1609         clrpfs(&pfs, &pfsd, pfs_id);
1610         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1611                 err(1, "update_pfs_snapshot (read)");
1612                 /* not reached */
1613         }
1614
1615         if (pfsd.sync_end_tid != snapshot_tid) {
1616                 pfsd.sync_end_tid = snapshot_tid;
1617                 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
1618                         err(1, "update_pfs_snapshot (rewrite)");
1619                         /* not reached */
1620                 }
1621                 if (VerboseOpt >= 2) {
1622                         fprintf(stderr,
1623                                 "Mirror-write: Completed, updated snapshot "
1624                                 "to %016jx\n",
1625                                 (uintmax_t)snapshot_tid);
1626                         fflush(stderr);
1627                 }
1628         }
1629 }
1630
1631 /*
1632  * Bandwidth-limited write in chunks
1633  */
1634 static
1635 ssize_t
1636 writebw(int fd, const void *buf, size_t nbytes,
1637         uint64_t *bwcount, struct timeval *tv1)
1638 {
1639         struct timeval tv2;
1640         size_t n;
1641         ssize_t r;
1642         ssize_t a;
1643         int usec;
1644
1645         a = 0;
1646         r = 0;
1647         while (nbytes) {
1648                 if (*bwcount + nbytes > BandwidthOpt)
1649                         n = BandwidthOpt - *bwcount;
1650                 else
1651                         n = nbytes;
1652                 if (n)
1653                         r = write(fd, buf, n);
1654                 if (r >= 0) {
1655                         a += r;
1656                         nbytes -= r;
1657                         buf = (const char *)buf + r;
1658                 }
1659                 if ((size_t)r != n)
1660                         break;
1661                 *bwcount += n;
1662                 if (*bwcount >= BandwidthOpt) {
1663                         gettimeofday(&tv2, NULL);
1664                         usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 +
1665                                 (int)(tv2.tv_usec - tv1->tv_usec);
1666                         if (usec >= 0 && usec < 1000000)
1667                                 usleep(1000000 - usec);
1668                         gettimeofday(tv1, NULL);
1669                         *bwcount -= BandwidthOpt;
1670                 }
1671         }
1672         return(a ? a : r);
1673 }
1674
/*
 * Get a yes or no answer from the terminal.  The program may be run as
 * part of a two-way pipe so we cannot use stdin for this operation.
 *
 * A single line is read from /dev/tty and only its first character is
 * examined.
 *
 * Returns 1 for 'y'/'Y', 0 for 'n'/'N' and -1 if the terminal cannot
 * be opened, nothing could be read, or the response was not understood.
 */
static
int
getyntty(void)
{
        char line[256];
        FILE *tty;
        int answer = -1;

        tty = fopen("/dev/tty", "r");
        if (tty == NULL) {
                fprintf(stderr, "No terminal for response\n");
                return(-1);
        }

        if (fgets(line, sizeof(line), tty) != NULL) {
                switch (line[0]) {
                case 'y':
                case 'Y':
                        answer = 1;
                        break;
                case 'n':
                case 'N':
                        answer = 0;
                        break;
                default:
                        fprintf(stderr, "Response not understood\n");
                        break;
                }
        }

        fclose(tty);
        return(answer);
}
1708
1709 static
1710 void
1711 score_printf(size_t i, size_t w, const char *ctl, ...)
1712 {
1713         va_list va;
1714         size_t n;
1715         static size_t SSize;
1716         static int SFd = -1;
1717         static char ScoreBuf[1024];
1718
1719         if (ScoreBoardFile == NULL)
1720                 return;
1721         assert(i + w < sizeof(ScoreBuf));
1722         if (SFd < 0) {
1723                 SFd = open(ScoreBoardFile, O_RDWR|O_CREAT|O_TRUNC, 0644);
1724                 if (SFd < 0)
1725                         return;
1726                 SSize = 0;
1727         }
1728         for (n = 0; n < i; ++n) {
1729                 if (ScoreBuf[n] == 0)
1730                         ScoreBuf[n] = ' ';
1731         }
1732         va_start(va, ctl);
1733         vsnprintf(ScoreBuf + i, w - 1, ctl, va);
1734         va_end(va);
1735         n = strlen(ScoreBuf + i);
1736         while (n < w - 1) {
1737                 ScoreBuf[i + n] = ' ';
1738                 ++n;
1739         }
1740         ScoreBuf[i + n] = '\n';
1741         if (SSize < i + w)
1742                 SSize = i + w;
1743         pwrite(SFd, ScoreBuf, SSize, 0);
1744 }
1745
1746 static
1747 void
1748 hammer_check_restrict(const char *filesystem)
1749 {
1750         size_t rlen;
1751         int atslash;
1752
1753         if (RestrictTarget == NULL)
1754                 return;
1755         rlen = strlen(RestrictTarget);
1756         if (strncmp(filesystem, RestrictTarget, rlen) != 0) {
1757                 errx(1, "hammer-remote: restricted target");
1758                 /* not reached */
1759         }
1760
1761         atslash = 1;
1762         while (filesystem[rlen]) {
1763                 if (atslash &&
1764                     filesystem[rlen] == '.' &&
1765                     filesystem[rlen+1] == '.') {
1766                         errx(1, "hammer-remote: '..' not allowed");
1767                         /* not reached */
1768                 }
1769                 if (filesystem[rlen] == '/')
1770                         atslash = 1;
1771                 else
1772                         atslash = 0;
1773                 ++rlen;
1774         }
1775 }
1776
/*
 * Print the synopsis of the mirroring directives to stderr and exit
 * with the given code.
 */
static
void
mirror_usage(int code)
{
        static const char usage[] =
                "hammer mirror-read <filesystem> [begin-tid]\n"
                "hammer mirror-read-stream <filesystem> [begin-tid]\n"
                "hammer mirror-write <filesystem>\n"
                "hammer mirror-dump [header]\n"
                "hammer mirror-copy [[user@]host:]<filesystem>"
                                  " [[user@]host:]<filesystem>\n"
                "hammer mirror-stream [[user@]host:]<filesystem>"
                                    " [[user@]host:]<filesystem>\n";

        fputs(usage, stderr);
        exit(code);
}
1793