f55fd3c2969a7f98fe42cb766e912d3f61568654
[dragonfly.git] / sbin / hammer / cmd_mirror.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.13 2008/08/19 11:02:34 mneumann Exp $
35  */
36
37 #include "hammer.h"
38
/*
 * Size in bytes of the staging buffer each mirroring command allocates
 * with malloc() and passes to the mirror ioctls and to read_mrecords().
 */
39 #define SERIALBUF_SIZE  (512 * 1024)
40
/*
 * Forward declarations of the file-local (static) helpers used by the
 * mirroring commands below.
 */
41 static int read_mrecords(int fd, char *buf, u_int size,
42                          hammer_ioc_mrecord_head_t pickup);
43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
44                          hammer_ioc_mrecord_head_t pickup);
45 static void write_mrecord(int fdout, u_int32_t type,
46                          hammer_ioc_mrecord_any_t mrec, int bytes);
47 static void generate_mrec_header(int fd, int fdout, int pfs_id,
48                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
50                          struct hammer_ioc_mrecord_head *pickup,
51                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
53 static ssize_t writebw(int fd, const void *buf, size_t nbytes,
54                         u_int64_t *bwcount, struct timeval *tv1);
55 static void mirror_usage(int code);
56
57 /*
58  * Generate a mirroring data stream from the specific source over the
59  * entire key range, but restricted to the specified transaction range.
60  *
61  * The HAMMER VFS does most of the work, we add a few new mrecord
62  * types to negotiate the TID ranges and verify that the entire
63  * stream made it to the destination.
64  */
65 void
66 hammer_cmd_mirror_read(char **av, int ac, int streaming)
67 {
68         struct hammer_ioc_mirror_rw mirror;
69         struct hammer_ioc_pseudofs_rw pfs;
70         union hammer_ioc_mrecord_any mrec_tmp;
71         struct hammer_ioc_mrecord_head pickup;
72         hammer_ioc_mrecord_any_t mrec;
73         hammer_tid_t sync_tid;
74         const char *filesystem;
75         char *buf = malloc(SERIALBUF_SIZE);
76         int interrupted = 0;
77         int error;
78         int fd;
79         int n;
80         int didwork;
81         int64_t total_bytes;
82         time_t base_t = time(NULL);
83         struct timeval bwtv;
84         u_int64_t bwcount;
85
86         if (ac > 2)
87                 mirror_usage(1);
88         filesystem = av[0];
89
90         pickup.signature = 0;
91         pickup.type = 0;
92
93 again:
94         bzero(&mirror, sizeof(mirror));
95         hammer_key_beg_init(&mirror.key_beg);
96         hammer_key_end_init(&mirror.key_end);
97
98         fd = getpfs(&pfs, filesystem);
99
100         if (streaming && VerboseOpt) {
101                 fprintf(stderr, "\nRunning");
102                 fflush(stderr);
103         }
104         total_bytes = 0;
105         gettimeofday(&bwtv, NULL);
106         bwcount = 0;
107
108         /*
109          * In 2-way mode the target will send us a PFS info packet
110          * first.  Use the target's current snapshot TID as our default
111          * begin TID.
112          */
113         mirror.tid_beg = 0;
114         if (TwoWayPipeOpt) {
115                 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup,
116                                          NULL, &mirror.tid_beg);
117                 if (n < 0) {    /* got TERM record */
118                         relpfs(fd, &pfs);
119                         return;
120                 }
121                 ++mirror.tid_beg;
122         }
123
124         /*
125          * Write out the PFS header, tid_beg will be updated if our PFS
126          * has a larger begin sync.  tid_end is set to the latest source
127          * TID whos flush cycle has completed.
128          */
129         generate_mrec_header(fd, 1, pfs.pfs_id,
130                              &mirror.tid_beg, &mirror.tid_end);
131
132         /* XXX streaming mode support w/ cycle or command line arg */
133         /*
134          * A cycle file overrides the beginning TID
135          */
136         hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
137
138         if (ac == 2)
139                 mirror.tid_beg = strtoull(av[1], NULL, 0);
140
141         if (streaming == 0 || VerboseOpt >= 2) {
142                 fprintf(stderr,
143                         "Mirror-read: Mirror from %016llx to %016llx\n",
144                         mirror.tid_beg, mirror.tid_end);
145         }
146         if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
147                 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n",
148                         mirror.key_beg.obj_id);
149         }
150
151         /*
152          * Nothing to do if begin equals end.
153          */
154         if (mirror.tid_beg >= mirror.tid_end) {
155                 if (streaming == 0 || VerboseOpt >= 2)
156                         fprintf(stderr, "Mirror-read: No work to do\n");
157                 didwork = 0;
158                 goto done;
159         }
160         didwork = 1;
161
162         /*
163          * Write out bulk records
164          */
165         mirror.ubuf = buf;
166         mirror.size = SERIALBUF_SIZE;
167
168         do {
169                 mirror.count = 0;
170                 mirror.pfs_id = pfs.pfs_id;
171                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
172                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
173                         fprintf(stderr, "Mirror-read %s failed: %s\n",
174                                 filesystem, strerror(errno));
175                         exit(1);
176                 }
177                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
178                         fprintf(stderr,
179                                 "Mirror-read %s fatal error %d\n",
180                                 filesystem, mirror.head.error);
181                         exit(1);
182                 }
183                 if (mirror.count) {
184                         if (BandwidthOpt) {
185                                 n = writebw(1, mirror.ubuf, mirror.count,
186                                             &bwcount, &bwtv);
187                         } else {
188                                 n = write(1, mirror.ubuf, mirror.count);
189                         }
190                         if (n != mirror.count) {
191                                 fprintf(stderr, "Mirror-read %s failed: "
192                                                 "short write\n",
193                                 filesystem);
194                                 exit(1);
195                         }
196                 }
197                 total_bytes += mirror.count;
198                 if (streaming && VerboseOpt) {
199                         fprintf(stderr, "\r%016llx %11lld",
200                                 mirror.key_cur.obj_id,
201                                 total_bytes);
202                         fflush(stderr);
203                 }
204                 mirror.key_beg = mirror.key_cur;
205                 if (TimeoutOpt &&
206                     (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
207                         fprintf(stderr,
208                                 "Mirror-read %s interrupted by timer at"
209                                 " %016llx\n",
210                                 filesystem,
211                                 mirror.key_cur.obj_id);
212                         interrupted = 1;
213                         break;
214                 }
215         } while (mirror.count != 0);
216
217 done:
218         /*
219          * Write out the termination sync record - only if not interrupted
220          */
221         if (interrupted == 0) {
222                 if (didwork) {
223                         write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
224                                       &mrec_tmp, sizeof(mrec_tmp.sync));
225                 } else {
226                         write_mrecord(1, HAMMER_MREC_TYPE_IDLE,
227                                       &mrec_tmp, sizeof(mrec_tmp.sync));
228                 }
229         }
230
231         /*
232          * If the -2 option was given (automatic when doing mirror-copy),
233          * a two-way pipe is assumed and we expect a response mrec from
234          * the target.
235          */
236         if (TwoWayPipeOpt) {
237                 mrec = read_mrecord(0, &error, &pickup);
238                 if (mrec == NULL || 
239                     mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
240                     mrec->head.rec_size != sizeof(mrec->update)) {
241                         fprintf(stderr, "mirror_read: Did not get final "
242                                         "acknowledgement packet from target\n");
243                         exit(1);
244                 }
245                 if (interrupted) {
246                         if (CyclePath) {
247                                 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg);
248                                 fprintf(stderr, "Cyclefile %s updated for continuation\n", CyclePath);
249                         }
250                 } else {
251                         sync_tid = mrec->update.tid;
252                         if (CyclePath) {
253                                 hammer_key_beg_init(&mirror.key_beg);
254                                 hammer_set_cycle(&mirror.key_beg, sync_tid);
255                                 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n",
256                                         CyclePath, sync_tid);
257                         }
258                 }
259         } else if (CyclePath) {
260                 /* NOTE! mirror.tid_beg cannot be updated */
261                 fprintf(stderr, "Warning: cycle file (-c option) cannot be "
262                                 "fully updated unless you use mirror-copy\n");
263                 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
264         }
265         if (streaming && interrupted == 0) {
266                 time_t t1 = time(NULL);
267                 time_t t2;
268
269                 if (VerboseOpt) {
270                         fprintf(stderr, " W");
271                         fflush(stderr);
272                 }
273                 pfs.ondisk->sync_end_tid = mirror.tid_end;
274                 if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) {
275                         fprintf(stderr, "Mirror-read %s: cannot stream: %s\n",
276                                 filesystem, strerror(errno));
277                 } else {
278                         t2 = time(NULL) - t1;
279                         if (t2 >= 0 && t2 < DelayOpt) {
280                                 if (VerboseOpt) {
281                                         fprintf(stderr, "\bD");
282                                         fflush(stderr);
283                                 }
284                                 sleep(DelayOpt - t2);
285                         }
286                         if (VerboseOpt) {
287                                 fprintf(stderr, "\b ");
288                                 fflush(stderr);
289                         }
290                         relpfs(fd, &pfs);
291                         goto again;
292                 }
293         }
294         write_mrecord(1, HAMMER_MREC_TYPE_TERM,
295                       &mrec_tmp, sizeof(mrec_tmp.sync));
296         relpfs(fd, &pfs);
297         fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
298 }
299
300 /*
301  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
302  * some additional packet types to negotiate TID ranges and to verify
303  * completion.  The HAMMER VFS does most of the work.
304  *
305  * It is important to note that the mirror.key_{beg,end} range must
306  * match the ranged used by the original.  For now both sides use
307  * range the entire key space.
308  *
309  * It is even more important that the records in the stream conform
310  * to the TID range also supplied in the stream.  The HAMMER VFS will
311  * use the REC, PASS, and SKIP record types to track the portions of
312  * the B-Tree being scanned in order to be able to proactively delete
313  * records on the target within those active areas that are not mentioned
314  * by the source.
315  *
316  * The mirror.key_cur field is used by the VFS to do this tracking.  It
317  * must be initialized to key_beg but then is persistently updated by
318  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
319  * field you will blow up the mirror target, possibly to the point of
320  * deleting everything.  As a safety measure the HAMMER VFS simply marks
321  * the records that the source has destroyed as deleted on the target,
322  * and normal pruning operations will deal with their final disposition
323  * at some later time.
324  */
325 void
326 hammer_cmd_mirror_write(char **av, int ac)
327 {
328         struct hammer_ioc_mirror_rw mirror;
329         const char *filesystem;
330         char *buf = malloc(SERIALBUF_SIZE);
331         struct hammer_ioc_pseudofs_rw pfs;
332         struct hammer_ioc_mrecord_head pickup;
333         struct hammer_ioc_synctid synctid;
334         union hammer_ioc_mrecord_any mrec_tmp;
335         hammer_ioc_mrecord_any_t mrec;
336         int error;
337         int fd;
338         int n;
339
340         if (ac > 2)
341                 mirror_usage(1);
342         filesystem = av[0];
343
344         pickup.signature = 0;
345         pickup.type = 0;
346
347 again:
348         bzero(&mirror, sizeof(mirror));
349         hammer_key_beg_init(&mirror.key_beg);
350         hammer_key_end_init(&mirror.key_end);
351         mirror.key_end = mirror.key_beg;
352
353         fd = getpfs(&pfs, filesystem);
354
355         /*
356          * In two-way mode the target writes out a PFS packet first.
357          * The source uses our tid_end as its tid_beg by default,
358          * picking up where it left off.
359          */
360         mirror.tid_beg = 0;
361         if (TwoWayPipeOpt) {
362                 generate_mrec_header(fd, 1, pfs.pfs_id,
363                                      &mirror.tid_beg, &mirror.tid_end);
364         }
365
366         /*
367          * Read and process the PFS header.  The source informs us of
368          * the TID range the stream represents.
369          */
370         n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup,
371                                  &mirror.tid_beg, &mirror.tid_end);
372         if (n < 0) {    /* got TERM record */
373                 relpfs(fd, &pfs);
374                 return;
375         }
376
377         mirror.ubuf = buf;
378         mirror.size = SERIALBUF_SIZE;
379
380         /*
381          * Read and process bulk records (REC, PASS, and SKIP types).
382          *
383          * On your life, do NOT mess with mirror.key_cur or your mirror
384          * target may become history.
385          */
386         for (;;) {
387                 mirror.count = 0;
388                 mirror.pfs_id = pfs.pfs_id;
389                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
390                 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
391                 if (mirror.size <= 0)
392                         break;
393                 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
394                         fprintf(stderr, "Mirror-write %s failed: %s\n",
395                                 filesystem, strerror(errno));
396                         exit(1);
397                 }
398                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
399                         fprintf(stderr,
400                                 "Mirror-write %s fatal error %d\n",
401                                 filesystem, mirror.head.error);
402                         exit(1);
403                 }
404 #if 0
405                 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
406                         fprintf(stderr,
407                                 "Mirror-write %s interrupted by timer at"
408                                 " %016llx\n",
409                                 filesystem,
410                                 mirror.key_cur.obj_id);
411                         exit(0);
412                 }
413 #endif
414         }
415
416         /*
417          * Read and process the termination sync record.
418          */
419         mrec = read_mrecord(0, &error, &pickup);
420
421         if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
422                 fprintf(stderr, "Mirror-write: received termination request\n");
423                 free(mrec);
424                 return;
425         }
426
427         if (mrec == NULL || 
428             (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
429              mrec->head.type != HAMMER_MREC_TYPE_IDLE) ||
430             mrec->head.rec_size != sizeof(mrec->sync)) {
431                 fprintf(stderr, "Mirror-write %s: Did not get termination "
432                                 "sync record, or rec_size is wrong rt=%d\n",
433                                 filesystem, mrec->head.type);
434                 exit(1);
435         }
436
437         /*
438          * Update the PFS info on the target so the user has visibility
439          * into the new snapshot, and sync the target filesystem.
440          */
441         if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) {
442                 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
443
444                 bzero(&synctid, sizeof(synctid));
445                 synctid.op = HAMMER_SYNCTID_SYNC2;
446                 ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
447
448                 if (VerboseOpt >= 2) {
449                         fprintf(stderr, "Mirror-write %s: succeeded\n",
450                                 filesystem);
451                 }
452         }
453
454         free(mrec);
455         mrec = NULL;
456
457         /*
458          * Report back to the originator.
459          */
460         if (TwoWayPipeOpt) {
461                 mrec_tmp.update.tid = mirror.tid_end;
462                 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
463                               &mrec_tmp, sizeof(mrec_tmp.update));
464         } else {
465                 printf("Source can update synctid to 0x%016llx\n",
466                        mirror.tid_end);
467         }
468         relpfs(fd, &pfs);
469         goto again;
470 }
471
472 void
473 hammer_cmd_mirror_dump(void)
474 {
475         char *buf = malloc(SERIALBUF_SIZE);
476         struct hammer_ioc_mrecord_head pickup;
477         hammer_ioc_mrecord_any_t mrec;
478         int error;
479         int size;
480         int offset;
481         int bytes;
482
483         /*
484          * Read and process the PFS header 
485          */
486         pickup.signature = 0;
487         pickup.type = 0;
488
489         mrec = read_mrecord(0, &error, &pickup);
490
491         /*
492          * Read and process bulk records
493          */
494         for (;;) {
495                 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
496                 if (size <= 0)
497                         break;
498                 offset = 0;
499                 while (offset < size) {
500                         mrec = (void *)((char *)buf + offset);
501                         bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
502                         if (offset + bytes > size) {
503                                 fprintf(stderr, "Misaligned record\n");
504                                 exit(1);
505                         }
506
507                         switch(mrec->head.type) {
508                         case HAMMER_MREC_TYPE_REC:
509                                 printf("Record obj=%016llx key=%016llx "
510                                        "rt=%02x ot=%02x\n",
511                                         mrec->rec.leaf.base.obj_id,
512                                         mrec->rec.leaf.base.key,
513                                         mrec->rec.leaf.base.rec_type,
514                                         mrec->rec.leaf.base.obj_type);
515                                 printf("       tids %016llx:%016llx data=%d\n",
516                                         mrec->rec.leaf.base.create_tid,
517                                         mrec->rec.leaf.base.delete_tid,
518                                         mrec->rec.leaf.data_len);
519                                 break;
520                         case HAMMER_MREC_TYPE_PASS:
521                                 printf("Pass   obj=%016llx key=%016llx "
522                                        "rt=%02x ot=%02x\n",
523                                         mrec->rec.leaf.base.obj_id,
524                                         mrec->rec.leaf.base.key,
525                                         mrec->rec.leaf.base.rec_type,
526                                         mrec->rec.leaf.base.obj_type);
527                                 printf("       tids %016llx:%016llx data=%d\n",
528                                         mrec->rec.leaf.base.create_tid,
529                                         mrec->rec.leaf.base.delete_tid,
530                                         mrec->rec.leaf.data_len);
531                                 break;
532                         case HAMMER_MREC_TYPE_SKIP:
533                                 printf("Skip   obj=%016llx key=%016llx rt=%02x to\n"
534                                        "       obj=%016llx key=%016llx rt=%02x\n",
535                                        mrec->skip.skip_beg.obj_id,
536                                        mrec->skip.skip_beg.key,
537                                        mrec->skip.skip_beg.rec_type,
538                                        mrec->skip.skip_end.obj_id,
539                                        mrec->skip.skip_end.key,
540                                        mrec->skip.skip_end.rec_type);
541                         default:
542                                 break;
543                         }
544                         offset += bytes;
545                 }
546         }
547
548         /*
549          * Read and process the termination sync record.
550          */
551         mrec = read_mrecord(0, &error, &pickup);
552         if (mrec == NULL || 
553             (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
554              mrec->head.type != HAMMER_MREC_TYPE_IDLE)
555          ) {
556                 fprintf(stderr, "Mirror-dump: Did not get termination "
557                                 "sync record\n");
558         }
559 }
560
561 void
562 hammer_cmd_mirror_copy(char **av, int ac, int streaming)
563 {
564         pid_t pid1;
565         pid_t pid2;
566         int fds[2];
567         const char *xav[16];
568         char tbuf[16];
569         char *ptr;
570         int xac;
571
572         if (ac != 2)
573                 mirror_usage(1);
574
575         if (pipe(fds) < 0) {
576                 perror("pipe");
577                 exit(1);
578         }
579
580         TwoWayPipeOpt = 1;
581
582         /*
583          * Source
584          */
585         if ((pid1 = fork()) == 0) {
586                 dup2(fds[0], 0);
587                 dup2(fds[0], 1);
588                 close(fds[0]);
589                 close(fds[1]);
590                 if ((ptr = strchr(av[0], ':')) != NULL) {
591                         *ptr++ = 0;
592                         xac = 0;
593                         xav[xac++] = "ssh";
594                         xav[xac++] = av[0];
595                         xav[xac++] = "hammer";
596
597                         switch(VerboseOpt) {
598                         case 0:
599                                 break;
600                         case 1:
601                                 xav[xac++] = "-v";
602                                 break;
603                         case 2:
604                                 xav[xac++] = "-vv";
605                                 break;
606                         default:
607                                 xav[xac++] = "-vvv";
608                                 break;
609                         }
610                         xav[xac++] = "-2";
611                         if (TimeoutOpt) {
612                                 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
613                                 xav[xac++] = "-t";
614                                 xav[xac++] = tbuf;
615                         }
616                         if (streaming)
617                                 xav[xac++] = "mirror-read-streaming";
618                         else
619                                 xav[xac++] = "mirror-read";
620                         xav[xac++] = ptr;
621                         xav[xac++] = NULL;
622                         execv("/usr/bin/ssh", (void *)xav);
623                 } else {
624                         hammer_cmd_mirror_read(av, 1, streaming);
625                         fflush(stdout);
626                         fflush(stderr);
627                 }
628                 _exit(1);
629         }
630
631         /*
632          * Target
633          */
634         if ((pid2 = fork()) == 0) {
635                 dup2(fds[1], 0);
636                 dup2(fds[1], 1);
637                 close(fds[0]);
638                 close(fds[1]);
639                 if ((ptr = strchr(av[1], ':')) != NULL) {
640                         *ptr++ = 0;
641                         xac = 0;
642                         xav[xac++] = "ssh";
643                         xav[xac++] = av[1];
644                         xav[xac++] = "hammer";
645
646                         switch(VerboseOpt) {
647                         case 0:
648                                 break;
649                         case 1:
650                                 xav[xac++] = "-v";
651                                 break;
652                         case 2:
653                                 xav[xac++] = "-vv";
654                                 break;
655                         default:
656                                 xav[xac++] = "-vvv";
657                                 break;
658                         }
659
660                         xav[xac++] = "-2";
661                         xav[xac++] = "mirror-write";
662                         xav[xac++] = ptr;
663                         xav[xac++] = NULL;
664                         execv("/usr/bin/ssh", (void *)xav);
665                 } else {
666                         hammer_cmd_mirror_write(av + 1, 1);
667                         fflush(stdout);
668                         fflush(stderr);
669                 }
670                 _exit(1);
671         }
672         close(fds[0]);
673         close(fds[1]);
674
675         while (waitpid(pid1, NULL, 0) <= 0)
676                 ;
677         while (waitpid(pid2, NULL, 0) <= 0)
678                 ;
679 }
680
681 /*
682  * Read and return multiple mrecords
683  */
684 static int
685 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
686 {
687         hammer_ioc_mrecord_any_t mrec;
688         u_int count;
689         size_t n;
690         size_t i;
691         size_t bytes;
692
693         count = 0;
694         while (size - count >= HAMMER_MREC_HEADSIZE) {
695                 /*
696                  * Cached the record header in case we run out of buffer
697                  * space.
698                  */
699                 fflush(stdout);
700                 if (pickup->signature == 0) {
701                         for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
702                                 i = read(fd, (char *)pickup + n,
703                                          HAMMER_MREC_HEADSIZE - n);
704                                 if (i <= 0)
705                                         break;
706                         }
707                         if (n == 0)
708                                 break;
709                         if (n != HAMMER_MREC_HEADSIZE) {
710                                 fprintf(stderr, "read_mrecords: short read on pipe\n");
711                                 exit(1);
712                         }
713
714                         if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
715                                 fprintf(stderr, "read_mrecords: malformed record on pipe, bad signature\n");
716                                 exit(1);
717                         }
718                 }
719                 if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
720                     pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
721                         fprintf(stderr, "read_mrecords: malformed record on pipe, illegal rec_size\n");
722                         exit(1);
723                 }
724
725                 /*
726                  * Stop if we have insufficient space for the record and data.
727                  */
728                 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
729                 if (size - count < bytes)
730                         break;
731
732                 /*
733                  * Stop if the record type is not a REC or a SKIP (the only
734                  * two types the ioctl supports.  Other types are used only
735                  * by the userland protocol).
736                  */
737                 if (pickup->type != HAMMER_MREC_TYPE_REC &&
738                     pickup->type != HAMMER_MREC_TYPE_SKIP &&
739                     pickup->type != HAMMER_MREC_TYPE_PASS) {
740                         break;
741                 }
742
743                 /*
744                  * Read the remainder and clear the pickup signature.
745                  */
746                 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
747                         i = read(fd, buf + count + n, bytes - n);
748                         if (i <= 0)
749                                 break;
750                 }
751                 if (n != bytes) {
752                         fprintf(stderr, "read_mrecords: short read on pipe\n");
753                         exit(1);
754                 }
755
756                 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
757                 pickup->signature = 0;
758                 pickup->type = 0;
759                 mrec = (void *)(buf + count);
760
761                 /*
762                  * Validate the completed record
763                  */
764                 if (mrec->head.rec_crc !=
765                     crc32((char *)mrec + HAMMER_MREC_CRCOFF,
766                           mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
767                         fprintf(stderr, "read_mrecords: malformed record "
768                                         "on pipe, bad crc\n");
769                         exit(1);
770                 }
771
772                 /*
                 * If it's a B-Tree record validate the data crc
774                  */
775                 if (mrec->head.type == HAMMER_MREC_TYPE_REC) {
776                         if (mrec->head.rec_size <
777                             sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
778                                 fprintf(stderr, 
779                                         "read_mrecords: malformed record on "
780                                         "pipe, illegal element data_len\n");
781                                 exit(1);
782                         }
783                         if (mrec->rec.leaf.data_len &&
784                             mrec->rec.leaf.data_offset &&
785                             hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) {
786                                 fprintf(stderr,
787                                         "read_mrecords: data_crc did not "
788                                         "match data! obj=%016llx key=%016llx\n",
789                                         mrec->rec.leaf.base.obj_id,
790                                         mrec->rec.leaf.base.key);
791                                 fprintf(stderr,
792                                         "continuing, but there are problems\n");
793                         }
794                 }
795                 count += bytes;
796         }
797         return(count);
798 }
799
800 /*
801  * Read and return a single mrecord.
802  */
803 static
804 hammer_ioc_mrecord_any_t
805 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
806 {
807         hammer_ioc_mrecord_any_t mrec;
808         struct hammer_ioc_mrecord_head mrechd;
809         size_t bytes;
810         size_t n;
811         size_t i;
812
813         if (pickup && pickup->type != 0) {
814                 mrechd = *pickup;
815                 pickup->signature = 0;
816                 pickup->type = 0;
817                 n = HAMMER_MREC_HEADSIZE;
818         } else {
819                 /*
820                  * Read in the PFSD header from the sender.
821                  */
822                 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
823                         i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
824                         if (i <= 0)
825                                 break;
826                 }
827                 if (n == 0) {
828                         *errorp = 0;    /* EOF */
829                         return(NULL);
830                 }
831                 if (n != HAMMER_MREC_HEADSIZE) {
832                         fprintf(stderr, "short read of mrecord header\n");
833                         *errorp = EPIPE;
834                         return(NULL);
835                 }
836         }
837         if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
838                 fprintf(stderr, "read_mrecord: bad signature\n");
839                 *errorp = EINVAL;
840                 return(NULL);
841         }
842         bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
843         assert(bytes >= sizeof(mrechd));
844         mrec = malloc(bytes);
845         mrec->head = mrechd;
846
847         while (n < bytes) {
848                 i = read(fdin, (char *)mrec + n, bytes - n);
849                 if (i <= 0)
850                         break;
851                 n += i;
852         }
853         if (n != bytes) {
854                 fprintf(stderr, "read_mrecord: short read on payload\n");
855                 *errorp = EPIPE;
856                 return(NULL);
857         }
858         if (mrec->head.rec_crc != 
859             crc32((char *)mrec + HAMMER_MREC_CRCOFF,
860                   mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
861                 fprintf(stderr, "read_mrecord: bad CRC\n");
862                 *errorp = EINVAL;
863                 return(NULL);
864         }
865         *errorp = 0;
866         return(mrec);
867 }
868
869 static
870 void
871 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec,
872               int bytes)
873 {
874         char zbuf[HAMMER_HEAD_ALIGN];
875         int pad;
876
877         pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
878
879         assert(bytes >= (int)sizeof(mrec->head));
880         bzero(&mrec->head, sizeof(mrec->head));
881         mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
882         mrec->head.type = type;
883         mrec->head.rec_size = bytes;
884         mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF,
885                                    bytes - HAMMER_MREC_CRCOFF);
886         if (write(fdout, mrec, bytes) != bytes) {
887                 fprintf(stderr, "write_mrecord: error %d (%s)\n",
888                         errno, strerror(errno));
889                 exit(1);
890         }
891         if (pad) {
892                 bzero(zbuf, pad);
893                 if (write(fdout, zbuf, pad) != pad) {
894                         fprintf(stderr, "write_mrecord: error %d (%s)\n",
895                                 errno, strerror(errno));
896                         exit(1);
897                 }
898         }
899 }
900
901 /*
902  * Generate a mirroring header with the pfs information of the
903  * originating filesytem.
904  */
905 static void
906 generate_mrec_header(int fd, int fdout, int pfs_id,
907                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
908 {
909         struct hammer_ioc_pseudofs_rw pfs;
910         union hammer_ioc_mrecord_any mrec_tmp;
911
912         bzero(&pfs, sizeof(pfs));
913         bzero(&mrec_tmp, sizeof(mrec_tmp));
914         pfs.pfs_id = pfs_id;
915         pfs.ondisk = &mrec_tmp.pfs.pfsd;
916         pfs.bytes = sizeof(mrec_tmp.pfs.pfsd);
917         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
918                 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n");
919                 exit(1);
920         }
921         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
922                 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n");
923                 exit(1);
924         }
925
926         /*
927          * sync_beg_tid - lowest TID on source after which a full history
928          *                is available.
929          *
930          * sync_end_tid - highest fully synchronized TID from source.
931          */
932         if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid)
933                 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid;
934         if (tid_endp)
935                 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid;
936         mrec_tmp.pfs.version = pfs.version;
937         write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD,
938                       &mrec_tmp, sizeof(mrec_tmp.pfs));
939 }
940
941 /*
942  * Validate the pfs information from the originating filesystem
943  * against the target filesystem.  shared_uuid must match.
944  *
945  * return -1 if we got a TERM record
946  */
947 static int
948 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
949                      struct hammer_ioc_mrecord_head *pickup,
950                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
951 {
952         struct hammer_ioc_pseudofs_rw pfs;
953         struct hammer_pseudofs_data pfsd;
954         hammer_ioc_mrecord_any_t mrec;
955         int error;
956
957         /*
958          * Get the PFSD info from the target filesystem.
959          */
960         bzero(&pfs, sizeof(pfs));
961         bzero(&pfsd, sizeof(pfsd));
962         pfs.pfs_id = pfs_id;
963         pfs.ondisk = &pfsd;
964         pfs.bytes = sizeof(pfsd);
965         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
966                 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n");
967                 exit(1);
968         }
969         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
970                 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n");
971                 exit(1);
972         }
973
974         mrec = read_mrecord(fdin, &error, pickup);
975         if (mrec == NULL) {
976                 if (error == 0)
977                         fprintf(stderr, "validate_mrec_header: short read\n");
978                 exit(1);
979         }
980         if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
981                 free(mrec);
982                 return(-1);
983         }
984
985         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
986                 fprintf(stderr, "validate_mrec_header: did not get expected "
987                                 "PFSD record type\n");
988                 exit(1);
989         }
990         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
991                 fprintf(stderr, "validate_mrec_header: unexpected payload "
992                                 "size\n");
993                 exit(1);
994         }
995         if (mrec->pfs.version != pfs.version) {
996                 fprintf(stderr, "validate_mrec_header: Version mismatch\n");
997                 exit(1);
998         }
999
1000         /*
1001          * Whew.  Ok, is the read PFS info compatible with the target?
1002          */
1003         if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
1004                  sizeof(pfsd.shared_uuid)) != 0) {
1005                 fprintf(stderr, 
1006                         "mirror-write: source and target have "
1007                         "different shared-uuid's!\n");
1008                 exit(1);
1009         }
1010         if (is_target &&
1011             (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) {
1012                 fprintf(stderr, "mirror-write: target must be in slave mode\n");
1013                 exit(1);
1014         }
1015         if (tid_begp)
1016                 *tid_begp = mrec->pfs.pfsd.sync_beg_tid;
1017         if (tid_endp)
1018                 *tid_endp = mrec->pfs.pfsd.sync_end_tid;
1019         free(mrec);
1020         return(0);
1021 }
1022
1023 static void
1024 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
1025 {
1026         struct hammer_ioc_pseudofs_rw pfs;
1027         struct hammer_pseudofs_data pfsd;
1028
1029         bzero(&pfs, sizeof(pfs));
1030         bzero(&pfsd, sizeof(pfsd));
1031         pfs.pfs_id = pfs_id;
1032         pfs.ondisk = &pfsd;
1033         pfs.bytes = sizeof(pfsd);
1034         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1035                 perror("update_pfs_snapshot (read)");
1036                 exit(1);
1037         }
1038         if (pfsd.sync_end_tid != snapshot_tid) {
1039                 pfsd.sync_end_tid = snapshot_tid;
1040                 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
1041                         perror("update_pfs_snapshot (rewrite)");
1042                         exit(1);
1043                 }
1044                 if (VerboseOpt >= 2) {
1045                         fprintf(stderr,
1046                                 "Mirror-write: Completed, updated snapshot "
1047                                 "to %016llx\n",
1048                                 snapshot_tid);
1049                 }
1050         }
1051 }
1052
1053 /*
1054  * Bandwidth-limited write in chunks
1055  */
1056 static
1057 ssize_t
1058 writebw(int fd, const void *buf, size_t nbytes,
1059         u_int64_t *bwcount, struct timeval *tv1)
1060 {
1061         struct timeval tv2;
1062         size_t n;
1063         ssize_t r;
1064         ssize_t a;
1065         int usec;
1066
1067         a = 0;
1068         r = 0;
1069         while (nbytes) {
1070                 if (*bwcount + nbytes > BandwidthOpt)
1071                         n = BandwidthOpt - *bwcount;
1072                 else
1073                         n = nbytes;
1074                 if (n)
1075                         r = write(fd, buf, n);
1076                 if (r >= 0) {
1077                         a += r;
1078                         nbytes -= r;
1079                         buf = (const char *)buf + r;
1080                 }
1081                 if ((size_t)r != n)
1082                         break;
1083                 *bwcount += n;
1084                 if (*bwcount >= BandwidthOpt) {
1085                         gettimeofday(&tv2, NULL);
1086                         usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 +
1087                                 (int)(tv2.tv_usec - tv1->tv_usec);
1088                         if (usec >= 0 && usec < 1000000)
1089                                 usleep(1000000 - usec);
1090                         gettimeofday(tv1, NULL);
1091                         *bwcount -= BandwidthOpt;
1092                 }
1093         }
1094         return(a ? a : r);
1095 }
1096
/*
 * Print the synopsis of the mirror directives to stderr and terminate
 * the program with the given exit code.
 */
static void
mirror_usage(int code)
{
        static const char synopsis[] =
                "hammer mirror-read <filesystem>\n"
                "hammer mirror-write <filesystem>\n"
                "hammer mirror-dump\n"
                "hammer mirror-copy [[user@]host:]fs [[user@]host:]fs\n"
                "hammer mirror-stream [[user@]host:]fs [[user@]host:]fs\n";

        fputs(synopsis, stderr);
        exit(code);
}