4951529506d89709899c7121eac90418b0015bc8
[dragonfly.git] / sbin / hammer / cmd_mirror.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  *
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  *
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  *
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  *
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  */
34
35 #include "hammer.h"
36
/*
 * Scoreboard regions.  Each LINEn expands to the two leading arguments
 * (i, w) of score_printf().
 * NOTE(review): presumably a byte offset and a field width within the
 * scoreboard file -- confirm against score_printf().
 */
#define LINE1		0,20
#define LINE2		20,78
#define LINE3		90,70

/* Size in bytes of the buffer used to exchange mirroring records. */
#define SERIALBUF_SIZE	(512 * 1024)
42
43 typedef struct histogram {
44         hammer_tid_t    tid;
45         uint64_t        bytes;
46 } *histogram_t;
47
48 const char *ScoreBoardFile;
49 const char *RestrictTarget;
50
51 static int read_mrecords(int fd, char *buf, u_int size,
52                          hammer_ioc_mrecord_head_t pickup);
53 static int generate_histogram(int fd, const char *filesystem,
54                          histogram_t *histogram_ary,
55                          struct hammer_ioc_mirror_rw *mirror_base,
56                          int *repeatp);
57 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
58                          hammer_ioc_mrecord_head_t pickup);
59 static void write_mrecord(int fdout, uint32_t type,
60                          hammer_ioc_mrecord_any_t mrec, int bytes);
61 static void generate_mrec_header(int fd, int pfs_id,
62                          union hammer_ioc_mrecord_any *mrec_tmp);
63 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
64                          struct hammer_ioc_mrecord_head *pickup,
65                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
66 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
67 static ssize_t writebw(int fd, const void *buf, size_t nbytes,
68                         uint64_t *bwcount, struct timeval *tv1);
69 static int getyntty(void);
70 static void score_printf(size_t i, size_t w, const char *ctl, ...);
71 static void hammer_check_restrict(const char *filesystem);
72 static void mirror_usage(int code);
73
74 /*
75  * Generate a mirroring data stream from the specific source over the
76  * entire key range, but restricted to the specified transaction range.
77  *
78  * The HAMMER VFS does most of the work, we add a few new mrecord
79  * types to negotiate the TID ranges and verify that the entire
80  * stream made it to the destination.
81  *
82  * streaming will be 0 for mirror-read, 1 for mirror-stream.  The code will
83  * set up a fake value of -1 when running the histogram for mirror-read.
84  */
85 void
86 hammer_cmd_mirror_read(char **av, int ac, int streaming)
87 {
88         struct hammer_ioc_mirror_rw mirror;
89         struct hammer_ioc_pseudofs_rw pfs;
90         union hammer_ioc_mrecord_any mrec_tmp;
91         struct hammer_ioc_mrecord_head pickup;
92         hammer_ioc_mrecord_any_t mrec;
93         hammer_tid_t sync_tid;
94         histogram_t histogram_ary;
95         char *filesystem;
96         char *buf = malloc(SERIALBUF_SIZE);
97         int interrupted = 0;
98         int error;
99         int fd;
100         int n;
101         int didwork;
102         int histogram;
103         int histindex;
104         int histmax;
105         int repeat = 0;
106         int sameline;
107         int64_t total_bytes;
108         time_t base_t = time(NULL);
109         struct timeval bwtv;
110         uint64_t bwcount;
111         uint64_t estbytes;
112
113         if (ac == 0 || ac > 2)
114                 mirror_usage(1);
115         filesystem = av[0];
116         hammer_check_restrict(filesystem);
117
118         pickup.signature = 0;
119         pickup.type = 0;
120         histogram = 0;
121         histindex = 0;
122         histmax = 0;
123         histogram_ary = NULL;
124         sameline = 0;
125
126 again:
127         bzero(&mirror, sizeof(mirror));
128         hammer_key_beg_init(&mirror.key_beg);
129         hammer_key_end_init(&mirror.key_end);
130
131         fd = getpfs(&pfs, filesystem);
132
133         if (streaming >= 0)
134                 score_printf(LINE1, "Running");
135
136         if (streaming >= 0 && VerboseOpt && VerboseOpt < 2) {
137                 fprintf(stderr, "%cRunning  \b\b", (sameline ? '\r' : '\n'));
138                 fflush(stderr);
139                 sameline = 1;
140         }
141         sameline = 1;
142         total_bytes = 0;
143         gettimeofday(&bwtv, NULL);
144         bwcount = 0;
145
146         /*
147          * Send initial header for the purpose of determining the
148          * shared-uuid.
149          */
150         generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp);
151         write_mrecord(1, HAMMER_MREC_TYPE_PFSD,
152                       &mrec_tmp, sizeof(mrec_tmp.pfs));
153
154         /*
155          * In 2-way mode the target will send us a PFS info packet
156          * first.  Use the target's current snapshot TID as our default
157          * begin TID.
158          */
159         if (TwoWayPipeOpt) {
160                 mirror.tid_beg = 0;
161                 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup,
162                                          NULL, &mirror.tid_beg);
163                 if (n < 0) {    /* got TERM record */
164                         relpfs(fd, &pfs);
165                         free(buf);
166                         free(histogram_ary);
167                         return;
168                 }
169                 ++mirror.tid_beg;
170         } else if (streaming && histogram) {
171                 mirror.tid_beg = histogram_ary[histindex].tid + 1;
172         } else {
173                 mirror.tid_beg = 0;
174         }
175
176         /*
177          * Write out the PFS header, tid_beg will be updated if our PFS
178          * has a larger begin sync.  tid_end is set to the latest source
179          * TID whos flush cycle has completed.
180          */
181         generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp);
182         if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid)
183                 mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid;
184         mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid;
185         mirror.ubuf = buf;
186         mirror.size = SERIALBUF_SIZE;
187         mirror.pfs_id = pfs.pfs_id;
188         mirror.shared_uuid = pfs.ondisk->shared_uuid;
189
190         /*
191          * XXX If the histogram is exhausted and the TID delta is large
192          *     the stream might have been offline for a while and is
193          *     now picking it up again.  Do another histogram.
194          */
195 #if 0
196         if (streaming && histogram && histindex == histend) {
197                 if (mirror.tid_end - mirror.tid_beg > BULK_MINIMUM)
198                         histogram = 0;
199         }
200 #endif
201
202         /*
203          * Initial bulk startup control, try to do some incremental
204          * mirroring in order to allow the stream to be killed and
205          * restarted without having to start over.
206          */
207         if (histogram == 0 && BulkOpt == 0) {
208                 if (VerboseOpt && repeat == 0) {
209                         fprintf(stderr, "\n");
210                         sameline = 0;
211                 }
212                 histmax = generate_histogram(fd, filesystem,
213                                              &histogram_ary, &mirror,
214                                              &repeat);
215                 histindex = 0;
216                 histogram = 1;
217
218                 /*
219                  * Just stream the histogram, then stop
220                  */
221                 if (streaming == 0)
222                         streaming = -1;
223         }
224
225         if (streaming && histogram) {
226                 ++histindex;
227                 mirror.tid_end = histogram_ary[histindex].tid;
228                 estbytes = histogram_ary[histindex-1].bytes;
229                 mrec_tmp.pfs.pfsd.sync_end_tid = mirror.tid_end;
230         } else {
231                 estbytes = 0;
232         }
233
234         write_mrecord(1, HAMMER_MREC_TYPE_PFSD,
235                       &mrec_tmp, sizeof(mrec_tmp.pfs));
236
237         /*
238          * A cycle file overrides the beginning TID only if we are
239          * not operating in two-way or histogram mode.
240          */
241         if (TwoWayPipeOpt == 0 && histogram == 0) {
242                 hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
243         }
244
245         /*
246          * An additional argument overrides the beginning TID regardless
247          * of what mode we are in.  This is not recommending if operating
248          * in two-way mode.
249          */
250         if (ac == 2)
251                 mirror.tid_beg = strtoull(av[1], NULL, 0);
252
253         if (streaming == 0 || VerboseOpt >= 2) {
254                 fprintf(stderr,
255                         "Mirror-read: Mirror %016jx to %016jx",
256                         (uintmax_t)mirror.tid_beg, (uintmax_t)mirror.tid_end);
257                 if (histogram)
258                         fprintf(stderr, " (bulk= %ju)", (uintmax_t)estbytes);
259                 fprintf(stderr, "\n");
260                 fflush(stderr);
261         }
262         if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
263                 fprintf(stderr, "Mirror-read: Resuming at object %016jx\n",
264                         (uintmax_t)mirror.key_beg.obj_id);
265         }
266
267         /*
268          * Nothing to do if begin equals end.
269          */
270         if (mirror.tid_beg >= mirror.tid_end) {
271                 if (streaming == 0 || VerboseOpt >= 2)
272                         fprintf(stderr, "Mirror-read: No work to do\n");
273                 sleep(DelayOpt);
274                 didwork = 0;
275                 histogram = 0;
276                 goto done;
277         }
278         didwork = 1;
279
280         /*
281          * Write out bulk records
282          */
283         mirror.ubuf = buf;
284         mirror.size = SERIALBUF_SIZE;
285
286         do {
287                 mirror.count = 0;
288                 mirror.pfs_id = pfs.pfs_id;
289                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
290                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
291                         score_printf(LINE3, "Mirror-read %s failed: %s",
292                                      filesystem, strerror(errno));
293                         fprintf(stderr, "Mirror-read %s failed: %s\n",
294                                 filesystem, strerror(errno));
295                         exit(1);
296                 }
297                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
298                         score_printf(LINE3, "Mirror-read %s fatal error %d",
299                                      filesystem, mirror.head.error);
300                         fprintf(stderr,
301                                 "Mirror-read %s fatal error %d\n",
302                                 filesystem, mirror.head.error);
303                         exit(1);
304                 }
305                 if (mirror.count) {
306                         if (BandwidthOpt) {
307                                 n = writebw(1, mirror.ubuf, mirror.count,
308                                             &bwcount, &bwtv);
309                         } else {
310                                 n = write(1, mirror.ubuf, mirror.count);
311                         }
312                         if (n != mirror.count) {
313                                 score_printf(LINE3,
314                                              "Mirror-read %s failed: "
315                                              "short write",
316                                              filesystem);
317                                 fprintf(stderr,
318                                         "Mirror-read %s failed: "
319                                         "short write\n",
320                                 filesystem);
321                                 exit(1);
322                         }
323                 }
324                 total_bytes += mirror.count;
325                 if (streaming && VerboseOpt) {
326                         fprintf(stderr,
327                                 "\rscan obj=%016jx tids=%016jx:%016jx %11jd",
328                                 (uintmax_t)mirror.key_cur.obj_id,
329                                 (uintmax_t)mirror.tid_beg,
330                                 (uintmax_t)mirror.tid_end,
331                                 (intmax_t)total_bytes);
332                         fflush(stderr);
333                         sameline = 0;
334                 } else if (streaming) {
335                         score_printf(LINE2,
336                                 "obj=%016jx tids=%016jx:%016jx %11jd",
337                                 (uintmax_t)mirror.key_cur.obj_id,
338                                 (uintmax_t)mirror.tid_beg,
339                                 (uintmax_t)mirror.tid_end,
340                                 (intmax_t)total_bytes);
341                 }
342                 mirror.key_beg = mirror.key_cur;
343
344                 /*
345                  * Deal with time limit option
346                  */
347                 if (TimeoutOpt &&
348                     (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
349                         score_printf(LINE3,
350                                 "Mirror-read %s interrupted by timer at"
351                                 " %016jx",
352                                 filesystem,
353                                 (uintmax_t)mirror.key_cur.obj_id);
354                         fprintf(stderr,
355                                 "Mirror-read %s interrupted by timer at"
356                                 " %016jx\n",
357                                 filesystem,
358                                 (uintmax_t)mirror.key_cur.obj_id);
359                         interrupted = 1;
360                         break;
361                 }
362         } while (mirror.count != 0);
363
364 done:
365         if (streaming && VerboseOpt && sameline == 0) {
366                 fprintf(stderr, "\n");
367                 fflush(stderr);
368                 sameline = 1;
369         }
370
371         /*
372          * Write out the termination sync record - only if not interrupted
373          */
374         if (interrupted == 0) {
375                 if (didwork) {
376                         write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
377                                       &mrec_tmp, sizeof(mrec_tmp.sync));
378                 } else {
379                         write_mrecord(1, HAMMER_MREC_TYPE_IDLE,
380                                       &mrec_tmp, sizeof(mrec_tmp.sync));
381                 }
382         }
383
384         /*
385          * If the -2 option was given (automatic when doing mirror-copy),
386          * a two-way pipe is assumed and we expect a response mrec from
387          * the target.
388          */
389         if (TwoWayPipeOpt) {
390                 mrec = read_mrecord(0, &error, &pickup);
391                 if (mrec == NULL ||
392                     mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
393                     mrec->head.rec_size != sizeof(mrec->update)) {
394                         fprintf(stderr, "mirror_read: Did not get final "
395                                         "acknowledgement packet from target\n");
396                         exit(1);
397                 }
398                 if (interrupted) {
399                         if (CyclePath) {
400                                 hammer_set_cycle(&mirror.key_cur,
401                                                  mirror.tid_beg);
402                                 fprintf(stderr, "Cyclefile %s updated for "
403                                         "continuation\n", CyclePath);
404                         }
405                 } else {
406                         sync_tid = mrec->update.tid;
407                         if (CyclePath) {
408                                 hammer_key_beg_init(&mirror.key_beg);
409                                 hammer_set_cycle(&mirror.key_beg, sync_tid);
410                                 fprintf(stderr,
411                                         "Cyclefile %s updated to 0x%016jx\n",
412                                         CyclePath, (uintmax_t)sync_tid);
413                         }
414                 }
415                 free(mrec);
416         } else if (CyclePath) {
417                 /* NOTE! mirror.tid_beg cannot be updated */
418                 fprintf(stderr, "Warning: cycle file (-c option) cannot be "
419                                 "fully updated unless you use mirror-copy\n");
420                 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
421         }
422         if (streaming && interrupted == 0) {
423                 time_t t1 = time(NULL);
424                 time_t t2;
425
426                 /*
427                  * Try to break down large bulk transfers into smaller ones
428                  * so it can sync the transaction id on the slave.  This
429                  * way if we get interrupted a restart doesn't have to
430                  * start from scratch.
431                  */
432                 if (streaming && histogram) {
433                         if (histindex != histmax) {
434                                 if (VerboseOpt && VerboseOpt < 2 &&
435                                     streaming >= 0) {
436                                         fprintf(stderr, " (bulk incremental)");
437                                 }
438                                 relpfs(fd, &pfs);
439                                 goto again;
440                         }
441                 }
442
443                 if (VerboseOpt && streaming >= 0) {
444                         fprintf(stderr, " W");
445                         fflush(stderr);
446                 } else if (streaming >= 0) {
447                         score_printf(LINE1, "Waiting");
448                 }
449                 pfs.ondisk->sync_end_tid = mirror.tid_end;
450                 if (streaming < 0) {
451                         /*
452                          * Fake streaming mode when using a histogram to
453                          * break up a mirror-read, do not wait on source.
454                          */
455                         streaming = 0;
456                 } else if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) {
457                         score_printf(LINE3,
458                                      "Mirror-read %s: cannot stream: %s\n",
459                                      filesystem, strerror(errno));
460                         fprintf(stderr,
461                                 "Mirror-read %s: cannot stream: %s\n",
462                                 filesystem, strerror(errno));
463                 } else {
464                         t2 = time(NULL) - t1;
465                         if (t2 >= 0 && t2 < DelayOpt) {
466                                 if (VerboseOpt) {
467                                         fprintf(stderr, "\bD");
468                                         fflush(stderr);
469                                 }
470                                 sleep(DelayOpt - t2);
471                         }
472                         if (VerboseOpt) {
473                                 fprintf(stderr, "\b ");
474                                 fflush(stderr);
475                         }
476                         relpfs(fd, &pfs);
477                         goto again;
478                 }
479         }
480         write_mrecord(1, HAMMER_MREC_TYPE_TERM,
481                       &mrec_tmp, sizeof(mrec_tmp.sync));
482         relpfs(fd, &pfs);
483         free(buf);
484         free(histogram_ary);
485         fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
486 }
487
488 /*
489  * What we are trying to do here is figure out how much data is
490  * going to be sent for the TID range and to break the TID range
491  * down into reasonably-sized slices (from the point of view of
492  * data sent) so a lost connection can restart at a reasonable
493  * place and not all the way back at the beginning.
494  *
495  * An entry's TID serves as the end_tid for the prior entry
496  * So we have to offset the calculation by 1 so that TID falls into
497  * the previous entry when populating entries.
498  *
499  * Because the transaction id space is bursty we need a relatively
500  * large number of buckets (like a million) to do a reasonable job
501  * for things like an initial bulk mirrors on a very large filesystem.
502  */
503 #define HIST_COUNT      (1024 * 1024)
504
505 static int
506 generate_histogram(int fd, const char *filesystem,
507                    histogram_t *histogram_ary,
508                    struct hammer_ioc_mirror_rw *mirror_base,
509                    int *repeatp)
510 {
511         struct hammer_ioc_mirror_rw mirror;
512         union hammer_ioc_mrecord_any *mrec;
513         hammer_tid_t tid_beg;
514         hammer_tid_t tid_end;
515         hammer_tid_t tid;
516         hammer_tid_t tidx;
517         uint64_t *tid_bytes;
518         uint64_t total;
519         uint64_t accum;
520         int chunkno;
521         int i;
522         int res;
523         int off;
524         int len;
525
526         mirror = *mirror_base;
527         tid_beg = mirror.tid_beg;
528         tid_end = mirror.tid_end;
529         mirror.head.flags |= HAMMER_IOC_MIRROR_NODATA;
530
531         if (*histogram_ary == NULL) {
532                 *histogram_ary = malloc(sizeof(struct histogram) *
533                                         (HIST_COUNT + 2));
534         }
535         if (tid_beg >= tid_end)
536                 return(0);
537
538         /* needs 2 extra */
539         tid_bytes = malloc(sizeof(*tid_bytes) * (HIST_COUNT + 2));
540         bzero(tid_bytes, sizeof(*tid_bytes) * (HIST_COUNT + 2));
541
542         if (*repeatp == 0) {
543                 fprintf(stderr, "Prescan to break up bulk transfer");
544                 if (VerboseOpt > 1)
545                         fprintf(stderr, " (%juMB chunks)",
546                                 (uintmax_t)(SplitupOpt / (1024 * 1024)));
547                 fprintf(stderr, "\n");
548         }
549
550         /*
551          * Note: (tid_beg,tid_end), range is inclusive of both beg & end.
552          *
553          * Note: Estimates can be off when the mirror is way behind due
554          *       to skips.
555          */
556         total = 0;
557         accum = 0;
558         chunkno = 0;
559         for (;;) {
560                 mirror.count = 0;
561                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
562                         fprintf(stderr, "Mirror-read %s failed: %s\n",
563                                 filesystem, strerror(errno));
564                         exit(1);
565                 }
566                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
567                         fprintf(stderr,
568                                 "Mirror-read %s fatal error %d\n",
569                                 filesystem, mirror.head.error);
570                         exit(1);
571                 }
572                 for (off = 0;
573                      off < mirror.count;
574                      off += HAMMER_HEAD_DOALIGN(mrec->head.rec_size)) {
575                         mrec = (void *)((char *)mirror.ubuf + off);
576
577                         /*
578                          * We only care about general RECs and PASS
579                          * records.  We ignore SKIPs.
580                          */
581                         switch (mrec->head.type & HAMMER_MRECF_TYPE_LOMASK) {
582                         case HAMMER_MREC_TYPE_REC:
583                         case HAMMER_MREC_TYPE_PASS:
584                                 break;
585                         default:
586                                 continue;
587                         }
588
589                         /*
590                          * Calculate for two indices, create_tid and
591                          * delete_tid.  Record data only applies to
592                          * the create_tid.
593                          *
594                          * When tid is exactly on the boundary it really
595                          * belongs to the previous entry because scans
596                          * are inclusive of the ending entry.
597                          */
598                         tid = mrec->rec.leaf.base.delete_tid;
599                         if (tid && tid >= tid_beg && tid <= tid_end) {
600                                 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
601                                 if (mrec->head.type ==
602                                     HAMMER_MREC_TYPE_REC) {
603                                         len -= HAMMER_HEAD_DOALIGN(
604                                                     mrec->rec.leaf.data_len);
605                                         assert(len > 0);
606                                 }
607                                 i = (tid - tid_beg) * HIST_COUNT /
608                                     (tid_end - tid_beg);
609                                 tidx = tid_beg + i * (tid_end - tid_beg) /
610                                                  HIST_COUNT;
611                                 if (tid == tidx && i)
612                                         --i;
613                                 assert(i >= 0 && i < HIST_COUNT);
614                                 tid_bytes[i] += len;
615                                 total += len;
616                                 accum += len;
617                         }
618
619                         tid = mrec->rec.leaf.base.create_tid;
620                         if (tid && tid >= tid_beg && tid <= tid_end) {
621                                 len = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
622                                 if (mrec->head.type ==
623                                     HAMMER_MREC_TYPE_REC_NODATA) {
624                                         len += HAMMER_HEAD_DOALIGN(
625                                                     mrec->rec.leaf.data_len);
626                                 }
627                                 i = (tid - tid_beg) * HIST_COUNT /
628                                     (tid_end - tid_beg);
629                                 tidx = tid_beg + i * (tid_end - tid_beg) /
630                                                  HIST_COUNT;
631                                 if (tid == tidx && i)
632                                         --i;
633                                 assert(i >= 0 && i < HIST_COUNT);
634                                 tid_bytes[i] += len;
635                                 total += len;
636                                 accum += len;
637                         }
638                 }
639                 if (*repeatp == 0 && accum > SplitupOpt) {
640                         if (VerboseOpt > 1) {
641                                 fprintf(stderr, ".");
642                                 fflush(stderr);
643                         }
644                         ++chunkno;
645                         score_printf(LINE2, "Prescan chunk %d", chunkno);
646                         accum = 0;
647                 }
648                 if (mirror.count == 0)
649                         break;
650                 mirror.key_beg = mirror.key_cur;
651         }
652
653         /*
654          * Reduce to SplitupOpt (default 4GB) chunks.  This code may
655          * use up to two additional elements.  Do the array in-place.
656          *
657          * Inefficient degenerate cases can occur if we do not accumulate
658          * at least the requested split amount, so error on the side of
659          * going over a bit.
660          */
661         res = 0;
662         (*histogram_ary)[res].tid = tid_beg;
663         (*histogram_ary)[res].bytes = tid_bytes[0];
664         for (i = 1; i < HIST_COUNT; ++i) {
665                 if ((*histogram_ary)[res].bytes >= SplitupOpt) {
666                         ++res;
667                         (*histogram_ary)[res].tid = tid_beg +
668                                         i * (tid_end - tid_beg) /
669                                         HIST_COUNT;
670                         (*histogram_ary)[res].bytes = 0;
671
672                 }
673                 (*histogram_ary)[res].bytes += tid_bytes[i];
674         }
675         ++res;
676         (*histogram_ary)[res].tid = tid_end;
677         (*histogram_ary)[res].bytes = -1;
678
679         if (*repeatp == 0) {
680                 if (VerboseOpt > 1)
681                         fprintf(stderr, "\n");  /* newline after ... */
682                 score_printf(LINE3, "Prescan %d chunks, total %ju MBytes",
683                         res, (uintmax_t)total / (1024 * 1024));
684                 fprintf(stderr, "Prescan %d chunks, total %ju MBytes (",
685                         res, (uintmax_t)total / (1024 * 1024));
686                 for (i = 0; i < res && i < 3; ++i) {
687                         if (i)
688                                 fprintf(stderr, ", ");
689                         fprintf(stderr, "%ju",
690                                 (uintmax_t)(*histogram_ary)[i].bytes);
691                 }
692                 if (i < res)
693                         fprintf(stderr, ", ...");
694                 fprintf(stderr, ")\n");
695         }
696         assert(res <= HIST_COUNT);
697         *repeatp = 1;
698
699         free(tid_bytes);
700         return(res);
701 }
702
703 static void
704 create_pfs(const char *filesystem, uuid_t *s_uuid)
705 {
706         if (ForceYesOpt == 1) {
707                 fprintf(stderr, "PFS slave %s does not exist. "
708                         "Auto create new slave PFS!\n", filesystem);
709
710         } else {
711                 fprintf(stderr, "PFS slave %s does not exist.\n"
712                         "Do you want to create a new slave PFS? (yes|no) ",
713                         filesystem);
714                 fflush(stderr);
715                 if (getyntty() != 1) {
716                         fprintf(stderr, "Aborting operation\n");
717                         exit(1);
718                 }
719         }
720
721         uint32_t status;
722         char *shared_uuid = NULL;
723         uuid_to_string(s_uuid, &shared_uuid, &status);
724
725         char *cmd = NULL;
726         asprintf(&cmd, "/sbin/hammer pfs-slave '%s' shared-uuid=%s 1>&2",
727                  filesystem, shared_uuid);
728         free(shared_uuid);
729
730         if (cmd == NULL) {
731                 fprintf(stderr, "Failed to alloc memory\n");
732                 exit(1);
733         }
734         if (system(cmd) != 0) {
735                 fprintf(stderr, "Failed to create PFS\n");
736         }
737         free(cmd);
738 }
739
740 /*
741  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
742  * some additional packet types to negotiate TID ranges and to verify
743  * completion.  The HAMMER VFS does most of the work.
744  *
745  * It is important to note that the mirror.key_{beg,end} range must
746  * match the ranged used by the original.  For now both sides use
747  * range the entire key space.
748  *
749  * It is even more important that the records in the stream conform
750  * to the TID range also supplied in the stream.  The HAMMER VFS will
751  * use the REC, PASS, and SKIP record types to track the portions of
752  * the B-Tree being scanned in order to be able to proactively delete
753  * records on the target within those active areas that are not mentioned
754  * by the source.
755  *
756  * The mirror.key_cur field is used by the VFS to do this tracking.  It
757  * must be initialized to key_beg but then is persistently updated by
758  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
759  * field you will blow up the mirror target, possibly to the point of
760  * deleting everything.  As a safety measure the HAMMER VFS simply marks
761  * the records that the source has destroyed as deleted on the target,
762  * and normal pruning operations will deal with their final disposition
763  * at some later time.
764  */
765 void
766 hammer_cmd_mirror_write(char **av, int ac)
767 {
768         struct hammer_ioc_mirror_rw mirror;
769         char *filesystem;
770         char *buf = malloc(SERIALBUF_SIZE);
771         struct hammer_ioc_pseudofs_rw pfs;
772         struct hammer_ioc_mrecord_head pickup;
773         struct hammer_ioc_synctid synctid;
774         union hammer_ioc_mrecord_any mrec_tmp;
775         hammer_ioc_mrecord_any_t mrec;
776         struct stat st;
777         int error;
778         int fd;
779         int n;
780
781         if (ac != 1)
782                 mirror_usage(1);
783         filesystem = av[0];
784         hammer_check_restrict(filesystem);
785
786         pickup.signature = 0;
787         pickup.type = 0;
788
789 again:
790         bzero(&mirror, sizeof(mirror));
791         hammer_key_beg_init(&mirror.key_beg);
792         hammer_key_end_init(&mirror.key_end);
793         mirror.key_end = mirror.key_beg;
794
795         /*
796          * Read initial packet
797          */
798         mrec = read_mrecord(0, &error, &pickup);
799         if (mrec == NULL) {
800                 if (error == 0)
801                         fprintf(stderr, "validate_mrec_header: short read\n");
802                 exit(1);
803         }
804         /*
805          * Validate packet
806          */
807         if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
808                 free(buf);
809                 return;
810         }
811         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
812                 fprintf(stderr, "validate_mrec_header: did not get expected "
813                                 "PFSD record type\n");
814                 exit(1);
815         }
816         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
817                 fprintf(stderr, "validate_mrec_header: unexpected payload "
818                                 "size\n");
819                 exit(1);
820         }
821
822         /*
823          * Create slave PFS if it doesn't yet exist
824          */
825         if (lstat(filesystem, &st) != 0) {
826                 create_pfs(filesystem, &mrec->pfs.pfsd.shared_uuid);
827         }
828         free(mrec);
829         mrec = NULL;
830
831         fd = getpfs(&pfs, filesystem);
832
833         /*
834          * In two-way mode the target writes out a PFS packet first.
835          * The source uses our tid_end as its tid_beg by default,
836          * picking up where it left off.
837          */
838         mirror.tid_beg = 0;
839         if (TwoWayPipeOpt) {
840                 generate_mrec_header(fd, pfs.pfs_id, &mrec_tmp);
841                 if (mirror.tid_beg < mrec_tmp.pfs.pfsd.sync_beg_tid)
842                         mirror.tid_beg = mrec_tmp.pfs.pfsd.sync_beg_tid;
843                 mirror.tid_end = mrec_tmp.pfs.pfsd.sync_end_tid;
844                 write_mrecord(1, HAMMER_MREC_TYPE_PFSD,
845                               &mrec_tmp, sizeof(mrec_tmp.pfs));
846         }
847
848         /*
849          * Read and process the PFS header.  The source informs us of
850          * the TID range the stream represents.
851          */
852         n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup,
853                                  &mirror.tid_beg, &mirror.tid_end);
854         if (n < 0) {    /* got TERM record */
855                 relpfs(fd, &pfs);
856                 free(buf);
857                 return;
858         }
859
860         mirror.ubuf = buf;
861         mirror.size = SERIALBUF_SIZE;
862
863         /*
864          * Read and process bulk records (REC, PASS, and SKIP types).
865          *
866          * On your life, do NOT mess with mirror.key_cur or your mirror
867          * target may become history.
868          */
869         for (;;) {
870                 mirror.count = 0;
871                 mirror.pfs_id = pfs.pfs_id;
872                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
873                 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
874                 if (mirror.size <= 0)
875                         break;
876                 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
877                         fprintf(stderr, "Mirror-write %s failed: %s\n",
878                                 filesystem, strerror(errno));
879                         exit(1);
880                 }
881                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
882                         fprintf(stderr,
883                                 "Mirror-write %s fatal error %d\n",
884                                 filesystem, mirror.head.error);
885                         exit(1);
886                 }
887 #if 0
888                 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
889                         fprintf(stderr,
890                                 "Mirror-write %s interrupted by timer at"
891                                 " %016llx\n",
892                                 filesystem,
893                                 mirror.key_cur.obj_id);
894                         exit(0);
895                 }
896 #endif
897         }
898
899         /*
900          * Read and process the termination sync record.
901          */
902         mrec = read_mrecord(0, &error, &pickup);
903
904         if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
905                 fprintf(stderr, "Mirror-write: received termination request\n");
906                 relpfs(fd, &pfs);
907                 free(mrec);
908                 free(buf);
909                 return;
910         }
911
912         if (mrec == NULL ||
913             (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
914              mrec->head.type != HAMMER_MREC_TYPE_IDLE) ||
915             mrec->head.rec_size != sizeof(mrec->sync)) {
916                 fprintf(stderr, "Mirror-write %s: Did not get termination "
917                                 "sync record, or rec_size is wrong rt=%d\n",
918                                 filesystem,
919                                 (mrec ? (int)mrec->head.type : -1));
920                 exit(1);
921         }
922
923         /*
924          * Update the PFS info on the target so the user has visibility
925          * into the new snapshot, and sync the target filesystem.
926          */
927         if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) {
928                 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
929
930                 bzero(&synctid, sizeof(synctid));
931                 synctid.op = HAMMER_SYNCTID_SYNC2;
932                 ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
933
934                 if (VerboseOpt >= 2) {
935                         fprintf(stderr, "Mirror-write %s: succeeded\n",
936                                 filesystem);
937                 }
938         }
939
940         free(mrec);
941         mrec = NULL;
942
943         /*
944          * Report back to the originator.
945          */
946         if (TwoWayPipeOpt) {
947                 mrec_tmp.update.tid = mirror.tid_end;
948                 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
949                               &mrec_tmp, sizeof(mrec_tmp.update));
950         } else {
951                 printf("Source can update synctid to 0x%016jx\n",
952                        (uintmax_t)mirror.tid_end);
953         }
954         relpfs(fd, &pfs);
955         goto again;
956 }
957
958 void
959 hammer_cmd_mirror_dump(char **av, int ac)
960 {
961         char *buf = malloc(SERIALBUF_SIZE);
962         struct hammer_ioc_mrecord_head pickup;
963         hammer_ioc_mrecord_any_t mrec;
964         int error;
965         int size;
966         int offset;
967         int bytes;
968         int header_only = 0;
969
970         if (ac == 1 && strcmp(*av, "header") == 0)
971                 header_only = 1;
972         else if (ac != 0)
973                 mirror_usage(1);
974
975         /*
976          * Read and process the PFS header
977          */
978         pickup.signature = 0;
979         pickup.type = 0;
980
981         mrec = read_mrecord(0, &error, &pickup);
982
983         /*
984          * Dump the PFS header. mirror-dump takes its input from the output
985          * of a mirror-read so getpfs() can't be used to get a fd to be passed
986          * to dump_pfsd().
987          */
988         if (header_only && mrec != NULL) {
989                 dump_pfsd(&mrec->pfs.pfsd, -1);
990                 free(mrec);
991                 free(buf);
992                 return;
993         }
994         free(mrec);
995
996 again:
997         /*
998          * Read and process bulk records
999          */
1000         for (;;) {
1001                 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
1002                 if (size <= 0)
1003                         break;
1004                 offset = 0;
1005                 while (offset < size) {
1006                         mrec = (void *)((char *)buf + offset);
1007                         bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
1008                         if (offset + bytes > size) {
1009                                 fprintf(stderr, "Misaligned record\n");
1010                                 exit(1);
1011                         }
1012
1013                         switch(mrec->head.type & HAMMER_MRECF_TYPE_MASK) {
1014                         case HAMMER_MREC_TYPE_REC_BADCRC:
1015                         case HAMMER_MREC_TYPE_REC:
1016                                 printf("Record lo=%08x obj=%016jx key=%016jx "
1017                                        "rt=%02x ot=%02x",
1018                                         mrec->rec.leaf.base.localization,
1019                                         (uintmax_t)mrec->rec.leaf.base.obj_id,
1020                                         (uintmax_t)mrec->rec.leaf.base.key,
1021                                         mrec->rec.leaf.base.rec_type,
1022                                         mrec->rec.leaf.base.obj_type);
1023                                 if (mrec->head.type ==
1024                                     HAMMER_MREC_TYPE_REC_BADCRC) {
1025                                         printf(" (BAD CRC)");
1026                                 }
1027                                 printf("\n");
1028                                 printf("       tids %016jx:%016jx data=%d\n",
1029                                     (uintmax_t)mrec->rec.leaf.base.create_tid,
1030                                     (uintmax_t)mrec->rec.leaf.base.delete_tid,
1031                                     mrec->rec.leaf.data_len);
1032                                 break;
1033                         case HAMMER_MREC_TYPE_PASS:
1034                                 printf("Pass   lo=%08x obj=%016jx key=%016jx "
1035                                        "rt=%02x ot=%02x\n",
1036                                         mrec->rec.leaf.base.localization,
1037                                         (uintmax_t)mrec->rec.leaf.base.obj_id,
1038                                         (uintmax_t)mrec->rec.leaf.base.key,
1039                                         mrec->rec.leaf.base.rec_type,
1040                                         mrec->rec.leaf.base.obj_type);
1041                                 printf("       tids %016jx:%016jx data=%d\n",
1042                                     (uintmax_t)mrec->rec.leaf.base.create_tid,
1043                                     (uintmax_t)mrec->rec.leaf.base.delete_tid,
1044                                         mrec->rec.leaf.data_len);
1045                                 break;
1046                         case HAMMER_MREC_TYPE_SKIP:
1047                                 printf("Skip   lo=%08x obj=%016jx key=%016jx rt=%02x to\n"
1048                                        "       lo=%08x obj=%016jx key=%016jx rt=%02x\n",
1049                                        mrec->skip.skip_beg.localization,
1050                                        (uintmax_t)mrec->skip.skip_beg.obj_id,
1051                                        (uintmax_t)mrec->skip.skip_beg.key,
1052                                        mrec->skip.skip_beg.rec_type,
1053                                        mrec->skip.skip_end.localization,
1054                                        (uintmax_t)mrec->skip.skip_end.obj_id,
1055                                        (uintmax_t)mrec->skip.skip_end.key,
1056                                        mrec->skip.skip_end.rec_type);
1057                         default:
1058                                 break;
1059                         }
1060                         offset += bytes;
1061                 }
1062         }
1063
1064         /*
1065          * Read and process the termination sync record.
1066          */
1067         mrec = read_mrecord(0, &error, &pickup);
1068         if (mrec == NULL ||
1069             (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
1070              mrec->head.type != HAMMER_MREC_TYPE_IDLE)) {
1071                 fprintf(stderr, "Mirror-dump: Did not get termination "
1072                                 "sync record\n");
1073         }
1074         free(mrec);
1075
1076         /*
1077          * Continue with more batches until EOF.
1078          */
1079         mrec = read_mrecord(0, &error, &pickup);
1080         if (mrec) {
1081                 free(mrec);
1082                 goto again;
1083         }
1084         free(buf);
1085 }
1086
1087 void
1088 hammer_cmd_mirror_copy(char **av, int ac, int streaming)
1089 {
1090         pid_t pid1;
1091         pid_t pid2;
1092         int fds[2];
1093         const char *xav[32];
1094         char tbuf[16];
1095         char *sh, *user, *host, *rfs;
1096         int xac;
1097
1098         if (ac != 2)
1099                 mirror_usage(1);
1100
1101         TwoWayPipeOpt = 1;
1102         signal(SIGPIPE, SIG_IGN);
1103
1104 again:
1105         if (pipe(fds) < 0) {
1106                 perror("pipe");
1107                 exit(1);
1108         }
1109
1110         /*
1111          * Source
1112          */
1113         if ((pid1 = fork()) == 0) {
1114                 signal(SIGPIPE, SIG_DFL);
1115                 dup2(fds[0], 0);
1116                 dup2(fds[0], 1);
1117                 close(fds[0]);
1118                 close(fds[1]);
1119                 if ((rfs = strchr(av[0], ':')) != NULL) {
1120                         xac = 0;
1121
1122                         if((sh = getenv("HAMMER_RSH")) == NULL)
1123                                 xav[xac++] = "ssh";
1124                         else
1125                                 xav[xac++] = sh;
1126
1127                         if (CompressOpt)
1128                                 xav[xac++] = "-C";
1129
1130                         if ((host = strchr(av[0], '@')) != NULL) {
1131                                 user = strndup( av[0], (host++ - av[0]));
1132                                 host = strndup( host, (rfs++ - host));
1133                                 xav[xac++] = "-l";
1134                                 xav[xac++] = user;
1135                                 xav[xac++] = host;
1136                         }
1137                         else {
1138                                 host = strndup( av[0], (rfs++ - av[0]));
1139                                 user = NULL;
1140                                 xav[xac++] = host;
1141                         }
1142
1143
1144                         if (SshPort) {
1145                                 xav[xac++] = "-p";
1146                                 xav[xac++] = SshPort;
1147                         }
1148
1149                         xav[xac++] = "hammer";
1150
1151                         switch(VerboseOpt) {
1152                         case 0:
1153                                 break;
1154                         case 1:
1155                                 xav[xac++] = "-v";
1156                                 break;
1157                         case 2:
1158                                 xav[xac++] = "-vv";
1159                                 break;
1160                         default:
1161                                 xav[xac++] = "-vvv";
1162                                 break;
1163                         }
1164                         if (ForceYesOpt) {
1165                                 xav[xac++] = "-y";
1166                         }
1167                         xav[xac++] = "-2";
1168                         if (TimeoutOpt) {
1169                                 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
1170                                 xav[xac++] = "-t";
1171                                 xav[xac++] = tbuf;
1172                         }
1173                         if (SplitupOptStr) {
1174                                 xav[xac++] = "-S";
1175                                 xav[xac++] = SplitupOptStr;
1176                         }
1177                         if (streaming)
1178                                 xav[xac++] = "mirror-read-stream";
1179                         else
1180                                 xav[xac++] = "mirror-read";
1181                         xav[xac++] = rfs;
1182                         xav[xac++] = NULL;
1183                         execvp(*xav, (void *)xav);
1184                 } else {
1185                         hammer_cmd_mirror_read(av, 1, streaming);
1186                         fflush(stdout);
1187                         fflush(stderr);
1188                 }
1189                 _exit(1);
1190         }
1191
1192         /*
1193          * Target
1194          */
1195         if ((pid2 = fork()) == 0) {
1196                 signal(SIGPIPE, SIG_DFL);
1197                 dup2(fds[1], 0);
1198                 dup2(fds[1], 1);
1199                 close(fds[0]);
1200                 close(fds[1]);
1201                 if ((rfs = strchr(av[1], ':')) != NULL) {
1202                         xac = 0;
1203
1204                         if((sh = getenv("HAMMER_RSH")) == NULL)
1205                                 xav[xac++] = "ssh";
1206                         else
1207                                 xav[xac++] = sh;
1208
1209                         if (CompressOpt)
1210                                 xav[xac++] = "-C";
1211
1212                         if ((host = strchr(av[1], '@')) != NULL) {
1213                                 user = strndup( av[1], (host++ - av[1]));
1214                                 host = strndup( host, (rfs++ - host));
1215                                 xav[xac++] = "-l";
1216                                 xav[xac++] = user;
1217                                 xav[xac++] = host;
1218                         }
1219                         else {
1220                                 host = strndup( av[1], (rfs++ - av[1]));
1221                                 user = NULL;
1222                                 xav[xac++] = host;
1223                         }
1224
1225                         if (SshPort) {
1226                                 xav[xac++] = "-p";
1227                                 xav[xac++] = SshPort;
1228                         }
1229
1230                         xav[xac++] = "hammer";
1231
1232                         switch(VerboseOpt) {
1233                         case 0:
1234                                 break;
1235                         case 1:
1236                                 xav[xac++] = "-v";
1237                                 break;
1238                         case 2:
1239                                 xav[xac++] = "-vv";
1240                                 break;
1241                         default:
1242                                 xav[xac++] = "-vvv";
1243                                 break;
1244                         }
1245                         if (ForceYesOpt) {
1246                                 xav[xac++] = "-y";
1247                         }
1248                         xav[xac++] = "-2";
1249                         xav[xac++] = "mirror-write";
1250                         xav[xac++] = rfs;
1251                         xav[xac++] = NULL;
1252                         execvp(*xav, (void *)xav);
1253                 } else {
1254                         hammer_cmd_mirror_write(av + 1, 1);
1255                         fflush(stdout);
1256                         fflush(stderr);
1257                 }
1258                 _exit(1);
1259         }
1260         close(fds[0]);
1261         close(fds[1]);
1262
1263         while (waitpid(pid1, NULL, 0) <= 0)
1264                 ;
1265         while (waitpid(pid2, NULL, 0) <= 0)
1266                 ;
1267
1268         /*
1269          * If the link is lost restart
1270          */
1271         if (streaming) {
1272                 if (VerboseOpt) {
1273                         fprintf(stderr, "\nLost Link\n");
1274                         fflush(stderr);
1275                 }
1276                 sleep(15 + DelayOpt);
1277                 goto again;
1278         }
1279
1280 }
1281
1282 /*
1283  * Read and return multiple mrecords
1284  */
1285 static int
1286 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
1287 {
1288         hammer_ioc_mrecord_any_t mrec;
1289         u_int count;
1290         size_t n;
1291         size_t i;
1292         size_t bytes;
1293         int type;
1294
1295         count = 0;
1296         while (size - count >= HAMMER_MREC_HEADSIZE) {
1297                 /*
1298                  * Cached the record header in case we run out of buffer
1299                  * space.
1300                  */
1301                 fflush(stdout);
1302                 if (pickup->signature == 0) {
1303                         for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
1304                                 i = read(fd, (char *)pickup + n,
1305                                          HAMMER_MREC_HEADSIZE - n);
1306                                 if (i <= 0)
1307                                         break;
1308                         }
1309                         if (n == 0)
1310                                 break;
1311                         if (n != HAMMER_MREC_HEADSIZE) {
1312                                 fprintf(stderr, "read_mrecords: short read on pipe\n");
1313                                 exit(1);
1314                         }
1315                         if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
1316                                 fprintf(stderr, "read_mrecords: malformed record on pipe, "
1317                                         "bad signature\n");
1318                                 exit(1);
1319                         }
1320                 }
1321                 if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
1322                     pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
1323                         fprintf(stderr, "read_mrecords: malformed record on pipe, "
1324                                 "illegal rec_size\n");
1325                         exit(1);
1326                 }
1327
1328                 /*
1329                  * Stop if we have insufficient space for the record and data.
1330                  */
1331                 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
1332                 if (size - count < bytes)
1333                         break;
1334
1335                 /*
1336                  * Stop if the record type is not a REC, SKIP, or PASS,
1337                  * which are the only types the ioctl supports.  Other types
1338                  * are used only by the userland protocol.
1339                  *
1340                  * Ignore all flags.
1341                  */
1342                 type = pickup->type & HAMMER_MRECF_TYPE_LOMASK;
1343                 if (type != HAMMER_MREC_TYPE_PFSD &&
1344                     type != HAMMER_MREC_TYPE_REC &&
1345                     type != HAMMER_MREC_TYPE_SKIP &&
1346                     type != HAMMER_MREC_TYPE_PASS) {
1347                         break;
1348                 }
1349
1350                 /*
1351                  * Read the remainder and clear the pickup signature.
1352                  */
1353                 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
1354                         i = read(fd, buf + count + n, bytes - n);
1355                         if (i <= 0)
1356                                 break;
1357                 }
1358                 if (n != bytes) {
1359                         fprintf(stderr, "read_mrecords: short read on pipe\n");
1360                         exit(1);
1361                 }
1362
1363                 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
1364                 pickup->signature = 0;
1365                 pickup->type = 0;
1366                 mrec = (void *)(buf + count);
1367
1368                 /*
1369                  * Validate the completed record
1370                  */
1371                 if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) {
1372                         fprintf(stderr, "read_mrecords: malformed record "
1373                                         "on pipe, bad crc\n");
1374                         exit(1);
1375                 }
1376
1377                 /*
1378                  * If its a B-Tree record validate the data crc.
1379                  *
1380                  * NOTE: If the VFS passes us an explicitly errorde mrec
1381                  *       we just pass it through.
1382                  */
1383                 type = mrec->head.type & HAMMER_MRECF_TYPE_MASK;
1384
1385                 if (type == HAMMER_MREC_TYPE_REC) {
1386                         if (mrec->head.rec_size <
1387                             sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
1388                                 fprintf(stderr,
1389                                         "read_mrecords: malformed record on "
1390                                         "pipe, illegal element data_len\n");
1391                                 exit(1);
1392                         }
1393                         if (mrec->rec.leaf.data_len &&
1394                             mrec->rec.leaf.data_offset &&
1395                             hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) {
1396                                 fprintf(stderr,
1397                                         "read_mrecords: data_crc did not "
1398                                         "match data! obj=%016jx key=%016jx\n",
1399                                         (uintmax_t)mrec->rec.leaf.base.obj_id,
1400                                         (uintmax_t)mrec->rec.leaf.base.key);
1401                                 fprintf(stderr,
1402                                         "continuing, but there are problems\n");
1403                         }
1404                 }
1405                 count += bytes;
1406         }
1407         return(count);
1408 }
1409
1410 /*
1411  * Read and return a single mrecord.
1412  */
1413 static
1414 hammer_ioc_mrecord_any_t
1415 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
1416 {
1417         hammer_ioc_mrecord_any_t mrec;
1418         struct hammer_ioc_mrecord_head mrechd;
1419         size_t bytes;
1420         size_t n;
1421         size_t i;
1422
1423         if (pickup && pickup->type != 0) {
1424                 mrechd = *pickup;
1425                 pickup->signature = 0;
1426                 pickup->type = 0;
1427                 n = HAMMER_MREC_HEADSIZE;
1428         } else {
1429                 /*
1430                  * Read in the PFSD header from the sender.
1431                  */
1432                 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
1433                         i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
1434                         if (i <= 0)
1435                                 break;
1436                 }
1437                 if (n == 0) {
1438                         *errorp = 0;    /* EOF */
1439                         return(NULL);
1440                 }
1441                 if (n != HAMMER_MREC_HEADSIZE) {
1442                         fprintf(stderr, "short read of mrecord header\n");
1443                         *errorp = EPIPE;
1444                         return(NULL);
1445                 }
1446         }
1447         if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
1448                 fprintf(stderr, "read_mrecord: bad signature\n");
1449                 *errorp = EINVAL;
1450                 return(NULL);
1451         }
1452         bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
1453         assert(bytes >= sizeof(mrechd));
1454         mrec = malloc(bytes);
1455         mrec->head = mrechd;
1456
1457         while (n < bytes) {
1458                 i = read(fdin, (char *)mrec + n, bytes - n);
1459                 if (i <= 0)
1460                         break;
1461                 n += i;
1462         }
1463         if (n != bytes) {
1464                 fprintf(stderr, "read_mrecord: short read on payload\n");
1465                 *errorp = EPIPE;
1466                 return(NULL);
1467         }
1468         if (!hammer_crc_test_mrec_head(&mrec->head, mrec->head.rec_size)) {
1469                 fprintf(stderr, "read_mrecord: bad CRC\n");
1470                 *errorp = EINVAL;
1471                 return(NULL);
1472         }
1473         *errorp = 0;
1474         return(mrec);
1475 }
1476
1477 static
1478 void
1479 write_mrecord(int fdout, uint32_t type, hammer_ioc_mrecord_any_t mrec,
1480               int bytes)
1481 {
1482         char zbuf[HAMMER_HEAD_ALIGN];
1483         int pad;
1484
1485         pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
1486
1487         assert(bytes >= (int)sizeof(mrec->head));
1488         bzero(&mrec->head, sizeof(mrec->head));
1489         mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
1490         mrec->head.type = type;
1491         mrec->head.rec_size = bytes;
1492         hammer_crc_set_mrec_head(&mrec->head, bytes);
1493         if (write(fdout, mrec, bytes) != bytes) {
1494                 fprintf(stderr, "write_mrecord: error %d (%s)\n",
1495                         errno, strerror(errno));
1496                 exit(1);
1497         }
1498         if (pad) {
1499                 bzero(zbuf, pad);
1500                 if (write(fdout, zbuf, pad) != pad) {
1501                         fprintf(stderr, "write_mrecord: error %d (%s)\n",
1502                                 errno, strerror(errno));
1503                         exit(1);
1504                 }
1505         }
1506 }
1507
1508 /*
1509  * Generate a mirroring header with the pfs information of the
1510  * originating filesytem.
1511  */
1512 static void
1513 generate_mrec_header(int fd, int pfs_id,
1514                      union hammer_ioc_mrecord_any *mrec_tmp)
1515 {
1516         struct hammer_ioc_pseudofs_rw pfs;
1517
1518         bzero(&pfs, sizeof(pfs));
1519         bzero(mrec_tmp, sizeof(*mrec_tmp));
1520         pfs.pfs_id = pfs_id;
1521         pfs.ondisk = &mrec_tmp->pfs.pfsd;
1522         pfs.bytes = sizeof(mrec_tmp->pfs.pfsd);
1523         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1524                 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n");
1525                 exit(1);
1526         }
1527         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
1528                 fprintf(stderr, "Mirror-read: HAMMER PFS version mismatch!\n");
1529                 exit(1);
1530         }
1531         mrec_tmp->pfs.version = pfs.version;
1532 }
1533
1534 /*
1535  * Validate the pfs information from the originating filesystem
1536  * against the target filesystem.  shared_uuid must match.
1537  *
1538  * return -1 if we got a TERM record
1539  */
1540 static int
1541 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
1542                      struct hammer_ioc_mrecord_head *pickup,
1543                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
1544 {
1545         struct hammer_ioc_pseudofs_rw pfs;
1546         struct hammer_pseudofs_data pfsd;
1547         hammer_ioc_mrecord_any_t mrec;
1548         int error;
1549
1550         /*
1551          * Get the PFSD info from the target filesystem.
1552          */
1553         bzero(&pfs, sizeof(pfs));
1554         bzero(&pfsd, sizeof(pfsd));
1555         pfs.pfs_id = pfs_id;
1556         pfs.ondisk = &pfsd;
1557         pfs.bytes = sizeof(pfsd);
1558         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1559                 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n");
1560                 exit(1);
1561         }
1562         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
1563                 fprintf(stderr, "mirror-write: HAMMER PFS version mismatch!\n");
1564                 exit(1);
1565         }
1566
1567         mrec = read_mrecord(fdin, &error, pickup);
1568         if (mrec == NULL) {
1569                 if (error == 0)
1570                         fprintf(stderr, "validate_mrec_header: short read\n");
1571                 exit(1);
1572         }
1573         if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
1574                 free(mrec);
1575                 return(-1);
1576         }
1577
1578         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
1579                 fprintf(stderr, "validate_mrec_header: did not get expected "
1580                                 "PFSD record type\n");
1581                 exit(1);
1582         }
1583         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
1584                 fprintf(stderr, "validate_mrec_header: unexpected payload "
1585                                 "size\n");
1586                 exit(1);
1587         }
1588         if (mrec->pfs.version != pfs.version) {
1589                 fprintf(stderr, "validate_mrec_header: Version mismatch\n");
1590                 exit(1);
1591         }
1592
1593         /*
1594          * Whew.  Ok, is the read PFS info compatible with the target?
1595          */
1596         if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
1597                  sizeof(pfsd.shared_uuid)) != 0) {
1598                 fprintf(stderr,
1599                         "mirror-write: source and target have "
1600                         "different shared-uuid's!\n");
1601                 exit(1);
1602         }
1603         if (is_target && hammer_is_pfs_master(&pfsd)) {
1604                 fprintf(stderr, "mirror-write: target must be in slave mode\n");
1605                 exit(1);
1606         }
1607         if (tid_begp)
1608                 *tid_begp = mrec->pfs.pfsd.sync_beg_tid;
1609         if (tid_endp)
1610                 *tid_endp = mrec->pfs.pfsd.sync_end_tid;
1611         free(mrec);
1612         return(0);
1613 }
1614
1615 static void
1616 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
1617 {
1618         struct hammer_ioc_pseudofs_rw pfs;
1619         struct hammer_pseudofs_data pfsd;
1620
1621         bzero(&pfs, sizeof(pfs));
1622         bzero(&pfsd, sizeof(pfsd));
1623         pfs.pfs_id = pfs_id;
1624         pfs.ondisk = &pfsd;
1625         pfs.bytes = sizeof(pfsd);
1626         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1627                 perror("update_pfs_snapshot (read)");
1628                 exit(1);
1629         }
1630         if (pfsd.sync_end_tid != snapshot_tid) {
1631                 pfsd.sync_end_tid = snapshot_tid;
1632                 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
1633                         perror("update_pfs_snapshot (rewrite)");
1634                         exit(1);
1635                 }
1636                 if (VerboseOpt >= 2) {
1637                         fprintf(stderr,
1638                                 "Mirror-write: Completed, updated snapshot "
1639                                 "to %016jx\n",
1640                                 (uintmax_t)snapshot_tid);
1641                         fflush(stderr);
1642                 }
1643         }
1644 }
1645
1646 /*
1647  * Bandwidth-limited write in chunks
1648  */
1649 static
1650 ssize_t
1651 writebw(int fd, const void *buf, size_t nbytes,
1652         uint64_t *bwcount, struct timeval *tv1)
1653 {
1654         struct timeval tv2;
1655         size_t n;
1656         ssize_t r;
1657         ssize_t a;
1658         int usec;
1659
1660         a = 0;
1661         r = 0;
1662         while (nbytes) {
1663                 if (*bwcount + nbytes > BandwidthOpt)
1664                         n = BandwidthOpt - *bwcount;
1665                 else
1666                         n = nbytes;
1667                 if (n)
1668                         r = write(fd, buf, n);
1669                 if (r >= 0) {
1670                         a += r;
1671                         nbytes -= r;
1672                         buf = (const char *)buf + r;
1673                 }
1674                 if ((size_t)r != n)
1675                         break;
1676                 *bwcount += n;
1677                 if (*bwcount >= BandwidthOpt) {
1678                         gettimeofday(&tv2, NULL);
1679                         usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 +
1680                                 (int)(tv2.tv_usec - tv1->tv_usec);
1681                         if (usec >= 0 && usec < 1000000)
1682                                 usleep(1000000 - usec);
1683                         gettimeofday(tv1, NULL);
1684                         *bwcount -= BandwidthOpt;
1685                 }
1686         }
1687         return(a ? a : r);
1688 }
1689
/*
 * Ask the controlling terminal for a yes/no answer.  stdin cannot be
 * used because the program may be running as one end of a two-way pipe,
 * so /dev/tty is opened instead.
 *
 * Exactly one line is read.  Returns 1 if it begins with 'y' or 'Y',
 * 0 if it begins with 'n' or 'N', and -1 otherwise: no terminal could
 * be opened, nothing could be read, or the response was not understood
 * (the first and last cases also print a message to stderr).
 */
