Merge from vendor branch LIBARCHIVE:
[dragonfly.git] / sbin / hammer / cmd_mirror.c
1 /*
2  * Copyright (c) 2008 The DragonFly Project.  All rights reserved.
3  * 
4  * This code is derived from software contributed to The DragonFly Project
5  * by Matthew Dillon <dillon@backplane.com>
6  * 
7  * Redistribution and use in source and binary forms, with or without
8  * modification, are permitted provided that the following conditions
9  * are met:
10  * 
11  * 1. Redistributions of source code must retain the above copyright
12  *    notice, this list of conditions and the following disclaimer.
13  * 2. Redistributions in binary form must reproduce the above copyright
14  *    notice, this list of conditions and the following disclaimer in
15  *    the documentation and/or other materials provided with the
16  *    distribution.
17  * 3. Neither the name of The DragonFly Project nor the names of its
18  *    contributors may be used to endorse or promote products derived
19  *    from this software without specific, prior written permission.
20  * 
21  * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
22  * ``AS IS'' AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
23  * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
24  * FOR A PARTICULAR PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE
25  * COPYRIGHT HOLDERS OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
26  * INCIDENTAL, SPECIAL, EXEMPLARY OR CONSEQUENTIAL DAMAGES (INCLUDING,
27  * BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
28  * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
29  * AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
30  * OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
31  * OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
32  * SUCH DAMAGE.
33  * 
34  * $DragonFly: src/sbin/hammer/cmd_mirror.c,v 1.8 2008/07/12 02:48:46 dillon Exp $
35  */
36
37 #include "hammer.h"
38
39 #define SERIALBUF_SIZE  (512 * 1024)
40
41 static int read_mrecords(int fd, char *buf, u_int size,
42                          hammer_ioc_mrecord_head_t pickup);
43 static hammer_ioc_mrecord_any_t read_mrecord(int fdin, int *errorp,
44                          hammer_ioc_mrecord_head_t pickup);
45 static void write_mrecord(int fdout, u_int32_t type,
46                          hammer_ioc_mrecord_any_t mrec, int bytes);
47 static void generate_mrec_header(int fd, int fdout, int pfs_id,
48                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
49 static void validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
50                          hammer_tid_t *tid_begp, hammer_tid_t *tid_endp);
51 static void update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id);
52 static void mirror_usage(int code);
53
54 /*
55  * Generate a mirroring data stream from the specific source over the
56  * entire key range, but restricted to the specified transaction range.
57  *
58  * The HAMMER VFS does most of the work, we add a few new mrecord
59  * types to negotiate the TID ranges and verify that the entire
60  * stream made it to the destination.
61  */
62 void
63 hammer_cmd_mirror_read(char **av, int ac)
64 {
65         struct hammer_ioc_mirror_rw mirror;
66         struct hammer_ioc_pseudofs_rw pfs;
67         union hammer_ioc_mrecord_any mrec_tmp;
68         hammer_ioc_mrecord_any_t mrec;
69         hammer_tid_t sync_tid;
70         const char *filesystem;
71         char *buf = malloc(SERIALBUF_SIZE);
72         int interrupted = 0;
73         int error;
74         int fd;
75         int n;
76         time_t base_t = time(NULL);
77
78         if (ac > 2)
79                 mirror_usage(1);
80         filesystem = av[0];
81
82         bzero(&mirror, sizeof(mirror));
83         hammer_key_beg_init(&mirror.key_beg);
84         hammer_key_end_init(&mirror.key_end);
85
86         fd = getpfs(&pfs, filesystem);
87
88         /*
89          * In 2-way mode the target will send us a PFS info packet
90          * first.  Use the target's current snapshot TID as our default
91          * begin TID.
92          */
93         mirror.tid_beg = 0;
94         if (TwoWayPipeOpt)
95                 validate_mrec_header(fd, 0, 0, pfs.pfs_id,
96                                      NULL, &mirror.tid_beg);
97
98         /*
99          * Write out the PFS header, tid_beg will be updated if our PFS
100          * has a larger begin sync.  tid_end is set to the latest source
101          * TID whos flush cycle has completed.
102          */
103         generate_mrec_header(fd, 1, pfs.pfs_id,
104                              &mirror.tid_beg, &mirror.tid_end);
105
106         /*
107          * A cycle file overrides the beginning TID
108          */
109         hammer_get_cycle(&mirror.key_beg, &mirror.tid_beg);
110
111         if (ac == 2)
112                 mirror.tid_beg = strtoull(av[1], NULL, 0);
113
114         fprintf(stderr, "mirror-read: Mirror from %016llx to %016llx\n",
115                 mirror.tid_beg, mirror.tid_end);
116         if (mirror.key_beg.obj_id != (int64_t)HAMMER_MIN_OBJID) {
117                 fprintf(stderr, "mirror-read: Resuming at object %016llx\n",
118                         mirror.key_beg.obj_id);
119         }
120
121         /*
122          * Write out bulk records
123          */
124         mirror.ubuf = buf;
125         mirror.size = SERIALBUF_SIZE;
126
127         do {
128                 mirror.count = 0;
129                 mirror.pfs_id = pfs.pfs_id;
130                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
131                 if (ioctl(fd, HAMMERIOC_MIRROR_READ, &mirror) < 0) {
132                         fprintf(stderr, "Mirror-read %s failed: %s\n",
133                                 filesystem, strerror(errno));
134                         exit(1);
135                 }
136                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
137                         fprintf(stderr,
138                                 "Mirror-read %s fatal error %d\n",
139                                 filesystem, mirror.head.error);
140                         exit(1);
141                 }
142                 if (mirror.count) {
143                         n = write(1, mirror.ubuf, mirror.count);
144                         if (n != mirror.count) {
145                                 fprintf(stderr, "Mirror-read %s failed: "
146                                                 "short write\n",
147                                 filesystem);
148                                 exit(1);
149                         }
150                 }
151                 mirror.key_beg = mirror.key_cur;
152                 if (TimeoutOpt &&
153                     (unsigned)(time(NULL) - base_t) > (unsigned)TimeoutOpt) {
154                         fprintf(stderr,
155                                 "Mirror-read %s interrupted by timer at"
156                                 " %016llx\n",
157                                 filesystem,
158                                 mirror.key_cur.obj_id);
159                         interrupted = 1;
160                         break;
161                 }
162         } while (mirror.count != 0);
163
164         /*
165          * Write out the termination sync record
166          */
167         write_mrecord(1, HAMMER_MREC_TYPE_SYNC,
168                       &mrec_tmp, sizeof(mrec_tmp.sync));
169
170         /*
171          * If the -2 option was given (automatic when doing mirror-copy),
172          * a two-way pipe is assumed and we expect a response mrec from
173          * the target.
174          */
175         if (TwoWayPipeOpt) {
176                 mrec = read_mrecord(0, &error, NULL);
177                 if (mrec == NULL || 
178                     mrec->head.type != HAMMER_MREC_TYPE_UPDATE ||
179                     mrec->head.rec_size != sizeof(mrec->update)) {
180                         fprintf(stderr, "mirror_read: Did not get final "
181                                         "acknowledgement packet from target\n");
182                         exit(1);
183                 }
184                 if (interrupted) {
185                         if (CyclePath) {
186                                 hammer_set_cycle(&mirror.key_cur, mirror.tid_beg);
187                                 fprintf(stderr, "Cyclefile %s updated for continuation\n", CyclePath);
188                         }
189                 } else {
190                         sync_tid = mrec->update.tid;
191                         if (CyclePath) {
192                                 hammer_key_beg_init(&mirror.key_beg);
193                                 hammer_set_cycle(&mirror.key_beg, sync_tid);
194                                 fprintf(stderr, "Cyclefile %s updated to 0x%016llx\n",
195                                         CyclePath, sync_tid);
196                         } else {
197                                 fprintf(stderr, "Source can update synctid "
198                                                 "to 0x%016llx\n",
199                                         sync_tid);
200                         }
201                 }
202         } else if (CyclePath) {
203                 /* NOTE! mirror.tid_beg cannot be updated */
204                 fprintf(stderr, "Warning: cycle file (-c option) cannot be "
205                                 "fully updated unless you use mirror-copy\n");
206                 hammer_set_cycle(&mirror.key_beg, mirror.tid_beg);
207         }
208         fprintf(stderr, "Mirror-read %s succeeded\n", filesystem);
209 }
210
211 /*
212  * Pipe the mirroring data stream on stdin to the HAMMER VFS, adding
213  * some additional packet types to negotiate TID ranges and to verify
214  * completion.  The HAMMER VFS does most of the work.
215  *
216  * It is important to note that the mirror.key_{beg,end} range must
217  * match the ranged used by the original.  For now both sides use
218  * range the entire key space.
219  *
220  * It is even more important that the records in the stream conform
221  * to the TID range also supplied in the stream.  The HAMMER VFS will
222  * use the REC, PASS, and SKIP record types to track the portions of
223  * the B-Tree being scanned in order to be able to proactively delete
224  * records on the target within those active areas that are not mentioned
225  * by the source.
226  *
227  * The mirror.key_cur field is used by the VFS to do this tracking.  It
228  * must be initialized to key_beg but then is persistently updated by
229  * the HAMMER VFS on each successive ioctl() call.  If you blow up this
230  * field you will blow up the mirror target, possibly to the point of
231  * deleting everything.  As a safety measure the HAMMER VFS simply marks
232  * the records that the source has destroyed as deleted on the target,
233  * and normal pruning operations will deal with their final disposition
234  * at some later time.
235  */
236 void
237 hammer_cmd_mirror_write(char **av, int ac)
238 {
239         struct hammer_ioc_mirror_rw mirror;
240         const char *filesystem;
241         char *buf = malloc(SERIALBUF_SIZE);
242         struct hammer_ioc_pseudofs_rw pfs;
243         struct hammer_ioc_mrecord_head pickup;
244         struct hammer_ioc_synctid synctid;
245         union hammer_ioc_mrecord_any mrec_tmp;
246         hammer_ioc_mrecord_any_t mrec;
247         int error;
248         int fd;
249
250         if (ac > 2)
251                 mirror_usage(1);
252         filesystem = av[0];
253
254         bzero(&mirror, sizeof(mirror));
255         hammer_key_beg_init(&mirror.key_beg);
256         hammer_key_end_init(&mirror.key_end);
257         mirror.key_end = mirror.key_beg;
258
259         fd = getpfs(&pfs, filesystem);
260
261         /*
262          * In two-way mode the target writes out a PFS packet first.
263          * The source uses our tid_end as its tid_beg by default,
264          * picking up where it left off.
265          */
266         mirror.tid_beg = 0;
267         if (TwoWayPipeOpt) {
268                 generate_mrec_header(fd, 1, pfs.pfs_id,
269                                      &mirror.tid_beg, &mirror.tid_end);
270         }
271
272         /*
273          * Read and process the PFS header.  The source informs us of
274          * the TID range the stream represents.
275          */
276         validate_mrec_header(fd, 0, 1, pfs.pfs_id,
277                              &mirror.tid_beg, &mirror.tid_end);
278
279         mirror.ubuf = buf;
280         mirror.size = SERIALBUF_SIZE;
281
282         pickup.signature = 0;
283         pickup.type = 0;
284
285         /*
286          * Read and process bulk records (REC, PASS, and SKIP types).
287          *
288          * On your life, do NOT mess with mirror.key_cur or your mirror
289          * target may become history.
290          */
291         for (;;) {
292                 mirror.count = 0;
293                 mirror.pfs_id = pfs.pfs_id;
294                 mirror.shared_uuid = pfs.ondisk->shared_uuid;
295                 mirror.size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
296                 if (mirror.size <= 0)
297                         break;
298                 if (ioctl(fd, HAMMERIOC_MIRROR_WRITE, &mirror) < 0) {
299                         fprintf(stderr, "Mirror-write %s failed: %s\n",
300                                 filesystem, strerror(errno));
301                         exit(1);
302                 }
303                 if (mirror.head.flags & HAMMER_IOC_HEAD_ERROR) {
304                         fprintf(stderr,
305                                 "Mirror-write %s fatal error %d\n",
306                                 filesystem, mirror.head.error);
307                         exit(1);
308                 }
309 #if 0
310                 if (mirror.head.flags & HAMMER_IOC_HEAD_INTR) {
311                         fprintf(stderr,
312                                 "Mirror-write %s interrupted by timer at"
313                                 " %016llx\n",
314                                 filesystem,
315                                 mirror.key_cur.obj_id);
316                         exit(0);
317                 }
318 #endif
319         }
320
321         /*
322          * Read and process the termination sync record.
323          */
324         mrec = read_mrecord(0, &error, &pickup);
325         if (mrec == NULL || 
326             mrec->head.type != HAMMER_MREC_TYPE_SYNC ||
327             mrec->head.rec_size != sizeof(mrec->sync)) {
328                 fprintf(stderr, "Mirror-write %s: Did not get termination "
329                                 "sync record, or rec_size is wrong rt=%d\n",
330                                 filesystem, mrec->head.type);
331         }
332         free(mrec);
333         mrec = NULL;
334
335         /*
336          * Update the PFS info on the target so the user has visibility
337          * into the new snapshot.
338          */
339         update_pfs_snapshot(fd, mirror.tid_end, pfs.pfs_id);
340
341         /*
342          * Sync the target filesystem
343          */
344         bzero(&synctid, sizeof(synctid));
345         synctid.op = HAMMER_SYNCTID_SYNC2;
346         ioctl(fd, HAMMERIOC_SYNCTID, &synctid);
347
348         fprintf(stderr, "Mirror-write %s: succeeded\n", filesystem);
349
350         /*
351          * Report back to the originator.
352          */
353         if (TwoWayPipeOpt) {
354                 mrec_tmp.update.tid = mirror.tid_end;
355                 write_mrecord(1, HAMMER_MREC_TYPE_UPDATE,
356                               &mrec_tmp, sizeof(mrec_tmp.update));
357         } else {
358                 printf("Source can update synctid to 0x%016llx\n",
359                        mirror.tid_end);
360         }
361 }
362
363 void
364 hammer_cmd_mirror_dump(void)
365 {
366         char *buf = malloc(SERIALBUF_SIZE);
367         struct hammer_ioc_mrecord_head pickup;
368         hammer_ioc_mrecord_any_t mrec;
369         int error;
370         int size;
371         int offset;
372         int bytes;
373
374         /*
375          * Read and process the PFS header 
376          */
377         pickup.signature = 0;
378         pickup.type = 0;
379
380         mrec = read_mrecord(0, &error, &pickup);
381
382         /*
383          * Read and process bulk records
384          */
385         for (;;) {
386                 size = read_mrecords(0, buf, SERIALBUF_SIZE, &pickup);
387                 if (size <= 0)
388                         break;
389                 offset = 0;
390                 while (offset < size) {
391                         mrec = (void *)((char *)buf + offset);
392                         bytes = HAMMER_HEAD_DOALIGN(mrec->head.rec_size);
393                         if (offset + bytes > size) {
394                                 fprintf(stderr, "Misaligned record\n");
395                                 exit(1);
396                         }
397
398                         switch(mrec->head.type) {
399                         case HAMMER_MREC_TYPE_REC:
400                                 printf("Record obj=%016llx key=%016llx "
401                                        "rt=%02x ot=%02x\n",
402                                         mrec->rec.leaf.base.obj_id,
403                                         mrec->rec.leaf.base.key,
404                                         mrec->rec.leaf.base.rec_type,
405                                         mrec->rec.leaf.base.obj_type);
406                                 printf("       tids %016llx:%016llx data=%d\n",
407                                         mrec->rec.leaf.base.create_tid,
408                                         mrec->rec.leaf.base.delete_tid,
409                                         mrec->rec.leaf.data_len);
410                                 break;
411                         case HAMMER_MREC_TYPE_PASS:
412                                 printf("Pass   obj=%016llx key=%016llx "
413                                        "rt=%02x ot=%02x\n",
414                                         mrec->rec.leaf.base.obj_id,
415                                         mrec->rec.leaf.base.key,
416                                         mrec->rec.leaf.base.rec_type,
417                                         mrec->rec.leaf.base.obj_type);
418                                 printf("       tids %016llx:%016llx data=%d\n",
419                                         mrec->rec.leaf.base.create_tid,
420                                         mrec->rec.leaf.base.delete_tid,
421                                         mrec->rec.leaf.data_len);
422                                 break;
423                         case HAMMER_MREC_TYPE_SKIP:
424                                 printf("Skip   obj=%016llx key=%016llx rt=%02x to\n"
425                                        "       obj=%016llx key=%016llx rt=%02x\n",
426                                        mrec->skip.skip_beg.obj_id,
427                                        mrec->skip.skip_beg.key,
428                                        mrec->skip.skip_beg.rec_type,
429                                        mrec->skip.skip_end.obj_id,
430                                        mrec->skip.skip_end.key,
431                                        mrec->skip.skip_end.rec_type);
432                         default:
433                                 break;
434                         }
435                         offset += bytes;
436                 }
437         }
438
439         /*
440          * Read and process the termination sync record.
441          */
442         mrec = read_mrecord(0, &error, &pickup);
443         if (mrec == NULL || mrec->head.type != HAMMER_MREC_TYPE_SYNC) {
444                 fprintf(stderr, "Mirror-dump: Did not get termination "
445                                 "sync record\n");
446         }
447 }
448
449 void
450 hammer_cmd_mirror_copy(char **av, int ac)
451 {
452         pid_t pid1;
453         pid_t pid2;
454         int fds[2];
455         const char *xav[16];
456         char tbuf[16];
457         char *ptr;
458         int xac;
459
460         if (ac != 2)
461                 mirror_usage(1);
462
463         if (pipe(fds) < 0) {
464                 perror("pipe");
465                 exit(1);
466         }
467
468         TwoWayPipeOpt = 1;
469
470         /*
471          * Source
472          */
473         if ((pid1 = fork()) == 0) {
474                 dup2(fds[0], 0);
475                 dup2(fds[0], 1);
476                 close(fds[0]);
477                 close(fds[1]);
478                 if ((ptr = strchr(av[0], ':')) != NULL) {
479                         *ptr++ = 0;
480                         xac = 0;
481                         xav[xac++] = "ssh";
482                         xav[xac++] = av[0];
483                         xav[xac++] = "hammer";
484                         if (VerboseOpt)
485                                 xav[xac++] = "-v";
486                         xav[xac++] = "-2";
487                         if (TimeoutOpt) {
488                                 snprintf(tbuf, sizeof(tbuf), "%d", TimeoutOpt);
489                                 xav[xac++] = "-t";
490                                 xav[xac++] = tbuf;
491                         }
492                         xav[xac++] = "mirror-read";
493                         xav[xac++] = ptr;
494                         xav[xac++] = NULL;
495                         execv("/usr/bin/ssh", (void *)xav);
496                 } else {
497                         hammer_cmd_mirror_read(av, 1);
498                         fflush(stdout);
499                         fflush(stderr);
500                 }
501                 _exit(1);
502         }
503
504         /*
505          * Target
506          */
507         if ((pid2 = fork()) == 0) {
508                 dup2(fds[1], 0);
509                 dup2(fds[1], 1);
510                 close(fds[0]);
511                 close(fds[1]);
512                 if ((ptr = strchr(av[1], ':')) != NULL) {
513                         *ptr++ = 0;
514                         xac = 0;
515                         xav[xac++] = "ssh";
516                         xav[xac++] = av[1];
517                         xav[xac++] = "hammer";
518                         if (VerboseOpt)
519                                 xav[xac++] = "-v";
520                         xav[xac++] = "-2";
521                         xav[xac++] = "mirror-write";
522                         xav[xac++] = ptr;
523                         xav[xac++] = NULL;
524                         execv("/usr/bin/ssh", (void *)xav);
525                 } else {
526                         hammer_cmd_mirror_write(av + 1, 1);
527                         fflush(stdout);
528                         fflush(stderr);
529                 }
530                 _exit(1);
531         }
532         close(fds[0]);
533         close(fds[1]);
534
535         while (waitpid(pid1, NULL, 0) <= 0)
536                 ;
537         while (waitpid(pid2, NULL, 0) <= 0)
538                 ;
539 }
540
541 /*
542  * Read and return multiple mrecords
543  */
544 static int
545 read_mrecords(int fd, char *buf, u_int size, hammer_ioc_mrecord_head_t pickup)
546 {
547         hammer_ioc_mrecord_any_t mrec;
548         u_int count;
549         size_t n;
550         size_t i;
551         size_t bytes;
552
553         count = 0;
554         while (size - count >= HAMMER_MREC_HEADSIZE) {
555                 /*
556                  * Cached the record header in case we run out of buffer
557                  * space.
558                  */
559                 fflush(stdout);
560                 if (pickup->signature == 0) {
561                         for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
562                                 i = read(fd, (char *)pickup + n,
563                                          HAMMER_MREC_HEADSIZE - n);
564                                 if (i <= 0)
565                                         break;
566                         }
567                         if (n == 0)
568                                 break;
569                         if (n != HAMMER_MREC_HEADSIZE) {
570                                 fprintf(stderr, "read_mrecords: short read on pipe\n");
571                                 exit(1);
572                         }
573
574                         if (pickup->signature != HAMMER_IOC_MIRROR_SIGNATURE) {
575                                 fprintf(stderr, "read_mrecords: malformed record on pipe, bad signature\n");
576                                 exit(1);
577                         }
578                 }
579                 if (pickup->rec_size < HAMMER_MREC_HEADSIZE ||
580                     pickup->rec_size > sizeof(*mrec) + HAMMER_XBUFSIZE) {
581                         fprintf(stderr, "read_mrecords: malformed record on pipe, illegal rec_size\n");
582                         exit(1);
583                 }
584
585                 /*
586                  * Stop if we have insufficient space for the record and data.
587                  */
588                 bytes = HAMMER_HEAD_DOALIGN(pickup->rec_size);
589                 if (size - count < bytes)
590                         break;
591
592                 /*
593                  * Stop if the record type is not a REC or a SKIP (the only
594                  * two types the ioctl supports.  Other types are used only
595                  * by the userland protocol).
596                  */
597                 if (pickup->type != HAMMER_MREC_TYPE_REC &&
598                     pickup->type != HAMMER_MREC_TYPE_SKIP &&
599                     pickup->type != HAMMER_MREC_TYPE_PASS) {
600                         break;
601                 }
602
603                 /*
604                  * Read the remainder and clear the pickup signature.
605                  */
606                 for (n = HAMMER_MREC_HEADSIZE; n < bytes; n += i) {
607                         i = read(fd, buf + count + n, bytes - n);
608                         if (i <= 0)
609                                 break;
610                 }
611                 if (n != bytes) {
612                         fprintf(stderr, "read_mrecords: short read on pipe\n");
613                         exit(1);
614                 }
615
616                 bcopy(pickup, buf + count, HAMMER_MREC_HEADSIZE);
617                 pickup->signature = 0;
618                 pickup->type = 0;
619                 mrec = (void *)(buf + count);
620
621                 /*
622                  * Validate the completed record
623                  */
624                 if (mrec->head.rec_crc !=
625                     crc32((char *)mrec + HAMMER_MREC_CRCOFF,
626                           mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
627                         fprintf(stderr, "read_mrecords: malformed record "
628                                         "on pipe, bad crc\n");
629                         exit(1);
630                 }
631
632                 /*
633                  * If its a B-Tree record validate the data crc
634                  */
635                 if (mrec->head.type == HAMMER_MREC_TYPE_REC) {
636                         if (mrec->head.rec_size <
637                             sizeof(mrec->rec) + mrec->rec.leaf.data_len) {
638                                 fprintf(stderr, 
639                                         "read_mrecords: malformed record on "
640                                         "pipe, illegal element data_len\n");
641                                 exit(1);
642                         }
643                         if (mrec->rec.leaf.data_len &&
644                             mrec->rec.leaf.data_offset &&
645                             hammer_crc_test_leaf(&mrec->rec + 1, &mrec->rec.leaf) == 0) {
646                                 fprintf(stderr,
647                                         "read_mrecords: data_crc did not "
648                                         "match data! obj=%016llx key=%016llx\n",
649                                         mrec->rec.leaf.base.obj_id,
650                                         mrec->rec.leaf.base.key);
651                                 fprintf(stderr,
652                                         "continuing, but there are problems\n");
653                         }
654                 }
655                 count += bytes;
656         }
657         return(count);
658 }
659
660 /*
661  * Read and return a single mrecord.
662  */
663 static
664 hammer_ioc_mrecord_any_t
665 read_mrecord(int fdin, int *errorp, hammer_ioc_mrecord_head_t pickup)
666 {
667         hammer_ioc_mrecord_any_t mrec;
668         struct hammer_ioc_mrecord_head mrechd;
669         size_t bytes;
670         size_t n;
671         size_t i;
672
673         if (pickup && pickup->type != 0) {
674                 mrechd = *pickup;
675                 pickup->signature = 0;
676                 pickup->type = 0;
677                 n = HAMMER_MREC_HEADSIZE;
678         } else {
679                 /*
680                  * Read in the PFSD header from the sender.
681                  */
682                 for (n = 0; n < HAMMER_MREC_HEADSIZE; n += i) {
683                         i = read(fdin, (char *)&mrechd + n, HAMMER_MREC_HEADSIZE - n);
684                         if (i <= 0)
685                                 break;
686                 }
687                 if (n == 0) {
688                         *errorp = 0;    /* EOF */
689                         return(NULL);
690                 }
691                 if (n != HAMMER_MREC_HEADSIZE) {
692                         fprintf(stderr, "short read of mrecord header\n");
693                         *errorp = EPIPE;
694                         return(NULL);
695                 }
696         }
697         if (mrechd.signature != HAMMER_IOC_MIRROR_SIGNATURE) {
698                 fprintf(stderr, "read_mrecord: bad signature\n");
699                 *errorp = EINVAL;
700                 return(NULL);
701         }
702         bytes = HAMMER_HEAD_DOALIGN(mrechd.rec_size);
703         assert(bytes >= sizeof(mrechd));
704         mrec = malloc(bytes);
705         mrec->head = mrechd;
706
707         while (n < bytes) {
708                 i = read(fdin, (char *)mrec + n, bytes - n);
709                 if (i <= 0)
710                         break;
711                 n += i;
712         }
713         if (n != bytes) {
714                 fprintf(stderr, "read_mrecord: short read on payload\n");
715                 *errorp = EPIPE;
716                 return(NULL);
717         }
718         if (mrec->head.rec_crc != 
719             crc32((char *)mrec + HAMMER_MREC_CRCOFF,
720                   mrec->head.rec_size - HAMMER_MREC_CRCOFF)) {
721                 fprintf(stderr, "read_mrecord: bad CRC\n");
722                 *errorp = EINVAL;
723                 return(NULL);
724         }
725         *errorp = 0;
726         return(mrec);
727 }
728
729 static
730 void
731 write_mrecord(int fdout, u_int32_t type, hammer_ioc_mrecord_any_t mrec,
732               int bytes)
733 {
734         char zbuf[HAMMER_HEAD_ALIGN];
735         int pad;
736
737         pad = HAMMER_HEAD_DOALIGN(bytes) - bytes;
738
739         assert(bytes >= (int)sizeof(mrec->head));
740         bzero(&mrec->head, sizeof(mrec->head));
741         mrec->head.signature = HAMMER_IOC_MIRROR_SIGNATURE;
742         mrec->head.type = type;
743         mrec->head.rec_size = bytes;
744         mrec->head.rec_crc = crc32((char *)mrec + HAMMER_MREC_CRCOFF,
745                                    bytes - HAMMER_MREC_CRCOFF);
746         if (write(fdout, mrec, bytes) != bytes) {
747                 fprintf(stderr, "write_mrecord: error %d (%s)\n",
748                         errno, strerror(errno));
749                 exit(1);
750         }
751         if (pad) {
752                 bzero(zbuf, pad);
753                 if (write(fdout, zbuf, pad) != pad) {
754                         fprintf(stderr, "write_mrecord: error %d (%s)\n",
755                                 errno, strerror(errno));
756                         exit(1);
757                 }
758         }
759 }
760
761 /*
762  * Generate a mirroring header with the pfs information of the
763  * originating filesytem.
764  */
765 static void
766 generate_mrec_header(int fd, int fdout, int pfs_id,
767                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
768 {
769         struct hammer_ioc_pseudofs_rw pfs;
770         union hammer_ioc_mrecord_any mrec_tmp;
771
772         bzero(&pfs, sizeof(pfs));
773         bzero(&mrec_tmp, sizeof(mrec_tmp));
774         pfs.pfs_id = pfs_id;
775         pfs.ondisk = &mrec_tmp.pfs.pfsd;
776         pfs.bytes = sizeof(mrec_tmp.pfs.pfsd);
777         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
778                 fprintf(stderr, "mirror-read: not a HAMMER fs/pseudofs!\n");
779                 exit(1);
780         }
781         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
782                 fprintf(stderr, "mirror-read: HAMMER pfs version mismatch!\n");
783                 exit(1);
784         }
785
786         /*
787          * sync_beg_tid - lowest TID on source after which a full history
788          *                is available.
789          *
790          * sync_end_tid - highest fully synchronized TID from source.
791          */
792         if (tid_begp && *tid_begp < mrec_tmp.pfs.pfsd.sync_beg_tid)
793                 *tid_begp = mrec_tmp.pfs.pfsd.sync_beg_tid;
794         if (tid_endp)
795                 *tid_endp = mrec_tmp.pfs.pfsd.sync_end_tid;
796         mrec_tmp.pfs.version = pfs.version;
797         write_mrecord(fdout, HAMMER_MREC_TYPE_PFSD,
798                       &mrec_tmp, sizeof(mrec_tmp.pfs));
799 }
800
801 /*
802  * Validate the pfs information from the originating filesystem
803  * against the target filesystem.  shared_uuid must match.
804  */
805 static void
806 validate_mrec_header(int fd, int fdin, int is_target, int pfs_id,
807                      hammer_tid_t *tid_begp, hammer_tid_t *tid_endp)
808 {
809         struct hammer_ioc_pseudofs_rw pfs;
810         struct hammer_pseudofs_data pfsd;
811         hammer_ioc_mrecord_any_t mrec;
812         int error;
813
814         /*
815          * Get the PFSD info from the target filesystem.
816          */
817         bzero(&pfs, sizeof(pfs));
818         bzero(&pfsd, sizeof(pfsd));
819         pfs.pfs_id = pfs_id;
820         pfs.ondisk = &pfsd;
821         pfs.bytes = sizeof(pfsd);
822         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
823                 fprintf(stderr, "mirror-write: not a HAMMER fs/pseudofs!\n");
824                 exit(1);
825         }
826         if (pfs.version != HAMMER_IOC_PSEUDOFS_VERSION) {
827                 fprintf(stderr, "mirror-write: HAMMER pfs version mismatch!\n");
828                 exit(1);
829         }
830
831         mrec = read_mrecord(fdin, &error, NULL);
832         if (mrec == NULL) {
833                 if (error == 0)
834                         fprintf(stderr, "validate_mrec_header: short read\n");
835                 exit(1);
836         }
837         if (mrec->head.type != HAMMER_MREC_TYPE_PFSD) {
838                 fprintf(stderr, "validate_mrec_header: did not get expected "
839                                 "PFSD record type\n");
840                 exit(1);
841         }
842         if (mrec->head.rec_size != sizeof(mrec->pfs)) {
843                 fprintf(stderr, "validate_mrec_header: unexpected payload "
844                                 "size\n");
845                 exit(1);
846         }
847         if (mrec->pfs.version != pfs.version) {
848                 fprintf(stderr, "validate_mrec_header: Version mismatch\n");
849                 exit(1);
850         }
851
852         /*
853          * Whew.  Ok, is the read PFS info compatible with the target?
854          */
855         if (bcmp(&mrec->pfs.pfsd.shared_uuid, &pfsd.shared_uuid,
856                  sizeof(pfsd.shared_uuid)) != 0) {
857                 fprintf(stderr, 
858                         "mirror-write: source and target have "
859                         "different shared_uuid's!\n");
860                 exit(1);
861         }
862         if (is_target &&
863             (pfsd.mirror_flags & HAMMER_PFSD_SLAVE) == 0) {
864                 fprintf(stderr, "mirror-write: target must be in slave mode\n");
865                 exit(1);
866         }
867         if (tid_begp)
868                 *tid_begp = mrec->pfs.pfsd.sync_beg_tid;
869         if (tid_endp)
870                 *tid_endp = mrec->pfs.pfsd.sync_end_tid;
871         free(mrec);
872 }
873
874 static void
875 update_pfs_snapshot(int fd, hammer_tid_t snapshot_tid, int pfs_id)
876 {
877         struct hammer_ioc_pseudofs_rw pfs;
878         struct hammer_pseudofs_data pfsd;
879
880         bzero(&pfs, sizeof(pfs));
881         bzero(&pfsd, sizeof(pfsd));
882         pfs.pfs_id = pfs_id;
883         pfs.ondisk = &pfsd;
884         pfs.bytes = sizeof(pfsd);
885         if (ioctl(fd, HAMMERIOC_GET_PSEUDOFS, &pfs) != 0) {
886                 perror("update_pfs_snapshot (read)");
887                 exit(1);
888         }
889         if (pfsd.sync_end_tid != snapshot_tid) {
890                 pfsd.sync_end_tid = snapshot_tid;
891                 if (ioctl(fd, HAMMERIOC_SET_PSEUDOFS, &pfs) != 0) {
892                         perror("update_pfs_snapshot (rewrite)");
893                         exit(1);
894                 }
895         }
896 }
897
898
899 static void
900 mirror_usage(int code)
901 {
902         fprintf(stderr, 
903                 "hammer mirror-read <filesystem>\n"
904                 "hammer mirror-write <filesystem>\n"
905                 "hammer mirror-dump\n"
906                 "hammer mirror-copy [[user@]host:]fs [[user@]host:]fs\n"
907         );
908         exit(code);
909 }