Merge from vendor branch OPENSSL:
[dragonfly.git] / sbin / hammer / cmd_mirror.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.14 2008/08/21 23:28:43 thomas Exp $
35  */
36
37 #include "hammer.h"
38
39 #define SERIALBUF_SIZE  (512 * 1024)
40
41 static int read_mrecords(int fd, char *buf, u_int size,
42                          hammer_ioc_mrecord_head_t pickup);
43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
44                          hammer_ioc_mrecord_head_t pickup);
45 static void write_mrecord(int fdout, u_int32_t type,
46                          hammer_ioc_mrecord_any_t mrec, int bytes);
47 static void generate_mrec_header(int fd, int fdout, int pfs_id,
48                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
49 static int validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
50                          struct hammer_ioc_mrecord_head *pickup,
51                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
52 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
53 static ssize_t writebw(int fd, const void *buf, size_t nbytes,
54                         u_int64_t *bwcount, struct timeval *tv1);
55 static void mirror_usage(int code);
56
57 /*
58  * Generate a mirroring data stream from the specific source over the
59  * entire key range, but restricted to the specified transaction range.
60  *
61  * The HAMMER VFS does most of the work, we add a few new mrecord
62  * types to negotiate the TID ranges and verify that the entire
63  * stream made it to the destination.
64  */
65 void
66 hammer_cmd_mirror_read(char **av, int ac, int streaming)
67 {
68         struct hammer_ioc_mirror_rw mirror;
69         struct hammer_ioc_pseudofs_rw pfs;
70         union hammer_ioc_mrecord_any mrec_tmp;
71         struct hammer_ioc_mrecord_head pickup;
72         hammer_ioc_mrecord_any_t mrec;
73         hammer_tid_t sync_tid;
74         const char *filesystem;
75         char *buf = malloc(SERIALBUF_SIZE);
76         int interrupted = 0;
77         int error;
78         int fd;
79         int n;
80         int didwork;
81         int64_t total_bytes;
82         time_t base_t = time(NULL);
83         struct timeval bwtv;
84         u_int64_t bwcount;
85
86         if (ac == 0 || ac > 2)
87                 mirror_usage(1);
88         filesystem = av[0];
89
90         pickup.signature = 0;
91         pickup.type = 0;
92
93 again:
94         bzero(&mirror, sizeof(mirror));
95         hammer_key_beg_init(&mirror.key_beg);
96         hammer_key_end_init(&mirror.key_end);
97
98         fd = getpfs(&pfs, filesystem);
99
100         if (streaming && VerboseOpt) {
101                 fprintf(stderr, "\nRunning");
102                 fflush(stderr);
103         }
104         total_bytes = 0;
105         gettimeofday(&bwtv, NULL);
106         bwcount = 0;
107
108         /*
109          * In 2-way mode the target will send us a PFS info packet
110          * first.  Use the target's current snapshot TID as our default
111          * begin TID.
112          */
113         mirror.tid_beg = 0;
114         if (TwoWayPipeOpt) {
115                 n = validate_mrec_header(fd, 0, 0, pfs.pfs_id, &pickup,
116                                          NULL, &mirror.tid_beg);
117                 if (n < 0) {    /* got TERM record */
118                         relpfs(fd, &pfs);
119                         return;
120                 }
121                 ++mirror.tid_beg;
122         }
123
124         /*
125          * Write out the PFS header, tid_beg will be updated if our PFS
126          * has a larger begin sync.  tid_end is set to the latest source
127          * TID whos flush cycle has completed.
128          */
129         generate_mrec_header(fd, 1, pfs.pfs_id,
130                              &mirror.tid_beg, &mirror.tid_end);
131
132         /* XXX streaming mode support w/ cycle or command line arg */
133         /*
134          * A cycle file overrides the beginning TID
135          */
136         hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
137
138         if (ac == 2)
139                 mirror.tid_beg = strtoull(av[1], NULL, 0);
140
141         if (streaming == 0 || VerboseOpt >= 2) {
142                 fprintf(stderr,
143                         "Mirror-read: Mirror from %016llx to %016llx\n",
144                         mirror.tid_beg, mirror.tid_end);
145         }
146         if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
147                 fprintf(stderr, "Mirror-read: Resuming at object %016llx\n",
148                         mirror.key_beg.obj_id);
149         }
150
151         /*
152          * Nothing to do if begin equals end.
153          */
154         if (mirror.tid_beg >= mirror.tid_end) {
155                 if (streaming == 0 || VerboseOpt >= 2)
156                         fprintf(stderr, "Mirror-read: No work to do\n");
157                 didwork = 0;
158                 goto done;
159         }
160         didwork = 1;
161
162         /*
163          * Write out bulk records
164          */
165         mirror.ubuf = buf;
166         mirror.size = SERIALBUF_SIZE;
167
168         do {
169                 mirror.count = 0;
170                 mirror.pfs_id = pfs.pfs_id;
171                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
172                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
173                         fprintf(stderr, "Mirror-read %s failed: %s\n",
174                                 filesystem, strerror(errno));
175                         exit(1);
176                 }
177                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
178                         fprintf(stderr,
179                                 "Mirror-read %s fatal error %d\n",
180                                 filesystem, mirror.head.error);
181                         exit(1);
182                 }
183                 if (mirror.count) {
184                         if (BandwidthOpt) {
185                                 n = writebw(1, mirror.ubuf, mirror.count,
186                                             &bwcount, &bwtv);
187                         } else {
188                                 n = write(1, mirror.ubuf, mirror.count);
189                         }
190                         if (n != mirror.count) {
191                                 fprintf(stderr, "Mirror-read %s failed: "
192                                                 "short write\n",
193                                 filesystem);
194                                 exit(1);
195                         }
196                 }
197                 total_bytes += mirror.count;
198                 if (streaming && VerboseOpt) {
199                         fprintf(stderr, "\r%016llx %11lld",
200                                 mirror.key_cur.obj_id,
201                                 total_bytes);
202                         fflush(stderr);
203                 }
204                 mirror.key_beg = mirror.key_cur;
205                 if (TimeoutOpt &&
206                     (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
207                         fprintf(stderr,
208                                 "Mirror-read %s interrupted by timer at"
209                                 " %016llx\n",
210                                 filesystem,
211                                 mirror.key_cur.obj_id);
212                         interrupted = 1;
213                         break;
214                 }
215         } while (mirror.count != 0);
216
217 done:
218         /*
219          * Write out the termination sync record - only if not interrupted
220          */
221         if (interrupted == 0) {
222                 if (didwork) {
223                         write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
224                                       &mrec_tmp, sizeof(mrec_tmp.sync));
225                 } else {
226                         write_mrecord(1, HAMMER_MREC_TYPE_IDLE,
227                                       &mrec_tmp, sizeof(mrec_tmp.sync));
228                 }
229         }
230
231         /*
232          * If the -2 option was given (automatic when doing mirror-copy),
233          * a two-way pipe is assumed and we expect a response mrec from
234          * the target.
235          */
236         if (TwoWayPipeOpt) {
237                 mrec = read_mrecord(0, &error, &pickup);
238                 if (mrec == NULL || 
239                     mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
240                     mrec->head.rec_size != sizeof(mrec->update)) {
241                         fprintf(stderr, "mirror_read: Did not get final "
242                                         "acknowledgement packet from target\n");
243                         exit(1);
244                 }
245                 if (interrupted) {
246                         if (CyclePath) {
247                                 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg);
248                                 fprintf(stderr, "Cyclefile %s updated for "
249                                         "continuation\n", CyclePath);
250                         }
251                 } else {
252                         sync_tid = mrec->update.tid;
253                         if (CyclePath) {
254                                 hammer_key_beg_init(&mirror.key_beg);
255                                 hammer_set_cycle(&mirror.key_beg, sync_tid);
256                                 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n",
257                                         CyclePath, sync_tid);
258                         }
259                 }
260         } else if (CyclePath) {
261                 /* NOTE! mirror.tid_beg cannot be updated */
262                 fprintf(stderr, "Warning: cycle file (-c option) cannot be "
263                                 "fully updated unless you use mirror-copy\n");
264                 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
265         }
266         if (streaming && interrupted == 0) {
267                 time_t t1 = time(NULL);
268                 time_t t2;
269
270                 if (VerboseOpt) {
271                         fprintf(stderr, " W");
272                         fflush(stderr);
273                 }
274                 pfs.ondisk->sync_end_tid = mirror.tid_end;
275                 if (ioctl(fd, HAMMERIOC_WAI_PSEUDOFS, &pfs) < 0) {
276                         fprintf(stderr, "Mirror-read %s: cannot stream: %s\n",
277                                 filesystem, strerror(errno));
278                 } else {
279                         t2 = time(NULL) - t1;
280                         if (t2 >= 0 && t2 < DelayOpt) {
281                                 if (VerboseOpt) {
282                                         fprintf(stderr, "\bD");
283                                         fflush(stderr);
284                                 }
285                                 sleep(DelayOpt - t2);
286                         }
287                         if (VerboseOpt) {
288                                 fprintf(stderr, "\b ");
289                                 fflush(stderr);
290                         }
291                         relpfs(fd, &pfs);
292                         goto again;
293                 }
294         }
295         write_mrecord(1, HAMMER_MREC_TYPE_TERM,
296                       &mrec_tmp, sizeof(mrec_tmp.sync));
297         relpfs(fd, &pfs);
298         fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
299 }
300
301 /*
302  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
303  * some additional packet types to negotiate TID ranges and to verify
304  * completion.  The HAMMER VFS does most of the work.
305  *
306  * It is important to note that the mirror.key_{beg,end} range must
307  * match the ranged used by the original.  For now both sides use
308  * range the entire key space.
309  *
310  * It is even more important that the records in the stream conform
311  * to the TID range also supplied in the stream.  The HAMMER VFS will
312  * use the REC, PASS, and SKIP record types to track the portions of
313  * the B-Tree being scanned in order to be able to proactively delete
314  * records on the target within those active areas that are not mentioned
315  * by the source.
316  *
317  * The mirror.key_cur field is used by the VFS to do this tracking.  It
318  * must be initialized to key_beg but then is persistently updated by
319  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
320  * field you will blow up the mirror target, possibly to the point of
321  * deleting everything.  As a safety measure the HAMMER VFS simply marks
322  * the records that the source has destroyed as deleted on the target,
323  * and normal pruning operations will deal with their final disposition
324  * at some later time.
325  */
326 void
327 hammer_cmd_mirror_write(char **av, int ac)
328 {
329         struct hammer_ioc_mirror_rw mirror;
330         const char *filesystem;
331         char *buf = malloc(SERIALBUF_SIZE);
332         struct hammer_ioc_pseudofs_rw pfs;
333         struct hammer_ioc_mrecord_head pickup;
334         struct hammer_ioc_synctid synctid;
335         union hammer_ioc_mrecord_any mrec_tmp;
336         hammer_ioc_mrecord_any_t mrec;
337         int error;
338         int fd;
339         int n;
340
341         if (ac != 1)
342                 mirror_usage(1);
343         filesystem = av[0];
344
345         pickup.signature = 0;
346         pickup.type = 0;
347
348 again:
349         bzero(&mirror, sizeof(mirror));
350         hammer_key_beg_init(&mirror.key_beg);
351         hammer_key_end_init(&mirror.key_end);
352         mirror.key_end = mirror.key_beg;
353
354         fd = getpfs(&pfs, filesystem);
355
356         /*
357          * In two-way mode the target writes out a PFS packet first.
358          * The source uses our tid_end as its tid_beg by default,
359          * picking up where it left off.
360          */
361         mirror.tid_beg = 0;
362         if (TwoWayPipeOpt) {
363                 generate_mrec_header(fd, 1, pfs.pfs_id,
364                                      &mirror.tid_beg, &mirror.tid_end);
365         }
366
367         /*
368          * Read and process the PFS header.  The source informs us of
369          * the TID range the stream represents.
370          */
371         n = validate_mrec_header(fd, 0, 1, pfs.pfs_id, &pickup,
372                                  &mirror.tid_beg, &mirror.tid_end);
373         if (n < 0) {    /* got TERM record */
374                 relpfs(fd, &pfs);
375                 return;
376         }
377
378         mirror.ubuf = buf;
379         mirror.size = SERIALBUF_SIZE;
380
381         /*
382          * Read and process bulk records (REC, PASS, and SKIP types).
383          *
384          * On your life, do NOT mess with mirror.key_cur or your mirror
385          * target may become history.
386          */
387         for (;;) {
388                 mirror.count = 0;
389                 mirror.pfs_id = pfs.pfs_id;
390                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
391                 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
392                 if (mirror.size <= 0)
393                         break;
394                 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
395                         fprintf(stderr, "Mirror-write %s failed: %s\n",
396                                 filesystem, strerror(errno));
397                         exit(1);
398                 }
399                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
400                         fprintf(stderr,
401                                 "Mirror-write %s fatal error %d\n",
402                                 filesystem, mirror.head.error);
403                         exit(1);
404                 }
405 #if 0
406                 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
407                         fprintf(stderr,
408                                 "Mirror-write %s interrupted by timer at"
409                                 " %016llx\n",
410                                 filesystem,
411                                 mirror.key_cur.obj_id);
412                         exit(0);
413                 }
414 #endif
415         }
416
417         /*
418          * Read and process the termination sync record.
419          */
420         mrec = read_mrecord(0, &error, &pickup);
421
422         if (mrec && mrec->head.type == HAMMER_MREC_TYPE_TERM) {
423                 fprintf(stderr, "Mirror-write: received termination request\n");
424                 free(mrec);
425                 return;
426         }
427
428         if (mrec == NULL || 
429             (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
430              mrec->head.type != HAMMER_MREC_TYPE_IDLE) ||
431             mrec->head.rec_size != sizeof(mrec->sync)) {
432                 fprintf(stderr, "Mirror-write %s: Did not get termination "
433                                 "sync record, or rec_size is wrong rt=%d\n",
434                                 filesystem, mrec->head.type);
435                 exit(1);
436         }
437
438         /*
439          * Update the PFS info on the target so the user has visibility
440          * into the new snapshot, and sync the target filesystem.
441          */
442         if (mrec->head.type == HAMMER_MREC_TYPE_SYNC) {
443                 update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
444
445                 bzero(&synctid, sizeof(synctid));
446                 synctid.op = HAMMER_SYNCTID_SYNC2;
447                 ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
448
449                 if (VerboseOpt >= 2) {
450                         fprintf(stderr, "Mirror-write %s: succeeded\n",
451                                 filesystem);
452                 }
453         }
454
455         free(mrec);
456         mrec = NULL;
457
458         /*
459          * Report back to the originator.
460          */
461         if (TwoWayPipeOpt) {
462                 mrec_tmp.update.tid = mirror.tid_end;
463                 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
464                               &mrec_tmp, sizeof(mrec_tmp.update));
465         } else {
466                 printf("Source can update synctid to 0x%016llx\n",
467                        mirror.tid_end);
468         }
469         relpfs(fd, &pfs);
470         goto again;
471 }
472
473 void
474 hammer_cmd_mirror_dump(void)
475 {
476         char *buf = malloc(SERIALBUF_SIZE);
477         struct hammer_ioc_mrecord_head pickup;
478         hammer_ioc_mrecord_any_t mrec;
479         int error;
480         int size;
481         int offset;
482         int bytes;
483
484         /*
485          * Read and process the PFS header 
486          */
487         pickup.signature = 0;
488         pickup.type = 0;
489
490         mrec = read_mrecord(0, &error, &pickup);
491
492         /*
493          * Read and process bulk records
494          */
495         for (;;) {
496                 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
497                 if (size <= 0)
498                         break;
499                 offset = 0;
500                 while (offset < size) {
501                         mrec = (void *)((char *)buf + offset);
502                         bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
503                         if (offset + bytes > size) {
504                                 fprintf(stderr, "Misaligned record\n");
505                                 exit(1);
506                         }
507
508                         switch(mrec->head.type) {
509                         case HAMMER_MREC_TYPE_REC:
510                                 printf("Record obj=%016llx key=%016llx "
511                                        "rt=%02x ot=%02x\n",
512                                         mrec->rec.leaf.base.obj_id,
513                                         mrec->rec.leaf.base.key,
514                                         mrec->rec.leaf.base.rec_type,
515                                         mrec->rec.leaf.base.obj_type);
516                                 printf("       tids %016llx:%016llx data=%d\n",
517                                         mrec->rec.leaf.base.create_tid,
518                                         mrec->rec.leaf.base.delete_tid,
519                                         mrec->rec.leaf.data_len);
520                                 break;
521                         case HAMMER_MREC_TYPE_PASS:
522                                 printf("Pass   obj=%016llx key=%016llx "
523                                        "rt=%02x ot=%02x\n",
524                                         mrec->rec.leaf.base.obj_id,
525                                         mrec->rec.leaf.base.key,
526                                         mrec->rec.leaf.base.rec_type,
527                                         mrec->rec.leaf.base.obj_type);
528                                 printf("       tids %016llx:%016llx data=%d\n",
529                                         mrec->rec.leaf.base.create_tid,
530                                         mrec->rec.leaf.base.delete_tid,
531                                         mrec->rec.leaf.data_len);
532                                 break;
533                         case HAMMER_MREC_TYPE_SKIP:
534                                 printf("Skip   obj=%016llx key=%016llx rt=%02x to\n"
535                                        "       obj=%016llx key=%016llx rt=%02x\n",
536                                        mrec->skip.skip_beg.obj_id,
537                                        mrec->skip.skip_beg.key,
538                                        mrec->skip.skip_beg.rec_type,
539                                        mrec->skip.skip_end.obj_id,
540                                        mrec->skip.skip_end.key,
541                                        mrec->skip.skip_end.rec_type);
542                         default:
543                                 break;
544                         }
545                         offset += bytes;
546                 }
547         }
548
549         /*
550          * Read and process the termination sync record.
551          */
552         mrec = read_mrecord(0, &error, &pickup);
553         if (mrec == NULL || 
554             (mrec->head.type != HAMMER_MREC_TYPE_SYNC &&
555              mrec->head.type != HAMMER_MREC_TYPE_IDLE)
556          ) {
557                 fprintf(stderr, "Mirror-dump: Did not get termination "
558                                 "sync record\n");
559         }
560 }
561
562 void
563 hammer_cmd_mirror_copy(char **av, int ac, int streaming)
564 {
565         pid_t pid1;
566         pid_t pid2;
567         int fds[2];
568         const char *xav[16];
569         char tbuf[16];
570         char *ptr;
571         int xac;
572
573         if (ac != 2)
574                 mirror_usage(1);
575
576         if (pipe(fds) < 0) {
577                 perror("pipe");
578                 exit(1);
579         }
580
581         TwoWayPipeOpt = 1;
582
583         /*
584          * Source
585          */
586         if ((pid1 = fork()) == 0) {
587                 dup2(fds[0], 0);
588                 dup2(fds[0], 1);
589                 close(fds[0]);
590                 close(fds[1]);
591                 if ((ptr = strchr(av[0], ':')) != NULL) {
592                         *ptr++ = 0;
593                         xac = 0;
594                         xav[xac++] = "ssh";
595                         xav[xac++] = av[0];
596                         xav[xac++] = "hammer";
597
598                         switch(VerboseOpt) {
599                         case 0:
600                                 break;
601                         case 1:
602                                 xav[xac++] = "-v";
603                                 break;
604                         case 2:
605                                 xav[xac++] = "-vv";
606                                 break;
607                         default:
608                                 xav[xac++] = "-vvv";
609                                 break;
610                         }
611                         xav[xac++] = "-2";
612                         if (TimeoutOpt) {
613                                 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
614                                 xav[xac++] = "-t";
615                                 xav[xac++] = tbuf;
616                         }
617                         if (streaming)
618                                 xav[xac++] = "mirror-read-streaming";
619                         else
620                                 xav[xac++] = "mirror-read";
621                         xav[xac++] = ptr;
622                         xav[xac++] = NULL;
623                         execv("/usr/bin/ssh", (void *)xav);
624                 } else {
625                         hammer_cmd_mirror_read(av, 1, streaming);
626                         fflush(stdout);
627                         fflush(stderr);
628                 }
629                 _exit(1);
630         }
631
632         /*
633          * Target
634          */
635         if ((pid2 = fork()) == 0) {
636                 dup2(fds[1], 0);
637                 dup2(fds[1], 1);
638                 close(fds[0]);
639                 close(fds[1]);
640                 if ((ptr = strchr(av[1], ':')) != NULL) {
641                         *ptr++ = 0;
642                         xac = 0;
643                         xav[xac++] = "ssh";
644                         xav[xac++] = av[1];
645                         xav[xac++] = "hammer";
646
647                         switch(VerboseOpt) {
648                         case 0:
649                                 break;
650                         case 1:
651                                 xav[xac++] = "-v";
652                                 break;
653                         case 2:
654                                 xav[xac++] = "-vv";
655                                 break;
656                         default:
657                                 xav[xac++] = "-vvv";
658                                 break;
659                         }
660
661                         xav[xac++] = "-2";
662                         xav[xac++] = "mirror-write";
663                         xav[xac++] = ptr;
664                         xav[xac++] = NULL;
665                         execv("/usr/bin/ssh", (void *)xav);
666                 } else {
667                         hammer_cmd_mirror_write(av + 1, 1);
668                         fflush(stdout);
669                         fflush(stderr);
670                 }
671                 _exit(1);
672         }
673         close(fds[0]);
674         close(fds[1]);
675
676         while (waitpid(pid1, NULL, 0) <= 0)
677                 ;
678         while (waitpid(pid2, NULL, 0) <= 0)
679                 ;
680 }
681
682 /*
683  * Read and return multiple mrecords
684  */
685 static int
686 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
687 {
688         hammer_ioc_mrecord_any_t mrec;
689         u_int count;
690         size_t n;
691         size_t i;
692         size_t bytes;
693
694         count = 0;
695         while (size - count >= HAMMER_MREC_HEADSIZE) {
696                 /*
697                  * Cached the record header in case we run out of buffer
698                  * space.
699                  */
700                 fflush(stdout);
701                 if (pickup->signature == 0) {
702                         for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
703                                 i = read(fd, (char *)pickup + n,
704                                          HAMMER_MREC_HEADSIZE - n);
705                                 if (i <= 0)
706                                         break;
707                         }
708                         if (n == 0)
709                                 break;
710                         if (n != HAMMER_MREC_HEADSIZE) {
711                                 fprintf(stderr, "read_mrecords: short read on pipe\n");
712                                 exit(1);
713                         }
714
715                         if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
716                                 fprintf(stderr, "read_mrecords: malformed record on pipe, "
717                                         "bad signature\n");
718                                 exit(1);
719                         }
720                 }
721                 if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
722                     pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
723                         fprintf(stderr, "read_mrecords: malformed record on pipe, "
724                                 "illegal rec_size\n");
725                         exit(1);
726                 }
727
728                 /*
729                  * Stop if we have insufficient space for the record and data.
730                  */
731                 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
732                 if (size - count < bytes)
733                         break;
734
735                 /*
736                  * Stop if the record type is not a REC or a SKIP (the only
737                  * two types the ioctl supports.  Other types are used only
738                  * by the userland protocol).
739                  */
740                 if (pickup->type != HAMMER_MREC_TYPE_REC &&
741                     pickup->type != HAMMER_MREC_TYPE_SKIP &&
742                     pickup->type != HAMMER_MREC_TYPE_PASS) {
743                         break;
744                 }
745
746                 /*
747                  * Read the remainder and clear the pickup signature.
748                  */
749                 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
750                         i = read(fd, buf + count + n, bytes - n);
751                         if (i <= 0)
752                                 break;
753                 }
754                 if (n != bytes) {
755                         fprintf(stderr, "read_mrecords: short read on pipe\n");
756                         exit(1);
757                 }
758
759                 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
760                 pickup->signature = 0;
761                 pickup->type = 0;
762                 mrec = (void *)(buf + count);
763
764                 /*
765                  * Validate the completed record
766                  */
767                 if (mrec->head.rec_crc !=
768                     crc32((char *)mrec + HAMMER_MREC_CRCOFF,
769                           mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
770                         fprintf(stderr, "read_mrecords: malformed record "
771                                         "on pipe, bad crc\n");
772                         exit(1);
773                 }
774
775                 /*
776                  * If its a B-Tree record validate the data crc
777                  */
778                 if (mrec->head.type == HAMMER_MREC_TYPE_REC) {
779                         if (mrec->head.rec_size <
780                             sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
781                                 fprintf(stderr, 
782                                         "read_mrecords: malformed record on "
783                                         "pipe, illegal element data_len\n");
784                                 exit(1);
785                         }
786                         if (mrec->rec.leaf.data_len &&
787                             mrec->rec.leaf.data_offset &&
788                             hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) {
789                                 fprintf(stderr,
790                                         "read_mrecords: data_crc did not "
791                                         "match data! obj=%016llx key=%016llx\n",
792                                         mrec->rec.leaf.base.obj_id,
793                                         mrec->rec.leaf.base.key);
794                                 fprintf(stderr,
795                                         "continuing, but there are problems\n");
796                         }
797                 }
798                 count += bytes;
799         }
800         return(count);
801 }
802
803 /*
804  * Read and return a single mrecord.
805  */
806 static
807 hammer_ioc_mrecord_any_t
808 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
809 {
810         hammer_ioc_mrecord_any_t mrec;
811         struct hammer_ioc_mrecord_head mrechd;
812         size_t bytes;
813         size_t n;
814         size_t i;
815
816         if (pickup && pickup->type != 0) {
817                 mrechd = *pickup;
818                 pickup->signature = 0;
819                 pickup->type = 0;
820                 n = HAMMER_MREC_HEADSIZE;
821         } else {
822                 /*
823                  * Read in the PFSD header from the sender.
824                  */
825                 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
826                         i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
827                         if (i <= 0)
828                                 break;
829                 }
830                 if (n == 0) {
831                         *errorp = 0;    /* EOF */
832                         return(NULL);
833                 }
834                 if (n != HAMMER_MREC_HEADSIZE) {
835                         fprintf(stderr, "short read of mrecord header\n");
836                         *errorp = EPIPE;
837                         return(NULL);
838                 }
839         }
840         if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
841                 fprintf(stderr, "read_mrecord: bad signature\n");
842                 *errorp = EINVAL;
843                 return(NULL);
844         }
845         bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
846         assert(bytes >= sizeof(mrechd));
847         mrec = malloc(bytes);
848         mrec->head = mrechd;
849
850         while (n < bytes) {
851                 i = read(fdin, (char *)mrec + n, bytes - n);
852                 if (i <= 0)
853                         break;
854                 n += i;
855         }
856         if (n != bytes) {
857                 fprintf(stderr, "read_mrecord: short read on payload\n");
858                 *errorp = EPIPE;
859                 return(NULL);
860         }
861         if (mrec->head.rec_crc != 
862             crc32((char *)mrec + HAMMER_MREC_CRCOFF,
863                   mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
864                 fprintf(stderr, "read_mrecord: bad CRC\n");
865                 *errorp = EINVAL;
866                 return(NULL);
867         }
868         *errorp = 0;
869         return(mrec);
870 }
871
872 static
873 void
874 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec,
875               int bytes)
876 {
877         char zbuf[HAMMER_HEAD_ALIGN];
878         int pad;
879
880         pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
881
882         assert(bytes >= (int)sizeof(mrec->head));
883         bzero(&mrec->head, sizeof(mrec->head));
884         mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
885         mrec->head.type = type;
886         mrec->head.rec_size = bytes;
887         mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF,
888                                    bytes - HAMMER_MREC_CRCOFF);
889         if (write(fdout, mrec, bytes) != bytes) {
890                 fprintf(stderr, "write_mrecord: error %d (%s)\n",
891                         errno, strerror(errno));
892                 exit(1);
893         }
894         if (pad) {
895                 bzero(zbuf, pad);
896                 if (write(fdout, zbuf, pad) != pad) {
897                         fprintf(stderr, "write_mrecord: error %d (%s)\n",
898                                 errno, strerror(errno));
899                         exit(1);
900                 }
901         }
902 }
903
904 /*
905  * Generate a mirroring header with the pfs information of the
906  * originating filesytem.
907  */
908 static void
909 generate_mrec_header(int fd, int fdout, int pfs_id,
910                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
911 {
912         struct hammer_ioc_pseudofs_rw pfs;
913         union hammer_ioc_mrecord_any mrec_tmp;
914
915         bzero(&pfs, sizeof(pfs));
916         bzero(&mrec_tmp, sizeof(mrec_tmp));
917         pfs.pfs_id = pfs_id;
918         pfs.ondisk = &mrec_tmp.pfs.pfsd;
919         pfs.bytes = sizeof(mrec_tmp.pfs.pfsd);
920         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
921                 fprintf(stderr, "Mirror-read: not a HAMMER fs/pseudofs!\n");
922                 exit(1);
923         }
924         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
925                 fprintf(stderr, "Mirror-read: HAMMER pfs version mismatch!\n");
926                 exit(1);
927         }
928
929         /*
930          * sync_beg_tid - lowest TID on source after which a full history
931          *                is available.
932          *
933          * sync_end_tid - highest fully synchronized TID from source.
934          */
935         if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid)
936                 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid;
937         if (tid_endp)
938                 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid;
939         mrec_tmp.pfs.version = pfs.version;
940         write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD,
941                       &mrec_tmp, sizeof(mrec_tmp.pfs));
942 }
943
944 /*
945  * Validate the pfs information from the originating filesystem
946  * against the target filesystem.  shared_uuid must match.
947  *
948  * return -1 if we got a TERM record
949  */
950 static int
951 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
952                      struct hammer_ioc_mrecord_head *pickup,
953                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
954 {
955         struct hammer_ioc_pseudofs_rw pfs;
956         struct hammer_pseudofs_data pfsd;
957         hammer_ioc_mrecord_any_t mrec;
958         int error;
959
960         /*
961          * Get the PFSD info from the target filesystem.
962          */
963         bzero(&pfs, sizeof(pfs));
964         bzero(&pfsd, sizeof(pfsd));
965         pfs.pfs_id = pfs_id;
966         pfs.ondisk = &pfsd;
967         pfs.bytes = sizeof(pfsd);
968         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
969                 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n");
970                 exit(1);
971         }
972         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
973                 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n");
974                 exit(1);
975         }
976
977         mrec = read_mrecord(fdin, &error, pickup);
978         if (mrec == NULL) {
979                 if (error == 0)
980                         fprintf(stderr, "validate_mrec_header: short read\n");
981                 exit(1);
982         }
983         if (mrec->head.type == HAMMER_MREC_TYPE_TERM) {
984                 free(mrec);
985                 return(-1);
986         }
987
988         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
989                 fprintf(stderr, "validate_mrec_header: did not get expected "
990                                 "PFSD record type\n");
991                 exit(1);
992         }
993         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
994                 fprintf(stderr, "validate_mrec_header: unexpected payload "
995                                 "size\n");
996                 exit(1);
997         }
998         if (mrec->pfs.version != pfs.version) {
999                 fprintf(stderr, "validate_mrec_header: Version mismatch\n");
1000                 exit(1);
1001         }
1002
1003         /*
1004          * Whew.  Ok, is the read PFS info compatible with the target?
1005          */
1006         if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
1007                  sizeof(pfsd.shared_uuid)) != 0) {
1008                 fprintf(stderr, 
1009                         "mirror-write: source and target have "
1010                         "different shared-uuid's!\n");
1011                 exit(1);
1012         }
1013         if (is_target &&
1014             (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) {
1015                 fprintf(stderr, "mirror-write: target must be in slave mode\n");
1016                 exit(1);
1017         }
1018         if (tid_begp)
1019                 *tid_begp = mrec->pfs.pfsd.sync_beg_tid;
1020         if (tid_endp)
1021                 *tid_endp = mrec->pfs.pfsd.sync_end_tid;
1022         free(mrec);
1023         return(0);
1024 }
1025
1026 static void
1027 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
1028 {
1029         struct hammer_ioc_pseudofs_rw pfs;
1030         struct hammer_pseudofs_data pfsd;
1031
1032         bzero(&pfs, sizeof(pfs));
1033         bzero(&pfsd, sizeof(pfsd));
1034         pfs.pfs_id = pfs_id;
1035         pfs.ondisk = &pfsd;
1036         pfs.bytes = sizeof(pfsd);
1037         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
1038                 perror("update_pfs_snapshot (read)");
1039                 exit(1);
1040         }
1041         if (pfsd.sync_end_tid != snapshot_tid) {
1042                 pfsd.sync_end_tid = snapshot_tid;
1043                 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
1044                         perror("update_pfs_snapshot (rewrite)");
1045                         exit(1);
1046                 }
1047                 if (VerboseOpt >= 2) {
1048                         fprintf(stderr,
1049                                 "Mirror-write: Completed, updated snapshot "
1050                                 "to %016llx\n",
1051                                 snapshot_tid);
1052                 }
1053         }
1054 }
1055
1056 /*
1057  * Bandwidth-limited write in chunks
1058  */
1059 static
1060 ssize_t
1061 writebw(int fd, const void *buf, size_t nbytes,
1062         u_int64_t *bwcount, struct timeval *tv1)
1063 {
1064         struct timeval tv2;
1065         size_t n;
1066         ssize_t r;
1067         ssize_t a;
1068         int usec;
1069
1070         a = 0;
1071         r = 0;
1072         while (nbytes) {
1073                 if (*bwcount + nbytes > BandwidthOpt)
1074                         n = BandwidthOpt - *bwcount;
1075                 else
1076                         n = nbytes;
1077                 if (n)
1078                         r = write(fd, buf, n);
1079                 if (r >= 0) {
1080                         a += r;
1081                         nbytes -= r;
1082                         buf = (const char *)buf + r;
1083                 }
1084                 if ((size_t)r != n)
1085                         break;
1086                 *bwcount += n;
1087                 if (*bwcount >= BandwidthOpt) {
1088                         gettimeofday(&tv2, NULL);
1089                         usec = (int)(tv2.tv_sec - tv1->tv_sec) * 1000000 +
1090                                 (int)(tv2.tv_usec - tv1->tv_usec);
1091                         if (usec >= 0 && usec < 1000000)
1092                                 usleep(1000000 - usec);
1093                         gettimeofday(tv1, NULL);
1094                         *bwcount -= BandwidthOpt;
1095                 }
1096         }
1097         return(a ? a : r);
1098 }
1099
/*
 * Print the synopsis of every mirror-* command to stderr and terminate
 * the program with exit status code.
 */
static void
mirror_usage(int code)
{
	static const char synopsis[] =
		"hammer mirror-read <filesystem> [begin-tid]\n"
		"hammer mirror-read-stream <filesystem> [begin-tid]\n"
		"hammer mirror-write <filesystem>\n"
		"hammer mirror-dump\n"
		"hammer mirror-copy [[user@]host:]<filesystem>"
				  " [[user@]host:]<filesystem>\n"
		"hammer mirror-stream [[user@]host:]<filesystem>"
				    " [[user@]host:]<filesystem>\n";

	fputs(synopsis, stderr);
	exit(code);
}