static int
getyntty(void)
{
	char line[256];
	FILE *tty;
	int answer = -1;

	tty = fopen("/dev/tty", "r");
	if (tty == NULL) {
		fprintf(stderr, "No terminal for response\n");
		return(-1);
	}

	if (fgets(line, sizeof(line), tty) != NULL) {
		switch (line[0]) {
		case 'y':
		case 'Y':
			answer = 1;
			break;
		case 'n':
		case 'N':
			answer = 0;
			break;
		default:
			fprintf(stderr, "Response not understood\n");
			break;
		}
	}

	fclose(tty);
	return(answer);
}
1722
1723 static void
1724 score_printf(size_t i, size_t w, const char *ctl, ...)
1725 {
1726         va_list va;
1727         size_t n;
1728         static size_t SSize;
1729         static int SFd = -1;
1730         static char ScoreBuf[1024];
1731
1732         if (ScoreBoardFile == NULL)
1733                 return;
1734         assert(i + w < sizeof(ScoreBuf));
1735         if (SFd < 0) {
1736                 SFd = open(ScoreBoardFile, O_RDWR|O_CREAT|O_TRUNC, 0644);
1737                 if (SFd < 0)
1738                         return;
1739                 SSize = 0;
1740         }
1741         for (n = 0; n < i; ++n) {
1742                 if (ScoreBuf[n] == 0)
1743                         ScoreBuf[n] = ' ';
1744         }
1745         va_start(va, ctl);
1746         vsnprintf(ScoreBuf + i, w - 1, ctl, va);
1747         va_end(va);
1748         n = strlen(ScoreBuf + i);
1749         while (n < w - 1) {
1750                 ScoreBuf[i + n] = ' ';
1751                 ++n;
1752         }
1753         ScoreBuf[i + n] = '\n';
1754         if (SSize < i + w)
1755                 SSize = i + w;
1756         pwrite(SFd, ScoreBuf, SSize, 0);
1757 }
1758
1759 static void
1760 hammer_check_restrict(const char *filesystem)
1761 {
1762         size_t rlen;
1763         int atslash;
1764
1765         if (RestrictTarget == NULL)
1766                 return;
1767         rlen = strlen(RestrictTarget);
1768         if (strncmp(filesystem, RestrictTarget, rlen) != 0) {
1769                 fprintf(stderr, "hammer-remote: restricted target\n");
1770                 exit(1);
1771         }
1772         atslash = 1;
1773         while (filesystem[rlen]) {
1774                 if (atslash &&
1775                     filesystem[rlen] == '.' &&
1776                     filesystem[rlen+1] == '.') {
1777                         fprintf(stderr, "hammer-remote: '..' not allowed\n");
1778                         exit(1);
1779                 }
1780                 if (filesystem[rlen] == '/')
1781                         atslash = 1;
1782                 else
1783                         atslash = 0;
1784                 ++rlen;
1785         }
1786 }
1787
/*
 * Print the synopsis of the mirroring commands to stderr and terminate
 * the program with the given exit code.
 */
static void
mirror_usage(int code)
{
	static const char usage_text[] =
		"hammer mirror-read <filesystem> [begin-tid]\n"
		"hammer mirror-read-stream <filesystem> [begin-tid]\n"
		"hammer mirror-write <filesystem>\n"
		"hammer mirror-dump [header]\n"
		"hammer mirror-copy [[user@]host:]<filesystem>"
				  " [[user@]host:]<filesystem>\n"
		"hammer mirror-stream [[user@]host:]<filesystem>"
				    " [[user@]host:]<filesystem>\n";

	fputs(usage_text, stderr);
	exit(code);
}
1803